Commit 992b3f1d authored by Tejun Heo, committed by Eric Van Hensbergen

9p-trans_fd: use single poller

trans_fd used a pool of up to 100 pollers to monitor the r/w fds.  The
approach made sense in userspace back when the only available
interfaces were poll(2) and select(2).  As each event monitor -
trigger - handling iteration took O(n) where `n' is the number of
watched fds, it made sense to spread them across many pollers such that
the `n' could be divided by the number of pollers.  However, this
doesn't make any sense in the kernel because persistent edge triggered
event monitoring is how the whole thing is implemented in the kernel
in the first place.

This patch converts trans_fd to use a single poller which watches all
the fds instead of the pool of pollers approach.  All the fds are
registered for monitoring on creation and only the fds with pending
events are scanned when something happens, much like how epoll is
implemented.

This change makes trans_fd fd monitoring more efficient and simpler.
Signed-off-by: Tejun Heo <tj@kernel.org>
Signed-off-by: Eric Van Hensbergen <ericvh@gmail.com>
parent 2e532d68
...@@ -44,7 +44,6 @@ ...@@ -44,7 +44,6 @@
#define P9_PORT 564 #define P9_PORT 564
#define MAX_SOCK_BUF (64*1024) #define MAX_SOCK_BUF (64*1024)
#define ERREQFLUSH 1 #define ERREQFLUSH 1
#define SCHED_TIMEOUT 10
#define MAXPOLLWADDR 2 #define MAXPOLLWADDR 2
/** /**
...@@ -135,17 +134,16 @@ struct p9_req { ...@@ -135,17 +134,16 @@ struct p9_req {
struct list_head req_list; struct list_head req_list;
}; };
struct p9_mux_poll_task { struct p9_poll_wait {
struct task_struct *task; struct p9_conn *conn;
struct list_head mux_list; wait_queue_t wait;
int muxnum; wait_queue_head_t *wait_addr;
}; };
/** /**
* struct p9_conn - fd mux connection state information * struct p9_conn - fd mux connection state information
* @lock: protects mux_list (?) * @lock: protects mux_list (?)
* @mux_list: list link for mux to manage multiple connections (?) * @mux_list: list link for mux to manage multiple connections (?)
* @poll_task: task polling on this connection
* @msize: maximum size for connection (dup) * @msize: maximum size for connection (dup)
* @extended: 9p2000.u flag (dup) * @extended: 9p2000.u flag (dup)
* @trans: reference to transport instance for this connection * @trans: reference to transport instance for this connection
...@@ -171,7 +169,6 @@ struct p9_mux_poll_task { ...@@ -171,7 +169,6 @@ struct p9_mux_poll_task {
struct p9_conn { struct p9_conn {
spinlock_t lock; /* protect lock structure */ spinlock_t lock; /* protect lock structure */
struct list_head mux_list; struct list_head mux_list;
struct p9_mux_poll_task *poll_task;
int msize; int msize;
unsigned char extended; unsigned char extended;
struct p9_trans *trans; struct p9_trans *trans;
...@@ -185,8 +182,8 @@ struct p9_conn { ...@@ -185,8 +182,8 @@ struct p9_conn {
int wpos; int wpos;
int wsize; int wsize;
char *wbuf; char *wbuf;
wait_queue_t poll_wait[MAXPOLLWADDR]; struct list_head poll_pending_link;
wait_queue_head_t *poll_waddr[MAXPOLLWADDR]; struct p9_poll_wait poll_wait[MAXPOLLWADDR];
poll_table pt; poll_table pt;
struct work_struct rq; struct work_struct rq;
struct work_struct wq; struct work_struct wq;
...@@ -220,12 +217,10 @@ static void p9_pollwait(struct file *filp, wait_queue_head_t *wait_address, ...@@ -220,12 +217,10 @@ static void p9_pollwait(struct file *filp, wait_queue_head_t *wait_address,
static int p9_fd_write(struct p9_trans *trans, void *v, int len); static int p9_fd_write(struct p9_trans *trans, void *v, int len);
static int p9_fd_read(struct p9_trans *trans, void *v, int len); static int p9_fd_read(struct p9_trans *trans, void *v, int len);
static DEFINE_MUTEX(p9_mux_task_lock); static DEFINE_SPINLOCK(p9_poll_lock);
static LIST_HEAD(p9_poll_pending_list);
static struct workqueue_struct *p9_mux_wq; static struct workqueue_struct *p9_mux_wq;
static struct task_struct *p9_poll_task;
static int p9_mux_num;
static int p9_mux_poll_task_num;
static struct p9_mux_poll_task p9_mux_poll_tasks[100];
static void p9_conn_destroy(struct p9_conn *); static void p9_conn_destroy(struct p9_conn *);
static unsigned int p9_fd_poll(struct p9_trans *trans, static unsigned int p9_fd_poll(struct p9_trans *trans,
...@@ -255,130 +250,23 @@ static void p9_mux_put_tag(struct p9_conn *m, u16 tag) ...@@ -255,130 +250,23 @@ static void p9_mux_put_tag(struct p9_conn *m, u16 tag)
p9_idpool_put(tag, m->tagpool); p9_idpool_put(tag, m->tagpool);
} }
/** static void p9_mux_poll_stop(struct p9_conn *m)
* p9_mux_calc_poll_procs - calculates the number of polling procs
* @muxnum: number of mounts
*
* Calculation is based on the number of mounted v9fs filesystems.
* The current implementation returns sqrt of the number of mounts.
*/
static int p9_mux_calc_poll_procs(int muxnum)
{
int n;
if (p9_mux_poll_task_num)
n = muxnum / p9_mux_poll_task_num +
(muxnum % p9_mux_poll_task_num ? 1 : 0);
else
n = 1;
if (n > ARRAY_SIZE(p9_mux_poll_tasks))
n = ARRAY_SIZE(p9_mux_poll_tasks);
return n;
}
static int p9_mux_poll_start(struct p9_conn *m)
{ {
int i, n; unsigned long flags;
struct p9_mux_poll_task *vpt, *vptlast; int i;
struct task_struct *pproc;
P9_DPRINTK(P9_DEBUG_MUX, "mux %p muxnum %d procnum %d\n", m, p9_mux_num,
p9_mux_poll_task_num);
mutex_lock(&p9_mux_task_lock);
n = p9_mux_calc_poll_procs(p9_mux_num + 1);
if (n > p9_mux_poll_task_num) {
for (i = 0; i < ARRAY_SIZE(p9_mux_poll_tasks); i++) {
if (p9_mux_poll_tasks[i].task == NULL) {
vpt = &p9_mux_poll_tasks[i];
P9_DPRINTK(P9_DEBUG_MUX, "create proc %p\n",
vpt);
pproc = kthread_create(p9_poll_proc, vpt,
"v9fs-poll");
if (!IS_ERR(pproc)) {
vpt->task = pproc;
INIT_LIST_HEAD(&vpt->mux_list);
vpt->muxnum = 0;
p9_mux_poll_task_num++;
wake_up_process(vpt->task);
}
break;
}
}
if (i >= ARRAY_SIZE(p9_mux_poll_tasks)) for (i = 0; i < ARRAY_SIZE(m->poll_wait); i++) {
P9_DPRINTK(P9_DEBUG_ERROR, struct p9_poll_wait *pwait = &m->poll_wait[i];
"warning: no free poll slots\n");
}
n = (p9_mux_num + 1) / p9_mux_poll_task_num +
((p9_mux_num + 1) % p9_mux_poll_task_num ? 1 : 0);
vptlast = NULL;
for (i = 0; i < ARRAY_SIZE(p9_mux_poll_tasks); i++) {
vpt = &p9_mux_poll_tasks[i];
if (vpt->task != NULL) {
vptlast = vpt;
if (vpt->muxnum < n) {
P9_DPRINTK(P9_DEBUG_MUX, "put in proc %d\n", i);
list_add(&m->mux_list, &vpt->mux_list);
vpt->muxnum++;
m->poll_task = vpt;
memset(&m->poll_waddr, 0,
sizeof(m->poll_waddr));
init_poll_funcptr(&m->pt, p9_pollwait);
break;
}
}
}
if (i >= ARRAY_SIZE(p9_mux_poll_tasks)) { if (pwait->wait_addr) {
if (vptlast == NULL) { remove_wait_queue(pwait->wait_addr, &pwait->wait);
mutex_unlock(&p9_mux_task_lock); pwait->wait_addr = NULL;
return -ENOMEM;
} }
P9_DPRINTK(P9_DEBUG_MUX, "put in proc %d\n", i);
list_add(&m->mux_list, &vptlast->mux_list);
vptlast->muxnum++;
m->poll_task = vptlast;
memset(&m->poll_waddr, 0, sizeof(m->poll_waddr));
init_poll_funcptr(&m->pt, p9_pollwait);
} }
p9_mux_num++; spin_lock_irqsave(&p9_poll_lock, flags);
mutex_unlock(&p9_mux_task_lock); list_del_init(&m->poll_pending_link);
spin_unlock_irqrestore(&p9_poll_lock, flags);
return 0;
}
static void p9_mux_poll_stop(struct p9_conn *m)
{
int i;
struct p9_mux_poll_task *vpt;
mutex_lock(&p9_mux_task_lock);
vpt = m->poll_task;
list_del(&m->mux_list);
for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) {
if (m->poll_waddr[i] != NULL) {
remove_wait_queue(m->poll_waddr[i], &m->poll_wait[i]);
m->poll_waddr[i] = NULL;
}
}
vpt->muxnum--;
if (!vpt->muxnum) {
P9_DPRINTK(P9_DEBUG_MUX, "destroy proc %p\n", vpt);
kthread_stop(vpt->task);
vpt->task = NULL;
p9_mux_poll_task_num--;
}
p9_mux_num--;
mutex_unlock(&p9_mux_task_lock);
} }
/** /**
...@@ -414,11 +302,8 @@ static struct p9_conn *p9_conn_create(struct p9_trans *trans) ...@@ -414,11 +302,8 @@ static struct p9_conn *p9_conn_create(struct p9_trans *trans)
INIT_LIST_HEAD(&m->unsent_req_list); INIT_LIST_HEAD(&m->unsent_req_list);
INIT_WORK(&m->rq, p9_read_work); INIT_WORK(&m->rq, p9_read_work);
INIT_WORK(&m->wq, p9_write_work); INIT_WORK(&m->wq, p9_write_work);
n = p9_mux_poll_start(m); INIT_LIST_HEAD(&m->poll_pending_link);
if (n) { init_poll_funcptr(&m->pt, p9_pollwait);
kfree(m);
return ERR_PTR(n);
}
n = p9_fd_poll(trans, &m->pt); n = p9_fd_poll(trans, &m->pt);
if (n & POLLIN) { if (n & POLLIN) {
...@@ -431,11 +316,12 @@ static struct p9_conn *p9_conn_create(struct p9_trans *trans) ...@@ -431,11 +316,12 @@ static struct p9_conn *p9_conn_create(struct p9_trans *trans)
set_bit(Wpending, &m->wsched); set_bit(Wpending, &m->wsched);
} }
for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) { for (i = 0; i < ARRAY_SIZE(m->poll_wait); i++) {
if (IS_ERR(m->poll_waddr[i])) { if (IS_ERR(m->poll_wait[i].wait_addr)) {
p9_mux_poll_stop(m); p9_mux_poll_stop(m);
kfree(m); kfree(m);
return (void *)m->poll_waddr; /* the error code */ /* return the error code */
return (void *)m->poll_wait[i].wait_addr;
} }
} }
...@@ -464,6 +350,23 @@ static void p9_conn_destroy(struct p9_conn *m) ...@@ -464,6 +350,23 @@ static void p9_conn_destroy(struct p9_conn *m)
kfree(m); kfree(m);
} }
static int p9_pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
struct p9_poll_wait *pwait =
container_of(wait, struct p9_poll_wait, wait);
struct p9_conn *m = pwait->conn;
unsigned long flags;
DECLARE_WAITQUEUE(dummy_wait, p9_poll_task);
spin_lock_irqsave(&p9_poll_lock, flags);
if (list_empty(&m->poll_pending_link))
list_add_tail(&m->poll_pending_link, &p9_poll_pending_list);
spin_unlock_irqrestore(&p9_poll_lock, flags);
/* perform the default wake up operation */
return default_wake_function(&dummy_wait, mode, sync, key);
}
/** /**
* p9_pollwait - add poll task to the wait queue * p9_pollwait - add poll task to the wait queue
* @filp: file pointer being polled * @filp: file pointer being polled
...@@ -476,29 +379,32 @@ static void p9_conn_destroy(struct p9_conn *m) ...@@ -476,29 +379,32 @@ static void p9_conn_destroy(struct p9_conn *m)
static void static void
p9_pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) p9_pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{ {
struct p9_conn *m = container_of(p, struct p9_conn, pt);
struct p9_poll_wait *pwait = NULL;
int i; int i;
struct p9_conn *m;
m = container_of(p, struct p9_conn, pt); for (i = 0; i < ARRAY_SIZE(m->poll_wait); i++) {
for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) if (m->poll_wait[i].wait_addr == NULL) {
if (m->poll_waddr[i] == NULL) pwait = &m->poll_wait[i];
break; break;
}
}
if (i >= ARRAY_SIZE(m->poll_waddr)) { if (!pwait) {
P9_DPRINTK(P9_DEBUG_ERROR, "not enough wait_address slots\n"); P9_DPRINTK(P9_DEBUG_ERROR, "not enough wait_address slots\n");
return; return;
} }
m->poll_waddr[i] = wait_address;
if (!wait_address) { if (!wait_address) {
P9_DPRINTK(P9_DEBUG_ERROR, "no wait_address\n"); P9_DPRINTK(P9_DEBUG_ERROR, "no wait_address\n");
m->poll_waddr[i] = ERR_PTR(-EIO); pwait->wait_addr = ERR_PTR(-EIO);
return; return;
} }
init_waitqueue_entry(&m->poll_wait[i], m->poll_task->task); pwait->conn = m;
add_wait_queue(wait_address, &m->poll_wait[i]); pwait->wait_addr = wait_address;
init_waitqueue_func_entry(&pwait->wait, p9_pollwake);
add_wait_queue(wait_address, &pwait->wait);
} }
/** /**
...@@ -553,23 +459,34 @@ static void p9_poll_mux(struct p9_conn *m) ...@@ -553,23 +459,34 @@ static void p9_poll_mux(struct p9_conn *m)
static int p9_poll_proc(void *a) static int p9_poll_proc(void *a)
{ {
struct p9_conn *m, *mtmp; unsigned long flags;
struct p9_mux_poll_task *vpt;
vpt = a; P9_DPRINTK(P9_DEBUG_MUX, "start %p\n", current);
P9_DPRINTK(P9_DEBUG_MUX, "start %p %p\n", current, vpt); repeat:
while (!kthread_should_stop()) { spin_lock_irqsave(&p9_poll_lock, flags);
set_current_state(TASK_INTERRUPTIBLE); while (!list_empty(&p9_poll_pending_list)) {
struct p9_conn *conn = list_first_entry(&p9_poll_pending_list,
struct p9_conn,
poll_pending_link);
list_del_init(&conn->poll_pending_link);
spin_unlock_irqrestore(&p9_poll_lock, flags);
p9_poll_mux(conn);
list_for_each_entry_safe(m, mtmp, &vpt->mux_list, mux_list) { spin_lock_irqsave(&p9_poll_lock, flags);
p9_poll_mux(m);
} }
spin_unlock_irqrestore(&p9_poll_lock, flags);
set_current_state(TASK_INTERRUPTIBLE);
if (list_empty(&p9_poll_pending_list)) {
P9_DPRINTK(P9_DEBUG_MUX, "sleeping...\n"); P9_DPRINTK(P9_DEBUG_MUX, "sleeping...\n");
schedule_timeout(SCHED_TIMEOUT * HZ); schedule();
} }
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
if (!kthread_should_stop())
goto repeat;
P9_DPRINTK(P9_DEBUG_MUX, "finish\n"); P9_DPRINTK(P9_DEBUG_MUX, "finish\n");
return 0; return 0;
} }
...@@ -1602,17 +1519,19 @@ static struct p9_trans_module p9_fd_trans = { ...@@ -1602,17 +1519,19 @@ static struct p9_trans_module p9_fd_trans = {
int p9_trans_fd_init(void) int p9_trans_fd_init(void)
{ {
int i;
for (i = 0; i < ARRAY_SIZE(p9_mux_poll_tasks); i++)
p9_mux_poll_tasks[i].task = NULL;
p9_mux_wq = create_workqueue("v9fs"); p9_mux_wq = create_workqueue("v9fs");
if (!p9_mux_wq) { if (!p9_mux_wq) {
printk(KERN_WARNING "v9fs: mux: creating workqueue failed\n"); printk(KERN_WARNING "v9fs: mux: creating workqueue failed\n");
return -ENOMEM; return -ENOMEM;
} }
p9_poll_task = kthread_run(p9_poll_proc, NULL, "v9fs-poll");
if (IS_ERR(p9_poll_task)) {
destroy_workqueue(p9_mux_wq);
printk(KERN_WARNING "v9fs: mux: creating poll task failed\n");
return PTR_ERR(p9_poll_task);
}
v9fs_register_trans(&p9_tcp_trans); v9fs_register_trans(&p9_tcp_trans);
v9fs_register_trans(&p9_unix_trans); v9fs_register_trans(&p9_unix_trans);
v9fs_register_trans(&p9_fd_trans); v9fs_register_trans(&p9_fd_trans);
...@@ -1622,6 +1541,7 @@ int p9_trans_fd_init(void) ...@@ -1622,6 +1541,7 @@ int p9_trans_fd_init(void)
void p9_trans_fd_exit(void) void p9_trans_fd_exit(void)
{ {
kthread_stop(p9_poll_task);
v9fs_unregister_trans(&p9_tcp_trans); v9fs_unregister_trans(&p9_tcp_trans);
v9fs_unregister_trans(&p9_unix_trans); v9fs_unregister_trans(&p9_unix_trans);
v9fs_unregister_trans(&p9_fd_trans); v9fs_unregister_trans(&p9_fd_trans);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment