lightnvm: pblk: schedule metadata on write thread

At the moment, line metadata is persisted on a separate work queue that
is kicked each time a line is closed. The assumption when designing this
was that freeing the write thread from creating a new write request was
better than the potential impact of writes colliding on the media (user
I/O and metadata I/O). Experimentation has proven this assumption wrong:
collisions can cost up to 25% of the available bandwidth and introduce
long tail latencies on the write thread, which in turn cause user write
threads to spend more time spinning to get a free entry on the write
buffer.

This patch moves the metadata logic to the write thread. When a line is
closed, the remaining metadata is written out in memory and the line is
placed on a metadata queue. The write thread then takes the metadata
corresponding to the previous line, creates the write request and
schedules it so as to minimize collisions on the media. With this
approach we can saturate the media's bandwidth, which reduces both write
latencies and the spinning time for user write threads. A simplified
sketch of the writer-side scheduling follows.
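
For illustration only, this is roughly how the write thread consumes the
metadata queue. The actual consumption lives in pblk-write.c and is not
part of the hunks below; peek_emeta_line(), submit_emeta_chunk() and
submit_user_write() are made-up names standing in for the real symbols,
while l_mg->emeta_list and l_mg->close_lock are the fields this patch
introduces:

    /* Illustrative sketch, not the patch's actual pblk-write.c code */
    static struct pblk_line *peek_emeta_line(struct pblk_line_mgmt *l_mg)
    {
            struct pblk_line *line = NULL;

            spin_lock(&l_mg->close_lock);
            if (!list_empty(&l_mg->emeta_list))
                    line = list_first_entry(&l_mg->emeta_list,
                                            struct pblk_line, list);
            spin_unlock(&l_mg->close_lock);

            return line;
    }

    static int writer_thread_iter(struct pblk *pblk)
    {
            struct pblk_line *meta_line = peek_emeta_line(&pblk->l_mg);

            /* Creating the emeta request here, on the write thread,
             * serializes it with user writes instead of racing them
             * from a separate work queue.
             */
            if (meta_line)
                    return submit_emeta_chunk(pblk, meta_line);

            return submit_user_write(pblk);
    }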

Signed-off-by: Javier González <javier@cnexlabs.com>
Signed-off-by: Matias Bjørling <matias@cnexlabs.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/drivers/lightnvm/pblk-core.c b/drivers/lightnvm/pblk-core.c
index 6fa51eb..6e4b06f 100644
--- a/drivers/lightnvm/pblk-core.c
+++ b/drivers/lightnvm/pblk-core.c
@@ -87,7 +87,7 @@
 		spin_unlock(&line->lock);
 		return;
 	}
-	line->vsc--;
+	le32_add_cpu(line->vsc, -1);
 
 	if (line->state == PBLK_LINESTATE_CLOSED)
 		move_list = pblk_line_gc_list(pblk, line);
@@ -306,28 +306,29 @@
 	struct pblk_line_meta *lm = &pblk->lm;
 	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
 	struct list_head *move_list = NULL;
+	int vsc = le32_to_cpu(*line->vsc);
 
-	if (!line->vsc) {
+	if (!vsc) {
 		if (line->gc_group != PBLK_LINEGC_FULL) {
 			line->gc_group = PBLK_LINEGC_FULL;
 			move_list = &l_mg->gc_full_list;
 		}
-	} else if (line->vsc < lm->mid_thrs) {
+	} else if (vsc < lm->mid_thrs) {
 		if (line->gc_group != PBLK_LINEGC_HIGH) {
 			line->gc_group = PBLK_LINEGC_HIGH;
 			move_list = &l_mg->gc_high_list;
 		}
-	} else if (line->vsc < lm->high_thrs) {
+	} else if (vsc < lm->high_thrs) {
 		if (line->gc_group != PBLK_LINEGC_MID) {
 			line->gc_group = PBLK_LINEGC_MID;
 			move_list = &l_mg->gc_mid_list;
 		}
-	} else if (line->vsc < line->sec_in_line) {
+	} else if (vsc < line->sec_in_line) {
 		if (line->gc_group != PBLK_LINEGC_LOW) {
 			line->gc_group = PBLK_LINEGC_LOW;
 			move_list = &l_mg->gc_low_list;
 		}
-	} else if (line->vsc == line->sec_in_line) {
+	} else if (vsc == line->sec_in_line) {
 		if (line->gc_group != PBLK_LINEGC_EMPTY) {
 			line->gc_group = PBLK_LINEGC_EMPTY;
 			move_list = &l_mg->gc_empty_list;
@@ -337,7 +338,7 @@
 		line->gc_group = PBLK_LINEGC_NONE;
 		move_list =  &l_mg->corrupt_list;
 		pr_err("pblk: corrupted vsc for line %d, vsc:%d (%d/%d/%d)\n",
-						line->id, line->vsc,
+						line->id, vsc,
 						line->sec_in_line,
 						lm->high_thrs, lm->mid_thrs);
 	}
@@ -496,8 +497,20 @@
 	return secs_to_sync;
 }
 
-static u64 __pblk_alloc_page(struct pblk *pblk, struct pblk_line *line,
-			     int nr_secs)
+void pblk_dealloc_page(struct pblk *pblk, struct pblk_line *line, int nr_secs)
+{
+	u64 addr;
+	int i;
+
+	addr = find_next_zero_bit(line->map_bitmap,
+					pblk->lm.sec_per_line, line->cur_sec);
+	line->cur_sec = addr - nr_secs;
+
+	for (i = 0; i < nr_secs; i++, line->cur_sec--)
+		WARN_ON(!test_and_clear_bit(line->cur_sec, line->map_bitmap));
+}
+
+u64 __pblk_alloc_page(struct pblk *pblk, struct pblk_line *line, int nr_secs)
 {
 	u64 addr;
 	int i;
@@ -532,12 +545,24 @@
 	return addr;
 }
 
+u64 pblk_lookup_page(struct pblk *pblk, struct pblk_line *line)
+{
+	u64 paddr;
+
+	spin_lock(&line->lock);
+	paddr = find_next_zero_bit(line->map_bitmap,
+					pblk->lm.sec_per_line, line->cur_sec);
+	spin_unlock(&line->lock);
+
+	return paddr;
+}
+
 /*
  * Submit emeta to one LUN in the raid line at the time to avoid a deadlock when
  * taking the per LUN semaphore.
  */
 static int pblk_line_submit_emeta_io(struct pblk *pblk, struct pblk_line *line,
-				     u64 paddr, int dir)
+				     void *emeta_buf, u64 paddr, int dir)
 {
 	struct nvm_tgt_dev *dev = pblk->dev;
 	struct nvm_geo *geo = &dev->geo;
@@ -546,9 +571,8 @@
 	struct nvm_rq rqd;
 	struct ppa_addr *ppa_list;
 	dma_addr_t dma_ppa_list;
-	void *emeta = line->emeta;
 	int min = pblk->min_write_pgs;
-	int left_ppas = lm->emeta_sec;
+	int left_ppas = lm->emeta_sec[0];
 	int id = line->id;
 	int rq_ppas, rq_len;
 	int cmd_op, bio_op;
@@ -578,7 +602,7 @@
 	rq_ppas = pblk_calc_secs(pblk, left_ppas, 0);
 	rq_len = rq_ppas * geo->sec_size;
 
-	bio = pblk_bio_map_addr(pblk, emeta, rq_ppas, rq_len, GFP_KERNEL);
+	bio = pblk_bio_map_addr(pblk, emeta_buf, rq_ppas, rq_len, GFP_KERNEL);
 	if (IS_ERR(bio)) {
 		ret = PTR_ERR(bio);
 		goto free_rqd_dma;
@@ -660,7 +684,7 @@
 			pblk_log_read_err(pblk, &rqd);
 	}
 
-	emeta += rq_len;
+	emeta_buf += rq_len;
 	left_ppas -= rq_ppas;
 	if (left_ppas)
 		goto next_rq;
@@ -701,7 +725,7 @@
 		bio_op = REQ_OP_WRITE;
 		cmd_op = NVM_OP_PWRITE;
 		flags = pblk_set_progr_mode(pblk, WRITE);
-		lba_list = pblk_line_emeta_to_lbas(line->emeta);
+		lba_list = emeta_to_lbas(pblk, line->emeta->buf);
 	} else if (dir == READ) {
 		bio_op = REQ_OP_READ;
 		cmd_op = NVM_OP_PREAD;
@@ -775,9 +799,11 @@
 	return pblk_line_submit_smeta_io(pblk, line, bpaddr, READ);
 }
 
-int pblk_line_read_emeta(struct pblk *pblk, struct pblk_line *line)
+int pblk_line_read_emeta(struct pblk *pblk, struct pblk_line *line,
+			 void *emeta_buf)
 {
-	return pblk_line_submit_emeta_io(pblk, line, line->emeta_ssec, READ);
+	return pblk_line_submit_emeta_io(pblk, line, emeta_buf,
+						line->emeta_ssec, READ);
 }
 
 static void pblk_setup_e_rq(struct pblk *pblk, struct nvm_rq *rqd,
@@ -863,18 +889,47 @@
 	return 0;
 }
 
+static void pblk_line_setup_metadata(struct pblk_line *line,
+				     struct pblk_line_mgmt *l_mg,
+				     struct pblk_line_meta *lm)
+{
+	int meta_line;
+
+retry_meta:
+	meta_line = find_first_zero_bit(&l_mg->meta_bitmap, PBLK_DATA_LINES);
+	if (meta_line == PBLK_DATA_LINES) {
+		spin_unlock(&l_mg->free_lock);
+		io_schedule();
+		spin_lock(&l_mg->free_lock);
+		goto retry_meta;
+	}
+
+	set_bit(meta_line, &l_mg->meta_bitmap);
+	line->meta_line = meta_line;
+
+	line->smeta = l_mg->sline_meta[meta_line];
+	line->emeta = l_mg->eline_meta[meta_line];
+
+	memset(line->smeta, 0, lm->smeta_len);
+	memset(line->emeta->buf, 0, lm->emeta_len[0]);
+
+	line->emeta->mem = 0;
+	atomic_set(&line->emeta->sync, 0);
+}
+
 /* For now lines are always assumed full lines. Thus, smeta former and current
  * lun bitmaps are omitted.
  */
-static int pblk_line_set_metadata(struct pblk *pblk, struct pblk_line *line,
+static int pblk_line_init_metadata(struct pblk *pblk, struct pblk_line *line,
 				  struct pblk_line *cur)
 {
 	struct nvm_tgt_dev *dev = pblk->dev;
 	struct nvm_geo *geo = &dev->geo;
 	struct pblk_line_meta *lm = &pblk->lm;
 	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
-	struct line_smeta *smeta = line->smeta;
-	struct line_emeta *emeta = line->emeta;
+	struct pblk_emeta *emeta = line->emeta;
+	struct line_emeta *emeta_buf = emeta->buf;
+	struct line_smeta *smeta_buf = (struct line_smeta *)line->smeta;
 	int nr_blk_line;
 
 	/* After erasing the line, new bad blocks might appear and we risk
@@ -897,42 +952,44 @@
 	}
 
 	/* Run-time metadata */
-	line->lun_bitmap = ((void *)(smeta)) + sizeof(struct line_smeta);
+	line->lun_bitmap = ((void *)(smeta_buf)) + sizeof(struct line_smeta);
 
 	/* Mark LUNs allocated in this line (all for now) */
 	bitmap_set(line->lun_bitmap, 0, lm->lun_bitmap_len);
 
-	smeta->header.identifier = cpu_to_le32(PBLK_MAGIC);
-	memcpy(smeta->header.uuid, pblk->instance_uuid, 16);
-	smeta->header.id = cpu_to_le32(line->id);
-	smeta->header.type = cpu_to_le16(line->type);
-	smeta->header.version = cpu_to_le16(1);
+	smeta_buf->header.identifier = cpu_to_le32(PBLK_MAGIC);
+	memcpy(smeta_buf->header.uuid, pblk->instance_uuid, 16);
+	smeta_buf->header.id = cpu_to_le32(line->id);
+	smeta_buf->header.type = cpu_to_le16(line->type);
+	smeta_buf->header.version = cpu_to_le16(1);
 
 	/* Start metadata */
-	smeta->seq_nr = cpu_to_le64(line->seq_nr);
-	smeta->window_wr_lun = cpu_to_le32(geo->nr_luns);
+	smeta_buf->seq_nr = cpu_to_le64(line->seq_nr);
+	smeta_buf->window_wr_lun = cpu_to_le32(geo->nr_luns);
 
 	/* Fill metadata among lines */
 	if (cur) {
 		memcpy(line->lun_bitmap, cur->lun_bitmap, lm->lun_bitmap_len);
-		smeta->prev_id = cpu_to_le32(cur->id);
-		cur->emeta->next_id = cpu_to_le32(line->id);
+		smeta_buf->prev_id = cpu_to_le32(cur->id);
+		cur->emeta->buf->next_id = cpu_to_le32(line->id);
 	} else {
-		smeta->prev_id = cpu_to_le32(PBLK_LINE_EMPTY);
+		smeta_buf->prev_id = cpu_to_le32(PBLK_LINE_EMPTY);
 	}
 
 	/* All smeta must be set at this point */
-	smeta->header.crc = cpu_to_le32(pblk_calc_meta_header_crc(pblk, smeta));
-	smeta->crc = cpu_to_le32(pblk_calc_smeta_crc(pblk, smeta));
+	smeta_buf->header.crc = cpu_to_le32(
+			pblk_calc_meta_header_crc(pblk, &smeta_buf->header));
+	smeta_buf->crc = cpu_to_le32(pblk_calc_smeta_crc(pblk, smeta_buf));
 
 	/* End metadata */
-	memcpy(&emeta->header, &smeta->header, sizeof(struct line_header));
-	emeta->seq_nr = cpu_to_le64(line->seq_nr);
-	emeta->nr_lbas = cpu_to_le64(line->sec_in_line);
-	emeta->nr_valid_lbas = cpu_to_le64(0);
-	emeta->next_id = cpu_to_le32(PBLK_LINE_EMPTY);
-	emeta->crc = cpu_to_le32(0);
-	emeta->prev_id = smeta->prev_id;
+	memcpy(&emeta_buf->header, &smeta_buf->header,
+						sizeof(struct line_header));
+	emeta_buf->seq_nr = cpu_to_le64(line->seq_nr);
+	emeta_buf->nr_lbas = cpu_to_le64(line->sec_in_line);
+	emeta_buf->nr_valid_lbas = cpu_to_le64(0);
+	emeta_buf->next_id = cpu_to_le32(PBLK_LINE_EMPTY);
+	emeta_buf->crc = cpu_to_le32(0);
+	emeta_buf->prev_id = smeta_buf->prev_id;
 
 	return 1;
 }
@@ -987,8 +1044,8 @@
 	 * blocks to make sure that there are enough sectors to store emeta
 	 */
 	bit = lm->sec_per_line;
-	off = lm->sec_per_line - lm->emeta_sec;
-	bitmap_set(line->invalid_bitmap, off, lm->emeta_sec);
+	off = lm->sec_per_line - lm->emeta_sec[0];
+	bitmap_set(line->invalid_bitmap, off, lm->emeta_sec[0]);
 	while (nr_bb) {
 		off -= geo->sec_per_pl;
 		if (!test_bit(off, line->invalid_bitmap)) {
@@ -997,9 +1054,11 @@
 		}
 	}
 
-	line->sec_in_line -= lm->emeta_sec;
+	line->sec_in_line -= lm->emeta_sec[0];
 	line->emeta_ssec = off;
-	line->vsc = line->left_ssecs = line->left_msecs = line->sec_in_line;
+	line->nr_valid_lbas = 0;
+	line->left_ssecs = line->left_msecs = line->sec_in_line;
+	*line->vsc = cpu_to_le32(line->sec_in_line);
 
 	if (lm->sec_per_line - line->sec_in_line !=
 		bitmap_weight(line->invalid_bitmap, lm->sec_per_line)) {
@@ -1046,6 +1105,8 @@
 
 	atomic_set(&line->left_eblks, blk_in_line);
 	atomic_set(&line->left_seblks, blk_in_line);
+
+	line->meta_distance = lm->meta_distance;
 	spin_unlock(&line->lock);
 
 	/* Bad blocks do not need to be erased */
@@ -1170,7 +1231,6 @@
 {
 	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
 	struct pblk_line *line;
-	int meta_line;
 	int is_next = 0;
 
 	spin_lock(&l_mg->free_lock);
@@ -1184,11 +1244,7 @@
 	line->type = PBLK_LINETYPE_DATA;
 	l_mg->data_line = line;
 
-	meta_line = find_first_zero_bit(&l_mg->meta_bitmap, PBLK_DATA_LINES);
-	set_bit(meta_line, &l_mg->meta_bitmap);
-	line->smeta = l_mg->sline_meta[meta_line].meta;
-	line->emeta = l_mg->eline_meta[meta_line].meta;
-	line->meta_line = meta_line;
+	pblk_line_setup_metadata(line, l_mg, &pblk->lm);
 
 	/* Allocate next line for preparation */
 	l_mg->data_next = pblk_line_get(pblk);
@@ -1207,7 +1263,7 @@
 		return NULL;
 
 retry_setup:
-	if (!pblk_line_set_metadata(pblk, line, NULL)) {
+	if (!pblk_line_init_metadata(pblk, line, NULL)) {
 		line = pblk_line_retry(pblk, line);
 		if (!line)
 			return NULL;
@@ -1228,11 +1284,9 @@
 
 struct pblk_line *pblk_line_replace_data(struct pblk *pblk)
 {
-	struct pblk_line_meta *lm = &pblk->lm;
 	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
 	struct pblk_line *cur, *new;
 	unsigned int left_seblks;
-	int meta_line;
 	int is_next = 0;
 
 	cur = l_mg->data_line;
@@ -1263,29 +1317,14 @@
 		is_next = 1;
 	}
 
-retry_meta:
-	meta_line = find_first_zero_bit(&l_mg->meta_bitmap, PBLK_DATA_LINES);
-	if (meta_line == PBLK_DATA_LINES) {
-		spin_unlock(&l_mg->free_lock);
-		io_schedule();
-		spin_lock(&l_mg->free_lock);
-		goto retry_meta;
-	}
-
-	set_bit(meta_line, &l_mg->meta_bitmap);
-	new->smeta = l_mg->sline_meta[meta_line].meta;
-	new->emeta = l_mg->eline_meta[meta_line].meta;
-	new->meta_line = meta_line;
-
-	memset(new->smeta, 0, lm->smeta_len);
-	memset(new->emeta, 0, lm->emeta_len);
+	pblk_line_setup_metadata(new, l_mg, &pblk->lm);
 	spin_unlock(&l_mg->free_lock);
 
 	if (is_next)
 		pblk_rl_free_lines_dec(&pblk->rl, l_mg->data_next);
 
 retry_setup:
-	if (!pblk_line_set_metadata(pblk, new, cur)) {
+	if (!pblk_line_init_metadata(pblk, new, cur)) {
 		new = pblk_line_retry(pblk, new);
 		if (!new)
 			return NULL;
@@ -1311,6 +1350,8 @@
 	if (line->invalid_bitmap)
 		mempool_free(line->invalid_bitmap, pblk->line_meta_pool);
 
+	*line->vsc = cpu_to_le32(EMPTY_ENTRY);
+
 	line->map_bitmap = NULL;
 	line->invalid_bitmap = NULL;
 	line->smeta = NULL;
@@ -1386,14 +1427,10 @@
 void pblk_line_close(struct pblk *pblk, struct pblk_line *line)
 {
 	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
+	struct pblk_line_meta *lm = &pblk->lm;
 	struct list_head *move_list;
 
-	line->emeta->crc = cpu_to_le32(pblk_calc_emeta_crc(pblk, line->emeta));
-
-	if (pblk_line_submit_emeta_io(pblk, line, line->cur_sec, WRITE))
-		pr_err("pblk: line %d close I/O failed\n", line->id);
-
-	WARN(!bitmap_full(line->map_bitmap, line->sec_in_line),
+	WARN(!bitmap_full(line->map_bitmap, lm->sec_per_line),
 				"pblk: corrupt closed line %d\n", line->id);
 
 	spin_lock(&l_mg->free_lock);
@@ -1417,6 +1454,27 @@
 	spin_unlock(&l_mg->gc_lock);
 }
 
+void pblk_line_close_meta(struct pblk *pblk, struct pblk_line *line)
+{
+	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
+	struct pblk_line_meta *lm = &pblk->lm;
+	struct pblk_emeta *emeta = line->emeta;
+	struct line_emeta *emeta_buf = emeta->buf;
+
+	/* No need for exact vsc value; avoid a big line lock, take approx. */
+	memcpy(emeta_to_vsc(pblk, emeta_buf), l_mg->vsc_list, lm->vsc_list_len);
+	memcpy(emeta_to_bb(emeta_buf), line->blk_bitmap, lm->blk_bitmap_len);
+
+	emeta_buf->nr_valid_lbas = cpu_to_le64(line->nr_valid_lbas);
+	emeta_buf->crc = cpu_to_le32(pblk_calc_emeta_crc(pblk, emeta_buf));
+
+	spin_lock(&l_mg->close_lock);
+	spin_lock(&line->lock);
+	list_add_tail(&line->list, &l_mg->emeta_list);
+	spin_unlock(&line->lock);
+	spin_unlock(&l_mg->close_lock);
+}
+
 void pblk_line_close_ws(struct work_struct *work)
 {
 	struct pblk_line_ws *line_ws = container_of(work, struct pblk_line_ws,
@@ -1476,7 +1534,7 @@
 	struct nvm_tgt_dev *dev = pblk->dev;
 	struct nvm_geo *geo = &dev->geo;
 	struct pblk_lun *rlun;
-	int lun_id = ppa_list[0].g.ch * geo->luns_per_chnl + ppa_list[0].g.lun;
+	int pos = pblk_ppa_to_pos(geo, ppa_list[0]);
 	int ret;
 
 	/*
@@ -1493,10 +1551,10 @@
 	/* If the LUN has been locked for this same request, do no attempt to
 	 * lock it again
 	 */
-	if (test_and_set_bit(lun_id, lun_bitmap))
+	if (test_and_set_bit(pos, lun_bitmap))
 		return;
 
-	rlun = &pblk->luns[lun_id];
+	rlun = &pblk->luns[pos];
 	ret = down_timeout(&rlun->wr_sem, msecs_to_jiffies(5000));
 	if (ret) {
 		switch (ret) {