Merge pull request #9801 from ctiller/rmw

Add counters for rmw atomic operations to microbenchmarks
diff --git a/Makefile b/Makefile
index 8f92816..d47d967 100644
--- a/Makefile
+++ b/Makefile
@@ -217,7 +217,7 @@
 CXX_counters = $(DEFAULT_CXX)
 LD_counters = $(DEFAULT_CC)
 LDXX_counters = $(DEFAULT_CXX)
-CPPFLAGS_counters = -O2 -DGPR_MU_COUNTERS
+CPPFLAGS_counters = -O2 -DGPR_LOW_LEVEL_COUNTERS
 DEFINES_counters = NDEBUG
 
 
diff --git a/build.yaml b/build.yaml
index 141526c..120c7a8 100644
--- a/build.yaml
+++ b/build.yaml
@@ -3919,7 +3919,7 @@
     CPPFLAGS: -O2 -DGRPC_BASIC_PROFILER -DGRPC_TIMERS_RDTSC
     DEFINES: NDEBUG
   counters:
-    CPPFLAGS: -O2 -DGPR_MU_COUNTERS
+    CPPFLAGS: -O2 -DGPR_LOW_LEVEL_COUNTERS
     DEFINES: NDEBUG
   dbg:
     CPPFLAGS: -O0
diff --git a/include/grpc/impl/codegen/atm_gcc_atomic.h b/include/grpc/impl/codegen/atm_gcc_atomic.h
index 7d4ae98..4bd3b25 100644
--- a/include/grpc/impl/codegen/atm_gcc_atomic.h
+++ b/include/grpc/impl/codegen/atm_gcc_atomic.h
@@ -40,6 +40,20 @@
 
 typedef intptr_t gpr_atm;
 
+#ifdef GPR_LOW_LEVEL_COUNTERS
+extern gpr_atm gpr_counter_atm_cas; /* bumped by the cas and xchg wrappers */
+extern gpr_atm gpr_counter_atm_add; /* bumped by the fetch_add wrappers */
+#define GPR_ATM_INC_COUNTER(counter) \
+  (__atomic_fetch_add(&(counter), 1, __ATOMIC_RELAXED))
+#define GPR_ATM_INC_CAS_THEN(blah) \
+  (GPR_ATM_INC_COUNTER(gpr_counter_atm_cas), (blah))
+#define GPR_ATM_INC_ADD_THEN(blah) \
+  (GPR_ATM_INC_COUNTER(gpr_counter_atm_add), (blah))
+#else /* counters compiled out: the wrappers just evaluate their argument */
+#define GPR_ATM_INC_CAS_THEN(blah) (blah)
+#define GPR_ATM_INC_ADD_THEN(blah) (blah)
+#endif /* GPR_LOW_LEVEL_COUNTERS */
+
 #define gpr_atm_full_barrier() (__atomic_thread_fence(__ATOMIC_SEQ_CST))
 
 #define gpr_atm_acq_load(p) (__atomic_load_n((p), __ATOMIC_ACQUIRE))
@@ -50,25 +64,28 @@
   (__atomic_store_n((p), (intptr_t)(value), __ATOMIC_RELAXED))
 
 #define gpr_atm_no_barrier_fetch_add(p, delta) \
-  (__atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_RELAXED))
+  GPR_ATM_INC_ADD_THEN(                        \
+      __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_RELAXED))
 #define gpr_atm_full_fetch_add(p, delta) \
-  (__atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_ACQ_REL))
+  GPR_ATM_INC_ADD_THEN(                  \
+      __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_ACQ_REL))
 
 static __inline int gpr_atm_no_barrier_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
-  return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_RELAXED,
-                                     __ATOMIC_RELAXED);
+  return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n(
+      p, &o, n, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
 }
 
 static __inline int gpr_atm_acq_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
-  return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_ACQUIRE,
-                                     __ATOMIC_RELAXED);
+  return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n(
+      p, &o, n, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
 }
 
 static __inline int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
-  return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_RELEASE,
-                                     __ATOMIC_RELAXED);
+  return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n(
+      p, &o, n, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
 }
 
-#define gpr_atm_full_xchg(p, n) __atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL)
+#define gpr_atm_full_xchg(p, n) \
+  GPR_ATM_INC_CAS_THEN(__atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL))
 
 #endif /* GRPC_IMPL_CODEGEN_ATM_GCC_ATOMIC_H */
diff --git a/src/core/lib/support/sync_posix.c b/src/core/lib/support/sync_posix.c
index de0f048..16e7d6e 100644
--- a/src/core/lib/support/sync_posix.c
+++ b/src/core/lib/support/sync_posix.c
@@ -42,8 +42,10 @@
 #include <time.h>
 #include "src/core/lib/profiling/timers.h"
 
-#ifdef GPR_MU_COUNTERS
-gpr_atm grpc_mu_locks = 0;
+#ifdef GPR_LOW_LEVEL_COUNTERS
+gpr_atm gpr_mu_locks = 0;
+gpr_atm gpr_counter_atm_cas = 0;
+gpr_atm gpr_counter_atm_add = 0;
 #endif
 
 void gpr_mu_init(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_init(mu, NULL) == 0); }
@@ -51,8 +53,8 @@
 void gpr_mu_destroy(gpr_mu* mu) { GPR_ASSERT(pthread_mutex_destroy(mu) == 0); }
 
 void gpr_mu_lock(gpr_mu* mu) {
-#ifdef GPR_MU_COUNTERS
-  gpr_atm_no_barrier_fetch_add(&grpc_mu_locks, 1);
+#ifdef GPR_LOW_LEVEL_COUNTERS
+  GPR_ATM_INC_COUNTER(gpr_mu_locks);
 #endif
   GPR_TIMER_BEGIN("gpr_mu_lock", 0);
   GPR_ASSERT(pthread_mutex_lock(mu) == 0);
diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc
index 80d6610..03aede3 100644
--- a/test/cpp/microbenchmarks/bm_closure.cc
+++ b/test/cpp/microbenchmarks/bm_closure.cc
@@ -43,13 +43,53 @@
 
 #include "third_party/benchmark/include/benchmark/benchmark.h"
 
+#include <sstream>
+
+#ifdef GPR_LOW_LEVEL_COUNTERS
+extern "C" gpr_atm gpr_mu_locks;
+#endif
+
 static class InitializeStuff {
  public:
   InitializeStuff() { grpc_init(); }
   ~InitializeStuff() { grpc_shutdown(); }
 } initialize_stuff;
 
+class TrackCounters {  // labels a benchmark with counter deltas per iteration
+ public:
+  explicit TrackCounters(benchmark::State& state) : state_(state) {}
+
+  ~TrackCounters() {  // publishes the label when this object is destroyed
+    std::ostringstream out;
+#ifdef GPR_LOW_LEVEL_COUNTERS
+    out << " locks/iter:" << ((double)(gpr_atm_no_barrier_load(&gpr_mu_locks) -
+                                       mu_locks_at_start_) /
+                              (double)state_.iterations())
+        << " atm_cas/iter:"
+        << ((double)(gpr_atm_no_barrier_load(&gpr_counter_atm_cas) -
+                     atm_cas_at_start_) /
+            (double)state_.iterations())
+        << " atm_add/iter:"
+        << ((double)(gpr_atm_no_barrier_load(&gpr_counter_atm_add) -
+                     atm_add_at_start_) /
+            (double)state_.iterations());
+#endif
+    state_.SetLabel(out.str());  // empty label when counters are compiled out
+  }
+
+ private:
+  benchmark::State& state_;  // referenced in the destructor; must outlive us
+#ifdef GPR_LOW_LEVEL_COUNTERS
+  const size_t mu_locks_at_start_ = gpr_atm_no_barrier_load(&gpr_mu_locks);
+  const size_t atm_cas_at_start_ =
+      gpr_atm_no_barrier_load(&gpr_counter_atm_cas);
+  const size_t atm_add_at_start_ =
+      gpr_atm_no_barrier_load(&gpr_counter_atm_add);
+#endif
+};
+
 static void BM_NoOpExecCtx(benchmark::State& state) {
+  TrackCounters track_counters(state);
   while (state.KeepRunning()) {
     grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     grpc_exec_ctx_finish(&exec_ctx);
@@ -58,6 +98,7 @@
 BENCHMARK(BM_NoOpExecCtx);
 
 static void BM_WellFlushed(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   while (state.KeepRunning()) {
     grpc_exec_ctx_flush(&exec_ctx);
@@ -69,6 +110,7 @@
 static void DoNothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}
 
 static void BM_ClosureInitAgainstExecCtx(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_closure c;
   while (state.KeepRunning()) {
     benchmark::DoNotOptimize(
@@ -78,6 +120,7 @@
 BENCHMARK(BM_ClosureInitAgainstExecCtx);
 
 static void BM_ClosureInitAgainstCombiner(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_combiner* combiner = grpc_combiner_create(NULL);
   grpc_closure c;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -91,6 +134,7 @@
 BENCHMARK(BM_ClosureInitAgainstCombiner);
 
 static void BM_ClosureRunOnExecCtx(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_closure c;
   grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -103,6 +147,7 @@
 BENCHMARK(BM_ClosureRunOnExecCtx);
 
 static void BM_ClosureCreateAndRun(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   while (state.KeepRunning()) {
     grpc_closure_run(&exec_ctx, grpc_closure_create(DoNothing, NULL,
@@ -114,6 +159,7 @@
 BENCHMARK(BM_ClosureCreateAndRun);
 
 static void BM_ClosureInitAndRun(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_closure c;
   while (state.KeepRunning()) {
@@ -126,6 +172,7 @@
 BENCHMARK(BM_ClosureInitAndRun);
 
 static void BM_ClosureSchedOnExecCtx(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_closure c;
   grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -138,6 +185,7 @@
 BENCHMARK(BM_ClosureSchedOnExecCtx);
 
 static void BM_ClosureSched2OnExecCtx(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_closure c1;
   grpc_closure c2;
   grpc_closure_init(&c1, DoNothing, NULL, grpc_schedule_on_exec_ctx);
@@ -153,6 +201,7 @@
 BENCHMARK(BM_ClosureSched2OnExecCtx);
 
 static void BM_ClosureSched3OnExecCtx(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_closure c1;
   grpc_closure c2;
   grpc_closure c3;
@@ -171,6 +220,7 @@
 BENCHMARK(BM_ClosureSched3OnExecCtx);
 
 static void BM_AcquireMutex(benchmark::State& state) {
+  TrackCounters track_counters(state);
   // for comparison with the combiner stuff below
   gpr_mu mu;
   gpr_mu_init(&mu);
@@ -185,6 +235,7 @@
 BENCHMARK(BM_AcquireMutex);
 
 static void BM_ClosureSchedOnCombiner(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_combiner* combiner = grpc_combiner_create(NULL);
   grpc_closure c;
   grpc_closure_init(&c, DoNothing, NULL,
@@ -200,6 +251,7 @@
 BENCHMARK(BM_ClosureSchedOnCombiner);
 
 static void BM_ClosureSched2OnCombiner(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_combiner* combiner = grpc_combiner_create(NULL);
   grpc_closure c1;
   grpc_closure c2;
@@ -219,6 +271,7 @@
 BENCHMARK(BM_ClosureSched2OnCombiner);
 
 static void BM_ClosureSched3OnCombiner(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_combiner* combiner = grpc_combiner_create(NULL);
   grpc_closure c1;
   grpc_closure c2;
@@ -242,6 +295,7 @@
 BENCHMARK(BM_ClosureSched3OnCombiner);
 
 static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_combiner* combiner1 = grpc_combiner_create(NULL);
   grpc_combiner* combiner2 = grpc_combiner_create(NULL);
   grpc_closure c1;
@@ -263,6 +317,7 @@
 BENCHMARK(BM_ClosureSched2OnTwoCombiners);
 
 static void BM_ClosureSched4OnTwoCombiners(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_combiner* combiner1 = grpc_combiner_create(NULL);
   grpc_combiner* combiner2 = grpc_combiner_create(NULL);
   grpc_closure c1;
@@ -323,6 +378,7 @@
 };
 
 static void BM_ClosureReschedOnExecCtx(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   Rescheduler r(state, grpc_schedule_on_exec_ctx);
   r.ScheduleFirst(&exec_ctx);
@@ -331,6 +387,7 @@
 BENCHMARK(BM_ClosureReschedOnExecCtx);
 
 static void BM_ClosureReschedOnCombiner(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_combiner* combiner = grpc_combiner_create(NULL);
   Rescheduler r(state, grpc_combiner_scheduler(combiner, false));
@@ -342,6 +399,7 @@
 BENCHMARK(BM_ClosureReschedOnCombiner);
 
 static void BM_ClosureReschedOnCombinerFinally(benchmark::State& state) {
+  TrackCounters track_counters(state);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_combiner* combiner = grpc_combiner_create(NULL);
   Rescheduler r(state, grpc_combiner_finally_scheduler(combiner, false));
diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc
index c63de0c..48e131f 100644
--- a/test/cpp/microbenchmarks/bm_fullstack.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack.cc
@@ -99,8 +99,10 @@
   c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
 }
 
-#ifdef GPR_MU_COUNTERS
-extern "C" gpr_atm grpc_mu_locks;
+#ifdef GPR_LOW_LEVEL_COUNTERS
+extern "C" gpr_atm gpr_mu_locks;
+extern "C" gpr_atm gpr_counter_atm_cas;
+extern "C" gpr_atm gpr_counter_atm_add;
 #endif
 
 class BaseFixture {
@@ -108,10 +110,18 @@
   void Finish(benchmark::State& s) {
     std::ostringstream out;
     this->AddToLabel(out, s);
-#ifdef GPR_MU_COUNTERS
-    out << " locks/iter:" << ((double)(gpr_atm_no_barrier_load(&grpc_mu_locks) -
+#ifdef GPR_LOW_LEVEL_COUNTERS
+    out << " locks/iter:" << ((double)(gpr_atm_no_barrier_load(&gpr_mu_locks) -
                                        mu_locks_at_start_) /
-                              (double)s.iterations());
+                              (double)s.iterations())
+        << " atm_cas/iter:"
+        << ((double)(gpr_atm_no_barrier_load(&gpr_counter_atm_cas) -
+                     atm_cas_at_start_) /
+            (double)s.iterations())
+        << " atm_add/iter:"
+        << ((double)(gpr_atm_no_barrier_load(&gpr_counter_atm_add) -
+                     atm_add_at_start_) /
+            (double)s.iterations());
 #endif
     grpc_memory_counters counters_at_end = grpc_memory_counters_snapshot();
     out << " allocs/iter:"
@@ -128,8 +138,12 @@
   virtual void AddToLabel(std::ostream& out, benchmark::State& s) = 0;
 
  private:
-#ifdef GPR_MU_COUNTERS
-  const size_t mu_locks_at_start_ = gpr_atm_no_barrier_load(&grpc_mu_locks);
+#ifdef GPR_LOW_LEVEL_COUNTERS
+  const size_t mu_locks_at_start_ = gpr_atm_no_barrier_load(&gpr_mu_locks);
+  const size_t atm_cas_at_start_ =
+      gpr_atm_no_barrier_load(&gpr_counter_atm_cas);
+  const size_t atm_add_at_start_ =
+      gpr_atm_no_barrier_load(&gpr_counter_atm_add);
 #endif
   grpc_memory_counters counters_at_start_ = grpc_memory_counters_snapshot();
 };
diff --git a/tools/profiling/microbenchmarks/bm2bq.py b/tools/profiling/microbenchmarks/bm2bq.py
index a7d8226..8ead4b4 100755
--- a/tools/profiling/microbenchmarks/bm2bq.py
+++ b/tools/profiling/microbenchmarks/bm2bq.py
@@ -66,6 +66,8 @@
   ('cli_stream_stalls_per_iteration', 'float'),
   ('svr_transport_stalls_per_iteration', 'float'),
   ('svr_stream_stalls_per_iteration', 'float'),
+  ('atm_cas_per_iteration', 'float'),  # from the " atm_cas/iter:" label field
+  ('atm_add_per_iteration', 'float'),  # from the " atm_add/iter:" label field
 ]
 
 if sys.argv[1] == '--schema':
@@ -158,7 +160,7 @@
 for bm in js['benchmarks']:
   context = js['context']
   if 'label' in bm:
-    labels_list = [s.split(':') for s in bm['label'].split(' ')]
+    labels_list = [s.split(':') for s in bm['label'].strip().split(' ')]
     for el in labels_list:
       el[0] = el[0].replace('/iter', '_per_iteration')
     labels = dict(labels_list)