blob: 343abefd805ac81bf8ed6bfb23fb8363e15cb52e [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Chrome OS Parnter Concole remote actions."""
from __future__ import print_function
import base64
import logging
import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.server.hosts import moblab_host
from autotest_lib.site_utils import pubsub_utils
from autotest_lib.site_utils import cloud_console_pb2 as cpcon
_PUBSUB_TOPIC = global_config.global_config.get_config_value(
'CROS', 'cloud_notification_topic', default=None)
# Current notification version.
CURRENT_MESSAGE_VERSION = '1'
# Test upload pubsub notification attributes
LEGACY_ATTR_VERSION = 'version'
LEGACY_ATTR_GCS_URI = 'gcs_uri'
LEGACY_ATTR_MOBLAB_MAC = 'moblab_mac_address'
LEGACY_ATTR_MOBLAB_ID = 'moblab_id'
# the message data for new test result notification.
LEGACY_TEST_OFFLOAD_MESSAGE = 'NEW_TEST_RESULT'
def is_cloud_notification_enabled():
"""Checks if cloud pubsub notification is enabled.
@returns: True if cloud pubsub notification is enabled. Otherwise, False.
"""
return global_config.global_config.get_config_value(
'CROS', 'cloud_notification_enabled', type=bool, default=False)
def _get_message_type_name(message_type_enum):
"""Gets the message type name from message type enum.
@param message_type_enum: The message type enum.
@return The corresponding message type name as string, or 'MSG_UNKNOWN'.
"""
return cpcon.MessageType.Name(message_type_enum)
def _get_attribute_name(attribute_enum):
"""Gets the message attribute name from attribte enum.
@param attribute_enum: The attribute enum.
@return The corresponding attribute name as string, or 'ATTR_INVALID'.
"""
return cpcon.MessageAttribute.Name(attribute_enum)
class CloudConsoleClient(object):
"""The remote interface to the Cloud Console."""
def send_heartbeat(self):
"""Sends a heartbeat.
@returns True if the notification is successfully sent.
Otherwise, False.
"""
pass
def send_event(self, event_type=None, event_data=None):
"""Sends an event notification to the remote console.
@param event_type: The event type that is defined in the protobuffer
file 'cloud_console.proto'.
@param event_data: The event data.
@returns True if the notification is successfully sent.
Otherwise, False.
"""
pass
def send_log(self, msg, level=None, session_id=None):
"""Sends a log message to the remote console.
@param msg: The log message.
@param level: The logging level.
@param session_id: The current session id.
@returns True if the notification is successfully sent.
Otherwise, False.
"""
pass
def send_alert(self, msg, level=None, session_id=None):
"""Sends an alert to the remote console.
@param msg: The alert message.
@param level: The logging level.
@param session_id: The current session id.
@returns True if the notification is successfully sent.
Otherwise, False.
"""
pass
def send_test_job_offloaded_message(self, gcs_uri):
"""Sends a test job offloaded message to the remote console.
@param gcs_uri: The test result Google Cloud Storage URI.
@returns True if the notification is successfully sent.
Otherwise, False.
"""
pass
# Make it easy to mock out
def _create_pubsub_client(credential):
return pubsub_utils.PubSubClient(credential)
class PubSubBasedClient(CloudConsoleClient):
"""A Cloud PubSub based implementation of the CloudConsoleClient interface.
"""
def __init__(
self,
credential=moblab_host.MOBLAB_SERVICE_ACCOUNT_LOCATION,
pubsub_topic=_PUBSUB_TOPIC):
"""Constructor.
@param credential: The service account credential filename. Default to
'/home/moblab/.service_account.json'.
@param pubsub_topic: The cloud pubsub topic name to use.
"""
super(PubSubBasedClient, self).__init__()
self._pubsub_client = _create_pubsub_client(credential)
self._pubsub_topic = pubsub_topic
def _create_message(self, data, msg_attributes):
"""Creates a cloud pubsub notification object.
@param data: The message data as a string.
@param msg_attributes: The message attribute map.
@returns: A pubsub message object with data and attributes.
"""
message = {}
if data:
message['data'] = data
if msg_attributes:
message['attributes'] = msg_attributes
return message
def _create_message_attributes(self, message_type_enum):
"""Creates a cloud pubsub notification message attribute map.
Fills in the version, moblab mac address, and moblab id information
as attributes.
@param message_type_enum The message type enum.
@returns: A pubsub messsage attribute map.
"""
msg_attributes = {}
msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_TYPE)] = (
_get_message_type_name(message_type_enum))
msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_VERSION)] = (
CURRENT_MESSAGE_VERSION)
msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_MAC_ADDRESS)] = (
utils.get_moblab_serial_number())
msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_ID)] = (
utils.get_moblab_id())
return msg_attributes
def _create_test_job_offloaded_message(self, gcs_uri):
"""Construct a test result notification.
TODO(ntang): switch LEGACY to new message format.
@param gcs_uri: The test result Google Cloud Storage URI.
@returns The notification message.
"""
data = base64.b64encode(LEGACY_TEST_OFFLOAD_MESSAGE)
msg_attributes = {}
msg_attributes[LEGACY_ATTR_VERSION] = CURRENT_MESSAGE_VERSION
msg_attributes[LEGACY_ATTR_MOBLAB_MAC] = (
utils.get_moblab_serial_number())
msg_attributes[LEGACY_ATTR_MOBLAB_ID] = utils.get_moblab_id()
msg_attributes[LEGACY_ATTR_GCS_URI] = gcs_uri
return self._create_message(data, msg_attributes)
def send_test_job_offloaded_message(self, gcs_uri):
"""Notify the cloud console a test job is offloaded.
@param gcs_uri: The test result Google Cloud Storage URI.
@returns True if the notification is successfully sent.
Otherwise, False.
"""
logging.info('Notification on gcs_uri %s', gcs_uri)
message = self._create_test_job_offloaded_message(gcs_uri)
return self._publish_notification(message)
def _publish_notification(self, message):
msg_ids = self._pubsub_client.publish_notifications(
self._pubsub_topic, [message])
if msg_ids:
logging.debug('Successfully sent out a notification')
return True
logging.warning('Failed to send notification %s', str(message))
return False
def send_heartbeat(self):
"""Sends a heartbeat.
@returns True if the heartbeat notification is successfully sent.
Otherwise, False.
"""
logging.info('Sending a heartbeat')
event = cpcon.Heartbeat()
# Don't sent local timestamp for now.
data = event.SerializeToString()
try:
attributes = self._create_message_attributes(
cpcon.MSG_MOBLAB_HEARTBEAT)
message = self._create_message(data, attributes)
except ValueError:
logging.exception('Failed to create message.')
return False
return self._publish_notification(message)
def send_event(self, event_type=None, event_data=None):
"""Sends an event notification to the remote console.
@param event_type: The event type that is defined in the protobuffer
file 'cloud_console.proto'.
@param event_data: The event data.
@returns True if the notification is successfully sent.
Otherwise, False.
"""
logging.info('Send an event.')
if not event_type:
logging.info('Failed to send event without a type.')
return False
event = cpcon.RemoteEventMessage()
if event_data:
event.data = event_data
else:
event.data = ''
event.type = event_type
data = event.SerializeToString()
try:
attributes = self._create_message_attributes(
cpcon.MSG_MOBLAB_REMOTE_EVENT)
message = self._create_message(data, attributes)
except ValueError:
logging.exception('Failed to create message.')
return False
return self._publish_notification(message)