| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import os |
| import subprocess |
| |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib.cros import site_eap_certs |
| |
| class HostapdServer(object): |
| """Hostapd server instance wrapped in a context manager. |
| |
| Simple interface to starting and controlling a hsotapd instance. |
| This can be combined with a virtual-ethernet setup to test 802.1x |
| on a wired interface. |
| |
| Example usage: |
| with hostapd_server.HostapdServer(interface='veth_master') as hostapd: |
| hostapd.send_eap_packets() |
| |
| """ |
| CONFIG_TEMPLATE = """ |
| interface=%(interface)s |
| driver=%(driver)s |
| logger_syslog=-1 |
| logger_syslog_level=2 |
| logger_stdout=-1 |
| logger_stdout_level=2 |
| dump_file=%(config_directory)s/hostapd.dump |
| ctrl_interface=%(config_directory)s/%(control_directory)s |
| ieee8021x=1 |
| eapol_key_index_workaround=0 |
| eap_server=1 |
| eap_user_file=%(config_directory)s/%(user_file)s |
| ca_cert=%(config_directory)s/%(ca_cert)s |
| server_cert=%(config_directory)s/%(server_cert)s |
| private_key=%(config_directory)s/%(server_key)s |
| use_pae_group_addr=1 |
| eap_reauth_period=10 |
| """ |
| CA_CERTIFICATE_FILE = 'ca.crt' |
| CONFIG_FILE = 'hostapd.conf' |
| CONTROL_DIRECTORY = 'hostapd.ctl' |
| EAP_PASSWORD = 'password' |
| EAP_PHASE2 = 'MSCHAPV2' |
| EAP_TYPE = 'PEAP' |
| EAP_USERNAME = 'test' |
| HOSTAPD_EXECUTABLE = 'hostapd' |
| HOSTAPD_CLIENT_EXECUTABLE = 'hostapd_cli' |
| SERVER_CERTIFICATE_FILE = 'server.crt' |
| SERVER_PRIVATE_KEY_FILE = 'server.key' |
| USER_AUTHENTICATION_TEMPLATE = """* %(type)s |
| "%(username)s"\t%(phase2)s\t"%(password)s"\t[2] |
| """ |
| USER_FILE = 'hostapd.eap_user' |
| # This is the default group MAC address to which EAP challenges |
| # are sent, absent any prior knowledge of a specific client on |
| # the link. |
| PAE_NEAREST_ADDRESS = '01:80:c2:00:00:03' |
| |
| def __init__(self, |
| interface=None, |
| driver='wired', |
| config_directory='/tmp/hostapd-test'): |
| super(HostapdServer, self).__init__() |
| self._interface = interface |
| self._config_directory = config_directory |
| self._control_directory = '%s/%s' % (self._config_directory, |
| self.CONTROL_DIRECTORY) |
| self._driver = driver |
| self._process = None |
| |
| |
| def __enter__(self): |
| self.start() |
| return self |
| |
| |
| def __exit__(self, exception, value, traceback): |
| self.stop() |
| |
| |
| def write_config(self): |
| """Write out a hostapd configuration file-set based on the caller |
| supplied parameters. |
| |
| @return the file name of the top-level configuration file written. |
| |
| """ |
| if not os.path.exists(self._config_directory): |
| os.mkdir(self._config_directory) |
| config_params = { |
| 'ca_cert': self.CA_CERTIFICATE_FILE, |
| 'config_directory' : self._config_directory, |
| 'control_directory': self.CONTROL_DIRECTORY, |
| 'driver': self._driver, |
| 'interface': self._interface, |
| 'server_cert': self.SERVER_CERTIFICATE_FILE, |
| 'server_key': self.SERVER_PRIVATE_KEY_FILE, |
| 'user_file': self.USER_FILE |
| } |
| authentication_params = { |
| 'password': self.EAP_PASSWORD, |
| 'phase2': self.EAP_PHASE2, |
| 'username': self.EAP_USERNAME, |
| 'type': self.EAP_TYPE |
| } |
| for filename, contents in ( |
| ( self.CA_CERTIFICATE_FILE, site_eap_certs.ca_cert_1 ), |
| ( self.CONFIG_FILE, self.CONFIG_TEMPLATE % config_params), |
| ( self.SERVER_CERTIFICATE_FILE, site_eap_certs.server_cert_1 ), |
| ( self.SERVER_PRIVATE_KEY_FILE, |
| site_eap_certs.server_private_key_1 ), |
| ( self.USER_FILE, |
| self.USER_AUTHENTICATION_TEMPLATE % authentication_params )): |
| config_file = '%s/%s' % (self._config_directory, filename) |
| with open(config_file, 'w') as f: |
| f.write(contents) |
| return '%s/%s' % (self._config_directory, self.CONFIG_FILE) |
| |
| |
| def start(self): |
| """Start the hostap server.""" |
| config_file = self.write_config() |
| self._process = subprocess.Popen( |
| [self.HOSTAPD_EXECUTABLE, '-dd', config_file]) |
| |
| |
| def stop(self): |
| """Stop the hostapd server.""" |
| if self._process: |
| self._process.terminate() |
| self._process.wait() |
| self._process = None |
| |
| |
| def running(self): |
| """Tests whether the hostapd process is still running. |
| |
| @return True if the hostapd process is still running, False otherwise. |
| |
| """ |
| if not self._process: |
| return False |
| |
| if self._process.poll() != None: |
| # We have essentially reaped the proces, and it is no more. |
| self._process = None |
| return False |
| |
| return True |
| |
| |
| def send_eap_packets(self): |
| """Start sending EAP packets to the nearest neighbor.""" |
| self.send_command('new_sta %s' % self.PAE_NEAREST_ADDRESS) |
| |
| |
| def get_client_mib(self, client_mac_address): |
| """Get a dict representing the MIB properties for |client_mac_address|. |
| |
| @param client_mac_address string MAC address of the client. |
| @return dict containing mib properties. |
| |
| """ |
| # Expected output of "hostapd cli <client_mac_address>": |
| # |
| # Selected interface 'veth_master' |
| # b6:f1:39:1d:ad:10 |
| # dot1xPaePortNumber=0 |
| # dot1xPaePortProtocolVersion=2 |
| # [...] |
| result = self.send_command('sta %s' % client_mac_address) |
| client_mib = {} |
| found_client = False |
| for line in result.splitlines(): |
| if found_client: |
| parts = line.split('=', 1) |
| if len(parts) == 2: |
| client_mib[parts[0]] = parts[1] |
| elif line == client_mac_address: |
| found_client = True |
| return client_mib |
| |
| |
| def send_command(self, command): |
| """Send a command to the hostapd instance. |
| |
| @param command string containing the command to run on hostapd. |
| @return string output of the command. |
| |
| """ |
| return utils.system_output('%s -p %s %s' % |
| (self.HOSTAPD_CLIENT_EXECUTABLE, |
| self._control_directory, command)) |
| |
| |
| def client_has_authenticated(self, client_mac_address): |
| """Return whether |client_mac_address| has successfully authenticated. |
| |
| @param client_mac_address string MAC address of the client. |
| @return True if client is authenticated. |
| |
| """ |
| mib = self.get_client_mib(client_mac_address) |
| return mib.get('dot1xAuthAuthSuccessesWhileAuthenticating', '') == '1' |
| |
| |
| def client_has_logged_off(self, client_mac_address): |
| """Return whether |client_mac_address| has logged-off. |
| |
| @param client_mac_address string MAC address of the client. |
| @return True if client has logged off. |
| |
| """ |
| mib = self.get_client_mib(client_mac_address) |
| return mib.get('dot1xAuthAuthEapLogoffWhileAuthenticated', '') == '1' |