Merge tag 'for-5.4/io_uring-2019-09-15' of git://git.kernel.dk/linux-block

Pull io_uring updates from Jens Axboe:

 - Allocate SQ/CQ ring together, more efficient. Expose this through a
   feature flag as well, so we can reduce the number of mmaps by 1
   (Hristo and me)

 - Fix for sequence logic with SQ thread (Jackie).

 - Add support for links with drain commands (Jackie).

 - Improved async merging (me)

 - Improved buffered async write performance (me)

 - Support SQ poll wakeup + event get in single io_uring_enter() (me)

 - Support larger SQ ring size. For epoll conversions, the 4k limit was
   too small for some prod workloads (Daniel).

 - put_user_page() usage (John)

* tag 'for-5.4/io_uring-2019-09-15' of git://git.kernel.dk/linux-block:
  io_uring: increase IORING_MAX_ENTRIES to 32K
  io_uring: make sqpoll wakeup possible with getevents
  io_uring: extend async work merging
  io_uring: limit parallelism of buffered writes
  io_uring: add io_queue_async_work() helper
  io_uring: optimize submit_and_wait API
  io_uring: add support for link with drain
  io_uring: fix wrong sequence setting logic
  io_uring: expose single mmap capability
  io_uring: allocate the two rings together
  fs/io_uring.c: convert put_page() to put_user_page*()
diff --git a/fs/io_uring.c b/fs/io_uring.c
index cfb48bd..0dadbdb 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -75,7 +75,7 @@
 
 #include "internal.h"
 
-#define IORING_MAX_ENTRIES	4096
+#define IORING_MAX_ENTRIES	32768
 #define IORING_MAX_FIXED_FILES	1024
 
 struct io_uring {
@@ -84,27 +84,29 @@
 };
 
 /*
- * This data is shared with the application through the mmap at offset
- * IORING_OFF_SQ_RING.
+ * This data is shared with the application through the mmap at offsets
+ * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
  *
  * The offsets to the member fields are published through struct
  * io_sqring_offsets when calling io_uring_setup.
  */
-struct io_sq_ring {
+struct io_rings {
 	/*
 	 * Head and tail offsets into the ring; the offsets need to be
 	 * masked to get valid indices.
 	 *
-	 * The kernel controls head and the application controls tail.
+	 * The kernel controls head of the sq ring and the tail of the cq ring,
+	 * and the application controls tail of the sq ring and the head of the
+	 * cq ring.
 	 */
-	struct io_uring		r;
+	struct io_uring		sq, cq;
 	/*
-	 * Bitmask to apply to head and tail offsets (constant, equals
+	 * Bitmasks to apply to head and tail offsets (constant, equals
 	 * ring_entries - 1)
 	 */
-	u32			ring_mask;
-	/* Ring size (constant, power of 2) */
-	u32			ring_entries;
+	u32			sq_ring_mask, cq_ring_mask;
+	/* Ring sizes (constant, power of 2) */
+	u32			sq_ring_entries, cq_ring_entries;
 	/*
 	 * Number of invalid entries dropped by the kernel due to
 	 * invalid index stored in array
@@ -117,7 +119,7 @@
 	 * counter includes all submissions that were dropped reaching
 	 * the new SQ head (and possibly more).
 	 */
-	u32			dropped;
+	u32			sq_dropped;
 	/*
 	 * Runtime flags
 	 *
@@ -127,43 +129,7 @@
 	 * The application needs a full memory barrier before checking
 	 * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
 	 */
-	u32			flags;
-	/*
-	 * Ring buffer of indices into array of io_uring_sqe, which is
-	 * mmapped by the application using the IORING_OFF_SQES offset.
-	 *
-	 * This indirection could e.g. be used to assign fixed
-	 * io_uring_sqe entries to operations and only submit them to
-	 * the queue when needed.
-	 *
-	 * The kernel modifies neither the indices array nor the entries
-	 * array.
-	 */
-	u32			array[];
-};
-
-/*
- * This data is shared with the application through the mmap at offset
- * IORING_OFF_CQ_RING.
- *
- * The offsets to the member fields are published through struct
- * io_cqring_offsets when calling io_uring_setup.
- */
-struct io_cq_ring {
-	/*
-	 * Head and tail offsets into the ring; the offsets need to be
-	 * masked to get valid indices.
-	 *
-	 * The application controls head and the kernel tail.
-	 */
-	struct io_uring		r;
-	/*
-	 * Bitmask to apply to head and tail offsets (constant, equals
-	 * ring_entries - 1)
-	 */
-	u32			ring_mask;
-	/* Ring size (constant, power of 2) */
-	u32			ring_entries;
+	u32			sq_flags;
 	/*
 	 * Number of completion events lost because the queue was full;
 	 * this should be avoided by the application by making sure
@@ -177,7 +143,7 @@
 	 * As completion events come in out of order this counter is not
 	 * ordered with any other data.
 	 */
-	u32			overflow;
+	u32			cq_overflow;
 	/*
 	 * Ring buffer of completion events.
 	 *
@@ -185,7 +151,7 @@
 	 * produced, so the application is allowed to modify pending
 	 * entries.
 	 */
-	struct io_uring_cqe	cqes[];
+	struct io_uring_cqe	cqes[] ____cacheline_aligned_in_smp;
 };
 
 struct io_mapped_ubuf {
@@ -201,7 +167,7 @@
 	struct list_head	list;
 
 	struct file		*file;
-	off_t			io_end;
+	off_t			io_start;
 	size_t			io_len;
 };
 
@@ -215,8 +181,18 @@
 		bool			compat;
 		bool			account_mem;
 
-		/* SQ ring */
-		struct io_sq_ring	*sq_ring;
+		/*
+		 * Ring buffer of indices into array of io_uring_sqe, which is
+		 * mmapped by the application using the IORING_OFF_SQES offset.
+		 *
+		 * This indirection could e.g. be used to assign fixed
+		 * io_uring_sqe entries to operations and only submit them to
+		 * the queue when needed.
+		 *
+		 * The kernel modifies neither the indices array nor the entries
+		 * array.
+		 */
+		u32			*sq_array;
 		unsigned		cached_sq_head;
 		unsigned		sq_entries;
 		unsigned		sq_mask;
@@ -227,15 +203,13 @@
 	} ____cacheline_aligned_in_smp;
 
 	/* IO offload */
-	struct workqueue_struct	*sqo_wq;
+	struct workqueue_struct	*sqo_wq[2];
 	struct task_struct	*sqo_thread;	/* if using sq thread polling */
 	struct mm_struct	*sqo_mm;
 	wait_queue_head_t	sqo_wait;
 	struct completion	sqo_thread_started;
 
 	struct {
-		/* CQ ring */
-		struct io_cq_ring	*cq_ring;
 		unsigned		cached_cq_tail;
 		unsigned		cq_entries;
 		unsigned		cq_mask;
@@ -244,6 +218,8 @@
 		struct eventfd_ctx	*cq_ev_fd;
 	} ____cacheline_aligned_in_smp;
 
+	struct io_rings	*rings;
+
 	/*
 	 * If used, fixed file set. Writers must ensure that ->refs is dead,
 	 * readers must ensure that ->refs is alive as long as the file* is
@@ -288,6 +264,7 @@
 struct sqe_submit {
 	const struct io_uring_sqe	*sqe;
 	unsigned short			index;
+	u32				sequence;
 	bool				has_user;
 	bool				needs_lock;
 	bool				needs_fixed_file;
@@ -335,6 +312,7 @@
 #define REQ_F_LINK		64	/* linked sqes */
 #define REQ_F_LINK_DONE		128	/* linked sqes done */
 #define REQ_F_FAIL_LINK		256	/* fail rest of links */
+#define REQ_F_SHADOW_DRAIN	512	/* link-drain shadow req */
 	u64			user_data;
 	u32			result;
 	u32			sequence;
@@ -366,6 +344,7 @@
 };
 
 static void io_sq_wq_submit_work(struct work_struct *work);
+static void __io_free_req(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
 
@@ -430,7 +409,7 @@
 	if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
 		return false;
 
-	return req->sequence != ctx->cached_cq_tail + ctx->sq_ring->dropped;
+	return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped;
 }
 
 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
@@ -451,11 +430,11 @@
 
 static void __io_commit_cqring(struct io_ring_ctx *ctx)
 {
-	struct io_cq_ring *ring = ctx->cq_ring;
+	struct io_rings *rings = ctx->rings;
 
-	if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
+	if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
 		/* order cqe stores with ring update */
-		smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
+		smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
 
 		if (wq_has_sleeper(&ctx->cq_wait)) {
 			wake_up_interruptible(&ctx->cq_wait);
@@ -464,6 +443,24 @@
 	}
 }
 
+static inline void io_queue_async_work(struct io_ring_ctx *ctx,
+				       struct io_kiocb *req)
+{
+	int rw;
+
+	switch (req->submit.sqe->opcode) {
+	case IORING_OP_WRITEV:
+	case IORING_OP_WRITE_FIXED:
+		rw = !(req->rw.ki_flags & IOCB_DIRECT);
+		break;
+	default:
+		rw = 0;
+		break;
+	}
+
+	queue_work(ctx->sqo_wq[rw], &req->work);
+}
+
 static void io_commit_cqring(struct io_ring_ctx *ctx)
 {
 	struct io_kiocb *req;
@@ -471,14 +468,19 @@
 	__io_commit_cqring(ctx);
 
 	while ((req = io_get_deferred_req(ctx)) != NULL) {
+		if (req->flags & REQ_F_SHADOW_DRAIN) {
+			/* Just for drain, free it. */
+			__io_free_req(req);
+			continue;
+		}
 		req->flags |= REQ_F_IO_DRAINED;
-		queue_work(ctx->sqo_wq, &req->work);
+		io_queue_async_work(ctx, req);
 	}
 }
 
 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 {
-	struct io_cq_ring *ring = ctx->cq_ring;
+	struct io_rings *rings = ctx->rings;
 	unsigned tail;
 
 	tail = ctx->cached_cq_tail;
@@ -487,11 +489,11 @@
 	 * control dependency is enough as we're using WRITE_ONCE to
 	 * fill the cq entry
 	 */
-	if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
+	if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
 		return NULL;
 
 	ctx->cached_cq_tail++;
-	return &ring->cqes[tail & ctx->cq_mask];
+	return &rings->cqes[tail & ctx->cq_mask];
 }
 
 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
@@ -510,9 +512,9 @@
 		WRITE_ONCE(cqe->res, res);
 		WRITE_ONCE(cqe->flags, 0);
 	} else {
-		unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
+		unsigned overflow = READ_ONCE(ctx->rings->cq_overflow);
 
-		WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
+		WRITE_ONCE(ctx->rings->cq_overflow, overflow + 1);
 	}
 }
 
@@ -635,7 +637,7 @@
 
 		nxt->flags |= REQ_F_LINK_DONE;
 		INIT_WORK(&nxt->work, io_sq_wq_submit_work);
-		queue_work(req->ctx->sqo_wq, &nxt->work);
+		io_queue_async_work(req->ctx, nxt);
 	}
 }
 
@@ -679,11 +681,11 @@
 		io_free_req(req);
 }
 
-static unsigned io_cqring_events(struct io_cq_ring *ring)
+static unsigned io_cqring_events(struct io_rings *rings)
 {
 	/* See comment at the top of this file */
 	smp_rmb();
-	return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
+	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
 }
 
 /*
@@ -836,7 +838,7 @@
 		 * If we do, we can potentially be spinning for commands that
 		 * already triggered a CQE (eg in error).
 		 */
-		if (io_cqring_events(ctx->cq_ring))
+		if (io_cqring_events(ctx->rings))
 			break;
 
 		/*
@@ -1187,6 +1189,28 @@
 	return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
 }
 
+static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb)
+{
+	if (al->file == kiocb->ki_filp) {
+		off_t start, end;
+
+		/*
+		 * Allow merging if we're anywhere in the range of the same
+		 * page. Generally this happens for sub-page reads or writes,
+		 * and it's beneficial to allow the first worker to bring the
+		 * page in and the piggy backed work can then work on the
+		 * cached page.
+		 */
+		start = al->io_start & PAGE_MASK;
+		end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK;
+		if (kiocb->ki_pos >= start && kiocb->ki_pos <= end)
+			return true;
+	}
+
+	al->file = NULL;
+	return false;
+}
+
 /*
  * Make a note of the last file/offset/direction we punted to async
  * context. We'll use this information to see if we can piggy back a
@@ -1198,9 +1222,8 @@
 	struct async_list *async_list = &req->ctx->pending_async[rw];
 	struct kiocb *kiocb = &req->rw;
 	struct file *filp = kiocb->ki_filp;
-	off_t io_end = kiocb->ki_pos + len;
 
-	if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
+	if (io_should_merge(async_list, kiocb)) {
 		unsigned long max_bytes;
 
 		/* Use 8x RA size as a decent limiter for both reads/writes */
@@ -1213,17 +1236,16 @@
 			req->flags |= REQ_F_SEQ_PREV;
 			async_list->io_len += len;
 		} else {
-			io_end = 0;
-			async_list->io_len = 0;
+			async_list->file = NULL;
 		}
 	}
 
 	/* New file? Reset state. */
 	if (async_list->file != filp) {
-		async_list->io_len = 0;
+		async_list->io_start = kiocb->ki_pos;
+		async_list->io_len = len;
 		async_list->file = filp;
 	}
-	async_list->io_end = io_end;
 }
 
 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
@@ -1535,7 +1557,7 @@
 	WRITE_ONCE(poll->canceled, true);
 	if (!list_empty(&poll->wait.entry)) {
 		list_del_init(&poll->wait.entry);
-		queue_work(req->ctx->sqo_wq, &req->work);
+		io_queue_async_work(req->ctx, req);
 	}
 	spin_unlock(&poll->head->lock);
 
@@ -1649,7 +1671,7 @@
 		io_cqring_ev_posted(ctx);
 		io_put_req(req);
 	} else {
-		queue_work(ctx->sqo_wq, &req->work);
+		io_queue_async_work(ctx, req);
 	}
 
 	return 1;
@@ -1992,7 +2014,7 @@
  */
 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
 {
-	bool ret = false;
+	bool ret;
 
 	if (!list)
 		return false;
@@ -2038,10 +2060,14 @@
 	flags = READ_ONCE(s->sqe->flags);
 	fd = READ_ONCE(s->sqe->fd);
 
-	if (flags & IOSQE_IO_DRAIN) {
+	if (flags & IOSQE_IO_DRAIN)
 		req->flags |= REQ_F_IO_DRAIN;
-		req->sequence = ctx->cached_sq_head - 1;
-	}
+	/*
+	 * All io need record the previous position, if LINK vs DARIN,
+	 * it can be used to mark the position of the first IO in the
+	 * link list.
+	 */
+	req->sequence = s->sequence;
 
 	if (!io_op_needs_file(s->sqe))
 		return 0;
@@ -2063,21 +2089,12 @@
 	return 0;
 }
 
-static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
-			struct sqe_submit *s)
+static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
+			struct sqe_submit *s, bool force_nonblock)
 {
 	int ret;
 
-	ret = io_req_defer(ctx, req, s->sqe);
-	if (ret) {
-		if (ret != -EIOCBQUEUED) {
-			io_free_req(req);
-			io_cqring_add_event(ctx, s->sqe->user_data, ret);
-		}
-		return 0;
-	}
-
-	ret = __io_submit_sqe(ctx, req, s, true);
+	ret = __io_submit_sqe(ctx, req, s, force_nonblock);
 	if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
 		struct io_uring_sqe *sqe_copy;
 
@@ -2094,7 +2111,7 @@
 				if (list)
 					atomic_inc(&list->cnt);
 				INIT_WORK(&req->work, io_sq_wq_submit_work);
-				queue_work(ctx->sqo_wq, &req->work);
+				io_queue_async_work(ctx, req);
 			}
 
 			/*
@@ -2119,10 +2136,70 @@
 	return ret;
 }
 
+static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
+			struct sqe_submit *s, bool force_nonblock)
+{
+	int ret;
+
+	ret = io_req_defer(ctx, req, s->sqe);
+	if (ret) {
+		if (ret != -EIOCBQUEUED) {
+			io_free_req(req);
+			io_cqring_add_event(ctx, s->sqe->user_data, ret);
+		}
+		return 0;
+	}
+
+	return __io_queue_sqe(ctx, req, s, force_nonblock);
+}
+
+static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
+			      struct sqe_submit *s, struct io_kiocb *shadow,
+			      bool force_nonblock)
+{
+	int ret;
+	int need_submit = false;
+
+	if (!shadow)
+		return io_queue_sqe(ctx, req, s, force_nonblock);
+
+	/*
+	 * Mark the first IO in link list as DRAIN, let all the following
+	 * IOs enter the defer list. all IO needs to be completed before link
+	 * list.
+	 */
+	req->flags |= REQ_F_IO_DRAIN;
+	ret = io_req_defer(ctx, req, s->sqe);
+	if (ret) {
+		if (ret != -EIOCBQUEUED) {
+			io_free_req(req);
+			io_cqring_add_event(ctx, s->sqe->user_data, ret);
+			return 0;
+		}
+	} else {
+		/*
+		 * If ret == 0 means that all IOs in front of link io are
+		 * running done. let's queue link head.
+		 */
+		need_submit = true;
+	}
+
+	/* Insert shadow req to defer_list, blocking next IOs */
+	spin_lock_irq(&ctx->completion_lock);
+	list_add_tail(&shadow->list, &ctx->defer_list);
+	spin_unlock_irq(&ctx->completion_lock);
+
+	if (need_submit)
+		return __io_queue_sqe(ctx, req, s, force_nonblock);
+
+	return 0;
+}
+
 #define SQE_VALID_FLAGS	(IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
 
 static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
-			  struct io_submit_state *state, struct io_kiocb **link)
+			  struct io_submit_state *state, struct io_kiocb **link,
+			  bool force_nonblock)
 {
 	struct io_uring_sqe *sqe_copy;
 	struct io_kiocb *req;
@@ -2175,7 +2252,7 @@
 		INIT_LIST_HEAD(&req->link_list);
 		*link = req;
 	} else {
-		io_queue_sqe(ctx, req, s);
+		io_queue_sqe(ctx, req, s, force_nonblock);
 	}
 }
 
@@ -2205,15 +2282,15 @@
 
 static void io_commit_sqring(struct io_ring_ctx *ctx)
 {
-	struct io_sq_ring *ring = ctx->sq_ring;
+	struct io_rings *rings = ctx->rings;
 
-	if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
+	if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
 		/*
 		 * Ensure any loads from the SQEs are done at this point,
 		 * since once we write the new head, the application could
 		 * write new data to them.
 		 */
-		smp_store_release(&ring->r.head, ctx->cached_sq_head);
+		smp_store_release(&rings->sq.head, ctx->cached_sq_head);
 	}
 }
 
@@ -2227,7 +2304,8 @@
  */
 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
 {
-	struct io_sq_ring *ring = ctx->sq_ring;
+	struct io_rings *rings = ctx->rings;
+	u32 *sq_array = ctx->sq_array;
 	unsigned head;
 
 	/*
@@ -2240,20 +2318,21 @@
 	 */
 	head = ctx->cached_sq_head;
 	/* make sure SQ entry isn't read before tail */
-	if (head == smp_load_acquire(&ring->r.tail))
+	if (head == smp_load_acquire(&rings->sq.tail))
 		return false;
 
-	head = READ_ONCE(ring->array[head & ctx->sq_mask]);
+	head = READ_ONCE(sq_array[head & ctx->sq_mask]);
 	if (head < ctx->sq_entries) {
 		s->index = head;
 		s->sqe = &ctx->sq_sqes[head];
+		s->sequence = ctx->cached_sq_head;
 		ctx->cached_sq_head++;
 		return true;
 	}
 
 	/* drop invalid entries */
 	ctx->cached_sq_head++;
-	ring->dropped++;
+	rings->sq_dropped++;
 	return false;
 }
 
@@ -2262,6 +2341,7 @@
 {
 	struct io_submit_state state, *statep = NULL;
 	struct io_kiocb *link = NULL;
+	struct io_kiocb *shadow_req = NULL;
 	bool prev_was_link = false;
 	int i, submitted = 0;
 
@@ -2276,11 +2356,21 @@
 		 * that's the end of the chain. Submit the previous link.
 		 */
 		if (!prev_was_link && link) {
-			io_queue_sqe(ctx, link, &link->submit);
+			io_queue_link_head(ctx, link, &link->submit, shadow_req,
+						true);
 			link = NULL;
 		}
 		prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
 
+		if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
+			if (!shadow_req) {
+				shadow_req = io_get_req(ctx, NULL);
+				shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
+				refcount_dec(&shadow_req->refs);
+			}
+			shadow_req->sequence = sqes[i].sequence;
+		}
+
 		if (unlikely(mm_fault)) {
 			io_cqring_add_event(ctx, sqes[i].sqe->user_data,
 						-EFAULT);
@@ -2288,13 +2378,13 @@
 			sqes[i].has_user = has_user;
 			sqes[i].needs_lock = true;
 			sqes[i].needs_fixed_file = true;
-			io_submit_sqe(ctx, &sqes[i], statep, &link);
+			io_submit_sqe(ctx, &sqes[i], statep, &link, true);
 			submitted++;
 		}
 	}
 
 	if (link)
-		io_queue_sqe(ctx, link, &link->submit);
+		io_queue_link_head(ctx, link, &link->submit, shadow_req, true);
 	if (statep)
 		io_submit_state_end(&state);
 
@@ -2366,7 +2456,7 @@
 						TASK_INTERRUPTIBLE);
 
 			/* Tell userspace we may need a wakeup call */
-			ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
+			ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
 			/* make sure to read SQ tail after writing flags */
 			smp_mb();
 
@@ -2380,12 +2470,12 @@
 				schedule();
 				finish_wait(&ctx->sqo_wait, &wait);
 
-				ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+				ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
 				continue;
 			}
 			finish_wait(&ctx->sqo_wait, &wait);
 
-			ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+			ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
 		}
 
 		i = 0;
@@ -2426,10 +2516,12 @@
 	return 0;
 }
 
-static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
+static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
+			  bool block_for_last)
 {
 	struct io_submit_state state, *statep = NULL;
 	struct io_kiocb *link = NULL;
+	struct io_kiocb *shadow_req = NULL;
 	bool prev_was_link = false;
 	int i, submit = 0;
 
@@ -2439,6 +2531,7 @@
 	}
 
 	for (i = 0; i < to_submit; i++) {
+		bool force_nonblock = true;
 		struct sqe_submit s;
 
 		if (!io_get_sqring(ctx, &s))
@@ -2449,21 +2542,43 @@
 		 * that's the end of the chain. Submit the previous link.
 		 */
 		if (!prev_was_link && link) {
-			io_queue_sqe(ctx, link, &link->submit);
+			io_queue_link_head(ctx, link, &link->submit, shadow_req,
+						force_nonblock);
 			link = NULL;
 		}
 		prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
 
+		if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
+			if (!shadow_req) {
+				shadow_req = io_get_req(ctx, NULL);
+				shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
+				refcount_dec(&shadow_req->refs);
+			}
+			shadow_req->sequence = s.sequence;
+		}
+
 		s.has_user = true;
 		s.needs_lock = false;
 		s.needs_fixed_file = false;
 		submit++;
-		io_submit_sqe(ctx, &s, statep, &link);
+
+		/*
+		 * The caller will block for events after submit, submit the
+		 * last IO non-blocking. This is either the only IO it's
+		 * submitting, or it already submitted the previous ones. This
+		 * improves performance by avoiding an async punt that we don't
+		 * need to do.
+		 */
+		if (block_for_last && submit == to_submit)
+			force_nonblock = false;
+
+		io_submit_sqe(ctx, &s, statep, &link, force_nonblock);
 	}
 	io_commit_sqring(ctx);
 
 	if (link)
-		io_queue_sqe(ctx, link, &link->submit);
+		io_queue_link_head(ctx, link, &link->submit, shadow_req,
+					block_for_last);
 	if (statep)
 		io_submit_state_end(statep);
 
@@ -2477,10 +2592,10 @@
 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			  const sigset_t __user *sig, size_t sigsz)
 {
-	struct io_cq_ring *ring = ctx->cq_ring;
+	struct io_rings *rings = ctx->rings;
 	int ret;
 
-	if (io_cqring_events(ring) >= min_events)
+	if (io_cqring_events(rings) >= min_events)
 		return 0;
 
 	if (sig) {
@@ -2496,12 +2611,12 @@
 			return ret;
 	}
 
-	ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
+	ret = wait_event_interruptible(ctx->wait, io_cqring_events(rings) >= min_events);
 	restore_saved_sigmask_unless(ret == -ERESTARTSYS);
 	if (ret == -ERESTARTSYS)
 		ret = -EINTR;
 
-	return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
+	return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
 }
 
 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
@@ -2551,11 +2666,15 @@
 
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
+	int i;
+
 	io_sq_thread_stop(ctx);
 
-	if (ctx->sqo_wq) {
-		destroy_workqueue(ctx->sqo_wq);
-		ctx->sqo_wq = NULL;
+	for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) {
+		if (ctx->sqo_wq[i]) {
+			destroy_workqueue(ctx->sqo_wq[i]);
+			ctx->sqo_wq[i] = NULL;
+		}
 	}
 }
 
@@ -2763,16 +2882,31 @@
 	}
 
 	/* Do QD, or 2 * CPUS, whatever is smallest */
-	ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
+	ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
+			WQ_UNBOUND | WQ_FREEZABLE,
 			min(ctx->sq_entries - 1, 2 * num_online_cpus()));
-	if (!ctx->sqo_wq) {
+	if (!ctx->sqo_wq[0]) {
+		ret = -ENOMEM;
+		goto err;
+	}
+
+	/*
+	 * This is for buffered writes, where we want to limit the parallelism
+	 * due to file locking in file systems. As "normal" buffered writes
+	 * should parellelize on writeout quite nicely, limit us to having 2
+	 * pending. This avoids massive contention on the inode when doing
+	 * buffered async writes.
+	 */
+	ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
+						WQ_UNBOUND | WQ_FREEZABLE, 2);
+	if (!ctx->sqo_wq[1]) {
 		ret = -ENOMEM;
 		goto err;
 	}
 
 	return 0;
 err:
-	io_sq_thread_stop(ctx);
+	io_finish_async(ctx);
 	mmdrop(ctx->sqo_mm);
 	ctx->sqo_mm = NULL;
 	return ret;
@@ -2821,17 +2955,45 @@
 	return (void *) __get_free_pages(gfp_flags, get_order(size));
 }
 
+static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
+				size_t *sq_offset)
+{
+	struct io_rings *rings;
+	size_t off, sq_array_size;
+
+	off = struct_size(rings, cqes, cq_entries);
+	if (off == SIZE_MAX)
+		return SIZE_MAX;
+
+#ifdef CONFIG_SMP
+	off = ALIGN(off, SMP_CACHE_BYTES);
+	if (off == 0)
+		return SIZE_MAX;
+#endif
+
+	sq_array_size = array_size(sizeof(u32), sq_entries);
+	if (sq_array_size == SIZE_MAX)
+		return SIZE_MAX;
+
+	if (check_add_overflow(off, sq_array_size, &off))
+		return SIZE_MAX;
+
+	if (sq_offset)
+		*sq_offset = off;
+
+	return off;
+}
+
 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
 {
-	struct io_sq_ring *sq_ring;
-	struct io_cq_ring *cq_ring;
-	size_t bytes;
+	size_t pages;
 
-	bytes = struct_size(sq_ring, array, sq_entries);
-	bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
-	bytes += struct_size(cq_ring, cqes, cq_entries);
+	pages = (size_t)1 << get_order(
+		rings_size(sq_entries, cq_entries, NULL));
+	pages += (size_t)1 << get_order(
+		array_size(sizeof(struct io_uring_sqe), sq_entries));
 
-	return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
+	return pages;
 }
 
 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
@@ -2845,7 +3007,7 @@
 		struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
 
 		for (j = 0; j < imu->nr_bvecs; j++)
-			put_page(imu->bvec[j].bv_page);
+			put_user_page(imu->bvec[j].bv_page);
 
 		if (ctx->account_mem)
 			io_unaccount_mem(ctx->user, imu->nr_bvecs);
@@ -2989,10 +3151,8 @@
 			 * if we did partial map, or found file backed vmas,
 			 * release any pages we did get
 			 */
-			if (pret > 0) {
-				for (j = 0; j < pret; j++)
-					put_page(pages[j]);
-			}
+			if (pret > 0)
+				put_user_pages(pages, pret);
 			if (ctx->account_mem)
 				io_unaccount_mem(ctx->user, nr_pages);
 			kvfree(imu->bvec);
@@ -3078,9 +3238,8 @@
 	}
 #endif
 
-	io_mem_free(ctx->sq_ring);
+	io_mem_free(ctx->rings);
 	io_mem_free(ctx->sq_sqes);
-	io_mem_free(ctx->cq_ring);
 
 	percpu_ref_exit(&ctx->refs);
 	if (ctx->account_mem)
@@ -3101,10 +3260,10 @@
 	 * io_commit_cqring
 	 */
 	smp_rmb();
-	if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
-	    ctx->sq_ring->ring_entries)
+	if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
+	    ctx->rings->sq_ring_entries)
 		mask |= EPOLLOUT | EPOLLWRNORM;
-	if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
+	if (READ_ONCE(ctx->rings->sq.head) != ctx->cached_cq_tail)
 		mask |= EPOLLIN | EPOLLRDNORM;
 
 	return mask;
@@ -3149,14 +3308,12 @@
 
 	switch (offset) {
 	case IORING_OFF_SQ_RING:
-		ptr = ctx->sq_ring;
+	case IORING_OFF_CQ_RING:
+		ptr = ctx->rings;
 		break;
 	case IORING_OFF_SQES:
 		ptr = ctx->sq_sqes;
 		break;
-	case IORING_OFF_CQ_RING:
-		ptr = ctx->cq_ring;
-		break;
 	default:
 		return -EINVAL;
 	}
@@ -3199,19 +3356,27 @@
 	 * Just return the requested submit count, and wake the thread if
 	 * we were asked to.
 	 */
+	ret = 0;
 	if (ctx->flags & IORING_SETUP_SQPOLL) {
 		if (flags & IORING_ENTER_SQ_WAKEUP)
 			wake_up(&ctx->sqo_wait);
 		submitted = to_submit;
-		goto out_ctx;
-	}
+	} else if (to_submit) {
+		bool block_for_last = false;
 
-	ret = 0;
-	if (to_submit) {
 		to_submit = min(to_submit, ctx->sq_entries);
 
+		/*
+		 * Allow last submission to block in a series, IFF the caller
+		 * asked to wait for events and we don't currently have
+		 * enough. This potentially avoids an async punt.
+		 */
+		if (to_submit == min_complete &&
+		    io_cqring_events(ctx->rings) < min_complete)
+			block_for_last = true;
+
 		mutex_lock(&ctx->uring_lock);
-		submitted = io_ring_submit(ctx, to_submit);
+		submitted = io_ring_submit(ctx, to_submit, block_for_last);
 		mutex_unlock(&ctx->uring_lock);
 	}
 	if (flags & IORING_ENTER_GETEVENTS) {
@@ -3226,7 +3391,6 @@
 		}
 	}
 
-out_ctx:
 	io_ring_drop_ctx_refs(ctx, 1);
 out_fput:
 	fdput(f);
@@ -3243,19 +3407,27 @@
 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
 				  struct io_uring_params *p)
 {
-	struct io_sq_ring *sq_ring;
-	struct io_cq_ring *cq_ring;
-	size_t size;
+	struct io_rings *rings;
+	size_t size, sq_array_offset;
 
-	sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
-	if (!sq_ring)
+	size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
+	if (size == SIZE_MAX)
+		return -EOVERFLOW;
+
+	rings = io_mem_alloc(size);
+	if (!rings)
 		return -ENOMEM;
 
-	ctx->sq_ring = sq_ring;
-	sq_ring->ring_mask = p->sq_entries - 1;
-	sq_ring->ring_entries = p->sq_entries;
-	ctx->sq_mask = sq_ring->ring_mask;
-	ctx->sq_entries = sq_ring->ring_entries;
+	ctx->rings = rings;
+	ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
+	rings->sq_ring_mask = p->sq_entries - 1;
+	rings->cq_ring_mask = p->cq_entries - 1;
+	rings->sq_ring_entries = p->sq_entries;
+	rings->cq_ring_entries = p->cq_entries;
+	ctx->sq_mask = rings->sq_ring_mask;
+	ctx->cq_mask = rings->cq_ring_mask;
+	ctx->sq_entries = rings->sq_ring_entries;
+	ctx->cq_entries = rings->cq_ring_entries;
 
 	size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
 	if (size == SIZE_MAX)
@@ -3265,15 +3437,6 @@
 	if (!ctx->sq_sqes)
 		return -ENOMEM;
 
-	cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
-	if (!cq_ring)
-		return -ENOMEM;
-
-	ctx->cq_ring = cq_ring;
-	cq_ring->ring_mask = p->cq_entries - 1;
-	cq_ring->ring_entries = p->cq_entries;
-	ctx->cq_mask = cq_ring->ring_mask;
-	ctx->cq_entries = cq_ring->ring_entries;
 	return 0;
 }
 
@@ -3377,21 +3540,23 @@
 		goto err;
 
 	memset(&p->sq_off, 0, sizeof(p->sq_off));
-	p->sq_off.head = offsetof(struct io_sq_ring, r.head);
-	p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
-	p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
-	p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
-	p->sq_off.flags = offsetof(struct io_sq_ring, flags);
-	p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
-	p->sq_off.array = offsetof(struct io_sq_ring, array);
+	p->sq_off.head = offsetof(struct io_rings, sq.head);
+	p->sq_off.tail = offsetof(struct io_rings, sq.tail);
+	p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
+	p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
+	p->sq_off.flags = offsetof(struct io_rings, sq_flags);
+	p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
+	p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
 
 	memset(&p->cq_off, 0, sizeof(p->cq_off));
-	p->cq_off.head = offsetof(struct io_cq_ring, r.head);
-	p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
-	p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
-	p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
-	p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
-	p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
+	p->cq_off.head = offsetof(struct io_rings, cq.head);
+	p->cq_off.tail = offsetof(struct io_rings, cq.tail);
+	p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
+	p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
+	p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
+	p->cq_off.cqes = offsetof(struct io_rings, cqes);
+
+	p->features = IORING_FEAT_SINGLE_MMAP;
 	return ret;
 err:
 	io_ring_ctx_wait_and_kill(ctx);
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 1e1652f..96ee9d9 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -128,12 +128,18 @@
 	__u32 flags;
 	__u32 sq_thread_cpu;
 	__u32 sq_thread_idle;
-	__u32 resv[5];
+	__u32 features;
+	__u32 resv[4];
 	struct io_sqring_offsets sq_off;
 	struct io_cqring_offsets cq_off;
 };
 
 /*
+ * io_uring_params->features flags
+ */
+#define IORING_FEAT_SINGLE_MMAP		(1U << 0)
+
+/*
  * io_uring_register(2) opcodes and arguments
  */
 #define IORING_REGISTER_BUFFERS		0