Wrap the pubsub into a cloud console client.
BUG=chromium:724523
TEST=unit tests.
Change-Id: I4338cce8c2c983b4cd50b71014d4f1ca5667cd78
Reviewed-on: https://chromium-review.googlesource.com/540137
Commit-Ready: Michael Tang <ntang@chromium.org>
Tested-by: Michael Tang <ntang@chromium.org>
Reviewed-by: Keith Haddow <haddowk@chromium.org>
Reviewed-by: Michael Tang <ntang@chromium.org>
diff --git a/site_utils/cloud_console.proto b/site_utils/cloud_console.proto
new file mode 100644
index 0000000..c2beab9
--- /dev/null
+++ b/site_utils/cloud_console.proto
@@ -0,0 +1,45 @@
+syntax = "proto2";
+
+// The types of message notification sent from Moblab.
+enum MessageType {
+ MOBLAB_HEARTBEAT = 1;
+ MOBLAB_REMOTE_EVENT = 2;
+ MOBLAB_ALERT = 3;
+}
+
+message Timestamp {
+ required int64 seconds = 1;
+ optional int64 nanos = 2 [default = 0];
+}
+
+// The heartbeat message
+message Heartbeat {
+ optional Timestamp timestamp = 1;
+}
+
+// The remote event notification message.
+message RemoteEventMessage {
+ // EventType is an enumeration of event types sent to cloud console.
+ // Any new event type should be added here.
+ enum EventType {
+ MOBLAB_INFO = 1;
+ MOBLAB_BOOT_COMPLETE = 2;
+ }
+
+ required EventType event_type = 1 [default = MOBLAB_INFO];
+ optional string event_data = 2;
+}
+
+// Moblab alerts
+message Alert {
+ enum AlertLevel {
+ CRITICAL = 1;
+ MAJOR = 2;
+ MINOR = 3;
+ }
+ required AlertLevel level = 1;
+ optional string data = 2;
+ optional Timestamp timestamp = 3;
+ optional string source_application = 4;
+ optional string source_component = 5;
+}
diff --git a/site_utils/cloud_console_client.py b/site_utils/cloud_console_client.py
new file mode 100644
index 0000000..892f677
--- /dev/null
+++ b/site_utils/cloud_console_client.py
@@ -0,0 +1,189 @@
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Chrome OS Parnter Concole remote actions."""
+
+from __future__ import print_function
+
+import base64
+import logging
+
+import common
+
+from autotest_lib.client.common_lib import global_config
+from autotest_lib.client.common_lib import utils
+from autotest_lib.server.hosts import moblab_host
+from autotest_lib.site_utils import pubsub_utils
+
+
+_PUBSUB_TOPIC = global_config.global_config.get_config_value(
+ 'CROS', 'cloud_notification_topic', default=None)
+
+# Test upload pubsub notification attributes
+NOTIFICATION_ATTR_VERSION = 'version'
+NOTIFICATION_ATTR_GCS_URI = 'gcs_uri'
+NOTIFICATION_ATTR_MOBLAB_MAC = 'moblab_mac_address'
+NOTIFICATION_ATTR_MOBLAB_ID = 'moblab_id'
+NOTIFICATION_VERSION = '1'
+# the message data for new test result notification.
+NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'
+
+
+ALERT_CRITICAL = 'Critical'
+ALERT_MAJOR = 'Major'
+ALERT_MINOR = 'Minor'
+
+LOG_INFO = 'Info'
+LOG_WARNING = 'Warning'
+LOG_SEVERE = 'Severe'
+LOG_FATAL = 'Fatal'
+
+
+def is_cloud_notification_enabled():
+ """Checks if cloud pubsub notification is enabled.
+
+ @returns: True if cloud pubsub notification is enabled. Otherwise, False.
+ """
+ return global_config.global_config.get_config_value(
+ 'CROS', 'cloud_notification_enabled', type=bool, default=False)
+
+
+class CloudConsoleClient(object):
+ """The remote interface to the Cloud Console."""
+ def send_heartbeat(self):
+ """Sends a heartbeat.
+
+ @returns True if the notification is successfully sent.
+ Otherwise, False.
+ """
+ pass
+
+ def send_event(self, event_type=None, event_data=None):
+ """Sends an event notification to the remote console.
+
+ @param event_type: The event type that is defined in the protobuffer
+ file 'cloud_console.proto'.
+ @param event_data: The event data.
+
+ @returns True if the notification is successfully sent.
+ Otherwise, False.
+ """
+ pass
+
+ def send_log(self, msg, level=LOG_INFO, session_id=None):
+ """Sends a log message to the remote console.
+
+ @param msg: The log message.
+ @param level: The logging level as string.
+ @param session_id: The current session id.
+
+ @returns True if the notification is successfully sent.
+ Otherwise, False.
+ """
+ pass
+
+ def send_alert(self, msg, level=ALERT_MINOR, session_id=None):
+ """Sends an alert to the remote console.
+
+ @param msg: The alert message.
+ @param level: The logging level as string.
+ @param session_id: The current session id.
+
+ @returns True if the notification is successfully sent.
+ Otherwise, False.
+ """
+ pass
+
+ def send_test_job_offloaded_message(self, gcs_uri):
+ """Sends a test job offloaded message to the remote console.
+
+ @param gcs_uri: The test result Google Cloud Storage URI.
+
+ @returns True if the notification is successfully sent.
+ Otherwise, False.
+ """
+ pass
+
+
+# Make it easy to mock out
+def _create_pubsub_client(credential):
+ return pubsub_utils.PubSubClient(credential)
+
+
+class PubSubBasedClient(CloudConsoleClient):
+ """A Cloud PubSub based implementation of the CloudConsoleClient interface.
+ """
+ def __init__(
+ self,
+ credential=moblab_host.MOBLAB_SERVICE_ACCOUNT_LOCATION,
+ pubsub_topic=_PUBSUB_TOPIC):
+ """Constructor.
+
+ @param credential: The service account credential filename. Default to
+ '/home/moblab/.service_account.json'.
+ @param pubsub_topic: The cloud pubsub topic name to use.
+ """
+ super(PubSubBasedClient, self).__init__()
+ self._pubsub_client = _create_pubsub_client(credential)
+ self._pubsub_topic = pubsub_topic
+
+
+ def _create_notification_message(self, data, msg_attributes):
+ """Creates a cloud pubsub notification object.
+
+ @param data: The message data as a string.
+ @param msg_attributes: The message attribute map.
+
+ @returns: A pubsub message object with data and attributes.
+ """
+ message = {'data': data}
+ message['attributes'] = msg_attributes
+ return message
+
+ def _create_notification_attributes(self):
+ """Creates a cloud pubsub notification message attribute map.
+
+ Fills in the version, moblab mac address, and moblab id information
+ as attributes.
+
+ @returns: A pubsub messsage attribute map.
+ """
+ msg_attributes = {}
+ msg_attributes[NOTIFICATION_ATTR_VERSION] = NOTIFICATION_VERSION
+ msg_attributes[NOTIFICATION_ATTR_MOBLAB_MAC] = (
+ utils.get_default_interface_mac_address())
+ msg_attributes[NOTIFICATION_ATTR_MOBLAB_ID] = utils.get_moblab_id()
+ return msg_attributes
+
+ def _create_test_result_notification(self, gcs_uri):
+ """Construct a test result notification.
+
+ @param gcs_uri: The test result Google Cloud Storage URI.
+
+ @returns The notification message.
+ """
+ data = base64.b64encode(NEW_TEST_RESULT_MESSAGE)
+ msg_attributes = self._create_notification_attributes()
+ msg_attributes[NOTIFICATION_ATTR_GCS_URI] = gcs_uri
+
+ return self._create_notification_message(data, msg_attributes)
+
+
+ def send_test_job_offloaded_message(self, gcs_uri):
+ """Notify the cloud console a test job is offloaded.
+
+ @param gcs_uri: The test result Google Cloud Storage URI.
+
+ @returns True if the notification is successfully sent.
+ Otherwise, False.
+ """
+ logging.info('Notification on gcs_uri %s', gcs_uri)
+ message = self._create_test_result_notification(gcs_uri)
+ msg_ids = self._pubsub_client.publish_notifications(
+ self._pubsub_topic, [message])
+ if msg_ids:
+ return True
+ logging.warning('Failed to send notification on gcs_uri %s', gcs_uri)
+ return False
+
diff --git a/site_utils/cloud_console_client_unittest.py b/site_utils/cloud_console_client_unittest.py
new file mode 100644
index 0000000..2258b99
--- /dev/null
+++ b/site_utils/cloud_console_client_unittest.py
@@ -0,0 +1,86 @@
+#!/usr/bin/python
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import base64
+import mox
+import unittest
+
+import common
+
+from autotest_lib.client.common_lib import utils
+from autotest_lib.site_utils import cloud_console_client
+from autotest_lib.site_utils import pubsub_utils
+
+class PubSubBasedClientTests(mox.MoxTestBase):
+ """Tests for the 'PubSubBasedClient'."""
+
+ def setUp(self):
+ super(PubSubBasedClientTests, self).setUp()
+ self._pubsub_client_mock = self.mox.CreateMock(
+ pubsub_utils.PubSubClient)
+ self._stubs = mox.stubout.StubOutForTesting()
+ self._stubs.Set(cloud_console_client, '_create_pubsub_client',
+ lambda x: self._pubsub_client_mock)
+
+ def tearDown(self):
+ self._stubs.UnsetAll()
+ super(PubSubBasedClientTests, self).tearDown()
+
+ def test_create_test_result_notification(self):
+ """Tests the test result notification message."""
+ self._console_client = cloud_console_client.PubSubBasedClient()
+ self.mox.StubOutWithMock(utils, 'get_moblab_id')
+ self.mox.StubOutWithMock(utils,
+ 'get_default_interface_mac_address')
+ utils.get_default_interface_mac_address().AndReturn(
+ '1c:dc:d1:11:01:e1')
+ utils.get_moblab_id().AndReturn(
+ 'c8386d92-9ad1-11e6-80f5-111111111111')
+ self.mox.ReplayAll()
+ console_client = cloud_console_client.PubSubBasedClient()
+ msg = console_client._create_test_result_notification(
+ 'gs://test_bucket/123-moblab')
+ self.assertEquals(base64.b64encode(
+ cloud_console_client.NEW_TEST_RESULT_MESSAGE), msg['data'])
+ self.assertEquals(
+ cloud_console_client.NOTIFICATION_VERSION,
+ msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_VERSION])
+ self.assertEquals(
+ '1c:dc:d1:11:01:e1',
+ msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_MOBLAB_MAC]
+ )
+ self.assertEquals(
+ 'c8386d92-9ad1-11e6-80f5-111111111111',
+ msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_MOBLAB_ID])
+ self.assertEquals(
+ 'gs://test_bucket/123-moblab',
+ msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_GCS_URI])
+ self.mox.VerifyAll()
+
+
+ def test_send_test_job_offloaded_message(self):
+ """Tests send job offloaded notification."""
+ console_client = cloud_console_client.PubSubBasedClient(
+ pubsub_topic='test topic')
+
+ # self.mox.ResetAll()
+ message = {'data': 'dummy data', 'attributes' : {'key' : 'value'}}
+ self.mox.StubOutWithMock(cloud_console_client.PubSubBasedClient,
+ '_create_test_result_notification')
+ console_client._create_test_result_notification(
+ 'gs://test_bucket/123-moblab').AndReturn(message)
+
+ msg_ids = ['1']
+ self._pubsub_client_mock.publish_notifications(
+ 'test topic', [message]).AndReturn(msg_ids)
+
+ self.mox.ReplayAll()
+ console_client.send_test_job_offloaded_message(
+ 'gs://test_bucket/123-moblab')
+ self.mox.VerifyAll()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index 07e7fbd..95b31b6 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -11,7 +11,6 @@
"""
import abc
-import base64
import datetime
import errno
import glob
@@ -32,17 +31,15 @@
from optparse import OptionParser
import common
-from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.site_utils import job_directories
-from autotest_lib.site_utils import pubsub_utils
+from autotest_lib.site_utils import cloud_console_client
from autotest_lib.tko import models
from autotest_lib.utils import labellib
from autotest_lib.utils import gslib
-from chromite.lib import gs
from chromite.lib import timeout_util
# Autotest requires the psutil module from site-packages, so it must be imported
@@ -120,15 +117,6 @@
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
'CROS', 'cts_apfe_server', default='')
-_PUBSUB_ENABLED = global_config.global_config.get_config_value(
- 'CROS', 'cloud_notification_enabled', type=bool, default=False)
-_PUBSUB_TOPIC = global_config.global_config.get_config_value(
- 'CROS', 'cloud_notification_topic', default=None)
-
-
-# the message data for new test result notification.
-NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'
-
# metadata type
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'
@@ -491,37 +479,6 @@
os.remove(test_result_file_gz)
-# Test upload pubsub notification attributes
-_NOTIFICATION_ATTR_VERSION = 'version'
-_NOTIFICATION_ATTR_GCS_URI = 'gcs_uri'
-_NOTIFICATION_ATTR_MOBLAB_MAC = 'moblab_mac_address'
-_NOTIFICATION_ATTR_MOBLAB_ID = 'moblab_id'
-_NOTIFICATION_VERSION = '1'
-
-
-def _create_test_result_notification(gs_path, dir_entry):
- """Construct a test result notification.
-
- @param gs_path: The test result Google Cloud Storage URI.
- @param dir_entry: The local offload directory name.
-
- @returns The notification message.
- """
- gcs_uri = os.path.join(gs_path, os.path.basename(dir_entry))
- logging.info('Notification on gcs_uri %s', gcs_uri)
- data = base64.b64encode(NEW_TEST_RESULT_MESSAGE)
- msg_payload = {'data': data}
- msg_attributes = {}
- msg_attributes[_NOTIFICATION_ATTR_GCS_URI] = gcs_uri
- msg_attributes[_NOTIFICATION_ATTR_VERSION] = _NOTIFICATION_VERSION
- msg_attributes[_NOTIFICATION_ATTR_MOBLAB_MAC] = \
- utils.get_default_interface_mac_address()
- msg_attributes[_NOTIFICATION_ATTR_MOBLAB_ID] = utils.get_moblab_id()
- msg_payload['attributes'] = msg_attributes
-
- return msg_payload
-
-
def _emit_gs_returncode_metric(returncode):
"""Increment the gs_returncode counter based on |returncode|."""
m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
@@ -552,18 +509,19 @@
class GSOffloader(BaseGSOffloader):
"""Google Storage Offloader."""
- def __init__(self, gs_uri, multiprocessing, delete_age, pubsub_topic=None):
+ def __init__(self, gs_uri, multiprocessing, delete_age,
+ console_client=None):
"""Returns the offload directory function for the given gs_uri
@param gs_uri: Google storage bucket uri to offload to.
@param multiprocessing: True to turn on -m option for gsutil.
- @param pubsub_topic: The pubsub topic to publish notificaitons. If None,
- pubsub is not enabled.
+ @param console_client: The cloud console client. If None,
+ cloud console APIs are not called.
"""
self._gs_uri = gs_uri
self._multiprocessing = multiprocessing
self._delete_age = delete_age
- self._pubsub_topic = pubsub_topic
+ self._console_client = console_client
@metrics.SecondsTimerDecorator(
'chromeos/autotest/gs_offloader/job_offload_duration')
@@ -655,14 +613,13 @@
type_str=GS_OFFLOADER_SUCCESS_TYPE,
metadata=es_metadata)
- if self._pubsub_topic:
- message = _create_test_result_notification(
- gs_path, dir_entry)
- pubsub_client = pubsub_utils.PubSubClient()
- msg_ids = pubsub_client.publish_notifications(
- self._pubsub_topic, [message])
- if not msg_ids:
+ if self._console_client:
+ gcs_uri = os.path.join(gs_path,
+ os.path.basename(dir_entry))
+ if not self._console_client.send_test_job_offloaded_message(
+ gcs_uri):
raise error_obj
+
_mark_uploaded(dir_entry)
except timeout_util.TimeoutError:
m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
@@ -839,7 +796,6 @@
"""
def __init__(self, options):
- self._pubsub_topic = None
self._upload_age_limit = options.age_to_upload
self._delete_age_limit = options.age_to_delete
if options.delete_only:
@@ -854,13 +810,12 @@
multiprocessing = GS_OFFLOADER_MULTIPROCESSING
logging.info(
'Offloader multiprocessing is set to:%r', multiprocessing)
- if options.pubsub_topic_for_job_upload:
- self._pubsub_topic = options.pubsub_topic_for_job_upload
- elif _PUBSUB_ENABLED:
- self._pubsub_topic = _PUBSUB_TOPIC
+ console_client = None
+ if cloud_console_client.is_cloud_notification_enabled():
+ console_client = cloud_console_client.PubSubBasedClient()
self._gs_offloader = GSOffloader(
self.gs_uri, multiprocessing, self._delete_age_limit,
- self._pubsub_topic)
+ console_client)
classlist = []
if options.process_hosts_only or options.process_all:
classlist.append(job_directories.SpecialJobDirectory)
@@ -1052,11 +1007,6 @@
parser.add_option('-d', '--days_old', dest='days_old',
help='Minimum job age in days before a result can be '
'offloaded.', type='int', default=0)
- parser.add_option('-t', '--pubsub_topic_for_job_upload',
- dest='pubsub_topic_for_job_upload',
- help='The pubsub topic to send notifciations for '
- 'new job upload',
- action='store', type='string', default=None)
parser.add_option('-l', '--log_size', dest='log_size',
help='Limit the offloader logs to a specified '
'number of Mega Bytes.', type='int', default=0)
diff --git a/site_utils/gs_offloader_unittest.py b/site_utils/gs_offloader_unittest.py
index 143d302..4cf3b6f 100755
--- a/site_utils/gs_offloader_unittest.py
+++ b/site_utils/gs_offloader_unittest.py
@@ -5,7 +5,6 @@
import __builtin__
import Queue
-import base64
import datetime
import logging
import os
@@ -24,10 +23,12 @@
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
+from autotest_lib.site_utils import cloud_console_client
from autotest_lib.site_utils import gs_offloader
from autotest_lib.site_utils import job_directories
from autotest_lib.tko import models
from autotest_lib.utils import gslib
+from autotest_lib.site_utils import pubsub_utils
from chromite.lib import timeout_util
# Test value to use for `days_old`, if nothing else is required.
@@ -78,7 +79,7 @@
def _mock_get_sub_offloader(self, is_moblab, multiprocessing=False,
- pubsub_topic=None, delete_age=0):
+ console_client=None, delete_age=0):
"""Mock the process of getting the offload_dir function."""
if is_moblab:
expected_gsuri = '%sresults/%s/%s/' % (
@@ -89,10 +90,22 @@
expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI
utils.get_offload_gsuri().AndReturn(expected_gsuri)
sub_offloader = gs_offloader.GSOffloader(expected_gsuri,
- multiprocessing, delete_age, pubsub_topic)
+ multiprocessing, delete_age, console_client)
self.mox.StubOutWithMock(gs_offloader, 'GSOffloader')
- gs_offloader.GSOffloader(expected_gsuri, multiprocessing,
- delete_age, pubsub_topic).AndReturn(sub_offloader)
+ self.mox.StubOutWithMock(cloud_console_client,
+ 'is_cloud_notification_enabled')
+ if console_client:
+ cloud_console_client.is_cloud_notification_enabled().AndReturn(True)
+ gs_offloader.GSOffloader(
+ expected_gsuri, multiprocessing, delete_age,
+ mox.IsA(cloud_console_client.PubSubBasedClient)).AndReturn(
+ sub_offloader)
+ else:
+ cloud_console_client.is_cloud_notification_enabled().AndReturn(
+ False)
+ gs_offloader.GSOffloader(
+ expected_gsuri, multiprocessing, delete_age, None).AndReturn(
+ sub_offloader)
self.mox.ReplayAll()
return sub_offloader
@@ -161,7 +174,6 @@
gs_offloader.FakeGSOffloader)
self.assertEqual(offloader._upload_age_limit, 0)
self.assertEqual(offloader._delete_age_limit, 0)
- self.assertIsNone(offloader._pubsub_topic)
def test_days_old_option(self):
@@ -225,18 +237,13 @@
sub_offloader)
self.mox.VerifyAll()
- def test_offloader_pubsub_topic_not_set(self):
- """Test multiprocessing is set."""
- sub_offloader = self._mock_get_sub_offloader(True, False)
- offloader = gs_offloader.Offloader(_get_options([]))
- self.assertEqual(offloader._gs_offloader,
- sub_offloader)
- self.mox.VerifyAll()
- def test_offloader_pubsub_topic_set(self):
+ def test_offloader_pubsub_enabled(self):
"""Test multiprocessing is set."""
- sub_offloader = self._mock_get_sub_offloader(True, False, 'test-topic')
- offloader = gs_offloader.Offloader(_get_options(['-t', 'test-topic']))
+ self.mox.StubOutWithMock(pubsub_utils, "PubSubClient")
+ sub_offloader = self._mock_get_sub_offloader(True, False,
+ cloud_console_client.PubSubBasedClient())
+ offloader = gs_offloader.Offloader(_get_options([]))
self.assertEqual(offloader._gs_offloader,
sub_offloader)
self.mox.VerifyAll()
@@ -442,38 +449,6 @@
self._command_list_assertions(job, multi=True)
-class PubSubTest(mox.MoxTestBase):
- """Test the test result notifcation data structure."""
-
- def test_create_test_result_notification(self):
- """Tests the test result notification message."""
- self.mox.StubOutWithMock(utils, 'get_moblab_id')
- self.mox.StubOutWithMock(utils,
- 'get_default_interface_mac_address')
- utils.get_default_interface_mac_address().AndReturn(
- '1c:dc:d1:11:01:e1')
- utils.get_moblab_id().AndReturn(
- 'c8386d92-9ad1-11e6-80f5-111111111111')
- self.mox.ReplayAll()
- msg = gs_offloader._create_test_result_notification(
- 'gs://test_bucket', '123-moblab')
- self.assertEquals(base64.b64encode(
- gs_offloader.NEW_TEST_RESULT_MESSAGE), msg['data'])
- self.assertEquals(
- gs_offloader._NOTIFICATION_VERSION,
- msg['attributes'][gs_offloader._NOTIFICATION_ATTR_VERSION])
- self.assertEquals(
- '1c:dc:d1:11:01:e1',
- msg['attributes'][gs_offloader._NOTIFICATION_ATTR_MOBLAB_MAC])
- self.assertEquals(
- 'c8386d92-9ad1-11e6-80f5-111111111111',
- msg['attributes'][gs_offloader._NOTIFICATION_ATTR_MOBLAB_ID])
- self.assertEquals(
- 'gs://test_bucket/123-moblab',
- msg['attributes'][gs_offloader._NOTIFICATION_ATTR_GCS_URI])
- self.mox.VerifyAll()
-
-
class _MockJob(object):
"""Class to mock the return value of `AFE.get_jobs()`."""
def __init__(self, created):
diff --git a/site_utils/pubsub_utils.py b/site_utils/pubsub_utils.py
index 2f6c71d..6d7670b 100755
--- a/site_utils/pubsub_utils.py
+++ b/site_utils/pubsub_utils.py
@@ -8,24 +8,22 @@
Upon successful copy, the local results directory is deleted.
"""
-import logging
+from __future__ import print_function
+
import os
from apiclient import discovery
+from apiclient import errors
from oauth2client.client import ApplicationDefaultCredentialsError
from oauth2client.client import GoogleCredentials
-from autotest_lib.server.hosts import moblab_host
-
-import common
+from chromite.lib import cros_logging as logging
# Cloud service
-# TODO(ntang): move this to config.
-CLOUD_SERVICE_ACCOUNT_FILE = moblab_host.MOBLAB_SERVICE_ACCOUNT_LOCATION
PUBSUB_SERVICE_NAME = 'pubsub'
PUBSUB_VERSION = 'v1beta2'
PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']
# number of retry to publish an event.
-_PUBSUB_NUM_RETRIES = 3
+DEFAULT_PUBSUB_NUM_RETRIES = 3
class PubSubException(Exception):
"""Exception to be raised when the test to push to prod failed."""
@@ -33,14 +31,18 @@
class PubSubClient(object):
- """A generic pubsub client. """
- def __init__(self, credential_file=CLOUD_SERVICE_ACCOUNT_FILE):
+ """A generic pubsub client."""
+ def __init__(self, credential_file=None):
"""Constructor for PubSubClient.
- @param credential_file: The credential filename.
- @raises PubSubException if the credential file does not exist or
- corrupted.
+ Args:
+ credential_file: The credential filename.
+
+ Raises:
+ PubSubException if the credential file does not exist or corrupted.
"""
+ if not credential_file:
+ raise PubSubException('You need to specify a credential file.')
self.credential_file = credential_file
self.credential = self._get_credential()
@@ -48,30 +50,30 @@
"""Gets the pubsub service api handle."""
if not os.path.isfile(self.credential_file):
logging.error('No credential file found')
- raise PubSubException("Credential file does not exists:"
- + self.credential_file)
+ raise PubSubException('Credential file does not exist:' +
+ self.credential_file)
try:
credential = GoogleCredentials.from_stream(self.credential_file)
if credential.create_scoped_required():
credential = credential.create_scoped(PUBSUB_SCOPES)
return credential
except ApplicationDefaultCredentialsError as ex:
- logging.error('Failed to get credential.')
- except:
- logging.error('Failed to get the pubsub service handle.')
+ logging.exception('Failed to get credential:%s', ex)
+ except errors.Error as e:
+ logging.exception('Failed to get the pubsub service handle:%s', e)
- raise PubSubException("Credential file does not exists:"
- + self.credential_file)
+ raise PubSubException('Credential file %s does not exists:' %
+ self.credential_file)
def _get_pubsub_service(self):
try:
return discovery.build(PUBSUB_SERVICE_NAME, PUBSUB_VERSION,
credentials=self.credential)
- except:
- logging.error('Failed to get pubsub resource object.')
- raise PubSubException("Failed to get pubsub resource object")
+ except errors.Error as e:
+ logging.exception('Failed to get pubsub resource object:%s', e)
+ raise PubSubException('Failed to get pubsub resource object')
- def publish_notifications(self, topic, messages=[]):
+ def publish_notifications(self, topic, messages=None):
"""Publishes a test result notification to a given pubsub topic.
@param topic: The Cloud pubsub topic.
@@ -81,16 +83,24 @@
@raises PubSubException if failed to publish the notification.
"""
+ if not messages:
+ return None
+
pubsub = self._get_pubsub_service()
try:
body = {'messages': messages}
- resp = pubsub.projects().topics().publish(topic=topic,
- body=body).execute(num_retries=_PUBSUB_NUM_RETRIES)
+ resp = pubsub.projects().topics().publish(
+ topic=topic, body=body).execute(
+ num_retries=DEFAULT_PUBSUB_NUM_RETRIES)
+ msgIds = []
if resp:
msgIds = resp.get('messageIds')
if msgIds:
logging.debug('Published notification message')
- return msgIds
- except:
- logging.error('Failed to publish test result notifiation.')
- raise PubSubException("Failed to publish the notifiation.")
+ else:
+ logging.error('Failed to published notification message')
+ return msgIds
+ except errors.Error as e:
+ logging.exception('Failed to publish test result notification:%s',
+ e)
+ raise PubSubException('Failed to publish the notification')
diff --git a/site_utils/pubsub_utils_unittest.py b/site_utils/pubsub_utils_unittest.py
index d5244d8..3c604ce 100644
--- a/site_utils/pubsub_utils_unittest.py
+++ b/site_utils/pubsub_utils_unittest.py
@@ -1,9 +1,11 @@
-#!/usr/bin/env python
-#
+#!/usr/bin/env python2
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+"""Unit test for pubsub_utils.py"""
+
+from __future__ import print_function
import os
import unittest
@@ -14,14 +16,15 @@
from oauth2client.client import GoogleCredentials
from googleapiclient.errors import UnknownApiNameOrVersion
-import common
import pubsub_utils
+_TEST_CLOUD_SERVICE_ACCOUNT_FILE = '/tmp/test-credential'
+
class MockedPubSub(object):
"""A mocked PubSub handle."""
def __init__(self, test, topic, msg, retry, ret_val=None,
- raise_except=False):
+ raise_except=False):
self.test = test
self.topic = topic
self.msg = msg
@@ -52,10 +55,10 @@
@param num_retries: Number of retries.
"""
- self.test.assertEquals(self.num_retries, num_retries)
+ self.test.assertEquals(self.retry, num_retries)
if self.raise_except:
raise Exception()
- return self.ret
+ return self.ret_val
def _create_sample_message():
@@ -71,48 +74,57 @@
class PubSubTests(mox.MoxTestBase):
"""Tests for pubsub related functios."""
- def test_ubsub_with_no_service_account(self):
+ def test_pubsub_with_no_service_account(self):
"""Test getting the pubsub service"""
self.mox.StubOutWithMock(os.path, 'isfile')
- os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
self.mox.ReplayAll()
with self.assertRaises(pubsub_utils.PubSubException):
pubsub_utils.PubSubClient()
self.mox.VerifyAll()
+ def test_pubsub_with_non_existing_service_account(self):
+ """Test getting the pubsub service"""
+ self.mox.StubOutWithMock(os.path, 'isfile')
+ os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
+ self.mox.ReplayAll()
+ with self.assertRaises(pubsub_utils.PubSubException):
+ pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
+ self.mox.VerifyAll()
+
def test_pubsub_with_corrupted_service_account(self):
"""Test pubsub with corrupted service account."""
self.mox.StubOutWithMock(os.path, 'isfile')
self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
- os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
- credentials = self.mox.CreateMock(GoogleCredentials)
+ os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
GoogleCredentials.from_stream(
- pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
- ApplicationDefaultCredentialsError())
+ _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
+ ApplicationDefaultCredentialsError())
self.mox.ReplayAll()
with self.assertRaises(pubsub_utils.PubSubException):
- pubsub_utils.PubSubClient()
+ pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
self.mox.VerifyAll()
def test_pubsub_with_invalid_service_account(self):
"""Test pubsubwith invalid service account."""
self.mox.StubOutWithMock(os.path, 'isfile')
self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
- os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+ os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
credentials = self.mox.CreateMock(GoogleCredentials)
GoogleCredentials.from_stream(
- pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
+ _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
credentials.create_scoped_required().AndReturn(True)
credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
- credentials)
+ credentials)
self.mox.StubOutWithMock(discovery, 'build')
- discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
- pubsub_utils.PUBSUB_VERSION,
- credentials=credentials).AndRaise(UnknownApiNameOrVersion())
+ discovery.build(
+ pubsub_utils.PUBSUB_SERVICE_NAME,
+ pubsub_utils.PUBSUB_VERSION,
+ credentials=credentials).AndRaise(UnknownApiNameOrVersion())
self.mox.ReplayAll()
with self.assertRaises(pubsub_utils.PubSubException):
msg = _create_sample_message()
- pubsub_client = pubsub_utils.PubSubClient()
+ pubsub_client = pubsub_utils.PubSubClient(
+ _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
pubsub_client.publish_notifications('test_topic', [msg])
self.mox.VerifyAll()
@@ -120,30 +132,33 @@
"""Test getting the pubsub service"""
self.mox.StubOutWithMock(os.path, 'isfile')
self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
- os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
+ os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
credentials = self.mox.CreateMock(GoogleCredentials)
GoogleCredentials.from_stream(
- pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
+ _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
credentials.create_scoped_required().AndReturn(True)
credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
- credentials)
+ credentials)
self.mox.StubOutWithMock(discovery, 'build')
msg = _create_sample_message()
- discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
- pubsub_utils.PUBSUB_VERSION,
- credentials=credentials).AndReturn(MockedPubSub(
- self,
- 'test_topic',
- msg,
- pubsub_utils._PUBSUB_NUM_RETRIES,
- # use tuple ('123') instead of list just for easy to
- # write the test.
- ret_val = {'messageIds', ('123')}))
+ discovery.build(
+ pubsub_utils.PUBSUB_SERVICE_NAME,
+ pubsub_utils.PUBSUB_VERSION,
+ credentials=credentials).AndReturn(MockedPubSub(
+ self,
+ 'test_topic',
+ msg,
+ pubsub_utils.DEFAULT_PUBSUB_NUM_RETRIES,
+ # use tuple ('123') instead of list just for easy to
+ # write the test.
+ ret_val={'messageIds': ('123')}))
self.mox.ReplayAll()
- with self.assertRaises(pubsub_utils.PubSubException):
- pubsub_client = pubsub_utils.PubSubClient()
- pubsub_client.publish_notifications('test_topic', [msg])
+ pubsub_client = pubsub_utils.PubSubClient(
+ _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
+ msg_ids = pubsub_client.publish_notifications('test_topic', [msg])
+ self.assertEquals(('123'), msg_ids)
+
self.mox.VerifyAll()