[autotest] Add a new thread to upload metadata reported by scheduler
Currently, each host state change is reported to metadb before the change is
committed to the database. Each change makes an ES POST call to send the data.
To avoid performance overhead in the scheduler, UDP is used. However, UDP can
lose data, especially now that the ES server lives in GCE while the scheduler
runs in a different network.
This CL attempts to fix the issue by reporting metadata in bulk from a separate
thread. The ES bulk API performs much better than individual calls. For
example, a single index request over HTTP might take 80ms, while the bulk API
can index 1000 records in less than 0.5 seconds.
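
The idea of a background thread that batches records can be sketched roughly
as follows. This is only an illustration, not the code in metadata_reporter:
the class BulkReporter, its parameters, and the upload_bulk callable are all
hypothetical names, and only start() and abort() correspond to calls that
appear in the diff below.

```python
import queue
import threading


class BulkReporter:
    """Hypothetical sketch of a bulk-reporting thread (not the real module)."""

    def __init__(self, upload_bulk, interval_secs=1.0, max_batch=1000):
        # upload_bulk is an assumed callable that sends a list of records
        # in a single request, e.g. through the ES bulk API.
        self._upload_bulk = upload_bulk
        self._interval_secs = interval_secs
        self._max_batch = max_batch
        self._queue = queue.Queue()
        self._abort = threading.Event()
        self._thread = None

    def queue_record(self, record):
        """Called by the scheduler; returns immediately without any I/O."""
        self._queue.put(record)

    def _drain(self):
        """Take up to max_batch records off the queue without blocking."""
        batch = []
        while len(batch) < self._max_batch:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch

    def _flush(self):
        """Upload everything currently queued, one batch per call."""
        while True:
            batch = self._drain()
            if not batch:
                return
            # A real reporter would need error handling and retries here.
            self._upload_bulk(batch)

    def _run(self):
        # Event.wait() returns False on timeout and True once abort is set.
        while not self._abort.wait(self._interval_secs):
            self._flush()
        # Final flush so records queued just before shutdown are not lost.
        self._flush()

    def start(self):
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def abort(self):
        """Signal the thread to stop and wait for the final flush."""
        self._abort.set()
        if self._thread is not None:
            self._thread.join()
```

In this sketch the scheduler only pays the cost of a queue put per state
change, and abort() joins the thread so that shutdown does not drop queued
records.
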
BUG=chromium:471015
TEST=Run a local scheduler and make sure all metadata is uploaded. Also confirm
that the scheduler shuts down properly.
Change-Id: I38991b9e647bb7a6fcaade8e8ef9eea27d9aa035
Reviewed-on: https://chromium-review.googlesource.com/270074
Reviewed-by: Dan Shi <dshi@chromium.org>
Commit-Queue: Dan Shi <dshi@chromium.org>
Trybot-Ready: Dan Shi <dshi@chromium.org>
Tested-by: Dan Shi <dshi@chromium.org>
Reviewed-by: Keith Haddow <haddowk@chromium.org>
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 09d7dd1..b94d44c 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -33,8 +33,10 @@
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import autoserv_utils
from autotest_lib.server import utils as server_utils
+from autotest_lib.site_utils import metadata_reporter
from autotest_lib.site_utils import server_manager_utils
+
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'
@@ -162,6 +164,9 @@
server = status_server.StatusServer()
server.start()
+ # Start the thread to report metadata.
+ metadata_reporter.start()
+
try:
initialize()
dispatcher = Dispatcher()
@@ -181,6 +186,7 @@
email_manager.manager.log_stacktrace(
"Uncaught exception; terminating monitor_db")
+ metadata_reporter.abort()
email_manager.manager.send_queued_emails()
server.shutdown()
_drone_manager.shutdown()