Commit 4a68179d authored by Chuck Lever's avatar Chuck Lever Committed by Trond Myklebust

SUNRPC: Make RPC portmapper use per-transport storage

Move connection and bind state that was maintained in the rpc_clnt
structure to the rpc_xprt structure.  This will allow the creation of
a clean API for plugging in different types of bind mechanisms.

This brings improvements such as the elimination of a single spin lock to
control serialization for all in-kernel RPC binding.  A set of per-xprt
bitops is used to serialize tasks during RPC binding, just like it now
works for making RPC transport connections.

Test-plan:
Destructive testing (unplugging the network temporarily).  Connectathon
with UDP and TCP.  NFSv2/3 and NFSv4 mounting should be carefully checked.
Probably need to rig a server where certain services aren't running, or
that returns an error for some typical operation.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent ec739ef0
...@@ -18,18 +18,6 @@ ...@@ -18,18 +18,6 @@
#include <linux/sunrpc/timer.h> #include <linux/sunrpc/timer.h>
#include <asm/signal.h> #include <asm/signal.h>
/*
* This defines an RPC port mapping
*/
struct rpc_portmap {
__u32 pm_prog;
__u32 pm_vers;
__u32 pm_prot;
__u16 pm_port;
unsigned char pm_binding : 1; /* doing a getport() */
struct rpc_wait_queue pm_bindwait; /* waiting on getport() */
};
struct rpc_inode; struct rpc_inode;
/* /*
...@@ -40,7 +28,9 @@ struct rpc_clnt { ...@@ -40,7 +28,9 @@ struct rpc_clnt {
atomic_t cl_users; /* number of references */ atomic_t cl_users; /* number of references */
struct rpc_xprt * cl_xprt; /* transport */ struct rpc_xprt * cl_xprt; /* transport */
struct rpc_procinfo * cl_procinfo; /* procedure info */ struct rpc_procinfo * cl_procinfo; /* procedure info */
u32 cl_maxproc; /* max procedure number */ u32 cl_prog, /* RPC program number */
cl_vers, /* RPC version number */
cl_maxproc; /* max procedure number */
char * cl_server; /* server machine name */ char * cl_server; /* server machine name */
char * cl_protname; /* protocol name */ char * cl_protname; /* protocol name */
...@@ -55,7 +45,6 @@ struct rpc_clnt { ...@@ -55,7 +45,6 @@ struct rpc_clnt {
cl_dead : 1;/* abandoned */ cl_dead : 1;/* abandoned */
struct rpc_rtt * cl_rtt; /* RTO estimator data */ struct rpc_rtt * cl_rtt; /* RTO estimator data */
struct rpc_portmap * cl_pmap; /* port mapping */
int cl_nodelen; /* nodename length */ int cl_nodelen; /* nodename length */
char cl_nodename[UNX_MAXNODENAME]; char cl_nodename[UNX_MAXNODENAME];
...@@ -64,14 +53,8 @@ struct rpc_clnt { ...@@ -64,14 +53,8 @@ struct rpc_clnt {
struct dentry * cl_dentry; /* inode */ struct dentry * cl_dentry; /* inode */
struct rpc_clnt * cl_parent; /* Points to parent of clones */ struct rpc_clnt * cl_parent; /* Points to parent of clones */
struct rpc_rtt cl_rtt_default; struct rpc_rtt cl_rtt_default;
struct rpc_portmap cl_pmap_default;
char cl_inline_name[32]; char cl_inline_name[32];
}; };
#define cl_timeout cl_xprt->timeout
#define cl_prog cl_pmap->pm_prog
#define cl_vers cl_pmap->pm_vers
#define cl_port cl_pmap->pm_port
#define cl_prot cl_pmap->pm_prot
/* /*
* General RPC program info * General RPC program info
......
...@@ -138,6 +138,7 @@ struct rpc_xprt { ...@@ -138,6 +138,7 @@ struct rpc_xprt {
unsigned int tsh_size; /* size of transport specific unsigned int tsh_size; /* size of transport specific
header */ header */
struct rpc_wait_queue binding; /* requests waiting on rpcbind */
struct rpc_wait_queue sending; /* requests waiting to send */ struct rpc_wait_queue sending; /* requests waiting to send */
struct rpc_wait_queue resend; /* requests waiting to resend */ struct rpc_wait_queue resend; /* requests waiting to resend */
struct rpc_wait_queue pending; /* requests in flight */ struct rpc_wait_queue pending; /* requests in flight */
...@@ -270,6 +271,7 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to); ...@@ -270,6 +271,7 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to);
#define XPRT_CONNECTING (2) #define XPRT_CONNECTING (2)
#define XPRT_CLOSE_WAIT (3) #define XPRT_CLOSE_WAIT (3)
#define XPRT_BOUND (4) #define XPRT_BOUND (4)
#define XPRT_BINDING (5)
static inline void xprt_set_connected(struct rpc_xprt *xprt) static inline void xprt_set_connected(struct rpc_xprt *xprt)
{ {
...@@ -328,6 +330,18 @@ static inline void xprt_clear_bound(struct rpc_xprt *xprt) ...@@ -328,6 +330,18 @@ static inline void xprt_clear_bound(struct rpc_xprt *xprt)
clear_bit(XPRT_BOUND, &xprt->state); clear_bit(XPRT_BOUND, &xprt->state);
} }
static inline void xprt_clear_binding(struct rpc_xprt *xprt)
{
smp_mb__before_clear_bit();
clear_bit(XPRT_BINDING, &xprt->state);
smp_mb__after_clear_bit();
}
static inline int xprt_test_and_set_binding(struct rpc_xprt *xprt)
{
return test_and_set_bit(XPRT_BINDING, &xprt->state);
}
#endif /* __KERNEL__*/ #endif /* __KERNEL__*/
#endif /* _LINUX_SUNRPC_XPRT_H */ #endif /* _LINUX_SUNRPC_XPRT_H */
...@@ -147,13 +147,10 @@ rpc_new_client(struct rpc_xprt *xprt, char *servname, ...@@ -147,13 +147,10 @@ rpc_new_client(struct rpc_xprt *xprt, char *servname,
clnt->cl_procinfo = version->procs; clnt->cl_procinfo = version->procs;
clnt->cl_maxproc = version->nrprocs; clnt->cl_maxproc = version->nrprocs;
clnt->cl_protname = program->name; clnt->cl_protname = program->name;
clnt->cl_pmap = &clnt->cl_pmap_default;
clnt->cl_prog = program->number; clnt->cl_prog = program->number;
clnt->cl_vers = version->number; clnt->cl_vers = version->number;
clnt->cl_prot = xprt->prot;
clnt->cl_stats = program->stats; clnt->cl_stats = program->stats;
clnt->cl_metrics = rpc_alloc_iostats(clnt); clnt->cl_metrics = rpc_alloc_iostats(clnt);
rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
if (!xprt_bound(clnt->cl_xprt)) if (!xprt_bound(clnt->cl_xprt))
clnt->cl_autobind = 1; clnt->cl_autobind = 1;
...@@ -243,8 +240,6 @@ rpc_clone_client(struct rpc_clnt *clnt) ...@@ -243,8 +240,6 @@ rpc_clone_client(struct rpc_clnt *clnt)
atomic_set(&new->cl_users, 0); atomic_set(&new->cl_users, 0);
new->cl_parent = clnt; new->cl_parent = clnt;
atomic_inc(&clnt->cl_count); atomic_inc(&clnt->cl_count);
/* Duplicate portmapper */
rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
/* Turn off autobind on clones */ /* Turn off autobind on clones */
new->cl_autobind = 0; new->cl_autobind = 0;
new->cl_oneshot = 0; new->cl_oneshot = 0;
...@@ -254,7 +249,6 @@ rpc_clone_client(struct rpc_clnt *clnt) ...@@ -254,7 +249,6 @@ rpc_clone_client(struct rpc_clnt *clnt)
rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval); rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
if (new->cl_auth) if (new->cl_auth)
atomic_inc(&new->cl_auth->au_count); atomic_inc(&new->cl_auth->au_count);
new->cl_pmap = &new->cl_pmap_default;
new->cl_metrics = rpc_alloc_iostats(clnt); new->cl_metrics = rpc_alloc_iostats(clnt);
return new; return new;
out_no_clnt: out_no_clnt:
......
...@@ -24,11 +24,57 @@ ...@@ -24,11 +24,57 @@
#define PMAP_UNSET 2 #define PMAP_UNSET 2
#define PMAP_GETPORT 3 #define PMAP_GETPORT 3
struct portmap_args {
u32 pm_prog;
u32 pm_vers;
u32 pm_prot;
unsigned short pm_port;
struct rpc_task * pm_task;
};
static struct rpc_procinfo pmap_procedures[]; static struct rpc_procinfo pmap_procedures[];
static struct rpc_clnt * pmap_create(char *, struct sockaddr_in *, int, int); static struct rpc_clnt * pmap_create(char *, struct sockaddr_in *, int, int);
static void pmap_getport_done(struct rpc_task *); static void pmap_getport_done(struct rpc_task *, void *);
static struct rpc_program pmap_program; static struct rpc_program pmap_program;
static DEFINE_SPINLOCK(pmap_lock);
static void pmap_getport_prepare(struct rpc_task *task, void *calldata)
{
struct portmap_args *map = calldata;
struct rpc_message msg = {
.rpc_proc = &pmap_procedures[PMAP_GETPORT],
.rpc_argp = map,
.rpc_resp = &map->pm_port,
};
rpc_call_setup(task, &msg, 0);
}
static inline struct portmap_args *pmap_map_alloc(void)
{
return kmalloc(sizeof(struct portmap_args), GFP_NOFS);
}
static inline void pmap_map_free(struct portmap_args *map)
{
kfree(map);
}
static void pmap_map_release(void *data)
{
pmap_map_free(data);
}
static const struct rpc_call_ops pmap_getport_ops = {
.rpc_call_prepare = pmap_getport_prepare,
.rpc_call_done = pmap_getport_done,
.rpc_release = pmap_map_release,
};
static inline void pmap_wake_portmap_waiters(struct rpc_xprt *xprt)
{
xprt_clear_binding(xprt);
rpc_wake_up(&xprt->binding);
}
/* /*
* Obtain the port for a given RPC service on a given host. This one can * Obtain the port for a given RPC service on a given host. This one can
...@@ -37,67 +83,71 @@ static DEFINE_SPINLOCK(pmap_lock); ...@@ -37,67 +83,71 @@ static DEFINE_SPINLOCK(pmap_lock);
void void
rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt) rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
{ {
struct rpc_portmap *map = clnt->cl_pmap; struct rpc_xprt *xprt = task->tk_xprt;
struct sockaddr_in *sap = &clnt->cl_xprt->addr; struct sockaddr_in *sap = &xprt->addr;
struct rpc_message msg = { struct portmap_args *map;
.rpc_proc = &pmap_procedures[PMAP_GETPORT],
.rpc_argp = map,
.rpc_resp = &clnt->cl_port,
.rpc_cred = NULL
};
struct rpc_clnt *pmap_clnt; struct rpc_clnt *pmap_clnt;
struct rpc_task *child; struct rpc_task *child;
dprintk("RPC: %4d rpc_getport(%s, %d, %d, %d)\n", dprintk("RPC: %4d rpc_getport(%s, %u, %u, %d)\n",
task->tk_pid, clnt->cl_server, task->tk_pid, clnt->cl_server,
map->pm_prog, map->pm_vers, map->pm_prot); clnt->cl_prog, clnt->cl_vers, xprt->prot);
/* Autobind on cloned rpc clients is discouraged */ /* Autobind on cloned rpc clients is discouraged */
BUG_ON(clnt->cl_parent != clnt); BUG_ON(clnt->cl_parent != clnt);
spin_lock(&pmap_lock); if (xprt_test_and_set_binding(xprt)) {
if (map->pm_binding) { task->tk_status = -EACCES; /* tell caller to check again */
rpc_sleep_on(&map->pm_bindwait, task, NULL, NULL); rpc_sleep_on(&xprt->binding, task, NULL, NULL);
spin_unlock(&pmap_lock);
return; return;
} }
map->pm_binding = 1;
spin_unlock(&pmap_lock); /* Someone else may have bound if we slept */
if (xprt_bound(xprt)) {
task->tk_status = 0;
goto bailout_nofree;
}
map = pmap_map_alloc();
if (!map) {
task->tk_status = -ENOMEM;
goto bailout_nofree;
}
map->pm_prog = clnt->cl_prog;
map->pm_vers = clnt->cl_vers;
map->pm_prot = xprt->prot;
map->pm_port = 0;
map->pm_task = task;
pmap_clnt = pmap_create(clnt->cl_server, sap, map->pm_prot, 0); pmap_clnt = pmap_create(clnt->cl_server, sap, map->pm_prot, 0);
if (IS_ERR(pmap_clnt)) { if (IS_ERR(pmap_clnt)) {
task->tk_status = PTR_ERR(pmap_clnt); task->tk_status = PTR_ERR(pmap_clnt);
goto bailout; goto bailout;
} }
task->tk_status = 0;
/* child = rpc_run_task(pmap_clnt, RPC_TASK_ASYNC, &pmap_getport_ops, map);
* Note: rpc_new_child will release client after a failure. if (IS_ERR(child)) {
*/ task->tk_status = -EIO;
if (!(child = rpc_new_child(pmap_clnt, task)))
goto bailout; goto bailout;
}
rpc_release_task(child);
/* Setup the call info struct */ rpc_sleep_on(&xprt->binding, task, NULL, NULL);
rpc_call_setup(child, &msg, 0);
/* ... and run the child task */
task->tk_xprt->stat.bind_count++; task->tk_xprt->stat.bind_count++;
rpc_run_child(task, child, pmap_getport_done);
return; return;
bailout: bailout:
spin_lock(&pmap_lock); pmap_map_free(map);
map->pm_binding = 0; bailout_nofree:
rpc_wake_up(&map->pm_bindwait); pmap_wake_portmap_waiters(xprt);
spin_unlock(&pmap_lock);
rpc_exit(task, -EIO);
} }
#ifdef CONFIG_ROOT_NFS #ifdef CONFIG_ROOT_NFS
int int
rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot) rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot)
{ {
struct rpc_portmap map = { struct portmap_args map = {
.pm_prog = prog, .pm_prog = prog,
.pm_vers = vers, .pm_vers = vers,
.pm_prot = prot, .pm_prot = prot,
...@@ -133,32 +183,32 @@ rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot) ...@@ -133,32 +183,32 @@ rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot)
#endif #endif
static void static void
pmap_getport_done(struct rpc_task *task) pmap_getport_done(struct rpc_task *child, void *data)
{ {
struct rpc_clnt *clnt = task->tk_client; struct portmap_args *map = data;
struct rpc_task *task = map->pm_task;
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_portmap *map = clnt->cl_pmap; int status = child->tk_status;
dprintk("RPC: %4d pmap_getport_done(status %d, port %d)\n",
task->tk_pid, task->tk_status, clnt->cl_port);
if (task->tk_status < 0) { if (status < 0) {
/* Make the calling task exit with an error */ /* Portmapper not available */
xprt->ops->set_port(xprt, 0); xprt->ops->set_port(xprt, 0);
task->tk_action = rpc_exit_task; task->tk_status = status;
} else if (clnt->cl_port == 0) { } else if (map->pm_port == 0) {
/* Program not registered */ /* Requested RPC service wasn't registered */
xprt->ops->set_port(xprt, 0); xprt->ops->set_port(xprt, 0);
rpc_exit(task, -EACCES); task->tk_status = -EACCES;
} else { } else {
xprt->ops->set_port(xprt, clnt->cl_port); /* Succeeded */
xprt->ops->set_port(xprt, map->pm_port);
xprt_set_bound(xprt); xprt_set_bound(xprt);
clnt->cl_port = htons(clnt->cl_port); task->tk_status = 0;
} }
spin_lock(&pmap_lock);
map->pm_binding = 0; dprintk("RPC: %4d pmap_getport_done(status %d, port %u)\n",
rpc_wake_up(&map->pm_bindwait); child->tk_pid, child->tk_status, map->pm_port);
spin_unlock(&pmap_lock);
pmap_wake_portmap_waiters(xprt);
} }
/* /*
...@@ -172,7 +222,7 @@ rpc_register(u32 prog, u32 vers, int prot, unsigned short port, int *okay) ...@@ -172,7 +222,7 @@ rpc_register(u32 prog, u32 vers, int prot, unsigned short port, int *okay)
.sin_family = AF_INET, .sin_family = AF_INET,
.sin_addr.s_addr = htonl(INADDR_LOOPBACK), .sin_addr.s_addr = htonl(INADDR_LOOPBACK),
}; };
struct rpc_portmap map = { struct portmap_args map = {
.pm_prog = prog, .pm_prog = prog,
.pm_vers = vers, .pm_vers = vers,
.pm_prot = prot, .pm_prot = prot,
...@@ -239,7 +289,7 @@ pmap_create(char *hostname, struct sockaddr_in *srvaddr, int proto, int privileg ...@@ -239,7 +289,7 @@ pmap_create(char *hostname, struct sockaddr_in *srvaddr, int proto, int privileg
* XDR encode/decode functions for PMAP * XDR encode/decode functions for PMAP
*/ */
static int static int
xdr_encode_mapping(struct rpc_rqst *req, u32 *p, struct rpc_portmap *map) xdr_encode_mapping(struct rpc_rqst *req, u32 *p, struct portmap_args *map)
{ {
dprintk("RPC: xdr_encode_mapping(%d, %d, %d, %d)\n", dprintk("RPC: xdr_encode_mapping(%d, %d, %d, %d)\n",
map->pm_prog, map->pm_vers, map->pm_prot, map->pm_port); map->pm_prog, map->pm_vers, map->pm_prot, map->pm_port);
......
...@@ -928,6 +928,7 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc ...@@ -928,6 +928,7 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
xprt->last_used = jiffies; xprt->last_used = jiffies;
xprt->cwnd = RPC_INITCWND; xprt->cwnd = RPC_INITCWND;
rpc_init_wait_queue(&xprt->binding, "xprt_binding");
rpc_init_wait_queue(&xprt->pending, "xprt_pending"); rpc_init_wait_queue(&xprt->pending, "xprt_pending");
rpc_init_wait_queue(&xprt->sending, "xprt_sending"); rpc_init_wait_queue(&xprt->sending, "xprt_sending");
rpc_init_wait_queue(&xprt->resend, "xprt_resend"); rpc_init_wait_queue(&xprt->resend, "xprt_resend");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment