Commit 9140db04 authored by Srinivas Eeda's avatar Srinivas Eeda Committed by Mark Fasheh

ocfs2: recover orphans in offline slots during recovery and mount

During recovery, a node recovers orphans in it's slot and the dead node(s). But
if the dead nodes were holding orphans in offline slots, they will be left
unrecovered.

If the dead node is the last one to die and is holding orphans in other slots
and is the first one to mount, then it only recovers it's own slot, which
leaves orphans in offline slots.

This patch queues complete_recovery to clean orphans for all offline slots
during mount and node recovery.
Signed-off-by: default avatarSrinivas Eeda <srinivas.eeda@oracle.com>
Acked-by: default avatarJoel Becker <joel.becker@oracle.com>
Signed-off-by: default avatarMark Fasheh <mfasheh@suse.com>
parent 1fca3a05
...@@ -65,6 +65,11 @@ static int ocfs2_trylock_journal(struct ocfs2_super *osb, ...@@ -65,6 +65,11 @@ static int ocfs2_trylock_journal(struct ocfs2_super *osb,
static int ocfs2_recover_orphans(struct ocfs2_super *osb, static int ocfs2_recover_orphans(struct ocfs2_super *osb,
int slot); int slot);
static int ocfs2_commit_thread(void *arg); static int ocfs2_commit_thread(void *arg);
static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
int slot_num,
struct ocfs2_dinode *la_dinode,
struct ocfs2_dinode *tl_dinode,
struct ocfs2_quota_recovery *qrec);
static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb) static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb)
{ {
...@@ -76,6 +81,97 @@ static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb) ...@@ -76,6 +81,97 @@ static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb)
return __ocfs2_wait_on_mount(osb, 1); return __ocfs2_wait_on_mount(osb, 1);
} }
/*
* This replay_map is to track online/offline slots, so we could recover
* offline slots during recovery and mount
*/
enum ocfs2_replay_state {
REPLAY_UNNEEDED = 0, /* Replay is not needed, so ignore this map */
REPLAY_NEEDED, /* Replay slots marked in rm_replay_slots */
REPLAY_DONE /* Replay was already queued */
};
struct ocfs2_replay_map {
unsigned int rm_slots;
enum ocfs2_replay_state rm_state;
unsigned char rm_replay_slots[0];
};
void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state)
{
if (!osb->replay_map)
return;
/* If we've already queued the replay, we don't have any more to do */
if (osb->replay_map->rm_state == REPLAY_DONE)
return;
osb->replay_map->rm_state = state;
}
int ocfs2_compute_replay_slots(struct ocfs2_super *osb)
{
struct ocfs2_replay_map *replay_map;
int i, node_num;
/* If replay map is already set, we don't do it again */
if (osb->replay_map)
return 0;
replay_map = kzalloc(sizeof(struct ocfs2_replay_map) +
(osb->max_slots * sizeof(char)), GFP_KERNEL);
if (!replay_map) {
mlog_errno(-ENOMEM);
return -ENOMEM;
}
spin_lock(&osb->osb_lock);
replay_map->rm_slots = osb->max_slots;
replay_map->rm_state = REPLAY_UNNEEDED;
/* set rm_replay_slots for offline slot(s) */
for (i = 0; i < replay_map->rm_slots; i++) {
if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT)
replay_map->rm_replay_slots[i] = 1;
}
osb->replay_map = replay_map;
spin_unlock(&osb->osb_lock);
return 0;
}
void ocfs2_queue_replay_slots(struct ocfs2_super *osb)
{
struct ocfs2_replay_map *replay_map = osb->replay_map;
int i;
if (!replay_map)
return;
if (replay_map->rm_state != REPLAY_NEEDED)
return;
for (i = 0; i < replay_map->rm_slots; i++)
if (replay_map->rm_replay_slots[i])
ocfs2_queue_recovery_completion(osb->journal, i, NULL,
NULL, NULL);
replay_map->rm_state = REPLAY_DONE;
}
void ocfs2_free_replay_slots(struct ocfs2_super *osb)
{
struct ocfs2_replay_map *replay_map = osb->replay_map;
if (!osb->replay_map)
return;
kfree(replay_map);
osb->replay_map = NULL;
}
int ocfs2_recovery_init(struct ocfs2_super *osb) int ocfs2_recovery_init(struct ocfs2_super *osb)
{ {
struct ocfs2_recovery_map *rm; struct ocfs2_recovery_map *rm;
...@@ -1194,24 +1290,24 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, ...@@ -1194,24 +1290,24 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
} }
/* Called by the mount code to queue recovery the last part of /* Called by the mount code to queue recovery the last part of
* recovery for it's own slot. */ * recovery for it's own and offline slot(s). */
void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
{ {
struct ocfs2_journal *journal = osb->journal; struct ocfs2_journal *journal = osb->journal;
if (osb->dirty) { /* No need to queue up our truncate_log as regular cleanup will catch
/* No need to queue up our truncate_log as regular * that */
* cleanup will catch that. */ ocfs2_queue_recovery_completion(journal, osb->slot_num,
ocfs2_queue_recovery_completion(journal, osb->local_alloc_copy, NULL, NULL);
osb->slot_num,
osb->local_alloc_copy,
NULL,
NULL);
ocfs2_schedule_truncate_log_flush(osb, 0); ocfs2_schedule_truncate_log_flush(osb, 0);
osb->local_alloc_copy = NULL; osb->local_alloc_copy = NULL;
osb->dirty = 0; osb->dirty = 0;
}
/* queue to recover orphan slots for all offline slots */
ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
ocfs2_queue_replay_slots(osb);
ocfs2_free_replay_slots(osb);
} }
void ocfs2_complete_quota_recovery(struct ocfs2_super *osb) void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
...@@ -1254,6 +1350,14 @@ restart: ...@@ -1254,6 +1350,14 @@ restart:
goto bail; goto bail;
} }
status = ocfs2_compute_replay_slots(osb);
if (status < 0)
mlog_errno(status);
/* queue recovery for our own slot */
ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
NULL, NULL);
spin_lock(&osb->osb_lock); spin_lock(&osb->osb_lock);
while (rm->rm_used) { while (rm->rm_used) {
/* It's always safe to remove entry zero, as we won't /* It's always safe to remove entry zero, as we won't
...@@ -1319,11 +1423,8 @@ skip_recovery: ...@@ -1319,11 +1423,8 @@ skip_recovery:
ocfs2_super_unlock(osb, 1); ocfs2_super_unlock(osb, 1);
/* We always run recovery on our own orphan dir - the dead /* queue recovery for offline slots */
* node(s) may have disallowd a previos inode delete. Re-processing ocfs2_queue_replay_slots(osb);
* is therefore required. */
ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
NULL, NULL);
bail: bail:
mutex_lock(&osb->recovery_lock); mutex_lock(&osb->recovery_lock);
...@@ -1332,6 +1433,7 @@ bail: ...@@ -1332,6 +1433,7 @@ bail:
goto restart; goto restart;
} }
ocfs2_free_replay_slots(osb);
osb->recovery_thread_task = NULL; osb->recovery_thread_task = NULL;
mb(); /* sync with ocfs2_recovery_thread_running */ mb(); /* sync with ocfs2_recovery_thread_running */
wake_up(&osb->recovery_event); wake_up(&osb->recovery_event);
...@@ -1483,6 +1585,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb, ...@@ -1483,6 +1585,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
goto done; goto done;
} }
/* we need to run complete recovery for offline orphan slots */
ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n", mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n",
node_num, slot_num, node_num, slot_num,
MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
......
...@@ -150,6 +150,7 @@ void ocfs2_wait_for_recovery(struct ocfs2_super *osb); ...@@ -150,6 +150,7 @@ void ocfs2_wait_for_recovery(struct ocfs2_super *osb);
int ocfs2_recovery_init(struct ocfs2_super *osb); int ocfs2_recovery_init(struct ocfs2_super *osb);
void ocfs2_recovery_exit(struct ocfs2_super *osb); void ocfs2_recovery_exit(struct ocfs2_super *osb);
int ocfs2_compute_replay_slots(struct ocfs2_super *osb);
/* /*
* Journal Control: * Journal Control:
* Initialize, Load, Shutdown, Wipe a journal. * Initialize, Load, Shutdown, Wipe a journal.
......
...@@ -209,6 +209,7 @@ enum ocfs2_mount_options ...@@ -209,6 +209,7 @@ enum ocfs2_mount_options
struct ocfs2_journal; struct ocfs2_journal;
struct ocfs2_slot_info; struct ocfs2_slot_info;
struct ocfs2_recovery_map; struct ocfs2_recovery_map;
struct ocfs2_replay_map;
struct ocfs2_quota_recovery; struct ocfs2_quota_recovery;
struct ocfs2_dentry_lock; struct ocfs2_dentry_lock;
struct ocfs2_super struct ocfs2_super
...@@ -264,6 +265,7 @@ struct ocfs2_super ...@@ -264,6 +265,7 @@ struct ocfs2_super
atomic_t vol_state; atomic_t vol_state;
struct mutex recovery_lock; struct mutex recovery_lock;
struct ocfs2_recovery_map *recovery_map; struct ocfs2_recovery_map *recovery_map;
struct ocfs2_replay_map *replay_map;
struct task_struct *recovery_thread_task; struct task_struct *recovery_thread_task;
int disable_recovery; int disable_recovery;
wait_queue_head_t checkpoint_event; wait_queue_head_t checkpoint_event;
......
...@@ -2312,6 +2312,12 @@ static int ocfs2_check_volume(struct ocfs2_super *osb) ...@@ -2312,6 +2312,12 @@ static int ocfs2_check_volume(struct ocfs2_super *osb)
* lock, and it's marked as dirty, set the bit in the recover * lock, and it's marked as dirty, set the bit in the recover
* map and launch a recovery thread for it. */ * map and launch a recovery thread for it. */
status = ocfs2_mark_dead_nodes(osb); status = ocfs2_mark_dead_nodes(osb);
if (status < 0) {
mlog_errno(status);
goto finally;
}
status = ocfs2_compute_replay_slots(osb);
if (status < 0) if (status < 0)
mlog_errno(status); mlog_errno(status);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment