Commit fa743953 authored by Steven Rostedt, committed by Steven Rostedt

ring-buffer: use commit counters for commit pointer accounting

The ring buffer is made up of three sets of pointers.

The head page pointer, which points to the next page for the reader to
get.

The commit pointer and commit index, which points to the page and index
of the last committed write respectively.

The tail pointer and tail index, which points to the page and the index
of the last reserved data respectively (non committed).

The commit pointer is only moved forward by the outer most writer.
If a nested writer comes in, it will not move the pointer forward.

The current implementation has a flaw. It assumes that the outer most
writer successfully reserved data. There's a small race window where
the outer most writer could find the tail pointer, but a nested
writer could come in (via interrupt) and move the tail forward, and
even the commit forward.

The outer writer would not realize the commit moved forward, and the
accounting would break.

This patch changes the design to use counters in the per cpu buffers
to keep track of commits. The counters are incremented at the start
of the commit, and decremented at the end. If the end commit counter
is 1, then it moves the commit pointers. A loop is made to check for
races between checking and moving the commit pointers. Only the outer
commit should move the pointers anyway.

The test of knowing if a reserve is equal to the last commit update
is still needed for time keeping. The time code is much less
racy than the commit updates.

This change not only solves the mentioned race, but also makes the
code simpler.

[ Impact: fix commit race and simplify code ]
Signed-off-by: Steven Rostedt <rostedt@goodmis.org>
parent 263294f3
...@@ -415,6 +415,8 @@ struct ring_buffer_per_cpu { ...@@ -415,6 +415,8 @@ struct ring_buffer_per_cpu {
unsigned long overrun; unsigned long overrun;
unsigned long read; unsigned long read;
local_t entries; local_t entries;
local_t committing;
local_t commits;
u64 write_stamp; u64 write_stamp;
u64 read_stamp; u64 read_stamp;
atomic_t record_disabled; atomic_t record_disabled;
...@@ -1015,7 +1017,7 @@ rb_event_index(struct ring_buffer_event *event) ...@@ -1015,7 +1017,7 @@ rb_event_index(struct ring_buffer_event *event)
} }
static inline int static inline int
rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer, rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
struct ring_buffer_event *event) struct ring_buffer_event *event)
{ {
unsigned long addr = (unsigned long)event; unsigned long addr = (unsigned long)event;
...@@ -1028,31 +1030,6 @@ rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1028,31 +1030,6 @@ rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
rb_commit_index(cpu_buffer) == index; rb_commit_index(cpu_buffer) == index;
} }
static void
rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
struct ring_buffer_event *event)
{
unsigned long addr = (unsigned long)event;
unsigned long index;
index = rb_event_index(event);
addr &= PAGE_MASK;
while (cpu_buffer->commit_page->page != (void *)addr) {
if (RB_WARN_ON(cpu_buffer,
cpu_buffer->commit_page == cpu_buffer->tail_page))
return;
cpu_buffer->commit_page->page->commit =
cpu_buffer->commit_page->write;
rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
cpu_buffer->write_stamp =
cpu_buffer->commit_page->page->time_stamp;
}
/* Now set the commit to the event's index */
local_set(&cpu_buffer->commit_page->page->commit, index);
}
static void static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{ {
...@@ -1319,15 +1296,6 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1319,15 +1296,6 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
rb_reset_tail(cpu_buffer, tail_page, tail, length); rb_reset_tail(cpu_buffer, tail_page, tail, length);
/*
* If this was a commit entry that failed,
* increment that too
*/
if (tail_page == cpu_buffer->commit_page &&
tail == rb_commit_index(cpu_buffer)) {
rb_set_commit_to_write(cpu_buffer);
}
__raw_spin_unlock(&cpu_buffer->lock); __raw_spin_unlock(&cpu_buffer->lock);
local_irq_restore(flags); local_irq_restore(flags);
...@@ -1377,11 +1345,11 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1377,11 +1345,11 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
local_inc(&tail_page->entries); local_inc(&tail_page->entries);
/* /*
* If this is a commit and the tail is zero, then update * If this is the first commit on the page, then update
* this page's time stamp. * its timestamp.
*/ */
if (!tail && rb_is_commit(cpu_buffer, event)) if (!tail)
cpu_buffer->commit_page->page->time_stamp = *ts; tail_page->page->time_stamp = *ts;
return event; return event;
} }
...@@ -1450,16 +1418,16 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1450,16 +1418,16 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
return -EAGAIN; return -EAGAIN;
/* Only a commited time event can update the write stamp */ /* Only a commited time event can update the write stamp */
if (rb_is_commit(cpu_buffer, event)) { if (rb_event_is_commit(cpu_buffer, event)) {
/* /*
* If this is the first on the page, then we need to * If this is the first on the page, then it was
* update the page itself, and just put in a zero. * updated with the page itself. Try to discard it
* and if we can't just make it zero.
*/ */
if (rb_event_index(event)) { if (rb_event_index(event)) {
event->time_delta = *delta & TS_MASK; event->time_delta = *delta & TS_MASK;
event->array[0] = *delta >> TS_SHIFT; event->array[0] = *delta >> TS_SHIFT;
} else { } else {
cpu_buffer->commit_page->page->time_stamp = *ts;
/* try to discard, since we do not need this */ /* try to discard, since we do not need this */
if (!rb_try_to_discard(cpu_buffer, event)) { if (!rb_try_to_discard(cpu_buffer, event)) {
/* nope, just zero it */ /* nope, just zero it */
...@@ -1485,6 +1453,44 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1485,6 +1453,44 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
return ret; return ret;
} }
static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
local_inc(&cpu_buffer->committing);
local_inc(&cpu_buffer->commits);
}
static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
unsigned long commits;
if (RB_WARN_ON(cpu_buffer,
!local_read(&cpu_buffer->committing)))
return;
again:
commits = local_read(&cpu_buffer->commits);
/* synchronize with interrupts */
barrier();
if (local_read(&cpu_buffer->committing) == 1)
rb_set_commit_to_write(cpu_buffer);
local_dec(&cpu_buffer->committing);
/* synchronize with interrupts */
barrier();
/*
* Need to account for interrupts coming in between the
* updating of the commit page and the clearing of the
* committing counter.
*/
if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
!local_read(&cpu_buffer->committing)) {
local_inc(&cpu_buffer->committing);
goto again;
}
}
static struct ring_buffer_event * static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
unsigned long length) unsigned long length)
...@@ -1494,6 +1500,8 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1494,6 +1500,8 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
int commit = 0; int commit = 0;
int nr_loops = 0; int nr_loops = 0;
rb_start_commit(cpu_buffer);
length = rb_calculate_event_length(length); length = rb_calculate_event_length(length);
again: again:
/* /*
...@@ -1506,7 +1514,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1506,7 +1514,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
* Bail! * Bail!
*/ */
if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
return NULL; goto out_fail;
ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu); ts = rb_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu);
...@@ -1537,7 +1545,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1537,7 +1545,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
commit = rb_add_time_stamp(cpu_buffer, &ts, &delta); commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
if (commit == -EBUSY) if (commit == -EBUSY)
return NULL; goto out_fail;
if (commit == -EAGAIN) if (commit == -EAGAIN)
goto again; goto again;
...@@ -1551,28 +1559,19 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1551,28 +1559,19 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
if (unlikely(PTR_ERR(event) == -EAGAIN)) if (unlikely(PTR_ERR(event) == -EAGAIN))
goto again; goto again;
if (!event) { if (!event)
if (unlikely(commit)) goto out_fail;
/*
* Ouch! We needed a timestamp and it was commited. But
* we didn't get our event reserved.
*/
rb_set_commit_to_write(cpu_buffer);
return NULL;
}
/* if (!rb_event_is_commit(cpu_buffer, event))
* If the timestamp was commited, make the commit our entry
* now so that we will update it when needed.
*/
if (unlikely(commit))
rb_set_commit_event(cpu_buffer, event);
else if (!rb_is_commit(cpu_buffer, event))
delta = 0; delta = 0;
event->time_delta = delta; event->time_delta = delta;
return event; return event;
out_fail:
rb_end_commit(cpu_buffer);
return NULL;
} }
#define TRACE_RECURSIVE_DEPTH 16 #define TRACE_RECURSIVE_DEPTH 16
...@@ -1682,13 +1681,14 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -1682,13 +1681,14 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
{ {
local_inc(&cpu_buffer->entries); local_inc(&cpu_buffer->entries);
/* Only process further if we own the commit */ /*
if (!rb_is_commit(cpu_buffer, event)) * The event first in the commit queue updates the
return; * time stamp.
*/
if (rb_event_is_commit(cpu_buffer, event))
cpu_buffer->write_stamp += event->time_delta; cpu_buffer->write_stamp += event->time_delta;
rb_set_commit_to_write(cpu_buffer); rb_end_commit(cpu_buffer);
} }
/** /**
...@@ -1777,15 +1777,15 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer, ...@@ -1777,15 +1777,15 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer,
/* The event is discarded regardless */ /* The event is discarded regardless */
rb_event_discard(event); rb_event_discard(event);
cpu = smp_processor_id();
cpu_buffer = buffer->buffers[cpu];
/* /*
* This must only be called if the event has not been * This must only be called if the event has not been
* committed yet. Thus we can assume that preemption * committed yet. Thus we can assume that preemption
* is still disabled. * is still disabled.
*/ */
RB_WARN_ON(buffer, preemptible()); RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
cpu = smp_processor_id();
cpu_buffer = buffer->buffers[cpu];
if (!rb_try_to_discard(cpu_buffer, event)) if (!rb_try_to_discard(cpu_buffer, event))
goto out; goto out;
...@@ -1796,13 +1796,7 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer, ...@@ -1796,13 +1796,7 @@ void ring_buffer_discard_commit(struct ring_buffer *buffer,
*/ */
local_inc(&cpu_buffer->entries); local_inc(&cpu_buffer->entries);
out: out:
/* rb_end_commit(cpu_buffer);
* If a write came in and pushed the tail page
* we still need to update the commit pointer
* if we were the commit.
*/
if (rb_is_commit(cpu_buffer, event))
rb_set_commit_to_write(cpu_buffer);
trace_recursive_unlock(); trace_recursive_unlock();
...@@ -2720,6 +2714,8 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) ...@@ -2720,6 +2714,8 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->overrun = 0; cpu_buffer->overrun = 0;
cpu_buffer->read = 0; cpu_buffer->read = 0;
local_set(&cpu_buffer->entries, 0); local_set(&cpu_buffer->entries, 0);
local_set(&cpu_buffer->committing, 0);
local_set(&cpu_buffer->commits, 0);
cpu_buffer->write_stamp = 0; cpu_buffer->write_stamp = 0;
cpu_buffer->read_stamp = 0; cpu_buffer->read_stamp = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment