Commit 466d1a45 authored by Kurt Hackel's avatar Kurt Hackel Committed by Mark Fasheh

ocfs2: make dlm recovery finalization 2 stage

Makes it easier for the recovery process to deal with node death.
Signed-off-by: default avatarKurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: default avatarMark Fasheh <mark.fasheh@oracle.com>
parent 69d72b06
...@@ -71,7 +71,8 @@ static inline int dlm_is_recovery_lock(const char *lock_name, int name_len) ...@@ -71,7 +71,8 @@ static inline int dlm_is_recovery_lock(const char *lock_name, int name_len)
return 0; return 0;
} }
#define DLM_RECO_STATE_ACTIVE 0x0001 #define DLM_RECO_STATE_ACTIVE 0x0001
#define DLM_RECO_STATE_FINALIZE 0x0002
struct dlm_recovery_ctxt struct dlm_recovery_ctxt
{ {
...@@ -633,7 +634,8 @@ struct dlm_finalize_reco ...@@ -633,7 +634,8 @@ struct dlm_finalize_reco
{ {
u8 node_idx; u8 node_idx;
u8 dead_node; u8 dead_node;
__be16 pad1; u8 flags;
u8 pad1;
__be32 pad2; __be32 pad2;
}; };
......
...@@ -134,12 +134,18 @@ static inline void dlm_set_reco_master(struct dlm_ctxt *dlm, ...@@ -134,12 +134,18 @@ static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
dlm->reco.new_master = master; dlm->reco.new_master = master;
} }
static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
{ {
spin_lock(&dlm->spinlock); assert_spin_locked(&dlm->spinlock);
clear_bit(dlm->reco.dead_node, dlm->recovery_map); clear_bit(dlm->reco.dead_node, dlm->recovery_map);
dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
}
static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
{
spin_lock(&dlm->spinlock);
__dlm_reset_recovery(dlm);
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
} }
...@@ -2074,6 +2080,20 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) ...@@ -2074,6 +2080,20 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
{ {
assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->spinlock);
if (dlm->reco.new_master == idx) {
mlog(0, "%s: recovery master %d just died\n",
dlm->name, idx);
if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
/* finalize1 was reached, so it is safe to clear
* the new_master and dead_node. that recovery
* is complete. */
mlog(0, "%s: dead master %d had reached "
"finalize1 state, clearing\n", dlm->name, idx);
dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
__dlm_reset_recovery(dlm);
}
}
/* check to see if the node is already considered dead */ /* check to see if the node is already considered dead */
if (!test_bit(idx, dlm->live_nodes_map)) { if (!test_bit(idx, dlm->live_nodes_map)) {
mlog(0, "for domain %s, node %d is already dead. " mlog(0, "for domain %s, node %d is already dead. "
...@@ -2364,6 +2384,14 @@ retry: ...@@ -2364,6 +2384,14 @@ retry:
* another ENOMEM */ * another ENOMEM */
msleep(100); msleep(100);
goto retry; goto retry;
} else if (ret == EAGAIN) {
mlog(0, "%s: trying to start recovery of node "
"%u, but node %u is waiting for last recovery "
"to complete, backoff for a bit\n", dlm->name,
dead_node, nodenum);
/* TODO Look into replacing msleep with cond_resched() */
msleep(100);
goto retry;
} }
} }
...@@ -2379,6 +2407,17 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -2379,6 +2407,17 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
if (!dlm_grab(dlm)) if (!dlm_grab(dlm))
return 0; return 0;
spin_lock(&dlm->spinlock);
if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
mlog(0, "%s: node %u wants to recover node %u (%u:%u) "
"but this node is in finalize state, waiting on finalize2\n",
dlm->name, br->node_idx, br->dead_node,
dlm->reco.dead_node, dlm->reco.new_master);
spin_unlock(&dlm->spinlock);
return EAGAIN;
}
spin_unlock(&dlm->spinlock);
mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n", mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n",
dlm->name, br->node_idx, br->dead_node, dlm->name, br->node_idx, br->dead_node,
dlm->reco.dead_node, dlm->reco.new_master); dlm->reco.dead_node, dlm->reco.new_master);
...@@ -2432,6 +2471,7 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -2432,6 +2471,7 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
return 0; return 0;
} }
#define DLM_FINALIZE_STAGE2 0x01
static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
{ {
int ret = 0; int ret = 0;
...@@ -2439,25 +2479,31 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) ...@@ -2439,25 +2479,31 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
struct dlm_node_iter iter; struct dlm_node_iter iter;
int nodenum; int nodenum;
int status; int status;
int stage = 1;
mlog(0, "finishing recovery for node %s:%u\n", mlog(0, "finishing recovery for node %s:%u, "
dlm->name, dlm->reco.dead_node); "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
dlm_node_iter_init(dlm->domain_map, &iter); dlm_node_iter_init(dlm->domain_map, &iter);
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
stage2:
memset(&fr, 0, sizeof(fr)); memset(&fr, 0, sizeof(fr));
fr.node_idx = dlm->node_num; fr.node_idx = dlm->node_num;
fr.dead_node = dlm->reco.dead_node; fr.dead_node = dlm->reco.dead_node;
if (stage == 2)
fr.flags |= DLM_FINALIZE_STAGE2;
while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
if (nodenum == dlm->node_num) if (nodenum == dlm->node_num)
continue; continue;
ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
&fr, sizeof(fr), nodenum, &status); &fr, sizeof(fr), nodenum, &status);
if (ret >= 0) { if (ret >= 0)
ret = status; ret = status;
if (ret < 0) {
mlog_errno(ret);
if (dlm_is_host_down(ret)) { if (dlm_is_host_down(ret)) {
/* this has no effect on this recovery /* this has no effect on this recovery
* session, so set the status to zero to * session, so set the status to zero to
...@@ -2466,12 +2512,15 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) ...@@ -2466,12 +2512,15 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
"node finished recovery.\n", nodenum); "node finished recovery.\n", nodenum);
ret = 0; ret = 0;
} }
}
if (ret < 0) {
mlog_errno(ret);
break; break;
} }
} }
if (stage == 1) {
/* reset the node_iter back to the top and send finalize2 */
iter.curnode = -1;
stage = 2;
goto stage2;
}
return ret; return ret;
} }
...@@ -2480,15 +2529,19 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -2480,15 +2529,19 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
{ {
struct dlm_ctxt *dlm = data; struct dlm_ctxt *dlm = data;
struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
int stage = 1;
/* ok to return 0, domain has gone away */ /* ok to return 0, domain has gone away */
if (!dlm_grab(dlm)) if (!dlm_grab(dlm))
return 0; return 0;
mlog(0, "%s: node %u finalizing recovery of node %u (%u:%u)\n", if (fr->flags & DLM_FINALIZE_STAGE2)
dlm->name, fr->node_idx, fr->dead_node, stage = 2;
dlm->reco.dead_node, dlm->reco.new_master);
mlog(0, "%s: node %u finalizing recovery stage%d of "
"node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
if (dlm->reco.new_master != fr->node_idx) { if (dlm->reco.new_master != fr->node_idx) {
...@@ -2504,13 +2557,38 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -2504,13 +2557,38 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
BUG(); BUG();
} }
dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); switch (stage) {
case 1:
spin_unlock(&dlm->spinlock); dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
dlm_reset_recovery(dlm); mlog(ML_ERROR, "%s: received finalize1 from "
"new master %u for dead node %u, but "
"this node has already received it!\n",
dlm->name, fr->node_idx, fr->dead_node);
dlm_print_reco_node_status(dlm);
BUG();
}
dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
spin_unlock(&dlm->spinlock);
break;
case 2:
if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
mlog(ML_ERROR, "%s: received finalize2 from "
"new master %u for dead node %u, but "
"this node did not have finalize1!\n",
dlm->name, fr->node_idx, fr->dead_node);
dlm_print_reco_node_status(dlm);
BUG();
}
dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
spin_unlock(&dlm->spinlock);
dlm_reset_recovery(dlm);
dlm_kick_recovery_thread(dlm);
break;
default:
BUG();
}
dlm_kick_recovery_thread(dlm);
mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n", mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment