autotest: Add tools for writing DHCP tests
Add dhcp_test_server, dhcp_packet, and dhcp_handling_rule.
dhcp_test_server starts up a thread to watch a given port for DHCP
packets, and can be programmed with instances of DhcpHandlingRule to
expect certain packets and report errors. dhcp_packet defines utility
logic to create and parse DHCP packets. There are much more elaborate
comments and example usages at the top of all three files.
Because this is a fairly elaborate piece of logic, I wrote some simple
sanity tests for both DhcpPacket and DhcpTestServer that may be run with
$ python dhcp_packet && echo Test passed.
$ python dhcp_test_server && echo Test passed.
The tests in dhcp_packet.py make sure that packet and serialization
works for discovery packets. Tests in dhcp_test_server.py walk through
a simple test case where the server expects a DISCOVERY packet, and the
client expects a valid response.
For debugging and sanity checking, I've taken packet logs of a
conversation between dhclient and dhcpd negotiating a ip lease. These
logs are in dhcp_test_data/*. I use these logs in the test for
DhcpPacket, but they could conceviably be useful in future testing.
BUG=chromium-os:32809
TEST=as described above
Change-Id: I04c6806e8b02446b0758e507c14ba85f6d10e30f
Reviewed-on: https://gerrit.chromium.org/gerrit/31134
Tested-by: Christopher Wiley <wiley@chromium.org>
Reviewed-by: Paul Stewart <pstew@chromium.org>
Commit-Ready: Christopher Wiley <wiley@chromium.org>
diff --git a/client/cros/dhcp_handling_rule.py b/client/cros/dhcp_handling_rule.py
new file mode 100644
index 0000000..ce7f4ad
--- /dev/null
+++ b/client/cros/dhcp_handling_rule.py
@@ -0,0 +1,98 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+DHCP handling rules are ways to record expectations for a DhcpTestServer.
+
+When a handling rule reaches the front of the DhcpTestServer handling rule
+queue, the server begins to ask the rule what it should do with each incoming
+DHCP packet (in the form of a DhcpPacket). The handle method is expected to
+return a tuple (response, action) where response indicates whether the packet
+should be ignored or responded to and whether the test failed, succeeded, or is
+continuing. The action part of the tuple refers to whether or not the rule
+should be be removed from the test server's handling rule queue.
+"""
+
+import logging
+
+from autotest_lib.client.cros import dhcp_packet
+
+RESPONSE_FAIL = 0
+RESPONSE_IGNORE = 1
+RESPONSE_IGNORE_SUCCESS = 3
+RESPONSE_RESPOND = 2
+RESPONSE_RESPOND_SUCCESS = 4
+
+ACTION_POP_HANDLER = 0
+ACTION_KEEP_HANDLER = 1
+
+class DhcpHandlingRule(object):
+ def __init__(self):
+ super(DhcpHandlingRule, self).__init__()
+ self._is_final_handler = False
+ self._logger = logging.getLogger("dhcp.handling_rule")
+
+ @property
+ def logger(self):
+ return self._logger
+
+ @property
+ def is_final_handler(self):
+ return self._is_final_handler
+
+ @is_final_handler.setter
+ def is_final_handler(self, value):
+ self._is_final_handler = value
+
+
+ # Override this with your subclass, or all your tests will fail. The
+ # assumption is that the packet passed to this method is a valid DHCP
+ # packet, but not necessarily any particular kind of DHCP packet.
+ def handle(self, packet):
+ return (RESPONSE_FAIL, ACTION_KEEP_HANDLER)
+
+ # Override this if you will ever return RESPONSE_RESPOND_* in handle()
+ # above.
+ def respond(self, packet):
+ return None
+
+class DhcpHandlingRule_RespondToDiscovery(DhcpHandlingRule):
+ def __init__(self,
+ intended_ip,
+ subnet_mask,
+ server_ip,
+ lease_time_seconds):
+ super(DhcpHandlingRule_RespondToDiscovery, self).__init__()
+ self._intended_ip = intended_ip
+ self._subnet_mask = subnet_mask
+ self._server_ip = server_ip
+ self._lease_time_seconds = lease_time_seconds
+
+ def handle(self, packet):
+ if (packet.message_type !=
+ dhcp_packet.OPTION_VALUE_DHCP_MESSAGE_TYPE_DISCOVERY):
+ self.logger.info("Packet type was not DISCOVERY. Ignoring.")
+ return (RESPONSE_IGNORE, ACTION_KEEP_HANDLER)
+ self.logger.info("Received valid DISCOVERY packet. Processing.")
+ action = ACTION_POP_HANDLER
+ response = RESPONSE_RESPOND
+ if self.is_final_handler:
+ response = RESPONSE_RESPOND_SUCCESS
+ return (response, action)
+
+ def respond(self, packet):
+ if (packet.message_type !=
+ dhcp_packet.OPTION_VALUE_DHCP_MESSAGE_TYPE_DISCOVERY):
+ self.logger.error("Server erroneously asked for a response to an "
+ "invalid packet.")
+ return None
+ self.logger.info("Responding to DISCOVERY packet.")
+ packet = dhcp_packet.DhcpPacket.create_offer_packet(
+ packet.transaction_id,
+ packet.client_hw_address,
+ self._intended_ip,
+ self._subnet_mask,
+ self._server_ip,
+ self._lease_time_seconds)
+ return packet
diff --git a/client/cros/dhcp_packet.py b/client/cros/dhcp_packet.py
new file mode 100644
index 0000000..fd2826d
--- /dev/null
+++ b/client/cros/dhcp_packet.py
@@ -0,0 +1,476 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Tools for serializing and deserializing DHCP packets.
+
+DhcpPacket is a class that represents a single DHCP packet and contains some
+logic to create and parse binary strings containing on the wire DHCP packets.
+
+While you could call the constructor explicitly, most users should use the
+static factories to construct packets with reasonable default values in most of
+the fields, even if those values are zeros.
+
+For example:
+
+packet = dhcp_packet.create_offer_packet(transaction_id,
+ hwmac_addr,
+ offer_ip,
+ offer_mask,
+ server_ip,
+ lease_time_seconds)
+socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+# I believe that sending to the broadcast address needs special permissions.
+socket.sendto(response_packet.to_binary_string(),
+ ("255.255.255.255", 68))
+
+Note that if you make changes, make sure that the tests in the bottom of this
+file still pass.
+"""
+
+import logging
+import random
+import socket
+import struct
+
+class Option(object):
+ """
+ Represents an option in a DHCP packet. Options may or may not be present
+ and are not parsed into any particular format. This means that the value of
+ options is always in the form of a byte string.
+ """
+ def __init__(self, name, number, size):
+ super(Option, self).__init__()
+ self._name = name
+ self._number = number
+ self._size = size
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def number(self):
+ """
+ Every DHCP option has a number that goes into the packet to indicate
+ which particular option is being encoded in the next few bytes. This
+ property returns that number for each option.
+ """
+ return self._number
+
+ @property
+ def size(self):
+ """
+ The size property is a hint for what kind of size we might expect the
+ option to be. For instance, options with a size of 1 are expected to
+ always be 1 byte long. Negative sizes are variable length fields that
+ are expected to be at least abs(size) bytes long.
+
+ However, the size property is just a hint, and is not enforced or
+ checked in any way.
+ """
+ return self._size
+
+
+class Field(object):
+ """
+ Represents a required field in a DHCP packet. Unlike options, we sometimes
+ parse fields into more meaningful data types. For instance, the hardware
+ type field in an IPv4 packet is parsed into an int rather than being left as
+ a raw byte string of length 1.
+ """
+ def __init__(self, name, wire_format, offset, size):
+ super(Field, self).__init__()
+ self._name = name
+ self._wire_format = wire_format
+ self._offset = offset
+ self._size = size
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def wire_format(self):
+ """
+ The wire format for a field defines how it will be parsed out of a DHCP
+ packet.
+ """
+ return self._wire_format
+
+ @property
+ def offset(self):
+ """
+ The |offset| for a field defines the starting byte of the field in the
+ binary packet string. |offset| is using during parsing, along with
+ |size| to extract the byte string of a field.
+ """
+ return self._offset
+
+ @property
+ def size(self):
+ """
+ Fields in DHCP packets have a fixed size that must be respected. This
+ size property is used in parsing to indicate that |self._size| number of
+ bytes make up this field.
+ """
+ return self._size
+
+
+# This is per RFC 2131. The wording doesn't seem to say that the packets must
+# be this big, but that has been the historic assumption in implementations.
+DHCP_MIN_PACKET_SIZE = 300
+
+# These are required in every DHCP packet. Without these fields, the
+# packet will not even pass DhcpPacket.is_valid
+FIELD_OP = Field("op", "!B", 0, 1)
+FIELD_HWTYPE = Field("htype", "!B", 1, 1)
+FIELD_HWADDR_LEN = Field("hlen", "!B", 2, 1)
+FIELD_RELAY_HOPS = Field("hops", "!B", 3, 1)
+FIELD_TRANSACTION_ID = Field("xid", "!I", 4, 4)
+FIELD_TIME_SINCE_START = Field("secs", "!H", 8, 2)
+FIELD_FLAGS = Field("flags", "!H", 10, 2)
+FIELD_CLIENT_IP = Field("ciaddr", "!4s", 12, 4)
+FIELD_YOUR_IP = Field("yiaddr", "!4s", 16, 4)
+FIELD_SERVER_IP = Field("siaddr", "!4s", 20, 4)
+FIELD_GATEWAY_IP = Field("giaddr", "!4s", 24, 4)
+FIELD_CLIENT_HWADDR = Field("chaddr", "!16s", 28, 16)
+# For legacy BOOTP reasons, there are 192 octets of 0's that
+# come after the chaddr.
+FIELD_MAGIC_COOKIE = Field("magic_cookie", "!I", 236, 4)
+
+OPTION_TIME_OFFSET = Option("time_offset", 2, 4)
+OPTION_ROUTERS = Option("routers", 3, -4)
+OPTION_SUBNET_MASK = Option("subnet_mask", 1, 4)
+# These *_servers (and router) options are actually lists of IPv4
+# addressesexpected to be multiples of 4 octets.
+OPTION_TIME_SERVERS = Option("time_servers", 4, -4)
+OPTION_NAME_SERVERS = Option("name_servers", 5, -4)
+OPTION_DNS_SERVERS = Option("dns_servers", 6, -4)
+OPTION_LOG_SERVERS = Option("log_servers", 7, -4)
+OPTION_COOKIE_SERVERS = Option("cookie_servers", 8, -4)
+OPTION_LPR_SERVERS = Option("lpr_servers", 9, -4)
+OPTION_IMPRESS_SERVERS = Option("impress_servers", 10, -4)
+OPTION_RESOURCE_LOC_SERVERS = Option("resource_loc_servers", 11, -4)
+OPTION_HOST_NAME = Option("host_name", 12, -1)
+OPTION_BOOT_FILE_SIZE = Option("boot_file_size", 13, 2)
+OPTION_MERIT_DUMP_FILE = Option("merit_dump_file", 14, -1)
+OPTION_SWAP_SERVER = Option("domain_name", 15, -1)
+OPTION_DOMAIN_NAME = Option("swap_server", 16, 4)
+OPTION_ROOT_PATH = Option("root_path", 17, -1)
+OPTION_EXTENSIONS = Option("extensions", 18, -1)
+# DHCP options.
+OPTION_REQUESTED_IP = Option("requested_ip", 50, 4)
+OPTION_IP_LEASE_TIME = Option("ip_lease_time", 51, 4)
+OPTION_OPTION_OVERLOAD = Option("option_overload", 52, 1)
+OPTION_DHCP_MESSAGE_TYPE = Option("dhcp_message_type", 53, 1)
+OPTION_SERVER_ID = Option("server_id", 54, 4)
+OPTION_PARAMETER_REQUEST_LIST = Option("parameter_request_list", 55, -1)
+OPTION_MESSAGE = Option("message", 56, -1)
+OPTION_MAX_DHCP_MESSAGE_SIZE = Option("max_dhcp_message_size", 57, 2)
+OPTION_RENEWAL_T1_TIME_VALUE = Option("renewal_t1_time_value", 58, 4)
+OPTION_REBINDING_T2_TIME_VALUE = Option("rebinding_t2_time_value", 59, 4)
+OPTION_VENDOR_ID = Option("vendor_id", 60, -1)
+OPTION_CLIENT_ID = Option("client_id", 61, -2)
+OPTION_TFTP_SERVER_NAME = Option("tftp_server_name", 66, -1)
+OPTION_BOOTFILE_NAME = Option("bootfile_name", 67, -1)
+# Unlike every other option, which are tuples like:
+# <number, length in bytes, data>, the pad and end options are just
+# single bytes "\x00" and "\xff" (without length or data fields).
+OPTION_PAD = 0
+OPTION_END = 255
+
+# All fields are required.
+DHCP_PACKET_FIELDS = [
+ FIELD_OP,
+ FIELD_HWTYPE,
+ FIELD_HWADDR_LEN,
+ FIELD_RELAY_HOPS,
+ FIELD_TRANSACTION_ID,
+ FIELD_TIME_SINCE_START,
+ FIELD_FLAGS,
+ FIELD_CLIENT_IP,
+ FIELD_YOUR_IP,
+ FIELD_SERVER_IP,
+ FIELD_GATEWAY_IP,
+ FIELD_CLIENT_HWADDR,
+ FIELD_MAGIC_COOKIE,
+ ]
+# The op field in an ipv4 packet is either 1 or 2 depending on
+# whether the packet is from a server or from a client.
+FIELD_VALUE_OP_CLIENT_REQUEST = 1
+FIELD_VALUE_OP_SERVER_RESPONSE = 2
+# 1 == 10mb ethernet hardware address type (aka MAC).
+FIELD_VALUE_HWTYPE_10MB_ETH = 1
+# MAC addresses are still 6 bytes long.
+FIELD_VALUE_HWADDR_LEN_10MB_ETH = 6
+FIELD_VALUE_MAGIC_COOKIE = 0x63825363
+
+OPTIONS_START_OFFSET = 240
+# From RFC2132, the valid DHCP message types are:
+OPTION_VALUE_DHCP_MESSAGE_TYPE_DISCOVERY = "\x01"
+OPTION_VALUE_DHCP_MESSAGE_TYPE_OFFER = "\x02"
+OPTION_VALUE_DHCP_MESSAGE_TYPE_REQUEST = "\x03"
+OPTION_VALUE_DHCP_MESSAGE_TYPE_DECLINE = "\x04"
+OPTION_VALUE_DHCP_MESSAGE_TYPE_ACK = "\x05"
+OPTION_VALUE_DHCP_MESSAGE_TYPE_NAK = "\x06"
+OPTION_VALUE_DHCP_MESSAGE_TYPE_RELEASE = "\x07"
+OPTION_VALUE_DHCP_MESSAGE_TYPE_INFORM = "\x08"
+
+# These are possible options that may not be in every packet.
+# Frequently, the client can include a bunch of options that indicate
+# that it would like to receive information about time servers, routers,
+# lpr servers, and much more, but the DHCP server can usually ignore
+# those requests.
+#
+# Eventually, each option is encoded as:
+# <option.number, option.size, [array of option.size bytes]>
+# Unlike fields, which make up a fixed packet format, options can be in
+# any order, except where they cannot. For instance, option 1 must
+# follow option 3 if both are supplied. For this reason, potential
+# options are in this list, and added to the packet in this order every
+# time.
+#
+# size < 0 indicates that this is variable length field of at least
+# abs(length) bytes in size.
+DHCP_PACKET_OPTIONS = [
+ OPTION_TIME_OFFSET,
+ OPTION_ROUTERS,
+ OPTION_SUBNET_MASK,
+ # These *_servers (and router) options are actually lists of
+ # IPv4 addresses expected to be multiples of 4 octets.
+ OPTION_TIME_SERVERS,
+ OPTION_NAME_SERVERS,
+ OPTION_DNS_SERVERS,
+ OPTION_LOG_SERVERS,
+ OPTION_COOKIE_SERVERS,
+ OPTION_LPR_SERVERS,
+ OPTION_IMPRESS_SERVERS,
+ OPTION_RESOURCE_LOC_SERVERS,
+ OPTION_HOST_NAME,
+ OPTION_BOOT_FILE_SIZE,
+ OPTION_MERIT_DUMP_FILE,
+ OPTION_SWAP_SERVER,
+ OPTION_DOMAIN_NAME,
+ OPTION_ROOT_PATH,
+ OPTION_EXTENSIONS,
+ # DHCP options.
+ OPTION_REQUESTED_IP,
+ OPTION_IP_LEASE_TIME,
+ OPTION_OPTION_OVERLOAD,
+ OPTION_DHCP_MESSAGE_TYPE,
+ OPTION_SERVER_ID,
+ OPTION_PARAMETER_REQUEST_LIST,
+ OPTION_MESSAGE,
+ OPTION_MAX_DHCP_MESSAGE_SIZE,
+ OPTION_RENEWAL_T1_TIME_VALUE,
+ OPTION_REBINDING_T2_TIME_VALUE,
+ OPTION_VENDOR_ID,
+ OPTION_CLIENT_ID,
+ OPTION_TFTP_SERVER_NAME,
+ OPTION_BOOTFILE_NAME,
+ ]
+
+def get_dhcp_option_by_number(number):
+ for option in DHCP_PACKET_OPTIONS:
+ if option.number == number:
+ return option
+ return None
+
+class DhcpPacket(object):
+ @staticmethod
+ def create_discovery_packet(hwmac_addr):
+ """
+ Create a discovery packet.
+
+ Fill in fields of a DHCP packet as if it were being sent from
+ |hwmac_addr|. Requests subnet masks, broadcast addresses, router
+ addresses, dns addresses, domain search lists, client host name, and NTP
+ server addresses. Note that the offer packet received in response to
+ this packet will probably not contain all of that information.
+ """
+ # MAC addresses are actually only 6 bytes long, however, for whatever
+ # reason, DHCP allocated 12 bytes to this field. Ease the burden on
+ # developers and hide this detail.
+ while len(hwmac_addr) < 12:
+ hwmac_addr += chr(OPTION_PAD)
+
+ packet = DhcpPacket()
+ packet.set_field(FIELD_OP.name, FIELD_VALUE_OP_CLIENT_REQUEST)
+ packet.set_field(FIELD_HWTYPE.name, FIELD_VALUE_HWTYPE_10MB_ETH)
+ packet.set_field(FIELD_HWADDR_LEN.name, FIELD_VALUE_HWADDR_LEN_10MB_ETH)
+ packet.set_field(FIELD_RELAY_HOPS.name, 0)
+ packet.set_field(FIELD_TRANSACTION_ID.name, random.getrandbits(32))
+ packet.set_field(FIELD_TIME_SINCE_START.name, 0)
+ packet.set_field(FIELD_FLAGS.name, 0)
+ packet.set_field(FIELD_CLIENT_IP.name, "\x00\x00\x00\x00")
+ packet.set_field(FIELD_YOUR_IP.name, "\x00\x00\x00\x00")
+ packet.set_field(FIELD_SERVER_IP.name, "\x00\x00\x00\x00")
+ packet.set_field(FIELD_GATEWAY_IP.name, "\x00\x00\x00\x00")
+ packet.set_field(FIELD_CLIENT_HWADDR.name, hwmac_addr)
+ packet.set_field(FIELD_MAGIC_COOKIE.name, FIELD_VALUE_MAGIC_COOKIE)
+ packet.set_option(OPTION_DHCP_MESSAGE_TYPE.name,
+ OPTION_VALUE_DHCP_MESSAGE_TYPE_DISCOVERY)
+ # We're requesting (in order) the subnet mask, broadcast addr, router
+ # addr, dns addr, domain search list, client host name, and ntp server
+ # addr.
+ packet.set_option(OPTION_PARAMETER_REQUEST_LIST.name,
+ "\x01\x1c\x03\x06w\x0c*")
+ return packet
+
+ @staticmethod
+ def create_offer_packet(transaction_id,
+ hwmac_addr,
+ offer_ip,
+ offer_subnet_mask,
+ server_ip,
+ lease_time_seconds):
+ """
+ Create an offer packet, given some fields that tie the packet to a
+ particular offer.
+ """
+ packet = DhcpPacket()
+ packet.set_field(FIELD_OP.name, FIELD_VALUE_OP_SERVER_RESPONSE)
+ packet.set_field(FIELD_HWTYPE.name, FIELD_VALUE_HWTYPE_10MB_ETH)
+ packet.set_field(FIELD_HWADDR_LEN.name, FIELD_VALUE_HWADDR_LEN_10MB_ETH)
+ # This has something to do with relay agents
+ packet.set_field(FIELD_RELAY_HOPS.name, 0)
+ packet.set_field(FIELD_TRANSACTION_ID.name, transaction_id)
+ packet.set_field(FIELD_TIME_SINCE_START.name, 0)
+ packet.set_field(FIELD_FLAGS.name, 0)
+ packet.set_field(FIELD_CLIENT_IP.name, "\x00\x00\x00\x00")
+ packet.set_field(FIELD_YOUR_IP.name, socket.inet_aton(offer_ip))
+ packet.set_field(FIELD_SERVER_IP.name, socket.inet_aton(server_ip))
+ packet.set_field(FIELD_GATEWAY_IP.name, "\x00\x00\x00\x00")
+ packet.set_field(FIELD_CLIENT_HWADDR.name, hwmac_addr)
+ packet.set_field(FIELD_MAGIC_COOKIE.name, FIELD_VALUE_MAGIC_COOKIE)
+ packet.set_option(OPTION_DHCP_MESSAGE_TYPE.name,
+ OPTION_VALUE_DHCP_MESSAGE_TYPE_OFFER)
+ packet.set_option(OPTION_SUBNET_MASK.name,
+ socket.inet_aton(offer_subnet_mask))
+ packet.set_option(OPTION_IP_LEASE_TIME.name,
+ struct.pack("!I", int(lease_time_seconds)))
+ return packet
+
+ def __init__(self, byte_str=None):
+ """
+ Create a DhcpPacket, filling in fields from a byte string if given.
+
+ Assumes that the packet starts at offset 0 in the binary string. This
+ includes the fields and options. Fields are different from options in
+ that we bother to decode these into more usable data types like
+ integers rather than keeping them as raw byte strings. Fields are also
+ required to exist, unlike options which may not.
+
+ Each option is encoded as a tuple <option number, length, data> where
+ option number is a byte indicating the type of option, length indicates
+ the number of bytes in the data for option, and data is a length array
+ of bytes. The only exceptions to this rule are the 0 and 255 options,
+ which have 0 data length, and no length byte. These tuples are then
+ simply appended to each other. This encoding is the same as the BOOTP
+ vendor extention field encoding.
+ """
+ super(DhcpPacket, self).__init__()
+ self._options = {}
+ self._fields = {}
+ self._logger = logging.getLogger("dhcp.packet")
+ if byte_str is None:
+ return
+ if len(byte_str) < OPTIONS_START_OFFSET + 1:
+ self._logger.error("Invalid byte string for packet.")
+ return
+ for field in DHCP_PACKET_FIELDS:
+ self._fields[field.name] = struct.unpack(field.wire_format,
+ byte_str[field.offset :
+ field.offset +
+ field.size])[0]
+ offset = OPTIONS_START_OFFSET
+ while offset < len(byte_str) and ord(byte_str[offset]) != OPTION_END:
+ data_type = ord(byte_str[offset])
+ offset += 1
+ if data_type == OPTION_PAD:
+ continue
+ data_length = ord(byte_str[offset])
+ offset += 1
+ data = byte_str[offset: offset + data_length]
+ offset += data_length
+ option_bunch = get_dhcp_option_by_number(data_type)
+ if option_bunch is None:
+ # Unsupported data type, of which we have many.
+ continue
+ self._options[option_bunch.name] = data
+
+ @property
+ def client_hw_address(self):
+ return self._fields["chaddr"]
+
+ @property
+ def is_valid(self):
+ for field in DHCP_PACKET_FIELDS:
+ if (not field.name in self._fields or
+ self._fields[field.name] is None):
+ self._logger.info("Missing field %s in packet." % field.name)
+ return False
+ if (self._fields[FIELD_MAGIC_COOKIE.name] !=
+ FIELD_VALUE_MAGIC_COOKIE):
+ return False
+ return True
+
+ @property
+ def message_type(self):
+ if not "dhcp_message_type" in self._options:
+ return -1
+ return self._options["dhcp_message_type"]
+
+ @property
+ def transaction_id(self):
+ return self._fields["xid"]
+
+ def get_field(self, field_name):
+ if field_name in self._fields:
+ return self._fields[field_name]
+ return None
+
+ def get_option(self, option_name):
+ if option_name in self._options:
+ return self._options[option_name]
+ return None
+
+ def set_field(self, field_name, field_value):
+ self._fields[field_name] = field_value
+
+ def set_option(self, option_name, option_value):
+ self._options[option_name] = option_value
+
+ def to_binary_string(self):
+ if not self.is_valid:
+ return None
+ # A list of byte strings to be joined into a single string at the end.
+ data = []
+ offset = 0
+ for field in DHCP_PACKET_FIELDS:
+ field_data = struct.pack(field.wire_format,
+ self._fields[field.name])
+ while offset < field.offset:
+ data.append("\x00")
+ offset += 1
+ data.append(field_data)
+ offset += field.size
+ # Last field processed is the magic cookie, so we're ready for options.
+ # Have to process options
+ for option in DHCP_PACKET_OPTIONS:
+ if not option.name in self._options:
+ continue
+ data.append(struct.pack("BB",
+ option.number,
+ len(self._options[option.name])))
+ offset += 2
+ data.append(self._options[option.name])
+ offset += len(self._options[option.name])
+ data.append(chr(OPTION_END))
+ offset += 1
+ while offset < DHCP_MIN_PACKET_SIZE:
+ data.append(chr(OPTION_PAD))
+ offset += 1
+ return "".join(data)
diff --git a/client/cros/dhcp_test_data/README b/client/cros/dhcp_test_data/README
new file mode 100644
index 0000000..aa4f8a7
--- /dev/null
+++ b/client/cros/dhcp_test_data/README
@@ -0,0 +1,5 @@
+These are dhcp packets as sent by dhclient and dhcpd v3.1.3. They make up a
+conversation where the client asks for an address on 10.9.8.xxx and the server
+grants such an address.
+
+We use these logs as part of sanity checking that DhcpPacket parsing works.
diff --git a/client/cros/dhcp_test_data/dhcp_discovery.log b/client/cros/dhcp_test_data/dhcp_discovery.log
new file mode 100644
index 0000000..713371b
--- /dev/null
+++ b/client/cros/dhcp_test_data/dhcp_discovery.log
Binary files differ
diff --git a/client/cros/dhcp_test_data/dhcp_offer.log b/client/cros/dhcp_test_data/dhcp_offer.log
new file mode 100644
index 0000000..b8105e7
--- /dev/null
+++ b/client/cros/dhcp_test_data/dhcp_offer.log
Binary files differ
diff --git a/client/cros/dhcp_test_data/dhcp_reply.log b/client/cros/dhcp_test_data/dhcp_reply.log
new file mode 100644
index 0000000..2ccdc12
--- /dev/null
+++ b/client/cros/dhcp_test_data/dhcp_reply.log
Binary files differ
diff --git a/client/cros/dhcp_test_data/dhcp_request.log b/client/cros/dhcp_test_data/dhcp_request.log
new file mode 100644
index 0000000..9d189b5
--- /dev/null
+++ b/client/cros/dhcp_test_data/dhcp_request.log
Binary files differ
diff --git a/client/cros/dhcp_test_server.py b/client/cros/dhcp_test_server.py
new file mode 100644
index 0000000..975ba57
--- /dev/null
+++ b/client/cros/dhcp_test_server.py
@@ -0,0 +1,289 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Programmable testing DHCP server.
+
+Simple DHCP server you can program with expectations of future packets and
+responses to those packets. The server is basically a thin wrapper around a
+server socket with some utility logic to make setting up tests easier. To write
+a test, you start a server, construct a sequence of handling rules.
+
+Handling rules let you set up expectations of future packets of certain types.
+Handling rules are processed in order, and only the first remaining handler
+handles a given packet. In theory you could write the entire test into a single
+handling rule and keep an internal state machine for how far that handler has
+gotten through the test. This would be poor style however. Correct style is to
+write (or reuse) a handler for each packet the server should see, leading us to
+a happy land where any conceivable packet handler has already been written for
+us.
+
+Example usage:
+
+# Start up the DHCP server, which will ignore packets until a test is started
+server = DhcpTestServer(interface="veth_master")
+server.start()
+
+# Given a list of handling rules, start a test with a 30 sec timeout.
+handling_rules = []
+handling_rules.append(DhcpHandlingRule_RespondToDiscovery(intended_ip,
+ intended_subnet_mask,
+ dhcp_server_ip,
+ lease_time_seconds)
+server.start_test(handling_rules, 30.0)
+
+# Trigger DHCP clients to do various test related actions
+...
+
+# Get results
+server.wait_for_test_to_finish()
+if (server.last_test_passed):
+ ...
+else:
+ ...
+
+
+Note that if you make changes, make sure that the tests in dhcp_unittest.py
+still pass.
+"""
+
+import logging
+import socket
+import threading
+import time
+import traceback
+
+from autotest_lib.client.cros import dhcp_packet
+from autotest_lib.client.cros import dhcp_handling_rule
+
+# From socket.h
+SO_BINDTODEVICE = 25
+
+class DhcpTestServer(threading.Thread):
+ def __init__(self,
+ interface=None,
+ ingress_address="<broadcast>",
+ ingress_port=67,
+ broadcast_address="255.255.255.255",
+ broadcast_port=68):
+ super(DhcpTestServer, self).__init__()
+ self._mutex = threading.Lock()
+ self._ingress_address = ingress_address
+ self._ingress_port = ingress_port
+ self._broadcast_port = broadcast_port
+ self._broadcast_address = broadcast_address
+ self._socket = None
+ self._interface = interface
+ self._stopped = False
+ self._test_in_progress = False
+ self._last_test_passed = False
+ self._test_timeout = 0
+ self._handling_rules = []
+ self._logger = logging.getLogger("dhcp.test_server")
+ self.daemon = False
+
+ @property
+ def stopped(self):
+ with self._mutex:
+ return self._stopped
+
+ @property
+ def is_healthy(self):
+ with self._mutex:
+ return self._socket is not None
+
+ @property
+ def test_in_progress(self):
+ with self._mutex:
+ return self._test_in_progress
+
+ @property
+ def last_test_passed(self):
+ with self._mutex:
+ return self._last_test_passed
+
+ def start(self):
+ """
+ Start the DHCP server. Only call this once.
+ """
+ if self.is_alive():
+ return False
+ self._logger.info("DhcpTestServer started; opening sockets.")
+ try:
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self._logger.info("Opening socket on '%s' port %d." %
+ (self._ingress_address, self._ingress_port))
+ self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+ if self._interface is not None:
+ self._logger.info("Binding to %s" % self._interface)
+ self._socket.setsockopt(socket.SOL_SOCKET,
+ SO_BINDTODEVICE,
+ self._interface)
+ self._socket.bind((self._ingress_address, self._ingress_port))
+ # Wait 100 ms for a packet, then return, thus keeping the thread
+ # active but mostly idle.
+ self._socket.settimeout(0.1)
+ except socket.error, socket_error:
+ self._logger.error("Socket error: %s." % str(socket_error))
+ self._logger.error(traceback.format_exc())
+ if not self._socket is None:
+ self._socket.close()
+ self._socket = None
+ self._logger.error("Failed to open server socket. Aborting.")
+ return
+ super(DhcpTestServer, self).start()
+
+ def stop(self):
+ """
+ Stop the DHCP server and free its socket.
+ """
+ with self._mutex:
+ self._stopped = True
+
+ def start_test(self, handling_rules, test_timeout_seconds):
+ """
+ Start a new test using |handling_rules|. The server will call the
+ test successfull if it receives a RESPONSE_IGNORE_SUCCESS (or
+ RESPONSE_RESPOND_SUCCESS) from a handling_rule before
+ |test_timeout_seconds| passes. If the timeout passes without that
+ message, the server runs out of handling rules, or a handling rule
+ return RESPONSE_FAIL, the test is ended and marked as not passed.
+
+ All packets received before start_test() is called are received and
+ ignored.
+ """
+ with self._mutex:
+ self._test_timeout = time.time() + test_timeout_seconds
+ self._handling_rules = handling_rules
+ self._test_in_progress = True
+ self._last_test_passed = False
+
+ def wait_for_test_to_finish(self):
+ """
+ Block on the test finishing in a CPU friendly way. Timeouts, successes,
+ and failures count as finishes.
+ """
+ while self.test_in_progress:
+ time.sleep(0.1)
+
+ def abort_test(self):
+ """
+ Abort a test prematurely, counting the test as a failure.
+ """
+ with self._mutex:
+ self._logger.info("Manually aborting test.")
+ self._end_test_unsafe(False)
+
+ def _teardown(self):
+ with self._mutex:
+ self._socket.close()
+ self._socket = None
+
+ def _end_test_unsafe(self, passed):
+ if not self._test_in_progress:
+ return
+ if passed:
+ self._logger.info("DHCP server says test passed.")
+ else:
+ self._logger.info("DHCP server says test failed.")
+ self._test_in_progress = False
+ self._last_test_passed = passed
+
+ def _send_response_unsafe(self, packet):
+ if packet is None:
+ self._logger.error("Handling rule failed to return a packet.")
+ return False
+ self._logger.debug("Sending response with options: %s" %
+ str(packet._options))
+ self._logger.debug("Sending response with fields: %s" %
+ str(packet._fields))
+ binary_string = packet.to_binary_string()
+ if binary_string is None or len(binary_string) < 1:
+ self._logger.error("Packet failed to serialize to binary string.")
+ return False
+
+ self._socket.sendto(binary_string,
+ (self._broadcast_address, self._broadcast_port))
+ return True
+
+ def _loop_body(self):
+ with self._mutex:
+ if self._test_in_progress and self._test_timeout < time.time():
+ # The test has timed out, so we abort it. However, we should
+ # continue to accept packets, so we fall through.
+ self._end_test_unsafe(False)
+ try:
+ data, _ = self._socket.recvfrom(1024)
+ self._logger.info("Server received packet of length %d." %
+ len(data))
+ except socket.timeout:
+ # No packets available, lets return and see if the server has
+ # been shut down in the meantime.
+ return
+
+ # Receive packets when no test is in progress, just don't process
+ # them.
+ if not self._test_in_progress:
+ return
+
+ packet = dhcp_packet.DhcpPacket(byte_str=data)
+ if not packet.is_valid:
+ self._logger.warning("Server received an invalid packet over a "
+ "DHCP port?")
+ return
+
+ if len(self._handling_rules) < 1:
+ self._logger.info("No handling rule for packet: %s." %
+ str(packet))
+ self._end_test_unsafe(False)
+ return
+
+ handling_rule = self._handling_rules[0]
+ (handling_code, action) = handling_rule.handle(packet)
+ if action == dhcp_handling_rule.ACTION_POP_HANDLER:
+ self._handling_rules.pop(0)
+ if handling_code == dhcp_handling_rule.RESPONSE_IGNORE:
+ pass
+ elif handling_code == dhcp_handling_rule.RESPONSE_IGNORE_SUCCESS:
+ self._end_test_unsafe(True)
+ elif handling_code == dhcp_handling_rule.RESPONSE_RESPOND:
+ if not self._send_response_unsafe(
+ handling_rule.respond(packet)):
+ self._end_test_unsafe(False)
+ elif handling_code == dhcp_handling_rule.RESPONSE_RESPOND_SUCCESS:
+ response = handling_rule.respond(packet)
+ self._end_test_unsafe(self._send_response_unsafe(response))
+ elif handling_code == dhcp_handling_rule.RESPONSE_FAIL:
+ self._logger.info("Handling rule %s rejected packet %s." %
+ (handling_rule, packet))
+ self._end_test_unsafe(False)
+ else:
+ self._logger.info("Unknown code %d "
+ "returned from handling rule %s." %
+ (handling_code, handling_rule))
+ self._end_test_unsafe(False)
+
+ def run(self):
+ """
+ Main method of the thread. Never call this directly, since it assumes
+ some setup done in start().
+ """
+ with self._mutex:
+ if self._socket is None:
+ self._logger.error("Failed to create server socket, exiting.")
+ return
+
+ self._logger.info("DhcpTestServer entering handling loop.")
+ while not self.stopped:
+ self._loop_body()
+ # Python does not have waiting queues on Lock objects. Give other
+ # threads a change to hold the mutex by forcibly releasing the GIL
+ # while we sleep.
+ time.sleep(0.01)
+ with self._mutex:
+ self._end_test_unsafe(False)
+ self._logger.info("DhcpTestServer closing sockets.")
+ self._teardown()
+ self._logger.info("DhcpTestServer exiting.")
diff --git a/client/cros/dhcp_unittest.py b/client/cros/dhcp_unittest.py
new file mode 100755
index 0000000..0c0b863
--- /dev/null
+++ b/client/cros/dhcp_unittest.py
@@ -0,0 +1,138 @@
+#!/usr/bin/python
+
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import socket
+import sys
+import time
+
+from autotest_lib.client.cros import dhcp_handling_rule
+from autotest_lib.client.cros import dhcp_packet
+from autotest_lib.client.cros import dhcp_test_server
+
+TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/"
+
+def bin2hex(byte_str, justification=20):
+ """
+ Turn big hex strings into prettier strings of hex bytes. Group those hex
+ bytes into lines justification bytes long.
+ """
+ chars = ["x" + (hex(ord(c))[2:].zfill(2)) for c in byte_str]
+ groups = []
+ for i in xrange(0, len(chars), justification):
+ groups.append("".join(chars[i:i+justification]))
+ return "\n".join(groups)
+
+def test_packet_serialization():
+ log_file = open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb")
+ binary_discovery_packet = log_file.read()
+ log_file.close()
+ discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet)
+ if not discovery_packet.is_valid:
+ return False
+ generated_string = discovery_packet.to_binary_string()
+ if generated_string is None:
+ print "Failed to generate string from packet object."
+ return False
+ if generated_string != binary_discovery_packet:
+ print "Packets didn't match: "
+ print "Generated: \n%s" % bin2hex(generated_string)
+ print "Expected: \n%s" % bin2hex(binary_discovery_packet)
+ return False
+ print "test_packet_serialization PASSED"
+ return True
+
+def test_simple_server_exchange(server):
+ intended_ip = "127.0.0.42"
+ intended_subnet_mask = "255.255.255.0"
+ server_ip = "127.0.0.1"
+ lease_time_seconds = 60
+ test_timeout = 3.0
+ handling_rule = dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
+ intended_ip,
+ intended_subnet_mask,
+ server_ip,
+ lease_time_seconds)
+ handling_rule.is_final_handler = True
+ server.start_test([handling_rule], test_timeout)
+ discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(
+ "\x01\x02\x03\x04\x05\x06")
+ # Put these ports at 8067/8068 to avoid requiring root permissions.
+ client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ client_socket.bind(("127.0.0.1", 8068))
+ client_socket.settimeout(0.1)
+ client_socket.sendto(discovery_message.to_binary_string(),
+ ("127.0.0.1", 8067))
+ data = None
+ start_time = time.time()
+ while data is None and start_time + test_timeout > time.time():
+ try:
+ data, _ = client_socket.recvfrom(1024)
+ except socket.timeout:
+ pass # We expect many timeouts.
+ if data is None:
+ print "Timed out before we received a response from the server."
+ return False
+
+ print "Client received a packet of length %d from the server." % len(data)
+ response_packet = dhcp_packet.DhcpPacket(byte_str=data)
+ if not response_packet.is_valid:
+ print "Received an invalid response from DHCP server."
+ return False
+
+ if (response_packet.message_type !=
+ dhcp_packet.OPTION_VALUE_DHCP_MESSAGE_TYPE_OFFER):
+ print "Type of DHCP response is not offer."
+ return False
+
+ if (response_packet.get_field("yiaddr") !=
+ socket.inet_aton(intended_ip)):
+ print "Server didn't offer the IP we expected."
+ return False
+
+ print "Packet looks good to the client, waiting for server to finish."
+ server.wait_for_test_to_finish()
+ print "Server agrees that the test is over."
+ if not server.last_test_passed:
+ print "Server is unhappy with the test result."
+ return False
+
+ print "test_simple_server_exchange PASSED"
+ return True
+
+def test_server_dialogue():
+ server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1",
+ ingress_port=8067,
+ broadcast_address="127.0.0.1",
+ broadcast_port=8068)
+ server.start()
+ ret = False
+ if server.is_healthy:
+ ret = test_simple_server_exchange(server)
+ else:
+ print "Server isn't healthy, aborting."
+ print "Sending server stop() signal."
+ server.stop()
+ print "Stop signal sent."
+ return ret
+
+def run_tests():
+ logger = logging.getLogger("dhcp")
+ logger.setLevel(logging.DEBUG)
+ stream_handler = logging.StreamHandler()
+ stream_handler.setLevel(logging.DEBUG)
+ logger.addHandler(stream_handler)
+ retval = test_packet_serialization()
+ retval &= test_server_dialogue()
+ if retval:
+ print "All tests PASSED."
+ return 0
+ else:
+ print "Some tests FAILED"
+ return -1
+
+if __name__ == "__main__":
+ sys.exit(run_tests())