| #!/usr/bin/python2 |
| |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import socket |
| import sys |
| import time |
| |
| import common |
| |
| from autotest_lib.client.cros import dhcp_handling_rule |
| from autotest_lib.client.cros import dhcp_packet |
| from autotest_lib.client.cros import dhcp_test_server |
| |
| TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/" |
| |
| TEST_CLASSLESS_STATIC_ROUTE_DATA = \ |
| "\x12\x0a\x09\xc0\xac\x1f\x9b\x0a" \ |
| "\x00\xc0\xa8\x00\xfe" |
| |
| TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [ |
| (18, "10.9.192.0", "172.31.155.10"), |
| (0, "0.0.0.0", "192.168.0.254") |
| ] |
| |
| TEST_DOMAIN_SEARCH_LIST_COMPRESSED = \ |
| "\x03eng\x06google\x03com\x00\x09marketing\xC0\x04" |
| |
| TEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com") |
| |
| # At this time, we don't support the compression allowed in the RFC. |
| # This is correct and sufficient for our purposes. |
| TEST_DOMAIN_SEARCH_LIST_EXPECTED = \ |
| "\x03eng\x06google\x03com\x00\x09marketing\x06google\x03com\x00" |
| |
| TEST_DOMAIN_SEARCH_LIST1 = \ |
| "w\x10\x03eng\x06google\x03com\x00" |
| |
| TEST_DOMAIN_SEARCH_LIST2 = \ |
| "w\x16\x09marketing\x06google\x03com\x00" |
| |
| def bin2hex(byte_str, justification=20): |
| """ |
| Turn big hex strings into prettier strings of hex bytes. Group those hex |
| bytes into lines justification bytes long. |
| """ |
| chars = ["x" + (hex(ord(c))[2:].zfill(2)) for c in byte_str] |
| groups = [] |
| for i in xrange(0, len(chars), justification): |
| groups.append("".join(chars[i:i+justification])) |
| return "\n".join(groups) |
| |
| def test_packet_serialization(): |
| log_file = open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb") |
| binary_discovery_packet = log_file.read() |
| log_file.close() |
| discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet) |
| if not discovery_packet.is_valid: |
| return False |
| generated_string = discovery_packet.to_binary_string() |
| if generated_string is None: |
| print "Failed to generate string from packet object." |
| return False |
| if generated_string != binary_discovery_packet: |
| print "Packets didn't match: " |
| print "Generated: \n%s" % bin2hex(generated_string) |
| print "Expected: \n%s" % bin2hex(binary_discovery_packet) |
| return False |
| print "test_packet_serialization PASSED" |
| return True |
| |
| def test_classless_static_route_parsing(): |
| parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack( |
| TEST_CLASSLESS_STATIC_ROUTE_DATA) |
| if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED: |
| print ("Parsed binary domain list and got %s but expected %s" % |
| (repr(parsed_routes), |
| repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED))) |
| return False |
| print "test_classless_static_route_parsing PASSED" |
| return True |
| |
| def test_classless_static_route_serialization(): |
| byte_string = dhcp_packet.ClasslessStaticRoutesOption.pack( |
| TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED) |
| if byte_string != TEST_CLASSLESS_STATIC_ROUTE_DATA: |
| # Turn the strings into printable hex strings on a single line. |
| pretty_actual = bin2hex(byte_string, 100) |
| pretty_expected = bin2hex(TEST_CLASSLESS_STATIC_ROUTE_DATA, 100) |
| print ("Expected to serialize %s to %s but instead got %s." % |
| (repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED), pretty_expected, |
| pretty_actual)) |
| return False |
| print "test_classless_static_route_serialization PASSED" |
| return True |
| |
| def test_domain_search_list_parsing(): |
| parsed_domains = dhcp_packet.DomainListOption.unpack( |
| TEST_DOMAIN_SEARCH_LIST_COMPRESSED) |
| # Order matters too. |
| parsed_domains = tuple(parsed_domains) |
| if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED: |
| print ("Parsed binary domain list and got %s but expected %s" % |
| (parsed_domains, TEST_DOMAIN_SEARCH_LIST_EXPECTED)) |
| return False |
| print "test_domain_search_list_parsing PASSED" |
| return True |
| |
| def test_domain_search_list_serialization(): |
| byte_string = dhcp_packet.DomainListOption.pack( |
| TEST_DOMAIN_SEARCH_LIST_PARSED) |
| if byte_string != TEST_DOMAIN_SEARCH_LIST_EXPECTED: |
| # Turn the strings into printable hex strings on a single line. |
| pretty_actual = bin2hex(byte_string, 100) |
| pretty_expected = bin2hex(TEST_DOMAIN_SEARCH_LIST_EXPECTED, 100) |
| print ("Expected to serialize %s to %s but instead got %s." % |
| (TEST_DOMAIN_SEARCH_LIST_PARSED, pretty_expected, pretty_actual)) |
| return False |
| print "test_domain_search_list_serialization PASSED" |
| return True |
| |
| def test_broken_domain_search_list_parsing(): |
| byte_string = '\x00' * 240 + TEST_DOMAIN_SEARCH_LIST1 + TEST_DOMAIN_SEARCH_LIST2 + '\xff' |
| packet = dhcp_packet.DhcpPacket(byte_str=byte_string) |
| if len(packet._options) != 1: |
| print "Expected domain list of length 1" |
| return False |
| for k, v in packet._options.items(): |
| if tuple(v) != TEST_DOMAIN_SEARCH_LIST_PARSED: |
| print ("Expected binary domain list and got %s but expected %s" % |
| (tuple(v), TEST_DOMAIN_SEARCH_LIST_PARSED)) |
| return False |
| print "test_broken_domain_search_list_parsing PASSED" |
| return True |
| |
| def receive_packet(a_socket, timeout_seconds=1.0): |
| data = None |
| start_time = time.time() |
| while data is None and start_time + timeout_seconds > time.time(): |
| try: |
| data, _ = a_socket.recvfrom(1024) |
| except socket.timeout: |
| pass # We expect many timeouts. |
| if data is None: |
| print "Timed out before we received a response from the server." |
| return None |
| |
| print "Client received a packet of length %d from the server." % len(data) |
| packet = dhcp_packet.DhcpPacket(byte_str=data) |
| if not packet.is_valid: |
| print "Received an invalid response from DHCP server." |
| return None |
| |
| return packet |
| |
| def test_simple_server_exchange(server): |
| intended_ip = "127.0.0.42" |
| subnet_mask = "255.255.255.0" |
| server_ip = "127.0.0.1" |
| lease_time_seconds = 60 |
| test_timeout = 3.0 |
| mac_addr = "\x01\x02\x03\x04\x05\x06" |
| # Build up our packets and have them request some default option values, |
| # like the IP we're being assigned and the address of the server assigning |
| # it. |
| discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr) |
| discovery_message.set_option( |
| dhcp_packet.OPTION_PARAMETER_REQUEST_LIST, |
| dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT) |
| request_message = dhcp_packet.DhcpPacket.create_request_packet( |
| discovery_message.transaction_id, |
| mac_addr) |
| request_message.set_option( |
| dhcp_packet.OPTION_PARAMETER_REQUEST_LIST, |
| dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT) |
| # This is the pool of settings the DHCP server will seem to draw from to |
| # answer queries from the client. This information is written into packets |
| # through the handling rules. |
| dhcp_server_config = { |
| dhcp_packet.OPTION_SERVER_ID : server_ip, |
| dhcp_packet.OPTION_SUBNET_MASK : subnet_mask, |
| dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds, |
| dhcp_packet.OPTION_REQUESTED_IP : intended_ip, |
| } |
| # Build up the handling rules for the server and start the test. |
| rules = [] |
| rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( |
| intended_ip, |
| server_ip, |
| dhcp_server_config, {})) |
| rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest( |
| intended_ip, |
| server_ip, |
| dhcp_server_config, {})) |
| rules[-1].is_final_handler = True |
| server.start_test(rules, test_timeout) |
| # Because we don't want to require root permissions to run these tests, |
| # listen on the loopback device, don't broadcast, and don't use reserved |
| # ports (like the actual DHCP ports). Use 8068/8067 instead. |
| client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| client_socket.bind(("127.0.0.1", 8068)) |
| client_socket.settimeout(0.1) |
| client_socket.sendto(discovery_message.to_binary_string(), |
| (server_ip, 8067)) |
| |
| offer_packet = receive_packet(client_socket) |
| if offer_packet is None: |
| return False |
| |
| if (offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER): |
| print "Type of DHCP response is not offer." |
| return False |
| |
| if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip: |
| print "Server didn't offer the IP we expected." |
| return False |
| |
| print "Offer looks good to the client, sending request." |
| # In real tests, dhcpcd formats all the DISCOVERY and REQUEST messages. In |
| # our unit test, we have to do this ourselves. |
| request_message.set_option( |
| dhcp_packet.OPTION_SERVER_ID, |
| offer_packet.get_option(dhcp_packet.OPTION_SERVER_ID)) |
| request_message.set_option( |
| dhcp_packet.OPTION_SUBNET_MASK, |
| offer_packet.get_option(dhcp_packet.OPTION_SUBNET_MASK)) |
| request_message.set_option( |
| dhcp_packet.OPTION_IP_LEASE_TIME, |
| offer_packet.get_option(dhcp_packet.OPTION_IP_LEASE_TIME)) |
| request_message.set_option( |
| dhcp_packet.OPTION_REQUESTED_IP, |
| offer_packet.get_option(dhcp_packet.OPTION_REQUESTED_IP)) |
| # Send the REQUEST message. |
| client_socket.sendto(request_message.to_binary_string(), |
| (server_ip, 8067)) |
| ack_packet = receive_packet(client_socket) |
| if ack_packet is None: |
| return False |
| |
| if (ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK): |
| print "Type of DHCP response is not acknowledgement." |
| return False |
| |
| if ack_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip: |
| print "Server didn't give us the IP we expected." |
| return False |
| |
| print "Waiting for the server to finish." |
| server.wait_for_test_to_finish() |
| print "Server agrees that the test is over." |
| if not server.last_test_passed: |
| print "Server is unhappy with the test result." |
| return False |
| |
| print "test_simple_server_exchange PASSED." |
| return True |
| |
| def test_server_dialogue(): |
| server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1", |
| ingress_port=8067, |
| broadcast_address="127.0.0.1", |
| broadcast_port=8068) |
| server.start() |
| ret = False |
| if server.is_healthy: |
| ret = test_simple_server_exchange(server) |
| else: |
| print "Server isn't healthy, aborting." |
| print "Sending server stop() signal." |
| server.stop() |
| print "Stop signal sent." |
| return ret |
| |
| def run_tests(): |
| logger = logging.getLogger("dhcp") |
| logger.setLevel(logging.DEBUG) |
| stream_handler = logging.StreamHandler() |
| stream_handler.setLevel(logging.DEBUG) |
| logger.addHandler(stream_handler) |
| retval = test_packet_serialization() |
| retval &= test_classless_static_route_parsing() |
| retval &= test_classless_static_route_serialization() |
| retval &= test_domain_search_list_parsing() |
| retval &= test_domain_search_list_serialization() |
| retval &= test_broken_domain_search_list_parsing() |
| retval &= test_server_dialogue() |
| if retval: |
| print "All tests PASSED." |
| return 0 |
| else: |
| print "Some tests FAILED" |
| return -1 |
| |
| if __name__ == "__main__": |
| sys.exit(run_tests()) |