# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This module is designed to report metadata in a separate thread to avoid the
performance overhead of sending data to Elasticsearch using HTTP.

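Example usage (an illustrative sketch; the dictionary fields shown are
hypothetical examples, not a schema defined by this module):

    metadata_reporter.start()
    metadata_reporter.queue({'_type': 'example_event', 'duration': 1.5})
    ...
    metadata_reporter.abort()
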
"""

import logging
import Queue
import time
import threading

import common
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.scheduler import email_manager
# The metadata_reporter thread runs inside the scheduler process, so it does
# not need to set up Django; otherwise, the following import would be needed:
# from autotest_lib.frontend import setup_django_environment
from autotest_lib.site_utils import server_manager_utils


# Number of seconds to wait before checking the queue again for data to upload.
_REPORT_INTERVAL_SECONDS = 5

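# Maximum number of metadata entries to buffer; once the queue is full, new
# entries are dropped rather than queued (see queue() below).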
_MAX_METADATA_QUEUE_SIZE = 1000000
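# Maximum number of entries to include in a single upload.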
_MAX_UPLOAD_SIZE = 50000
# Number of seconds that uploads may keep failing continuously. After that,
# each upload is limited to _MIN_RETRY_ENTRIES entries and an email alert is
# sent.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry when the previous uploads have failed continuously
# for the duration of _MAX_UPLOAD_FAIL_DURATION.
_MIN_RETRY_ENTRIES = 10
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

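# The lock and events below coordinate the reporter thread: _report_lock
# ensures only one reporting thread runs at a time, _abort signals the running
# thread to exit, and _queue_full records that a queue-full error has already
# been logged.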
_report_lock = threading.Lock()
_abort = threading.Event()
_queue_full = threading.Event()

def queue(data):
    """Queue metadata to be uploaded in the reporter thread.

    If the queue is full, an error is logged the first time the queue becomes
    full. The call does not block or raise Queue.Full, so it adds no overhead
    to the caller, e.g., the scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


def _email_alert():
    """Send an email alert about metadata upload failing continuously for
    _MAX_UPLOAD_FAIL_DURATION seconds.
    """
    if not server_manager_utils.use_server_db():
        logging.debug('Server database is not used, email alert is skipped.')
        return
    try:
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')
    except server_manager_utils.ServerActionError:
        # Only send an email alert if the server is a scheduler, not a shard.
        return
    subject = ('Metadata upload has been failing for %d seconds' %
               _MAX_UPLOAD_FAIL_DURATION)
    email_manager.manager.enqueue_notify_email(subject, '')
    email_manager.manager.send_queued_emails()


def _run():
    """Report metadata in the queue until aborted.
    """
    # Time of the first failed upload since the last successful one. None if
    # the last upload succeeded.
    first_failed_upload = None
    # True if an email alert was sent after uploads had been failing
    # continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
    email_alert = False
    upload_size = _MIN_RETRY_ENTRIES
    try:
        while True:
            start_time = time.time()
            data_list = []
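            # Batch sizing: unless uploads have been failing continuously for
            # more than _MAX_UPLOAD_FAIL_DURATION seconds, the batch size
            # doubles each cycle up to _MAX_UPLOAD_SIZE; after such a failure
            # streak it falls back to _MIN_RETRY_ENTRIES and a one-time email
            # alert is sent.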
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                upload_size = _MIN_RETRY_ENTRIES
                if not email_alert:
                    _email_alert()
                    email_alert = True
            else:
                upload_size = min(upload_size*2, _MAX_UPLOAD_SIZE)
            while (not metadata_queue.empty() and
                   len(data_list) < upload_size):
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    first_failed_upload = None
                    email_alert = False
                else:
                    logging.warn('Failed to upload %d entries of metadata, '
                                 'they will be retried later.', len(data_list))
                    for data in data_list:
                        queue(data)
                    if not first_failed_upload:
                        first_failed_upload = time.time()
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.error('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def start():
    """Start the thread to report metadata.
    """
    # The lock makes sure there is only one reporting thread working.
    if _report_lock.locked():
        logging.error('There is already a metadata reporter thread.')
        return

    _report_lock.acquire()
    reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    reporting_thread.setDaemon(True)
    reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call will wait up to _REPORT_INTERVAL_SECONDS seconds for existing
    data to be uploaded.
    """
    if not _report_lock.locked():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    _abort.wait(_REPORT_INTERVAL_SECONDS)