Publish a cloud notification when a job directory is uploaded.
BUG=chromium:623239
TEST=Unittest and manual test.
Change-Id: I2dc645d3dd632b8f11618bdf5ef66ffde8254308
Reviewed-on: https://chromium-review.googlesource.com/358552
Commit-Ready: Michael Tang <ntang@chromium.org>
Tested-by: Michael Tang <ntang@chromium.org>
Reviewed-by: Simran Basi <sbasi@chromium.org>
Reviewed-by: Michael Tang <ntang@chromium.org>
diff --git a/global_config.ini b/global_config.ini
index e779055..fe36c12 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -280,6 +280,10 @@
infrastructure_user: chromeos-test
gs_offloader_use_rsync: False
gs_offloader_multiprocessing: False
+# Cloud pubsub
+cloud_notification_enabled: False
+# The cloud pubsub topic where notifications are sent to.
+cloud_notification_topic:
# Naming convention of Android build.
android_build_name_pattern: %\(branch\)s/%\(target\)s/%\(build_id\)s
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index a97c1bb..78d2356 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -10,6 +10,7 @@
Upon successful copy, the local results directory is deleted.
"""
+import base64
import datetime
import errno
import glob
@@ -41,6 +42,7 @@
psutil = None
import job_directories
+import pubsub_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import email_manager
@@ -116,7 +118,7 @@
'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)
D = '[0-9][0-9]'
-TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
+TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
CTS_RESULT_PATTERN = 'testResult.xml'
# Google Storage bucket URI to store results in.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
@@ -124,6 +126,15 @@
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
'CROS', 'cts_apfe_server', default='')
+_PUBSUB_ENABLED = global_config.global_config.get_config_value(
+ 'CROS', 'cloud_notification_enabled:', type=bool, default=False)
+_PUBSUB_TOPIC = global_config.global_config.get_config_value(
+ 'CROS', 'cloud_notification_topic::', type='string', default=None)
+
+# the message data for new test result notification.
+NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'
+
+
class TimeoutException(Exception):
"""Exception raised by the timeout_handler."""
pass
@@ -366,11 +377,29 @@
dir_entry, e)
-def get_offload_dir_func(gs_uri, multiprocessing):
+def _create_test_result_notification(gs_path):
+ """Construct a test result notification.
+
+ @param gs_path: The test result Google Cloud Storage URI.
+
+ @returns The notification message.
+ """
+ data = base64.b64encode(NEW_TEST_RESULT_MESSAGE)
+ msg_payload = {'data': data}
+ msg_attributes = {}
+ msg_attributes['gcs_uri'] = gs_path
+ msg_payload['attributes'] = msg_attributes
+
+ return msg_payload
+
+
+def get_offload_dir_func(gs_uri, multiprocessing, pubsub_topic=None):
"""Returns the offload directory function for the given gs_uri
@param gs_uri: Google storage bucket uri to offload to.
@param multiprocessing: True to turn on -m option for gsutil.
+ @param pubsub_topic: The pubsub topic to publish notificaitons. If None,
+ pubsub is not enabled.
@return offload_dir function to perform the offload.
"""
@@ -420,7 +449,16 @@
autotest_stats.Gauge(STATS_KEY, metadata=metadata).send(
'kibibytes_transferred', dir_size)
counter.increment('jobs_offloaded')
- shutil.rmtree(dir_entry)
+
+ if pubsub_topic:
+ message = _create_test_result_notification(gs_path)
+ msg_ids = pubsub_utils.publish_notifications(
+ pubsub_topic, [message])
+ if not msg_ids:
+ error = True
+
+ if not error:
+ shutil.rmtree(dir_entry)
else:
error = True
except TimeoutException:
@@ -549,6 +587,7 @@
"""
def __init__(self, options):
+ self._pubsub_topic = None
if options.delete_only:
self._offload_func = delete_files
else:
@@ -561,8 +600,12 @@
multiprocessing = GS_OFFLOADER_MULTIPROCESSING
logging.info(
'Offloader multiprocessing is set to:%r', multiprocessing)
+ if options.pubsub_topic_for_job_upload:
+ self._pubsub_topic = options.pubsub_topic_for_job_upload
+ elif _PUBSUB_ENABLED:
+ self._pubsub_topic = _PUBSUB_TOPIC
self._offload_func = get_offload_dir_func(
- self.gs_uri, multiprocessing)
+ self.gs_uri, multiprocessing, self._pubsub_topic)
classlist = []
if options.process_hosts_only or options.process_all:
classlist.append(job_directories.SpecialJobDirectory)
@@ -574,6 +617,7 @@
self._age_limit = options.days_old
self._open_jobs = {}
self._next_report_time = time.time()
+ self._pusub_topic = None
def _add_new_jobs(self):
@@ -694,6 +738,11 @@
parser.add_option('-d', '--days_old', dest='days_old',
help='Minimum job age in days before a result can be '
'offloaded.', type='int', default=0)
+ parser.add_option('-t', '--pubsub_topic_for_job_upload',
+ dest='pubsub_topic_for_job_upload',
+ help='The pubsub topic to send notifciations for '
+ 'new job upload',
+ action='store', type='string', default=None)
parser.add_option('-l', '--log_size', dest='log_size',
help='Limit the offloader logs to a specified '
'number of Mega Bytes.', type='int', default=0)
diff --git a/site_utils/gs_offloader_unittest.py b/site_utils/gs_offloader_unittest.py
index 77d1ff2..54214c5 100644
--- a/site_utils/gs_offloader_unittest.py
+++ b/site_utils/gs_offloader_unittest.py
@@ -1,8 +1,9 @@
-# Copyright 2013 The Chromium OS Authors. All rights reserved.
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import Queue
+import base64
import datetime
import logging
import os
@@ -64,7 +65,8 @@
gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
- def _mock_get_offload_func(self, is_moblab, multiprocessing=False):
+ def _mock_get_offload_func(self, is_moblab, multiprocessing=False,
+ pubsub_topic=None):
"""Mock the process of getting the offload_dir function."""
if is_moblab:
expected_gsuri = '%sresults/%s/%s/' % (
@@ -77,8 +79,8 @@
offload_func = gs_offloader.get_offload_dir_func(
expected_gsuri, multiprocessing)
self.mox.StubOutWithMock(gs_offloader, 'get_offload_dir_func')
- gs_offloader.get_offload_dir_func(expected_gsuri, multiprocessing).AndReturn(
- offload_func)
+ gs_offloader.get_offload_dir_func(expected_gsuri, multiprocessing,
+ pubsub_topic).AndReturn(offload_func)
self.mox.ReplayAll()
return offload_func
@@ -142,6 +144,7 @@
self.assertEqual(offloader._offload_func,
gs_offloader.delete_files)
self.assertEqual(offloader._age_limit, 0)
+ self.assertIsNone(offloader._pubsub_topic)
def test_days_old_option(self):
@@ -203,6 +206,22 @@
offload_func)
self.mox.VerifyAll()
+ def test_offloader_pubsub_topic_not_set(self):
+ """Test multiprocessing is set."""
+ offload_func = self._mock_get_offload_func(True, False)
+ offloader = gs_offloader.Offloader(_get_options([]))
+ self.assertEqual(offloader._offload_func,
+ offload_func)
+ self.mox.VerifyAll()
+
+ def test_offloader_pubsub_topic_set(self):
+ """Test multiprocessing is set."""
+ offload_func = self._mock_get_offload_func(True, False, 'test-topic')
+ offloader = gs_offloader.Offloader(_get_options(['-t', 'test-topic']))
+ self.assertEqual(offloader._offload_func,
+ offload_func)
+ self.mox.VerifyAll()
+
def _make_timestamp(age_limit, is_expired):
"""Create a timestamp for use by `job_directories._is_job_expired()`.
@@ -468,6 +487,17 @@
gs_offloader.ERROR_EMAIL_REPORT_FORMAT)
+class PubSubTest(mox.MoxTestBase):
+ """Test the test result notifcation data structure."""
+
+ def test_create_test_result_notification(self):
+ """Tests the test result notification message."""
+ msg = gs_offloader._create_test_result_notification('gs://test_bucket')
+ self.assertEquals(base64.b64encode(
+ gs_offloader.NEW_TEST_RESULT_MESSAGE), msg['data'])
+ self.assertEquals('gs://test_bucket', msg['attributes']['gcs_uri'])
+
+
class _MockJob(object):
"""Class to mock the return value of `AFE.get_jobs()`."""
def __init__(self, created):
@@ -943,6 +973,7 @@
shutil.rmtree(results_folder)
+
class JobDirectoryOffloadTests(_TempResultsDirTestBase):
"""Tests for `_JobDirectory.enqueue_offload()`.
diff --git a/site_utils/pubsub_utils.py b/site_utils/pubsub_utils.py
new file mode 100755
index 0000000..d3817ee
--- /dev/null
+++ b/site_utils/pubsub_utils.py
@@ -0,0 +1,73 @@
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Script to archive old Autotest results to Google Storage.
+
+Uses gsutil to archive files to the configured Google Storage bucket.
+Upon successful copy, the local results directory is deleted.
+"""
+
+import logging
+import os
+
+from apiclient import discovery
+from oauth2client.client import ApplicationDefaultCredentialsError
+from oauth2client.client import GoogleCredentials
+
+import common
+
+# Cloud service
+# TODO(ntang): move this to config.
+CLOUD_SERVICE_ACCOUNT_FILE = ('/home/moblab/.config/moblab/'
+ 'moblab-cloud-credential.json')
+PUBSUB_SERVICE_NAME = 'pubsub'
+PUBSUB_VERSION = 'v1beta2'
+PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']
+# number of retry to publish an event.
+_PUBSUB_NUM_RETRIES = 3
+
+
+def _get_pubsub_service():
+ """Gets the pubsub service api handle."""
+ if not os.path.isfile(CLOUD_SERVICE_ACCOUNT_FILE):
+ logging.error('No credential file found')
+ return None
+
+ try:
+ credentials = GoogleCredentials.from_stream(CLOUD_SERVICE_ACCOUNT_FILE)
+ if credentials.create_scoped_required():
+ credentials = credentials.create_scoped(PUBSUB_SCOPES)
+ return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
+ credentials=credentials)
+ except ApplicationDefaultCredentialsError as ex:
+ logging.error('Failed to get credential.')
+ except:
+ logging.error('Failed to get the pubsub service handle.')
+
+ return None
+
+
+def publish_notifications(topic, messages=[]):
+ """Publishes a test result notification to a given pubsub topic.
+
+ @param topic: The Cloud pubsub topic.
+ @param messages: A list of notification messages.
+
+ @returns A list of pubsub message ids, and empty if fails.
+ """
+ pubsub = _get_pubsub_service()
+ if pubsub:
+ try:
+ body = {'messages': messages}
+ resp = pubsub.projects().topics().publish(topic=topic,
+ body=body).execute(num_retries=_PUBSUB_NUM_RETRIES)
+ if resp:
+ msgIds = resp.get('messageIds')
+ if msgIds:
+ logging.debug('Published notification message')
+ return msgIds
+ except:
+ pass
+ logging.error('Failed to publish test result notifiation.')
+ return []
diff --git a/site_utils/pubsub_utils_unittest.py b/site_utils/pubsub_utils_unittest.py
new file mode 100644
index 0000000..8a7c60a
--- /dev/null
+++ b/site_utils/pubsub_utils_unittest.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import unittest
+
+import mox
+
+from apiclient import discovery
+from oauth2client.client import ApplicationDefaultCredentialsError
+from oauth2client.client import GoogleCredentials
+from googleapiclient.errors import UnknownApiNameOrVersion
+
+import common
+import pubsub_utils
+
+
+class MockedPubSub(object):
+ """A mocked PubSub handle."""
+ def __init__(self, test, topic, msg, retry, ret_val=None,
+ raise_except=False):
+ self.test = test
+ self.topic = topic
+ self.msg = msg
+ self.retry = retry
+ self.ret_val = ret_val
+ self.raise_except = raise_except
+
+ def projects(self):
+ """Mocked PubSub projects."""
+ return self
+
+ def topics(self):
+ """Mocked PubSub topics."""
+ return self
+
+ def publish(self, topic, body):
+ """Mocked PubSub publish method.
+
+ @param topic: PubSub topic string.
+ @param body: PubSub notification body.
+ """
+ self.test.assertEquals(self.topic, topic)
+ self.test.assertEquals(self.msg, body['messages'][0])
+ return self
+
+ def execute(self, num_retries):
+ """Mocked PubSub execute method.
+
+ @param num_retries: Number of retries.
+ """
+ self.test.assertEquals(self.num_retries, num_retries)
+ if self.raise_except:
+ raise Exception()
+ return self.ret
+
+
+def _create_sample_message():
+ """Creates a sample pubsub message."""
+ msg_payload = {'data': 'sample data'}
+ msg_attributes = {}
+ msg_attributes['var'] = 'value'
+ msg_payload['attributes'] = msg_attributes
+
+ return msg_payload
+
+
+class PubSubTests(mox.MoxTestBase):
+ """Tests for pubsub related functios."""
+
+ def test_get_pubsub_service_no_service_account(self):
+ """Test getting the pubsub service"""
+ self.mox.StubOutWithMock(os.path, 'isfile')
+ os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
+ self.mox.ReplayAll()
+ pubsub = pubsub_utils._get_pubsub_service()
+ self.assertIsNone(pubsub)
+ self.mox.VerifyAll()
+
+ def test_get_pubsub_service_with_invalid_service_account(self):
+ """Test getting the pubsub service"""
+ self.mox.StubOutWithMock(os.path, 'isfile')
+ self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
+ os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+ credentials = self.mox.CreateMock(GoogleCredentials)
+ GoogleCredentials.from_stream(
+ pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
+ ApplicationDefaultCredentialsError())
+ self.mox.ReplayAll()
+ pubsub = pubsub_utils._get_pubsub_service()
+ self.assertIsNone(pubsub)
+ self.mox.VerifyAll()
+
+ def test_get_pubsub_service_with_invalid_service_account(self):
+ """Test getting the pubsub service"""
+ self.mox.StubOutWithMock(os.path, 'isfile')
+ self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
+ os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+ credentials = self.mox.CreateMock(GoogleCredentials)
+ GoogleCredentials.from_stream(
+ pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
+ credentials.create_scoped_required().AndReturn(True)
+ credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
+ credentials)
+ self.mox.StubOutWithMock(discovery, 'build')
+ discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
+ pubsub_utils.PUBSUB_VERSION,
+ credentials=credentials).AndRaise(UnknownApiNameOrVersion())
+ self.mox.ReplayAll()
+ pubsub = pubsub_utils._get_pubsub_service()
+ self.assertIsNone(pubsub)
+ self.mox.VerifyAll()
+
+ def test_get_pubsub_service_with_service_account(self):
+ """Test getting the pubsub service"""
+ self.mox.StubOutWithMock(os.path, 'isfile')
+ self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
+ os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+ credentials = self.mox.CreateMock(GoogleCredentials)
+ GoogleCredentials.from_stream(
+ pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
+ credentials.create_scoped_required().AndReturn(True)
+ credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
+ credentials)
+ self.mox.StubOutWithMock(discovery, 'build')
+ discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
+ pubsub_utils.PUBSUB_VERSION,
+ credentials=credentials).AndReturn(1)
+ self.mox.ReplayAll()
+ pubsub = pubsub_utils._get_pubsub_service()
+ self.assertIsNotNone(pubsub)
+ self.mox.VerifyAll()
+
+ def test_publish_notifications(self):
+ """Tests publish notifications."""
+ self.mox.StubOutWithMock(pubsub_utils, '_get_pubsub_service')
+ msg = _create_sample_message()
+ pubsub_utils._get_pubsub_service().AndReturn(MockedPubSub(
+ self,
+ 'test_topic',
+ msg,
+ pubsub_utils._PUBSUB_NUM_RETRIES,
+ # use tuple ('123') instead of list just for easy to write the test.
+ ret_val = {'messageIds', ('123')}))
+
+ self.mox.ReplayAll()
+ pubsub_utils.publish_notifications(
+ 'test_topic', [msg])
+ self.mox.VerifyAll()
+
+
+if __name__ == '__main__':
+ unittest.main()