Commit 32bfb5c0 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Allow the rpc_release() callback to be run on another workqueue

A lot of the work done by the rpc_release() callback is inappropriate for
rpciod as it will often involve things like starting a new rpc call in
order to clean up state after an interrupted NFSv4 open() call, or
calls to mntput(), etc.

This patch allows the caller of rpc_run_task() to specify that the
rpc_release callback should run on a different workqueue than the default
rpciod_workqueue.
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent 383ba719
...@@ -123,6 +123,7 @@ struct rpc_task_setup { ...@@ -123,6 +123,7 @@ struct rpc_task_setup {
const struct rpc_message *rpc_message; const struct rpc_message *rpc_message;
const struct rpc_call_ops *callback_ops; const struct rpc_call_ops *callback_ops;
void *callback_data; void *callback_data;
struct workqueue_struct *workqueue;
unsigned short flags; unsigned short flags;
signed char priority; signed char priority;
}; };
......
...@@ -326,7 +326,7 @@ static void rpc_make_runnable(struct rpc_task *task) ...@@ -326,7 +326,7 @@ static void rpc_make_runnable(struct rpc_task *task)
int status; int status;
INIT_WORK(&task->u.tk_work, rpc_async_schedule); INIT_WORK(&task->u.tk_work, rpc_async_schedule);
status = queue_work(task->tk_workqueue, &task->u.tk_work); status = queue_work(rpciod_workqueue, &task->u.tk_work);
if (status < 0) { if (status < 0) {
printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
task->tk_status = status; task->tk_status = status;
...@@ -832,7 +832,7 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta ...@@ -832,7 +832,7 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
task->tk_owner = current->tgid; task->tk_owner = current->tgid;
/* Initialize workqueue for async tasks */ /* Initialize workqueue for async tasks */
task->tk_workqueue = rpciod_workqueue; task->tk_workqueue = task_setup_data->workqueue;
task->tk_client = task_setup_data->rpc_client; task->tk_client = task_setup_data->rpc_client;
if (task->tk_client != NULL) { if (task->tk_client != NULL) {
...@@ -868,7 +868,7 @@ rpc_alloc_task(void) ...@@ -868,7 +868,7 @@ rpc_alloc_task(void)
return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS); return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
} }
static void rpc_free_task(struct rcu_head *rcu) static void rpc_free_task_rcu(struct rcu_head *rcu)
{ {
struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu); struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
dprintk("RPC: %5u freeing task\n", task->tk_pid); dprintk("RPC: %5u freeing task\n", task->tk_pid);
...@@ -898,12 +898,23 @@ out: ...@@ -898,12 +898,23 @@ out:
return task; return task;
} }
static void rpc_free_task(struct rpc_task *task)
void rpc_put_task(struct rpc_task *task)
{ {
const struct rpc_call_ops *tk_ops = task->tk_ops; const struct rpc_call_ops *tk_ops = task->tk_ops;
void *calldata = task->tk_calldata; void *calldata = task->tk_calldata;
if (task->tk_flags & RPC_TASK_DYNAMIC)
call_rcu_bh(&task->u.tk_rcu, rpc_free_task_rcu);
rpc_release_calldata(tk_ops, calldata);
}
static void rpc_async_release(struct work_struct *work)
{
rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
}
void rpc_put_task(struct rpc_task *task)
{
if (!atomic_dec_and_test(&task->tk_count)) if (!atomic_dec_and_test(&task->tk_count))
return; return;
/* Release resources */ /* Release resources */
...@@ -915,9 +926,11 @@ void rpc_put_task(struct rpc_task *task) ...@@ -915,9 +926,11 @@ void rpc_put_task(struct rpc_task *task)
rpc_release_client(task->tk_client); rpc_release_client(task->tk_client);
task->tk_client = NULL; task->tk_client = NULL;
} }
if (task->tk_flags & RPC_TASK_DYNAMIC) if (task->tk_workqueue != NULL) {
call_rcu_bh(&task->u.tk_rcu, rpc_free_task); INIT_WORK(&task->u.tk_work, rpc_async_release);
rpc_release_calldata(tk_ops, calldata); queue_work(task->tk_workqueue, &task->u.tk_work);
} else
rpc_free_task(task);
} }
EXPORT_SYMBOL_GPL(rpc_put_task); EXPORT_SYMBOL_GPL(rpc_put_task);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment