/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

/* Available log space is divided up in blocks of CENSUS_LOG_MAX_RECORD_SIZE
   (i.e. 2^CENSUS_LOG_2_MAX_RECORD_SIZE) bytes. A block can be in one of the
   following three data structures:
   - Free blocks (free_block_list)
   - Blocks with unread data (dirty_block_list)
   - Blocks currently attached to cores (core_local_blocks[])

   census_log_start_write() moves a block from core_local_blocks[] to the
   end of dirty_block_list when the block:
   - is out-of-space OR
   - has an incomplete record (an incomplete record occurs when a thread
     calls census_log_start_write() and is context-switched before calling
     census_log_end_write()).
   So, blocks in dirty_block_list are ordered, from oldest to newest, by the
   time at which the block was detached from its core.

   census_log_read_next() first iterates over dirty_block_list and then
   core_local_blocks[]. It moves completely read blocks from dirty_block_list
   to free_block_list. Blocks in core_local_blocks[] are not freed, even when
   completely read.

   If the log is configured to discard old records and free_block_list is
   empty, census_log_start_write() iterates over dirty_block_list to allocate
   a new block. It moves the oldest available block (no pending read/write)
   to core_local_blocks[].

   core_local_block_struct is used to implement a map from core id to the
   block associated with that core. This mapping is advisory. It is possible
   that the block returned by this mapping is no longer associated with that
   core. This mapping is updated, lazily, by census_log_start_write().

   Locking in block struct:

   An exclusive g_log.lock must be held before calling any functions
   operating on block structs, except census_log_start_write() and
   census_log_end_write().

   Writes to a block are serialized via writer_lock.
   census_log_start_write() acquires this lock and census_log_end_write()
   releases it. On failure to acquire the lock, the writer allocates a new
   block for the current core and updates core_local_block accordingly.

   Simultaneous read and write access is allowed. A reader can safely read
   up to the committed bytes (bytes_committed).

   reader_lock protects the block currently being read from getting
   recycled. start_read() acquires reader_lock and end_read() releases the
   lock.

   Read/write access to a block is disabled via try_disable_access(). It
   returns with both writer_lock and reader_lock held. These locks are
   subsequently released by enable_access() to re-enable access to the
   block.

   A note on naming: most function/struct names are prefixed with cl_
   (shorthand for census_log). Further, functions that manipulate structures
   include the name of the structure, which will be passed as the first
   argument. E.g. cl_block_initialize() will initialize a cl_block.
*/
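
/* A minimal usage sketch of the public API declared in census_log.h,
   assuming a caller that logs a small fixed-size record and later drains
   the log. 'my_record' and process() below are hypothetical; only the
   start_write/end_write pairing and the init_reader/read_next draining
   loop are dictated by this file.

     census_log_initialize(1, 1);  // 1MB of log space, discard old records.

     void* slot = census_log_start_write(sizeof(my_record));
     if (slot != NULL) {
       memcpy(slot, &my_record, sizeof(my_record));
       census_log_end_write(slot, sizeof(my_record));
     }

     census_log_init_reader();
     size_t bytes_available;
     const void* data;
     while ((data = census_log_read_next(&bytes_available)) != NULL) {
       process(data, bytes_available);  // consume 'bytes_available' bytes
     }

     census_log_shutdown();
*/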
#include "src/core/statistics/census_log.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>

/* End of platform specific code */

typedef struct census_log_block_list_struct {
  struct census_log_block_list_struct* next;
  struct census_log_block_list_struct* prev;
  struct census_log_block* block;
} cl_block_list_struct;

typedef struct census_log_block {
  /* Pointer to underlying buffer */
  char* buffer;
  gpr_atm writer_lock;
  gpr_atm reader_lock;
  /* Keeps completely written bytes. Declared atomic because accessed
     simultaneously by reader and writer. */
  gpr_atm bytes_committed;
  /* Bytes already read */
  gpr_int32 bytes_read;
  /* Links for list */
  cl_block_list_struct link;
/* We want this structure to be cacheline aligned. We assume the following
   sizes for the various parts on 32/64bit systems:
     type                  32b size  64b size
     char*                 4         8
     3x gpr_atm            12        24
     gpr_int32             4         8 (assumes padding)
     cl_block_list_struct  12        24
     TOTAL                 32        64

   Depending on the size of our cacheline and the architecture, we
   selectively add padding (a char array) to this structure. The size is
   checked via assert in census_log_initialize(). */
#if defined(GPR_ARCH_64)
#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 64)
#else
#if defined(GPR_ARCH_32)
#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 32)
#else
#error "Unknown architecture"
#endif
#endif
#if CL_BLOCK_PAD_SIZE > 0
  char padding[CL_BLOCK_PAD_SIZE];
#endif
} cl_block;
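
/* Illustration of the padding arithmetic above (GPR_CACHELINE_SIZE is
   platform-defined, so these numbers are only an assumed example): with a
   64-byte cacheline, CL_BLOCK_PAD_SIZE is 64 - 64 = 0 on 64-bit and
   64 - 32 = 32 on 32-bit, making sizeof(cl_block) exactly one cacheline.
   With a 128-byte cacheline the pad grows to 64 and 96 bytes respectively,
   which still keeps sizeof(cl_block) a multiple of the cacheline size, as
   the asserts in census_log_initialize() require. */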

/* A list of cl_blocks, doubly-linked through cl_block::link. */
typedef struct census_log_block_list {
  gpr_int32 count;         /* Number of items in list. */
  cl_block_list_struct ht; /* head/tail of linked list. */
} cl_block_list;

/* Cacheline aligned block pointers to avoid false sharing. A block pointer
   must be initialized via set_block() before calling other functions. */
typedef struct census_log_core_local_block {
  gpr_atm block;
/* Ensure cacheline alignment: we assume sizeof(gpr_atm) == 4 or 8 */
#if defined(GPR_ARCH_64)
#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 8)
#else
#if defined(GPR_ARCH_32)
#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 4)
#else
#error "Unknown architecture"
#endif
#endif
#if CL_CORE_LOCAL_BLOCK_PAD_SIZE > 0
  char padding[CL_CORE_LOCAL_BLOCK_PAD_SIZE];
#endif
} cl_core_local_block;

struct census_log {
  int discard_old_records;
  /* Number of cores (aka hardware-contexts) */
  unsigned num_cores;
  /* Number of CENSUS_LOG_MAX_RECORD_SIZE blocks in the log */
  gpr_int32 num_blocks;
  cl_block* blocks;                       /* Block metadata. */
  cl_core_local_block* core_local_blocks; /* Keeps core to block mappings. */
  gpr_mu lock;
  int initialized; /* has log been initialized? */
  /* Keeps the state of the reader iterator. A value of 0 indicates that the
     iterator has reached the end. census_log_init_reader() resets the value
     to num_cores to restart iteration. */
  gpr_uint32 read_iterator_state;
  /* Points to the block being read. If non-NULL, the block is locked for
     reading (block_being_read->reader_lock is held). */
  cl_block* block_being_read;
  /* A non-zero value indicates that the log is full. */
  gpr_atm is_full;
  char* buffer;
  cl_block_list free_block_list;
  cl_block_list dirty_block_list;
  gpr_atm out_of_space_count;
};

/* Single internal log */
static struct census_log g_log;

/* Functions that operate on an atomic memory location used as a lock */

/* Returns non-zero if lock is acquired */
static int cl_try_lock(gpr_atm* lock) { return gpr_atm_acq_cas(lock, 0, 1); }

static void cl_unlock(gpr_atm* lock) { gpr_atm_rel_store(lock, 0); }
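
/* Note on semantics: cl_try_lock() returns non-zero only if it atomically
   changed *lock from 0 to 1 (acquire semantics); cl_unlock() stores 0 with
   release semantics. A failed cl_try_lock() never blocks: callers fall back
   instead, e.g. a writer that cannot lock its core's block allocates a
   fresh block for that core (see census_log_start_write()). */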

/* Functions that operate on cl_core_local_block's */

static void cl_core_local_block_set_block(cl_core_local_block* clb,
                                          cl_block* block) {
  gpr_atm_rel_store(&clb->block, (gpr_atm)block);
}

static cl_block* cl_core_local_block_get_block(cl_core_local_block* clb) {
  return (cl_block*)gpr_atm_acq_load(&clb->block);
}

/* Functions that operate on cl_block_list_struct's */

static void cl_block_list_struct_initialize(cl_block_list_struct* bls,
                                            cl_block* block) {
  bls->next = bls->prev = bls;
  bls->block = block;
}

/* Functions that operate on cl_block_list's */

static void cl_block_list_initialize(cl_block_list* list) {
  list->count = 0;
  cl_block_list_struct_initialize(&list->ht, NULL);
}

/* Returns the head of the list, or NULL if empty. */
static cl_block* cl_block_list_head(cl_block_list* list) {
  return list->ht.next->block;
}
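
/* The list is circular and 'ht' is a sentinel whose block pointer is NULL.
   That is why cl_block_list_head() returns NULL for an empty list, and why
   a traversal in the style of cl_allocate_block() (following
   link.next->block until it becomes NULL) stops when it wraps around to the
   sentinel. */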

/* Insert element *e after *pos. */
static void cl_block_list_insert(cl_block_list* list, cl_block_list_struct* pos,
                                 cl_block_list_struct* e) {
  list->count++;
  e->next = pos->next;
  e->prev = pos;
  e->next->prev = e;
  e->prev->next = e;
}

/* Insert block at the head of the list */
static void cl_block_list_insert_at_head(cl_block_list* list, cl_block* block) {
  cl_block_list_insert(list, &list->ht, &block->link);
}

/* Insert block at the tail of the list */
static void cl_block_list_insert_at_tail(cl_block_list* list, cl_block* block) {
  cl_block_list_insert(list, list->ht.prev, &block->link);
}

/* Removes block *b. Requires *b be in the list. */
static void cl_block_list_remove(cl_block_list* list, cl_block* b) {
  list->count--;
  b->link.next->prev = b->link.prev;
  b->link.prev->next = b->link.next;
}

/* Functions that operate on cl_block's */

static void cl_block_initialize(cl_block* block, char* buffer) {
  block->buffer = buffer;
  gpr_atm_rel_store(&block->writer_lock, 0);
  gpr_atm_rel_store(&block->reader_lock, 0);
  gpr_atm_rel_store(&block->bytes_committed, 0);
  block->bytes_read = 0;
  cl_block_list_struct_initialize(&block->link, block);
}

/* Guards against exposing a partially written buffer to the reader. */
static void cl_block_set_bytes_committed(cl_block* block,
                                         gpr_int32 bytes_committed) {
  gpr_atm_rel_store(&block->bytes_committed, bytes_committed);
}

static gpr_int32 cl_block_get_bytes_committed(cl_block* block) {
  return gpr_atm_acq_load(&block->bytes_committed);
}

/* Tries to disable future read/write access to this block. Succeeds if:
   - no in-progress write AND
   - no in-progress read AND
   - 'discard_data' set to true OR no unread data
   On success, clears the block state and returns with writer_lock and
   reader_lock held. These locks are released by a subsequent
   cl_block_enable_access() call. */
static int cl_block_try_disable_access(cl_block* block, int discard_data) {
  if (!cl_try_lock(&block->writer_lock)) {
    return 0;
  }
  if (!cl_try_lock(&block->reader_lock)) {
    cl_unlock(&block->writer_lock);
    return 0;
  }
  if (!discard_data &&
      (block->bytes_read != cl_block_get_bytes_committed(block))) {
    cl_unlock(&block->reader_lock);
    cl_unlock(&block->writer_lock);
    return 0;
  }
  cl_block_set_bytes_committed(block, 0);
  block->bytes_read = 0;
  return 1;
}

static void cl_block_enable_access(cl_block* block) {
  cl_unlock(&block->reader_lock);
  cl_unlock(&block->writer_lock);
}

/* Tries to reserve 'size' bytes in the block for writing. On success,
   returns a pointer to the reserved space and holds writer_lock. Returns
   NULL (without the lock) if writer_lock is contended or the block does not
   have enough space left. */
static void* cl_block_start_write(cl_block* block, size_t size) {
  gpr_int32 bytes_committed;
  if (!cl_try_lock(&block->writer_lock)) {
    return NULL;
  }
  bytes_committed = cl_block_get_bytes_committed(block);
  if (bytes_committed + size > CENSUS_LOG_MAX_RECORD_SIZE) {
    cl_unlock(&block->writer_lock);
    return NULL;
  }
  return block->buffer + bytes_committed;
}

/* Releases writer_lock and increments committed bytes by 'bytes_written'.
   'bytes_written' must be <= the 'size' specified in the corresponding
   cl_block_start_write() call. This function is thread-safe. */
static void cl_block_end_write(cl_block* block, size_t bytes_written) {
  cl_block_set_bytes_committed(
      block, cl_block_get_bytes_committed(block) + bytes_written);
  cl_unlock(&block->writer_lock);
}

/* Returns a pointer to the first unread byte in the buffer. The number of
   bytes available is returned in 'bytes_available'. Acquires the reader
   lock, which is released by a subsequent cl_block_end_read() call. Returns
   NULL if:
   - a read is in progress
   - no data is available */
static void* cl_block_start_read(cl_block* block, size_t* bytes_available) {
  void* record;
  if (!cl_try_lock(&block->reader_lock)) {
    return NULL;
  }
  /* bytes_committed may change from under us. Use bytes_available to update
     bytes_read below. */
  *bytes_available = cl_block_get_bytes_committed(block) - block->bytes_read;
  if (*bytes_available == 0) {
    cl_unlock(&block->reader_lock);
    return NULL;
  }
  record = block->buffer + block->bytes_read;
  block->bytes_read += *bytes_available;
  return record;
}

static void cl_block_end_read(cl_block* block) {
  cl_unlock(&block->reader_lock);
}

/* Internal functions operating on g_log */

/* Allocates a new free block (or recycles an available dirty block if log is
   configured to discard old records). Returns NULL if out-of-space. */
static cl_block* cl_allocate_block(void) {
  cl_block* block = cl_block_list_head(&g_log.free_block_list);
  if (block != NULL) {
    cl_block_list_remove(&g_log.free_block_list, block);
    return block;
  }
  if (!g_log.discard_old_records) {
    /* No free block and log is configured to keep old records. */
    return NULL;
  }
  /* Recycle dirty block. Start from the oldest. */
  for (block = cl_block_list_head(&g_log.dirty_block_list); block != NULL;
       block = block->link.next->block) {
    if (cl_block_try_disable_access(block, 1 /* discard data */)) {
      cl_block_list_remove(&g_log.dirty_block_list, block);
      return block;
    }
  }
  return NULL;
}

/* Allocates a new block and updates core id => block mapping. 'old_block'
   points to the block that the caller thinks is attached to
   'core_id'. 'old_block' may be NULL. Returns non-zero if:
   - allocated a new block OR
   - 'core_id' => 'old_block' mapping changed (another thread allocated a
     block before lock was acquired). */
static int cl_allocate_core_local_block(gpr_int32 core_id,
                                        cl_block* old_block) {
  /* Now that we have the lock, check if core-local mapping has changed. */
  cl_core_local_block* core_local_block = &g_log.core_local_blocks[core_id];
  cl_block* block = cl_core_local_block_get_block(core_local_block);
  if ((block != NULL) && (block != old_block)) {
    return 1;
  }
  if (block != NULL) {
    cl_core_local_block_set_block(core_local_block, NULL);
    cl_block_list_insert_at_tail(&g_log.dirty_block_list, block);
  }
  block = cl_allocate_block();
  if (block == NULL) {
    gpr_atm_rel_store(&g_log.is_full, 1);
    return 0;
  }
  cl_core_local_block_set_block(core_local_block, block);
  cl_block_enable_access(block);
  return 1;
}

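/* Maps a record pointer back to its owning block. Records never span blocks
   (cl_block_start_write() refuses writes that would overflow a block), and
   block i's buffer starts at offset i * CENSUS_LOG_MAX_RECORD_SIZE within
   g_log.buffer, so shifting the byte offset right by
   CENSUS_LOG_2_MAX_RECORD_SIZE yields the block index. For example, if
   CENSUS_LOG_2_MAX_RECORD_SIZE were 14 (16KB blocks), an offset of 40000
   bytes would map to block 40000 >> 14 == 2; the actual constant comes from
   census_log.h. */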
static cl_block* cl_get_block(void* record) {
  gpr_uintptr p = (gpr_uintptr)((char*)record - g_log.buffer);
  gpr_uintptr index = p >> CENSUS_LOG_2_MAX_RECORD_SIZE;
  return &g_log.blocks[index];
}

/* Gets the next block to read and tries to free 'prev' block (if not NULL).
   Returns NULL when the end is reached. */
static cl_block* cl_next_block_to_read(cl_block* prev) {
  cl_block* block = NULL;
  if (g_log.read_iterator_state == g_log.num_cores) {
    /* We are traversing the dirty list; find the next dirty block. */
    if (prev != NULL) {
      /* Try to free the previous block if there is no unread data. This
         block may have unread data if a previously incomplete record was
         completed between read_next() calls. */
      block = prev->link.next->block;
      if (cl_block_try_disable_access(prev, 0 /* do not discard data */)) {
        cl_block_list_remove(&g_log.dirty_block_list, prev);
        cl_block_list_insert_at_head(&g_log.free_block_list, prev);
        gpr_atm_rel_store(&g_log.is_full, 0);
      }
    } else {
      block = cl_block_list_head(&g_log.dirty_block_list);
    }
    if (block != NULL) {
      return block;
    }
    /* We are done with the dirty list; moving on to core-local blocks. */
  }
  while (g_log.read_iterator_state > 0) {
    g_log.read_iterator_state--;
    block = cl_core_local_block_get_block(
        &g_log.core_local_blocks[g_log.read_iterator_state]);
    if (block != NULL) {
      return block;
    }
  }
  return NULL;
}

/* External functions: primary stats_log interface */
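
/* Sizing note (illustrative only; CENSUS_LOG_2_MAX_RECORD_SIZE is defined in
   census_log.h): if blocks were 16KB (CENSUS_LOG_2_MAX_RECORD_SIZE == 14),
   census_log_initialize(1, ...) would create
   GPR_MAX(num_cores, (1 << 20) >> 14) == GPR_MAX(num_cores, 64) blocks,
   i.e. at least one block per core and at least 1MB of record space. */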
void census_log_initialize(size_t size_in_mb, int discard_old_records) {
  gpr_int32 ix;
  /* Check cacheline alignment. */
  GPR_ASSERT(sizeof(cl_block) % GPR_CACHELINE_SIZE == 0);
  GPR_ASSERT(sizeof(cl_core_local_block) % GPR_CACHELINE_SIZE == 0);
  GPR_ASSERT(!g_log.initialized);
  g_log.discard_old_records = discard_old_records;
  g_log.num_cores = gpr_cpu_num_cores();
  /* Ensure at least as many blocks as there are cores. */
  g_log.num_blocks = GPR_MAX(
      g_log.num_cores, (size_in_mb << 20) >> CENSUS_LOG_2_MAX_RECORD_SIZE);
  gpr_mu_init(&g_log.lock);
  g_log.read_iterator_state = 0;
  g_log.block_being_read = NULL;
  gpr_atm_rel_store(&g_log.is_full, 0);
  g_log.core_local_blocks = (cl_core_local_block*)gpr_malloc_aligned(
      g_log.num_cores * sizeof(cl_core_local_block), GPR_CACHELINE_SIZE);
  memset(g_log.core_local_blocks, 0,
         g_log.num_cores * sizeof(cl_core_local_block));
  g_log.blocks = (cl_block*)gpr_malloc_aligned(
      g_log.num_blocks * sizeof(cl_block), GPR_CACHELINE_SIZE);
  memset(g_log.blocks, 0, g_log.num_blocks * sizeof(cl_block));
  g_log.buffer = gpr_malloc(g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE);
  memset(g_log.buffer, 0, g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE);
  cl_block_list_initialize(&g_log.free_block_list);
  cl_block_list_initialize(&g_log.dirty_block_list);
  for (ix = 0; ix < g_log.num_blocks; ++ix) {
    cl_block* block = g_log.blocks + ix;
    cl_block_initialize(block,
                        g_log.buffer + (CENSUS_LOG_MAX_RECORD_SIZE * ix));
    cl_block_try_disable_access(block, 1 /* discard data */);
    cl_block_list_insert_at_tail(&g_log.free_block_list, block);
  }
  gpr_atm_rel_store(&g_log.out_of_space_count, 0);
  g_log.initialized = 1;
}

void census_log_shutdown(void) {
  GPR_ASSERT(g_log.initialized);
  gpr_mu_destroy(&g_log.lock);
  gpr_free_aligned(g_log.core_local_blocks);
  g_log.core_local_blocks = NULL;
  gpr_free_aligned(g_log.blocks);
  g_log.blocks = NULL;
  gpr_free(g_log.buffer);
  g_log.buffer = NULL;
  g_log.initialized = 0;
}

void* census_log_start_write(size_t size) {
  /* Used to bound number of times block allocation is attempted. */
  gpr_int32 attempts_remaining = g_log.num_blocks;
  /* TODO(aveitch): move this inside the do loop when current_cpu is fixed */
  gpr_int32 core_id = gpr_cpu_current_cpu();
  GPR_ASSERT(g_log.initialized);
  if (size > CENSUS_LOG_MAX_RECORD_SIZE) {
    return NULL;
  }
  do {
    int allocated;
    void* record = NULL;
    cl_block* block =
        cl_core_local_block_get_block(&g_log.core_local_blocks[core_id]);
    if (block && (record = cl_block_start_write(block, size))) {
      return record;
    }
    /* Need to allocate a new block. We are here if:
       - No block associated with the core OR
       - Write in-progress on the block OR
       - block is out of space */
    if (gpr_atm_acq_load(&g_log.is_full)) {
      gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
      return NULL;
    }
    gpr_mu_lock(&g_log.lock);
    allocated = cl_allocate_core_local_block(core_id, block);
    gpr_mu_unlock(&g_log.lock);
    if (!allocated) {
      gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
      return NULL;
    }
  } while (attempts_remaining--);
  /* Give up. */
  gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
  return NULL;
}

void census_log_end_write(void* record, size_t bytes_written) {
  GPR_ASSERT(g_log.initialized);
  cl_block_end_write(cl_get_block(record), bytes_written);
}

void census_log_init_reader(void) {
  GPR_ASSERT(g_log.initialized);
  gpr_mu_lock(&g_log.lock);
  /* If a block is locked for reading, unlock it. */
  if (g_log.block_being_read != NULL) {
    cl_block_end_read(g_log.block_being_read);
    g_log.block_being_read = NULL;
  }
  g_log.read_iterator_state = g_log.num_cores;
  gpr_mu_unlock(&g_log.lock);
}

const void* census_log_read_next(size_t* bytes_available) {
  GPR_ASSERT(g_log.initialized);
  gpr_mu_lock(&g_log.lock);
  if (g_log.block_being_read != NULL) {
    cl_block_end_read(g_log.block_being_read);
  }
  do {
    g_log.block_being_read = cl_next_block_to_read(g_log.block_being_read);
    if (g_log.block_being_read != NULL) {
      void* record =
          cl_block_start_read(g_log.block_being_read, bytes_available);
      if (record != NULL) {
        gpr_mu_unlock(&g_log.lock);
        return record;
      }
    }
  } while (g_log.block_being_read != NULL);
  gpr_mu_unlock(&g_log.lock);
  return NULL;
}

size_t census_log_remaining_space(void) {
  size_t space;
  GPR_ASSERT(g_log.initialized);
  gpr_mu_lock(&g_log.lock);
  if (g_log.discard_old_records) {
    /* Remaining space is not meaningful; just return the entire log space. */
    space = g_log.num_blocks << CENSUS_LOG_2_MAX_RECORD_SIZE;
  } else {
    space = g_log.free_block_list.count * CENSUS_LOG_MAX_RECORD_SIZE;
  }
  gpr_mu_unlock(&g_log.lock);
  return space;
}

int census_log_out_of_space_count(void) {
  GPR_ASSERT(g_log.initialized);
  return gpr_atm_acq_load(&g_log.out_of_space_count);
}