Commit b4b610f6 authored by Alasdair G Kergon's avatar Alasdair G Kergon Committed by Linus Torvalds

[PATCH] device-mapper snapshot: replace sibling list

The siblings "list" is used unsafely at the moment.

Firstly, only the element on the list being changed gets locked (via the
snapshot lock), not the next and previous elements which have pointers that
are also being changed.

Secondly, if you have two or more snapshots and write to the same chunk a
second time before every snapshot has finished making its private copy of the
data, if you're unlucky, _origin_write() could attempt its list_merge() and
dereference a 'last' pointer to a pending_exception structure that has just
been freed.

Analysis reveals that the list is actually only there for reference counting.
If 5 pending_exceptions are needed in origin_write, then the 5 are joined
together into a 5-element list - without a separate list head because there's
nowhere suitable to store it.  As the pending_exceptions complete, they are
removed from the list one-by-one and any contents of origin_bios get moved
across to one of the remaining pending_exceptions on the list.  Whichever one
is last is detected because list_empty() is then true and the origin_bios get
submitted.

The fix proposed here uses an alternative reference counting mechanism by
choosing one of the pending_exceptions as primary and maintaining an atomic
counter there.
Signed-off-by: default avatarAlasdair G Kergon <agk@redhat.com>
Signed-off-by: default avatarAndrew Morton <akpm@osdl.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@osdl.org>
parent eccf0817
...@@ -54,11 +54,21 @@ struct pending_exception { ...@@ -54,11 +54,21 @@ struct pending_exception {
struct list_head list; struct list_head list;
/* /*
* Other pending_exceptions that are processing this * The primary pending_exception is the one that holds
* chunk. When this list is empty, we know we can * the sibling_count and the list of origin_bios for a
* complete the origins. * group of pending_exceptions. It is always last to get freed.
* These fields get set up when writing to the origin.
*/ */
struct list_head siblings; struct pending_exception *primary_pe;
/*
* Number of pending_exceptions processing this chunk.
* When this drops to zero we must complete the origin bios.
* If incrementing or decrementing this, hold pe->snap->lock for
* the sibling concerned and not pe->primary_pe->snap->lock unless
* they are the same.
*/
atomic_t sibling_count;
/* Pointer back to snapshot context */ /* Pointer back to snapshot context */
struct dm_snapshot *snap; struct dm_snapshot *snap;
...@@ -593,20 +603,15 @@ static void error_bios(struct bio *bio) ...@@ -593,20 +603,15 @@ static void error_bios(struct bio *bio)
static struct bio *__flush_bios(struct pending_exception *pe) static struct bio *__flush_bios(struct pending_exception *pe)
{ {
struct pending_exception *sibling; /*
* If this pe is involved in a write to the origin and
if (list_empty(&pe->siblings)) * it is the last sibling to complete then release
return bio_list_get(&pe->origin_bios); * the bios for the original write to the origin.
sibling = list_entry(pe->siblings.next,
struct pending_exception, siblings);
list_del(&pe->siblings);
/* This is fine as long as kcopyd is single-threaded. If kcopyd
* becomes multi-threaded, we'll need some locking here.
*/ */
bio_list_merge(&sibling->origin_bios, &pe->origin_bios);
if (pe->primary_pe &&
atomic_dec_and_test(&pe->primary_pe->sibling_count))
return bio_list_get(&pe->primary_pe->origin_bios);
return NULL; return NULL;
} }
...@@ -614,6 +619,7 @@ static struct bio *__flush_bios(struct pending_exception *pe) ...@@ -614,6 +619,7 @@ static struct bio *__flush_bios(struct pending_exception *pe)
static void pending_complete(struct pending_exception *pe, int success) static void pending_complete(struct pending_exception *pe, int success)
{ {
struct exception *e; struct exception *e;
struct pending_exception *primary_pe;
struct dm_snapshot *s = pe->snap; struct dm_snapshot *s = pe->snap;
struct bio *flush = NULL; struct bio *flush = NULL;
...@@ -662,7 +668,20 @@ static void pending_complete(struct pending_exception *pe, int success) ...@@ -662,7 +668,20 @@ static void pending_complete(struct pending_exception *pe, int success)
} }
out: out:
free_pending_exception(pe); primary_pe = pe->primary_pe;
/*
* Free the pe if it's not linked to an origin write or if
* it's not itself a primary pe.
*/
if (!primary_pe || primary_pe != pe)
free_pending_exception(pe);
/*
* Free the primary pe if nothing references it.
*/
if (primary_pe && !atomic_read(&primary_pe->sibling_count))
free_pending_exception(primary_pe);
if (flush) if (flush)
flush_bios(flush); flush_bios(flush);
...@@ -757,7 +776,8 @@ __find_pending_exception(struct dm_snapshot *s, struct bio *bio) ...@@ -757,7 +776,8 @@ __find_pending_exception(struct dm_snapshot *s, struct bio *bio)
pe->e.old_chunk = chunk; pe->e.old_chunk = chunk;
bio_list_init(&pe->origin_bios); bio_list_init(&pe->origin_bios);
bio_list_init(&pe->snapshot_bios); bio_list_init(&pe->snapshot_bios);
INIT_LIST_HEAD(&pe->siblings); pe->primary_pe = NULL;
atomic_set(&pe->sibling_count, 1);
pe->snap = s; pe->snap = s;
pe->started = 0; pe->started = 0;
...@@ -916,26 +936,12 @@ static int snapshot_status(struct dm_target *ti, status_type_t type, ...@@ -916,26 +936,12 @@ static int snapshot_status(struct dm_target *ti, status_type_t type,
/*----------------------------------------------------------------- /*-----------------------------------------------------------------
* Origin methods * Origin methods
*---------------------------------------------------------------*/ *---------------------------------------------------------------*/
static void list_merge(struct list_head *l1, struct list_head *l2)
{
struct list_head *l1_n, *l2_p;
l1_n = l1->next;
l2_p = l2->prev;
l1->next = l2;
l2->prev = l1;
l2_p->next = l1_n;
l1_n->prev = l2_p;
}
static int __origin_write(struct list_head *snapshots, struct bio *bio) static int __origin_write(struct list_head *snapshots, struct bio *bio)
{ {
int r = 1, first = 1; int r = 1, first = 0;
struct dm_snapshot *snap; struct dm_snapshot *snap;
struct exception *e; struct exception *e;
struct pending_exception *pe, *next_pe, *last = NULL; struct pending_exception *pe, *next_pe, *primary_pe = NULL;
chunk_t chunk; chunk_t chunk;
LIST_HEAD(pe_queue); LIST_HEAD(pe_queue);
...@@ -962,6 +968,9 @@ static int __origin_write(struct list_head *snapshots, struct bio *bio) ...@@ -962,6 +968,9 @@ static int __origin_write(struct list_head *snapshots, struct bio *bio)
* Check exception table to see if block * Check exception table to see if block
* is already remapped in this snapshot * is already remapped in this snapshot
* and trigger an exception if not. * and trigger an exception if not.
*
* sibling_count is initialised to 1 so pending_complete()
* won't destroy the primary_pe while we're inside this loop.
*/ */
e = lookup_exception(&snap->complete, chunk); e = lookup_exception(&snap->complete, chunk);
if (!e) { if (!e) {
...@@ -971,31 +980,60 @@ static int __origin_write(struct list_head *snapshots, struct bio *bio) ...@@ -971,31 +980,60 @@ static int __origin_write(struct list_head *snapshots, struct bio *bio)
snap->valid = 0; snap->valid = 0;
} else { } else {
if (first) { if (!primary_pe) {
bio_list_add(&pe->origin_bios, bio); /*
* Either every pe here has same
* primary_pe or none has one yet.
*/
if (pe->primary_pe)
primary_pe = pe->primary_pe;
else {
primary_pe = pe;
first = 1;
}
bio_list_add(&primary_pe->origin_bios,
bio);
r = 0; r = 0;
first = 0;
} }
if (last && list_empty(&pe->siblings)) if (!pe->primary_pe) {
list_merge(&pe->siblings, atomic_inc(&primary_pe->sibling_count);
&last->siblings); pe->primary_pe = primary_pe;
}
if (!pe->started) { if (!pe->started) {
pe->started = 1; pe->started = 1;
list_add_tail(&pe->list, &pe_queue); list_add_tail(&pe->list, &pe_queue);
} }
last = pe;
} }
} }
up_write(&snap->lock); up_write(&snap->lock);
} }
if (!primary_pe)
goto out;
/*
* If this is the first time we're processing this chunk and
* sibling_count is now 1 it means all the pending exceptions
* got completed while we were in the loop above, so it falls to
* us here to remove the primary_pe and submit any origin_bios.
*/
if (first && atomic_dec_and_test(&primary_pe->sibling_count)) {
flush_bios(bio_list_get(&primary_pe->origin_bios));
free_pending_exception(primary_pe);
/* If we got here, pe_queue is necessarily empty. */
goto out;
}
/* /*
* Now that we have a complete pe list we can start the copying. * Now that we have a complete pe list we can start the copying.
*/ */
list_for_each_entry_safe(pe, next_pe, &pe_queue, list) list_for_each_entry_safe(pe, next_pe, &pe_queue, list)
start_copy(pe); start_copy(pe);
out:
return r; return r;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment