Commit c5a4dd8b authored by Chuck Lever's avatar Chuck Lever Committed by Trond Myklebust

SUNRPC: Eliminate side effects from rpc_malloc

Currently rpc_malloc sets req->rq_buffer internally.  Make this a more
generic interface:  return a pointer to the new buffer (or NULL) and
make the caller set req->rq_buffer and req->rq_bufsize.  This looks much
more like kmalloc and eliminates the side effects.

To fix a potential deadlock, this patch also replaces GFP_NOFS with
GFP_NOWAIT in rpc_malloc.  This prevents async RPCs from sleeping outside
the RPC's task scheduler while allocating their buffer.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent 2bea90d4
...@@ -264,7 +264,7 @@ struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *); ...@@ -264,7 +264,7 @@ struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
void rpc_wake_up_status(struct rpc_wait_queue *, int); void rpc_wake_up_status(struct rpc_wait_queue *, int);
void rpc_delay(struct rpc_task *, unsigned long); void rpc_delay(struct rpc_task *, unsigned long);
void * rpc_malloc(struct rpc_task *, size_t); void * rpc_malloc(struct rpc_task *, size_t);
void rpc_free(struct rpc_task *); void rpc_free(void *);
int rpciod_up(void); int rpciod_up(void);
void rpciod_down(void); void rpciod_down(void);
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*)(void *)); int __rpc_wait_for_completion_task(struct rpc_task *task, int (*)(void *));
......
...@@ -114,7 +114,7 @@ struct rpc_xprt_ops { ...@@ -114,7 +114,7 @@ struct rpc_xprt_ops {
void (*set_port)(struct rpc_xprt *xprt, unsigned short port); void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
void (*connect)(struct rpc_task *task); void (*connect)(struct rpc_task *task);
void * (*buf_alloc)(struct rpc_task *task, size_t size); void * (*buf_alloc)(struct rpc_task *task, size_t size);
void (*buf_free)(struct rpc_task *task); void (*buf_free)(void *buffer);
int (*send_request)(struct rpc_task *task); int (*send_request)(struct rpc_task *task);
void (*set_retrans_timeout)(struct rpc_task *task); void (*set_retrans_timeout)(struct rpc_task *task);
void (*timer)(struct rpc_task *task); void (*timer)(struct rpc_task *task);
......
...@@ -774,7 +774,8 @@ call_allocate(struct rpc_task *task) ...@@ -774,7 +774,8 @@ call_allocate(struct rpc_task *task)
req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen; req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
req->rq_rcvsize <<= 2; req->rq_rcvsize <<= 2;
xprt->ops->buf_alloc(task, req->rq_callsize + req->rq_rcvsize); req->rq_buffer = xprt->ops->buf_alloc(task,
req->rq_callsize + req->rq_rcvsize);
if (req->rq_buffer != NULL) if (req->rq_buffer != NULL)
return; return;
......
...@@ -741,50 +741,53 @@ static void rpc_async_schedule(struct work_struct *work) ...@@ -741,50 +741,53 @@ static void rpc_async_schedule(struct work_struct *work)
* @task: RPC task that will use this buffer * @task: RPC task that will use this buffer
* @size: requested byte size * @size: requested byte size
* *
* We try to ensure that some NFS reads and writes can always proceed * To prevent rpciod from hanging, this allocator never sleeps,
* by using a mempool when allocating 'small' buffers. * returning NULL if the request cannot be serviced immediately.
* The caller can arrange to sleep in a way that is safe for rpciod.
*
* Most requests are 'small' (under 2KiB) and can be serviced from a
* mempool, ensuring that NFS reads and writes can always proceed,
* and that there is good locality of reference for these buffers.
*
* In order to avoid memory starvation triggering more writebacks of * In order to avoid memory starvation triggering more writebacks of
* NFS requests, we use GFP_NOFS rather than GFP_KERNEL. * NFS requests, we avoid using GFP_KERNEL.
*/ */
void * rpc_malloc(struct rpc_task *task, size_t size) void *rpc_malloc(struct rpc_task *task, size_t size)
{ {
struct rpc_rqst *req = task->tk_rqstp; size_t *buf;
gfp_t gfp; gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;
if (task->tk_flags & RPC_TASK_SWAPPER) size += sizeof(size_t);
gfp = GFP_ATOMIC; if (size <= RPC_BUFFER_MAXSIZE)
buf = mempool_alloc(rpc_buffer_mempool, gfp);
else else
gfp = GFP_NOFS; buf = kmalloc(size, gfp);
*buf = size;
if (size > RPC_BUFFER_MAXSIZE) { dprintk("RPC: %5u allocated buffer of size %u at %p\n",
req->rq_buffer = kmalloc(size, gfp); task->tk_pid, size, buf);
if (req->rq_buffer) return (void *) ++buf;
req->rq_bufsize = size;
} else {
req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
if (req->rq_buffer)
req->rq_bufsize = RPC_BUFFER_MAXSIZE;
}
return req->rq_buffer;
} }
/** /**
* rpc_free - free buffer allocated via rpc_malloc * rpc_free - free buffer allocated via rpc_malloc
* @task: RPC task with a buffer to be freed * @buffer: buffer to free
* *
*/ */
void rpc_free(struct rpc_task *task) void rpc_free(void *buffer)
{ {
struct rpc_rqst *req = task->tk_rqstp; size_t size, *buf = (size_t *) buffer;
if (req->rq_buffer) { if (!buffer)
if (req->rq_bufsize == RPC_BUFFER_MAXSIZE) return;
mempool_free(req->rq_buffer, rpc_buffer_mempool); size = *buf;
else buf--;
kfree(req->rq_buffer);
req->rq_buffer = NULL; dprintk("RPC: freeing buffer of size %u at %p\n",
req->rq_bufsize = 0; size, buf);
} if (size <= RPC_BUFFER_MAXSIZE)
mempool_free(buf, rpc_buffer_mempool);
else
kfree(buf);
} }
/* /*
......
...@@ -854,7 +854,7 @@ void xprt_release(struct rpc_task *task) ...@@ -854,7 +854,7 @@ void xprt_release(struct rpc_task *task)
mod_timer(&xprt->timer, mod_timer(&xprt->timer,
xprt->last_used + xprt->idle_timeout); xprt->last_used + xprt->idle_timeout);
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
xprt->ops->buf_free(task); xprt->ops->buf_free(req->rq_buffer);
task->tk_rqstp = NULL; task->tk_rqstp = NULL;
if (req->rq_release_snd_buf) if (req->rq_release_snd_buf)
req->rq_release_snd_buf(req); req->rq_release_snd_buf(req);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment