Commit b247bbf1 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Fix a race in rpciod_down()

The commit 4ada539e led to the unpleasant
possibility of an asynchronous rpc_task being required to call
rpciod_down() when it is complete. This again means that the rpciod
workqueue may get to call destroy_workqueue on itself -> hang...

Change rpciod_up/rpciod_down to just get/put the module, and then
create/destroy the workqueues on module load/unload.
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
parent 4a2a4df7
...@@ -50,8 +50,6 @@ static RPC_WAITQ(delay_queue, "delayq"); ...@@ -50,8 +50,6 @@ static RPC_WAITQ(delay_queue, "delayq");
/* /*
* rpciod-related stuff * rpciod-related stuff
*/ */
static DEFINE_MUTEX(rpciod_mutex);
static atomic_t rpciod_users = ATOMIC_INIT(0);
struct workqueue_struct *rpciod_workqueue; struct workqueue_struct *rpciod_workqueue;
/* /*
...@@ -961,60 +959,49 @@ void rpc_killall_tasks(struct rpc_clnt *clnt) ...@@ -961,60 +959,49 @@ void rpc_killall_tasks(struct rpc_clnt *clnt)
spin_unlock(&clnt->cl_lock); spin_unlock(&clnt->cl_lock);
} }
/*
 * Take a reference on this module on behalf of an rpciod user.
 * Pinning the module (rather than refcounting the workqueue itself)
 * means the rpciod workqueue is only destroyed at module unload,
 * never from within one of its own work items.
 * Returns 0 on success, -EINVAL if the module is being unloaded.
 */
int rpciod_up(void)
{
return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}
/*
 * Drop the module reference taken by rpciod_up(). Safe to call from
 * an asynchronous rpc_task running on rpciod itself, since it no
 * longer triggers destroy_workqueue() directly.
 */
void rpciod_down(void)
{
module_put(THIS_MODULE);
}
/* /*
* Start up the rpciod process if it's not already running. * Start up the rpciod workqueue.
*/ */
int static int rpciod_start(void)
rpciod_up(void)
{ {
struct workqueue_struct *wq; struct workqueue_struct *wq;
int error = 0;
if (atomic_inc_not_zero(&rpciod_users))
return 0;
mutex_lock(&rpciod_mutex);
/* Guard against races with rpciod_down() */
if (rpciod_workqueue != NULL)
goto out_ok;
/* /*
* Create the rpciod thread and wait for it to start. * Create the rpciod thread and wait for it to start.
*/ */
dprintk("RPC: creating workqueue rpciod\n"); dprintk("RPC: creating workqueue rpciod\n");
error = -ENOMEM;
wq = create_workqueue("rpciod"); wq = create_workqueue("rpciod");
if (wq == NULL)
goto out;
rpciod_workqueue = wq; rpciod_workqueue = wq;
error = 0; return rpciod_workqueue != NULL;
out_ok:
atomic_inc(&rpciod_users);
out:
mutex_unlock(&rpciod_mutex);
return error;
} }
void static void rpciod_stop(void)
rpciod_down(void)
{ {
if (!atomic_dec_and_test(&rpciod_users)) struct workqueue_struct *wq = NULL;
return;
mutex_lock(&rpciod_mutex); if (rpciod_workqueue == NULL)
return;
dprintk("RPC: destroying workqueue rpciod\n"); dprintk("RPC: destroying workqueue rpciod\n");
if (atomic_read(&rpciod_users) == 0 && rpciod_workqueue != NULL) { wq = rpciod_workqueue;
destroy_workqueue(rpciod_workqueue);
rpciod_workqueue = NULL; rpciod_workqueue = NULL;
} destroy_workqueue(wq);
mutex_unlock(&rpciod_mutex);
} }
void void
rpc_destroy_mempool(void) rpc_destroy_mempool(void)
{ {
rpciod_stop();
if (rpc_buffer_mempool) if (rpc_buffer_mempool)
mempool_destroy(rpc_buffer_mempool); mempool_destroy(rpc_buffer_mempool);
if (rpc_task_mempool) if (rpc_task_mempool)
...@@ -1048,6 +1035,8 @@ rpc_init_mempool(void) ...@@ -1048,6 +1035,8 @@ rpc_init_mempool(void)
rpc_buffer_slabp); rpc_buffer_slabp);
if (!rpc_buffer_mempool) if (!rpc_buffer_mempool)
goto err_nomem; goto err_nomem;
if (!rpciod_start())
goto err_nomem;
return 0; return 0;
err_nomem: err_nomem:
rpc_destroy_mempool(); rpc_destroy_mempool();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment