Wrap pubsub code into a reusable class.

BUG=chromium:724523
TEST=Unittest and manully.

Change-Id: Icfa4fa9c1c515945f18a309ef0cc43ed623775be
Reviewed-on: https://chromium-review.googlesource.com/510348
Commit-Ready: Michael Tang <ntang@chromium.org>
Tested-by: Michael Tang <ntang@chromium.org>
Reviewed-by: Keith Haddow <haddowk@chromium.org>
Reviewed-by: Michael Tang <ntang@chromium.org>
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index a5e3124..b6f8c5f 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -618,7 +618,8 @@
                     if pubsub_topic:
                         message = _create_test_result_notification(
                                 gs_path, dir_entry)
-                        msg_ids = pubsub_utils.publish_notifications(
+                        pubsub_client = pubsub_utils.PubSubClient()
+                        msg_ids = pubsub_client.publish_notifications(
                                 pubsub_topic, [message])
                         if not msg_ids:
                             error = True
diff --git a/site_utils/pubsub_utils.py b/site_utils/pubsub_utils.py
index f62a31d..2f6c71d 100755
--- a/site_utils/pubsub_utils.py
+++ b/site_utils/pubsub_utils.py
@@ -27,37 +27,61 @@
 # number of retry to publish an event.
 _PUBSUB_NUM_RETRIES = 3
 
-
-def _get_pubsub_service():
-    """Gets the pubsub service api handle."""
-    if not os.path.isfile(CLOUD_SERVICE_ACCOUNT_FILE):
-        logging.error('No credential file found')
-        return None
-
-    try:
-        credentials = GoogleCredentials.from_stream(CLOUD_SERVICE_ACCOUNT_FILE)
-        if credentials.create_scoped_required():
-            credentials = credentials.create_scoped(PUBSUB_SCOPES)
-        return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
-                                 credentials=credentials)
-    except ApplicationDefaultCredentialsError as ex:
-        logging.error('Failed to get credential.')
-    except:
-        logging.error('Failed to get the pubsub service handle.')
-
-    return None
+class PubSubException(Exception):
+    """Exception to be raised when the test to push to prod failed."""
+    pass
 
 
-def publish_notifications(topic, messages=[]):
-    """Publishes a test result notification to a given pubsub topic.
+class PubSubClient(object):
+    """A generic pubsub client. """
+    def __init__(self, credential_file=CLOUD_SERVICE_ACCOUNT_FILE):
+        """Constructor for PubSubClient.
 
-    @param topic: The Cloud pubsub topic.
-    @param messages: A list of notification messages.
+        @param credential_file: The credential filename.
+        @raises PubSubException if the credential file does not exist or
+            corrupted.
+        """
+        self.credential_file = credential_file
+        self.credential = self._get_credential()
 
-    @returns A list of pubsub message ids, and empty if fails.
-    """
-    pubsub = _get_pubsub_service()
-    if pubsub:
+    def _get_credential(self):
+        """Gets the pubsub service api handle."""
+        if not os.path.isfile(self.credential_file):
+            logging.error('No credential file found')
+            raise PubSubException("Credential file does not exists:"
+                    + self.credential_file)
+        try:
+            credential = GoogleCredentials.from_stream(self.credential_file)
+            if credential.create_scoped_required():
+                credential = credential.create_scoped(PUBSUB_SCOPES)
+            return credential
+        except ApplicationDefaultCredentialsError as ex:
+            logging.error('Failed to get credential.')
+        except:
+            logging.error('Failed to get the pubsub service handle.')
+
+        raise PubSubException("Credential file does not exists:"
+                + self.credential_file)
+
+    def _get_pubsub_service(self):
+        try:
+            return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
+                                   credentials=self.credential)
+        except:
+            logging.error('Failed to get pubsub resource object.')
+            raise PubSubException("Failed to get pubsub resource object")
+
+    def publish_notifications(self, topic, messages=[]):
+        """Publishes a test result notification to a given pubsub topic.
+
+        @param topic: The Cloud pubsub topic.
+        @param messages: A list of notification messages.
+
+        @returns A list of pubsub message ids, and empty if fails.
+
+        @raises PubSubException if failed to publish the notification.
+        """
+        pubsub = self._get_pubsub_service()
         try:
             body = {'messages': messages}
             resp = pubsub.projects().topics().publish(topic=topic,
@@ -68,6 +92,5 @@
                     logging.debug('Published notification message')
                     return msgIds
         except:
-            pass
-    logging.error('Failed to publish test result notifiation.')
-    return []
+            logging.error('Failed to publish test result notifiation.')
+            raise PubSubException("Failed to publish the notifiation.")
diff --git a/site_utils/pubsub_utils_unittest.py b/site_utils/pubsub_utils_unittest.py
index 8a7c60a..d5244d8 100644
--- a/site_utils/pubsub_utils_unittest.py
+++ b/site_utils/pubsub_utils_unittest.py
@@ -71,17 +71,17 @@
 class PubSubTests(mox.MoxTestBase):
     """Tests for pubsub related functios."""
 
-    def test_get_pubsub_service_no_service_account(self):
+    def test_ubsub_with_no_service_account(self):
         """Test getting the pubsub service"""
         self.mox.StubOutWithMock(os.path, 'isfile')
         os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
         self.mox.ReplayAll()
-        pubsub = pubsub_utils._get_pubsub_service()
-        self.assertIsNone(pubsub)
+        with self.assertRaises(pubsub_utils.PubSubException):
+            pubsub_utils.PubSubClient()
         self.mox.VerifyAll()
 
-    def test_get_pubsub_service_with_invalid_service_account(self):
-        """Test getting the pubsub service"""
+    def test_pubsub_with_corrupted_service_account(self):
+        """Test pubsub with corrupted service account."""
         self.mox.StubOutWithMock(os.path, 'isfile')
         self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
         os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
@@ -90,12 +90,12 @@
                 pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
                         ApplicationDefaultCredentialsError())
         self.mox.ReplayAll()
-        pubsub = pubsub_utils._get_pubsub_service()
-        self.assertIsNone(pubsub)
+        with self.assertRaises(pubsub_utils.PubSubException):
+            pubsub_utils.PubSubClient()
         self.mox.VerifyAll()
 
-    def test_get_pubsub_service_with_invalid_service_account(self):
-        """Test getting the pubsub service"""
+    def test_pubsub_with_invalid_service_account(self):
+        """Test pubsubwith invalid service account."""
         self.mox.StubOutWithMock(os.path, 'isfile')
         self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
         os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
@@ -110,11 +110,13 @@
                 pubsub_utils.PUBSUB_VERSION,
                 credentials=credentials).AndRaise(UnknownApiNameOrVersion())
         self.mox.ReplayAll()
-        pubsub = pubsub_utils._get_pubsub_service()
-        self.assertIsNone(pubsub)
+        with self.assertRaises(pubsub_utils.PubSubException):
+            msg = _create_sample_message()
+            pubsub_client = pubsub_utils.PubSubClient()
+            pubsub_client.publish_notifications('test_topic', [msg])
         self.mox.VerifyAll()
 
-    def test_get_pubsub_service_with_service_account(self):
+    def test_publish_notifications(self):
         """Test getting the pubsub service"""
         self.mox.StubOutWithMock(os.path, 'isfile')
         self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
@@ -126,29 +128,22 @@
         credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
                 credentials)
         self.mox.StubOutWithMock(discovery, 'build')
+        msg = _create_sample_message()
         discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
                 pubsub_utils.PUBSUB_VERSION,
-                credentials=credentials).AndReturn(1)
-        self.mox.ReplayAll()
-        pubsub = pubsub_utils._get_pubsub_service()
-        self.assertIsNotNone(pubsub)
-        self.mox.VerifyAll()
-
-    def test_publish_notifications(self):
-        """Tests publish notifications."""
-        self.mox.StubOutWithMock(pubsub_utils, '_get_pubsub_service')
-        msg = _create_sample_message()
-        pubsub_utils._get_pubsub_service().AndReturn(MockedPubSub(
-            self,
-            'test_topic',
-            msg,
-            pubsub_utils._PUBSUB_NUM_RETRIES,
-            # use tuple ('123') instead of list just for easy to write the test.
-            ret_val = {'messageIds', ('123')}))
+                credentials=credentials).AndReturn(MockedPubSub(
+                    self,
+                    'test_topic',
+                    msg,
+                    pubsub_utils._PUBSUB_NUM_RETRIES,
+                    # use tuple ('123') instead of list just for easy to
+                    # write the test.
+                    ret_val = {'messageIds', ('123')}))
 
         self.mox.ReplayAll()
-        pubsub_utils.publish_notifications(
-                'test_topic', [msg])
+        with self.assertRaises(pubsub_utils.PubSubException):
+            pubsub_client = pubsub_utils.PubSubClient()
+            pubsub_client.publish_notifications('test_topic', [msg])
         self.mox.VerifyAll()