Fathom TCP level changes. TracedBuffer for keeping track of all buffers
to be traced. Adding tests for Fathom and TracedBuffer. A lot more.
Please read PR description.
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
index 4e8b8b7..7ce8da8 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -320,7 +320,7 @@
   // Take a new ref to be held by the write callback.
   gpr_ref(&handshaker->refcount);
   grpc_endpoint_write(args->endpoint, &handshaker->write_buffer,
-                      &handshaker->request_done_closure);
+                      &handshaker->request_done_closure, nullptr);
   gpr_mu_unlock(&handshaker->mu);
 }
 
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index dfed824..5bdcb38 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -50,7 +50,7 @@
   GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
 
   grpc_endpoint* client = grpc_tcp_client_create_from_fd(
-      grpc_fd_create(fd, "client", false), args, "fd-client");
+      grpc_fd_create(fd, "client", true), args, "fd-client");
 
   grpc_transport* transport =
       grpc_create_chttp2_transport(final_args, client, true);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index a022878..e4bd91d 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -44,7 +44,7 @@
   gpr_asprintf(&name, "fd:%d", fd);
 
   grpc_endpoint* server_endpoint =
-      grpc_tcp_create(grpc_fd_create(fd, name, false),
+      grpc_tcp_create(grpc_fd_create(fd, name, true),
                       grpc_server_get_channel_args(server), name);
 
   gpr_free(name);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index bc6fa0d..7332354 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1007,7 +1007,8 @@
   grpc_endpoint_write(
       t->ep, &t->outbuf,
       GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
-                        grpc_combiner_scheduler(t->combiner)));
+                        grpc_combiner_scheduler(t->combiner)),
+      nullptr);
 }
 
 static void write_action_end_locked(void* tp, grpc_error* error) {
diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc
index 1206007..3bd7a2c 100644
--- a/src/core/lib/http/httpcli.cc
+++ b/src/core/lib/http/httpcli.cc
@@ -163,7 +163,7 @@
 static void start_write(internal_request* req) {
   grpc_slice_ref_internal(req->request_text);
   grpc_slice_buffer_add(&req->outgoing, req->request_text);
-  grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write);
+  grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write, nullptr);
 }
 
 static void on_handshake_done(void* arg, grpc_endpoint* ep) {
diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc
new file mode 100644
index 0000000..4f0b522
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_list.cc
@@ -0,0 +1,143 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/buffer_list.h"
+#include "src/core/lib/iomgr/port.h"
+
+#include <grpc/support/log.h>
+
+#ifdef GRPC_LINUX_ERRQUEUE
+#include <time.h>
+
+#include "src/core/lib/gprpp/memory.h"
+
+namespace grpc_core {
+void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
+                               void* arg) {
+  gpr_log(GPR_INFO, "Adding new entry %u", seq_no);
+  GPR_DEBUG_ASSERT(head != nullptr);
+  TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
+  /* Store the current time as the sendmsg time. */
+  new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME);
+  if (*head == nullptr) {
+    *head = new_elem;
+    gpr_log(GPR_INFO, "returning");
+    return;
+  }
+  /* Append at the end. */
+  TracedBuffer* ptr = *head;
+  while (ptr->next_ != nullptr) {
+    ptr = ptr->next_;
+  }
+  ptr->next_ = new_elem;
+  gpr_log(GPR_INFO, "returning");
+}
+
+namespace {
+void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
+  gts->tv_sec = ts->tv_sec;
+  gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec);
+  gts->clock_type = GPR_CLOCK_REALTIME;
+}
+
+void (*timestamps_callback)(void*, grpc_core::Timestamps*,
+                            grpc_error* shutdown_err);
+} /* namespace */
+
+void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
+                                    struct sock_extended_err* serr,
+                                    struct scm_timestamping* tss) {
+  gpr_log(GPR_INFO, "Got timestamp %d", serr->ee_data);
+  GPR_DEBUG_ASSERT(head != nullptr);
+  TracedBuffer* elem = *head;
+  TracedBuffer* next = nullptr;
+  while (elem != nullptr) {
+    gpr_log(GPR_INFO, "looping");
+    /* The byte number refers to the sequence number of the last byte which this
+     * timestamp relates to. For scheduled and send, we are interested in the
+     * timestamp for the first byte, whereas for ack, we are interested in the
+     * last */
+    if (serr->ee_data >= elem->seq_no_) {
+      switch (serr->ee_info) {
+        case SCM_TSTAMP_SCHED:
+          gpr_log(GPR_INFO, "type sched\n");
+          fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0]));
+          elem = elem->next_;
+          break;
+        case SCM_TSTAMP_SND:
+          gpr_log(GPR_INFO, "type send\n");
+          fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0]));
+          elem = elem->next_;
+          break;
+        case SCM_TSTAMP_ACK:
+          gpr_log(GPR_INFO, "type ack\n");
+          if (serr->ee_data >= elem->seq_no_) {
+            fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0]));
+            /* Got all timestamps. Do the callback and free this TracedBuffer.
+             * The thing below can be passed by value if we don't want the
+             * restriction on the lifetime. */
+            timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE);
+            next = elem->next_;
+            Delete<TracedBuffer>(elem);
+            *head = elem = next;
+            break;
+            default:
+              abort();
+          }
+      }
+    } else {
+      break;
+    }
+  }
+}
+
+void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) {
+  GPR_DEBUG_ASSERT(head != nullptr);
+  TracedBuffer* elem = *head;
+  while (elem != nullptr) {
+    if (timestamps_callback) {
+      timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
+    }
+    auto* next = elem->next_;
+    Delete<TracedBuffer>(elem);
+    elem = next;
+  }
+  *head = nullptr;
+  GRPC_ERROR_UNREF(shutdown_err);
+}
+
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+                                                       grpc_core::Timestamps*,
+                                                       grpc_error* error)) {
+  timestamps_callback = fn;
+}
+} /* namespace grpc_core */
+
+#else /* GRPC_LINUX_ERRQUEUE */
+
+namespace grpc_core {
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+                                                       grpc_core::Timestamps*,
+                                                       grpc_error* error)) {
+  gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform");
+}
+} /* namespace grpc_core */
+
+#endif /* GRPC_LINUX_ERRQUEUE */
diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h
new file mode 100644
index 0000000..d42f97f
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_list.h
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
+#define GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <grpc/support/time.h>
+
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/internal_errqueue.h"
+
+namespace grpc_core {
+struct Timestamps {
+  gpr_timespec sendmsg_time;
+  gpr_timespec scheduled_time;
+  gpr_timespec sent_time;
+  gpr_timespec acked_time;
+};
+
+#ifdef GRPC_LINUX_ERRQUEUE
+class TracedBuffer {
+ public:
+  /** Add a new entry in the TracedBuffer list pointed to by head */
+  static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no,
+                          void* arg);
+
+  /** Processes a timestamp received */
+  static void ProcessTimestamp(grpc_core::TracedBuffer** head,
+                               struct sock_extended_err* serr,
+                               struct scm_timestamping* tss);
+
+  /** Calls the callback for each traced buffer in the list with timestamps that
+   * it has. */
+  static void Shutdown(grpc_core::TracedBuffer** head,
+                       grpc_error* shutdown_err);
+
+ private:
+  GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
+
+  TracedBuffer(int seq_no, void* arg)
+      : seq_no_(seq_no), arg_(arg), next_(nullptr) {
+    gpr_log(GPR_INFO, "seq_no %d", seq_no_);
+  }
+
+  uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
+  void* arg_;       /* The arg to pass to timestamps_callback */
+  grpc_core::Timestamps ts_;
+  grpc_core::TracedBuffer* next_;
+};
+#else  /* GRPC_LINUX_ERRQUEUE */
+class TracedBuffer {};
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+/** Sets the timestamp callback */
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+                                                       grpc_core::Timestamps*,
+                                                       grpc_error* error));
+
+};  // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H */
diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc
index 92e7930..44fb47e 100644
--- a/src/core/lib/iomgr/endpoint.cc
+++ b/src/core/lib/iomgr/endpoint.cc
@@ -28,8 +28,8 @@
 }
 
 void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
-                         grpc_closure* cb) {
-  ep->vtable->write(ep, slices, cb);
+                         grpc_closure* cb, void* arg) {
+  ep->vtable->write(ep, slices, cb, arg);
 }
 
 void grpc_endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 15db164..ea39ea6 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -33,10 +33,12 @@
 
 typedef struct grpc_endpoint grpc_endpoint;
 typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
+class Timestamps;
 
 struct grpc_endpoint_vtable {
   void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
-  void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
+  void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
+                void* arg);
   void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
   void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
   void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
@@ -72,7 +74,7 @@
    it is a valid slice buffer.
    */
 void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
-                         grpc_closure* cb);
+                         grpc_closure* cb, void* arg);
 
 /* Causes any pending and future read/write callbacks to run immediately with
    success==0 */
diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc
index c3bc0cc..70d674f 100644
--- a/src/core/lib/iomgr/endpoint_cfstream.cc
+++ b/src/core/lib/iomgr/endpoint_cfstream.cc
@@ -268,7 +268,7 @@
 }
 
 static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
-                          grpc_closure* cb) {
+                          grpc_closure* cb, void *arg) {
   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
   if (grpc_tcp_trace.enabled()) {
     gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc
index 5c5c246..3afbfd7 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.cc
+++ b/src/core/lib/iomgr/endpoint_pair_posix.cc
@@ -59,11 +59,11 @@
   grpc_core::ExecCtx exec_ctx;
 
   gpr_asprintf(&final_name, "%s:client", name);
-  p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
+  p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args,
                              "socketpair-server");
   gpr_free(final_name);
   gpr_asprintf(&final_name, "%s:server", name);
-  p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
+  p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args,
                              "socketpair-client");
   gpr_free(final_name);
 
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 86a0243..e8ee5c4 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -386,15 +386,27 @@
 }
 
 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
-  fd->read_closure->NotifyOn(closure);
+  if (closure != nullptr) {
+    fd->read_closure->NotifyOn(closure);
+  } else {
+    fd->read_closure->SetReady();
+  }
 }
 
 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
-  fd->write_closure->NotifyOn(closure);
+  if (closure != nullptr) {
+    fd->write_closure->NotifyOn(closure);
+  } else {
+    fd->write_closure->SetReady();
+  }
 }
 
 static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
-  fd->error_closure->NotifyOn(closure);
+  if (closure != nullptr) {
+    fd->error_closure->NotifyOn(closure);
+  } else {
+    fd->error_closure->SetReady();
+  }
 }
 
 static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 7b36841..b17aa90 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -539,15 +539,27 @@
 }
 
 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
-  fd->read_closure->NotifyOn(closure);
+  if (closure != nullptr) {
+    fd->read_closure->NotifyOn(closure);
+  } else {
+    fd->read_closure->SetReady();
+  }
 }
 
 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
-  fd->write_closure->NotifyOn(closure);
+  if (closure != nullptr) {
+    fd->write_closure->NotifyOn(closure);
+  } else {
+    fd->write_closure->SetReady();
+  }
 }
 
 static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
-  fd->error_closure->NotifyOn(closure);
+  if (closure != nullptr) {
+    fd->error_closure->NotifyOn(closure);
+  } else {
+    fd->error_closure->SetReady();
+  }
 }
 
 /*******************************************************************************
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 2189801..7bdfa22 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -947,15 +947,27 @@
 }
 
 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
-  fd->read_closure->NotifyOn(closure);
+  if (closure != nullptr) {
+    fd->read_closure->NotifyOn(closure);
+  } else {
+    fd->read_closure->SetReady();
+  }
 }
 
 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
-  fd->write_closure->NotifyOn(closure);
+  if (closure != nullptr) {
+    fd->write_closure->NotifyOn(closure);
+  } else {
+    fd->write_closure->SetReady();
+  }
 }
 
 static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
-  fd->error_closure->NotifyOn(closure);
+  if (closure != nullptr) {
+    fd->error_closure->NotifyOn(closure);
+  } else {
+    fd->error_closure->SetReady();
+  }
 }
 
 /*******************************************************************************
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 1139b32..6ca985c 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -200,8 +200,8 @@
 grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
   GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
   GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
-  GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err);
-  return g_event_engine->fd_create(fd, name, track_err);
+  return g_event_engine->fd_create(fd, name,
+                                   track_err && g_event_engine->can_track_err);
 }
 
 int grpc_fd_wrapped_fd(grpc_fd* fd) {
diff --git a/src/core/lib/iomgr/internal_errqueue.cc b/src/core/lib/iomgr/internal_errqueue.cc
new file mode 100644
index 0000000..3f3da66
--- /dev/null
+++ b/src/core/lib/iomgr/internal_errqueue.cc
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include "src/core/lib/iomgr/internal_errqueue.h"
+
+#ifdef GRPC_POSIX_SOCKET_TCP
+
+#ifdef GPR_LINUX
+#include <linux/version.h>
+#endif /* GPR_LINUX */
+
+bool kernel_supports_errqueue() {
+#ifdef LINUX_VERSION_CODE
+#if LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0)
+  return true;
+#endif /* LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0) */
+#endif /* LINUX_VERSION_CODE */
+  return false;
+}
+
+#endif /* GRPC_POSIX_SOCKET_TCP */
diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h
new file mode 100644
index 0000000..92292e9
--- /dev/null
+++ b/src/core/lib/iomgr/internal_errqueue.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
+#define GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_POSIX_SOCKET_TCP
+
+#include <sys/types.h>
+#include <time.h>
+
+#ifdef GRPC_LINUX_ERRQUEUE
+#include <linux/errqueue.h>
+#include <linux/net_tstamp.h>
+#include <sys/socket.h>
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+namespace grpc_core {
+/* Redefining scm_timestamping in the same way that <linux/errqueue.h> defines
+ * it, so that code compiles on systems that don't have it. */
+struct scm_timestamping {
+  struct timespec ts[3];
+};
+
+/* Also redefine timestamp types */
+/* The timestamp type for when the driver passed skb to NIC, or HW. */
+constexpr int SCM_TSTAMP_SND = 0;
+/* The timestamp type for when data entered the packet scheduler. */
+constexpr int SCM_TSTAMP_SCHED = 1;
+/* The timestamp type for when data acknowledged by peer. */
+constexpr int SCM_TSTAMP_ACK = 2;
+
+/* Redefine required constants from <linux/net_tstamp.h> */
+constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1;
+constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4;
+constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7;
+constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8;
+constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9;
+constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11;
+
+constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE |
+                                                SOF_TIMESTAMPING_OPT_ID |
+                                                SOF_TIMESTAMPING_OPT_TSONLY;
+
+constexpr uint32_t kTimestampingRecordingOptions =
+    SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE |
+    SOF_TIMESTAMPING_TX_ACK;
+
+/* Returns true if kernel is capable of supporting errqueue and timestamping.
+ * Currently allowing only linux kernels above 4.0.0
+ */
+bool kernel_supports_errqueue();
+}  // namespace grpc_core
+
+#endif /* GRPC_POSIX_SOCKET_TCP */
+
+#endif /* GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 80d8e63..a06e9f8 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -60,6 +60,7 @@
 #define GRPC_HAVE_IP_PKTINFO 1
 #define GRPC_HAVE_MSG_NOSIGNAL 1
 #define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_LINUX_ERRQUEUE 1
 #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1
 #define GRPC_POSIX_FORK 1
 #define GRPC_POSIX_HOST_NAME_MAX 1
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 296ee74..9c989b7 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -279,7 +279,7 @@
   }
   addr_str = grpc_sockaddr_to_uri(mapped_addr);
   gpr_asprintf(&name, "tcp-client:%s", addr_str);
-  *fdobj = grpc_fd_create(fd, name, false);
+  *fdobj = grpc_fd_create(fd, name, true);
   gpr_free(name);
   gpr_free(addr_str);
   return GRPC_ERROR_NONE;
diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc
index 990e8d6..e02a189 100644
--- a/src/core/lib/iomgr/tcp_custom.cc
+++ b/src/core/lib/iomgr/tcp_custom.cc
@@ -221,7 +221,7 @@
 }
 
 static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
-                           grpc_closure* cb) {
+                           grpc_closure* cb, void* arg) {
   custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
   GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
 
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 9df2e20..97251e7 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -26,7 +26,9 @@
 #include "src/core/lib/iomgr/tcp_posix.h"
 
 #include <errno.h>
+#include <netinet/in.h>
 #include <stdbool.h>
+#include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/socket.h>
@@ -45,6 +47,7 @@
 #include "src/core/lib/debug/trace.h"
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/iomgr/buffer_list.h"
 #include "src/core/lib/iomgr/ev_posix.h"
 #include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/profiling/timers.h"
@@ -96,17 +99,26 @@
 
   grpc_closure read_done_closure;
   grpc_closure write_done_closure;
+  grpc_closure error_closure;
 
   char* peer_string;
 
   grpc_resource_user* resource_user;
   grpc_resource_user_slice_allocator slice_allocator;
+
+  grpc_core::TracedBuffer* head;
+  gpr_mu traced_buffer_lock;
+  void* outgoing_buffer_arg;
+  int bytes_counter;
+  bool socket_ts_enabled;
+  gpr_atm stop_error_notification;
 };
 
 struct backup_poller {
   gpr_mu* pollset_mu;
   grpc_closure run_poller;
 };
+
 }  // namespace
 
 #define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))
@@ -301,6 +313,7 @@
   grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
   grpc_resource_user_unref(tcp->resource_user);
   gpr_free(tcp->peer_string);
+  gpr_mu_destroy(&tcp->traced_buffer_lock);
   gpr_free(tcp);
 }
 
@@ -346,6 +359,11 @@
   grpc_network_status_unregister_endpoint(ep);
   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
   grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+  if (grpc_event_engine_can_track_errors()) {
+    // gpr_log(GPR_INFO, "stop errors");
+    gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
+    grpc_fd_notify_on_error(tcp->em_fd, nullptr);
+  }
   TCP_UNREF(tcp, "destroy");
 }
 
@@ -512,6 +530,215 @@
   }
 }
 
+/** This is to be called if outgoing_buffer_arg is not null. On linux platforms,
+ * this will call sendmsg with socket options set to collect timestamps inside
+ * the kernel. On return, sent_length is set to the return value of the sendmsg
+ * call. Returns false if setting the socket options failed. This is not
+ * implemented for non-linux platforms currently, and crashes out.
+ */
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+                                      size_t sending_length,
+                                      ssize_t* sent_length, grpc_error** error);
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
+
+#ifdef GRPC_LINUX_ERRQUEUE
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+                                      size_t sending_length,
+                                      ssize_t* sent_length,
+                                      grpc_error** error) {
+  if (!tcp->socket_ts_enabled) {
+    // gpr_log(GPR_INFO, "setting options yo");
+    uint32_t opt = grpc_core::kTimestampingSocketOptions;
+    if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
+                   static_cast<void*>(&opt), sizeof(opt)) != 0) {
+      *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp);
+      grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
+      gpr_log(GPR_INFO, "failed to set");
+      return false;
+    }
+    tcp->socket_ts_enabled = true;
+  }
+  union {
+    char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
+    struct cmsghdr align;
+  } u;
+  cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
+  cmsg->cmsg_level = SOL_SOCKET;
+  cmsg->cmsg_type = SO_TIMESTAMPING;
+  cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
+  *reinterpret_cast<int*>(CMSG_DATA(cmsg)) =
+      grpc_core::kTimestampingRecordingOptions;
+  msg->msg_control = u.cmsg_buf;
+  msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));
+
+  ssize_t length;
+  do {
+    GRPC_STATS_INC_SYSCALL_WRITE();
+    length = sendmsg(tcp->fd, msg, SENDMSG_FLAGS);
+  } while (length < 0 && errno == EINTR);
+  *sent_length = length;
+  /* Only save timestamps if all the bytes were taken by sendmsg. */
+  if (sending_length == static_cast<size_t>(length)) {
+    gpr_mu_lock(&tcp->traced_buffer_lock);
+    grpc_core::TracedBuffer::AddNewEntry(
+        &tcp->head, static_cast<int>(tcp->bytes_counter + length),
+        tcp->outgoing_buffer_arg);
+    gpr_mu_unlock(&tcp->traced_buffer_lock);
+    tcp->outgoing_buffer_arg = nullptr;
+  }
+  return true;
+}
+
+struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
+                                  struct cmsghdr* cmsg) {
+  auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
+  if (next_cmsg == nullptr) {
+    gpr_log(GPR_ERROR, "Received timestamp without extended error");
+    return cmsg;
+  }
+
+  if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
+      !(next_cmsg->cmsg_type == IP_RECVERR ||
+        next_cmsg->cmsg_type == IPV6_RECVERR)) {
+    gpr_log(GPR_ERROR, "Unexpected cmsg");
+    return cmsg;
+  }
+
+  auto tss =
+      reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg));
+  auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
+  if (serr->ee_errno != ENOMSG ||
+      serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
+    gpr_log(GPR_ERROR, "Unexpected cmsg");
+    return cmsg;
+  }
+  /* The error handling can potentially be done on another thread so we need
+   * to protect the traced buffer list. A lock free list might be better. Using
+   * a simple mutex for now. */
+  gpr_mu_lock(&tcp->traced_buffer_lock);
+  // gpr_log(GPR_INFO, "processing timestamp");
+  grpc_core::TracedBuffer::ProcessTimestamp(&tcp->head, serr, tss);
+  gpr_mu_unlock(&tcp->traced_buffer_lock);
+  return next_cmsg;
+}
+
+/** For linux platforms, reads the socket's error queue and processes error
+ * messages from the queue. Returns true if all the errors processed were
+ * timestamps. Returns false if the any of the errors were not timestamps. For
+ * non-linux platforms, error processing is not enabled currently, and hence
+ * crashes out.
+ */
+static bool process_errors(grpc_tcp* tcp) {
+  // gpr_log(GPR_INFO, "process errors");
+  while (true) {
+    // gpr_log(GPR_INFO, "looping");
+    struct iovec iov;
+    iov.iov_base = nullptr;
+    iov.iov_len = 0;
+    struct msghdr msg;
+    msg.msg_name = nullptr;
+    msg.msg_namelen = 0;
+    msg.msg_iov = &iov;
+    msg.msg_iovlen = 0;
+    msg.msg_flags = 0;
+
+    union {
+      char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) +
+                CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/];
+      struct cmsghdr align;
+    } aligned_buf;
+    memset(&aligned_buf, 0, sizeof(aligned_buf));
+
+    msg.msg_control = aligned_buf.rbuf;
+    msg.msg_controllen = sizeof(aligned_buf.rbuf);
+
+    int r, saved_errno;
+    do {
+      // gpr_log(GPR_INFO, "error recvmsg");
+      r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE);
+      saved_errno = errno;
+    } while (r < 0 && saved_errno == EINTR);
+
+    if (r == -1 && saved_errno == EAGAIN) {
+      // gpr_log(GPR_INFO, "here");
+      return true; /* No more errors to process */
+    }
+    if (r == -1) {
+      // gpr_log(GPR_INFO, "%d", saved_errno);
+      return false;
+    }
+    if ((msg.msg_flags & MSG_CTRUNC) == 1) {
+      gpr_log(GPR_INFO, "Error message was truncated.");
+    }
+
+    // gpr_log(GPR_INFO, "%d %lu", r, msg.msg_controllen);
+    if (msg.msg_controllen == 0) {
+      /* There was no control message read. Return now */
+      return true;
+    }
+    for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
+         cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+      if (cmsg->cmsg_level != SOL_SOCKET ||
+          cmsg->cmsg_type != SCM_TIMESTAMPING) {
+        /* Got a weird one, not a timestamp */
+        gpr_log(GPR_INFO, "weird %d %d %d", r, cmsg->cmsg_level,
+                cmsg->cmsg_type);
+        continue;
+      }
+      process_timestamp(tcp, &msg, cmsg);
+    }
+  }
+}
+
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
+  // gpr_log(GPR_INFO, "grpc_tcp_handle_error");
+  grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
+  if (grpc_tcp_trace.enabled()) {
+    gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
+  }
+
+  if (error != GRPC_ERROR_NONE ||
+      static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
+    /* We aren't going to register to hear on error anymore, so it is safe to
+     * unref. */
+    // gpr_log(GPR_INFO, "%p %d", error,
+    //        static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification)));
+    // gpr_log(GPR_INFO, "unref");
+    grpc_core::TracedBuffer::Shutdown(&tcp->head, GRPC_ERROR_REF(error));
+    TCP_UNREF(tcp, "error");
+    // gpr_log(GPR_INFO, "here");
+  } else {
+    if (!process_errors(tcp)) {
+      // gpr_log(GPR_INFO, "no timestamps");
+      /* This was not a timestamps error. This was an actual error. Set the
+       * read and write closures to be ready.
+       */
+      grpc_fd_notify_on_read(tcp->em_fd, nullptr);
+      grpc_fd_notify_on_write(tcp->em_fd, nullptr);
+    }
+    GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
+                      grpc_schedule_on_exec_ctx);
+    grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
+    // gpr_log(GPR_INFO, "udhar se");
+  }
+}
+
+#else  /* GRPC_LINUX_ERRQUEUE */
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+                                      size_t sending_length,
+                                      ssize_t* sent_length,
+                                      grpc_error** error) {
+  gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
+  GPR_ASSERT(0);
+  return false;
+}
+
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
+  gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
+  GPR_ASSERT(0);
+}
+#endif /* GRPC_LINUX_ERRQUEUE */
+
 /* returns true if done, false if pending; if returning true, *error is set */
 #define MAX_WRITE_IOVEC 1000
 static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
@@ -552,19 +779,26 @@
     msg.msg_namelen = 0;
     msg.msg_iov = iov;
     msg.msg_iovlen = iov_size;
-    msg.msg_control = nullptr;
-    msg.msg_controllen = 0;
     msg.msg_flags = 0;
+    if (tcp->outgoing_buffer_arg != nullptr) {
+      if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
+                                     error))
+        return true; /* something went wrong with timestamps */
+    } else {
+      msg.msg_control = nullptr;
+      msg.msg_controllen = 0;
 
-    GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
-    GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
+      GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
+      GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
 
-    GPR_TIMER_SCOPE("sendmsg", 1);
-    do {
-      /* TODO(klempner): Cork if this is a partial write */
-      GRPC_STATS_INC_SYSCALL_WRITE();
-      sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
-    } while (sent_length < 0 && errno == EINTR);
+      GPR_TIMER_SCOPE("sendmsg", 1);
+      do {
+        /* TODO(klempner): Cork if this is a partial write */
+        GRPC_STATS_INC_SYSCALL_WRITE();
+        sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
+      } while (sent_length < 0 && errno == EINTR);
+    }
+    // gpr_log(GPR_INFO, "sent length %ld", sent_length);
 
     if (sent_length < 0) {
       if (errno == EAGAIN) {
@@ -588,6 +822,7 @@
     }
 
     GPR_ASSERT(tcp->outgoing_byte_idx == 0);
+    tcp->bytes_counter += sent_length;
     trailing = sending_length - static_cast<size_t>(sent_length);
     while (trailing > 0) {
       size_t slice_length;
@@ -602,7 +837,6 @@
         trailing -= slice_length;
       }
     }
-
     if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
       *error = GRPC_ERROR_NONE;
       grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
@@ -635,14 +869,14 @@
       const char* str = grpc_error_string(error);
       gpr_log(GPR_INFO, "write: %s", str);
     }
-
+    // gpr_log(GPR_INFO, "scheduling callback");
     GRPC_CLOSURE_SCHED(cb, error);
     TCP_UNREF(tcp, "write");
   }
 }
 
 static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
-                      grpc_closure* cb) {
+                      grpc_closure* cb, void* arg) {
   GPR_TIMER_SCOPE("tcp_write", 0);
   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
   grpc_error* error = GRPC_ERROR_NONE;
@@ -670,6 +904,8 @@
   }
   tcp->outgoing_buffer = buf;
   tcp->outgoing_byte_idx = 0;
+  tcp->outgoing_buffer_arg = arg;
+  if (arg) GPR_ASSERT(grpc_event_engine_can_track_errors());
 
   if (!tcp_flush(tcp, &error)) {
     TCP_REF(tcp, "write");
@@ -677,16 +913,20 @@
     if (grpc_tcp_trace.enabled()) {
       gpr_log(GPR_INFO, "write: delayed");
     }
+    // gpr_log(GPR_INFO, "notify");
     notify_on_write(tcp);
   } else {
     if (grpc_tcp_trace.enabled()) {
       const char* str = grpc_error_string(error);
       gpr_log(GPR_INFO, "write: %s", str);
     }
+    // gpr_log(GPR_INFO, "sched");
     GRPC_CLOSURE_SCHED(cb, error);
   }
 }
 
+namespace {} /* namespace */
+
 static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
   grpc_pollset_add_fd(pollset, tcp->em_fd);
@@ -787,6 +1027,8 @@
   tcp->bytes_read_this_round = 0;
   /* Will be set to false by the very first endpoint read function */
   tcp->is_first_read = true;
+  tcp->bytes_counter = -1;
+  tcp->socket_ts_enabled = false;
   /* paired with unref in grpc_tcp_destroy */
   gpr_ref_init(&tcp->refcount, 1);
   gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
@@ -798,6 +1040,16 @@
   /* Tell network status tracker about new endpoint */
   grpc_network_status_register_endpoint(&tcp->base);
   grpc_resource_quota_unref_internal(resource_quota);
+  gpr_mu_init(&tcp->traced_buffer_lock);
+  tcp->head = nullptr;
+  /* Start being notified on errors if event engine can track errors. */
+  if (grpc_event_engine_can_track_errors()) {
+    TCP_REF(tcp, "error");
+    gpr_atm_rel_store(&tcp->stop_error_notification, 0);
+    GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
+                      grpc_schedule_on_exec_ctx);
+    grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
+  }
 
   return &tcp->base;
 }
@@ -816,6 +1068,11 @@
   tcp->release_fd = fd;
   tcp->release_fd_cb = done;
   grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+  if (grpc_event_engine_can_track_errors()) {
+    // gpr_log(GPR_INFO, "stop errors");
+    gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
+    grpc_fd_notify_on_error(tcp->em_fd, nullptr);
+  }
   TCP_UNREF(tcp, "destroy");
 }
 
diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h
index af89bd2..322af62 100644
--- a/src/core/lib/iomgr/tcp_posix.h
+++ b/src/core/lib/iomgr/tcp_posix.h
@@ -31,7 +31,10 @@
 
 #include <grpc/support/port_platform.h>
 
+#include "src/core/lib/iomgr/port.h"
+
 #include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/buffer_list.h"
 #include "src/core/lib/iomgr/endpoint.h"
 #include "src/core/lib/iomgr/ev_posix.h"
 
@@ -54,4 +57,9 @@
 void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
                                      grpc_closure* done);
 
+/** Sets the callback function to call when timestamps for a write are
+ *  collected. */
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+                                                       grpc_core::Timestamps*));
+
 #endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 8ddf684..824db07 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -226,7 +226,7 @@
       gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
     }
 
-    grpc_fd* fdobj = grpc_fd_create(fd, name, false);
+    grpc_fd* fdobj = grpc_fd_create(fd, name, true);
 
     read_notifier_pollset =
         sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
@@ -362,7 +362,7 @@
     listener->sibling = sp;
     sp->server = listener->server;
     sp->fd = fd;
-    sp->emfd = grpc_fd_create(fd, name, false);
+    sp->emfd = grpc_fd_create(fd, name, true);
     memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
     sp->port = port;
     sp->port_index = listener->port_index;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index b9f8145..9595c02 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -105,7 +105,7 @@
     s->tail = sp;
     sp->server = s;
     sp->fd = fd;
-    sp->emfd = grpc_fd_create(fd, name, false);
+    sp->emfd = grpc_fd_create(fd, name, true);
     memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
     sp->port = port;
     sp->port_index = port_index;
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index 5d316d4..fd146c9 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -296,7 +296,7 @@
 
 /* Initiates a write. */
 static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
-                      grpc_closure* cb) {
+                      grpc_closure* cb, void* arg) {
   grpc_tcp* tcp = (grpc_tcp*)ep;
   grpc_winsocket* socket = tcp->socket;
   grpc_winsocket_callback_info* info = &socket->write_info;
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index bdb2d0e..3dd7cab 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -152,7 +152,7 @@
   grpc_sockaddr_to_string(&addr_str, addr, 1);
   gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
   gpr_free(addr_str);
-  emfd_ = grpc_fd_create(fd, name, false);
+  emfd_ = grpc_fd_create(fd, name, true);
   memcpy(&addr_, addr, sizeof(grpc_resolved_address));
   GPR_ASSERT(emfd_);
   gpr_free(name);
diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc
index 840b2e7..f40f969 100644
--- a/src/core/lib/security/transport/secure_endpoint.cc
+++ b/src/core/lib/security/transport/secure_endpoint.cc
@@ -254,7 +254,7 @@
 }
 
 static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
-                           grpc_closure* cb) {
+                           grpc_closure* cb, void* arg) {
   GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
 
   unsigned i;
@@ -342,7 +342,7 @@
     return;
   }
 
-  grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
+  grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
 }
 
 static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {
diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc
index aff723e..d76d582 100644
--- a/src/core/lib/security/transport/security_handshaker.cc
+++ b/src/core/lib/security/transport/security_handshaker.cc
@@ -259,7 +259,7 @@
     grpc_slice_buffer_reset_and_unref_internal(&h->outgoing);
     grpc_slice_buffer_add(&h->outgoing, to_send);
     grpc_endpoint_write(h->args->endpoint, &h->outgoing,
-                        &h->on_handshake_data_sent_to_peer);
+                        &h->on_handshake_data_sent_to_peer, nullptr);
   } else if (handshaker_result == nullptr) {
     // There is nothing to send, but need to read from peer.
     grpc_endpoint_read(h->args->endpoint, h->args->read_buffer,