blob: 3114c07eacbb5195025d8cbac14388babae8658a [file] [log] [blame]
#!/usr/bin/python
"""
KVM configuration file utility functions.
@copyright: Red Hat 2008-2009
"""
import logging, re, os, sys, StringIO, optparse
import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import logging_config, logging_manager
class KvmLoggingConfig(logging_config.LoggingConfig):
    """
    Logging configuration used when this module runs as a stand-alone program.
    """

    def configure_logging(self, results_dir=None, verbose=False):
        """
        Configure logging through the parent class with console output on.

        @param results_dir: Accepted for signature compatibility; it is not
                forwarded to the parent class.
        @param verbose: Forwarded unchanged to the parent's configure_logging.
        """
        parent = super(KvmLoggingConfig, self)
        parent.configure_logging(use_console=True, verbose=verbose)
class config:
    """
    Parse an input file or string that follows the KVM Test Config File format
    and generate a list of dicts that will be later used as configuration
    parameters by the KVM tests.

    @see: http://www.linux-kvm.org/page/KVM-Autotest/Test_Config_File
    """

    def __init__(self, filename=None, debug=False):
        """
        Initialize the list and optionally parse filename.

        @param filename: Path of the file that will be taken.
        @param debug: Whether to turn debugging output.
        """
        # Every parse starts from a single, empty "root" dictionary.
        self.list = [{"name": "", "shortname": "", "depend": []}]
        self.debug = debug
        self.filename = filename
        if filename:
            self.parse_file(filename)


    def parse_file(self, filename):
        """
        Parse filename, return the resulting list and store it in .list. If
        filename does not exist, raise an exception.

        @param filename: Path of the configuration file.
        @return: The resulting list of dicts.
        @raise Exception: If filename does not exist.
        """
        if not os.path.exists(filename):
            raise Exception("File %s not found" % filename)
        self.filename = filename
        cfg_file = open(filename, "r")
        # Close the file even if parsing raises (e.g. on 'variants' used in a
        # restricted context).
        try:
            self.list = self.parse(cfg_file, self.list)
        finally:
            cfg_file.close()
        return self.list


    def parse_string(self, str):
        """
        Parse a string, return the resulting list and store it in .list.

        @param str: String that will be parsed.
        @return: The resulting list of dicts.
        """
        file = StringIO.StringIO(str)
        try:
            self.list = self.parse(file, self.list)
        finally:
            file.close()
        return self.list


    def get_list(self):
        """
        Return the list of dictionaries. This should probably be called after
        parsing something.
        """
        return self.list


    def match(self, filter, dict):
        """
        Return True if dict matches filter.

        The filter must match one or more whole dot-separated components of
        the dict's 'name' field.

        @param filter: A regular expression that defines the filter.
        @param dict: Dictionary that will be inspected.
        """
        # The filter is wrapped in a non-capturing group so that a top-level
        # alternation ("a|b") stays anchored on both sides; without the group
        # the anchors would bind to the first and last alternative only.
        pattern = re.compile(r"(\.|^)(?:" + filter + r")(\.|$)")
        return bool(pattern.search(dict["name"]))


    def filter(self, filter, list=None):
        """
        Filter a list of dicts.

        @param filter: A regular expression that will be used as a filter.
        @param list: A list of dictionaries that will be filtered. If None,
                the instance's own list is filtered.
        @return: A new list holding the dicts that match filter.
        """
        if list is None:
            list = self.list
        return [params for params in list if self.match(filter, params)]


    def split_and_strip(self, str, sep="="):
        """
        Split str and strip quotes from the resulting parts.

        @param str: String that will be processed
        @param sep: Separator that will be used to split the string
        @return: A list of one or two parts (str is split at most once), each
                stripped of surrounding whitespace and of surrounding
                matching single or double quotes.
        """
        parts = str.split(sep, 1)
        for i, part in enumerate(parts):
            part = part.strip()
            if re.match("^\".*\"$", part):
                part = part.strip("\"")
            elif re.match("^\'.*\'$", part):
                part = part.strip("\'")
            parts[i] = part
        return parts


    def get_next_line(self, file):
        """
        Get the next non-empty, non-comment line in a file like object.

        Lines whose first non-blank characters are '#' or '//' are comments.

        @param file: File like object
        @return: If no line is available, return None.
        """
        while True:
            line = file.readline()
            if line == "":
                # readline() returns an empty string only at end of file.
                return None
            stripped_line = line.strip()
            if not stripped_line:
                continue
            if stripped_line.startswith('#'):
                continue
            if stripped_line.startswith('//'):
                continue
            return line


    def get_next_line_indent(self, file):
        """
        Return the indent level of the next non-empty, non-comment line in file.

        The file position is left unchanged.

        @param file: File like object.
        @return: If no line is available, return -1.
        """
        pos = file.tell()
        line = self.get_next_line(file)
        # This is only a look-ahead: always rewind to where we started.
        file.seek(pos)
        if not line:
            return -1
        line = line.expandtabs()
        return len(line) - len(line.lstrip(' '))


    def add_name(self, str, name, append=False):
        """
        Add name to str with a separator dot and return the result.

        @param str: String that will be processed
        @param name: name that will be appended to the string.
        @param append: Whether name goes after (True) or before (False) str.
        @return: If append is True, append name to str.
                Otherwise, pre-pend name to str.
        """
        if str == "":
            return name
        if append:
            return str + "." + name
        return name + "." + str


    def parse_variants(self, file, list, subvariants=False, prev_indent=-1):
        """
        Read and parse lines from file like object until a line with an indent
        level lower than or equal to prev_indent is encountered.

        @brief: Parse a 'variants' or 'subvariants' block from a file-like
                object.
        @param file: File-like object that will be parsed
        @param list: List of dicts to operate on
        @param subvariants: If True, parse in 'subvariants' mode;
                otherwise parse in 'variants' mode
        @param prev_indent: The indent level of the "parent" block
        @return: The resulting list of dicts.
        """
        new_list = []

        while True:
            indent = self.get_next_line_indent(file)
            if indent <= prev_indent:
                break
            indented_line = self.get_next_line(file).rstrip()
            line = indented_line.strip()

            # Get name and dependencies
            temp = line.strip("- ").split(":")
            name = temp[0]
            if len(temp) == 1:
                dep_list = []
            else:
                dep_list = temp[1].split()

            # See if name should be added to the 'shortname' field
            add_to_shortname = True
            if name.startswith("@"):
                name = name.strip("@")
                add_to_shortname = False

            # Every variant works on its own copy of the incoming dicts (the
            # 'depend' list is copied too, because it is modified in place).
            temp_list = []
            for params in list:
                new_params = params.copy()
                new_params["depend"] = params["depend"][:]
                temp_list.append(new_params)

            if subvariants:
                # If we're parsing 'subvariants', first modify the list
                self.__modify_list_subvariants(temp_list, name, dep_list,
                                               add_to_shortname)
                temp_list = self.parse(file, temp_list,
                                       restricted=True, prev_indent=indent)
            else:
                # If we're parsing 'variants', parse before modifying the list
                if self.debug:
                    self.__debug_print(indented_line,
                                       "Entering variant '%s' "
                                       "(variant inherits %d dicts)" %
                                       (name, len(list)))
                temp_list = self.parse(file, temp_list,
                                       restricted=False, prev_indent=indent)
                self.__modify_list_variants(temp_list, name, dep_list,
                                            add_to_shortname)

            new_list += temp_list

        return new_list


    def parse(self, file, list, restricted=False, prev_indent=-1):
        """
        Read and parse lines from file until a line with an indent level lower
        than or equal to prev_indent is encountered.

        @brief: Parse a file-like object.
        @param file: A file-like object
        @param list: A list of dicts to operate on (list is modified in
                place and should not be used after the call)
        @param restricted: if True, operate in restricted mode
                (prohibit 'variants')
        @param prev_indent: the indent level of the "parent" block
        @return: Return the resulting list of dicts.
        @raise error.AutotestError: If a 'variants' block is found while in
                restricted mode.
        @note: List is destroyed and should not be used after the call.
                Only the returned list should be used.
        """
        # Debug tracing is only emitted outside restricted blocks.
        trace = self.debug and not restricted

        while True:
            indent = self.get_next_line_indent(file)
            if indent <= prev_indent:
                break
            indented_line = self.get_next_line(file).rstrip()
            line = indented_line.strip()
            words = line.split()

            len_list = len(list)

            # Look for a known operator in the line
            op_found = self.__find_operator(line)

            # Found an operator?
            if op_found:
                if trace:
                    self.__debug_print(indented_line,
                                       "Parsing operator (%d dicts in current "
                                       "context)" % len_list)
                self.__apply_operator(line, op_found, list)

            # Parse 'no' and 'only' statements
            elif words[0] == "no" or words[0] == "only":
                if len(words) <= 1:
                    continue
                filters = words[1:]
                # 'only' keeps the dicts matching any filter, 'no' keeps the
                # dicts matching none of them.
                keep_matching = (words[0] == "only")
                list = [params for params in list
                        if self.__matches_any(filters, params)
                        == keep_matching]
                if trace:
                    self.__debug_print(indented_line,
                                       "Parsing no/only (%d dicts in current "
                                       "context, %d remain)" %
                                       (len_list, len(list)))

            # Parse 'variants'
            elif line == "variants:":
                # 'variants' not allowed in restricted mode
                # (inside an exception or inside subvariants)
                if restricted:
                    e_msg = "Using variants in this context is not allowed"
                    raise error.AutotestError(e_msg)
                if trace:
                    self.__debug_print(indented_line,
                                       "Entering variants block (%d dicts in "
                                       "current context)" % len_list)
                list = self.parse_variants(file, list, subvariants=False,
                                           prev_indent=indent)

            # Parse 'subvariants' (the block is parsed for each dict
            # separately)
            elif line == "subvariants:":
                if trace:
                    self.__debug_print(indented_line,
                                       "Entering subvariants block (%d dicts "
                                       "in current context)" % len_list)
                new_list = []
                # Remember current file position
                pos = file.tell()
                # Read the lines in any case, so the file position ends up
                # past the block even when there are no dicts to process
                self.parse_variants(file, [], subvariants=True,
                                    prev_indent=indent)
                for params in list:
                    # Revert to initial file position in this 'subvariants'
                    # block
                    file.seek(pos)
                    # Everything inside 'subvariants' should be parsed in
                    # restricted mode
                    new_list += self.parse_variants(file, [params],
                                                    subvariants=True,
                                                    prev_indent=indent)
                list = new_list

            # Parse 'include' statements
            elif words[0] == "include":
                if len(words) <= 1:
                    continue
                if trace:
                    self.__debug_print(indented_line,
                                       "Entering file %s" % words[1])
                if not self.filename:
                    logging.warning("Cannot include %s because no file is "
                                    "currently open", words[1])
                    continue
                # Included paths are relative to the directory of the file
                # recorded in self.filename.
                filename = os.path.join(os.path.dirname(self.filename),
                                        words[1])
                if not os.path.exists(filename):
                    logging.warning("Cannot include %s -- file not found",
                                    filename)
                    continue
                new_file = open(filename, "r")
                # Do not leak the handle if the included file fails to parse.
                try:
                    list = self.parse(new_file, list, restricted)
                finally:
                    new_file.close()
                if trace:
                    self.__debug_print("", "Leaving file %s" % words[1])

            # Parse multi-line exceptions
            # (the block is parsed for each dict separately)
            elif line.endswith(":"):
                if trace:
                    self.__debug_print(indented_line,
                                       "Entering multi-line exception block "
                                       "(%d dicts in current context outside "
                                       "exception)" % len_list)
                line = line.strip(":")
                new_list = []
                # Remember current file position
                pos = file.tell()
                # Read the lines in any case, so the file position ends up
                # past the block even when no dict matches
                self.parse(file, [], restricted=True, prev_indent=indent)
                for params in list:
                    if self.match(line, params):
                        # Revert to initial file position in this
                        # exception block
                        file.seek(pos)
                        # Everything inside an exception should be parsed in
                        # restricted mode
                        new_list += self.parse(file, [params],
                                               restricted=True,
                                               prev_indent=indent)
                    else:
                        new_list.append(params)
                list = new_list

        return list


    def __find_operator(self, line):
        """
        Return the assignment operator that occurs first in line.

        @param line: Stripped configuration line.
        @return: One of "?+=", "?<=", "?=", "+=", "<=", "=", or None if the
                line contains no operator.
        """
        # Longer operators are listed first: on equal position the first
        # listed wins, and a longer operator always starts before the
        # shorter ones it contains.
        operators = ["?+=", "?<=", "?=", "+=", "<=", "="]
        op_found = None
        op_pos = len(line)
        for op in operators:
            pos = line.find(op)
            if pos >= 0 and pos < op_pos:
                op_found = op
                op_pos = pos
        return op_found


    def __apply_operator(self, line, op, dicts):
        """
        Apply an operator line ("[filter:] key OP value") to dicts in place.

        @param line: Stripped configuration line containing op.
        @param op: The operator, as returned by __find_operator.
        @param dicts: List of dicts to operate on.
        """
        (left, value) = self.split_and_strip(line, op)
        filters_and_key = self.split_and_strip(left, ":")
        key = filters_and_key[-1]

        # Narrow the dicts down to those selected by the optional filter
        targets = dicts
        for pattern in filters_and_key[:-1]:
            targets = self.filter(pattern, targets)

        if op == "=":
            for params in targets:
                params[key] = value
        elif op == "+=":
            for params in targets:
                params[key] = params.get(key, "") + value
        elif op == "<=":
            for params in targets:
                params[key] = value + params.get(key, "")
        else:
            # The '?' operators treat key as a regular expression and only
            # touch keys that already exist in the dict.
            exp = re.compile("^(%s)$" % key)
            for params in targets:
                for existing in params.keys():
                    if not exp.match(existing):
                        continue
                    if op == "?=":
                        params[existing] = value
                    elif op == "?+=":
                        params[existing] = params[existing] + value
                    elif op == "?<=":
                        params[existing] = value + params[existing]


    def __matches_any(self, filters, params):
        """
        Return True if params matches at least one of the given filters.

        @param filters: List of regular expressions.
        @param params: Dictionary that will be inspected.
        """
        for pattern in filters:
            if self.match(pattern, params):
                return True
        return False


    def __debug_print(self, str1, str2=""):
        """
        Nicely print two strings and an arrow.

        @param str1: First string
        @param str2: Second string
        """
        if str2:
            message = "%-50s ---> %s" % (str1, str2)
        else:
            message = str1
        logging.debug(message)


    def __modify_list_variants(self, list, name, dep_list, add_to_shortname):
        """
        Make some modifications to list, as part of parsing a 'variants' block.

        @param list: List to be processed
        @param name: Name to be prepended to the dictionary's 'name' key
        @param dep_list: List of dependencies to be added to the dictionary's
                'depend' key
        @param add_to_shortname: Boolean indicating whether name should be
                prepended to the dictionary's 'shortname' key as well
        """
        for params in list:
            # Prepend name to the dict's 'name' field
            params["name"] = self.add_name(params["name"], name)
            # Prepend name to the dict's 'shortname' field
            if add_to_shortname:
                params["shortname"] = self.add_name(params["shortname"], name)
            # Prepend name to each of the dict's dependencies (in place)
            depend = params["depend"]
            depend[:] = [self.add_name(dep, name) for dep in depend]
            # Add new dependencies
            depend.extend(dep_list)


    def __modify_list_subvariants(self, list, name, dep_list, add_to_shortname):
        """
        Make some modifications to list, as part of parsing a 'subvariants'
        block.

        @param list: List to be processed
        @param name: Name to be appended to the dictionary's 'name' key
        @param dep_list: List of dependencies to be added to the dictionary's
                'depend' key
        @param add_to_shortname: Boolean indicating whether name should be
                appended to the dictionary's 'shortname' as well
        """
        for params in list:
            # Add new dependencies, qualified with the dict's current name
            # (this must happen before the name itself is extended)
            for dep in dep_list:
                dep_name = self.add_name(params["name"], dep, append=True)
                params["depend"].append(dep_name)
            # Append name to the dict's 'name' field
            params["name"] = self.add_name(params["name"], name, append=True)
            # Append name to the dict's 'shortname' field
            if add_to_shortname:
                params["shortname"] = self.add_name(params["shortname"], name,
                                                    append=True)
def get_option_parser():
    """
    Build the command line parser used when this module runs stand-alone.

    @return: An optparse.OptionParser that understands -f/--file (stored in
            'filename') and --verbose (stored in 'debug').
    """
    parser = optparse.OptionParser()
    # -f takes the path as its argument, so it must use the 'store' action;
    # 'store_true' would record True and leave the path in the positional
    # arguments.
    parser.add_option('-f', '--file', dest="filename", action='store',
                      help='path to a config file that will be parsed. '
                           'If not specified, will parse kvm_tests.cfg '
                           'located inside the kvm test dir.')
    parser.add_option('--verbose', dest="debug", action='store_true',
                      help='include debug messages in console output')
    return parser


if __name__ == "__main__":
    options, args = get_option_parser().parse_args()
    filename = options.filename
    debug = options.debug

    if not filename:
        # Default to the kvm_tests.cfg that sits next to this script.
        filename = os.path.join(os.path.dirname(sys.argv[0]), "kvm_tests.cfg")

    # Here we configure the stand alone program to use the autotest
    # logging system.
    logging_manager.configure_logging(KvmLoggingConfig(), verbose=debug)
    dicts = config(filename, debug=debug).get_list()
    for i, params in enumerate(dicts):
        logging.info("Dictionary #%d:", i)
        for key in sorted(params.keys()):
            logging.info(" %s = %s", key, params[key])