Commit 171bf93c authored by Mark Fasheh's avatar Mark Fasheh

ocfs2: Periodic quota syncing

This patch creates a work queue for periodic syncing of locally cached quota
information to the global quota files. We constantly queue a delayed work
item, to get the periodic behavior.
Signed-off-by: default avatarMark Fasheh <mfasheh@suse.com>
Acked-by: default avatarJan Kara <jack@suse.cz>
parent a90714c1
...@@ -39,6 +39,7 @@ struct ocfs2_mem_dqinfo { ...@@ -39,6 +39,7 @@ struct ocfs2_mem_dqinfo {
unsigned int dqi_chunks; /* Number of chunks in local quota file */ unsigned int dqi_chunks; /* Number of chunks in local quota file */
unsigned int dqi_blocks; /* Number of blocks allocated for local quota file */ unsigned int dqi_blocks; /* Number of blocks allocated for local quota file */
unsigned int dqi_syncms; /* How often should we sync with other nodes */ unsigned int dqi_syncms; /* How often should we sync with other nodes */
unsigned int dqi_syncjiff; /* Precomputed dqi_syncms in jiffies */
struct list_head dqi_chunk; /* List of chunks */ struct list_head dqi_chunk; /* List of chunks */
struct inode *dqi_gqinode; /* Global quota file inode */ struct inode *dqi_gqinode; /* Global quota file inode */
struct ocfs2_lock_res dqi_gqlock; /* Lock protecting quota information structure */ struct ocfs2_lock_res dqi_gqlock; /* Lock protecting quota information structure */
...@@ -47,6 +48,7 @@ struct ocfs2_mem_dqinfo { ...@@ -47,6 +48,7 @@ struct ocfs2_mem_dqinfo {
struct buffer_head *dqi_lqi_bh; /* Buffer head with local quota file inode */ struct buffer_head *dqi_lqi_bh; /* Buffer head with local quota file inode */
struct buffer_head *dqi_ibh; /* Buffer with information header */ struct buffer_head *dqi_ibh; /* Buffer with information header */
struct qtree_mem_dqinfo dqi_gi; /* Info about global file */ struct qtree_mem_dqinfo dqi_gi; /* Info about global file */
struct delayed_work dqi_sync_work; /* Work for syncing dquots */
}; };
static inline struct ocfs2_dquot *OCFS2_DQUOT(struct dquot *dquot) static inline struct ocfs2_dquot *OCFS2_DQUOT(struct dquot *dquot)
...@@ -90,4 +92,7 @@ struct buffer_head *ocfs2_read_quota_block(struct inode *inode, ...@@ -90,4 +92,7 @@ struct buffer_head *ocfs2_read_quota_block(struct inode *inode,
extern struct dquot_operations ocfs2_quota_operations; extern struct dquot_operations ocfs2_quota_operations;
extern struct quota_format_type ocfs2_quota_format; extern struct quota_format_type ocfs2_quota_format;
int ocfs2_quota_setup(void);
void ocfs2_quota_shutdown(void);
#endif /* _OCFS2_QUOTA_H */ #endif /* _OCFS2_QUOTA_H */
/* /*
* Implementation of operations over global quota file * Implementation of operations over global quota file
*/ */
#include <linux/spinlock.h>
#include <linux/fs.h> #include <linux/fs.h>
#include <linux/quota.h> #include <linux/quota.h>
#include <linux/quotaops.h> #include <linux/quotaops.h>
#include <linux/dqblk_qtree.h> #include <linux/dqblk_qtree.h>
#include <linux/jiffies.h>
#include <linux/writeback.h>
#include <linux/workqueue.h>
#define MLOG_MASK_PREFIX ML_QUOTA #define MLOG_MASK_PREFIX ML_QUOTA
#include <cluster/masklog.h> #include <cluster/masklog.h>
...@@ -20,6 +24,10 @@ ...@@ -20,6 +24,10 @@
#include "uptodate.h" #include "uptodate.h"
#include "quota.h" #include "quota.h"
static struct workqueue_struct *ocfs2_quota_wq = NULL;
static void qsync_work_fn(struct work_struct *work);
static void ocfs2_global_disk2memdqb(struct dquot *dquot, void *dp) static void ocfs2_global_disk2memdqb(struct dquot *dquot, void *dp)
{ {
struct ocfs2_global_disk_dqblk *d = dp; struct ocfs2_global_disk_dqblk *d = dp;
...@@ -313,6 +321,7 @@ int ocfs2_global_read_info(struct super_block *sb, int type) ...@@ -313,6 +321,7 @@ int ocfs2_global_read_info(struct super_block *sb, int type)
info->dqi_bgrace = le32_to_cpu(dinfo.dqi_bgrace); info->dqi_bgrace = le32_to_cpu(dinfo.dqi_bgrace);
info->dqi_igrace = le32_to_cpu(dinfo.dqi_igrace); info->dqi_igrace = le32_to_cpu(dinfo.dqi_igrace);
oinfo->dqi_syncms = le32_to_cpu(dinfo.dqi_syncms); oinfo->dqi_syncms = le32_to_cpu(dinfo.dqi_syncms);
oinfo->dqi_syncjiff = msecs_to_jiffies(oinfo->dqi_syncms);
oinfo->dqi_gi.dqi_blocks = le32_to_cpu(dinfo.dqi_blocks); oinfo->dqi_gi.dqi_blocks = le32_to_cpu(dinfo.dqi_blocks);
oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(dinfo.dqi_free_blk); oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(dinfo.dqi_free_blk);
oinfo->dqi_gi.dqi_free_entry = le32_to_cpu(dinfo.dqi_free_entry); oinfo->dqi_gi.dqi_free_entry = le32_to_cpu(dinfo.dqi_free_entry);
...@@ -320,6 +329,10 @@ int ocfs2_global_read_info(struct super_block *sb, int type) ...@@ -320,6 +329,10 @@ int ocfs2_global_read_info(struct super_block *sb, int type)
oinfo->dqi_gi.dqi_usable_bs = sb->s_blocksize - oinfo->dqi_gi.dqi_usable_bs = sb->s_blocksize -
OCFS2_QBLK_RESERVED_SPACE; OCFS2_QBLK_RESERVED_SPACE;
oinfo->dqi_gi.dqi_qtree_depth = qtree_depth(&oinfo->dqi_gi); oinfo->dqi_gi.dqi_qtree_depth = qtree_depth(&oinfo->dqi_gi);
INIT_DELAYED_WORK(&oinfo->dqi_sync_work, qsync_work_fn);
queue_delayed_work(ocfs2_quota_wq, &oinfo->dqi_sync_work,
oinfo->dqi_syncjiff);
out_err: out_err:
mlog_exit(status); mlog_exit(status);
return status; return status;
...@@ -519,6 +532,61 @@ out: ...@@ -519,6 +532,61 @@ out:
return err; return err;
} }
/*
* Functions for periodic syncing of dquots with global file
*/
static int ocfs2_sync_dquot_helper(struct dquot *dquot, unsigned long type)
{
handle_t *handle;
struct super_block *sb = dquot->dq_sb;
struct ocfs2_mem_dqinfo *oinfo = sb_dqinfo(sb, type)->dqi_priv;
struct ocfs2_super *osb = OCFS2_SB(sb);
int status = 0;
mlog_entry("id=%u qtype=%u type=%lu device=%s\n", dquot->dq_id,
dquot->dq_type, type, sb->s_id);
if (type != dquot->dq_type)
goto out;
status = ocfs2_lock_global_qf(oinfo, 1);
if (status < 0)
goto out;
handle = ocfs2_start_trans(osb, OCFS2_QSYNC_CREDITS);
if (IS_ERR(handle)) {
status = PTR_ERR(handle);
mlog_errno(status);
goto out_ilock;
}
mutex_lock(&sb_dqopt(sb)->dqio_mutex);
status = ocfs2_sync_dquot(dquot);
mutex_unlock(&sb_dqopt(sb)->dqio_mutex);
if (status < 0)
mlog_errno(status);
/* We have to write local structure as well... */
dquot_mark_dquot_dirty(dquot);
status = dquot_commit(dquot);
if (status < 0)
mlog_errno(status);
ocfs2_commit_trans(osb, handle);
out_ilock:
ocfs2_unlock_global_qf(oinfo, 1);
out:
mlog_exit(status);
return status;
}
static void qsync_work_fn(struct work_struct *work)
{
struct ocfs2_mem_dqinfo *oinfo = container_of(work,
struct ocfs2_mem_dqinfo,
dqi_sync_work.work);
struct super_block *sb = oinfo->dqi_gqinode->i_sb;
dquot_scan_active(sb, ocfs2_sync_dquot_helper, oinfo->dqi_type);
queue_delayed_work(ocfs2_quota_wq, &oinfo->dqi_sync_work,
oinfo->dqi_syncjiff);
}
/* /*
* Wrappers for generic quota functions * Wrappers for generic quota functions
*/ */
...@@ -917,3 +985,20 @@ struct dquot_operations ocfs2_quota_operations = { ...@@ -917,3 +985,20 @@ struct dquot_operations ocfs2_quota_operations = {
.alloc_dquot = ocfs2_alloc_dquot, .alloc_dquot = ocfs2_alloc_dquot,
.destroy_dquot = ocfs2_destroy_dquot, .destroy_dquot = ocfs2_destroy_dquot,
}; };
int ocfs2_quota_setup(void)
{
ocfs2_quota_wq = create_workqueue("o2quot");
if (!ocfs2_quota_wq)
return -ENOMEM;
return 0;
}
void ocfs2_quota_shutdown(void)
{
if (ocfs2_quota_wq) {
flush_workqueue(ocfs2_quota_wq);
destroy_workqueue(ocfs2_quota_wq);
ocfs2_quota_wq = NULL;
}
}
...@@ -368,6 +368,10 @@ static int ocfs2_local_free_info(struct super_block *sb, int type) ...@@ -368,6 +368,10 @@ static int ocfs2_local_free_info(struct super_block *sb, int type)
int mark_clean = 1, len; int mark_clean = 1, len;
int status; int status;
/* At this point we know there are no more dquots and thus
* even if there's some sync in the pdflush queue, it won't
* find any dquots and return without doing anything */
cancel_delayed_work_sync(&oinfo->dqi_sync_work);
iput(oinfo->dqi_gqinode); iput(oinfo->dqi_gqinode);
ocfs2_simple_drop_lockres(OCFS2_SB(sb), &oinfo->dqi_gqlock); ocfs2_simple_drop_lockres(OCFS2_SB(sb), &oinfo->dqi_gqlock);
ocfs2_lock_res_free(&oinfo->dqi_gqlock); ocfs2_lock_res_free(&oinfo->dqi_gqlock);
......
...@@ -1107,11 +1107,16 @@ static int __init ocfs2_init(void) ...@@ -1107,11 +1107,16 @@ static int __init ocfs2_init(void)
mlog(ML_ERROR, "Unable to create ocfs2 debugfs root.\n"); mlog(ML_ERROR, "Unable to create ocfs2 debugfs root.\n");
} }
status = ocfs2_quota_setup();
if (status)
goto leave;
ocfs2_set_locking_protocol(); ocfs2_set_locking_protocol();
status = register_quota_format(&ocfs2_quota_format); status = register_quota_format(&ocfs2_quota_format);
leave: leave:
if (status < 0) { if (status < 0) {
ocfs2_quota_shutdown();
ocfs2_free_mem_caches(); ocfs2_free_mem_caches();
exit_ocfs2_uptodate_cache(); exit_ocfs2_uptodate_cache();
} }
...@@ -1128,6 +1133,8 @@ static void __exit ocfs2_exit(void) ...@@ -1128,6 +1133,8 @@ static void __exit ocfs2_exit(void)
{ {
mlog_entry_void(); mlog_entry_void();
ocfs2_quota_shutdown();
if (ocfs2_wq) { if (ocfs2_wq) {
flush_workqueue(ocfs2_wq); flush_workqueue(ocfs2_wq);
destroy_workqueue(ocfs2_wq); destroy_workqueue(ocfs2_wq);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment