drbd: allow petabyte storage on 64bit arch

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_bitmap.c b/drivers/block/drbd/drbd_bitmap.c
index 72cd41a..0e31e57 100644
--- a/drivers/block/drbd/drbd_bitmap.c
+++ b/drivers/block/drbd/drbd_bitmap.c
@@ -37,10 +37,46 @@
  * convention:
  * function name drbd_bm_... => used elsewhere, "public".
  * function name      bm_... => internal to implementation, "private".
+ */
 
- * Note that since find_first_bit returns int, at the current granularity of
- * the bitmap (4KB per byte), this implementation "only" supports up to
- * 1<<(32+12) == 16 TB...
+
+/*
+ * LIMITATIONS:
+ * We want to support >= peta byte of backend storage, while for now still using
+ * a granularity of one bit per 4KiB of storage.
+ * 1 << 50		bytes backend storage (1 PiB)
+ * 1 << (50 - 12)	bits needed
+ *	38 --> we need u64 to index and count bits
+ * 1 << (38 - 3)	bitmap bytes needed
+ *	35 --> we still need u64 to index and count bytes
+ *			(that's 32 GiB of bitmap for 1 PiB storage)
+ * 1 << (35 - 2)	32bit longs needed
+ *	33 --> we'd even need u64 to index and count 32bit long words.
+ * 1 << (35 - 3)	64bit longs needed
+ *	32 --> we could get away with a 32bit unsigned int to index and count
+ *	64bit long words, but I rather stay with unsigned long for now.
+ *	We probably should neither count nor point to bytes or long words
+ *	directly, but either by bitnumber, or by page index and offset.
+ * 1 << (35 - 12)
+ *	22 --> we need that much 4KiB pages of bitmap.
+ *	1 << (22 + 3) --> on a 64bit arch,
+ *	we need 32 MiB to store the array of page pointers.
+ *
+ * Because I'm lazy, and because the resulting patch was too large, too ugly
+ * and still incomplete, on 32bit we still "only" support 16 TiB (minus some),
+ * (1 << 32) bits * 4k storage.
+ *
+
+ * bitmap storage and IO:
+ *	Bitmap is stored little endian on disk, and is kept little endian in
+ *	core memory. Currently we still hold the full bitmap in core as long
+ *	as we are "attached" to a local disk, which at 32 GiB for 1PiB storage
+ *	seems excessive.
+ *
+ *	We plan to reduce the amount of in-core bitmap pages by pageing them in
+ *	and out against their on-disk location as necessary, but need to make
+ *	sure we don't cause too much meta data IO, and must not deadlock in
+ *	tight memory situations. This needs some more work.
  */
 
 /*
@@ -56,13 +92,9 @@
 struct drbd_bitmap {
 	struct page **bm_pages;
 	spinlock_t bm_lock;
-	/* WARNING unsigned long bm_*:
-	 * 32bit number of bit offset is just enough for 512 MB bitmap.
-	 * it will blow up if we make the bitmap bigger...
-	 * not that it makes much sense to have a bitmap that large,
-	 * rather change the granularity to 16k or 64k or something.
-	 * (that implies other problems, however...)
-	 */
+
+	/* see LIMITATIONS: above */
+
 	unsigned long bm_set;       /* nr of set bits; THINK maybe atomic_t? */
 	unsigned long bm_bits;
 	size_t   bm_words;
@@ -517,43 +549,39 @@
 	bm_unmap(p_addr);
 }
 
+/* you better not modify the bitmap while this is running,
+ * or its results will be stale */
 static unsigned long bm_count_bits(struct drbd_bitmap *b)
 {
-	unsigned long *p_addr, *bm, offset = 0;
+	unsigned long *p_addr;
 	unsigned long bits = 0;
-	unsigned long i, do_now;
-	unsigned long words;
+	unsigned long mask = (1UL << (b->bm_bits & BITS_PER_LONG_MASK)) -1;
+	int idx, last_page, i, last_word;
 
-	/* due to 64bit alignment, the last long on a 32bit arch
-	 * may be not used at all. The last used long will likely
-	 * be only partially used, always. Don't count those bits,
-	 * but mask them out. */
-	words = (b->bm_bits + BITS_PER_LONG - 1) >> LN2_BPL;
+	/* because of the "extra long to catch oob access" we allocate in
+	 * drbd_bm_resize, bm_number_of_pages -1 is not necessarily the page
+	 * containing the last _relevant_ bitmap word */
+	last_page = bm_bit_to_page_idx(b, b->bm_bits-1);
 
-	while (offset < words) {
-		i = do_now = min_t(size_t, words-offset, LWPP);
-		p_addr = __bm_map_pidx(b, bm_word_to_page_idx(b, offset), KM_USER0);
-		bm = p_addr + MLPP(offset);
-		while (i--) {
-			bits += hweight_long(*bm++);
-		}
-		offset += do_now;
-		if (offset == words) {
-			/* last word may only be partially used,
-			 * see also bm_clear_surplus. */
-			i = (1UL << (b->bm_bits & (BITS_PER_LONG-1))) -1;
-			if (i) {
-				bits -= hweight_long(p_addr[do_now-1] & ~i);
-				p_addr[do_now-1] &= i;
-			}
-			/* 32bit arch, may have an unused padding long */
-			if (words != b->bm_words)
-				p_addr[do_now] = 0;
-		}
+	/* all but last page */
+	for (idx = 0; idx < last_page; idx++) {
+		p_addr = __bm_map_pidx(b, idx, KM_USER0);
+		for (i = 0; i < LWPP; i++)
+			bits += hweight_long(p_addr[i]);
 		__bm_unmap(p_addr, KM_USER0);
 		cond_resched();
 	}
-
+	/* last (or only) page */
+	last_word = ((b->bm_bits - 1) & BITS_PER_PAGE_MASK) >> LN2_BPL;
+	p_addr = __bm_map_pidx(b, idx, KM_USER0);
+	for (i = 0; i < last_word; i++)
+		bits += hweight_long(p_addr[i]);
+	p_addr[last_word] &= cpu_to_lel(mask);
+	bits += hweight_long(p_addr[last_word]);
+	/* 32bit arch, may have an unused padding long */
+	if (BITS_PER_LONG == 32 && (last_word & 1) == 0)
+		p_addr[last_word+1] = 0;
+	__bm_unmap(p_addr, KM_USER0);
 	return bits;
 }
 
@@ -564,8 +592,6 @@
 	unsigned int idx;
 	size_t do_now, end;
 
-#define BM_SECTORS_PER_BIT (BM_BLOCK_SIZE/512)
-
 	end = offset + len;
 
 	if (end > b->bm_words) {
@@ -645,8 +671,14 @@
 	words = ALIGN(bits, 64) >> LN2_BPL;
 
 	if (get_ldev(mdev)) {
-		D_ASSERT((u64)bits <= (((u64)mdev->ldev->md.md_size_sect-MD_BM_OFFSET) << 12));
+		u64 bits_on_disk = ((u64)mdev->ldev->md.md_size_sect-MD_BM_OFFSET) << 12;
 		put_ldev(mdev);
+		if (bits > bits_on_disk) {
+			dev_info(DEV, "bits = %lu\n", bits);
+			dev_info(DEV, "bits_on_disk = %llu\n", bits_on_disk);
+			err = -ENOSPC;
+			goto out;
+		}
 	}
 
 	/* one extra long to catch off by one errors */
@@ -1113,9 +1145,12 @@
  * @mdev:	DRBD device.
  * @idx:	bitmap page index
  *
- * We don't want to special case on logical_block_size of the underlaying
- * device, so we submit PAGE_SIZE aligned pieces containing the requested enr.
+ * We don't want to special case on logical_block_size of the backend device,
+ * so we submit PAGE_SIZE aligned pieces.
  * Note that on "most" systems, PAGE_SIZE is 4k.
+ *
+ * In case this becomes an issue on systems with larger PAGE_SIZE,
+ * we may want to change this again to write 4k aligned 4k pieces.
  */
 int drbd_bm_write_page(struct drbd_conf *mdev, unsigned int idx) __must_hold(local)
 {
@@ -1144,52 +1179,57 @@
 
 /* NOTE
  * find_first_bit returns int, we return unsigned long.
- * should not make much difference anyways, but ...
+ * For this to work on 32bit arch with bitnumbers > (1<<32),
+ * we'd need to return u64, and get a whole lot of other places
+ * fixed where we still use unsigned long.
  *
  * this returns a bit number, NOT a sector!
  */
-#define BPP_MASK ((1UL << (PAGE_SHIFT+3)) - 1)
 static unsigned long __bm_find_next(struct drbd_conf *mdev, unsigned long bm_fo,
 	const int find_zero_bit, const enum km_type km)
 {
 	struct drbd_bitmap *b = mdev->bitmap;
-	unsigned long i = -1UL;
 	unsigned long *p_addr;
-	unsigned long bit_offset; /* bit offset of the mapped page. */
+	unsigned long bit_offset;
+	unsigned i;
+
 
 	if (bm_fo > b->bm_bits) {
 		dev_err(DEV, "bm_fo=%lu bm_bits=%lu\n", bm_fo, b->bm_bits);
+		bm_fo = DRBD_END_OF_BITMAP;
 	} else {
 		while (bm_fo < b->bm_bits) {
 			/* bit offset of the first bit in the page */
-			bit_offset = bm_fo & ~BPP_MASK;
+			bit_offset = bm_fo & ~BITS_PER_PAGE_MASK;
 			p_addr = __bm_map_pidx(b, bm_bit_to_page_idx(b, bm_fo), km);
 
 			if (find_zero_bit)
-				i = generic_find_next_zero_le_bit(p_addr, PAGE_SIZE*8, bm_fo & BPP_MASK);
+				i = generic_find_next_zero_le_bit(p_addr,
+						PAGE_SIZE*8, bm_fo & BITS_PER_PAGE_MASK);
 			else
-				i = generic_find_next_le_bit(p_addr, PAGE_SIZE*8, bm_fo & BPP_MASK);
+				i = generic_find_next_le_bit(p_addr,
+						PAGE_SIZE*8, bm_fo & BITS_PER_PAGE_MASK);
 
 			__bm_unmap(p_addr, km);
 			if (i < PAGE_SIZE*8) {
-				i = bit_offset + i;
-				if (i >= b->bm_bits)
+				bm_fo = bit_offset + i;
+				if (bm_fo >= b->bm_bits)
 					break;
 				goto found;
 			}
 			bm_fo = bit_offset + PAGE_SIZE*8;
 		}
-		i = -1UL;
+		bm_fo = DRBD_END_OF_BITMAP;
 	}
  found:
-	return i;
+	return bm_fo;
 }
 
 static unsigned long bm_find_next(struct drbd_conf *mdev,
 	unsigned long bm_fo, const int find_zero_bit)
 {
 	struct drbd_bitmap *b = mdev->bitmap;
-	unsigned long i = -1UL;
+	unsigned long i = DRBD_END_OF_BITMAP;
 
 	ERR_IF(!b) return i;
 	ERR_IF(!b->bm_pages) return i;
@@ -1267,9 +1307,9 @@
 			last_page_nr = page_nr;
 		}
 		if (val)
-			c += (0 == generic___test_and_set_le_bit(bitnr & BPP_MASK, p_addr));
+			c += (0 == generic___test_and_set_le_bit(bitnr & BITS_PER_PAGE_MASK, p_addr));
 		else
-			c -= (0 != generic___test_and_clear_le_bit(bitnr & BPP_MASK, p_addr));
+			c -= (0 != generic___test_and_clear_le_bit(bitnr & BITS_PER_PAGE_MASK, p_addr));
 	}
 	if (p_addr)
 		__bm_unmap(p_addr, km);
@@ -1418,7 +1458,7 @@
 		bm_print_lock_info(mdev);
 	if (bitnr < b->bm_bits) {
 		p_addr = bm_map_pidx(b, bm_bit_to_page_idx(b, bitnr));
-		i = generic_test_le_bit(bitnr & BPP_MASK, p_addr) ? 1 : 0;
+		i = generic_test_le_bit(bitnr & BITS_PER_PAGE_MASK, p_addr) ? 1 : 0;
 		bm_unmap(p_addr);
 	} else if (bitnr == b->bm_bits) {
 		i = -1;
@@ -1517,13 +1557,15 @@
 	return count;
 }
 
-/* set all bits covered by the AL-extent al_enr */
+/* Set all bits covered by the AL-extent al_enr.
+ * Returns number of bits changed. */
 unsigned long drbd_bm_ALe_set_all(struct drbd_conf *mdev, unsigned long al_enr)
 {
 	struct drbd_bitmap *b = mdev->bitmap;
 	unsigned long *p_addr, *bm;
 	unsigned long weight;
-	int count, s, e, i, do_now;
+	unsigned long s, e;
+	int count, i, do_now;
 	ERR_IF(!b) return 0;
 	ERR_IF(!b->bm_pages) return 0;
 
@@ -1552,7 +1594,7 @@
 		if (e == b->bm_words)
 			b->bm_set -= bm_clear_surplus(b);
 	} else {
-		dev_err(DEV, "start offset (%d) too large in drbd_bm_ALe_set_all\n", s);
+		dev_err(DEV, "start offset (%lu) too large in drbd_bm_ALe_set_all\n", s);
 	}
 	weight = b->bm_set - weight;
 	spin_unlock_irq(&b->bm_lock);