Merge pull request #2945 from maxwell-demon/master

Re-install census_filters
diff --git a/BUILD b/BUILD
index 0e244ca..607e3f4 100644
--- a/BUILD
+++ b/BUILD
@@ -143,7 +143,7 @@
     "src/core/tsi/ssl_transport_security.h",
     "src/core/tsi/transport_security.h",
     "src/core/tsi/transport_security_interface.h",
-    "src/core/channel/census_filter.h",
+    "src/core/census/grpc_filter.h",
     "src/core/channel/channel_args.h",
     "src/core/channel/channel_stack.h",
     "src/core/channel/client_channel.h",
@@ -267,6 +267,7 @@
     "src/core/tsi/ssl_transport_security.c",
     "src/core/tsi/transport_security.c",
     "src/core/census/grpc_context.c",
+    "src/core/census/grpc_filter.c",
     "src/core/channel/channel_args.c",
     "src/core/channel/channel_stack.c",
     "src/core/channel/client_channel.c",
@@ -409,7 +410,7 @@
 cc_library(
   name = "grpc_unsecure",
   srcs = [
-    "src/core/channel/census_filter.h",
+    "src/core/census/grpc_filter.h",
     "src/core/channel/channel_args.h",
     "src/core/channel/channel_stack.h",
     "src/core/channel/client_channel.h",
@@ -513,6 +514,7 @@
     "src/core/census/rpc_stat_id.h",
     "src/core/surface/init_unsecure.c",
     "src/core/census/grpc_context.c",
+    "src/core/census/grpc_filter.c",
     "src/core/channel/channel_args.c",
     "src/core/channel/channel_stack.c",
     "src/core/channel/client_channel.c",
@@ -1021,6 +1023,7 @@
     "src/core/tsi/ssl_transport_security.c",
     "src/core/tsi/transport_security.c",
     "src/core/census/grpc_context.c",
+    "src/core/census/grpc_filter.c",
     "src/core/channel/channel_args.c",
     "src/core/channel/channel_stack.c",
     "src/core/channel/client_channel.c",
@@ -1160,7 +1163,7 @@
     "src/core/tsi/ssl_transport_security.h",
     "src/core/tsi/transport_security.h",
     "src/core/tsi/transport_security_interface.h",
-    "src/core/channel/census_filter.h",
+    "src/core/census/grpc_filter.h",
     "src/core/channel/channel_args.h",
     "src/core/channel/channel_stack.h",
     "src/core/channel/client_channel.h",
diff --git a/Makefile b/Makefile
index 77be14e..48ecea9 100644
--- a/Makefile
+++ b/Makefile
@@ -4079,6 +4079,7 @@
     src/core/tsi/ssl_transport_security.c \
     src/core/tsi/transport_security.c \
     src/core/census/grpc_context.c \
+    src/core/census/grpc_filter.c \
     src/core/channel/channel_args.c \
     src/core/channel/channel_stack.c \
     src/core/channel/client_channel.c \
@@ -4352,6 +4353,7 @@
 LIBGRPC_UNSECURE_SRC = \
     src/core/surface/init_unsecure.c \
     src/core/census/grpc_context.c \
+    src/core/census/grpc_filter.c \
     src/core/channel/channel_args.c \
     src/core/channel/channel_stack.c \
     src/core/channel/client_channel.c \
diff --git a/build.json b/build.json
index 4f9017a..c3e784b 100644
--- a/build.json
+++ b/build.json
@@ -116,7 +116,7 @@
         "include/grpc/status.h"
       ],
       "headers": [
-        "src/core/channel/census_filter.h",
+        "src/core/census/grpc_filter.h",
         "src/core/channel/channel_args.h",
         "src/core/channel/channel_stack.h",
         "src/core/channel/client_channel.h",
@@ -219,6 +219,7 @@
       ],
       "src": [
         "src/core/census/grpc_context.c",
+        "src/core/census/grpc_filter.c",
         "src/core/channel/channel_args.c",
         "src/core/channel/channel_stack.c",
         "src/core/channel/client_channel.c",
diff --git a/gRPC.podspec b/gRPC.podspec
index d945c29..0e826b5 100644
--- a/gRPC.podspec
+++ b/gRPC.podspec
@@ -145,7 +145,7 @@
                       'src/core/tsi/ssl_transport_security.h',
                       'src/core/tsi/transport_security.h',
                       'src/core/tsi/transport_security_interface.h',
-                      'src/core/channel/census_filter.h',
+                      'src/core/census/grpc_filter.h',
                       'src/core/channel/channel_args.h',
                       'src/core/channel/channel_stack.h',
                       'src/core/channel/client_channel.h',
@@ -276,6 +276,7 @@
                       'src/core/tsi/ssl_transport_security.c',
                       'src/core/tsi/transport_security.c',
                       'src/core/census/grpc_context.c',
+                      'src/core/census/grpc_filter.c',
                       'src/core/channel/channel_args.c',
                       'src/core/channel/channel_stack.c',
                       'src/core/channel/client_channel.c',
@@ -414,7 +415,7 @@
                               'src/core/tsi/ssl_transport_security.h',
                               'src/core/tsi/transport_security.h',
                               'src/core/tsi/transport_security_interface.h',
-                              'src/core/channel/census_filter.h',
+                              'src/core/census/grpc_filter.h',
                               'src/core/channel/channel_args.h',
                               'src/core/channel/channel_stack.h',
                               'src/core/channel/client_channel.h',
diff --git a/src/core/channel/census_filter.c b/src/core/census/grpc_filter.c
similarity index 77%
rename from src/core/channel/census_filter.c
rename to src/core/census/grpc_filter.c
index d996c34..fbedb35 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -31,11 +31,13 @@
  *
  */
 
-#include "src/core/channel/census_filter.h"
+#include "src/core/census/grpc_filter.h"
 
 #include <stdio.h>
 #include <string.h>
 
+#include "include/grpc/census.h"
+#include "src/core/census/rpc_stat_id.h"
 #include "src/core/channel/channel_stack.h"
 #include "src/core/channel/noop_filter.h"
 #include "src/core/statistics/census_interface.h"
@@ -47,24 +49,19 @@
 
 typedef struct call_data {
   census_op_id op_id;
-  census_rpc_stats stats;
+  census_context* ctxt;
   gpr_timespec start_ts;
+  int error;
 
   /* recv callback */
   grpc_stream_op_buffer* recv_ops;
-  void (*on_done_recv)(void* user_data, int success);
-  void* recv_user_data;
+  grpc_iomgr_closure* on_done_recv;
 } call_data;
 
 typedef struct channel_data {
   grpc_mdstr* path_str; /* pointer to meta data str with key == ":path" */
 } channel_data;
 
-static void init_rpc_stats(census_rpc_stats* stats) {
-  memset(stats, 0, sizeof(census_rpc_stats));
-  stats->cnt = 1;
-}
-
 static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb,
                                             call_data* calld,
                                             channel_data* chand) {
@@ -77,8 +74,7 @@
       if (m->md->key == chand->path_str) {
         gpr_log(GPR_DEBUG, "%s",
                 (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
-        census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
-                                                m->md->value->slice));
+        /* Add method tag here */
       }
     }
   }
@@ -95,8 +91,6 @@
 
 static void client_start_transport_op(grpc_call_element* elem,
                                       grpc_transport_stream_op* op) {
-  call_data* calld = elem->call_data;
-  GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
   client_mutate_op(elem, op);
   grpc_call_next_op(elem, op);
 }
@@ -108,7 +102,7 @@
   if (success) {
     extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
   }
-  calld->on_done_recv(calld->recv_user_data, success);
+  calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
 }
 
 static void server_mutate_op(grpc_call_element* elem,
@@ -118,9 +112,7 @@
     /* substitute our callback for the op callback */
     calld->recv_ops = op->recv_ops;
     calld->on_done_recv = op->on_done_recv;
-    calld->recv_user_data = op->recv_user_data;
-    op->on_done_recv = server_on_done_recv;
-    op->recv_user_data = elem;
+    op->on_done_recv = calld->on_done_recv;
   }
 }
 
@@ -132,35 +124,19 @@
   grpc_call_next_op(elem, op);
 }
 
-static void channel_op(grpc_channel_element* elem,
-                       grpc_channel_element* from_elem, grpc_channel_op* op) {
-  switch (op->type) {
-    case GRPC_TRANSPORT_CLOSED:
-      /* TODO(hongyu): Annotate trace information for all calls of the channel
-       */
-      break;
-    default:
-      break;
-  }
-  grpc_channel_next_op(elem, op);
-}
-
 static void client_init_call_elem(grpc_call_element* elem,
                                   const void* server_transport_data,
                                   grpc_transport_stream_op* initial_op) {
   call_data* d = elem->call_data;
   GPR_ASSERT(d != NULL);
-  init_rpc_stats(&d->stats);
   d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
-  d->op_id = census_tracing_start_op();
   if (initial_op) client_mutate_op(elem, initial_op);
 }
 
 static void client_destroy_call_elem(grpc_call_element* elem) {
   call_data* d = elem->call_data;
   GPR_ASSERT(d != NULL);
-  census_record_rpc_client_stats(d->op_id, &d->stats);
-  census_tracing_end_op(d->op_id);
+  /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
 }
 
 static void server_init_call_elem(grpc_call_element* elem,
@@ -168,29 +144,24 @@
                                   grpc_transport_stream_op* initial_op) {
   call_data* d = elem->call_data;
   GPR_ASSERT(d != NULL);
-  init_rpc_stats(&d->stats);
   d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
-  d->op_id = census_tracing_start_op();
+  /* TODO(hongyu): call census_tracing_start_op here. */
+  grpc_iomgr_closure_init(d->on_done_recv, server_on_done_recv, elem);
   if (initial_op) server_mutate_op(elem, initial_op);
 }
 
 static void server_destroy_call_elem(grpc_call_element* elem) {
   call_data* d = elem->call_data;
   GPR_ASSERT(d != NULL);
-  d->stats.elapsed_time_ms = gpr_timespec_to_micros(
-      gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), d->start_ts));
-  census_record_rpc_server_stats(d->op_id, &d->stats);
-  census_tracing_end_op(d->op_id);
+  /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
 }
 
-static void init_channel_elem(grpc_channel_element* elem,
+static void init_channel_elem(grpc_channel_element* elem, grpc_channel* master,
                               const grpc_channel_args* args, grpc_mdctx* mdctx,
                               int is_first, int is_last) {
   channel_data* chand = elem->channel_data;
   GPR_ASSERT(chand != NULL);
-  GPR_ASSERT(!is_first);
-  GPR_ASSERT(!is_last);
-  chand->path_str = grpc_mdstr_from_string(mdctx, ":path");
+  chand->path_str = grpc_mdstr_from_string(mdctx, ":path", 0);
 }
 
 static void destroy_channel_elem(grpc_channel_element* elem) {
@@ -203,22 +174,24 @@
 
 const grpc_channel_filter grpc_client_census_filter = {
     client_start_transport_op,
-    channel_op,
+    grpc_channel_next_op,
     sizeof(call_data),
     client_init_call_elem,
     client_destroy_call_elem,
     sizeof(channel_data),
     init_channel_elem,
     destroy_channel_elem,
+    grpc_call_next_get_peer,
     "census-client"};
 
 const grpc_channel_filter grpc_server_census_filter = {
     server_start_transport_op,
-    channel_op,
+    grpc_channel_next_op,
     sizeof(call_data),
     server_init_call_elem,
     server_destroy_call_elem,
     sizeof(channel_data),
     init_channel_elem,
     destroy_channel_elem,
+    grpc_call_next_get_peer,
     "census-server"};
diff --git a/src/core/channel/census_filter.h b/src/core/census/grpc_filter.h
similarity index 100%
rename from src/core/channel/census_filter.h
rename to src/core/census/grpc_filter.h
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 82ddfac..707251d 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -38,6 +38,7 @@
 
 #include <grpc/support/alloc.h>
 
+#include "src/core/census/grpc_filter.h"
 #include "src/core/channel/channel_args.h"
 #include "src/core/channel/client_channel.h"
 #include "src/core/channel/compress_filter.h"
@@ -165,10 +166,9 @@
   grpc_mdctx *mdctx = grpc_mdctx_create();
   int n = 0;
   GPR_ASSERT(!reserved);
-  /* TODO(census)
   if (grpc_channel_args_is_census_enabled(args)) {
     filters[n++] = &grpc_client_census_filter;
-    } */
+  }
   filters[n++] = &grpc_compress_filter;
   filters[n++] = &grpc_client_channel_filter;
   GPR_ASSERT(n <= MAX_FILTERS);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 5b03ba9..eccee24 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -38,6 +38,7 @@
 
 #include <grpc/support/alloc.h>
 
+#include "src/core/census/grpc_filter.h"
 #include "src/core/channel/channel_args.h"
 #include "src/core/channel/client_channel.h"
 #include "src/core/channel/compress_filter.h"
@@ -217,10 +218,9 @@
   args_copy = grpc_channel_args_copy_and_add(
       new_args_from_connector != NULL ? new_args_from_connector : args,
       &connector_arg, 1);
-  /* TODO(census)
   if (grpc_channel_args_is_census_enabled(args)) {
     filters[n++] = &grpc_client_census_filter;
-    } */
+  }
   filters[n++] = &grpc_compress_filter;
   filters[n++] = &grpc_client_channel_filter;
   GPR_ASSERT(n <= MAX_FILTERS);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 1c40241..292bf6f 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -41,7 +41,7 @@
 #include <grpc/support/string_util.h>
 #include <grpc/support/useful.h>
 
-#include "src/core/channel/census_filter.h"
+#include "src/core/census/grpc_filter.h"
 #include "src/core/channel/channel_args.h"
 #include "src/core/channel/connected_channel.h"
 #include "src/core/iomgr/iomgr.h"
@@ -821,10 +821,9 @@
   server->channel_filters =
       gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
   server->channel_filters[0] = &server_surface_filter;
-  /* TODO(census): restore this once we rework census filter
   if (census_enabled) {
     server->channel_filters[1] = &grpc_server_census_filter;
-    } */
+  }
   for (i = 0; i < filter_count; i++) {
     server->channel_filters[i + 1 + census_enabled] = filters[i];
   }
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 325a293..d27c5d9 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -780,7 +780,7 @@
 src/core/tsi/ssl_transport_security.h \
 src/core/tsi/transport_security.h \
 src/core/tsi/transport_security_interface.h \
-src/core/channel/census_filter.h \
+src/core/census/grpc_filter.h \
 src/core/channel/channel_args.h \
 src/core/channel/channel_stack.h \
 src/core/channel/client_channel.h \
@@ -904,6 +904,7 @@
 src/core/tsi/ssl_transport_security.c \
 src/core/tsi/transport_security.c \
 src/core/census/grpc_context.c \
+src/core/census/grpc_filter.c \
 src/core/channel/channel_args.c \
 src/core/channel/channel_stack.c \
 src/core/channel/client_channel.c \
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index a7fd884..f2a2abe 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -12264,8 +12264,8 @@
       "include/grpc/grpc_security.h", 
       "include/grpc/status.h", 
       "src/core/census/context.h", 
+      "src/core/census/grpc_filter.h", 
       "src/core/census/rpc_stat_id.h", 
-      "src/core/channel/census_filter.h", 
       "src/core/channel/channel_args.h", 
       "src/core/channel/channel_stack.h", 
       "src/core/channel/client_channel.h", 
@@ -12392,10 +12392,11 @@
       "src/core/census/context.c", 
       "src/core/census/context.h", 
       "src/core/census/grpc_context.c", 
+      "src/core/census/grpc_filter.c", 
+      "src/core/census/grpc_filter.h", 
       "src/core/census/initialize.c", 
       "src/core/census/record_stat.c", 
       "src/core/census/rpc_stat_id.h", 
-      "src/core/channel/census_filter.h", 
       "src/core/channel/channel_args.c", 
       "src/core/channel/channel_args.h", 
       "src/core/channel/channel_stack.c", 
@@ -12738,8 +12739,8 @@
       "include/grpc/grpc.h", 
       "include/grpc/status.h", 
       "src/core/census/context.h", 
+      "src/core/census/grpc_filter.h", 
       "src/core/census/rpc_stat_id.h", 
-      "src/core/channel/census_filter.h", 
       "src/core/channel/channel_args.h", 
       "src/core/channel/channel_stack.h", 
       "src/core/channel/client_channel.h", 
@@ -12852,10 +12853,11 @@
       "src/core/census/context.c", 
       "src/core/census/context.h", 
       "src/core/census/grpc_context.c", 
+      "src/core/census/grpc_filter.c", 
+      "src/core/census/grpc_filter.h", 
       "src/core/census/initialize.c", 
       "src/core/census/record_stat.c", 
       "src/core/census/rpc_stat_id.h", 
-      "src/core/channel/census_filter.h", 
       "src/core/channel/channel_args.c", 
       "src/core/channel/channel_args.h", 
       "src/core/channel/channel_stack.c", 
diff --git a/vsprojects/grpc/grpc.vcxproj b/vsprojects/grpc/grpc.vcxproj
index ebdc926..500cf9f 100644
--- a/vsprojects/grpc/grpc.vcxproj
+++ b/vsprojects/grpc/grpc.vcxproj
@@ -242,7 +242,7 @@
     <ClInclude Include="..\..\src\core\tsi\ssl_transport_security.h" />
     <ClInclude Include="..\..\src\core\tsi\transport_security.h" />
     <ClInclude Include="..\..\src\core\tsi\transport_security_interface.h" />
-    <ClInclude Include="..\..\src\core\channel\census_filter.h" />
+    <ClInclude Include="..\..\src\core\census\grpc_filter.h" />
     <ClInclude Include="..\..\src\core\channel\channel_args.h" />
     <ClInclude Include="..\..\src\core\channel\channel_stack.h" />
     <ClInclude Include="..\..\src\core\channel\client_channel.h" />
@@ -390,6 +390,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\census\grpc_context.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\census\grpc_filter.c">
+    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\channel_args.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\channel\channel_stack.c">
diff --git a/vsprojects/grpc/grpc.vcxproj.filters b/vsprojects/grpc/grpc.vcxproj.filters
index baec0db..02060b7 100644
--- a/vsprojects/grpc/grpc.vcxproj.filters
+++ b/vsprojects/grpc/grpc.vcxproj.filters
@@ -67,6 +67,9 @@
     <ClCompile Include="..\..\src\core\census\grpc_context.c">
       <Filter>src\core\census</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\census\grpc_filter.c">
+      <Filter>src\core\census</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\channel_args.c">
       <Filter>src\core\channel</Filter>
     </ClCompile>
@@ -482,8 +485,8 @@
     <ClInclude Include="..\..\src\core\tsi\transport_security_interface.h">
       <Filter>src\core\tsi</Filter>
     </ClInclude>
-    <ClInclude Include="..\..\src\core\channel\census_filter.h">
-      <Filter>src\core\channel</Filter>
+    <ClInclude Include="..\..\src\core\census\grpc_filter.h">
+      <Filter>src\core\census</Filter>
     </ClInclude>
     <ClInclude Include="..\..\src\core\channel\channel_args.h">
       <Filter>src\core\channel</Filter>
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
index 1d60839..13c018c 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
@@ -225,7 +225,7 @@
     <ClInclude Include="..\..\include\grpc\census.h" />
   </ItemGroup>
   <ItemGroup>
-    <ClInclude Include="..\..\src\core\channel\census_filter.h" />
+    <ClInclude Include="..\..\src\core\census\grpc_filter.h" />
     <ClInclude Include="..\..\src\core\channel\channel_args.h" />
     <ClInclude Include="..\..\src\core\channel\channel_stack.h" />
     <ClInclude Include="..\..\src\core\channel\client_channel.h" />
@@ -333,6 +333,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\census\grpc_context.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\census\grpc_filter.c">
+    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\channel_args.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\channel\channel_stack.c">
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
index cb9e8c2..5adcdd6 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -7,6 +7,9 @@
     <ClCompile Include="..\..\src\core\census\grpc_context.c">
       <Filter>src\core\census</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\census\grpc_filter.c">
+      <Filter>src\core\census</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\channel_args.c">
       <Filter>src\core\channel</Filter>
     </ClCompile>
@@ -380,8 +383,8 @@
     </ClInclude>
   </ItemGroup>
   <ItemGroup>
-    <ClInclude Include="..\..\src\core\channel\census_filter.h">
-      <Filter>src\core\channel</Filter>
+    <ClInclude Include="..\..\src\core\census\grpc_filter.h">
+      <Filter>src\core\census</Filter>
     </ClInclude>
     <ClInclude Include="..\..\src\core\channel\channel_args.h">
       <Filter>src\core\channel</Filter>