Commit d721304d authored by Oleg Nesterov's avatar Oleg Nesterov Committed by Linus Torvalds

workqueue: fix flush_workqueue() vs CPU_DEAD race

Many thanks to Srivatsa Vaddagiri for the helpful discussion and for spotting
the bug in my previous attempt.

work->func() (and thus flush_workqueue()) must not use workqueue_mutex,
this leads to deadlock when CPU_DEAD does kthread_stop(). However without
this mutex held we can't detect CPU_DEAD in progress, which can move pending
works to another CPU while the dead one is not on cpu_online_map.

Change flush_workqueue() to use for_each_possible_cpu(). This means that
flush_cpu_workqueue() may hit CPU which is already dead. However in that
case

	!list_empty(&cwq->worklist) || cwq->current_work != NULL

means that CPU_DEAD in progress, it will do kthread_stop() + take_over_work()
so we can proceed and insert a barrier. We hold cwq->lock, so we are safe.

Also, add migrate_sequence incremented by take_over_work() under cwq->lock.
If take_over_work() happened before we checked this CPU, we should see the
new value after spin_unlock().

Further possible changes:

	remove CPU_DEAD handling (along with take_over_work, migrate_sequence)
	from workqueue.c. CPU_DEAD just sets cwq->please_exit_after_flush flag.

	CPU_UP_PREPARE->create_workqueue_thread() clears this flag, and creates
	the new thread if cwq->thread == NULL.

This way the workqueue/cpu-hotplug interaction is almost zero, workqueue_mutex
just protects "workqueues" list, CPU_LOCK_ACQUIRE/CPU_LOCK_RELEASE go away.
Signed-off-by: default avatarOleg Nesterov <oleg@tv-sign.ru>
Cc: Srivatsa Vaddagiri <vatsa@in.ibm.com>
Cc: "Pallipadi, Venkatesh" <venkatesh.pallipadi@intel.com>
Cc: Gautham shenoy <ego@in.ibm.com>
Signed-off-by: default avatarAndrew Morton <akpm@linux-foundation.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@linux-foundation.org>
parent 319c2a98
...@@ -64,6 +64,7 @@ struct workqueue_struct { ...@@ -64,6 +64,7 @@ struct workqueue_struct {
/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
threads to each one as cpus come/go. */ threads to each one as cpus come/go. */
static long migrate_sequence __read_mostly;
static DEFINE_MUTEX(workqueue_mutex); static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues); static LIST_HEAD(workqueues);
...@@ -421,13 +422,7 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -421,13 +422,7 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
* Probably keventd trying to flush its own queue. So simply run * Probably keventd trying to flush its own queue. So simply run
* it by hand rather than deadlocking. * it by hand rather than deadlocking.
*/ */
preempt_enable();
/*
* We can still touch *cwq here because we are keventd, and
* hot-unplug will be waiting us to exit.
*/
run_workqueue(cwq); run_workqueue(cwq);
preempt_disable();
} else { } else {
struct wq_barrier barr; struct wq_barrier barr;
int active = 0; int active = 0;
...@@ -439,11 +434,8 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -439,11 +434,8 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
} }
spin_unlock_irq(&cwq->lock); spin_unlock_irq(&cwq->lock);
if (active) { if (active)
preempt_enable();
wait_for_completion(&barr.done); wait_for_completion(&barr.done);
preempt_disable();
}
} }
} }
...@@ -462,17 +454,21 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -462,17 +454,21 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
*/ */
void fastcall flush_workqueue(struct workqueue_struct *wq) void fastcall flush_workqueue(struct workqueue_struct *wq)
{ {
preempt_disable(); /* CPU hotplug */
if (is_single_threaded(wq)) { if (is_single_threaded(wq)) {
/* Always use first cpu's area. */ /* Always use first cpu's area. */
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
} else { } else {
long sequence;
int cpu; int cpu;
again:
sequence = migrate_sequence;
for_each_online_cpu(cpu) for_each_possible_cpu(cpu)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
if (unlikely(sequence != migrate_sequence))
goto again;
} }
preempt_enable();
} }
EXPORT_SYMBOL_GPL(flush_workqueue); EXPORT_SYMBOL_GPL(flush_workqueue);
...@@ -544,17 +540,21 @@ out: ...@@ -544,17 +540,21 @@ out:
} }
EXPORT_SYMBOL_GPL(flush_work); EXPORT_SYMBOL_GPL(flush_work);
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, static void init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
int cpu)
{ {
struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
struct task_struct *p;
spin_lock_init(&cwq->lock);
cwq->wq = wq; cwq->wq = wq;
cwq->thread = NULL; spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist); INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->more_work);
}
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
int cpu)
{
struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
struct task_struct *p;
if (is_single_threaded(wq)) if (is_single_threaded(wq))
p = kthread_create(worker_thread, cwq, "%s", wq->name); p = kthread_create(worker_thread, cwq, "%s", wq->name);
...@@ -589,6 +589,7 @@ struct workqueue_struct *__create_workqueue(const char *name, ...@@ -589,6 +589,7 @@ struct workqueue_struct *__create_workqueue(const char *name,
mutex_lock(&workqueue_mutex); mutex_lock(&workqueue_mutex);
if (singlethread) { if (singlethread) {
INIT_LIST_HEAD(&wq->list); INIT_LIST_HEAD(&wq->list);
init_cpu_workqueue(wq, singlethread_cpu);
p = create_workqueue_thread(wq, singlethread_cpu); p = create_workqueue_thread(wq, singlethread_cpu);
if (!p) if (!p)
destroy = 1; destroy = 1;
...@@ -596,7 +597,11 @@ struct workqueue_struct *__create_workqueue(const char *name, ...@@ -596,7 +597,11 @@ struct workqueue_struct *__create_workqueue(const char *name,
wake_up_process(p); wake_up_process(p);
} else { } else {
list_add(&wq->list, &workqueues); list_add(&wq->list, &workqueues);
for_each_online_cpu(cpu) { for_each_possible_cpu(cpu) {
init_cpu_workqueue(wq, cpu);
if (!cpu_online(cpu))
continue;
p = create_workqueue_thread(wq, cpu); p = create_workqueue_thread(wq, cpu);
if (p) { if (p) {
kthread_bind(p, cpu); kthread_bind(p, cpu);
...@@ -831,6 +836,7 @@ static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) ...@@ -831,6 +836,7 @@ static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
spin_lock_irq(&cwq->lock); spin_lock_irq(&cwq->lock);
list_replace_init(&cwq->worklist, &list); list_replace_init(&cwq->worklist, &list);
migrate_sequence++;
while (!list_empty(&list)) { while (!list_empty(&list)) {
printk("Taking work for %s\n", wq->name); printk("Taking work for %s\n", wq->name);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment