Dan Shi | cf2e8dd | 2015-05-07 17:18:48 -0700 | [diff] [blame] | 1 | # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | |
| 5 | """ |
| 6 | This module is designed to report metadata in a separate thread to avoid the |
| 7 | performance overhead of sending data to Elasticsearch using HTTP. |
| 8 | |
| 9 | """ |
| 10 | |
| 11 | import logging |
| 12 | import Queue |
| 13 | import time |
| 14 | import threading |
| 15 | |
| 16 | import common |
| 17 | from autotest_lib.client.common_lib.cros.graphite import autotest_es |
Dan Shi | 1e290c9 | 2015-05-11 12:54:48 -0700 | [diff] [blame] | 18 | from autotest_lib.client.common_lib.cros.graphite import autotest_stats |
Dan Shi | cf2e8dd | 2015-05-07 17:18:48 -0700 | [diff] [blame] | 19 | |
| 20 | |
# Number of seconds to wait before checking queue again for uploading data.
# abort() also uses this value as the timeout it waits for.
_REPORT_INTERVAL_SECONDS = 5

# Capacity of metadata_queue. _run() also uses it as the upper bound on the
# number of entries collected into a single upload batch.
_MAX_METADATA_QUEUE_SIZE = 100000
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

# Held while a reporter thread exists: acquired in start() and released in the
# finally clause of _run().
_report_lock = threading.Lock()
# Set by abort(). _run() waits on this event between upload passes and clears
# it when the thread exits.
_abort = threading.Event()
# Set by queue() when a put fails because the queue is full and cleared on the
# next successful put, so the "queue is full" error is logged only once per
# overflow episode.
_queue_full = threading.Event()
| 31 | |
def queue(data):
    """Buffer a metadata entry for upload by the reporter thread.

    The entry is added without blocking. When the queue is full the entry is
    dropped: no exception reaches the caller, and an error is logged only on
    the transition into the full state. Once an entry can be queued again, an
    informational message is logged and the full state is reset.

    @param data: A metadata entry, which should be a dictionary.
    """
    try:
        metadata_queue.put_nowait(data)
    except Queue.Full:
        # Already reported this overflow episode; stay quiet.
        if _queue_full.is_set():
            return
        _queue_full.set()
        logging.error('Metadata queue is full, cannot report data. '
                      'Consider increasing the value of '
                      '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                      'to %d.', _MAX_METADATA_QUEUE_SIZE)
        return

    # The put succeeded; nothing more to do unless we are recovering from a
    # previously reported overflow.
    if not _queue_full.is_set():
        return
    logging.info('Metadata queue is available to receive new data '
                 'again.')
    _queue_full.clear()
| 54 | |
| 55 | |
def _run():
    """Report metadata in the queue until being aborted.

    Each pass drains up to _MAX_METADATA_QUEUE_SIZE entries from
    metadata_queue and posts them with autotest_es.bulk_post. Entries from a
    failed post are put back through queue() so they are retried on a later
    pass. Between passes the thread waits on the _abort event so that each
    pass starts roughly _REPORT_INTERVAL_SECONDS after the previous one.

    Once the _abort event is observed to be set, one final upload pass is made
    and the loop ends. On exit, whether normal or through an exception, the
    _abort event is cleared and _report_lock is released.

    @raise Exception: Any exception raised inside the loop is logged with its
            traceback and re-raised.
    """
    try:
        while True:
            # Sample the flag before draining the queue: if an abort was
            # requested, this pass is the last one and flushes what is left.
            aborting = _abort.is_set()
            start_time = time.time()
            data_list = []
            while (not metadata_queue.empty() and
                   len(data_list) < _MAX_METADATA_QUEUE_SIZE):
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    autotest_stats.Timer('metadata_reporter').send(
                            'time_used', time_used)
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_uploaded', len(data_list))
                else:
                    logging.warning('Failed to upload %d entries of metadata, '
                                    'they will be retried later.',
                                    len(data_list))
                    for data in data_list:
                        queue(data)
            if aborting:
                # Without this exit the already-set event would make the
                # wait below return immediately forever (a busy loop), and
                # the lock would never be released.
                break
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                # The pass took longer than the interval; still pause briefly
                # instead of looping back-to-back.
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        # logging.exception records the traceback along with the message.
        logging.exception('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()
| 91 | |
| 92 | |
def start():
    """Start the thread to report metadata.

    Only one reporter thread may exist at a time. _report_lock is taken here
    and released by _run() when the thread exits. If the lock is already
    held, an error is logged and no new thread is started.
    """
    # A non-blocking acquire tests and takes the lock in one atomic step.
    # Checking locked() first and then calling a blocking acquire() would let
    # a second caller slip past the check and block for as long as the
    # reporter thread lives.
    if not _report_lock.acquire(False):
        logging.error('There is already a metadata reporter thread.')
        return

    try:
        reporting_thread = threading.Thread(target=_run)
        # Make it a daemon thread so it doesn't need to be closed explicitly.
        reporting_thread.setDaemon(True)
        reporting_thread.start()
    except Exception:
        # _run() never ran, so its finally clause will not release the lock;
        # release it here so that a later start() can succeed.
        _report_lock.release()
        raise
    logging.info('Metadata reporting thread is started.')
| 107 | |
| 108 | |
def abort():
    """Abort the thread to report metadata.

    Sets the _abort event and then waits up to _REPORT_INTERVAL_SECONDS
    seconds for the reporter thread to release _report_lock, which _run()
    does when it exits. The call returns when the lock is released or when
    the timeout expires, whichever comes first. If no reporter thread holds
    the lock, an error is logged and the call returns immediately.
    """
    if not _report_lock.locked():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    # Waiting on _abort itself would return at once because it was just set
    # above. Poll the lock instead: it is released only when _run() exits.
    poll_interval_seconds = 0.1
    deadline = time.time() + _REPORT_INTERVAL_SECONDS
    while _report_lock.locked() and time.time() < deadline:
        time.sleep(poll_interval_seconds)