Publish a cloud notification when a job directory is uploaded.

BUG=chromium:623239
TEST=Unittest and manual test.

Change-Id: I2dc645d3dd632b8f11618bdf5ef66ffde8254308
Reviewed-on: https://chromium-review.googlesource.com/358552
Commit-Ready: Michael Tang <ntang@chromium.org>
Tested-by: Michael Tang <ntang@chromium.org>
Reviewed-by: Simran Basi <sbasi@chromium.org>
Reviewed-by: Michael Tang <ntang@chromium.org>
diff --git a/global_config.ini b/global_config.ini
index e779055..fe36c12 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -280,6 +280,10 @@
 infrastructure_user: chromeos-test
 gs_offloader_use_rsync: False
 gs_offloader_multiprocessing: False
+# Cloud pubsub
+cloud_notification_enabled: False
+# The cloud pubsub topic where notifications are sent to.
+cloud_notification_topic:
 
 # Naming convention of Android build.
 android_build_name_pattern: %\(branch\)s/%\(target\)s/%\(build_id\)s
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index a97c1bb..78d2356 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -10,6 +10,7 @@
 Upon successful copy, the local results directory is deleted.
 """
 
+import base64
 import datetime
 import errno
 import glob
@@ -41,6 +42,7 @@
     psutil = None
 
 import job_directories
+import pubsub_utils
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
 from autotest_lib.scheduler import email_manager
@@ -116,7 +118,7 @@
         'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)
 
 D = '[0-9][0-9]'
-TIMESTAMP_PATTERN =  '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
+TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
 CTS_RESULT_PATTERN = 'testResult.xml'
 # Google Storage bucket URI to store results in.
 DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
@@ -124,6 +126,15 @@
 DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
         'CROS', 'cts_apfe_server', default='')
 
+_PUBSUB_ENABLED = global_config.global_config.get_config_value(
+        'CROS', 'cloud_notification_enabled:', type=bool, default=False)
+_PUBSUB_TOPIC = global_config.global_config.get_config_value(
+        'CROS', 'cloud_notification_topic::', type='string', default=None)
+
+# the message data for new test result notification.
+NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'
+
+
 class TimeoutException(Exception):
     """Exception raised by the timeout_handler."""
     pass
@@ -366,11 +377,29 @@
                               dir_entry, e)
 
 
-def get_offload_dir_func(gs_uri, multiprocessing):
+def _create_test_result_notification(gs_path):
+    """Construct a test result notification.
+
+    @param gs_path: The test result Google Cloud Storage URI.
+
+    @returns The notification message.
+    """
+    data = base64.b64encode(NEW_TEST_RESULT_MESSAGE)
+    msg_payload = {'data': data}
+    msg_attributes = {}
+    msg_attributes['gcs_uri'] = gs_path
+    msg_payload['attributes'] = msg_attributes
+
+    return msg_payload
+
+
+def get_offload_dir_func(gs_uri, multiprocessing, pubsub_topic=None):
     """Returns the offload directory function for the given gs_uri
 
     @param gs_uri: Google storage bucket uri to offload to.
     @param multiprocessing: True to turn on -m option for gsutil.
+    @param pubsub_topic: The pubsub topic to publish notificaitons. If None,
+          pubsub is not enabled.
 
     @return offload_dir function to perform the offload.
     """
@@ -420,7 +449,16 @@
                 autotest_stats.Gauge(STATS_KEY, metadata=metadata).send(
                         'kibibytes_transferred', dir_size)
                 counter.increment('jobs_offloaded')
-                shutil.rmtree(dir_entry)
+
+                if pubsub_topic:
+                    message = _create_test_result_notification(gs_path)
+                    msg_ids = pubsub_utils.publish_notifications(
+                            pubsub_topic, [message])
+                    if not msg_ids:
+                        error = True
+
+                if not error:
+                    shutil.rmtree(dir_entry)
             else:
                 error = True
         except TimeoutException:
@@ -549,6 +587,7 @@
     """
 
     def __init__(self, options):
+        self._pubsub_topic = None
         if options.delete_only:
             self._offload_func = delete_files
         else:
@@ -561,8 +600,12 @@
                 multiprocessing = GS_OFFLOADER_MULTIPROCESSING
             logging.info(
                     'Offloader multiprocessing is set to:%r', multiprocessing)
+            if options.pubsub_topic_for_job_upload:
+              self._pubsub_topic = options.pubsub_topic_for_job_upload
+            elif _PUBSUB_ENABLED:
+              self._pubsub_topic = _PUBSUB_TOPIC
             self._offload_func = get_offload_dir_func(
-                    self.gs_uri, multiprocessing)
+                    self.gs_uri, multiprocessing, self._pubsub_topic)
         classlist = []
         if options.process_hosts_only or options.process_all:
             classlist.append(job_directories.SpecialJobDirectory)
@@ -574,6 +617,7 @@
         self._age_limit = options.days_old
         self._open_jobs = {}
         self._next_report_time = time.time()
+        self._pusub_topic = None
 
 
     def _add_new_jobs(self):
@@ -694,6 +738,11 @@
     parser.add_option('-d', '--days_old', dest='days_old',
                       help='Minimum job age in days before a result can be '
                       'offloaded.', type='int', default=0)
+    parser.add_option('-t', '--pubsub_topic_for_job_upload',
+                      dest='pubsub_topic_for_job_upload',
+                      help='The pubsub topic to send notifciations for '
+                      'new job upload',
+                      action='store', type='string', default=None)
     parser.add_option('-l', '--log_size', dest='log_size',
                       help='Limit the offloader logs to a specified '
                       'number of Mega Bytes.', type='int', default=0)
diff --git a/site_utils/gs_offloader_unittest.py b/site_utils/gs_offloader_unittest.py
index 77d1ff2..54214c5 100644
--- a/site_utils/gs_offloader_unittest.py
+++ b/site_utils/gs_offloader_unittest.py
@@ -1,8 +1,9 @@
-# Copyright 2013 The Chromium OS Authors. All rights reserved.
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
 import Queue
+import base64
 import datetime
 import logging
 import os
@@ -64,7 +65,8 @@
         gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
 
 
-    def _mock_get_offload_func(self, is_moblab, multiprocessing=False):
+    def _mock_get_offload_func(self, is_moblab, multiprocessing=False,
+                               pubsub_topic=None):
         """Mock the process of getting the offload_dir function."""
         if is_moblab:
             expected_gsuri = '%sresults/%s/%s/' % (
@@ -77,8 +79,8 @@
         offload_func = gs_offloader.get_offload_dir_func(
                 expected_gsuri, multiprocessing)
         self.mox.StubOutWithMock(gs_offloader, 'get_offload_dir_func')
-        gs_offloader.get_offload_dir_func(expected_gsuri, multiprocessing).AndReturn(
-                offload_func)
+        gs_offloader.get_offload_dir_func(expected_gsuri, multiprocessing,
+                pubsub_topic).AndReturn(offload_func)
         self.mox.ReplayAll()
         return offload_func
 
@@ -142,6 +144,7 @@
         self.assertEqual(offloader._offload_func,
                          gs_offloader.delete_files)
         self.assertEqual(offloader._age_limit, 0)
+        self.assertIsNone(offloader._pubsub_topic)
 
 
     def test_days_old_option(self):
@@ -203,6 +206,22 @@
                          offload_func)
         self.mox.VerifyAll()
 
+    def test_offloader_pubsub_topic_not_set(self):
+        """Test multiprocessing is set."""
+        offload_func = self._mock_get_offload_func(True, False)
+        offloader = gs_offloader.Offloader(_get_options([]))
+        self.assertEqual(offloader._offload_func,
+                         offload_func)
+        self.mox.VerifyAll()
+
+    def test_offloader_pubsub_topic_set(self):
+        """Test multiprocessing is set."""
+        offload_func = self._mock_get_offload_func(True, False, 'test-topic')
+        offloader = gs_offloader.Offloader(_get_options(['-t', 'test-topic']))
+        self.assertEqual(offloader._offload_func,
+                         offload_func)
+        self.mox.VerifyAll()
+
 
 def _make_timestamp(age_limit, is_expired):
     """Create a timestamp for use by `job_directories._is_job_expired()`.
@@ -468,6 +487,17 @@
                       gs_offloader.ERROR_EMAIL_REPORT_FORMAT)
 
 
+class PubSubTest(mox.MoxTestBase):
+    """Test the test result notifcation data structure."""
+
+    def test_create_test_result_notification(self):
+        """Tests the test result notification message."""
+        msg = gs_offloader._create_test_result_notification('gs://test_bucket')
+        self.assertEquals(base64.b64encode(
+            gs_offloader.NEW_TEST_RESULT_MESSAGE), msg['data'])
+        self.assertEquals('gs://test_bucket', msg['attributes']['gcs_uri'])
+
+
 class _MockJob(object):
     """Class to mock the return value of `AFE.get_jobs()`."""
     def __init__(self, created):
@@ -943,6 +973,7 @@
 
         shutil.rmtree(results_folder)
 
+
 class JobDirectoryOffloadTests(_TempResultsDirTestBase):
     """Tests for `_JobDirectory.enqueue_offload()`.
 
diff --git a/site_utils/pubsub_utils.py b/site_utils/pubsub_utils.py
new file mode 100755
index 0000000..d3817ee
--- /dev/null
+++ b/site_utils/pubsub_utils.py
@@ -0,0 +1,73 @@
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Script to archive old Autotest results to Google Storage.
+
+Uses gsutil to archive files to the configured Google Storage bucket.
+Upon successful copy, the local results directory is deleted.
+"""
+
+import logging
+import os
+
+from apiclient import discovery
+from oauth2client.client import ApplicationDefaultCredentialsError
+from oauth2client.client import GoogleCredentials
+
+import common
+
+# Cloud service
+# TODO(ntang): move this to config.
+CLOUD_SERVICE_ACCOUNT_FILE = ('/home/moblab/.config/moblab/'
+                              'moblab-cloud-credential.json')
+PUBSUB_SERVICE_NAME = 'pubsub'
+PUBSUB_VERSION = 'v1beta2'
+PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']
+# number of retry to publish an event.
+_PUBSUB_NUM_RETRIES = 3
+
+
+def _get_pubsub_service():
+    """Gets the pubsub service api handle."""
+    if not os.path.isfile(CLOUD_SERVICE_ACCOUNT_FILE):
+        logging.error('No credential file found')
+        return None
+
+    try:
+        credentials = GoogleCredentials.from_stream(CLOUD_SERVICE_ACCOUNT_FILE)
+        if credentials.create_scoped_required():
+            credentials = credentials.create_scoped(PUBSUB_SCOPES)
+        return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
+                                 credentials=credentials)
+    except ApplicationDefaultCredentialsError as ex:
+        logging.error('Failed to get credential.')
+    except:
+        logging.error('Failed to get the pubsub service handle.')
+
+    return None
+
+
+def publish_notifications(topic, messages=[]):
+    """Publishes a test result notification to a given pubsub topic.
+
+    @param topic: The Cloud pubsub topic.
+    @param messages: A list of notification messages.
+
+    @returns A list of pubsub message ids, and empty if fails.
+    """
+    pubsub = _get_pubsub_service()
+    if pubsub:
+        try:
+            body = {'messages': messages}
+            resp = pubsub.projects().topics().publish(topic=topic,
+                    body=body).execute(num_retries=_PUBSUB_NUM_RETRIES)
+            if resp:
+                msgIds = resp.get('messageIds')
+                if msgIds:
+                    logging.debug('Published notification message')
+                    return msgIds
+        except:
+            pass
+    logging.error('Failed to publish test result notifiation.')
+    return []
diff --git a/site_utils/pubsub_utils_unittest.py b/site_utils/pubsub_utils_unittest.py
new file mode 100644
index 0000000..8a7c60a
--- /dev/null
+++ b/site_utils/pubsub_utils_unittest.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import unittest
+
+import mox
+
+from apiclient import discovery
+from oauth2client.client import ApplicationDefaultCredentialsError
+from oauth2client.client import GoogleCredentials
+from googleapiclient.errors import UnknownApiNameOrVersion
+
+import common
+import pubsub_utils
+
+
+class MockedPubSub(object):
+    """A mocked PubSub handle."""
+    def __init__(self, test, topic, msg, retry, ret_val=None,
+            raise_except=False):
+        self.test = test
+        self.topic = topic
+        self.msg = msg
+        self.retry = retry
+        self.ret_val = ret_val
+        self.raise_except = raise_except
+
+    def projects(self):
+        """Mocked PubSub projects."""
+        return self
+
+    def topics(self):
+        """Mocked PubSub topics."""
+        return self
+
+    def publish(self, topic, body):
+        """Mocked PubSub publish method.
+
+        @param topic: PubSub topic string.
+        @param body: PubSub notification body.
+        """
+        self.test.assertEquals(self.topic, topic)
+        self.test.assertEquals(self.msg, body['messages'][0])
+        return self
+
+    def execute(self, num_retries):
+        """Mocked PubSub execute method.
+
+        @param num_retries: Number of retries.
+        """
+        self.test.assertEquals(self.num_retries, num_retries)
+        if self.raise_except:
+            raise Exception()
+        return self.ret
+
+
+def _create_sample_message():
+    """Creates a sample pubsub message."""
+    msg_payload = {'data': 'sample data'}
+    msg_attributes = {}
+    msg_attributes['var'] = 'value'
+    msg_payload['attributes'] = msg_attributes
+
+    return msg_payload
+
+
+class PubSubTests(mox.MoxTestBase):
+    """Tests for pubsub related functios."""
+
+    def test_get_pubsub_service_no_service_account(self):
+        """Test getting the pubsub service"""
+        self.mox.StubOutWithMock(os.path, 'isfile')
+        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
+        self.mox.ReplayAll()
+        pubsub = pubsub_utils._get_pubsub_service()
+        self.assertIsNone(pubsub)
+        self.mox.VerifyAll()
+
+    def test_get_pubsub_service_with_invalid_service_account(self):
+        """Test getting the pubsub service"""
+        self.mox.StubOutWithMock(os.path, 'isfile')
+        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
+        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+        credentials = self.mox.CreateMock(GoogleCredentials)
+        GoogleCredentials.from_stream(
+                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
+                        ApplicationDefaultCredentialsError())
+        self.mox.ReplayAll()
+        pubsub = pubsub_utils._get_pubsub_service()
+        self.assertIsNone(pubsub)
+        self.mox.VerifyAll()
+
+    def test_get_pubsub_service_with_invalid_service_account(self):
+        """Test getting the pubsub service"""
+        self.mox.StubOutWithMock(os.path, 'isfile')
+        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
+        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+        credentials = self.mox.CreateMock(GoogleCredentials)
+        GoogleCredentials.from_stream(
+                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
+        credentials.create_scoped_required().AndReturn(True)
+        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
+                credentials)
+        self.mox.StubOutWithMock(discovery, 'build')
+        discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
+                pubsub_utils.PUBSUB_VERSION,
+                credentials=credentials).AndRaise(UnknownApiNameOrVersion())
+        self.mox.ReplayAll()
+        pubsub = pubsub_utils._get_pubsub_service()
+        self.assertIsNone(pubsub)
+        self.mox.VerifyAll()
+
+    def test_get_pubsub_service_with_service_account(self):
+        """Test getting the pubsub service"""
+        self.mox.StubOutWithMock(os.path, 'isfile')
+        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
+        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+        credentials = self.mox.CreateMock(GoogleCredentials)
+        GoogleCredentials.from_stream(
+                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
+        credentials.create_scoped_required().AndReturn(True)
+        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
+                credentials)
+        self.mox.StubOutWithMock(discovery, 'build')
+        discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
+                pubsub_utils.PUBSUB_VERSION,
+                credentials=credentials).AndReturn(1)
+        self.mox.ReplayAll()
+        pubsub = pubsub_utils._get_pubsub_service()
+        self.assertIsNotNone(pubsub)
+        self.mox.VerifyAll()
+
+    def test_publish_notifications(self):
+        """Tests publish notifications."""
+        self.mox.StubOutWithMock(pubsub_utils, '_get_pubsub_service')
+        msg = _create_sample_message()
+        pubsub_utils._get_pubsub_service().AndReturn(MockedPubSub(
+            self,
+            'test_topic',
+            msg,
+            pubsub_utils._PUBSUB_NUM_RETRIES,
+            # use tuple ('123') instead of list just for easy to write the test.
+            ret_val = {'messageIds', ('123')}))
+
+        self.mox.ReplayAll()
+        pubsub_utils.publish_notifications(
+                'test_topic', [msg])
+        self.mox.VerifyAll()
+
+
+if __name__ == '__main__':
+    unittest.main()