/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Implements an efficient in-memory log, optimized for multiple writers and
// a single reader. Available log space is divided up into blocks of
// CENSUS_LOG_2_MAX_RECORD_SIZE bytes. A block can be in one of the following
// three data structures:
// - Free blocks (free_block_list)
// - Blocks with unread data (dirty_block_list)
// - Blocks currently attached to cores (core_local_blocks[])
//
// census_log_start_write() moves a block from core_local_blocks[] to the end
// of dirty_block_list when the block:
// - is out of space OR
// - has an incomplete record (an incomplete record occurs when a thread calls
//   census_log_start_write() and is context-switched before calling
//   census_log_end_write()).
// So, blocks in dirty_block_list are ordered, from oldest to newest, by the
// time at which the block was detached from its core.
//
// census_log_read_next() first iterates over dirty_block_list and then
// core_local_blocks[]. It moves completely read blocks from dirty_block_list
// to free_block_list. Blocks in core_local_blocks[] are not freed, even when
// completely read.
//
// If the log is configured to discard old records and free_block_list is
// empty, census_log_start_write() iterates over dirty_block_list to allocate
// a new block. It moves the oldest available block (no pending read/write) to
// core_local_blocks[].
//
// core_local_block_struct is used to implement a map from core id to the block
// associated with that core. This mapping is advisory. It is possible that the
// block returned by this mapping is no longer associated with that core. This
// mapping is updated, lazily, by census_log_start_write().
//
// Locking in block struct:
//
// Exclusive g_log.lock must be held before calling any functions operating on
// block structs, except census_log_start_write() and census_log_end_write().
//
// Writes to a block are serialized via writer_lock. census_log_start_write()
// acquires this lock and census_log_end_write() releases it. On failure to
// acquire the lock, the writer allocates a new block for the current core and
// updates core_local_block accordingly.
//
// Simultaneous read and write access is allowed. Readers can safely read up
// to committed bytes (bytes_committed).
//
// reader_lock protects the block currently being read from getting recycled.
// start_read() acquires reader_lock and end_read() releases the lock.
//
// Read/write access to a block is disabled via try_disable_access(). It
// returns with both writer_lock and reader_lock held. These locks are
// subsequently released by enable_access() to enable access to the block.
//
// A note on naming: most function/struct names are prefixed with cl_
// (shorthand for census_log). Further, functions that manipulate structures
// include the name of the structure, which will be passed as the first
// argument. E.g. cl_block_initialize() will initialize a cl_block.
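//
// Illustrative usage sketch (not from the original source; 'my_data' is a
// hypothetical fixed-size payload and error handling is omitted):
//
//   census_log_initialize(1 /* size_in_mb */, 1 /* discard_old_records */);
//
//   // Writer: reserve space, copy the record in, then commit it.
//   void* slot = census_log_start_write(sizeof(my_data));
//   if (slot != NULL) {
//     memcpy(slot, &my_data, sizeof(my_data));
//     census_log_end_write(slot, sizeof(my_data));
//   }
//
//   // Reader (single reader): drain all committed data.
//   census_log_init_reader();
//   size_t bytes_available;
//   const void* data;
//   while ((data = census_log_read_next(&bytes_available)) != NULL) {
//     /* process 'bytes_available' bytes starting at 'data' */
//   }
//
//   census_log_shutdown();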

#include "src/core/ext/census/mlog.h"
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#include <stdbool.h>
#include <string.h>

// End of platform specific code

typedef struct census_log_block_list_struct {
  struct census_log_block_list_struct* next;
  struct census_log_block_list_struct* prev;
  struct census_log_block* block;
} cl_block_list_struct;

typedef struct census_log_block {
  // Pointer to underlying buffer.
  char* buffer;
  gpr_atm writer_lock;
  gpr_atm reader_lock;
  // Keeps completely written bytes. Declared atomic because accessed
  // simultaneously by reader and writer.
  gpr_atm bytes_committed;
  // Bytes already read.
  size_t bytes_read;
  // Links for list.
  cl_block_list_struct link;
// We want this structure to be cacheline aligned. We assume the following
// sizes for the various parts on 32/64bit systems:
//   type                   32b size  64b size
//   char*                      4         8
//   3x gpr_atm                12        24
//   size_t                     4         8
//   cl_block_list_struct      12        24
//   TOTAL                     32        64
//
// Depending on the size of our cacheline and the architecture, we
// selectively add char buffering to this structure. The size is checked
// via assert in census_log_initialize().
#if defined(GPR_ARCH_64)
#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 64)
#else
#if defined(GPR_ARCH_32)
#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 32)
#else
#error "Unknown architecture"
#endif
#endif
#if CL_BLOCK_PAD_SIZE > 0
  char padding[CL_BLOCK_PAD_SIZE];
#endif
} cl_block;
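
// Illustrative note (assumption, not from the original source): on a 64-bit
// build with a 64-byte cacheline, CL_BLOCK_PAD_SIZE is 64 - 64 = 0, so the
// padding member is compiled out and sizeof(cl_block) == 64. With a 128-byte
// cacheline it would instead be 128 - 64 = 64 bytes of padding, keeping
// sizeof(cl_block) an exact multiple of GPR_CACHELINE_SIZE, which
// census_log_initialize() asserts.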
131
Alistair Veitch532519a2016-02-11 09:17:12 -0800132// A list of cl_blocks, doubly-linked through cl_block::link.
Alistair Veitch85afe712016-02-02 17:58:15 -0800133typedef struct census_log_block_list {
Alistair Veitch532519a2016-02-11 09:17:12 -0800134 int32_t count; // Number of items in list.
135 cl_block_list_struct ht; // head/tail of linked list.
Alistair Veitch85afe712016-02-02 17:58:15 -0800136} cl_block_list;
137
Alistair Veitch532519a2016-02-11 09:17:12 -0800138// Cacheline aligned block pointers to avoid false sharing. Block pointer must
139// be initialized via set_block(), before calling other functions
Alistair Veitch85afe712016-02-02 17:58:15 -0800140typedef struct census_log_core_local_block {
141 gpr_atm block;
Alistair Veitch532519a2016-02-11 09:17:12 -0800142// Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8
Alistair Veitch85afe712016-02-02 17:58:15 -0800143#if defined(GPR_ARCH_64)
144#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 8)
145#else
146#if defined(GPR_ARCH_32)
147#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 4)
148#else
149#error "Unknown architecture"
150#endif
151#endif
152#if CL_CORE_LOCAL_BLOCK_PAD_SIZE > 0
153 char padding[CL_CORE_LOCAL_BLOCK_PAD_SIZE];
154#endif
155} cl_core_local_block;
156
struct census_log {
  int discard_old_records;
  // Number of cores (aka hardware-contexts)
  unsigned num_cores;
  // Number of CENSUS_LOG_2_MAX_RECORD_SIZE blocks in log
  uint32_t num_blocks;
  cl_block* blocks;                        // Block metadata.
  cl_core_local_block* core_local_blocks;  // Keeps core to block mappings.
  gpr_mu lock;
  int initialized;  // has log been initialized?
  // Keeps the state of the reader iterator. A value of 0 indicates that the
  // iterator has reached the end. census_log_init_reader() resets the value
  // to num_cores to restart iteration.
  uint32_t read_iterator_state;
  // Points to the block being read. If non-NULL, the block is locked for
  // reading (block_being_read->reader_lock is held).
  cl_block* block_being_read;
  char* buffer;
  cl_block_list free_block_list;
  cl_block_list dirty_block_list;
  gpr_atm out_of_space_count;
};

// Single internal log.
static struct census_log g_log;

// Functions that operate on an atomic memory location used as a lock.

// Returns non-zero if lock is acquired.
static int cl_try_lock(gpr_atm* lock) { return gpr_atm_acq_cas(lock, 0, 1); }

static void cl_unlock(gpr_atm* lock) { gpr_atm_rel_store(lock, 0); }

// Functions that operate on cl_core_local_block's.

static void cl_core_local_block_set_block(cl_core_local_block* clb,
                                          cl_block* block) {
  gpr_atm_rel_store(&clb->block, (gpr_atm)block);
}

static cl_block* cl_core_local_block_get_block(cl_core_local_block* clb) {
  return (cl_block*)gpr_atm_acq_load(&clb->block);
}

// Functions that operate on cl_block_list_struct's.

static void cl_block_list_struct_initialize(cl_block_list_struct* bls,
                                            cl_block* block) {
  bls->next = bls->prev = bls;
  bls->block = block;
}

// Functions that operate on cl_block_list's.

static void cl_block_list_initialize(cl_block_list* list) {
  list->count = 0;
  cl_block_list_struct_initialize(&list->ht, NULL);
}

// Returns head of *this, or NULL if empty.
static cl_block* cl_block_list_head(cl_block_list* list) {
  return list->ht.next->block;
}

// Insert element *e after *pos.
static void cl_block_list_insert(cl_block_list* list, cl_block_list_struct* pos,
                                 cl_block_list_struct* e) {
  list->count++;
  e->next = pos->next;
  e->prev = pos;
  e->next->prev = e;
  e->prev->next = e;
}

// Insert block at the head of the list.
static void cl_block_list_insert_at_head(cl_block_list* list, cl_block* block) {
  cl_block_list_insert(list, &list->ht, &block->link);
}

// Insert block at the tail of the list.
static void cl_block_list_insert_at_tail(cl_block_list* list, cl_block* block) {
  cl_block_list_insert(list, list->ht.prev, &block->link);
}

// Removes block *b. Requires *b be in the list.
static void cl_block_list_remove(cl_block_list* list, cl_block* b) {
  list->count--;
  b->link.next->prev = b->link.prev;
  b->link.prev->next = b->link.next;
}
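
// Illustrative note (assumption, not from the original source): 'ht' acts as
// a sentinel node whose 'block' pointer is NULL, so an empty list needs no
// special casing. A minimal sketch of the behavior:
//
//   cl_block_list list;
//   cl_block_list_initialize(&list);                // ht.next == ht.prev == &list.ht
//   GPR_ASSERT(cl_block_list_head(&list) == NULL);  // empty list: ht.block is NULL
//   cl_block_list_insert_at_tail(&list, &blk);      // 'blk' is an initialized cl_block
//   GPR_ASSERT(cl_block_list_head(&list) == &blk);
//   cl_block_list_remove(&list, &blk);              // empty again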

// Functions that operate on cl_block's.

static void cl_block_initialize(cl_block* block, char* buffer) {
  block->buffer = buffer;
  gpr_atm_rel_store(&block->writer_lock, 0);
  gpr_atm_rel_store(&block->reader_lock, 0);
  gpr_atm_rel_store(&block->bytes_committed, 0);
  block->bytes_read = 0;
  cl_block_list_struct_initialize(&block->link, block);
}

// Guards against exposing partially written buffer to the reader.
static void cl_block_set_bytes_committed(cl_block* block,
                                         size_t bytes_committed) {
  gpr_atm_rel_store(&block->bytes_committed, (gpr_atm)bytes_committed);
}

static size_t cl_block_get_bytes_committed(cl_block* block) {
  return (size_t)gpr_atm_acq_load(&block->bytes_committed);
}

// Tries to disable future read/write access to this block. Succeeds if:
// - no in-progress write AND
// - no in-progress read AND
// - 'discard_data' set to true OR no unread data
// On success, clears the block state and returns with writer_lock and
// reader_lock held. These locks are released by a subsequent
// cl_block_enable_access() call.
static bool cl_block_try_disable_access(cl_block* block, int discard_data) {
  if (!cl_try_lock(&block->writer_lock)) {
    return false;
  }
  if (!cl_try_lock(&block->reader_lock)) {
    cl_unlock(&block->writer_lock);
    return false;
  }
  if (!discard_data &&
      (block->bytes_read != cl_block_get_bytes_committed(block))) {
    cl_unlock(&block->reader_lock);
    cl_unlock(&block->writer_lock);
    return false;
  }
  cl_block_set_bytes_committed(block, 0);
  block->bytes_read = 0;
  return true;
}

static void cl_block_enable_access(cl_block* block) {
  cl_unlock(&block->reader_lock);
  cl_unlock(&block->writer_lock);
}

// On success, returns a pointer to the reserved region with writer_lock held.
// Returns NULL if the lock cannot be acquired or the block lacks space.
static void* cl_block_start_write(cl_block* block, size_t size) {
  if (!cl_try_lock(&block->writer_lock)) {
    return NULL;
  }
  size_t bytes_committed = cl_block_get_bytes_committed(block);
  if (bytes_committed + size > CENSUS_LOG_MAX_RECORD_SIZE) {
    cl_unlock(&block->writer_lock);
    return NULL;
  }
  return block->buffer + bytes_committed;
}

// Releases writer_lock and increments committed bytes by 'bytes_written'.
// 'bytes_written' must be <= 'size' specified in the corresponding
// cl_block_start_write() call. This function is thread-safe.
static void cl_block_end_write(cl_block* block, size_t bytes_written) {
  cl_block_set_bytes_committed(
      block, cl_block_get_bytes_committed(block) + bytes_written);
  cl_unlock(&block->writer_lock);
}
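
// Illustrative note (assumption, not from the original source): a writer fills
// the reserved region before committing it, e.g.:
//
//   char* slot = (char*)cl_block_start_write(block, len);
//   if (slot != NULL) {
//     memcpy(slot, payload, len);      // write into the reserved region
//     cl_block_end_write(block, len);  // publish via release-store
//   }
//
// The release-store in cl_block_set_bytes_committed() paired with the
// acquire-load in cl_block_get_bytes_committed() ensures a concurrent reader
// never sees bytes_committed cover partially written data.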

// Returns a pointer to the first unread byte in buffer. The number of bytes
// available is returned in 'bytes_available'. Acquires reader lock that is
// released by a subsequent cl_block_end_read() call. Returns NULL if:
// - read in progress
// - no data available
static void* cl_block_start_read(cl_block* block, size_t* bytes_available) {
  if (!cl_try_lock(&block->reader_lock)) {
    return NULL;
  }
  // bytes_committed may change from under us. Use bytes_available to update
  // bytes_read below.
  size_t bytes_committed = cl_block_get_bytes_committed(block);
  GPR_ASSERT(bytes_committed >= block->bytes_read);
  *bytes_available = bytes_committed - block->bytes_read;
  if (*bytes_available == 0) {
    cl_unlock(&block->reader_lock);
    return NULL;
  }
  void* record = block->buffer + block->bytes_read;
  block->bytes_read += *bytes_available;
  return record;
}

static void cl_block_end_read(cl_block* block) {
  cl_unlock(&block->reader_lock);
}
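
// Illustrative note (assumption, not from the original source): a single
// cl_block_start_read() returns every byte committed since the last read of
// this block, so 'bytes_available' may span several records, e.g.:
//
//   size_t available;
//   void* data = cl_block_start_read(block, &available);
//   if (data != NULL) {
//     // 'available' bytes at 'data', possibly covering multiple records.
//     cl_block_end_read(block);
//   }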

// Internal functions operating on g_log

// Allocates a new free block (or recycles an available dirty block if log is
// configured to discard old records). Returns NULL if out-of-space.
static cl_block* cl_allocate_block(void) {
  cl_block* block = cl_block_list_head(&g_log.free_block_list);
  if (block != NULL) {
    cl_block_list_remove(&g_log.free_block_list, block);
    return block;
  }
  if (!g_log.discard_old_records) {
    // No free block and log is configured to keep old records.
    return NULL;
  }
  // Recycle dirty block. Start from the oldest.
  for (block = cl_block_list_head(&g_log.dirty_block_list); block != NULL;
       block = block->link.next->block) {
    if (cl_block_try_disable_access(block, 1 /* discard data */)) {
      cl_block_list_remove(&g_log.dirty_block_list, block);
      return block;
    }
  }
  return NULL;
}

// Allocates a new block and updates core id => block mapping. 'old_block'
// points to the block that the caller thinks is attached to
// 'core_id'. 'old_block' may be NULL. Returns true if:
// - allocated a new block OR
// - 'core_id' => 'old_block' mapping changed (another thread allocated a
//   block before lock was acquired).
static bool cl_allocate_core_local_block(uint32_t core_id,
                                         cl_block* old_block) {
  // Now that we have the lock, check if core-local mapping has changed.
  cl_core_local_block* core_local_block = &g_log.core_local_blocks[core_id];
  cl_block* block = cl_core_local_block_get_block(core_local_block);
  if ((block != NULL) && (block != old_block)) {
    return true;
  }
  if (block != NULL) {
    cl_core_local_block_set_block(core_local_block, NULL);
    cl_block_list_insert_at_tail(&g_log.dirty_block_list, block);
  }
  block = cl_allocate_block();
  if (block == NULL) {
    return false;
  }
  cl_core_local_block_set_block(core_local_block, block);
  cl_block_enable_access(block);
  return true;
}

static cl_block* cl_get_block(void* record) {
  uintptr_t p = (uintptr_t)((char*)record - g_log.buffer);
  uintptr_t index = p >> CENSUS_LOG_2_MAX_RECORD_SIZE;
  return &g_log.blocks[index];
}
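
// Illustrative note (assumption, not from the original source): block buffers
// are laid out contiguously in g_log.buffer, each CENSUS_LOG_MAX_RECORD_SIZE
// bytes long, so the owning block index is the record's byte offset shifted
// right by CENSUS_LOG_2_MAX_RECORD_SIZE. For example, a record starting
// 3 * CENSUS_LOG_MAX_RECORD_SIZE + 10 bytes into g_log.buffer belongs to
// g_log.blocks[3].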

// Gets the next block to read and tries to free 'prev' block (if not NULL).
// Returns NULL if reached the end.
static cl_block* cl_next_block_to_read(cl_block* prev) {
  cl_block* block = NULL;
  if (g_log.read_iterator_state == g_log.num_cores) {
    // We are traversing dirty list; find the next dirty block.
    if (prev != NULL) {
      // Try to free the previous block if there is no unread data. This block
      // may have unread data if a previously incomplete record completed
      // between read_next() calls.
      block = prev->link.next->block;
      if (cl_block_try_disable_access(prev, 0 /* do not discard data */)) {
        cl_block_list_remove(&g_log.dirty_block_list, prev);
        cl_block_list_insert_at_head(&g_log.free_block_list, prev);
      }
    } else {
      block = cl_block_list_head(&g_log.dirty_block_list);
    }
    if (block != NULL) {
      return block;
    }
    // We are done with the dirty list; moving on to core-local blocks.
  }
  while (g_log.read_iterator_state > 0) {
    g_log.read_iterator_state--;
    block = cl_core_local_block_get_block(
        &g_log.core_local_blocks[g_log.read_iterator_state]);
    if (block != NULL) {
      return block;
    }
  }
  return NULL;
}

#define CL_LOG_2_MB 20  // 2^20 = 1MB

// External functions: primary stats_log interface
void census_log_initialize(size_t size_in_mb, int discard_old_records) {
  // Check cacheline alignment.
  GPR_ASSERT(sizeof(cl_block) % GPR_CACHELINE_SIZE == 0);
  GPR_ASSERT(sizeof(cl_core_local_block) % GPR_CACHELINE_SIZE == 0);
  GPR_ASSERT(!g_log.initialized);
  g_log.discard_old_records = discard_old_records;
  g_log.num_cores = gpr_cpu_num_cores();
  // Ensure that we will not get any overflow in calculating num_blocks.
  GPR_ASSERT(CL_LOG_2_MB >= CENSUS_LOG_2_MAX_RECORD_SIZE);
  GPR_ASSERT(size_in_mb < 1000);
  // Ensure at least 2x as many blocks as there are cores.
  g_log.num_blocks =
      (uint32_t)GPR_MAX(2 * g_log.num_cores, (size_in_mb << CL_LOG_2_MB) >>
                                                 CENSUS_LOG_2_MAX_RECORD_SIZE);
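  // Illustrative note (assumption, not from the original source): if
  // CENSUS_LOG_2_MAX_RECORD_SIZE were 14 (16KB blocks), size_in_mb = 1 would
  // request (1 << 20) >> 14 = 64 blocks, so num_blocks =
  // GPR_MAX(2 * num_cores, 64); on a 4-core machine that is 64 blocks.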
  gpr_mu_init(&g_log.lock);
  g_log.read_iterator_state = 0;
  g_log.block_being_read = NULL;
  g_log.core_local_blocks = (cl_core_local_block*)gpr_malloc_aligned(
      g_log.num_cores * sizeof(cl_core_local_block), GPR_CACHELINE_SIZE_LOG);
  memset(g_log.core_local_blocks, 0,
         g_log.num_cores * sizeof(cl_core_local_block));
  g_log.blocks = (cl_block*)gpr_malloc_aligned(
      g_log.num_blocks * sizeof(cl_block), GPR_CACHELINE_SIZE_LOG);
  memset(g_log.blocks, 0, g_log.num_blocks * sizeof(cl_block));
  g_log.buffer =
      (char*)gpr_malloc(g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE);
  memset(g_log.buffer, 0, g_log.num_blocks * CENSUS_LOG_MAX_RECORD_SIZE);
  cl_block_list_initialize(&g_log.free_block_list);
  cl_block_list_initialize(&g_log.dirty_block_list);
  for (uint32_t i = 0; i < g_log.num_blocks; ++i) {
    cl_block* block = g_log.blocks + i;
    cl_block_initialize(block, g_log.buffer + (CENSUS_LOG_MAX_RECORD_SIZE * i));
    cl_block_try_disable_access(block, 1 /* discard data */);
    cl_block_list_insert_at_tail(&g_log.free_block_list, block);
  }
  gpr_atm_rel_store(&g_log.out_of_space_count, 0);
  g_log.initialized = 1;
}

void census_log_shutdown(void) {
  GPR_ASSERT(g_log.initialized);
  gpr_mu_destroy(&g_log.lock);
  gpr_free_aligned(g_log.core_local_blocks);
  g_log.core_local_blocks = NULL;
  gpr_free_aligned(g_log.blocks);
  g_log.blocks = NULL;
  gpr_free(g_log.buffer);
  g_log.buffer = NULL;
  g_log.initialized = 0;
}

void* census_log_start_write(size_t size) {
  GPR_ASSERT(size > 0);
  GPR_ASSERT(g_log.initialized);
  if (size > CENSUS_LOG_MAX_RECORD_SIZE) {
    return NULL;
  }
  // Used to bound the number of times block allocation is attempted.
  uint32_t attempts_remaining = g_log.num_blocks;
  uint32_t core_id = gpr_cpu_current_cpu();
  do {
    void* record = NULL;
    cl_block* block =
        cl_core_local_block_get_block(&g_log.core_local_blocks[core_id]);
    if (block && (record = cl_block_start_write(block, size))) {
      return record;
    }
    // Need to allocate a new block. We are here if:
    // - No block associated with the core OR
    // - Write in-progress on the block OR
    // - block is out of space
    gpr_mu_lock(&g_log.lock);
    bool allocated = cl_allocate_core_local_block(core_id, block);
    gpr_mu_unlock(&g_log.lock);
    if (!allocated) {
      gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
      return NULL;
    }
  } while (attempts_remaining--);
  // Give up.
  gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
  return NULL;
}

void census_log_end_write(void* record, size_t bytes_written) {
  GPR_ASSERT(g_log.initialized);
  cl_block_end_write(cl_get_block(record), bytes_written);
}

void census_log_init_reader(void) {
  GPR_ASSERT(g_log.initialized);
  gpr_mu_lock(&g_log.lock);
  // If a block is locked for reading, unlock it.
  if (g_log.block_being_read != NULL) {
    cl_block_end_read(g_log.block_being_read);
    g_log.block_being_read = NULL;
  }
  g_log.read_iterator_state = g_log.num_cores;
  gpr_mu_unlock(&g_log.lock);
}

const void* census_log_read_next(size_t* bytes_available) {
  GPR_ASSERT(g_log.initialized);
  gpr_mu_lock(&g_log.lock);
  if (g_log.block_being_read != NULL) {
    cl_block_end_read(g_log.block_being_read);
  }
  do {
    g_log.block_being_read = cl_next_block_to_read(g_log.block_being_read);
    if (g_log.block_being_read != NULL) {
      void* record =
          cl_block_start_read(g_log.block_being_read, bytes_available);
      if (record != NULL) {
        gpr_mu_unlock(&g_log.lock);
        return record;
      }
    }
  } while (g_log.block_being_read != NULL);
  gpr_mu_unlock(&g_log.lock);
  return NULL;
}

size_t census_log_remaining_space(void) {
  GPR_ASSERT(g_log.initialized);
  size_t space = 0;
  gpr_mu_lock(&g_log.lock);
  if (g_log.discard_old_records) {
    // Remaining space is not meaningful; just return the entire log space.
    space = g_log.num_blocks << CENSUS_LOG_2_MAX_RECORD_SIZE;
  } else {
    GPR_ASSERT(g_log.free_block_list.count >= 0);
    space = (size_t)g_log.free_block_list.count * CENSUS_LOG_MAX_RECORD_SIZE;
  }
  gpr_mu_unlock(&g_log.lock);
  return space;
}

int64_t census_log_out_of_space_count(void) {
  GPR_ASSERT(g_log.initialized);
  return gpr_atm_acq_load(&g_log.out_of_space_count);
}