ocfs2: Periodic quota syncing

This patch creates a work queue for periodically syncing locally cached quota
information to the global quota files. The delayed work item requeues itself
each time it runs, which gives the periodic behavior.

Signed-off-by: Mark Fasheh <mfasheh@suse.com>
Acked-by: Jan Kara <jack@suse.cz>
diff --git a/fs/ocfs2/quota_global.c b/fs/ocfs2/quota_global.c
index af8340c..adf5350 100644
--- a/fs/ocfs2/quota_global.c
+++ b/fs/ocfs2/quota_global.c
@@ -1,10 +1,14 @@
 /*
  *  Implementation of operations over global quota file
  */
+#include <linux/spinlock.h>
 #include <linux/fs.h>
 #include <linux/quota.h>
 #include <linux/quotaops.h>
 #include <linux/dqblk_qtree.h>
+#include <linux/jiffies.h>
+#include <linux/writeback.h>
+#include <linux/workqueue.h>
 
 #define MLOG_MASK_PREFIX ML_QUOTA
 #include <cluster/masklog.h>
@@ -20,6 +24,10 @@
 #include "uptodate.h"
 #include "quota.h"
 
+static struct workqueue_struct *ocfs2_quota_wq;	/* runs quota sync work */
+
+static void qsync_work_fn(struct work_struct *work);
+
 static void ocfs2_global_disk2memdqb(struct dquot *dquot, void *dp)
 {
 	struct ocfs2_global_disk_dqblk *d = dp;
@@ -313,6 +321,7 @@
 	info->dqi_bgrace = le32_to_cpu(dinfo.dqi_bgrace);
 	info->dqi_igrace = le32_to_cpu(dinfo.dqi_igrace);
 	oinfo->dqi_syncms = le32_to_cpu(dinfo.dqi_syncms);
+	oinfo->dqi_syncjiff = msecs_to_jiffies(oinfo->dqi_syncms);
 	oinfo->dqi_gi.dqi_blocks = le32_to_cpu(dinfo.dqi_blocks);
 	oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(dinfo.dqi_free_blk);
 	oinfo->dqi_gi.dqi_free_entry = le32_to_cpu(dinfo.dqi_free_entry);
@@ -320,6 +329,10 @@
 	oinfo->dqi_gi.dqi_usable_bs = sb->s_blocksize -
 						OCFS2_QBLK_RESERVED_SPACE;
 	oinfo->dqi_gi.dqi_qtree_depth = qtree_depth(&oinfo->dqi_gi);
+	INIT_DELAYED_WORK(&oinfo->dqi_sync_work, qsync_work_fn);
+	queue_delayed_work(ocfs2_quota_wq, &oinfo->dqi_sync_work,
+			   oinfo->dqi_syncjiff);
+
 out_err:
 	mlog_exit(status);
 	return status;
@@ -520,6 +533,61 @@
 }
 
 /*
+ *  Functions for periodic syncing of dquots with global file
+ */
+/* dquot_scan_active() callback: sync one dquot of @type with the global file */
+static int ocfs2_sync_dquot_helper(struct dquot *dquot, unsigned long type)
+{
+	struct super_block *sb = dquot->dq_sb;
+	struct ocfs2_super *osb = OCFS2_SB(sb);
+	struct ocfs2_mem_dqinfo *info = sb_dqinfo(sb, type)->dqi_priv;
+	handle_t *trans;
+	int ret = 0;
+
+	mlog_entry("id=%u qtype=%u type=%lu device=%s\n", dquot->dq_id,
+		   dquot->dq_type, type, sb->s_id);
+	if (dquot->dq_type != type)
+		goto out;
+	ret = ocfs2_lock_global_qf(info, 1);
+	if (ret < 0)
+		goto out;
+	trans = ocfs2_start_trans(osb, OCFS2_QSYNC_CREDITS);
+	if (IS_ERR(trans)) {
+		ret = PTR_ERR(trans);
+		mlog_errno(ret);
+		goto out_unlock;
+	}
+	mutex_lock(&sb_dqopt(sb)->dqio_mutex);
+	ret = ocfs2_sync_dquot(dquot);
+	mutex_unlock(&sb_dqopt(sb)->dqio_mutex);
+	if (ret < 0)
+		mlog_errno(ret);
+	/* The local quota structure has to be written out as well */
+	dquot_mark_dquot_dirty(dquot);
+	ret = dquot_commit(dquot);
+	if (ret < 0)
+		mlog_errno(ret);
+	ocfs2_commit_trans(osb, trans);
+out_unlock:
+	ocfs2_unlock_global_qf(info, 1);
+out:
+	mlog_exit(ret);
+	return ret;
+}
+
+/* Periodic work: run the sync helper over active dquots, then requeue itself */
+static void qsync_work_fn(struct work_struct *work)
+{
+	struct ocfs2_mem_dqinfo *info;
+
+	info = container_of(work, struct ocfs2_mem_dqinfo, dqi_sync_work.work);
+	dquot_scan_active(info->dqi_gqinode->i_sb, ocfs2_sync_dquot_helper,
+			  info->dqi_type);
+	queue_delayed_work(ocfs2_quota_wq, &info->dqi_sync_work,
+			   info->dqi_syncjiff);
+}
+
+/*
  *  Wrappers for generic quota functions
  */
 
@@ -917,3 +985,20 @@
 	.alloc_dquot	= ocfs2_alloc_dquot,
 	.destroy_dquot	= ocfs2_destroy_dquot,
 };
+
+/* Create the "o2quot" workqueue; returns 0 on success, -ENOMEM on failure */
+int ocfs2_quota_setup(void)
+{
+	ocfs2_quota_wq = create_workqueue("o2quot");
+
+	return ocfs2_quota_wq ? 0 : -ENOMEM;
+}
+
+void ocfs2_quota_shutdown(void)
+{
+	if (!ocfs2_quota_wq)
+		return;		/* no workqueue to tear down */
+	flush_workqueue(ocfs2_quota_wq);
+	destroy_workqueue(ocfs2_quota_wq);
+	ocfs2_quota_wq = NULL;
+}