# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This module is designed to report metadata in a separate thread to avoid the
performance overhead of sending data to Elasticsearch using HTTP.
"""

import logging
import Queue
import time
import threading

import common
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.scheduler import email_manager
# The metadata_reporter thread runs inside the scheduler process, so it
# doesn't need to set up django; otherwise, the following import is needed:
# from autotest_lib.frontend import setup_django_environment
from autotest_lib.site_utils import server_manager_utils


# Number of seconds to wait before checking the queue again for uploading data.
_REPORT_INTERVAL_SECONDS = 5

_MAX_METADATA_QUEUE_SIZE = 1000000
_MAX_UPLOAD_SIZE = 50000
# The number of seconds for upload to fail continuously. After that, upload
# will be limited to _MIN_RETRY_ENTRIES entries.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry when the previous upload failed continuously for
# the duration of _MAX_UPLOAD_FAIL_DURATION.
_MIN_RETRY_ENTRIES = 10
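# A worked sketch of the batch sizing implemented in _run() below: each cycle
# the upload batch doubles from _MIN_RETRY_ENTRIES up to _MAX_UPLOAD_SIZE
# (10, 20, 40, ..., 50000). Once uploads have failed continuously for
# _MAX_UPLOAD_FAIL_DURATION seconds, the size resets to _MIN_RETRY_ENTRIES so
# retries stay cheap until Elasticsearch recovers.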
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

# Lock to make sure only one reporting thread is running at a time.
_report_lock = threading.Lock()
# Event used to signal the reporting thread to exit.
_abort = threading.Event()
# Event set while the queue is full, so the "queue is full" error is logged
# only once per episode.
_queue_full = threading.Event()

def queue(data):
    """Queue metadata to be uploaded in the reporter thread.

    An error is logged the first time the queue becomes full. The call does
    not block or raise a Queue.Full exception, so there is no performance
    overhead for the caller, e.g., the scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


def _email_alert():
    """Send an email alert about metadata upload failing continuously.

    The alert is only sent when the server database is in use and this server
    has the scheduler role, i.e., it is not a shard.
    """
    if not server_manager_utils.use_server_db():
        logging.debug('Server database is not used, email alert is skipped.')
        return
    try:
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')
    except server_manager_utils.ServerActionError:
        # Only email an alert if the server is a scheduler, not a shard.
        return
    subject = ('Metadata upload has been failing for %d seconds' %
               _MAX_UPLOAD_FAIL_DURATION)
    email_manager.manager.enqueue_notify_email(subject, '')
    email_manager.manager.send_queued_emails()


def _run():
    """Report metadata in the queue until aborted.
    """
    # Time when upload started failing. None if the last upload succeeded.
    first_failed_upload = None
    # True if an email alert was sent after uploads had been failing
    # continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
    email_alert = False
    upload_size = _MIN_RETRY_ENTRIES
    try:
        while True:
            start_time = time.time()
            data_list = []
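            # If uploads have been failing continuously for longer than
            # _MAX_UPLOAD_FAIL_DURATION, throttle retries to a small batch
            # and email an alert at most once; otherwise grow the batch size.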
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                upload_size = _MIN_RETRY_ENTRIES
                if not email_alert:
                    _email_alert()
                    email_alert = True
            else:
                upload_size = min(upload_size * 2, _MAX_UPLOAD_SIZE)
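            # Drain up to upload_size entries from the queue without blocking.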
            while (not metadata_queue.empty() and
                   len(data_list) < upload_size):
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    first_failed_upload = None
                    email_alert = False
                else:
                    logging.warn('Failed to upload %d entries of metadata, '
                                 'they will be retried later.', len(data_list))
                    for data in data_list:
                        queue(data)
                    if not first_failed_upload:
                        first_failed_upload = time.time()
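            # Sleep for the remainder of the report interval; if this cycle
            # ran longer than the interval, sleep briefly instead.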
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.error('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def start():
    """Start the thread to report metadata.
    """
    # The lock makes sure there is only one reporting thread working.
    if _report_lock.locked():
        logging.error('There is already a metadata reporter thread.')
        return

    _report_lock.acquire()
    reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    reporting_thread.setDaemon(True)
    reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call will wait up to _REPORT_INTERVAL_SECONDS seconds for existing
    data to be uploaded.
    """
    if not _report_lock.locked():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    _abort.wait(_REPORT_INTERVAL_SECONDS)