# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This module is designed to report metadata in a separate thread to avoid the
performance overhead of sending data to Elasticsearch using HTTP.

"""

import logging
import Queue
import time
import threading

import common
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.client.common_lib.cros.graphite import autotest_stats


# Number of seconds to wait before checking the queue again for uploading data.
_REPORT_INTERVAL_SECONDS = 5

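# Maximum number of metadata entries to buffer; when the queue is full, new
# entries are dropped.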
_MAX_METADATA_QUEUE_SIZE = 1000000
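# Maximum number of entries to include in a single bulk upload.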
_MAX_UPLOAD_SIZE = 50000
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

_report_lock = threading.Lock()
_abort = threading.Event()
_queue_full = threading.Event()

def queue(data):
34 """Queue metadata to be uploaded in reporter thread.
35
36 If the queue is full, an error will be logged for the first time the queue
37 becomes full. The call does not wait or raise Queue.Full exception, so
38 there is no overhead on the performance of caller, e.g., scheduler.
39
40 @param data: A metadata entry, which should be a dictionary.
41 """
42 try:
43 metadata_queue.put_nowait(data)
44 if _queue_full.is_set():
45 logging.info('Metadata queue is available to receive new data '
46 'again.')
47 _queue_full.clear()
48 except Queue.Full:
49 if not _queue_full.is_set():
50 _queue_full.set()
51 logging.error('Metadata queue is full, cannot report data. '
52 'Consider increasing the value of '
53 '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
54 'to %d.', _MAX_METADATA_QUEUE_SIZE)
55
56
57def _run():
58 """Report metadata in the queue until being aborted.
59 """
60 try:
61 while True:
62 start_time = time.time()
63 data_list = []
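            # Drain up to _MAX_UPLOAD_SIZE entries from the queue into a
            # single batch.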
            while (not metadata_queue.empty() and
                   len(data_list) < _MAX_UPLOAD_SIZE):
                data_list.append(metadata_queue.get_nowait())
            if data_list:
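                # Upload the batch with a single bulk request. If the upload
                # fails, put the entries back into the queue so they are
                # retried in a later iteration.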
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    autotest_stats.Timer('metadata_reporter').send(
                            'time_used', time_used)
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_uploaded', len(data_list))
                else:
                    logging.warn('Failed to upload %d entries of metadata, '
                                 'they will be retried later.', len(data_list))
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_failed', len(data_list))
                    for data in data_list:
                        queue(data)
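            # Sleep for the remainder of the reporting interval, waking up
            # early if an abort is requested.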
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.error('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def start():
97 """Start the thread to report metadata.
98 """
99 # The lock makes sure there is only one reporting thread working.
100 if _report_lock.locked():
101 logging.error('There is already a metadata reporter thread.')
102 return
103
104 _report_lock.acquire()
105 reporting_thread = threading.Thread(target=_run)
106 # Make it a daemon thread so it doesn't need to be closed explicitly.
107 reporting_thread.setDaemon(True)
108 reporting_thread.start()
109 logging.info('Metadata reporting thread is started.')
110
111
112def abort():
113 """Abort the thread to report metadata.
114
115 The call will wait up to 5 seconds for existing data to be uploaded.
116 """
117 if not _report_lock.locked():
118 logging.error('The metadata reporting thread has already exited.')
119 return
120
121 _abort.set()
122 logging.info('Waiting up to %s seconds for metadata reporting thread to '
123 'complete.', _REPORT_INTERVAL_SECONDS)
124 _abort.wait(_REPORT_INTERVAL_SECONDS)