[GFS2] Make journaled data files identical to normal files on disk

This is a very large patch, with a few still to be resolved issues
so you might want to check out the previous head of the tree since
this is known to be unstable. Fixes for the various bugs will be
forthcoming shortly.

This patch removes the special data format which has been used
up till now for journaled data files. Directories still retain the
old format so that they will remain on disk compatible with earlier
releases. As a result you can now do the following with journaled
data files:

 1) mmap them
 2) export them over NFS
 3) convert to/from normal files whenever you want to (the zero length
    restriction is gone)

In addition the level at which GFS' locking is done has changed for all
files (since they all now use the page cache) such that the locking is
done at the page cache level rather than the level of the fs operations.
This should mean that things like loopback mounts and other things which
touch the page cache directly should now work.

Current known issues:

 1. There is a lock mode inversion problem related to the resource
    group hold function which needs to be resolved.
 2. Any significant amount of I/O causes an oops with an offset of hex 320
    (NULL pointer dereference) which appears to be related to a journaled data
    buffer appearing on a list where it shouldn't be.
 3. Direct I/O writes are disabled for the time being (will reappear later)
 4. There is probably a deadlock between the page lock and GFS' locks under
    certain combinations of mmap and fs operation I/O.
 5. Issue relating to ref counting on internally used inodes causes a hang
    on umount (discovered before this patch, and not fixed by it)
 6. One part of the directory metadata is different from GFS1 and will need
    to be resolved before next release.

Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
diff --git a/fs/gfs2/lops.c b/fs/gfs2/lops.c
index a065f76..dd41863 100644
--- a/fs/gfs2/lops.c
+++ b/fs/gfs2/lops.c
@@ -428,49 +428,188 @@
 	gfs2_assert_warn(sdp, !sdp->sd_log_num_rg);
 }
 
+/**
+ * databuf_lo_add - Add a databuf to the transaction.
+ *
+ * This is used in two distinct cases:
+ * i) In ordered write mode
+ *    We put the data buffer on a list so that we can ensure that its
+ *    synced to disk at the right time
+ * ii) In journaled data mode
+ *    We need to journal the data block in the same way as metadata in
+ *    the functions above. The difference is that here we have a tag
+ *    which is two __be64's being the block number (as per meta data)
+ *    and a flag which says whether the data block needs escaping or
+ *    not. This means we need a new log entry for each 251 or so data
+ *    blocks, which isn't an enormous overhead but twice as much as
+ *    for normal metadata blocks.
+ */
 static void databuf_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
 {
-	get_transaction->tr_touched = 1;
+	struct gfs2_bufdata *bd = container_of(le, struct gfs2_bufdata, bd_le);
+	struct gfs2_trans *tr = get_transaction;
+	struct address_space *mapping = bd->bd_bh->b_page->mapping;
+	struct gfs2_inode *ip = get_v2ip(mapping->host);
 
+	tr->tr_touched = 1;
+	if (!list_empty(&bd->bd_list_tr) &&
+	    (ip->i_di.di_flags & GFS2_DIF_JDATA)) {
+		tr->tr_num_buf++;
+		gfs2_trans_add_gl(bd->bd_gl);
+		list_add(&bd->bd_list_tr, &tr->tr_list_buf);
+		gfs2_pin(sdp, bd->bd_bh);
+	} else {
+		clear_buffer_pinned(bd->bd_bh);
+	}
 	gfs2_log_lock(sdp);
+	if (ip->i_di.di_flags & GFS2_DIF_JDATA)
+		sdp->sd_log_num_jdata++;
 	sdp->sd_log_num_databuf++;
 	list_add(&le->le_list, &sdp->sd_log_le_databuf);
 	gfs2_log_unlock(sdp);
 }
 
+static int gfs2_check_magic(struct buffer_head *bh)
+{
+	struct page *page = bh->b_page;
+	void *kaddr;
+	__be32 *ptr;
+	int rv = 0;
+
+	kaddr = kmap_atomic(page, KM_USER0);
+	ptr = kaddr + bh_offset(bh);
+	if (*ptr == cpu_to_be32(GFS2_MAGIC))
+		rv = 1;
+	kunmap_atomic(page, KM_USER0);
+
+	return rv;
+}
+
+/**
+ * databuf_lo_before_commit - Scan the data buffers, writing as we go
+ *
+ * Here we scan through the lists of buffers and make the assumption
+ * that any buffer thats been pinned is being journaled, and that
+ * any unpinned buffer is an ordered write data buffer and therefore
+ * will be written back rather than journaled.
+ */
 static void databuf_lo_before_commit(struct gfs2_sbd *sdp)
 {
-	struct list_head *head = &sdp->sd_log_le_databuf;
 	LIST_HEAD(started);
-	struct gfs2_bufdata *bd;
-	struct buffer_head *bh;
+	struct gfs2_bufdata *bd1 = NULL, *bd2, *bdt;
+	struct buffer_head *bh = NULL;
+	unsigned int offset = sizeof(struct gfs2_log_descriptor);
+	struct gfs2_log_descriptor *ld;
+	unsigned int limit;
+	unsigned int total_dbuf = sdp->sd_log_num_databuf;
+	unsigned int total_jdata = sdp->sd_log_num_jdata;
+	unsigned int num, n;
+	__be64 *ptr;
 
-	while (!list_empty(head)) {
-		bd = list_entry(head->prev, struct gfs2_bufdata, bd_le.le_list);
-		list_move(&bd->bd_le.le_list, &started);
+	offset += (2*sizeof(__be64) - 1);
+	offset &= ~(2*sizeof(__be64) - 1);
+	limit = (sdp->sd_sb.sb_bsize - offset)/sizeof(__be64);
 
-		gfs2_log_lock(sdp);
-		bh = bd->bd_bh;
-		if (bh) {
-			get_bh(bh);
-			gfs2_log_unlock(sdp);
-			if (buffer_dirty(bh)) {
-				wait_on_buffer(bh);
-				ll_rw_block(WRITE, 1, &bh);
+	/* printk(KERN_INFO "totals: jdata=%u dbuf=%u\n", total_jdata, total_dbuf); */
+	/*
+	 * Start writing ordered buffers, write journaled buffers
+	 * into the log along with a header
+	 */
+	bd2 = bd1 = list_prepare_entry(bd1, &sdp->sd_log_le_databuf, bd_le.le_list);
+	while(total_dbuf) {
+		num = total_jdata;
+		if (num > limit)
+			num = limit;
+		n = 0;
+		list_for_each_entry_safe_continue(bd1, bdt, &sdp->sd_log_le_databuf, bd_le.le_list) {
+			gfs2_log_lock(sdp);
+			/* An ordered write buffer */
+			if (bd1->bd_bh && !buffer_pinned(bd1->bd_bh)) {
+				list_move(&bd1->bd_le.le_list, &started);
+				if (bd1 == bd2) {
+					bd2 = NULL;
+					bd2 = list_prepare_entry(bd2, &sdp->sd_log_le_databuf, bd_le.le_list);
+				}
+				total_dbuf--;
+				if (bd1->bd_bh) {
+					get_bh(bd1->bd_bh);
+					gfs2_log_unlock(sdp);
+					if (buffer_dirty(bd1->bd_bh)) {
+						wait_on_buffer(bd1->bd_bh);
+						ll_rw_block(WRITE, 1, &bd1->bd_bh);
+					}
+					brelse(bd1->bd_bh);
+					continue;
+				}
+				gfs2_log_unlock(sdp);
+				continue;
+			} else if (bd1->bd_bh) { /* A journaled buffer */
+				int magic;
+				gfs2_log_unlock(sdp);
+				/* printk(KERN_INFO "journaled buffer\n"); */
+				if (!bh) {
+					bh = gfs2_log_get_buf(sdp);
+					ld = (struct gfs2_log_descriptor *)bh->b_data;
+					ptr = (__be64 *)(bh->b_data + offset);
+					ld->ld_header.mh_magic = cpu_to_be32(GFS2_MAGIC);
+					ld->ld_header.mh_type = cpu_to_be16(GFS2_METATYPE_LD);
+					ld->ld_header.mh_format = cpu_to_be16(GFS2_FORMAT_LD);
+					ld->ld_type = cpu_to_be32(GFS2_LOG_DESC_JDATA);
+					ld->ld_length = cpu_to_be32(num + 1);
+					ld->ld_data1 = cpu_to_be32(num);
+					ld->ld_data2 = cpu_to_be32(0);
+					memset(ld->ld_reserved, 0, sizeof(ld->ld_reserved));
+				}
+				magic = gfs2_check_magic(bd1->bd_bh);
+				*ptr++ = cpu_to_be64(bd1->bd_bh->b_blocknr);
+				*ptr++ = cpu_to_be64((__u64)magic);
+				clear_buffer_escaped(bd1->bd_bh);
+				if (unlikely(magic != 0))
+					set_buffer_escaped(bd1->bd_bh);
+				if (n++ > num)
+					break;
 			}
-			brelse(bh);
-		} else
-			gfs2_log_unlock(sdp);
+		}
+		if (bh) {
+			set_buffer_dirty(bh);
+			ll_rw_block(WRITE, 1, &bh);
+			bh = NULL;
+		}
+		n = 0;
+		/* printk(KERN_INFO "totals2: jdata=%u dbuf=%u\n", total_jdata, total_dbuf); */
+		list_for_each_entry_continue(bd2, &sdp->sd_log_le_databuf, bd_le.le_list) {
+			if (!bd2->bd_bh)
+				continue;
+			/* copy buffer if it needs escaping */
+			if (unlikely(buffer_escaped(bd2->bd_bh))) {
+				void *kaddr;
+				struct page *page = bd2->bd_bh->b_page;
+				bh = gfs2_log_get_buf(sdp);
+				kaddr = kmap_atomic(page, KM_USER0);
+				memcpy(bh->b_data, kaddr + bh_offset(bd2->bd_bh), sdp->sd_sb.sb_bsize);
+				kunmap_atomic(page, KM_USER0);
+				*(__be32 *)bh->b_data = 0;
+			} else {
+				bh = gfs2_log_fake_buf(sdp, bd2->bd_bh);
+			}
+			set_buffer_dirty(bh);
+			ll_rw_block(WRITE, 1, &bh);
+			if (++n >= num)
+				break;
+		}
+		bh = NULL;
+		total_dbuf -= num;
+		total_jdata -= num;
 	}
-
+	/* printk(KERN_INFO "wait on ordered data buffers\n"); */
+	/* Wait on all ordered buffers */
 	while (!list_empty(&started)) {
-		bd = list_entry(started.next, struct gfs2_bufdata,
-				bd_le.le_list);
-		list_del(&bd->bd_le.le_list);
+		bd1 = list_entry(started.next, struct gfs2_bufdata, bd_le.le_list);
+		list_del(&bd1->bd_le.le_list);
 		sdp->sd_log_num_databuf--;
 
 		gfs2_log_lock(sdp);
-		bh = bd->bd_bh;
+		bh = bd1->bd_bh;
 		if (bh) {
 			set_v2bd(bh, NULL);
 			gfs2_log_unlock(sdp);
@@ -479,12 +618,103 @@
 		} else
 			gfs2_log_unlock(sdp);
 
-		kfree(bd);
+		kfree(bd1);
 	}
 
-	gfs2_assert_warn(sdp, !sdp->sd_log_num_databuf);
+	/* printk(KERN_INFO "sd_log_num_databuf %u sd_log_num_jdata %u\n", sdp->sd_log_num_databuf, sdp->sd_log_num_jdata); */
+	/* We've removed all the ordered write bufs here, so only jdata left */
+	gfs2_assert_warn(sdp, sdp->sd_log_num_databuf == sdp->sd_log_num_jdata);
 }
 
+static int databuf_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
+				    struct gfs2_log_descriptor *ld,
+				    __be64 *ptr, int pass)
+{
+	struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
+	struct gfs2_glock *gl = jd->jd_inode->i_gl;
+	unsigned int blks = be32_to_cpu(ld->ld_data1);
+	struct buffer_head *bh_log, *bh_ip;
+	uint64_t blkno;
+	uint64_t esc;
+	int error = 0;
+
+	if (pass != 1 || be32_to_cpu(ld->ld_type) != GFS2_LOG_DESC_JDATA)
+		return 0;
+
+	gfs2_replay_incr_blk(sdp, &start);
+	for (; blks; gfs2_replay_incr_blk(sdp, &start), blks--) {
+		blkno = be64_to_cpu(*ptr++);
+		esc = be64_to_cpu(*ptr++);
+
+		sdp->sd_found_blocks++;
+
+		if (gfs2_revoke_check(sdp, blkno, start))
+			continue;
+
+		error = gfs2_replay_read_block(jd, start, &bh_log);
+		if (error)
+			return error;
+
+		bh_ip = gfs2_meta_new(gl, blkno);
+		memcpy(bh_ip->b_data, bh_log->b_data, bh_log->b_size);
+
+		/* Unescape */
+		if (esc) {
+			__be32 *eptr = (__be32 *)bh_ip->b_data;
+			*eptr = cpu_to_be32(GFS2_MAGIC);
+		}
+		mark_buffer_dirty(bh_ip);
+
+		brelse(bh_log);
+		brelse(bh_ip);
+		if (error)
+			break;
+
+		sdp->sd_replayed_blocks++;
+	}
+
+	return error;
+}
+
+/* FIXME: sort out accounting for log blocks etc. */
+
+static void databuf_lo_after_scan(struct gfs2_jdesc *jd, int error, int pass)
+{
+	struct gfs2_sbd *sdp = jd->jd_inode->i_sbd;
+
+	if (error) {
+		gfs2_meta_sync(jd->jd_inode->i_gl, DIO_START | DIO_WAIT);
+		return;
+	}
+	if (pass != 1)
+		return;
+
+	/* data sync? */
+	gfs2_meta_sync(jd->jd_inode->i_gl, DIO_START | DIO_WAIT);
+
+	fs_info(sdp, "jid=%u: Replayed %u of %u data blocks\n",
+		jd->jd_jid, sdp->sd_replayed_blocks, sdp->sd_found_blocks);
+}
+
+static void databuf_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
+{
+	struct list_head *head = &sdp->sd_log_le_databuf;
+	struct gfs2_bufdata *bd;
+
+	while (!list_empty(head)) {
+		bd = list_entry(head->next, struct gfs2_bufdata, bd_le.le_list);
+		list_del_init(&bd->bd_le.le_list);
+		sdp->sd_log_num_databuf--;
+		sdp->sd_log_num_jdata--;
+		gfs2_unpin(sdp, bd->bd_bh, ai);
+		brelse(bd->bd_bh);
+		kfree(bd);
+	}
+	gfs2_assert_warn(sdp, !sdp->sd_log_num_databuf);
+	gfs2_assert_warn(sdp, !sdp->sd_log_num_jdata);
+}
+
+
 struct gfs2_log_operations gfs2_glock_lops = {
 	.lo_add = glock_lo_add,
 	.lo_after_commit = glock_lo_after_commit,
@@ -519,7 +749,11 @@
 
 struct gfs2_log_operations gfs2_databuf_lops = {
 	.lo_add = databuf_lo_add,
+	.lo_incore_commit = buf_lo_incore_commit,
 	.lo_before_commit = databuf_lo_before_commit,
+	.lo_after_commit = databuf_lo_after_commit,
+	.lo_scan_elements = databuf_lo_scan_elements,
+	.lo_after_scan = databuf_lo_after_scan,
 	.lo_name = "databuf"
 };