Commit ebfcb47b authored by Lars Ellenberg, committed by Philipp Reisner

move drbd_free_ee outside of req_lock

Fixes a recently introduced potential spinlock deadlock.
Reduces the size of various critical sections.
Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
parent 1cec97ee
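
The pattern applied throughout this patch is to stop calling drbd_free_ee() while mdev->req_lock is held: under the lock, epoch entries are only moved onto a local, caller-owned list (or spliced off wholesale), and the actual freeing happens after the lock has been dropped. Below is a minimal sketch of that drain-then-free idiom, for illustration only; the names (my_dev, my_entry, free_entry, drain_and_free) are made up and are not DRBD's.

/* drain-then-free sketch -- hypothetical names, not DRBD code */
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/slab.h>

struct my_entry {
	struct list_head list;
	/* payload ... */
};

struct my_dev {
	spinlock_t lock;		/* plays the role of mdev->req_lock */
	struct list_head done;		/* plays the role of mdev->done_ee */
};

static void free_entry(struct my_entry *e)
{
	/* may take other locks or do heavy work -- must not run under dev->lock */
	kfree(e);
}

static void drain_and_free(struct my_dev *dev)
{
	LIST_HEAD(to_be_freed);		/* private list, needs no locking */
	struct my_entry *e, *t;

	spin_lock_irq(&dev->lock);
	/* only cheap list surgery happens under the lock */
	list_splice_init(&dev->done, &to_be_freed);
	spin_unlock_irq(&dev->lock);

	/* the potentially deadlock-prone work runs with the lock dropped */
	list_for_each_entry_safe(e, t, &to_be_freed, list)
		free_entry(e);
}

This is the shape that reclaim_net_ee(), _drbd_clear_done_ee() and drbd_release_ee() take in the diff below: they fill or splice a to_be_freed list under req_lock, and their callers free the entries once the lock is released.
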
@@ -1443,7 +1443,7 @@ extern void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
extern void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
struct list_head *head);
extern void drbd_set_recv_tcq(struct drbd_conf *mdev, int tcq_enabled);
extern void _drbd_clear_done_ee(struct drbd_conf *mdev);
extern void _drbd_clear_done_ee(struct drbd_conf *mdev, struct list_head *to_be_freed);
/* yes, there is kernel_setsockopt, but only since 2.6.18. we don't need to
* mess with get_fs/set_fs, we know we are KERNEL_DS always. */
@@ -118,7 +118,7 @@ static void maybe_kick_lo(struct drbd_conf *mdev)
drbd_kick_lo(mdev);
}
static void reclaim_net_ee(struct drbd_conf *mdev)
static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
struct drbd_epoch_entry *e;
struct list_head *le, *tle;
@@ -132,17 +132,22 @@ static void reclaim_net_ee(struct drbd_conf *mdev)
e = list_entry(le, struct drbd_epoch_entry, w.list);
if (drbd_bio_has_active_page(e->private_bio))
break;
list_del(le);
drbd_free_ee(mdev, e);
list_move(le, to_be_freed);
}
}
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
LIST_HEAD(reclaimed);
struct drbd_epoch_entry *e, *t;
maybe_kick_lo(mdev);
spin_lock_irq(&mdev->req_lock);
reclaim_net_ee(mdev);
reclaim_net_ee(mdev, &reclaimed);
spin_unlock_irq(&mdev->req_lock);
list_for_each_entry_safe(e, t, &reclaimed, w.list)
drbd_free_ee(mdev, e);
}
/**
@@ -216,6 +221,37 @@ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
wake_up(&drbd_pp_wait);
}
static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
{
struct page *p_to_be_freed = NULL;
struct page *page;
struct bio_vec *bvec;
int i;
spin_lock(&drbd_pp_lock);
__bio_for_each_segment(bvec, bio, i, 0) {
if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
p_to_be_freed = bvec->bv_page;
} else {
set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
drbd_pp_pool = bvec->bv_page;
drbd_pp_vacant++;
}
}
spin_unlock(&drbd_pp_lock);
atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
while (p_to_be_freed) {
page = p_to_be_freed;
p_to_be_freed = (struct page *)page_private(page);
set_page_private(page, 0); /* just to be polite */
__free_page(page);
}
wake_up(&drbd_pp_wait);
}
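
The new drbd_pp_free_bio_pages() helper above chains the pages it is going to release through their page->private field, so the deferred free list costs no extra allocation and only the pool manipulation runs under drbd_pp_lock. Here is a hedged, self-contained sketch of that chaining idiom; it is illustrative only, not DRBD code, and the function name is invented.

/* page_private chaining sketch -- illustrative, not DRBD code */
#include <linux/mm.h>
#include <linux/gfp.h>

static void page_chain_example(void)
{
	struct page *chain = NULL, *page;
	int i;

	/* build a singly linked list of pages without any list nodes:
	 * each page's private field points at the previously chained page */
	for (i = 0; i < 4; i++) {
		page = alloc_page(GFP_KERNEL);
		if (!page)
			break;
		set_page_private(page, (unsigned long)chain);
		chain = page;
	}

	/* later, outside any spinlock, walk the chain and free it */
	while (chain) {
		page = chain;
		chain = (struct page *)page_private(page);
		set_page_private(page, 0);	/* just to be polite */
		__free_page(page);
	}
}
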
/*
You need to hold the req_lock:
drbd_free_ee()
@@ -239,11 +275,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
{
struct request_queue *q;
struct drbd_epoch_entry *e;
struct bio_vec *bvec;
struct page *page;
struct bio *bio;
unsigned int ds;
int i;
e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
if (!e) {
@@ -319,9 +353,7 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
return e;
fail2:
__bio_for_each_segment(bvec, bio, i, 0) {
drbd_pp_free(mdev, bvec->bv_page);
}
drbd_pp_free_bio_pages(mdev, bio);
bio_put(bio);
fail1:
mempool_free(e, drbd_ee_mempool);
@@ -332,39 +364,28 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
struct bio *bio = e->private_bio;
struct bio_vec *bvec;
int i;
trace_drbd_ee(mdev, e, "freed");
__bio_for_each_segment(bvec, bio, i, 0) {
drbd_pp_free(mdev, bvec->bv_page);
}
drbd_pp_free_bio_pages(mdev, bio);
bio_put(bio);
D_ASSERT(hlist_unhashed(&e->colision));
mempool_free(e, drbd_ee_mempool);
}
/* currently on module unload only */
int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
LIST_HEAD(work_list);
struct drbd_epoch_entry *e, *t;
int count = 0;
struct drbd_epoch_entry *e;
struct list_head *le;
spin_lock_irq(&mdev->req_lock);
while (!list_empty(list)) {
le = list->next;
list_del(le);
e = list_entry(le, struct drbd_epoch_entry, w.list);
list_splice_init(list, &work_list);
spin_unlock_irq(&mdev->req_lock);
list_for_each_entry_safe(e, t, &work_list, w.list) {
drbd_free_ee(mdev, e);
count++;
}
spin_unlock_irq(&mdev->req_lock);
return count;
}
@@ -381,14 +402,18 @@ int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
LIST_HEAD(work_list);
LIST_HEAD(reclaimed);
struct drbd_epoch_entry *e, *t;
int ok = 1;
spin_lock_irq(&mdev->req_lock);
reclaim_net_ee(mdev);
reclaim_net_ee(mdev, &reclaimed);
list_splice_init(&mdev->done_ee, &work_list);
spin_unlock_irq(&mdev->req_lock);
list_for_each_entry_safe(e, t, &reclaimed, w.list)
drbd_free_ee(mdev, e);
/* possible callbacks here:
* e_end_block, and e_end_resync_block, e_send_discard_ack.
* all ignore the last argument.
@@ -408,19 +433,16 @@ static int drbd_process_done_ee(struct drbd_conf *mdev)
/* clean-up helper for drbd_disconnect */
void _drbd_clear_done_ee(struct drbd_conf *mdev)
void _drbd_clear_done_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
struct list_head *le;
struct drbd_epoch_entry *e;
struct drbd_epoch *epoch;
int n = 0;
reclaim_net_ee(mdev);
while (!list_empty(&mdev->done_ee)) {
le = mdev->done_ee.next;
list_del(le);
list_move(le, to_be_freed);
e = list_entry(le, struct drbd_epoch_entry, w.list);
if (mdev->net_conf->wire_protocol == DRBD_PROT_C
|| is_syncer_block_id(e->block_id))
@@ -437,7 +459,6 @@ void _drbd_clear_done_ee(struct drbd_conf *mdev)
}
drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
}
drbd_free_ee(mdev, e);
}
sub_unacked(mdev, n);
@@ -3515,6 +3536,8 @@ static void drbd_fail_pending_reads(struct drbd_conf *mdev)
static void drbd_disconnect(struct drbd_conf *mdev)
{
LIST_HEAD(to_be_freed);
struct drbd_epoch_entry *e, *t;
struct drbd_work prev_work_done;
enum drbd_fencing_p fp;
union drbd_state os, ns;
@@ -3537,11 +3560,14 @@ static void drbd_disconnect(struct drbd_conf *mdev)
spin_lock_irq(&mdev->req_lock);
_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
_drbd_clear_done_ee(mdev);
_drbd_clear_done_ee(mdev, &to_be_freed);
_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
reclaim_net_ee(mdev);
reclaim_net_ee(mdev, &to_be_freed);
spin_unlock_irq(&mdev->req_lock);
list_for_each_entry_safe(e, t, &to_be_freed, w.list)
drbd_free_ee(mdev, e);
/* We do not have data structures that would allow us to
* get the rs_pending_cnt down to 0 again.
* * On C_SYNC_TARGET we do not have any data structures describing
@@ -815,6 +815,18 @@ out:
return 1;
}
/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
if (drbd_bio_has_active_page(e->private_bio)) {
/* This might happen if sendpage() has not finished */
spin_lock_irq(&mdev->req_lock);
list_add_tail(&e->w.list, &mdev->net_ee);
spin_unlock_irq(&mdev->req_lock);
} else
drbd_free_ee(mdev, e);
}
/**
* w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
* @mdev: DRBD device.
@@ -846,14 +858,7 @@ int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
dec_unacked(mdev);
spin_lock_irq(&mdev->req_lock);
if (drbd_bio_has_active_page(e->private_bio)) {
/* This might happen if sendpage() has not finished */
list_add_tail(&e->w.list, &mdev->net_ee);
} else {
drbd_free_ee(mdev, e);
}
spin_unlock_irq(&mdev->req_lock);
move_to_net_ee_or_free(mdev, e);
if (unlikely(!ok))
dev_err(DEV, "drbd_send_block() failed\n");
@@ -907,14 +912,7 @@ int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
dec_unacked(mdev);
spin_lock_irq(&mdev->req_lock);
if (drbd_bio_has_active_page(e->private_bio)) {
/* This might happen if sendpage() has not finished */
list_add_tail(&e->w.list, &mdev->net_ee);
} else {
drbd_free_ee(mdev, e);
}
spin_unlock_irq(&mdev->req_lock);
move_to_net_ee_or_free(mdev, e);
if (unlikely(!ok))
dev_err(DEV, "drbd_send_block() failed\n");
@@ -974,14 +972,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
kfree(di);
spin_lock_irq(&mdev->req_lock);
if (drbd_bio_has_active_page(e->private_bio)) {
/* This might happen if sendpage() has not finished */
list_add_tail(&e->w.list, &mdev->net_ee);
} else {
drbd_free_ee(mdev, e);
}
spin_unlock_irq(&mdev->req_lock);
move_to_net_ee_or_free(mdev, e);
if (unlikely(!ok))
dev_err(DEV, "drbd_send_block/ack() failed\n");
@@ -1015,9 +1006,7 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
}
out:
spin_lock_irq(&mdev->req_lock);
drbd_free_ee(mdev, e);
spin_unlock_irq(&mdev->req_lock);
dec_unacked(mdev);
@@ -1085,9 +1074,7 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
spin_lock_irq(&mdev->req_lock);
drbd_free_ee(mdev, e);
spin_unlock_irq(&mdev->req_lock);
if (--mdev->ov_left == 0) {
ov_oos_print(mdev);