Commit 33a93eaa authored by Paul E. McKenney's avatar Paul E. McKenney Committed by Thomas Gleixner

RCU: QRCU with lockless fastpath

Hello!

This is an updated version of Oleg Nesterov's QRCU that avoids the
earlier lock acquisition on the synchronize_qrcu() fastpath.  This passes
rcutorture on x86 and the weakly ordered POWER.  A promela model of the
code passes as noted before for 2 readers and 3 updaters and for 3 readers
and 2 updaters.  3 readers and 3 updaters runs every machine that I have
access to out of memory -- nothing like a little combinatorial explosion!
However, after some thought, the proof ended up being simple enough:

1.	If synchronize_qrcu() exits too soon, then by definition
	there has been a reader present during synchronize_srcu()'s
	full execution.

2.	The counter corresponding to this reader will be at least
	1 at all times.

3.	The synchronize_qrcu() code forces at least one of the counters
	to be at least one at all times -- if there is a reader, the
	sum will be at least two.  (Unfortunately, we cannot fetch
	the pair of counters atomically.)

4.	Therefore, the only way that synchronize_qrcu()s fastpath can
	see a sum of 1 is if it races with another synchronize_qrcu() --
	the first synchronize_qrcu() must read one of the counters before
	the second synchronize_qrcu() increments it, and must read the
	other counter after the second synchronize_qrcu() decrements it.
	There can be at most one reader present through this entire
	operation -- otherwise, the first synchronize_qrcu() will see
	a sum of 2 or greater.

5.	But the second synchronize_qrcu() will not release the mutex
	until after the reader is done.  During this time, the first
	synchronize_qrcu() will always see a sum of at least 2, and
	therefore cannot take the remainder of the fastpath until the
	reader is done.

6.	Because the second synchronize_qrcu() holds the mutex, no other
	synchronize_qrcu() can manipulate the counters until the reader
	is done.  A repeat of the race called out in #4 above therefore
	cannot happen until after the reader is done, in which case it
	is safe for the first synchronize_qrcu() to proceed.

Therefore, two summations of the counter separated by a memory barrier
suffices and the implementation shown below also suffices.

(And, yes, the fastpath -could- check for a sum of zero and exit
immediately, but this would help only in case of a three-way race
between two synchronize_qrcu()s and a qrcu_read_unlock(), would add
another compare, so is not worth it.)
Signed-off-by: default avatarPaul E. McKenney <paulmck@linux.vnet.ibm.com>
Signed-off-by: default avatarIngo Molnar <mingo@elte.hu>
Signed-off-by: default avatarThomas Gleixner <tglx@linutronix.de>
parent 78af08d9
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
#ifndef _LINUX_SRCU_H #ifndef _LINUX_SRCU_H
#define _LINUX_SRCU_H #define _LINUX_SRCU_H
#include <linux/wait.h>
struct srcu_struct_array { struct srcu_struct_array {
int c[2]; int c[2];
}; };
...@@ -50,4 +52,24 @@ void srcu_read_unlock(struct srcu_struct *sp, int idx) __releases(sp); ...@@ -50,4 +52,24 @@ void srcu_read_unlock(struct srcu_struct *sp, int idx) __releases(sp);
void synchronize_srcu(struct srcu_struct *sp); void synchronize_srcu(struct srcu_struct *sp);
long srcu_batches_completed(struct srcu_struct *sp); long srcu_batches_completed(struct srcu_struct *sp);
/*
* fully compatible with srcu, but optimized for writers.
*/
struct qrcu_struct {
int completed;
atomic_t ctr[2];
wait_queue_head_t wq;
struct mutex mutex;
};
int init_qrcu_struct(struct qrcu_struct *qp);
int qrcu_read_lock(struct qrcu_struct *qp);
void qrcu_read_unlock(struct qrcu_struct *qp, int idx);
void synchronize_qrcu(struct qrcu_struct *qp);
static inline void cleanup_qrcu_struct(struct qrcu_struct *qp)
{
}
#endif #endif
...@@ -255,3 +255,89 @@ EXPORT_SYMBOL_GPL(srcu_read_lock); ...@@ -255,3 +255,89 @@ EXPORT_SYMBOL_GPL(srcu_read_lock);
EXPORT_SYMBOL_GPL(srcu_read_unlock); EXPORT_SYMBOL_GPL(srcu_read_unlock);
EXPORT_SYMBOL_GPL(synchronize_srcu); EXPORT_SYMBOL_GPL(synchronize_srcu);
EXPORT_SYMBOL_GPL(srcu_batches_completed); EXPORT_SYMBOL_GPL(srcu_batches_completed);
int init_qrcu_struct(struct qrcu_struct *qp)
{
qp->completed = 0;
atomic_set(qp->ctr + 0, 1);
atomic_set(qp->ctr + 1, 0);
init_waitqueue_head(&qp->wq);
mutex_init(&qp->mutex);
return 0;
}
int qrcu_read_lock(struct qrcu_struct *qp)
{
for (;;) {
int idx = qp->completed & 0x1;
if (likely(atomic_inc_not_zero(qp->ctr + idx)))
return idx;
}
}
void qrcu_read_unlock(struct qrcu_struct *qp, int idx)
{
if (atomic_dec_and_test(qp->ctr + idx))
wake_up(&qp->wq);
}
void synchronize_qrcu(struct qrcu_struct *qp)
{
int idx;
smp_mb(); /* Force preceding change to happen before fastpath check. */
/*
* Fastpath: If the two counters sum to "1" at a given point in
* time, there are no readers. However, it takes two separate
* loads to sample both counters, which won't occur simultaneously.
* So we might race with a counter switch, so that we might see
* ctr[0]==0, then the counter might switch, then we might see
* ctr[1]==1 (unbeknownst to us because there is a reader still
* there). So we do a read memory barrier and recheck. If the
* same race happens again, there must have been a second counter
* switch. This second counter switch could not have happened
* until all preceding readers finished, so if the condition
* is true both times, we may safely proceed.
*
* This relies critically on the atomic increment and atomic
* decrement being seen as executing in order.
*/
if (atomic_read(&qp->ctr[0]) + atomic_read(&qp->ctr[1]) <= 1) {
smp_rmb(); /* Keep two checks independent. */
if (atomic_read(&qp->ctr[0]) + atomic_read(&qp->ctr[1]) <= 1)
goto out;
}
mutex_lock(&qp->mutex);
idx = qp->completed & 0x1;
if (atomic_read(qp->ctr + idx) == 1)
goto out_unlock;
atomic_inc(qp->ctr + (idx ^ 0x1));
/*
* Prevent subsequent decrement from being seen before previous
* increment -- such an inversion could cause the fastpath
* above to falsely conclude that there were no readers. Also,
* reduce the likelihood that qrcu_read_lock() will loop.
*/
smp_mb__after_atomic_inc();
qp->completed++;
atomic_dec(qp->ctr + idx);
__wait_event(qp->wq, !atomic_read(qp->ctr + idx));
out_unlock:
mutex_unlock(&qp->mutex);
out:
smp_mb(); /* force subsequent free after qrcu_read_unlock(). */
}
EXPORT_SYMBOL_GPL(init_qrcu_struct);
EXPORT_SYMBOL_GPL(qrcu_read_lock);
EXPORT_SYMBOL_GPL(qrcu_read_unlock);
EXPORT_SYMBOL_GPL(synchronize_qrcu);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment