Merge pull request #387 from maxwell-demon/census_active_ops
V0 implementation of census_get_active_ops().
diff --git a/src/core/statistics/census_rpc_stats.c b/src/core/statistics/census_rpc_stats.c
index 785c091..fc66cb9 100644
--- a/src/core/statistics/census_rpc_stats.c
+++ b/src/core/statistics/census_rpc_stats.c
@@ -141,7 +141,7 @@
const census_rpc_stats* stats) {
gpr_mu_lock(&g_mu);
if (store != NULL) {
- trace_obj* trace = NULL;
+ census_trace_obj* trace = NULL;
census_internal_lock_trace_store();
trace = census_get_trace_obj_locked(op_id);
if (trace != NULL) {
diff --git a/src/core/statistics/census_tracing.c b/src/core/statistics/census_tracing.c
index 3c4ba66..8b98323 100644
--- a/src/core/statistics/census_tracing.c
+++ b/src/core/statistics/census_tracing.c
@@ -32,38 +32,22 @@
*/
#include "src/core/statistics/census_interface.h"
+#include "src/core/statistics/census_tracing.h"
#include <stdio.h>
#include <string.h>
-#include "src/core/statistics/census_rpc_stats.h"
#include "src/core/statistics/hash_table.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-/* Struct for a trace annotation. */
-typedef struct annotation {
- gpr_timespec ts; /* timestamp of the annotation */
- char txt[CENSUS_MAX_ANNOTATION_LENGTH + 1]; /* actual txt annotation */
- struct annotation* next;
-} annotation;
-
-typedef struct trace_obj {
- census_op_id id;
- gpr_timespec ts;
- census_rpc_stats rpc_stats;
- char* method;
- annotation* annotations;
-} trace_obj;
-
-static void trace_obj_destroy(trace_obj* obj) {
- annotation* p = obj->annotations;
+void census_trace_obj_destroy(census_trace_obj* obj) {
+ census_trace_annotation* p = obj->annotations;
while (p != NULL) {
- annotation* next = p->next;
+ census_trace_annotation* next = p->next;
gpr_free(p);
p = next;
}
@@ -71,7 +55,9 @@
gpr_free(obj);
}
-static void delete_trace_obj(void* obj) { trace_obj_destroy((trace_obj*)obj); }
+static void delete_trace_obj(void* obj) { /* Adapter with a void* parameter:
+   casts obj to census_trace_obj* and forwards it to
+   census_trace_obj_destroy(). NOTE(review): presumably installed as the
+   trace store's value-delete callback -- confirm against ht_opt. */
+ census_trace_obj_destroy((census_trace_obj*)obj);
+}
static const census_ht_option ht_opt = {
CENSUS_HT_UINT64 /* key type*/, 571 /* n_of_buckets */, NULL /* hash */,
@@ -103,8 +89,8 @@
census_op_id census_tracing_start_op(void) {
gpr_mu_lock(&g_mu);
{
- trace_obj* ret = (trace_obj*)gpr_malloc(sizeof(trace_obj));
- memset(ret, 0, sizeof(trace_obj));
+ census_trace_obj* ret = gpr_malloc(sizeof(census_trace_obj));
+ memset(ret, 0, sizeof(census_trace_obj));
g_id++;
memcpy(&ret->id, &g_id, sizeof(census_op_id));
ret->rpc_stats.cnt = 1;
@@ -118,7 +104,7 @@
int census_add_method_tag(census_op_id op_id, const char* method) {
int ret = 0;
- trace_obj* trace = NULL;
+ census_trace_obj* trace = NULL;
gpr_mu_lock(&g_mu);
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace == NULL) {
@@ -131,11 +117,11 @@
}
void census_tracing_print(census_op_id op_id, const char* anno_txt) {
- trace_obj* trace = NULL;
+ census_trace_obj* trace = NULL;
gpr_mu_lock(&g_mu);
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace != NULL) {
- annotation* anno = gpr_malloc(sizeof(annotation));
+ census_trace_annotation* anno = gpr_malloc(sizeof(census_trace_annotation));
anno->ts = gpr_now();
{
char* d = anno->txt;
@@ -153,7 +139,7 @@
}
void census_tracing_end_op(census_op_id op_id) {
- trace_obj* trace = NULL;
+ census_trace_obj* trace = NULL;
gpr_mu_lock(&g_mu);
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace != NULL) {
@@ -196,14 +182,58 @@
void census_internal_unlock_trace_store(void) { gpr_mu_unlock(&g_mu); }
-trace_obj* census_get_trace_obj_locked(census_op_id op_id) {
+census_trace_obj* census_get_trace_obj_locked(census_op_id op_id) {
if (g_trace_store == NULL) {
gpr_log(GPR_ERROR, "Census trace store is not initialized.");
return NULL;
}
- return (trace_obj*)census_ht_find(g_trace_store, op_id_as_key(&op_id));
+ return (census_trace_obj*)census_ht_find(g_trace_store, op_id_as_key(&op_id));
}
-const char* census_get_trace_method_name(const trace_obj* trace) {
- return (const char*)trace->method;
+const char* census_get_trace_method_name(const census_trace_obj* trace) { /*
+   Returns the trace object's method pointer as stored (no copy is made).
+   trace is dereferenced unconditionally, so it must not be NULL. */
+ return trace->method;
+}
+
+/* Deep-copies the annotation list that starts at `from`. Each node is cloned
+   into storage obtained from gpr_malloc and the clones are linked in the same
+   order as the source list. Returns the head of the copy, or NULL when `from`
+   is NULL. */
+static census_trace_annotation* dup_annotation_chain(
+    census_trace_annotation* from) {
+  census_trace_annotation* head = NULL;
+  census_trace_annotation* tail = NULL;
+  while (from != NULL) {
+    census_trace_annotation* copy = gpr_malloc(sizeof(census_trace_annotation));
+    /* The byte copy also carries over the source node's `next`; it is
+       overwritten as soon as a following node is cloned, and the last clone
+       keeps whatever `next` the source tail had. */
+    memcpy(copy, from, sizeof(census_trace_annotation));
+    if (tail == NULL) {
+      head = copy;
+    } else {
+      tail->next = copy;
+    }
+    tail = copy;
+    from = from->next;
+  }
+  return head;
+}
+
+/* Returns a newly allocated deep copy of `from`, which must not be NULL
+   (asserted). The copy gets its own duplicate of the method string and of
+   the annotation list instead of sharing them with the original. */
+static census_trace_obj* trace_obj_dup(census_trace_obj* from) {
+  census_trace_obj* copy = NULL;
+  GPR_ASSERT(from != NULL);
+  copy = gpr_malloc(sizeof(census_trace_obj));
+  /* Copy every member by value first, then replace the two owning pointers
+     with private duplicates. */
+  *copy = *from;
+  copy->method = gpr_strdup(from->method);
+  copy->annotations = dup_annotation_chain(from->annotations);
+  return copy;
+}
+
+/* Takes a snapshot of all operations currently held in the trace store.
+   Under the trace store lock, every stored trace object is deep-copied and
+   the copies are returned in a newly allocated array of pointers.
+   num_active_ops: out parameter, must not be NULL; always written. It
+   receives the number of entries in the returned array, and 0 when the trace
+   store is not initialized.
+   Returns NULL when the trace store is not initialized or holds no
+   operations; otherwise an array the caller must free, along with each
+   object it points to. */
+census_trace_obj** census_get_active_ops(int* num_active_ops) {
+  census_trace_obj** ret = NULL;
+  /* Report a well-defined count even if the store was never initialized;
+     otherwise the caller would read an indeterminate value. */
+  *num_active_ops = 0;
+  gpr_mu_lock(&g_mu);
+  if (g_trace_store != NULL) {
+    size_t n = 0;
+    census_ht_kv* all_kvs = census_ht_get_all_elements(g_trace_store, &n);
+    *num_active_ops = (int)n;
+    if (n != 0) {
+      size_t i = 0;
+      ret = gpr_malloc(sizeof(census_trace_obj*) * n);
+      for (i = 0; i < n; i++) {
+        /* Copy while the lock is held so the stored object cannot change or
+           be removed underneath us. */
+        ret[i] = trace_obj_dup((census_trace_obj*)all_kvs[i].v);
+      }
+    }
+    /* The key/value array is only scratch space for this snapshot. */
+    gpr_free(all_kvs);
+  }
+  gpr_mu_unlock(&g_mu);
+  return ret;
}
diff --git a/src/core/statistics/census_tracing.h b/src/core/statistics/census_tracing.h
index f356c94..88a06a4 100644
--- a/src/core/statistics/census_tracing.h
+++ b/src/core/statistics/census_tracing.h
@@ -34,12 +34,35 @@
#ifndef __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_
#define __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_
+#include <grpc/support/time.h>
+#include "src/core/statistics/census_rpc_stats.h"
+
+/* WARNING: The data structures and APIs provided by this file are for GRPC
+ library's internal use ONLY. They might be changed in backward-incompatible
+ ways and are not subject to any deprecation policy.
+ They are not recommended for external use.
+ */
#ifdef __cplusplus
extern "C" {
#endif
-/* Opaque structure for trace object */
-typedef struct trace_obj trace_obj;
+/* Struct for a trace annotation. The annotations of one trace object are
+   chained into a singly linked list through `next`. */
+typedef struct census_trace_annotation {
+ gpr_timespec ts; /* timestamp of the annotation */
+ char txt[CENSUS_MAX_ANNOTATION_LENGTH + 1]; /* actual txt annotation */
+ struct census_trace_annotation* next; /* following annotation, NULL at the end of the list */
+} census_trace_annotation;
+
+typedef struct census_trace_obj { /* state recorded for one traced operation */
+ census_op_id id; /* id of the operation, assigned in census_tracing_start_op() */
+ gpr_timespec ts; /* NOTE(review): presumably the operation's start time -- confirm */
+ census_rpc_stats rpc_stats; /* RPC stats of the operation; cnt is set to 1 at start */
+ char* method; /* method name; returned by census_get_trace_method_name() */
+ census_trace_annotation* annotations; /* head of the annotation list, NULL if there are none */
+} census_trace_obj;
+
+/* Deletes trace object. */
+void census_trace_obj_destroy(census_trace_obj* obj);
/* Initializes trace store. This function is thread safe. */
void census_tracing_init(void);
@@ -50,15 +73,21 @@
/* Gets trace obj corresponding to the input op_id. Returns NULL if trace store
is not initialized or trace obj is not found. Requires trace store being
locked before calling this function. */
-trace_obj* census_get_trace_obj_locked(census_op_id op_id);
+census_trace_obj* census_get_trace_obj_locked(census_op_id op_id);
/* The following two functions acquire and release the trace store global lock.
They are for census internal use only. */
void census_internal_lock_trace_store(void);
void census_internal_unlock_trace_store(void);
-/* Gets method tag name associated with the input trace object. */
-const char* census_get_trace_method_name(const trace_obj* trace);
+/* Gets method name associated with the input trace object. */
+const char* census_get_trace_method_name(const census_trace_obj* trace);
+
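+/* Returns an array of pointers to copies of the trace objects of all currently
+   active operations and stores the number of entries in *num_active_ops.
+   Returns NULL if there are no active operations.
+   The caller owns the array and the objects it points to: release each object
+   with census_trace_obj_destroy() and the array itself with gpr_free(). */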
+census_trace_obj** census_get_active_ops(int* num_active_ops);
#ifdef __cplusplus
}
diff --git a/test/core/statistics/trace_test.c b/test/core/statistics/trace_test.c
index 6eafcf1..97e1463 100644
--- a/test/core/statistics/trace_test.c
+++ b/test/core/statistics/trace_test.c
@@ -32,10 +32,12 @@
*/
#include <string.h>
+#include <stdio.h>
#include "src/core/statistics/census_interface.h"
#include "src/core/statistics/census_tracing.h"
#include "src/core/statistics/census_tracing.h"
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
@@ -172,6 +174,74 @@
census_tracing_shutdown();
}
+/* Compares two op ids member by member: yields 1 when both the upper and the
+   lower parts match, 0 otherwise. */
+static int ids_equal(census_op_id id1, census_op_id id2) {
+  if (id1.upper != id2.upper) {
+    return 0;
+  }
+  return id1.lower == id2.lower;
+}
+
+/* Destroys the `count` trace object copies referenced by `ops` and then frees
+   the array itself. */
+static void release_active_ops(census_trace_obj** ops, int count) {
+  int k = 0;
+  for (k = 0; k < count; k++) {
+    census_trace_obj_destroy(ops[k]);
+  }
+  gpr_free(ops);
+}
+
+/* Exercises census_get_active_ops() while operations are started, annotated
+   and ended, checking the reported count (and, for a single op, its id) after
+   each step. */
+static void test_get_active_ops(void) {
+  const char* annotation_txt[] = {"annotation 1", "a2"};
+  census_op_id id_1, id_2, id_3;
+  census_trace_obj** active_ops;
+  int n = 0;
+
+  gpr_log(GPR_INFO, "test_get_active_ops");
+  census_tracing_init();
+
+  /* Nothing has been started yet, so nothing is active. */
+  active_ops = census_get_active_ops(&n);
+  GPR_ASSERT(active_ops == NULL);
+  GPR_ASSERT(n == 0);
+
+  /* With one op in flight, exactly that op is reported. */
+  id_1 = census_tracing_start_op();
+  census_add_method_tag(id_1, "foo_1");
+  active_ops = census_get_active_ops(&n);
+  GPR_ASSERT(active_ops != NULL);
+  GPR_ASSERT(n == 1);
+  GPR_ASSERT(ids_equal(active_ops[0]->id, id_1));
+  release_active_ops(active_ops, n);
+  active_ops = NULL;
+
+  /* Starting a second and a third op brings the active count to three. */
+  id_2 = census_tracing_start_op();
+  census_add_method_tag(id_2, "foo_2");
+  id_3 = census_tracing_start_op();
+  census_add_method_tag(id_3, "foo_3");
+
+  active_ops = census_get_active_ops(&n);
+  GPR_ASSERT(n == 3);
+  release_active_ops(active_ops, n);
+  active_ops = NULL;
+
+  /* Ending the second op and annotating the third leaves two active ops. */
+  census_tracing_end_op(id_2);
+  census_tracing_print(id_3, annotation_txt[0]);
+  census_tracing_print(id_3, annotation_txt[1]);
+
+  active_ops = census_get_active_ops(&n);
+  GPR_ASSERT(active_ops != NULL);
+  GPR_ASSERT(n == 2);
+  release_active_ops(active_ops, n);
+  active_ops = NULL;
+
+  /* Once the remaining ops are ended, no active ops are reported. */
+  census_tracing_end_op(id_1);
+  census_tracing_end_op(id_3);
+  active_ops = census_get_active_ops(&n);
+  GPR_ASSERT(active_ops == NULL);
+  GPR_ASSERT(n == 0);
+
+  census_tracing_shutdown();
+}
+
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
test_init_shutdown();
@@ -180,5 +250,6 @@
test_concurrency();
test_add_method_tag_to_unknown_op_id();
test_trace_print();
+ test_get_active_ops();
return 0;
}