Commit 2205363d authored by Jan Kara, committed by Mark Fasheh

ocfs2: Implement quota recovery

Implement functions for recovery after a crash. The functions just
read the local quota file and sync its information to the global quota file.
Signed-off-by: Jan Kara <jack@suse.cz>
Signed-off-by: Mark Fasheh <mfasheh@suse.com>
parent 171bf93c
...@@ -45,6 +45,7 @@ ...@@ -45,6 +45,7 @@
#include "slot_map.h" #include "slot_map.h"
#include "super.h" #include "super.h"
#include "sysfile.h" #include "sysfile.h"
#include "quota.h"
#include "buffer_head_io.h" #include "buffer_head_io.h"
...@@ -52,7 +53,7 @@ DEFINE_SPINLOCK(trans_inc_lock); ...@@ -52,7 +53,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
static int ocfs2_force_read_journal(struct inode *inode); static int ocfs2_force_read_journal(struct inode *inode);
static int ocfs2_recover_node(struct ocfs2_super *osb, static int ocfs2_recover_node(struct ocfs2_super *osb,
int node_num); int node_num, int slot_num);
static int __ocfs2_recovery_thread(void *arg); static int __ocfs2_recovery_thread(void *arg);
static int ocfs2_commit_cache(struct ocfs2_super *osb); static int ocfs2_commit_cache(struct ocfs2_super *osb);
static int ocfs2_wait_on_mount(struct ocfs2_super *osb); static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
...@@ -857,6 +858,7 @@ struct ocfs2_la_recovery_item { ...@@ -857,6 +858,7 @@ struct ocfs2_la_recovery_item {
int lri_slot; int lri_slot;
struct ocfs2_dinode *lri_la_dinode; struct ocfs2_dinode *lri_la_dinode;
struct ocfs2_dinode *lri_tl_dinode; struct ocfs2_dinode *lri_tl_dinode;
struct ocfs2_quota_recovery *lri_qrec;
}; };
/* Does the second half of the recovery process. By this point, the /* Does the second half of the recovery process. By this point, the
...@@ -877,6 +879,7 @@ void ocfs2_complete_recovery(struct work_struct *work) ...@@ -877,6 +879,7 @@ void ocfs2_complete_recovery(struct work_struct *work)
struct ocfs2_super *osb = journal->j_osb; struct ocfs2_super *osb = journal->j_osb;
struct ocfs2_dinode *la_dinode, *tl_dinode; struct ocfs2_dinode *la_dinode, *tl_dinode;
struct ocfs2_la_recovery_item *item, *n; struct ocfs2_la_recovery_item *item, *n;
struct ocfs2_quota_recovery *qrec;
LIST_HEAD(tmp_la_list); LIST_HEAD(tmp_la_list);
mlog_entry_void(); mlog_entry_void();
...@@ -922,6 +925,16 @@ void ocfs2_complete_recovery(struct work_struct *work) ...@@ -922,6 +925,16 @@ void ocfs2_complete_recovery(struct work_struct *work)
if (ret < 0) if (ret < 0)
mlog_errno(ret); mlog_errno(ret);
qrec = item->lri_qrec;
if (qrec) {
mlog(0, "Recovering quota files");
ret = ocfs2_finish_quota_recovery(osb, qrec,
item->lri_slot);
if (ret < 0)
mlog_errno(ret);
/* Recovery info is already freed now */
}
kfree(item); kfree(item);
} }
...@@ -935,7 +948,8 @@ void ocfs2_complete_recovery(struct work_struct *work) ...@@ -935,7 +948,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
int slot_num, int slot_num,
struct ocfs2_dinode *la_dinode, struct ocfs2_dinode *la_dinode,
struct ocfs2_dinode *tl_dinode) struct ocfs2_dinode *tl_dinode,
struct ocfs2_quota_recovery *qrec)
{ {
struct ocfs2_la_recovery_item *item; struct ocfs2_la_recovery_item *item;
...@@ -950,6 +964,9 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, ...@@ -950,6 +964,9 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
if (tl_dinode) if (tl_dinode)
kfree(tl_dinode); kfree(tl_dinode);
if (qrec)
ocfs2_free_quota_recovery(qrec);
mlog_errno(-ENOMEM); mlog_errno(-ENOMEM);
return; return;
} }
...@@ -958,6 +975,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, ...@@ -958,6 +975,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
item->lri_la_dinode = la_dinode; item->lri_la_dinode = la_dinode;
item->lri_slot = slot_num; item->lri_slot = slot_num;
item->lri_tl_dinode = tl_dinode; item->lri_tl_dinode = tl_dinode;
item->lri_qrec = qrec;
spin_lock(&journal->j_lock); spin_lock(&journal->j_lock);
list_add_tail(&item->lri_list, &journal->j_la_cleanups); list_add_tail(&item->lri_list, &journal->j_la_cleanups);
...@@ -977,6 +995,7 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) ...@@ -977,6 +995,7 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
ocfs2_queue_recovery_completion(journal, ocfs2_queue_recovery_completion(journal,
osb->slot_num, osb->slot_num,
osb->local_alloc_copy, osb->local_alloc_copy,
NULL,
NULL); NULL);
ocfs2_schedule_truncate_log_flush(osb, 0); ocfs2_schedule_truncate_log_flush(osb, 0);
...@@ -985,11 +1004,26 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) ...@@ -985,11 +1004,26 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
} }
} }
/*
 * Hand the quota recovery info kept for our own slot (osb->quota_rec) over
 * to the recovery completion queue, if there is any.
 *
 * The pointer in the superblock is cleared afterwards:
 * ocfs2_queue_recovery_completion() either stores it in the queued item or
 * frees it when the item cannot be allocated, so it must not be used from
 * here again.
 */
void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
{
	struct ocfs2_quota_recovery *pending = osb->quota_rec;

	/* No recovery info was left for our slot. */
	if (!pending)
		return;

	ocfs2_queue_recovery_completion(osb->journal, osb->slot_num,
					NULL, NULL, pending);
	osb->quota_rec = NULL;
}
static int __ocfs2_recovery_thread(void *arg) static int __ocfs2_recovery_thread(void *arg)
{ {
int status, node_num; int status, node_num, slot_num;
struct ocfs2_super *osb = arg; struct ocfs2_super *osb = arg;
struct ocfs2_recovery_map *rm = osb->recovery_map; struct ocfs2_recovery_map *rm = osb->recovery_map;
int *rm_quota = NULL;
int rm_quota_used = 0, i;
struct ocfs2_quota_recovery *qrec;
mlog_entry_void(); mlog_entry_void();
...@@ -998,6 +1032,11 @@ static int __ocfs2_recovery_thread(void *arg) ...@@ -998,6 +1032,11 @@ static int __ocfs2_recovery_thread(void *arg)
goto bail; goto bail;
} }
rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
if (!rm_quota) {
status = -ENOMEM;
goto bail;
}
restart: restart:
status = ocfs2_super_lock(osb, 1); status = ocfs2_super_lock(osb, 1);
if (status < 0) { if (status < 0) {
...@@ -1011,8 +1050,28 @@ restart: ...@@ -1011,8 +1050,28 @@ restart:
* clear it until ocfs2_recover_node() has succeeded. */ * clear it until ocfs2_recover_node() has succeeded. */
node_num = rm->rm_entries[0]; node_num = rm->rm_entries[0];
spin_unlock(&osb->osb_lock); spin_unlock(&osb->osb_lock);
mlog(0, "checking node %d\n", node_num);
status = ocfs2_recover_node(osb, node_num); slot_num = ocfs2_node_num_to_slot(osb, node_num);
if (slot_num == -ENOENT) {
status = 0;
mlog(0, "no slot for this node, so no recovery"
"required.\n");
goto skip_recovery;
}
mlog(0, "node %d was using slot %d\n", node_num, slot_num);
/* It is a bit subtle with quota recovery. We cannot do it
* immediately because we have to obtain cluster locks from
* quota files and we also don't want to just skip it because
* then quota usage would be out of sync until some node takes
* the slot. So we remember which nodes need quota recovery
* and when everything else is done, we recover quotas. */
for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
if (i == rm_quota_used)
rm_quota[rm_quota_used++] = slot_num;
status = ocfs2_recover_node(osb, node_num, slot_num);
skip_recovery:
if (!status) { if (!status) {
ocfs2_recovery_map_clear(osb, node_num); ocfs2_recovery_map_clear(osb, node_num);
} else { } else {
...@@ -1034,13 +1093,27 @@ restart: ...@@ -1034,13 +1093,27 @@ restart:
if (status < 0) if (status < 0)
mlog_errno(status); mlog_errno(status);
/* Now it is the right time to recover quotas... We have to do this under
* superblock lock so that no one can start using the slot (and crash)
* before we recover it */
for (i = 0; i < rm_quota_used; i++) {
qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
if (IS_ERR(qrec)) {
status = PTR_ERR(qrec);
mlog_errno(status);
continue;
}
ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
NULL, NULL, qrec);
}
ocfs2_super_unlock(osb, 1); ocfs2_super_unlock(osb, 1);
/* We always run recovery on our own orphan dir - the dead /* We always run recovery on our own orphan dir - the dead
* node(s) may have disallowed a previous inode delete. Re-processing * node(s) may have disallowed a previous inode delete. Re-processing
* is therefore required. */ * is therefore required. */
ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
NULL); NULL, NULL);
bail: bail:
mutex_lock(&osb->recovery_lock); mutex_lock(&osb->recovery_lock);
...@@ -1055,6 +1128,9 @@ bail: ...@@ -1055,6 +1128,9 @@ bail:
mutex_unlock(&osb->recovery_lock); mutex_unlock(&osb->recovery_lock);
if (rm_quota)
kfree(rm_quota);
mlog_exit(status); mlog_exit(status);
/* no one is calling kthread_stop() for us so the kthread() api /* no one is calling kthread_stop() for us so the kthread() api
* requires that we call do_exit(). And it isn't exported, but * requires that we call do_exit(). And it isn't exported, but
...@@ -1282,31 +1358,19 @@ done: ...@@ -1282,31 +1358,19 @@ done:
* far less concerning. * far less concerning.
*/ */
static int ocfs2_recover_node(struct ocfs2_super *osb, static int ocfs2_recover_node(struct ocfs2_super *osb,
int node_num) int node_num, int slot_num)
{ {
int status = 0; int status = 0;
int slot_num;
struct ocfs2_dinode *la_copy = NULL; struct ocfs2_dinode *la_copy = NULL;
struct ocfs2_dinode *tl_copy = NULL; struct ocfs2_dinode *tl_copy = NULL;
mlog_entry("(node_num=%d, osb->node_num = %d)\n", mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
node_num, osb->node_num); node_num, slot_num, osb->node_num);
mlog(0, "checking node %d\n", node_num);
/* Should not ever be called to recover ourselves -- in that /* Should not ever be called to recover ourselves -- in that
* case we should've called ocfs2_journal_load instead. */ * case we should've called ocfs2_journal_load instead. */
BUG_ON(osb->node_num == node_num); BUG_ON(osb->node_num == node_num);
slot_num = ocfs2_node_num_to_slot(osb, node_num);
if (slot_num == -ENOENT) {
status = 0;
mlog(0, "no slot for this node, so no recovery required.\n");
goto done;
}
mlog(0, "node %d was using slot %d\n", node_num, slot_num);
status = ocfs2_replay_journal(osb, node_num, slot_num); status = ocfs2_replay_journal(osb, node_num, slot_num);
if (status < 0) { if (status < 0) {
if (status == -EBUSY) { if (status == -EBUSY) {
...@@ -1342,7 +1406,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb, ...@@ -1342,7 +1406,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
/* This will kfree the memory pointed to by la_copy and tl_copy */ /* This will kfree the memory pointed to by la_copy and tl_copy */
ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy, ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
tl_copy); tl_copy, NULL);
status = 0; status = 0;
done: done:
......
...@@ -168,6 +168,7 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, ...@@ -168,6 +168,7 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb,
int node_num); int node_num);
int ocfs2_mark_dead_nodes(struct ocfs2_super *osb); int ocfs2_mark_dead_nodes(struct ocfs2_super *osb);
void ocfs2_complete_mount_recovery(struct ocfs2_super *osb); void ocfs2_complete_mount_recovery(struct ocfs2_super *osb);
void ocfs2_complete_quota_recovery(struct ocfs2_super *osb);
static inline void ocfs2_start_checkpoint(struct ocfs2_super *osb) static inline void ocfs2_start_checkpoint(struct ocfs2_super *osb)
{ {
......
...@@ -206,6 +206,7 @@ enum ocfs2_mount_options ...@@ -206,6 +206,7 @@ enum ocfs2_mount_options
struct ocfs2_journal; struct ocfs2_journal;
struct ocfs2_slot_info; struct ocfs2_slot_info;
struct ocfs2_recovery_map; struct ocfs2_recovery_map;
struct ocfs2_quota_recovery;
struct ocfs2_super struct ocfs2_super
{ {
struct task_struct *commit_task; struct task_struct *commit_task;
...@@ -287,10 +288,11 @@ struct ocfs2_super ...@@ -287,10 +288,11 @@ struct ocfs2_super
char *local_alloc_debug_buf; char *local_alloc_debug_buf;
#endif #endif
/* Next two fields are for local node slot recovery during /* Next three fields are for local node slot recovery during
* mount. */ * mount. */
int dirty; int dirty;
struct ocfs2_dinode *local_alloc_copy; struct ocfs2_dinode *local_alloc_copy;
struct ocfs2_quota_recovery *quota_rec;
struct ocfs2_alloc_stats alloc_stats; struct ocfs2_alloc_stats alloc_stats;
char dev_str[20]; /* "major,minor" of the device */ char dev_str[20]; /* "major,minor" of the device */
......
...@@ -33,6 +33,17 @@ struct ocfs2_dquot { ...@@ -33,6 +33,17 @@ struct ocfs2_dquot {
s64 dq_originodes; /* Last globally synced inode usage */ s64 dq_originodes; /* Last globally synced inode usage */
}; };
/*
 * Description of one chunk to recover in memory.
 *
 * Each instance names one chunk (rc_chunk) and carries a bitmap (rc_bitmap)
 * marking which entries of that chunk need recovery.
 *
 * NOTE(review): presumably instances are linked through rc_list onto one of
 * the ocfs2_quota_recovery.r_list heads, and rc_bitmap is allocated
 * separately and owned by the chunk -- confirm against the code that builds
 * and frees these.
 */
struct ocfs2_recovery_chunk {
struct list_head rc_list; /* List of chunks */
int rc_chunk; /* Chunk number */
unsigned long *rc_bitmap; /* Bitmap of entries to recover */
};
/*
 * Quota recovery information: MAXQUOTAS list heads, each a list of chunks
 * to recover.
 *
 * A pointer to this structure is returned by ocfs2_begin_quota_recovery()
 * and later passed to ocfs2_finish_quota_recovery() or
 * ocfs2_free_quota_recovery().
 *
 * NOTE(review): presumably r_list is indexed by quota type and its elements
 * are struct ocfs2_recovery_chunk -- confirm against the users.
 */
struct ocfs2_quota_recovery {
struct list_head r_list[MAXQUOTAS]; /* List of chunks to recover */
};
/* In-memory structure with quota header information */ /* In-memory structure with quota header information */
struct ocfs2_mem_dqinfo { struct ocfs2_mem_dqinfo {
unsigned int dqi_type; /* Quota type this structure describes */ unsigned int dqi_type; /* Quota type this structure describes */
...@@ -49,6 +60,10 @@ struct ocfs2_mem_dqinfo { ...@@ -49,6 +60,10 @@ struct ocfs2_mem_dqinfo {
struct buffer_head *dqi_ibh; /* Buffer with information header */ struct buffer_head *dqi_ibh; /* Buffer with information header */
struct qtree_mem_dqinfo dqi_gi; /* Info about global file */ struct qtree_mem_dqinfo dqi_gi; /* Info about global file */
struct delayed_work dqi_sync_work; /* Work for syncing dquots */ struct delayed_work dqi_sync_work; /* Work for syncing dquots */
struct ocfs2_quota_recovery *dqi_rec; /* Pointer to recovery
* information, in case we
* enable quotas on file
* needing it */
}; };
static inline struct ocfs2_dquot *OCFS2_DQUOT(struct dquot *dquot) static inline struct ocfs2_dquot *OCFS2_DQUOT(struct dquot *dquot)
...@@ -67,6 +82,12 @@ extern struct kmem_cache *ocfs2_qf_chunk_cachep; ...@@ -67,6 +82,12 @@ extern struct kmem_cache *ocfs2_qf_chunk_cachep;
extern struct qtree_fmt_operations ocfs2_global_ops; extern struct qtree_fmt_operations ocfs2_global_ops;
struct ocfs2_quota_recovery *ocfs2_begin_quota_recovery(
struct ocfs2_super *osb, int slot_num);
int ocfs2_finish_quota_recovery(struct ocfs2_super *osb,
struct ocfs2_quota_recovery *rec,
int slot_num);
void ocfs2_free_quota_recovery(struct ocfs2_quota_recovery *rec);
ssize_t ocfs2_quota_read(struct super_block *sb, int type, char *data, ssize_t ocfs2_quota_read(struct super_block *sb, int type, char *data,
size_t len, loff_t off); size_t len, loff_t off);
ssize_t ocfs2_quota_write(struct super_block *sb, int type, ssize_t ocfs2_quota_write(struct super_block *sb, int type,
......
...@@ -87,7 +87,6 @@ struct qtree_fmt_operations ocfs2_global_ops = { ...@@ -87,7 +87,6 @@ struct qtree_fmt_operations ocfs2_global_ops = {
.is_id = ocfs2_global_is_id, .is_id = ocfs2_global_is_id,
}; };
struct buffer_head *ocfs2_read_quota_block(struct inode *inode, struct buffer_head *ocfs2_read_quota_block(struct inode *inode,
int block, int *err) int block, int *err)
{ {
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment