blob: 6604d6e20ddfda1ab091125f0359df1297232619 [file] [log] [blame]
Dan Shicf2e8dd2015-05-07 17:18:48 -07001# Copyright 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""
6This module is designed to report metadata in a separated thread to avoid the
7performance overhead of sending data to Elasticsearch using HTTP.
8
9"""
10
11import logging
12import Queue
13import time
14import threading
15
16import common
17from autotest_lib.client.common_lib.cros.graphite import autotest_es
Dan Shi1e290c92015-05-11 12:54:48 -070018from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Dan Shi6741cc12015-08-17 15:51:32 -070019from autotest_lib.scheduler import email_manager
20# The metadata_reporter thread runs inside scheduler process, thus it doesn't
21# need to setup django, otherwise, following import is needed:
22# from autotest_lib.frontend import setup_django_environment
23from autotest_lib.site_utils import server_manager_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070024
25
# Number of seconds to wait before checking queue again for uploading data.
_REPORT_INTERVAL_SECONDS = 5

# Maximum number of buffered metadata entries; beyond this, queue() drops data.
_MAX_METADATA_QUEUE_SIZE = 1000000
# Maximum number of entries drained from the queue per upload attempt.
_MAX_UPLOAD_SIZE = 50000
# The number of seconds for upload to fail continuously. After that, upload will
# be limited to 1 entry.
_MAX_UPLOAD_FAIL_DURATION = 1800
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

# Held by the reporter thread for its whole lifetime; guarantees a single
# reporter per process (see start()/abort()).
_report_lock = threading.Lock()
# Set by abort() to ask the reporter thread to exit; also used as its sleep
# timer so shutdown is prompt.
_abort = threading.Event()
# Set while the queue is full so the "queue full" error is logged only once
# per full episode.
_queue_full = threading.Event()
40
def queue(data):
    """Enqueue a metadata entry to be uploaded by the reporter thread.

    Non-blocking: if the queue is full the entry is dropped, and an error
    is logged once for the episode (the first time the queue fills up).
    Neither a wait nor a Queue.Full exception reaches the caller, so the
    caller (e.g., the scheduler) pays no performance cost.

    @param data: A metadata entry, which should be a dictionary.
    """
    try:
        metadata_queue.put_nowait(data)
    except Queue.Full:
        # Log only on the transition into the "full" state to avoid
        # flooding the logs while the queue stays full.
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)
        return
    # Successful put after a "full" episode: note the recovery once.
    if _queue_full.is_set():
        logging.info('Metadata queue is available to receive new data '
                     'again.')
        _queue_full.clear()
63
64
Dan Shi6741cc12015-08-17 15:51:32 -070065def _email_alert():
66 """
67 """
68 if not server_manager_utils.use_server_db():
69 logging.debug('Server database not emailed, email alert is skipped.')
70 return
71 try:
72 server_manager_utils.confirm_server_has_role(hostname='localhost',
73 role='scheduler')
74 except server_manager_utils.ServerActionError:
75 # Only email alert if the server is a scheduler, not shard.
76 return
77 subject = ('Metadata upload has been failing for %d seconds' %
78 _MAX_UPLOAD_FAIL_DURATION)
79 email_manager.manager.enqueue_notify_email(subject, '')
80 email_manager.manager.send_queued_emails()
81
82
Dan Shicf2e8dd2015-05-07 17:18:48 -070083def _run():
84 """Report metadata in the queue until being aborted.
85 """
Dan Shi6741cc12015-08-17 15:51:32 -070086 # Time when the first time upload failed. None if the last upload succeeded.
87 first_failed_upload = None
88 # True if email alert was sent when upload has been failing continuously
89 # for _MAX_UPLOAD_FAIL_DURATION seconds.
90 email_alert = False
Dan Shicf2e8dd2015-05-07 17:18:48 -070091 try:
92 while True:
93 start_time = time.time()
94 data_list = []
Dan Shi6741cc12015-08-17 15:51:32 -070095 if (first_failed_upload and
96 time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
97 upload_size = 1
98 if not email_alert:
99 _email_alert()
100 email_alert = True
101 else:
102 upload_size = _MAX_UPLOAD_SIZE
103 while (not metadata_queue.empty() and len(data_list) < upload_size):
Dan Shicf2e8dd2015-05-07 17:18:48 -0700104 data_list.append(metadata_queue.get_nowait())
105 if data_list:
Dan Shi0e7d0f52015-05-13 10:17:13 -0700106 if autotest_es.bulk_post(data_list=data_list):
107 time_used = time.time() - start_time
108 logging.info('%d entries of metadata uploaded in %s '
109 'seconds.', len(data_list), time_used)
110 autotest_stats.Timer('metadata_reporter').send(
111 'time_used', time_used)
112 autotest_stats.Gauge('metadata_reporter').send(
113 'entries_uploaded', len(data_list))
Dan Shi6741cc12015-08-17 15:51:32 -0700114 first_failed_upload = None
115 email_alert = False
Dan Shi0e7d0f52015-05-13 10:17:13 -0700116 else:
117 logging.warn('Failed to upload %d entries of metadata, '
118 'they will be retried later.', len(data_list))
Dan Shi970c7b72015-07-07 15:08:11 -0700119 autotest_stats.Gauge('metadata_reporter').send(
120 'entries_failed', len(data_list))
Dan Shi0e7d0f52015-05-13 10:17:13 -0700121 for data in data_list:
122 queue(data)
Dan Shi6741cc12015-08-17 15:51:32 -0700123 if not first_failed_upload:
124 first_failed_upload = time.time()
Dan Shicf2e8dd2015-05-07 17:18:48 -0700125 sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
126 if sleep_time < 0:
127 sleep_time = 0.5
128 _abort.wait(timeout=sleep_time)
129 except Exception as e:
130 logging.error('Metadata reporter thread failed with error: %s', e)
131 raise
132 finally:
133 logging.info('Metadata reporting thread is exiting.')
134 _abort.clear()
135 _report_lock.release()
136
137
def start():
    """Start the thread that reports queued metadata.

    _report_lock guarantees at most one reporter thread per process; if one
    is already running, an error is logged and nothing else happens. The
    lock is released by _run() when the thread exits.
    """
    if _report_lock.locked():
        logging.error('There is already a metadata reporter thread.')
        return

    _report_lock.acquire()
    worker = threading.Thread(target=_run)
    # Daemonize the thread so it never blocks process shutdown and needs no
    # explicit join.
    worker.setDaemon(True)
    worker.start()
    logging.info('Metadata reporting thread is started.')
152
153
def abort():
    """Ask the metadata reporting thread to stop.

    Signals the reporter via the _abort event, then waits up to
    _REPORT_INTERVAL_SECONDS for already-queued data to be uploaded before
    returning. Logs an error instead if no reporter thread is running.
    """
    if _report_lock.locked():
        _abort.set()
        logging.info('Waiting up to %s seconds for metadata reporting thread '
                     'to complete.', _REPORT_INTERVAL_SECONDS)
        _abort.wait(_REPORT_INTERVAL_SECONDS)
    else:
        logging.error('The metadata reporting thread has already exited.')