Wrap pubsub code into a reusable class.
BUG=chromium:724523
TEST=Unittest and manully.
Change-Id: Icfa4fa9c1c515945f18a309ef0cc43ed623775be
Reviewed-on: https://chromium-review.googlesource.com/510348
Commit-Ready: Michael Tang <ntang@chromium.org>
Tested-by: Michael Tang <ntang@chromium.org>
Reviewed-by: Keith Haddow <haddowk@chromium.org>
Reviewed-by: Michael Tang <ntang@chromium.org>
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index a5e3124..b6f8c5f 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -618,7 +618,8 @@
if pubsub_topic:
message = _create_test_result_notification(
gs_path, dir_entry)
- msg_ids = pubsub_utils.publish_notifications(
+ pubsub_client = pubsub_utils.PubSubClient()
+ msg_ids = pubsub_client.publish_notifications(
pubsub_topic, [message])
if not msg_ids:
error = True
diff --git a/site_utils/pubsub_utils.py b/site_utils/pubsub_utils.py
index f62a31d..2f6c71d 100755
--- a/site_utils/pubsub_utils.py
+++ b/site_utils/pubsub_utils.py
@@ -27,37 +27,61 @@
# number of retry to publish an event.
_PUBSUB_NUM_RETRIES = 3
-
-def _get_pubsub_service():
- """Gets the pubsub service api handle."""
- if not os.path.isfile(CLOUD_SERVICE_ACCOUNT_FILE):
- logging.error('No credential file found')
- return None
-
- try:
- credentials = GoogleCredentials.from_stream(CLOUD_SERVICE_ACCOUNT_FILE)
- if credentials.create_scoped_required():
- credentials = credentials.create_scoped(PUBSUB_SCOPES)
- return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
- credentials=credentials)
- except ApplicationDefaultCredentialsError as ex:
- logging.error('Failed to get credential.')
- except:
- logging.error('Failed to get the pubsub service handle.')
-
- return None
+class PubSubException(Exception):
+ """Exception to be raised when the test to push to prod failed."""
+ pass
-def publish_notifications(topic, messages=[]):
- """Publishes a test result notification to a given pubsub topic.
+class PubSubClient(object):
+ """A generic pubsub client. """
+ def __init__(self, credential_file=CLOUD_SERVICE_ACCOUNT_FILE):
+ """Constructor for PubSubClient.
- @param topic: The Cloud pubsub topic.
- @param messages: A list of notification messages.
+ @param credential_file: The credential filename.
+ @raises PubSubException if the credential file does not exist or
+ corrupted.
+ """
+ self.credential_file = credential_file
+ self.credential = self._get_credential()
- @returns A list of pubsub message ids, and empty if fails.
- """
- pubsub = _get_pubsub_service()
- if pubsub:
+ def _get_credential(self):
+ """Gets the pubsub service api handle."""
+ if not os.path.isfile(self.credential_file):
+ logging.error('No credential file found')
+ raise PubSubException("Credential file does not exists:"
+ + self.credential_file)
+ try:
+ credential = GoogleCredentials.from_stream(self.credential_file)
+ if credential.create_scoped_required():
+ credential = credential.create_scoped(PUBSUB_SCOPES)
+ return credential
+ except ApplicationDefaultCredentialsError as ex:
+ logging.error('Failed to get credential.')
+ except:
+ logging.error('Failed to get the pubsub service handle.')
+
+ raise PubSubException("Credential file does not exists:"
+ + self.credential_file)
+
+ def _get_pubsub_service(self):
+ try:
+ return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
+ credentials=self.credential)
+ except:
+ logging.error('Failed to get pubsub resource object.')
+ raise PubSubException("Failed to get pubsub resource object")
+
+ def publish_notifications(self, topic, messages=[]):
+ """Publishes a test result notification to a given pubsub topic.
+
+ @param topic: The Cloud pubsub topic.
+ @param messages: A list of notification messages.
+
+ @returns A list of pubsub message ids, and empty if fails.
+
+ @raises PubSubException if failed to publish the notification.
+ """
+ pubsub = self._get_pubsub_service()
try:
body = {'messages': messages}
resp = pubsub.projects().topics().publish(topic=topic,
@@ -68,6 +92,5 @@
logging.debug('Published notification message')
return msgIds
except:
- pass
- logging.error('Failed to publish test result notifiation.')
- return []
+ logging.error('Failed to publish test result notifiation.')
+ raise PubSubException("Failed to publish the notifiation.")
diff --git a/site_utils/pubsub_utils_unittest.py b/site_utils/pubsub_utils_unittest.py
index 8a7c60a..d5244d8 100644
--- a/site_utils/pubsub_utils_unittest.py
+++ b/site_utils/pubsub_utils_unittest.py
@@ -71,17 +71,17 @@
class PubSubTests(mox.MoxTestBase):
"""Tests for pubsub related functios."""
- def test_get_pubsub_service_no_service_account(self):
+ def test_ubsub_with_no_service_account(self):
"""Test getting the pubsub service"""
self.mox.StubOutWithMock(os.path, 'isfile')
os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
self.mox.ReplayAll()
- pubsub = pubsub_utils._get_pubsub_service()
- self.assertIsNone(pubsub)
+ with self.assertRaises(pubsub_utils.PubSubException):
+ pubsub_utils.PubSubClient()
self.mox.VerifyAll()
- def test_get_pubsub_service_with_invalid_service_account(self):
- """Test getting the pubsub service"""
+ def test_pubsub_with_corrupted_service_account(self):
+ """Test pubsub with corrupted service account."""
self.mox.StubOutWithMock(os.path, 'isfile')
self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
@@ -90,12 +90,12 @@
pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
ApplicationDefaultCredentialsError())
self.mox.ReplayAll()
- pubsub = pubsub_utils._get_pubsub_service()
- self.assertIsNone(pubsub)
+ with self.assertRaises(pubsub_utils.PubSubException):
+ pubsub_utils.PubSubClient()
self.mox.VerifyAll()
- def test_get_pubsub_service_with_invalid_service_account(self):
- """Test getting the pubsub service"""
+ def test_pubsub_with_invalid_service_account(self):
+ """Test pubsubwith invalid service account."""
self.mox.StubOutWithMock(os.path, 'isfile')
self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
@@ -110,11 +110,13 @@
pubsub_utils.PUBSUB_VERSION,
credentials=credentials).AndRaise(UnknownApiNameOrVersion())
self.mox.ReplayAll()
- pubsub = pubsub_utils._get_pubsub_service()
- self.assertIsNone(pubsub)
+ with self.assertRaises(pubsub_utils.PubSubException):
+ msg = _create_sample_message()
+ pubsub_client = pubsub_utils.PubSubClient()
+ pubsub_client.publish_notifications('test_topic', [msg])
self.mox.VerifyAll()
- def test_get_pubsub_service_with_service_account(self):
+ def test_publish_notifications(self):
"""Test getting the pubsub service"""
self.mox.StubOutWithMock(os.path, 'isfile')
self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
@@ -126,29 +128,22 @@
credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
credentials)
self.mox.StubOutWithMock(discovery, 'build')
+ msg = _create_sample_message()
discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
pubsub_utils.PUBSUB_VERSION,
- credentials=credentials).AndReturn(1)
- self.mox.ReplayAll()
- pubsub = pubsub_utils._get_pubsub_service()
- self.assertIsNotNone(pubsub)
- self.mox.VerifyAll()
-
- def test_publish_notifications(self):
- """Tests publish notifications."""
- self.mox.StubOutWithMock(pubsub_utils, '_get_pubsub_service')
- msg = _create_sample_message()
- pubsub_utils._get_pubsub_service().AndReturn(MockedPubSub(
- self,
- 'test_topic',
- msg,
- pubsub_utils._PUBSUB_NUM_RETRIES,
- # use tuple ('123') instead of list just for easy to write the test.
- ret_val = {'messageIds', ('123')}))
+ credentials=credentials).AndReturn(MockedPubSub(
+ self,
+ 'test_topic',
+ msg,
+ pubsub_utils._PUBSUB_NUM_RETRIES,
+ # use tuple ('123') instead of list just for easy to
+ # write the test.
+ ret_val = {'messageIds', ('123')}))
self.mox.ReplayAll()
- pubsub_utils.publish_notifications(
- 'test_topic', [msg])
+ with self.assertRaises(pubsub_utils.PubSubException):
+ pubsub_client = pubsub_utils.PubSubClient()
+ pubsub_client.publish_notifications('test_topic', [msg])
self.mox.VerifyAll()