| # |
| # Copyright 2008 Google Inc. All Rights Reserved. |
| # |
| |
| import os |
| from autotest_lib.frontend.afe import rpc_client_lib |
| from autotest_lib.frontend.afe.json_rpc import proxy |
| from autotest_lib.client.common_lib import global_config, utils |
| |
# Shared configuration object; get_autotest_server() queries it for the
# SERVER/hostname setting.
GLOBAL_CONFIG = global_config.global_config

# Host name used when the SERVER/hostname setting has no configured value.
DEFAULT_SERVER = 'autotest'

# Default RPC paths for afe_comm and tko_comm respectively; each is appended
# to the server address to form the RPC URL.
AFE_RPC_PATH = '/afe/server/noauth/rpc/'
TKO_RPC_PATH = '/new_tko/server/noauth/rpc/'
| |
| |
class AuthError(Exception):
    """Raised by rpc_comm when rpc_client_lib reports an AuthError."""
| |
| |
def get_autotest_server(web_server=None):
    """Pick the Autotest web server to talk to.

    The first available source wins:
      1. the web_server argument, when it is a true value;
      2. the AUTOTEST_WEB environment variable, when it is set;
      3. the SERVER/hostname global config value, defaulting to
         DEFAULT_SERVER.

    Args:
        web_server: explicitly requested server, or a false value to fall
            back to the environment / global config.

    Returns:
        The chosen server after being passed through
        rpc_client_lib.add_protocol().
    """
    if web_server:
        chosen = web_server
    elif 'AUTOTEST_WEB' in os.environ:
        chosen = os.environ['AUTOTEST_WEB']
    else:
        chosen = GLOBAL_CONFIG.get_config_value(
            'SERVER', 'hostname', default=DEFAULT_SERVER)

    return rpc_client_lib.add_protocol(chosen)
| |
| |
class rpc_comm(object):
    """Shared connection and call logic for the AFE and TKO RPC classes.

    Attributes:
        username: user name handed to rpc_client_lib.authorization_headers().
        web_server: server address as returned by get_autotest_server().
        proxy: proxy object returned by rpc_client_lib.get_proxy(); remote
            operations are looked up on it by name in run().
    """

    def __init__(self, web_server, rpc_path, username):
        """Resolve the server address and create the RPC proxy.

        Args:
            web_server: server to talk to; a false value lets
                get_autotest_server() choose one.
            rpc_path: path appended to the server address to form the
                RPC URL.
            username: user the authorization headers are built for.

        Raises:
            AuthError: if rpc_client_lib.AuthError is raised while
                connecting.
        """
        self.username = username
        self.web_server = get_autotest_server(web_server)
        try:
            self.proxy = self._connect(rpc_path)
        except rpc_client_lib.AuthError as s:
            # Translate the library exception into this module's AuthError.
            # "except ... as" parses on Python 2.6+ and Python 3 alike.
            raise AuthError(s)


    def _connect(self, rpc_path):
        """Return a proxy for the RPC endpoint at web_server + rpc_path."""
        # This does not fail even if the address is wrong.
        # We need to wait for an actual RPC to fail
        headers = rpc_client_lib.authorization_headers(self.username,
                                                       self.web_server)
        rpc_server = self.web_server + rpc_path
        return rpc_client_lib.get_proxy(rpc_server, headers=headers)


    def run(self, op, *args, **data):
        """Call the operation named op on the proxy and return its result.

        When AUTOTEST_CLI_DEBUG is present in the environment, the call
        and its result are printed.

        Args:
            op: name of the attribute to look up on self.proxy.
            *args: positional arguments forwarded to the operation.
            **data: keyword arguments forwarded to the operation.

        Returns:
            Whatever the looked-up operation returns.
        """
        debug = 'AUTOTEST_CLI_DEBUG' in os.environ
        if debug:
            # A single pre-formatted string prints the same text under the
            # Python 2 print statement and the Python 3 print function.
            print('%s %s %s %s' % (self.web_server, op, args, data))
        function = getattr(self.proxy, op)
        result = function(*args, **data)
        if debug:
            # Wrap in a 1-tuple so a tuple result is not unpacked by '%'.
            print('result: %s' % (result,))
        return result
| |
| |
class afe_comm(rpc_comm):
    """RPC client for the AFE endpoint; rpc_path defaults to AFE_RPC_PATH."""

    def __init__(self, web_server=None, rpc_path=AFE_RPC_PATH, username=None):
        """Forward all arguments unchanged to the base class initializer."""
        super(afe_comm, self).__init__(web_server=web_server,
                                       rpc_path=rpc_path,
                                       username=username)
| |
| |
class tko_comm(rpc_comm):
    """RPC client for the TKO endpoint; rpc_path defaults to TKO_RPC_PATH."""

    def __init__(self, web_server=None, rpc_path=TKO_RPC_PATH, username=None):
        """Forward all arguments unchanged to the base class initializer."""
        super(tko_comm, self).__init__(web_server=web_server,
                                       rpc_path=rpc_path,
                                       username=username)