[GFS2] Clean up journaled data writing

This patch cleans up the code for writing journaled data into the log.
It also removes the need to allocate a small "tag" structure for each
block written into the log. Instead we just keep count of the outstanding
I/O so that we can be sure that it's all been written at the correct time.
Another result of this patch is that a number of ll_rw_block() calls
have become submit_bh() calls, closing some races at the same time.

Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
diff --git a/fs/gfs2/log.c b/fs/gfs2/log.c
index 4d04e6f..ee70467 100644
--- a/fs/gfs2/log.c
+++ b/fs/gfs2/log.c
@@ -104,11 +104,8 @@
 			gfs2_assert(sdp, bd->bd_ail == ai);
 
 			if (!buffer_busy(bh)) {
-				if (!buffer_uptodate(bh)) {
-					gfs2_log_unlock(sdp);
+				if (!buffer_uptodate(bh))
 					gfs2_io_error_bh(sdp, bh);
-					gfs2_log_lock(sdp);
-				}
 				list_move(&bd->bd_ail_st_list, &ai->ai_ail2_list);
 				continue;
 			}
@@ -118,9 +115,16 @@
 
 			list_move(&bd->bd_ail_st_list, &ai->ai_ail1_list);
 
+			get_bh(bh);
 			gfs2_log_unlock(sdp);
-			wait_on_buffer(bh);
-			ll_rw_block(WRITE, 1, &bh);
+			lock_buffer(bh);
+			if (test_clear_buffer_dirty(bh)) {
+				bh->b_end_io = end_buffer_write_sync;
+				submit_bh(WRITE, bh);
+			} else {
+				unlock_buffer(bh);
+				brelse(bh);
+			}
 			gfs2_log_lock(sdp);
 
 			retry = 1;
@@ -446,10 +450,10 @@
 	return tail;
 }
 
-static inline void log_incr_head(struct gfs2_sbd *sdp)
+void gfs2_log_incr_head(struct gfs2_sbd *sdp)
 {
 	if (sdp->sd_log_flush_head == sdp->sd_log_tail)
-		gfs2_assert_withdraw(sdp, sdp->sd_log_flush_head == sdp->sd_log_head);
+		BUG_ON(sdp->sd_log_flush_head != sdp->sd_log_head);
 
 	if (++sdp->sd_log_flush_head == sdp->sd_jdesc->jd_blocks) {
 		sdp->sd_log_flush_head = 0;
@@ -458,6 +462,23 @@
 }
 
 /**
+ * gfs2_log_write_endio - End of I/O for a log buffer
+ * @bh: The buffer head
+ * @uptodate: I/O Status
+ *
+ */
+
+static void gfs2_log_write_endio(struct buffer_head *bh, int uptodate)
+{
+	struct gfs2_sbd *sdp = bh->b_private;
+	bh->b_private = NULL;
+
+	end_buffer_write_sync(bh, uptodate);
+	if (atomic_dec_and_test(&sdp->sd_log_in_flight))
+		wake_up(&sdp->sd_log_flush_wait);
+}
+
+/**
  * gfs2_log_get_buf - Get and initialize a buffer to use for log control data
  * @sdp: The GFS2 superblock
  *
@@ -467,25 +488,42 @@
 struct buffer_head *gfs2_log_get_buf(struct gfs2_sbd *sdp)
 {
 	u64 blkno = log_bmap(sdp, sdp->sd_log_flush_head);
-	struct gfs2_log_buf *lb;
 	struct buffer_head *bh;
 
-	lb = kzalloc(sizeof(struct gfs2_log_buf), GFP_NOFS | __GFP_NOFAIL);
-	list_add(&lb->lb_list, &sdp->sd_log_flush_list);
-
-	bh = lb->lb_bh = sb_getblk(sdp->sd_vfs, blkno);
+	bh = sb_getblk(sdp->sd_vfs, blkno);
 	lock_buffer(bh);
 	memset(bh->b_data, 0, bh->b_size);
 	set_buffer_uptodate(bh);
 	clear_buffer_dirty(bh);
-	unlock_buffer(bh);
-
-	log_incr_head(sdp);
+	gfs2_log_incr_head(sdp);
+	atomic_inc(&sdp->sd_log_in_flight);
+	bh->b_private = sdp;
+	bh->b_end_io = gfs2_log_write_endio;
 
 	return bh;
 }
 
 /**
+ * gfs2_fake_write_endio - End of I/O for a fake log buffer
+ * @bh: The buffer head
+ * @uptodate: The I/O Status
+ *
+ */
+
+static void gfs2_fake_write_endio(struct buffer_head *bh, int uptodate)
+{
+	struct buffer_head *real_bh = bh->b_private;
+	struct gfs2_sbd *sdp = GFS2_SB(real_bh->b_page->mapping->host);
+
+	end_buffer_write_sync(bh, uptodate);
+	free_buffer_head(bh);
+	unlock_buffer(real_bh);
+	brelse(real_bh);
+	if (atomic_dec_and_test(&sdp->sd_log_in_flight))
+		wake_up(&sdp->sd_log_flush_wait);
+}
+
+/**
  * gfs2_log_fake_buf - Build a fake buffer head to write metadata buffer to log
  * @sdp: the filesystem
  * @data: the data the buffer_head should point to
@@ -497,22 +535,20 @@
 				      struct buffer_head *real)
 {
 	u64 blkno = log_bmap(sdp, sdp->sd_log_flush_head);
-	struct gfs2_log_buf *lb;
 	struct buffer_head *bh;
 
-	lb = kzalloc(sizeof(struct gfs2_log_buf), GFP_NOFS | __GFP_NOFAIL);
-	list_add(&lb->lb_list, &sdp->sd_log_flush_list);
-	lb->lb_real = real;
-
-	bh = lb->lb_bh = alloc_buffer_head(GFP_NOFS | __GFP_NOFAIL);
+	bh = alloc_buffer_head(GFP_NOFS | __GFP_NOFAIL);
 	atomic_set(&bh->b_count, 1);
-	bh->b_state = (1 << BH_Mapped) | (1 << BH_Uptodate);
+	bh->b_state = (1 << BH_Mapped) | (1 << BH_Uptodate) | (1 << BH_Lock);
 	set_bh_page(bh, real->b_page, bh_offset(real));
 	bh->b_blocknr = blkno;
 	bh->b_size = sdp->sd_sb.sb_bsize;
 	bh->b_bdev = sdp->sd_vfs->s_bdev;
+	bh->b_private = real;
+	bh->b_end_io = gfs2_fake_write_endio;
 
-	log_incr_head(sdp);
+	gfs2_log_incr_head(sdp);
+	atomic_inc(&sdp->sd_log_in_flight);
 
 	return bh;
 }
@@ -579,45 +615,24 @@
 		gfs2_assert_withdraw(sdp, !pull);
 
 	sdp->sd_log_idle = (tail == sdp->sd_log_flush_head);
-	log_incr_head(sdp);
+	gfs2_log_incr_head(sdp);
 }
 
 static void log_flush_commit(struct gfs2_sbd *sdp)
 {
-	struct list_head *head = &sdp->sd_log_flush_list;
-	struct gfs2_log_buf *lb;
-	struct buffer_head *bh;
-	int flushcount = 0;
+	DEFINE_WAIT(wait);
 
-	while (!list_empty(head)) {
-		lb = list_entry(head->next, struct gfs2_log_buf, lb_list);
-		list_del(&lb->lb_list);
-		bh = lb->lb_bh;
-
-		wait_on_buffer(bh);
-		if (!buffer_uptodate(bh))
-			gfs2_io_error_bh(sdp, bh);
-		if (lb->lb_real) {
-			while (atomic_read(&bh->b_count) != 1)  /* Grrrr... */
-				schedule();
-			free_buffer_head(bh);
-		} else
-			brelse(bh);
-		kfree(lb);
-		flushcount++;
+	if (atomic_read(&sdp->sd_log_in_flight)) {
+		do {
+			prepare_to_wait(&sdp->sd_log_flush_wait, &wait,
+					TASK_UNINTERRUPTIBLE);
+			if (atomic_read(&sdp->sd_log_in_flight))
+				io_schedule();
+		} while(atomic_read(&sdp->sd_log_in_flight));
+		finish_wait(&sdp->sd_log_flush_wait, &wait);
 	}
 
-	/* If nothing was journaled, the header is unplanned and unwanted. */
-	if (flushcount) {
-		log_write_header(sdp, 0, 0);
-	} else {
-		unsigned int tail;
-		tail = current_tail(sdp);
-
-		gfs2_ail1_empty(sdp, 0);
-		if (sdp->sd_log_tail != tail)
-			log_pull_tail(sdp, tail);
-	}
+	log_write_header(sdp, 0, 0);
 }
 
 static void gfs2_ordered_write(struct gfs2_sbd *sdp)
@@ -698,10 +713,16 @@
 	INIT_LIST_HEAD(&ai->ai_ail1_list);
 	INIT_LIST_HEAD(&ai->ai_ail2_list);
 
-	gfs2_assert_withdraw(sdp,
-			     sdp->sd_log_num_buf + sdp->sd_log_num_databuf ==
-			     sdp->sd_log_commited_buf +
-			     sdp->sd_log_commited_databuf);
+	if (sdp->sd_log_num_buf != sdp->sd_log_commited_buf) {
+		printk(KERN_INFO "GFS2: log buf %u %u\n", sdp->sd_log_num_buf,
+		       sdp->sd_log_commited_buf);
+		gfs2_assert_withdraw(sdp, 0);
+	}
+	if (sdp->sd_log_num_databuf != sdp->sd_log_commited_databuf) {
+		printk(KERN_INFO "GFS2: log databuf %u %u\n",
+		       sdp->sd_log_num_databuf, sdp->sd_log_commited_databuf);
+		gfs2_assert_withdraw(sdp, 0);
+	}
 	gfs2_assert_withdraw(sdp,
 			sdp->sd_log_num_revoke == sdp->sd_log_commited_revoke);
 
@@ -713,7 +734,7 @@
 	lops_before_commit(sdp);
 	gfs2_ordered_wait(sdp);
 
-	if (!list_empty(&sdp->sd_log_flush_list))
+	if (sdp->sd_log_head != sdp->sd_log_flush_head)
 		log_flush_commit(sdp);
 	else if (sdp->sd_log_tail != current_tail(sdp) && !sdp->sd_log_idle){
 		gfs2_log_lock(sdp);