Commit 381ba74a authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Ensure our task is notified when an rpcbind call is done

If another task is busy in rpcb_getport_async number, it is more efficient
to have it wake us up when it has finished instead of arbitrarily sleeping
for 5 seconds.

Also ensure that rpcb_wake_rpcbind_waiters() is called regardless of
whether or not rpcb_getport_done() gets called.
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent f45663ce
...@@ -942,11 +942,9 @@ call_bind_status(struct rpc_task *task) ...@@ -942,11 +942,9 @@ call_bind_status(struct rpc_task *task)
} }
switch (task->tk_status) { switch (task->tk_status) {
case -EAGAIN: case -ENOMEM:
dprintk("RPC: %5u rpcbind waiting for another request " dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
"to finish\n", task->tk_pid); rpc_delay(task, HZ >> 2);
/* avoid busy-waiting here -- could be a network outage. */
rpc_delay(task, 5*HZ);
goto retry_timeout; goto retry_timeout;
case -EACCES: case -EACCES:
dprintk("RPC: %5u remote rpcbind: RPC program/version " dprintk("RPC: %5u remote rpcbind: RPC program/version "
......
...@@ -68,6 +68,7 @@ enum { ...@@ -68,6 +68,7 @@ enum {
#define RPCB_MAXOWNERLEN sizeof(RPCB_OWNER_STRING) #define RPCB_MAXOWNERLEN sizeof(RPCB_OWNER_STRING)
static void rpcb_getport_done(struct rpc_task *, void *); static void rpcb_getport_done(struct rpc_task *, void *);
static void rpcb_map_release(void *data);
static struct rpc_program rpcb_program; static struct rpc_program rpcb_program;
struct rpcbind_args { struct rpcbind_args {
...@@ -80,6 +81,8 @@ struct rpcbind_args { ...@@ -80,6 +81,8 @@ struct rpcbind_args {
const char * r_netid; const char * r_netid;
const char * r_addr; const char * r_addr;
const char * r_owner; const char * r_owner;
int r_status;
}; };
static struct rpc_procinfo rpcb_procedures2[]; static struct rpc_procinfo rpcb_procedures2[];
...@@ -93,14 +96,6 @@ struct rpcb_info { ...@@ -93,14 +96,6 @@ struct rpcb_info {
static struct rpcb_info rpcb_next_version[]; static struct rpcb_info rpcb_next_version[];
static struct rpcb_info rpcb_next_version6[]; static struct rpcb_info rpcb_next_version6[];
static void rpcb_map_release(void *data)
{
struct rpcbind_args *map = data;
xprt_put(map->r_xprt);
kfree(map);
}
static const struct rpc_call_ops rpcb_getport_ops = { static const struct rpc_call_ops rpcb_getport_ops = {
.rpc_call_done = rpcb_getport_done, .rpc_call_done = rpcb_getport_done,
.rpc_release = rpcb_map_release, .rpc_release = rpcb_map_release,
...@@ -112,6 +107,15 @@ static void rpcb_wake_rpcbind_waiters(struct rpc_xprt *xprt, int status) ...@@ -112,6 +107,15 @@ static void rpcb_wake_rpcbind_waiters(struct rpc_xprt *xprt, int status)
rpc_wake_up_status(&xprt->binding, status); rpc_wake_up_status(&xprt->binding, status);
} }
static void rpcb_map_release(void *data)
{
struct rpcbind_args *map = data;
rpcb_wake_rpcbind_waiters(map->r_xprt, map->r_status);
xprt_put(map->r_xprt);
kfree(map);
}
static struct rpc_clnt *rpcb_create(char *hostname, struct sockaddr *srvaddr, static struct rpc_clnt *rpcb_create(char *hostname, struct sockaddr *srvaddr,
size_t salen, int proto, u32 version, size_t salen, int proto, u32 version,
int privileged) int privileged)
...@@ -293,17 +297,16 @@ void rpcb_getport_async(struct rpc_task *task) ...@@ -293,17 +297,16 @@ void rpcb_getport_async(struct rpc_task *task)
/* Autobind on cloned rpc clients is discouraged */ /* Autobind on cloned rpc clients is discouraged */
BUG_ON(clnt->cl_parent != clnt); BUG_ON(clnt->cl_parent != clnt);
/* Put self on the wait queue to ensure we get notified if
* some other task is already attempting to bind the port */
rpc_sleep_on(&xprt->binding, task, NULL);
if (xprt_test_and_set_binding(xprt)) { if (xprt_test_and_set_binding(xprt)) {
status = -EAGAIN; /* tell caller to check again */
dprintk("RPC: %5u %s: waiting for another binder\n", dprintk("RPC: %5u %s: waiting for another binder\n",
task->tk_pid, __func__); task->tk_pid, __func__);
goto bailout_nowake; return;
} }
/* Put self on queue before sending rpcbind request, in case
* rpcb_getport_done completes before we return from rpc_run_task */
rpc_sleep_on(&xprt->binding, task, NULL);
/* Someone else may have bound if we slept */ /* Someone else may have bound if we slept */
if (xprt_bound(xprt)) { if (xprt_bound(xprt)) {
status = 0; status = 0;
...@@ -365,15 +368,15 @@ void rpcb_getport_async(struct rpc_task *task) ...@@ -365,15 +368,15 @@ void rpcb_getport_async(struct rpc_task *task)
map->r_netid = rpc_peeraddr2str(clnt, RPC_DISPLAY_NETID); map->r_netid = rpc_peeraddr2str(clnt, RPC_DISPLAY_NETID);
map->r_addr = rpc_peeraddr2str(rpcb_clnt, RPC_DISPLAY_UNIVERSAL_ADDR); map->r_addr = rpc_peeraddr2str(rpcb_clnt, RPC_DISPLAY_UNIVERSAL_ADDR);
map->r_owner = RPCB_OWNER_STRING; /* ignored for GETADDR */ map->r_owner = RPCB_OWNER_STRING; /* ignored for GETADDR */
map->r_status = -EIO;
child = rpcb_call_async(rpcb_clnt, map, proc); child = rpcb_call_async(rpcb_clnt, map, proc);
rpc_release_client(rpcb_clnt); rpc_release_client(rpcb_clnt);
if (IS_ERR(child)) { if (IS_ERR(child)) {
status = -EIO;
/* rpcb_map_release() has freed the arguments */ /* rpcb_map_release() has freed the arguments */
dprintk("RPC: %5u %s: rpc_run_task failed\n", dprintk("RPC: %5u %s: rpc_run_task failed\n",
task->tk_pid, __func__); task->tk_pid, __func__);
goto bailout_nofree; return;
} }
rpc_put_task(child); rpc_put_task(child);
...@@ -382,7 +385,6 @@ void rpcb_getport_async(struct rpc_task *task) ...@@ -382,7 +385,6 @@ void rpcb_getport_async(struct rpc_task *task)
bailout_nofree: bailout_nofree:
rpcb_wake_rpcbind_waiters(xprt, status); rpcb_wake_rpcbind_waiters(xprt, status);
bailout_nowake:
task->tk_status = status; task->tk_status = status;
} }
EXPORT_SYMBOL_GPL(rpcb_getport_async); EXPORT_SYMBOL_GPL(rpcb_getport_async);
...@@ -421,7 +423,7 @@ static void rpcb_getport_done(struct rpc_task *child, void *data) ...@@ -421,7 +423,7 @@ static void rpcb_getport_done(struct rpc_task *child, void *data)
dprintk("RPC: %5u rpcb_getport_done(status %d, port %u)\n", dprintk("RPC: %5u rpcb_getport_done(status %d, port %u)\n",
child->tk_pid, status, map->r_port); child->tk_pid, status, map->r_port);
rpcb_wake_rpcbind_waiters(xprt, status); map->r_status = status;
} }
static int rpcb_encode_mapping(struct rpc_rqst *req, __be32 *p, static int rpcb_encode_mapping(struct rpc_rqst *req, __be32 *p,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment