Wrap the pubsub into a cloud console client.

BUG=chromium:724523
TEST=unit tests.

Change-Id: I4338cce8c2c983b4cd50b71014d4f1ca5667cd78
Reviewed-on: https://chromium-review.googlesource.com/540137
Commit-Ready: Michael Tang <ntang@chromium.org>
Tested-by: Michael Tang <ntang@chromium.org>
Reviewed-by: Keith Haddow <haddowk@chromium.org>
Reviewed-by: Michael Tang <ntang@chromium.org>
diff --git a/site_utils/cloud_console.proto b/site_utils/cloud_console.proto
new file mode 100644
index 0000000..c2beab9
--- /dev/null
+++ b/site_utils/cloud_console.proto
@@ -0,0 +1,45 @@
+syntax = "proto2";
+
+// The types of message notification sent from Moblab.
+enum MessageType {
+    MOBLAB_HEARTBEAT = 1;
+    MOBLAB_REMOTE_EVENT = 2;
+    MOBLAB_ALERT = 3;
+}
+
+message Timestamp {
+  required int64 seconds = 1;
+  optional int64 nanos = 2 [default = 0];
+}
+
+// The heartbeat message
+message Heartbeat {
+  optional Timestamp timestamp = 1;
+}
+
+// The remote event notification message.
+message RemoteEventMessage {
+  // EventType is an enumeration of event types sent to cloud console.
+  // Any new event type should be added here.
+  enum EventType {
+    MOBLAB_INFO = 1;
+    MOBLAB_BOOT_COMPLETE = 2;
+  }
+
+  required EventType event_type = 1 [default = MOBLAB_INFO];
+  optional string event_data = 2;
+}
+
+// Moblab alerts
+message Alert {
+  enum AlertLevel {
+    CRITICAL = 1;
+    MAJOR = 2;
+    MINOR = 3;
+  }
+  required AlertLevel level = 1;
+  optional string data = 2;
+  optional Timestamp timestamp = 3;
+  optional string source_application = 4;
+  optional string source_component = 5;
+}
diff --git a/site_utils/cloud_console_client.py b/site_utils/cloud_console_client.py
new file mode 100644
index 0000000..892f677
--- /dev/null
+++ b/site_utils/cloud_console_client.py
@@ -0,0 +1,189 @@
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Chrome OS Parnter Concole remote actions."""
+
+from __future__ import print_function
+
+import base64
+import logging
+
+import common
+
+from autotest_lib.client.common_lib import global_config
+from autotest_lib.client.common_lib import utils
+from autotest_lib.server.hosts import moblab_host
+from autotest_lib.site_utils import pubsub_utils
+
+
+_PUBSUB_TOPIC = global_config.global_config.get_config_value(
+        'CROS', 'cloud_notification_topic', default=None)
+
+# Test upload pubsub notification attributes
+NOTIFICATION_ATTR_VERSION = 'version'
+NOTIFICATION_ATTR_GCS_URI = 'gcs_uri'
+NOTIFICATION_ATTR_MOBLAB_MAC = 'moblab_mac_address'
+NOTIFICATION_ATTR_MOBLAB_ID = 'moblab_id'
+NOTIFICATION_VERSION = '1'
+# the message data for new test result notification.
+NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'
+
+
+ALERT_CRITICAL = 'Critical'
+ALERT_MAJOR = 'Major'
+ALERT_MINOR = 'Minor'
+
+LOG_INFO = 'Info'
+LOG_WARNING = 'Warning'
+LOG_SEVERE = 'Severe'
+LOG_FATAL = 'Fatal'
+
+
+def is_cloud_notification_enabled():
+    """Checks if cloud pubsub notification is enabled.
+
+    @returns: True if cloud pubsub notification is enabled. Otherwise, False.
+    """
+    return  global_config.global_config.get_config_value(
+        'CROS', 'cloud_notification_enabled', type=bool, default=False)
+
+
+class CloudConsoleClient(object):
+    """The remote interface to the Cloud Console."""
+    def send_heartbeat(self):
+        """Sends a heartbeat.
+
+        @returns True if the notification is successfully sent.
+            Otherwise, False.
+        """
+        pass
+
+    def send_event(self, event_type=None, event_data=None):
+        """Sends an event notification to the remote console.
+
+        @param event_type: The event type that is defined in the protobuffer
+            file 'cloud_console.proto'.
+        @param event_data: The event data.
+
+        @returns True if the notification is successfully sent.
+            Otherwise, False.
+        """
+        pass
+
+    def send_log(self, msg, level=LOG_INFO, session_id=None):
+        """Sends a log message to the remote console.
+
+        @param msg: The log message.
+        @param level: The logging level as string.
+        @param session_id: The current session id.
+
+        @returns True if the notification is successfully sent.
+            Otherwise, False.
+        """
+        pass
+
+    def send_alert(self, msg, level=ALERT_MINOR, session_id=None):
+        """Sends an alert to the remote console.
+
+        @param msg: The alert message.
+        @param level: The logging level as string.
+        @param session_id: The current session id.
+
+        @returns True if the notification is successfully sent.
+            Otherwise, False.
+        """
+        pass
+
+    def send_test_job_offloaded_message(self, gcs_uri):
+        """Sends a test job offloaded message to the remote console.
+
+        @param gcs_uri: The test result Google Cloud Storage URI.
+
+        @returns True if the notification is successfully sent.
+            Otherwise, False.
+        """
+        pass
+
+
+# Make it easy to mock out
+def _create_pubsub_client(credential):
+    return pubsub_utils.PubSubClient(credential)
+
+
+class PubSubBasedClient(CloudConsoleClient):
+    """A Cloud PubSub based implementation of the CloudConsoleClient interface.
+    """
+    def __init__(
+            self,
+            credential=moblab_host.MOBLAB_SERVICE_ACCOUNT_LOCATION,
+            pubsub_topic=_PUBSUB_TOPIC):
+        """Constructor.
+
+        @param credential: The service account credential filename. Default to
+            '/home/moblab/.service_account.json'.
+        @param pubsub_topic: The cloud pubsub topic name to use.
+        """
+        super(PubSubBasedClient, self).__init__()
+        self._pubsub_client = _create_pubsub_client(credential)
+        self._pubsub_topic = pubsub_topic
+
+
+    def _create_notification_message(self, data, msg_attributes):
+        """Creates a cloud pubsub notification object.
+
+        @param data: The message data as a string.
+        @param msg_attributes: The message attribute map.
+
+        @returns: A pubsub message object with data and attributes.
+        """
+        message = {'data': data}
+        message['attributes'] = msg_attributes
+        return message
+
+    def _create_notification_attributes(self):
+        """Creates a cloud pubsub notification message attribute map.
+
+        Fills in the version, moblab mac address, and moblab id information
+        as attributes.
+
+        @returns: A pubsub messsage attribute map.
+        """
+        msg_attributes = {}
+        msg_attributes[NOTIFICATION_ATTR_VERSION] = NOTIFICATION_VERSION
+        msg_attributes[NOTIFICATION_ATTR_MOBLAB_MAC] = (
+                utils.get_default_interface_mac_address())
+        msg_attributes[NOTIFICATION_ATTR_MOBLAB_ID] = utils.get_moblab_id()
+        return msg_attributes
+
+    def _create_test_result_notification(self, gcs_uri):
+        """Construct a test result notification.
+
+        @param gcs_uri: The test result Google Cloud Storage URI.
+
+        @returns The notification message.
+        """
+        data = base64.b64encode(NEW_TEST_RESULT_MESSAGE)
+        msg_attributes = self._create_notification_attributes()
+        msg_attributes[NOTIFICATION_ATTR_GCS_URI] = gcs_uri
+
+        return self._create_notification_message(data, msg_attributes)
+
+
+    def send_test_job_offloaded_message(self, gcs_uri):
+        """Notify the cloud console a test job is offloaded.
+
+        @param gcs_uri: The test result Google Cloud Storage URI.
+
+        @returns True if the notification is successfully sent.
+            Otherwise, False.
+        """
+        logging.info('Notification on gcs_uri %s', gcs_uri)
+        message = self._create_test_result_notification(gcs_uri)
+        msg_ids = self._pubsub_client.publish_notifications(
+                self._pubsub_topic, [message])
+        if msg_ids:
+            return True
+        logging.warning('Failed to send notification on gcs_uri %s', gcs_uri)
+        return False
+
diff --git a/site_utils/cloud_console_client_unittest.py b/site_utils/cloud_console_client_unittest.py
new file mode 100644
index 0000000..2258b99
--- /dev/null
+++ b/site_utils/cloud_console_client_unittest.py
@@ -0,0 +1,86 @@
+#!/usr/bin/python
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import base64
+import mox
+import unittest
+
+import common
+
+from autotest_lib.client.common_lib import utils
+from autotest_lib.site_utils import cloud_console_client
+from autotest_lib.site_utils import pubsub_utils
+
+class PubSubBasedClientTests(mox.MoxTestBase):
+    """Tests for the 'PubSubBasedClient'."""
+
+    def setUp(self):
+        super(PubSubBasedClientTests, self).setUp()
+        self._pubsub_client_mock = self.mox.CreateMock(
+                pubsub_utils.PubSubClient)
+        self._stubs = mox.stubout.StubOutForTesting()
+        self._stubs.Set(cloud_console_client, '_create_pubsub_client',
+                lambda x: self._pubsub_client_mock)
+
+    def tearDown(self):
+        self._stubs.UnsetAll()
+        super(PubSubBasedClientTests, self).tearDown()
+
+    def test_create_test_result_notification(self):
+        """Tests the test result notification message."""
+        self._console_client = cloud_console_client.PubSubBasedClient()
+        self.mox.StubOutWithMock(utils, 'get_moblab_id')
+        self.mox.StubOutWithMock(utils,
+                                 'get_default_interface_mac_address')
+        utils.get_default_interface_mac_address().AndReturn(
+            '1c:dc:d1:11:01:e1')
+        utils.get_moblab_id().AndReturn(
+            'c8386d92-9ad1-11e6-80f5-111111111111')
+        self.mox.ReplayAll()
+        console_client = cloud_console_client.PubSubBasedClient()
+        msg = console_client._create_test_result_notification(
+                'gs://test_bucket/123-moblab')
+        self.assertEquals(base64.b64encode(
+            cloud_console_client.NEW_TEST_RESULT_MESSAGE), msg['data'])
+        self.assertEquals(
+            cloud_console_client.NOTIFICATION_VERSION,
+            msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_VERSION])
+        self.assertEquals(
+            '1c:dc:d1:11:01:e1',
+            msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_MOBLAB_MAC]
+            )
+        self.assertEquals(
+            'c8386d92-9ad1-11e6-80f5-111111111111',
+            msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_MOBLAB_ID])
+        self.assertEquals(
+            'gs://test_bucket/123-moblab',
+            msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_GCS_URI])
+        self.mox.VerifyAll()
+
+
+    def test_send_test_job_offloaded_message(self):
+        """Tests send job offloaded notification."""
+        console_client = cloud_console_client.PubSubBasedClient(
+                pubsub_topic='test topic')
+
+        # self.mox.ResetAll()
+        message = {'data': 'dummy data', 'attributes' : {'key' : 'value'}}
+        self.mox.StubOutWithMock(cloud_console_client.PubSubBasedClient,
+                '_create_test_result_notification')
+        console_client._create_test_result_notification(
+                'gs://test_bucket/123-moblab').AndReturn(message)
+
+        msg_ids = ['1']
+        self._pubsub_client_mock.publish_notifications(
+                'test topic', [message]).AndReturn(msg_ids)
+
+        self.mox.ReplayAll()
+        console_client.send_test_job_offloaded_message(
+                'gs://test_bucket/123-moblab')
+        self.mox.VerifyAll()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index 07e7fbd..95b31b6 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -11,7 +11,6 @@
 """
 
 import abc
-import base64
 import datetime
 import errno
 import glob
@@ -32,17 +31,15 @@
 from optparse import OptionParser
 
 import common
-from autotest_lib.client.common_lib import error
 from autotest_lib.client.common_lib import file_utils
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import utils
 from autotest_lib.client.common_lib.cros.graphite import autotest_es
 from autotest_lib.site_utils import job_directories
-from autotest_lib.site_utils import pubsub_utils
+from autotest_lib.site_utils import cloud_console_client
 from autotest_lib.tko import models
 from autotest_lib.utils import labellib
 from autotest_lib.utils import gslib
-from chromite.lib import gs
 from chromite.lib import timeout_util
 
 # Autotest requires the psutil module from site-packages, so it must be imported
@@ -120,15 +117,6 @@
 DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
         'CROS', 'cts_apfe_server', default='')
 
-_PUBSUB_ENABLED = global_config.global_config.get_config_value(
-        'CROS', 'cloud_notification_enabled', type=bool, default=False)
-_PUBSUB_TOPIC = global_config.global_config.get_config_value(
-        'CROS', 'cloud_notification_topic', default=None)
-
-
-# the message data for new test result notification.
-NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'
-
 # metadata type
 GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
 GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'
@@ -491,37 +479,6 @@
         os.remove(test_result_file_gz)
 
 
-# Test upload pubsub notification attributes
-_NOTIFICATION_ATTR_VERSION = 'version'
-_NOTIFICATION_ATTR_GCS_URI = 'gcs_uri'
-_NOTIFICATION_ATTR_MOBLAB_MAC = 'moblab_mac_address'
-_NOTIFICATION_ATTR_MOBLAB_ID = 'moblab_id'
-_NOTIFICATION_VERSION = '1'
-
-
-def _create_test_result_notification(gs_path, dir_entry):
-    """Construct a test result notification.
-
-    @param gs_path: The test result Google Cloud Storage URI.
-    @param dir_entry: The local offload directory name.
-
-    @returns The notification message.
-    """
-    gcs_uri = os.path.join(gs_path, os.path.basename(dir_entry))
-    logging.info('Notification on gcs_uri %s', gcs_uri)
-    data = base64.b64encode(NEW_TEST_RESULT_MESSAGE)
-    msg_payload = {'data': data}
-    msg_attributes = {}
-    msg_attributes[_NOTIFICATION_ATTR_GCS_URI] = gcs_uri
-    msg_attributes[_NOTIFICATION_ATTR_VERSION] = _NOTIFICATION_VERSION
-    msg_attributes[_NOTIFICATION_ATTR_MOBLAB_MAC] = \
-        utils.get_default_interface_mac_address()
-    msg_attributes[_NOTIFICATION_ATTR_MOBLAB_ID] = utils.get_moblab_id()
-    msg_payload['attributes'] = msg_attributes
-
-    return msg_payload
-
-
 def _emit_gs_returncode_metric(returncode):
     """Increment the gs_returncode counter based on |returncode|."""
     m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
@@ -552,18 +509,19 @@
 class GSOffloader(BaseGSOffloader):
     """Google Storage Offloader."""
 
-    def __init__(self, gs_uri, multiprocessing, delete_age, pubsub_topic=None):
+    def __init__(self, gs_uri, multiprocessing, delete_age,
+            console_client=None):
         """Returns the offload directory function for the given gs_uri
 
         @param gs_uri: Google storage bucket uri to offload to.
         @param multiprocessing: True to turn on -m option for gsutil.
-        @param pubsub_topic: The pubsub topic to publish notificaitons. If None,
-              pubsub is not enabled.
+        @param console_client: The cloud console client. If None,
+          cloud console APIs are  not called.
         """
         self._gs_uri = gs_uri
         self._multiprocessing = multiprocessing
         self._delete_age = delete_age
-        self._pubsub_topic = pubsub_topic
+        self._console_client = console_client
 
     @metrics.SecondsTimerDecorator(
             'chromeos/autotest/gs_offloader/job_offload_duration')
@@ -655,14 +613,13 @@
                              type_str=GS_OFFLOADER_SUCCESS_TYPE,
                              metadata=es_metadata)
 
-            if self._pubsub_topic:
-                message = _create_test_result_notification(
-                        gs_path, dir_entry)
-                pubsub_client = pubsub_utils.PubSubClient()
-                msg_ids = pubsub_client.publish_notifications(
-                        self._pubsub_topic, [message])
-                if not msg_ids:
+            if self._console_client:
+                gcs_uri = os.path.join(gs_path,
+                        os.path.basename(dir_entry))
+                if not self._console_client.send_test_job_offloaded_message(
+                        gcs_uri):
                     raise error_obj
+
             _mark_uploaded(dir_entry)
         except timeout_util.TimeoutError:
             m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
@@ -839,7 +796,6 @@
     """
 
     def __init__(self, options):
-        self._pubsub_topic = None
         self._upload_age_limit = options.age_to_upload
         self._delete_age_limit = options.age_to_delete
         if options.delete_only:
@@ -854,13 +810,12 @@
                 multiprocessing = GS_OFFLOADER_MULTIPROCESSING
             logging.info(
                     'Offloader multiprocessing is set to:%r', multiprocessing)
-            if options.pubsub_topic_for_job_upload:
-                self._pubsub_topic = options.pubsub_topic_for_job_upload
-            elif _PUBSUB_ENABLED:
-                self._pubsub_topic = _PUBSUB_TOPIC
+            console_client = None
+            if cloud_console_client.is_cloud_notification_enabled():
+                console_client = cloud_console_client.PubSubBasedClient()
             self._gs_offloader = GSOffloader(
                     self.gs_uri, multiprocessing, self._delete_age_limit,
-                    self._pubsub_topic)
+                    console_client)
         classlist = []
         if options.process_hosts_only or options.process_all:
             classlist.append(job_directories.SpecialJobDirectory)
@@ -1052,11 +1007,6 @@
     parser.add_option('-d', '--days_old', dest='days_old',
                       help='Minimum job age in days before a result can be '
                       'offloaded.', type='int', default=0)
-    parser.add_option('-t', '--pubsub_topic_for_job_upload',
-                      dest='pubsub_topic_for_job_upload',
-                      help='The pubsub topic to send notifciations for '
-                      'new job upload',
-                      action='store', type='string', default=None)
     parser.add_option('-l', '--log_size', dest='log_size',
                       help='Limit the offloader logs to a specified '
                       'number of Mega Bytes.', type='int', default=0)
diff --git a/site_utils/gs_offloader_unittest.py b/site_utils/gs_offloader_unittest.py
index 143d302..4cf3b6f 100755
--- a/site_utils/gs_offloader_unittest.py
+++ b/site_utils/gs_offloader_unittest.py
@@ -5,7 +5,6 @@
 
 import __builtin__
 import Queue
-import base64
 import datetime
 import logging
 import os
@@ -24,10 +23,12 @@
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import time_utils
 from autotest_lib.client.common_lib import utils
+from autotest_lib.site_utils import cloud_console_client
 from autotest_lib.site_utils import gs_offloader
 from autotest_lib.site_utils import job_directories
 from autotest_lib.tko import models
 from autotest_lib.utils import gslib
+from autotest_lib.site_utils import pubsub_utils
 from chromite.lib import timeout_util
 
 # Test value to use for `days_old`, if nothing else is required.
@@ -78,7 +79,7 @@
 
 
     def _mock_get_sub_offloader(self, is_moblab, multiprocessing=False,
-                               pubsub_topic=None, delete_age=0):
+                               console_client=None, delete_age=0):
         """Mock the process of getting the offload_dir function."""
         if is_moblab:
             expected_gsuri = '%sresults/%s/%s/' % (
@@ -89,10 +90,22 @@
             expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI
         utils.get_offload_gsuri().AndReturn(expected_gsuri)
         sub_offloader = gs_offloader.GSOffloader(expected_gsuri,
-            multiprocessing, delete_age, pubsub_topic)
+            multiprocessing, delete_age, console_client)
         self.mox.StubOutWithMock(gs_offloader, 'GSOffloader')
-        gs_offloader.GSOffloader(expected_gsuri, multiprocessing,
-            delete_age, pubsub_topic).AndReturn(sub_offloader)
+        self.mox.StubOutWithMock(cloud_console_client,
+                                'is_cloud_notification_enabled')
+        if console_client:
+            cloud_console_client.is_cloud_notification_enabled().AndReturn(True)
+            gs_offloader.GSOffloader(
+                    expected_gsuri, multiprocessing, delete_age,
+                    mox.IsA(cloud_console_client.PubSubBasedClient)).AndReturn(
+                        sub_offloader)
+        else:
+            cloud_console_client.is_cloud_notification_enabled().AndReturn(
+                    False)
+            gs_offloader.GSOffloader(
+                expected_gsuri, multiprocessing, delete_age, None).AndReturn(
+                    sub_offloader)
         self.mox.ReplayAll()
         return sub_offloader
 
@@ -161,7 +174,6 @@
                               gs_offloader.FakeGSOffloader)
         self.assertEqual(offloader._upload_age_limit, 0)
         self.assertEqual(offloader._delete_age_limit, 0)
-        self.assertIsNone(offloader._pubsub_topic)
 
 
     def test_days_old_option(self):
@@ -225,18 +237,13 @@
                          sub_offloader)
         self.mox.VerifyAll()
 
-    def test_offloader_pubsub_topic_not_set(self):
-        """Test multiprocessing is set."""
-        sub_offloader = self._mock_get_sub_offloader(True, False)
-        offloader = gs_offloader.Offloader(_get_options([]))
-        self.assertEqual(offloader._gs_offloader,
-                         sub_offloader)
-        self.mox.VerifyAll()
 
-    def test_offloader_pubsub_topic_set(self):
+    def test_offloader_pubsub_enabled(self):
         """Test multiprocessing is set."""
-        sub_offloader = self._mock_get_sub_offloader(True, False, 'test-topic')
-        offloader = gs_offloader.Offloader(_get_options(['-t', 'test-topic']))
+        self.mox.StubOutWithMock(pubsub_utils, "PubSubClient")
+        sub_offloader = self._mock_get_sub_offloader(True, False,
+                cloud_console_client.PubSubBasedClient())
+        offloader = gs_offloader.Offloader(_get_options([]))
         self.assertEqual(offloader._gs_offloader,
                          sub_offloader)
         self.mox.VerifyAll()
@@ -442,38 +449,6 @@
         self._command_list_assertions(job, multi=True)
 
 
-class PubSubTest(mox.MoxTestBase):
-    """Test the test result notifcation data structure."""
-
-    def test_create_test_result_notification(self):
-        """Tests the test result notification message."""
-        self.mox.StubOutWithMock(utils, 'get_moblab_id')
-        self.mox.StubOutWithMock(utils,
-                                 'get_default_interface_mac_address')
-        utils.get_default_interface_mac_address().AndReturn(
-            '1c:dc:d1:11:01:e1')
-        utils.get_moblab_id().AndReturn(
-            'c8386d92-9ad1-11e6-80f5-111111111111')
-        self.mox.ReplayAll()
-        msg = gs_offloader._create_test_result_notification(
-                'gs://test_bucket', '123-moblab')
-        self.assertEquals(base64.b64encode(
-            gs_offloader.NEW_TEST_RESULT_MESSAGE), msg['data'])
-        self.assertEquals(
-            gs_offloader._NOTIFICATION_VERSION,
-            msg['attributes'][gs_offloader._NOTIFICATION_ATTR_VERSION])
-        self.assertEquals(
-            '1c:dc:d1:11:01:e1',
-            msg['attributes'][gs_offloader._NOTIFICATION_ATTR_MOBLAB_MAC])
-        self.assertEquals(
-            'c8386d92-9ad1-11e6-80f5-111111111111',
-            msg['attributes'][gs_offloader._NOTIFICATION_ATTR_MOBLAB_ID])
-        self.assertEquals(
-            'gs://test_bucket/123-moblab',
-            msg['attributes'][gs_offloader._NOTIFICATION_ATTR_GCS_URI])
-        self.mox.VerifyAll()
-
-
 class _MockJob(object):
     """Class to mock the return value of `AFE.get_jobs()`."""
     def __init__(self, created):
diff --git a/site_utils/pubsub_utils.py b/site_utils/pubsub_utils.py
index 2f6c71d..6d7670b 100755
--- a/site_utils/pubsub_utils.py
+++ b/site_utils/pubsub_utils.py
@@ -8,24 +8,22 @@
 Upon successful copy, the local results directory is deleted.
 """
 
-import logging
+from __future__ import print_function
+
 import os
 
 from apiclient import discovery
+from apiclient import errors
 from oauth2client.client import ApplicationDefaultCredentialsError
 from oauth2client.client import GoogleCredentials
-from autotest_lib.server.hosts import moblab_host
-
-import common
+from chromite.lib import cros_logging as logging
 
 # Cloud service
-# TODO(ntang): move this to config.
-CLOUD_SERVICE_ACCOUNT_FILE = moblab_host.MOBLAB_SERVICE_ACCOUNT_LOCATION
 PUBSUB_SERVICE_NAME = 'pubsub'
 PUBSUB_VERSION = 'v1beta2'
 PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']
 # number of retry to publish an event.
-_PUBSUB_NUM_RETRIES = 3
+DEFAULT_PUBSUB_NUM_RETRIES = 3
 
 class PubSubException(Exception):
     """Exception to be raised when the test to push to prod failed."""
@@ -33,14 +31,18 @@
 
 
 class PubSubClient(object):
-    """A generic pubsub client. """
-    def __init__(self, credential_file=CLOUD_SERVICE_ACCOUNT_FILE):
+    """A generic pubsub client."""
+    def __init__(self, credential_file=None):
         """Constructor for PubSubClient.
 
-        @param credential_file: The credential filename.
-        @raises PubSubException if the credential file does not exist or
-            corrupted.
+        Args:
+          credential_file: The credential filename.
+
+        Raises:
+          PubSubException if the credential file does not exist or corrupted.
         """
+        if not credential_file:
+            raise PubSubException('You need to specify a credential file.')
         self.credential_file = credential_file
         self.credential = self._get_credential()
 
@@ -48,30 +50,30 @@
         """Gets the pubsub service api handle."""
         if not os.path.isfile(self.credential_file):
             logging.error('No credential file found')
-            raise PubSubException("Credential file does not exists:"
-                    + self.credential_file)
+            raise PubSubException('Credential file does not exist:' +
+                                  self.credential_file)
         try:
             credential = GoogleCredentials.from_stream(self.credential_file)
             if credential.create_scoped_required():
                 credential = credential.create_scoped(PUBSUB_SCOPES)
             return credential
         except ApplicationDefaultCredentialsError as ex:
-            logging.error('Failed to get credential.')
-        except:
-            logging.error('Failed to get the pubsub service handle.')
+            logging.exception('Failed to get credential:%s', ex)
+        except errors.Error as e:
+            logging.exception('Failed to get the pubsub service handle:%s', e)
 
-        raise PubSubException("Credential file does not exists:"
-                + self.credential_file)
+        raise PubSubException('Credential file %s does not exists:' %
+                              self.credential_file)
 
     def _get_pubsub_service(self):
         try:
             return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
                                    credentials=self.credential)
-        except:
-            logging.error('Failed to get pubsub resource object.')
-            raise PubSubException("Failed to get pubsub resource object")
+        except errors.Error as e:
+            logging.exception('Failed to get pubsub resource object:%s', e)
+            raise PubSubException('Failed to get pubsub resource object')
 
-    def publish_notifications(self, topic, messages=[]):
+    def publish_notifications(self, topic, messages=None):
         """Publishes a test result notification to a given pubsub topic.
 
         @param topic: The Cloud pubsub topic.
@@ -81,16 +83,24 @@
 
         @raises PubSubException if failed to publish the notification.
         """
+        if not messages:
+            return None
+
         pubsub = self._get_pubsub_service()
         try:
             body = {'messages': messages}
-            resp = pubsub.projects().topics().publish(topic=topic,
-                    body=body).execute(num_retries=_PUBSUB_NUM_RETRIES)
+            resp = pubsub.projects().topics().publish(
+                topic=topic, body=body).execute(
+                    num_retries=DEFAULT_PUBSUB_NUM_RETRIES)
+            msgIds = []
             if resp:
                 msgIds = resp.get('messageIds')
                 if msgIds:
                     logging.debug('Published notification message')
-                    return msgIds
-        except:
-            logging.error('Failed to publish test result notifiation.')
-            raise PubSubException("Failed to publish the notifiation.")
+                else:
+                    logging.error('Failed to published notification message')
+            return msgIds
+        except errors.Error as e:
+            logging.exception('Failed to publish test result notification:%s',
+                    e)
+            raise PubSubException('Failed to publish the notification')
diff --git a/site_utils/pubsub_utils_unittest.py b/site_utils/pubsub_utils_unittest.py
index d5244d8..3c604ce 100644
--- a/site_utils/pubsub_utils_unittest.py
+++ b/site_utils/pubsub_utils_unittest.py
@@ -1,9 +1,11 @@
-#!/usr/bin/env python
-#
+#!/usr/bin/env python2
 # Copyright 2016 The Chromium OS Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+"""Unit test for pubsub_utils.py"""
+
+from __future__ import print_function
 import os
 import unittest
 
@@ -14,14 +16,15 @@
 from oauth2client.client import GoogleCredentials
 from googleapiclient.errors import UnknownApiNameOrVersion
 
-import common
 import pubsub_utils
 
+_TEST_CLOUD_SERVICE_ACCOUNT_FILE = '/tmp/test-credential'
+
 
 class MockedPubSub(object):
     """A mocked PubSub handle."""
     def __init__(self, test, topic, msg, retry, ret_val=None,
-            raise_except=False):
+                 raise_except=False):
         self.test = test
         self.topic = topic
         self.msg = msg
@@ -52,10 +55,10 @@
 
         @param num_retries: Number of retries.
         """
-        self.test.assertEquals(self.num_retries, num_retries)
+        self.test.assertEquals(self.retry, num_retries)
         if self.raise_except:
             raise Exception()
-        return self.ret
+        return self.ret_val
 
 
 def _create_sample_message():
@@ -71,48 +74,57 @@
 class PubSubTests(mox.MoxTestBase):
     """Tests for pubsub related functios."""
 
-    def test_ubsub_with_no_service_account(self):
+    def test_pubsub_with_no_service_account(self):
         """Test getting the pubsub service"""
         self.mox.StubOutWithMock(os.path, 'isfile')
-        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
         self.mox.ReplayAll()
         with self.assertRaises(pubsub_utils.PubSubException):
             pubsub_utils.PubSubClient()
         self.mox.VerifyAll()
 
+    def test_pubsub_with_non_existing_service_account(self):
+        """Test getting the pubsub service"""
+        self.mox.StubOutWithMock(os.path, 'isfile')
+        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
+        self.mox.ReplayAll()
+        with self.assertRaises(pubsub_utils.PubSubException):
+            pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
+        self.mox.VerifyAll()
+
     def test_pubsub_with_corrupted_service_account(self):
         """Test pubsub with corrupted service account."""
         self.mox.StubOutWithMock(os.path, 'isfile')
         self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
-        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
-        credentials = self.mox.CreateMock(GoogleCredentials)
+        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
         GoogleCredentials.from_stream(
-                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
-                        ApplicationDefaultCredentialsError())
+            _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
+                ApplicationDefaultCredentialsError())
         self.mox.ReplayAll()
         with self.assertRaises(pubsub_utils.PubSubException):
-            pubsub_utils.PubSubClient()
+            pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
         self.mox.VerifyAll()
 
     def test_pubsub_with_invalid_service_account(self):
         """Test pubsubwith invalid service account."""
         self.mox.StubOutWithMock(os.path, 'isfile')
         self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
-        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
         credentials = self.mox.CreateMock(GoogleCredentials)
         GoogleCredentials.from_stream(
-                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
+            _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
         credentials.create_scoped_required().AndReturn(True)
         credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
-                credentials)
+            credentials)
         self.mox.StubOutWithMock(discovery, 'build')
-        discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
-                pubsub_utils.PUBSUB_VERSION,
-                credentials=credentials).AndRaise(UnknownApiNameOrVersion())
+        discovery.build(
+            pubsub_utils.PUBSUB_SERVICE_NAME,
+            pubsub_utils.PUBSUB_VERSION,
+            credentials=credentials).AndRaise(UnknownApiNameOrVersion())
         self.mox.ReplayAll()
         with self.assertRaises(pubsub_utils.PubSubException):
             msg = _create_sample_message()
-            pubsub_client = pubsub_utils.PubSubClient()
+            pubsub_client = pubsub_utils.PubSubClient(
+                _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
             pubsub_client.publish_notifications('test_topic', [msg])
         self.mox.VerifyAll()
 
@@ -120,30 +132,33 @@
         """Test getting the pubsub service"""
         self.mox.StubOutWithMock(os.path, 'isfile')
         self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
-        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
         credentials = self.mox.CreateMock(GoogleCredentials)
         GoogleCredentials.from_stream(
-                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
+            _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
         credentials.create_scoped_required().AndReturn(True)
         credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
-                credentials)
+            credentials)
         self.mox.StubOutWithMock(discovery, 'build')
         msg = _create_sample_message()
-        discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
-                pubsub_utils.PUBSUB_VERSION,
-                credentials=credentials).AndReturn(MockedPubSub(
-                    self,
-                    'test_topic',
-                    msg,
-                    pubsub_utils._PUBSUB_NUM_RETRIES,
-                    # use tuple ('123') instead of list just for easy to
-                    # write the test.
-                    ret_val = {'messageIds', ('123')}))
+        discovery.build(
+            pubsub_utils.PUBSUB_SERVICE_NAME,
+            pubsub_utils.PUBSUB_VERSION,
+            credentials=credentials).AndReturn(MockedPubSub(
+                self,
+                'test_topic',
+                msg,
+                pubsub_utils.DEFAULT_PUBSUB_NUM_RETRIES,
+                # use tuple ('123') instead of list just for easy to
+                # write the test.
+                ret_val={'messageIds': ('123')}))
 
         self.mox.ReplayAll()
-        with self.assertRaises(pubsub_utils.PubSubException):
-            pubsub_client = pubsub_utils.PubSubClient()
-            pubsub_client.publish_notifications('test_topic', [msg])
+        pubsub_client = pubsub_utils.PubSubClient(
+                _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
+        msg_ids = pubsub_client.publish_notifications('test_topic', [msg])
+        self.assertEquals(('123'), msg_ids)
+
         self.mox.VerifyAll()