Wrap the pubsub into a cloud console client.

BUG=chromium:724523
TEST=unit tests.

Change-Id: I4338cce8c2c983b4cd50b71014d4f1ca5667cd78
Reviewed-on: https://chromium-review.googlesource.com/540137
Commit-Ready: Michael Tang <ntang@chromium.org>
Tested-by: Michael Tang <ntang@chromium.org>
Reviewed-by: Keith Haddow <haddowk@chromium.org>
Reviewed-by: Michael Tang <ntang@chromium.org>
diff --git a/site_utils/gs_offloader_unittest.py b/site_utils/gs_offloader_unittest.py
index 143d302..4cf3b6f 100755
--- a/site_utils/gs_offloader_unittest.py
+++ b/site_utils/gs_offloader_unittest.py
@@ -5,7 +5,6 @@
 
 import __builtin__
 import Queue
-import base64
 import datetime
 import logging
 import os
@@ -24,10 +23,12 @@
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import time_utils
 from autotest_lib.client.common_lib import utils
+from autotest_lib.site_utils import cloud_console_client
 from autotest_lib.site_utils import gs_offloader
 from autotest_lib.site_utils import job_directories
 from autotest_lib.tko import models
 from autotest_lib.utils import gslib
+from autotest_lib.site_utils import pubsub_utils
 from chromite.lib import timeout_util
 
 # Test value to use for `days_old`, if nothing else is required.
@@ -78,7 +79,7 @@
 
 
     def _mock_get_sub_offloader(self, is_moblab, multiprocessing=False,
-                               pubsub_topic=None, delete_age=0):
+                               console_client=None, delete_age=0):
         """Mock the process of getting the offload_dir function."""
         if is_moblab:
             expected_gsuri = '%sresults/%s/%s/' % (
@@ -89,10 +90,22 @@
             expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI
         utils.get_offload_gsuri().AndReturn(expected_gsuri)
         sub_offloader = gs_offloader.GSOffloader(expected_gsuri,
-            multiprocessing, delete_age, pubsub_topic)
+            multiprocessing, delete_age, console_client)
         self.mox.StubOutWithMock(gs_offloader, 'GSOffloader')
-        gs_offloader.GSOffloader(expected_gsuri, multiprocessing,
-            delete_age, pubsub_topic).AndReturn(sub_offloader)
+        self.mox.StubOutWithMock(cloud_console_client,
+                                'is_cloud_notification_enabled')
+        if console_client:
+            cloud_console_client.is_cloud_notification_enabled().AndReturn(True)
+            gs_offloader.GSOffloader(
+                    expected_gsuri, multiprocessing, delete_age,
+                    mox.IsA(cloud_console_client.PubSubBasedClient)).AndReturn(
+                        sub_offloader)
+        else:
+            cloud_console_client.is_cloud_notification_enabled().AndReturn(
+                    False)
+            gs_offloader.GSOffloader(
+                expected_gsuri, multiprocessing, delete_age, None).AndReturn(
+                    sub_offloader)
         self.mox.ReplayAll()
         return sub_offloader
 
@@ -161,7 +174,6 @@
                               gs_offloader.FakeGSOffloader)
         self.assertEqual(offloader._upload_age_limit, 0)
         self.assertEqual(offloader._delete_age_limit, 0)
-        self.assertIsNone(offloader._pubsub_topic)
 
 
     def test_days_old_option(self):
@@ -225,18 +237,13 @@
                          sub_offloader)
         self.mox.VerifyAll()
 
-    def test_offloader_pubsub_topic_not_set(self):
-        """Test multiprocessing is set."""
-        sub_offloader = self._mock_get_sub_offloader(True, False)
-        offloader = gs_offloader.Offloader(_get_options([]))
-        self.assertEqual(offloader._gs_offloader,
-                         sub_offloader)
-        self.mox.VerifyAll()
 
-    def test_offloader_pubsub_topic_set(self):
+    def test_offloader_pubsub_enabled(self):
         """Test multiprocessing is set."""
-        sub_offloader = self._mock_get_sub_offloader(True, False, 'test-topic')
-        offloader = gs_offloader.Offloader(_get_options(['-t', 'test-topic']))
+        self.mox.StubOutWithMock(pubsub_utils, "PubSubClient")
+        sub_offloader = self._mock_get_sub_offloader(True, False,
+                cloud_console_client.PubSubBasedClient())
+        offloader = gs_offloader.Offloader(_get_options([]))
         self.assertEqual(offloader._gs_offloader,
                          sub_offloader)
         self.mox.VerifyAll()
@@ -442,38 +449,6 @@
         self._command_list_assertions(job, multi=True)
 
 
-class PubSubTest(mox.MoxTestBase):
-    """Test the test result notifcation data structure."""
-
-    def test_create_test_result_notification(self):
-        """Tests the test result notification message."""
-        self.mox.StubOutWithMock(utils, 'get_moblab_id')
-        self.mox.StubOutWithMock(utils,
-                                 'get_default_interface_mac_address')
-        utils.get_default_interface_mac_address().AndReturn(
-            '1c:dc:d1:11:01:e1')
-        utils.get_moblab_id().AndReturn(
-            'c8386d92-9ad1-11e6-80f5-111111111111')
-        self.mox.ReplayAll()
-        msg = gs_offloader._create_test_result_notification(
-                'gs://test_bucket', '123-moblab')
-        self.assertEquals(base64.b64encode(
-            gs_offloader.NEW_TEST_RESULT_MESSAGE), msg['data'])
-        self.assertEquals(
-            gs_offloader._NOTIFICATION_VERSION,
-            msg['attributes'][gs_offloader._NOTIFICATION_ATTR_VERSION])
-        self.assertEquals(
-            '1c:dc:d1:11:01:e1',
-            msg['attributes'][gs_offloader._NOTIFICATION_ATTR_MOBLAB_MAC])
-        self.assertEquals(
-            'c8386d92-9ad1-11e6-80f5-111111111111',
-            msg['attributes'][gs_offloader._NOTIFICATION_ATTR_MOBLAB_ID])
-        self.assertEquals(
-            'gs://test_bucket/123-moblab',
-            msg['attributes'][gs_offloader._NOTIFICATION_ATTR_GCS_URI])
-        self.mox.VerifyAll()
-
-
 class _MockJob(object):
     """Class to mock the return value of `AFE.get_jobs()`."""
     def __init__(self, created):