Dan Shi | cf2e8dd | 2015-05-07 17:18:48 -0700 | [diff] [blame] | 1 | # Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | |
| 5 | """ |
This module is designed to report metadata in a separate thread to avoid the
| 7 | performance overhead of sending data to Elasticsearch using HTTP. |
| 8 | |
| 9 | """ |
| 10 | |
| 11 | import logging |
| 12 | import Queue |
| 13 | import time |
| 14 | import threading |
| 15 | |
| 16 | import common |
| 17 | from autotest_lib.client.common_lib.cros.graphite import autotest_es |
Dan Shi | 1e290c9 | 2015-05-11 12:54:48 -0700 | [diff] [blame] | 18 | from autotest_lib.client.common_lib.cros.graphite import autotest_stats |
Dan Shi | cf2e8dd | 2015-05-07 17:18:48 -0700 | [diff] [blame] | 19 | |
| 20 | |
# Number of seconds to wait before checking queue again for uploading data.
_REPORT_INTERVAL_SECONDS = 5

# Upper bound on buffered entries; queue() drops new data beyond this.
_MAX_METADATA_QUEUE_SIZE = 1000000
# Maximum number of entries sent in a single bulk post per report cycle.
_MAX_UPLOAD_SIZE = 50000
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

# Held for the lifetime of the reporter thread; ensures only one reporter runs.
_report_lock = threading.Lock()
# Set to request the reporter thread to stop.
_abort = threading.Event()
# Tracks whether the queue-full error has already been logged, so the error
# is emitted once per full period rather than on every dropped entry.
_queue_full = threading.Event()
| 32 | |
def queue(data):
    """Buffer a metadata entry for the reporter thread to upload.

    When the buffer is at capacity the entry is dropped and an error is
    logged once (on the transition into the full state). The call never
    blocks and never raises Queue.Full, so there is no overhead on the
    performance of the caller, e.g., scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    try:
        metadata_queue.put_nowait(data)
    except Queue.Full:
        # Only log on the first drop after the queue fills up.
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)
    else:
        # The put succeeded; if we had previously reported the queue as
        # full, announce the recovery once and reset the flag.
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
| 55 | |
| 56 | |
def _run():
    """Report metadata in the queue until being aborted.

    Loops until the _abort event is set: each cycle drains up to
    _MAX_UPLOAD_SIZE entries from metadata_queue, bulk-posts them to
    Elasticsearch, and re-queues them on failure. On exit (abort or
    unexpected error) the abort flag is cleared and _report_lock — acquired
    by start() — is released so a new reporter thread can be started.
    """
    try:
        # BUG FIX: the loop previously ran `while True` and never checked
        # _abort, so abort() could signal the thread but it would keep
        # running and _report_lock would never be released, making any
        # later start() fail permanently.
        while not _abort.is_set():
            start_time = time.time()
            data_list = []
            # Drain the queue without blocking, bounded per upload so a
            # single bulk post does not grow unboundedly large.
            while (not metadata_queue.empty() and
                   len(data_list) < _MAX_UPLOAD_SIZE):
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    autotest_stats.Timer('metadata_reporter').send(
                            'time_used', time_used)
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_uploaded', len(data_list))
                else:
                    logging.warn('Failed to upload %d entries of metadata, '
                                 'they will be retried later.', len(data_list))
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_failed', len(data_list))
                    # Push the failed entries back for a later retry; queue()
                    # drops them (with a log) if the buffer is full.
                    for data in data_list:
                        queue(data)
            # Sleep out the remainder of the report interval; waiting on
            # _abort lets abort() wake the thread immediately.
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.error('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()
| 94 | |
| 95 | |
def start():
    """Start the thread to report metadata.

    At most one reporter thread runs at a time; a second call while one is
    active logs an error and returns without starting another thread.
    """
    # BUG FIX: the previous `if _report_lock.locked(): ... _report_lock.
    # acquire()` sequence was a check-then-act race — two concurrent calls
    # could both see the lock free, and the loser would block forever on
    # acquire(). A single non-blocking acquire is atomic.
    if not _report_lock.acquire(False):
        logging.error('There is already a metadata reporter thread.')
        return

    reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    # (Attribute assignment replaces the deprecated setDaemon().)
    reporting_thread.daemon = True
    reporting_thread.start()
    logging.info('Metadata reporting thread is started.')
| 110 | |
| 111 | |
def abort():
    """Abort the thread to report metadata.

    The call will wait up to 5 seconds for existing data to be uploaded.
    """
    # A held lock means the reporter thread is still alive; only then is
    # there anything to signal.
    if _report_lock.locked():
        _abort.set()
        logging.info('Waiting up to %s seconds for metadata reporting thread '
                     'to complete.', _REPORT_INTERVAL_SECONDS)
        _abort.wait(_REPORT_INTERVAL_SECONDS)
    else:
        logging.error('The metadata reporting thread has already exited.')