sock: add MSG_ZEROCOPY

The kernel supports zerocopy sendmsg in virtio and tap. Expand the
infrastructure to support other socket types. Introduce a completion
notification channel over the socket error queue. Notifications are
returned with ee_origin SO_EE_ORIGIN_ZEROCOPY. ee_errno is 0 to avoid
blocking the send/recv path on receiving notifications.

Add reference counting, to support the skb split, merge, resize and
clone operations possible with SOCK_STREAM and other socket types.

The patch does not yet modify any datapaths.

Signed-off-by: Willem de Bruijn <willemb@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/core/skbuff.c b/net/core/skbuff.c
index a95877a..0603e44 100644
--- a/net/core/skbuff.c
+++ b/net/core/skbuff.c
@@ -915,6 +915,139 @@ struct sk_buff *skb_morph(struct sk_buff *dst, struct sk_buff *src)
 }
 EXPORT_SYMBOL_GPL(skb_morph);
 
+struct ubuf_info *sock_zerocopy_alloc(struct sock *sk, size_t size)
+{	/* Returns a ubuf_info set up in a new skb's cb[], or NULL; @size is unused. */
+	struct ubuf_info *uarg;
+	struct sk_buff *skb;
+
+	WARN_ON_ONCE(!in_task());	/* expects to run in task context */
+
+	skb = sock_omalloc(sk, 0, GFP_KERNEL);
+	if (!skb)
+		return NULL;
+
+	BUILD_BUG_ON(sizeof(*uarg) > sizeof(skb->cb));	/* uarg must fit in cb[] */
+	uarg = (void *)skb->cb;
+
+	uarg->callback = sock_zerocopy_callback;
+	uarg->desc = atomic_inc_return(&sk->sk_zckey) - 1;	/* key before the inc */
+	uarg->zerocopy = 1;
+	atomic_set(&uarg->refcnt, 0);	/* starts with no references */
+	sock_hold(sk);			/* see sock_put() in sock_zerocopy_callback() */
+
+	return uarg;
+}
+EXPORT_SYMBOL_GPL(sock_zerocopy_alloc);
+
+static inline struct sk_buff *skb_from_uarg(struct ubuf_info *uarg)
+{	/* Map a ubuf_info stored in skb->cb back to its containing skb. */
+	return container_of((void *)uarg, struct sk_buff, cb);
+}
+
+void sock_zerocopy_callback(struct ubuf_info *uarg, bool success)
+{	/* Queue uarg's skb on the socket error queue as a notification, or free it. */
+	struct sk_buff *skb = skb_from_uarg(uarg);
+	struct sock_exterr_skb *serr;
+	struct sock *sk = skb->sk;
+	u16 id = uarg->desc;	/* copied out before the memset() of serr below */
+
+	if (sock_flag(sk, SOCK_DEAD))	/* dead socket: nothing is queued */
+		goto release;
+
+	serr = SKB_EXT_ERR(skb);
+	memset(serr, 0, sizeof(*serr));
+	serr->ee.ee_errno = 0;
+	serr->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY;
+	serr->ee.ee_data = id;	/* NOTE(review): id is u16; confirm desc fits */
+	if (!success)
+		serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
+
+	skb_queue_tail(&sk->sk_error_queue, skb);
+	skb = NULL;	/* queued above; consume_skb() below then gets NULL */
+
+	sk->sk_error_report(sk);
+
+release:
+	consume_skb(skb);
+	sock_put(sk);
+}
+EXPORT_SYMBOL_GPL(sock_zerocopy_callback);
+
+void sock_zerocopy_put(struct ubuf_info *uarg)
+{	/* Drop one reference; on the last one run the callback or free the skb. */
+	if (uarg && atomic_dec_and_test(&uarg->refcnt)) {	/* NULL uarg: no-op */
+		if (uarg->callback)
+			uarg->callback(uarg, uarg->zerocopy);	/* success = zerocopy */
+		else
+			consume_skb(skb_from_uarg(uarg));
+	}
+}
+EXPORT_SYMBOL_GPL(sock_zerocopy_put);
+
+void sock_zerocopy_put_abort(struct ubuf_info *uarg)
+{	/* Abort path: decrement sk_zckey again, then release uarg via put. */
+	if (uarg) {
+		struct sock *sk = skb_from_uarg(uarg)->sk;
+
+		atomic_dec(&sk->sk_zckey);	/* reverts the inc in sock_zerocopy_alloc() */
+
+		/* sock_zerocopy_put expects a ref. Most sockets take one per
+		 * skb, which is zero on abort. tcp_sendmsg holds one extra, to
+		 * avoid an skb send inside the main loop triggering uarg free.
+		 */
+		if (sk->sk_type != SOCK_STREAM)
+			atomic_inc(&uarg->refcnt);
+
+		sock_zerocopy_put(uarg);
+	}
+}
+EXPORT_SYMBOL_GPL(sock_zerocopy_put_abort);
+
+extern int __zerocopy_sg_from_iter(struct sock *sk, struct sk_buff *skb,
+				   struct iov_iter *from, size_t length);
+
+int skb_zerocopy_iter_stream(struct sock *sk, struct sk_buff *skb,
+			     struct msghdr *msg, int len,
+			     struct ubuf_info *uarg)
+{	/* Returns the growth of skb->len, or a negative error after rollback. */
+	struct iov_iter orig_iter = msg->msg_iter;	/* snapshot for rollback */
+	int err, orig_len = skb->len;
+
+	err = __zerocopy_sg_from_iter(sk, skb, &msg->msg_iter, len);
+	if (err == -EFAULT || (err == -EMSGSIZE && skb->len == orig_len)) {
+		/* Streams do not free skb on error. Reset to prev state. */
+		msg->msg_iter = orig_iter;
+		___pskb_trim(skb, orig_len);
+		return err;
+	}
+
+	skb_zcopy_set(skb, uarg);
+	return skb->len - orig_len;	/* any other err value is not returned */
+}
+EXPORT_SYMBOL_GPL(skb_zerocopy_iter_stream);
+
+/* unused only until next patch in the series; will remove attribute */
+static int __attribute__((unused))
+	   skb_zerocopy_clone(struct sk_buff *nskb, struct sk_buff *orig,
+			      gfp_t gfp_mask)
+{	/* Set @nskb's uarg to @orig's when skb_zcopy(@orig) is set; 0 or -errno. */
+	if (skb_zcopy(orig)) {
+		if (skb_zcopy(nskb)) {
+			/* !gfp_mask callers are verified to !skb_zcopy(nskb) */
+			if (!gfp_mask) {
+				WARN_ON_ONCE(1);
+				return -ENOMEM;
+			}
+			if (skb_uarg(nskb) == skb_uarg(orig))	/* already shared */
+				return 0;
+			if (skb_copy_ubufs(nskb, GFP_ATOMIC))	/* NB: not gfp_mask */
+				return -EIO;
+		}
+		skb_zcopy_set(nskb, skb_uarg(orig));
+	}
+	return 0;
+}
+
 /**
  *	skb_copy_ubufs	-	copy userspace skb frags buffers to kernel
  *	@skb: the skb to modify