Implements the cloud console client based on pubsub.

Also makes sure the unittest will succeded when the cloud_console.proto
is not compiled.

BUG=chromium:724523
TEST=unit test.

Change-Id: Ib605c1a2f5dbb2bd871aac272da7f9a4ab7ed35a
Reviewed-on: https://chromium-review.googlesource.com/569453
Commit-Ready: Michael Tang <ntang@chromium.org>
Tested-by: Michael Tang <ntang@chromium.org>
Reviewed-by: Keith Haddow <haddowk@chromium.org>
Reviewed-by: Michael Tang <ntang@chromium.org>
diff --git a/moblab_config.ini b/moblab_config.ini
index 70c04ce..a702404 100644
--- a/moblab_config.ini
+++ b/moblab_config.ini
@@ -37,3 +37,6 @@
 
 # Reduce upload bandwidth for partner by switching on tar and compress results.
 gs_offloader_limit_file_count: True
+
+# Heartbeat rate to the cloud.
+heartbeat_rate_seconds: 0
diff --git a/site_utils/cloud_console.proto b/site_utils/cloud_console.proto
index c2beab9..25c2fa9 100644
--- a/site_utils/cloud_console.proto
+++ b/site_utils/cloud_console.proto
@@ -2,17 +2,32 @@
 
 // The types of message notification sent from Moblab.
 enum MessageType {
-    MOBLAB_HEARTBEAT = 1;
-    MOBLAB_REMOTE_EVENT = 2;
-    MOBLAB_ALERT = 3;
+  MSG_UNKNOWN = 0;
+  MSG_MOBLAB_HEARTBEAT = 1;
+  MSG_MOBLAB_REMOTE_EVENT = 2;
+  MSG_MOBLAB_ALERT = 3;
 }
 
+// The common pubsub notification attribute names.
+enum MessageAttribute {
+  ATTR_INVALID = 0;
+  ATTR_MESSAGE_TYPE = 1;
+  ATTR_MESSAGE_VERSION = 2;
+  ATTR_MOBLAB_MAC_ADDRESS = 3;
+  ATTR_MOBLAB_ID = 4;
+}
+
+// Timestamp is not defined in proto2. We need to define it by ourselves.
+// It represents a point in time independent of any time zone
+// or calendar, represented as seconds and fractions of seconds at
+// nanosecond resolution.
+// Clone from https://github.com/google/protobuf/blob/master/src/google/protobuf/timestamp.proto.
 message Timestamp {
-  required int64 seconds = 1;
+  optional int64 seconds = 1;
+  // Non-negative fractions of a second at nanosecond resolution.
   optional int64 nanos = 2 [default = 0];
 }
 
-// The heartbeat message
 message Heartbeat {
   optional Timestamp timestamp = 1;
 }
@@ -21,23 +36,24 @@
 message RemoteEventMessage {
   // EventType is an enumeration of event types sent to cloud console.
   // Any new event type should be added here.
-  enum EventType {
-    MOBLAB_INFO = 1;
-    MOBLAB_BOOT_COMPLETE = 2;
+  enum Type {
+    EVENT_UNKNOWN = 0;
+    EVENT_MOBLAB_BOOT_COMPLETE = 1;
   }
 
-  required EventType event_type = 1 [default = MOBLAB_INFO];
-  optional string event_data = 2;
+  optional Type type = 1 [default = EVENT_UNKNOWN];
+  optional string data = 2;
 }
 
 // Moblab alerts
 message Alert {
   enum AlertLevel {
-    CRITICAL = 1;
-    MAJOR = 2;
-    MINOR = 3;
+    ALERT_UNSPECIFIED = 0;
+    ALERT_CRITICAL = 1;
+    ALERT_MAJOR = 2;
+    ALERT_MINOR = 3;
   }
-  required AlertLevel level = 1;
+  optional AlertLevel level = 1;
   optional string data = 2;
   optional Timestamp timestamp = 3;
   optional string source_application = 4;
diff --git a/site_utils/cloud_console_client.py b/site_utils/cloud_console_client.py
index 892f677..c3141f8 100644
--- a/site_utils/cloud_console_client.py
+++ b/site_utils/cloud_console_client.py
@@ -15,29 +15,22 @@
 from autotest_lib.client.common_lib import utils
 from autotest_lib.server.hosts import moblab_host
 from autotest_lib.site_utils import pubsub_utils
+from autotest_lib.site_utils import cloud_console_pb2 as cpcon
 
 
 _PUBSUB_TOPIC = global_config.global_config.get_config_value(
         'CROS', 'cloud_notification_topic', default=None)
 
+# Current notification version.
+CURRENT_MESSAGE_VERSION = '1'
+
 # Test upload pubsub notification attributes
-NOTIFICATION_ATTR_VERSION = 'version'
-NOTIFICATION_ATTR_GCS_URI = 'gcs_uri'
-NOTIFICATION_ATTR_MOBLAB_MAC = 'moblab_mac_address'
-NOTIFICATION_ATTR_MOBLAB_ID = 'moblab_id'
-NOTIFICATION_VERSION = '1'
+LEGACY_ATTR_VERSION = 'version'
+LEGACY_ATTR_GCS_URI = 'gcs_uri'
+LEGACY_ATTR_MOBLAB_MAC = 'moblab_mac_address'
+LEGACY_ATTR_MOBLAB_ID = 'moblab_id'
 # the message data for new test result notification.
-NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'
-
-
-ALERT_CRITICAL = 'Critical'
-ALERT_MAJOR = 'Major'
-ALERT_MINOR = 'Minor'
-
-LOG_INFO = 'Info'
-LOG_WARNING = 'Warning'
-LOG_SEVERE = 'Severe'
-LOG_FATAL = 'Fatal'
+LEGACY_TEST_OFFLOAD_MESSAGE = 'NEW_TEST_RESULT'
 
 
 def is_cloud_notification_enabled():
@@ -49,6 +42,26 @@
         'CROS', 'cloud_notification_enabled', type=bool, default=False)
 
 
+def _get_message_type_name(message_type_enum):
+    """Gets the message type name from message type enum.
+
+    @param message_type_enum: The message type enum.
+
+    @return The corresponding message type name as string, or 'MSG_UNKNOWN'.
+    """
+    return cpcon.MessageType.Name(message_type_enum)
+
+
+def _get_attribute_name(attribute_enum):
+    """Gets the message attribute name from attribte enum.
+
+    @param attribute_enum: The attribute enum.
+
+    @return The corresponding attribute name as string, or 'ATTR_INVALID'.
+    """
+    return cpcon.MessageAttribute.Name(attribute_enum)
+
+
 class CloudConsoleClient(object):
     """The remote interface to the Cloud Console."""
     def send_heartbeat(self):
@@ -71,11 +84,11 @@
         """
         pass
 
-    def send_log(self, msg, level=LOG_INFO, session_id=None):
+    def send_log(self, msg, level=None, session_id=None):
         """Sends a log message to the remote console.
 
         @param msg: The log message.
-        @param level: The logging level as string.
+        @param level: The logging level.
         @param session_id: The current session id.
 
         @returns True if the notification is successfully sent.
@@ -83,11 +96,11 @@
         """
         pass
 
-    def send_alert(self, msg, level=ALERT_MINOR, session_id=None):
+    def send_alert(self, msg, level=None, session_id=None):
         """Sends an alert to the remote console.
 
         @param msg: The alert message.
-        @param level: The logging level as string.
+        @param level: The logging level.
         @param session_id: The current session id.
 
         @returns True if the notification is successfully sent.
@@ -129,7 +142,7 @@
         self._pubsub_topic = pubsub_topic
 
 
-    def _create_notification_message(self, data, msg_attributes):
+    def _create_message(self, data, msg_attributes):
         """Creates a cloud pubsub notification object.
 
         @param data: The message data as a string.
@@ -137,37 +150,51 @@
 
         @returns: A pubsub message object with data and attributes.
         """
-        message = {'data': data}
-        message['attributes'] = msg_attributes
+        message = {}
+        if data:
+            message['data'] = data
+        if msg_attributes:
+            message['attributes'] = msg_attributes
         return message
 
-    def _create_notification_attributes(self):
+    def _create_message_attributes(self, message_type_enum):
         """Creates a cloud pubsub notification message attribute map.
 
         Fills in the version, moblab mac address, and moblab id information
         as attributes.
 
+        @param message_type_enum The message type enum.
+
         @returns: A pubsub messsage attribute map.
         """
         msg_attributes = {}
-        msg_attributes[NOTIFICATION_ATTR_VERSION] = NOTIFICATION_VERSION
-        msg_attributes[NOTIFICATION_ATTR_MOBLAB_MAC] = (
+        msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_TYPE)] = (
+                _get_message_type_name(message_type_enum))
+        msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_VERSION)] = (
+                CURRENT_MESSAGE_VERSION)
+        msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_MAC_ADDRESS)] = (
                 utils.get_default_interface_mac_address())
-        msg_attributes[NOTIFICATION_ATTR_MOBLAB_ID] = utils.get_moblab_id()
+        msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_ID)] = (
+                utils.get_moblab_id())
         return msg_attributes
 
-    def _create_test_result_notification(self, gcs_uri):
+    def _create_test_job_offloaded_message(self, gcs_uri):
         """Construct a test result notification.
 
+        TODO(ntang): switch LEGACY to new message format.
         @param gcs_uri: The test result Google Cloud Storage URI.
 
         @returns The notification message.
         """
-        data = base64.b64encode(NEW_TEST_RESULT_MESSAGE)
-        msg_attributes = self._create_notification_attributes()
-        msg_attributes[NOTIFICATION_ATTR_GCS_URI] = gcs_uri
+        data = base64.b64encode(LEGACY_TEST_OFFLOAD_MESSAGE)
+        msg_attributes = {}
+        msg_attributes[LEGACY_ATTR_VERSION] = CURRENT_MESSAGE_VERSION
+        msg_attributes[LEGACY_ATTR_MOBLAB_MAC] = (
+                utils.get_default_interface_mac_address())
+        msg_attributes[LEGACY_ATTR_MOBLAB_ID] = utils.get_moblab_id()
+        msg_attributes[LEGACY_ATTR_GCS_URI] = gcs_uri
 
-        return self._create_notification_message(data, msg_attributes)
+        return self._create_message(data, msg_attributes)
 
 
     def send_test_job_offloaded_message(self, gcs_uri):
@@ -179,11 +206,67 @@
             Otherwise, False.
         """
         logging.info('Notification on gcs_uri %s', gcs_uri)
-        message = self._create_test_result_notification(gcs_uri)
+        message = self._create_test_job_offloaded_message(gcs_uri)
+        return self._publish_notification(message)
+
+
+    def _publish_notification(self, message):
         msg_ids = self._pubsub_client.publish_notifications(
                 self._pubsub_topic, [message])
+
         if msg_ids:
+            logging.debug('Successfully sent out a notification')
             return True
-        logging.warning('Failed to send notification on gcs_uri %s', gcs_uri)
+        logging.warning('Failed to send notification %s', str(message))
         return False
 
+    def send_heartbeat(self):
+        """Sends a heartbeat.
+
+        @returns True if the heartbeat notification is successfully sent.
+            Otherwise, False.
+        """
+        logging.info('Sending a heartbeat')
+
+        event = cpcon.Heartbeat()
+        # Don't sent local timestamp for now.
+        data = event.SerializeToString()
+        try:
+            attributes = self._create_message_attributes(
+                    cpcon.MSG_MOBLAB_HEARTBEAT)
+            message = self._create_message(data, attributes)
+        except ValueError:
+            logging.exception('Failed to create message.')
+            return False
+        return self._publish_notification(message)
+
+    def send_event(self, event_type=None, event_data=None):
+        """Sends an event notification to the remote console.
+
+        @param event_type: The event type that is defined in the protobuffer
+            file 'cloud_console.proto'.
+        @param event_data: The event data.
+
+        @returns True if the notification is successfully sent.
+            Otherwise, False.
+        """
+        logging.info('Send an event.')
+        if not event_type:
+            logging.info('Failed to send event without a type.')
+            return False
+
+        event = cpcon.RemoteEventMessage()
+        if event_data:
+            event.data = event_data
+        else:
+            event.data = ''
+        event.type = event_type
+        data = event.SerializeToString()
+        try:
+            attributes = self._create_message_attributes(
+                    cpcon.MSG_MOBLAB_REMOTE_EVENT)
+            message = self._create_message(data, attributes)
+        except ValueError:
+            logging.exception('Failed to create message.')
+            return False
+        return self._publish_notification(message)
diff --git a/site_utils/cloud_console_client_unittest.py b/site_utils/cloud_console_client_unittest.py
index 2258b99..8f67883 100644
--- a/site_utils/cloud_console_client_unittest.py
+++ b/site_utils/cloud_console_client_unittest.py
@@ -11,6 +11,7 @@
 
 from autotest_lib.client.common_lib import utils
 from autotest_lib.site_utils import cloud_console_client
+from autotest_lib.site_utils import cloud_console_pb2 as cpcon
 from autotest_lib.site_utils import pubsub_utils
 
 class PubSubBasedClientTests(mox.MoxTestBase):
@@ -31,45 +32,43 @@
     def test_create_test_result_notification(self):
         """Tests the test result notification message."""
         self._console_client = cloud_console_client.PubSubBasedClient()
-        self.mox.StubOutWithMock(utils, 'get_moblab_id')
         self.mox.StubOutWithMock(utils,
                                  'get_default_interface_mac_address')
         utils.get_default_interface_mac_address().AndReturn(
             '1c:dc:d1:11:01:e1')
+        self.mox.StubOutWithMock(utils, 'get_moblab_id')
         utils.get_moblab_id().AndReturn(
             'c8386d92-9ad1-11e6-80f5-111111111111')
         self.mox.ReplayAll()
         console_client = cloud_console_client.PubSubBasedClient()
-        msg = console_client._create_test_result_notification(
+        msg = console_client._create_test_job_offloaded_message(
                 'gs://test_bucket/123-moblab')
         self.assertEquals(base64.b64encode(
-            cloud_console_client.NEW_TEST_RESULT_MESSAGE), msg['data'])
+            cloud_console_client.LEGACY_TEST_OFFLOAD_MESSAGE), msg['data'])
         self.assertEquals(
-            cloud_console_client.NOTIFICATION_VERSION,
-            msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_VERSION])
+            cloud_console_client.CURRENT_MESSAGE_VERSION,
+            msg['attributes'][cloud_console_client.LEGACY_ATTR_VERSION])
         self.assertEquals(
             '1c:dc:d1:11:01:e1',
-            msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_MOBLAB_MAC]
+            msg['attributes'][cloud_console_client.LEGACY_ATTR_MOBLAB_MAC]
             )
         self.assertEquals(
             'c8386d92-9ad1-11e6-80f5-111111111111',
-            msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_MOBLAB_ID])
+            msg['attributes'][cloud_console_client.LEGACY_ATTR_MOBLAB_ID])
         self.assertEquals(
             'gs://test_bucket/123-moblab',
-            msg['attributes'][cloud_console_client.NOTIFICATION_ATTR_GCS_URI])
+            msg['attributes'][cloud_console_client.LEGACY_ATTR_GCS_URI])
         self.mox.VerifyAll()
 
-
     def test_send_test_job_offloaded_message(self):
         """Tests send job offloaded notification."""
         console_client = cloud_console_client.PubSubBasedClient(
                 pubsub_topic='test topic')
 
-        # self.mox.ResetAll()
         message = {'data': 'dummy data', 'attributes' : {'key' : 'value'}}
         self.mox.StubOutWithMock(cloud_console_client.PubSubBasedClient,
-                '_create_test_result_notification')
-        console_client._create_test_result_notification(
+                '_create_test_job_offloaded_message')
+        console_client._create_test_job_offloaded_message(
                 'gs://test_bucket/123-moblab').AndReturn(message)
 
         msg_ids = ['1']
@@ -77,10 +76,55 @@
                 'test topic', [message]).AndReturn(msg_ids)
 
         self.mox.ReplayAll()
-        console_client.send_test_job_offloaded_message(
+        result = console_client.send_test_job_offloaded_message(
                 'gs://test_bucket/123-moblab')
+        self.assertTrue(result)
         self.mox.VerifyAll()
 
+    def test_send_heartbeat(self):
+        """Tests send heartbeat."""
+        console_client = cloud_console_client.PubSubBasedClient(
+                pubsub_topic='test topic')
+        self.mox.StubOutWithMock(utils, 'get_moblab_id')
+        utils.get_moblab_id().AndReturn(
+            'c8386d92-9ad1-11e6-80f5-111111111111')
+
+        message = {
+                'attributes' : {
+                    'ATTR_MOBLAB_ID': 'c8386d92-9ad1-11e6-80f5-111111111111',
+                    'ATTR_MESSAGE_VERSION': '1',
+                    'ATTR_MESSAGE_TYPE': 'MSG_MOBLAB_HEARTBEAT',
+                    'ATTR_MOBLAB_MAC_ADDRESS': '8c:dc:d4:56:06:e7'}}
+        msg_ids = ['1']
+        self._pubsub_client_mock.publish_notifications(
+                'test topic', [message]).AndReturn(msg_ids)
+        self.mox.ReplayAll()
+        console_client.send_heartbeat()
+        self.mox.VerifyAll()
+
+    def test_send_event(self):
+        """Tests send heartbeat."""
+        console_client = cloud_console_client.PubSubBasedClient(
+                pubsub_topic='test topic')
+        self.mox.StubOutWithMock(utils, 'get_moblab_id')
+        utils.get_moblab_id().AndReturn(
+            'c8386d92-9ad1-11e6-80f5-111111111111')
+
+        message = {
+                'data': '\x08\x01\x12\x0ethis is a test',
+                'attributes' : {
+                    'ATTR_MOBLAB_ID': 'c8386d92-9ad1-11e6-80f5-111111111111',
+                    'ATTR_MESSAGE_VERSION': '1',
+                    'ATTR_MESSAGE_TYPE': 'MSG_MOBLAB_REMOTE_EVENT',
+                    'ATTR_MOBLAB_MAC_ADDRESS': '8c:dc:d4:56:06:e7'}}
+        msg_ids = ['1']
+        self._pubsub_client_mock.publish_notifications(
+                'test topic', [message]).AndReturn(msg_ids)
+        self.mox.ReplayAll()
+        console_client.send_event(
+                cpcon.RemoteEventMessage.EVENT_MOBLAB_BOOT_COMPLETE,
+                'this is a test')
+        self.mox.VerifyAll()
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index 177a3a6..9377c80 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -35,7 +35,11 @@
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import utils
 from autotest_lib.site_utils import job_directories
-from autotest_lib.site_utils import cloud_console_client
+# For unittest, the cloud_console.proto is not compiled yet.
+try:
+    from autotest_lib.site_utils import cloud_console_client
+except ImportError:
+    cloud_console_client = None
 from autotest_lib.tko import models
 from autotest_lib.utils import labellib
 from autotest_lib.utils import gslib
@@ -801,7 +805,8 @@
             logging.info(
                     'Offloader multiprocessing is set to:%r', multiprocessing)
             console_client = None
-            if cloud_console_client.is_cloud_notification_enabled():
+            if (cloud_console_client and
+                    cloud_console_client.is_cloud_notification_enabled()):
                 console_client = cloud_console_client.PubSubBasedClient()
             self._gs_offloader = GSOffloader(
                     self.gs_uri, multiprocessing, self._delete_age_limit,
diff --git a/site_utils/gs_offloader_unittest.py b/site_utils/gs_offloader_unittest.py
index f063538..256e958 100755
--- a/site_utils/gs_offloader_unittest.py
+++ b/site_utils/gs_offloader_unittest.py
@@ -23,7 +23,11 @@
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import time_utils
 from autotest_lib.client.common_lib import utils
-from autotest_lib.site_utils import cloud_console_client
+#For unittest without cloud_client.proto compiled.
+try:
+    from autotest_lib.site_utils import cloud_console_client
+except ImportError:
+    cloud_console_client = None
 from autotest_lib.site_utils import gs_offloader
 from autotest_lib.site_utils import job_directories
 from autotest_lib.tko import models
@@ -92,8 +96,9 @@
         sub_offloader = gs_offloader.GSOffloader(expected_gsuri,
             multiprocessing, delete_age, console_client)
         self.mox.StubOutWithMock(gs_offloader, 'GSOffloader')
-        self.mox.StubOutWithMock(cloud_console_client,
-                                'is_cloud_notification_enabled')
+        if cloud_console_client:
+            self.mox.StubOutWithMock(cloud_console_client,
+                    'is_cloud_notification_enabled')
         if console_client:
             cloud_console_client.is_cloud_notification_enabled().AndReturn(True)
             gs_offloader.GSOffloader(
@@ -101,8 +106,9 @@
                     mox.IsA(cloud_console_client.PubSubBasedClient)).AndReturn(
                         sub_offloader)
         else:
-            cloud_console_client.is_cloud_notification_enabled().AndReturn(
-                    False)
+            if cloud_console_client:
+                cloud_console_client.is_cloud_notification_enabled().AndReturn(
+                        False)
             gs_offloader.GSOffloader(
                 expected_gsuri, multiprocessing, delete_age, None).AndReturn(
                     sub_offloader)
@@ -240,6 +246,8 @@
 
     def test_offloader_pubsub_enabled(self):
         """Test multiprocessing is set."""
+        if not cloud_console_client:
+            return
         self.mox.StubOutWithMock(pubsub_utils, "PubSubClient")
         sub_offloader = self._mock_get_sub_offloader(True, False,
                 cloud_console_client.PubSubBasedClient())
diff --git a/utils/unittest_suite.py b/utils/unittest_suite.py
index d61e188..f15e94c 100755
--- a/utils/unittest_suite.py
+++ b/utils/unittest_suite.py
@@ -71,6 +71,7 @@
         ))
 
 REQUIRES_PROTOBUFS = set((
+        'cloud_console_client_unittest.py',
         'job_serializer_unittest.py',
         ))