#!/usr/bin/env python3

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""
8
9import re
10import os
11import sys
12import argparse
13import json
14import subprocess
15from collections import OrderedDict
16from string import Template
17
18from tdc_config import *
19from tdc_helper import *
20
21
# When True, test commands are run inside the $NS network namespace and
# ns_create()/ns_destroy() set that namespace up and tear it down.
USE_NS = True
23
24
def replace_keywords(cmd):
    """
    Expand the $VARIABLE placeholders in a command string using the
    values defined in NAMES. Placeholders with no matching entry are
    left in the string untouched.
    """
    return Template(cmd).safe_substitute(NAMES)
33
34
def exec_cmd(command, nsonly=True):
    """
    Apply namespace prefixing and keyword substitution to a command, run
    it through the shell, and hand back the process object together with
    its decoded output.

    The returned text is stderr when the command exits non-zero and
    stdout otherwise.
    """
    run_in_ns = USE_NS and nsonly
    if run_in_ns:
        command = 'ip netns exec $NS ' + command

    # Only bother with substitution when a placeholder may be present
    if '$' in command:
        command = replace_keywords(command)

    proc = subprocess.Popen(command, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out_bytes, err_bytes = proc.communicate()

    selected = out_bytes if proc.returncode == 0 else err_bytes
    foutput = selected.decode("utf-8")

    for stream in (proc.stdout, proc.stderr):
        stream.close()
    return proc, foutput
60
61
def prepare_env(cmdlist):
    """
    Execute the setup/teardown commands for a test case. Optionally
    terminate test execution if the command fails.

    Each entry of cmdlist is either a command string, which must exit
    with status 0, or a list whose first element is the command and whose
    remaining elements are the exit codes to accept. Empty commands are
    skipped. On an unexpected exit code the error is reported, the test
    namespace is destroyed and the script exits with status 1.
    """
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if len(cmd) == 0:
            continue

        (proc, foutput) = exec_cmd(cmd)

        if proc.returncode not in exit_codes:
            # Blank separator line; a bare `print` is a no-op in Python 3
            print()
            print("Could not execute:")
            print(cmd)
            print("\nError message:")
            print(foutput)
            print("\nAborting test run.")
            # Clean up the namespace before bailing out
            ns_destroy()
            exit(1)
89
90
91def test_runner(filtered_tests):
92 """
93 Driver function for the unit tests.
94
95 Prints information about the tests being run, executes the setup and
96 teardown commands and the command under test itself. Also determines
97 success/failure based on the information in the test case and generates
98 TAP output accordingly.
99 """
100 testlist = filtered_tests
101 tcount = len(testlist)
102 index = 1
103 tap = str(index) + ".." + str(tcount) + "\n"
104
105 for tidx in testlist:
106 result = True
107 tresult = ""
108 print("Test " + tidx["id"] + ": " + tidx["name"])
109 prepare_env(tidx["setup"])
110 (p, procout) = exec_cmd(tidx["cmdUnderTest"])
111 exit_code = p.returncode
112
113 if (exit_code != int(tidx["expExitCode"])):
114 result = False
115 print("exit:", exit_code, int(tidx["expExitCode"]))
116 print(procout)
117 else:
118 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
119 (p, procout) = exec_cmd(tidx["verifyCmd"])
120 match_index = re.findall(match_pattern, procout)
121 if len(match_index) != int(tidx["matchCount"]):
122 result = False
123
124 if result == True:
125 tresult += "ok "
126 else:
127 tresult += "not ok "
128 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
129
130 if result == False:
131 tap += procout
132
133 prepare_env(tidx["teardown"])
134 index += 1
135
136 return tap
137
138
def ns_create():
    """
    Create the network namespace in which the tests will be run and set up
    the required network devices for it.

    Does nothing unless USE_NS is set. A veth pair $DEV0/$DEV1 is created,
    $DEV1 is moved into $NS, and both ends are brought up.
    """
    if not USE_NS:
        return

    setup_cmds = (
        'ip netns add $NS',
        'ip link add $DEV0 type veth peer name $DEV1',
        'ip link set $DEV1 netns $NS',
        'ip link set $DEV0 up',
        # $DEV1 now lives inside $NS, so it must be addressed with -n
        # (netns). The former -s is ip's statistics flag, which made this
        # command fail and left $DEV1 down.
        'ip -n $NS link set $DEV1 up',
    )
    for cmd in setup_cmds:
        # nsonly=False: these run from the host, not inside $NS
        exec_cmd(cmd, False)
155
156
def ns_destroy():
    """
    Remove the test network namespace (which also removes the network
    devices that were placed in it). Does nothing when USE_NS is off.
    """
    if not USE_NS:
        return
    exec_cmd('ip netns delete $NS', False)
165
166
def has_blank_ids(idlist):
    """
    Report whether at least one entry of the ID list is empty.
    """
    return any(not entry for entry in idlist)
172
173
def load_from_file(filename):
    """
    Read the test cases from a JSON file, keeping the key order of each
    case by loading objects as ordered dictionaries.

    If any case in the file has an empty ID, every case from the file is
    tagged with a 'filename' key recording where it came from.
    """
    with open(filename) as test_data:
        cases = json.load(test_data, object_pairs_hook=OrderedDict)

    if not has_blank_ids(get_id_list(cases)):
        return cases

    for case in cases:
        case['filename'] = filename
    return cases
186
187
def args_parse():
    """
    Build and return the bare argument parser for tdc.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
194
195
def set_args(parser):
    """
    Set the command line arguments for tdc.

    Registers all supported options on the given parser and returns that
    same parser.
    """
    parser.add_argument('-p', '--path', type=str,
                        help='The full path to the tc executable to use')
    # Given without a value, -c stores the marker '+c'
    parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
                        help='Run tests only from the specified category, or if no category is specified, list known categories.')
    parser.add_argument('-f', '--file', type=str,
                        help='Run tests from the specified file')
    # Given without a value, -l stores the empty string
    parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
                        help='List all test cases, or those only within the specified category')
    parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
                        help='Display the test case with specified id')
    parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
                        help='Execute the single test case with specified ID')
    parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
                        help='Generate ID numbers for new test cases')
    return parser
216
217
def check_default_settings(args):
    """
    Apply command line overrides to the default settings and verify
    that the resulting configuration is usable.

    A path given on the command line replaces NAMES['TC']. The script
    exits with status 1 if the tc path in effect is not an existing file.
    """
    # Command line value, when present, wins over the configured default
    if args.path is not None:
        NAMES['TC'] = args.path

    tc_path = NAMES['TC']
    if os.path.isfile(tc_path):
        return

    print("The specified tc path " + tc_path + " does not exist.")
    exit(1)
231
232
def get_id_list(alltests):
    """
    Collect the "id" field of every test case, in order, into a list.
    """
    ids = []
    for case in alltests:
        ids.append(case["id"])
    return ids
238
239
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns a list holding every ID that occurs more than once, one entry
    per occurrence and in the original order; the list is empty when all
    IDs are unique.
    """
    from collections import Counter

    idl = get_id_list(alltests)
    # Count once up front instead of rescanning the list for every ID
    occurrences = Counter(idl)
    return [x for x in idl if occurrences[x] > 1]
246
247
def does_id_exist(alltests, newid):
    """
    Tell whether some test case already uses the given ID.
    """
    for known_id in get_id_list(alltests):
        if newid == known_id:
            return True
    return False
254
255
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Only test cases carrying a 'filename' key are written back, grouped
    by that file; the key is removed from each case before it is dumped.
    The test cases are updated in place and the same list is returned so
    that callers may rebind their variable to the result.
    """
    import random

    # Track every ID in use so a new one can never collide, including
    # with IDs handed out earlier in this same pass.
    used_ids = set(c["id"] for c in alltests)
    for c in alltests:
        if c["id"] == "":
            newid = '%04x' % random.randrange(16**4)
            while newid in used_ids:
                newid = '%04x' % random.randrange(16**4)
            c['id'] = newid
            used_ids.add(newid)

    # Group the cases to rewrite by source file, dropping the bookkeeping
    # key so it does not end up in the JSON output.
    by_file = OrderedDict()
    for t in alltests:
        if 'filename' in t:
            by_file.setdefault(t.pop('filename'), []).append(t)

    for fname, testlist in by_file.items():
        # `with` guarantees the file is closed even if dumping fails
        with open(fname, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

    return alltests
287
288
def get_test_cases(args):
    """
    Gather the test cases to work with.

    When a test case file is named on the command line, only that file
    is loaded (the script exits with status 1 if it does not exist).
    Otherwise every *.json file found under the 'tc-tests' directory
    tree is loaded. Returns all loaded cases as a single list.
    """
    import fnmatch

    if args.file is None:
        flist = []
        for root, dirnames, filenames in os.walk('tc-tests'):
            flist.extend(os.path.join(root, name)
                         for name in fnmatch.filter(filenames, '*.json'))
    elif os.path.isfile(args.file):
        flist = [args.file]
    else:
        print("The specified test case file " + args.file + " does not exist.")
        exit(1)

    alltests = []
    for casefile in flist:
        alltests = alltests + load_from_file(casefile)
    return alltests
310
311
def set_operation_mode(args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    In order of precedence: generate missing IDs, show a single test
    case, list the known categories, list test cases, or run tests
    (everything, those whose ID contains the requested ID, or one
    category). Running tests requires root privileges. Every mode other
    than a completed test run ends by exiting the script.
    """
    alltests = get_test_cases(args)

    if args.gen_id:
        idlist = get_id_list(alltests)
        if has_blank_ids(idlist):
            # IDs are filled in place and written back to disk; the
            # return value is not needed here.
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    ucat = get_test_categories(alltests)

    if args.showID:
        show_test_case_by_id(alltests, args.showID[0])
        exit(0)

    target_id = args.execute[0] if args.execute else ""

    # '+c' is the marker stored when -c is given without a category
    if args.category == '+c':
        print("Available categories:")
        print_sll(ucat)
        exit(0)
    target_category = args.category if args.category else ""

    testcases = get_categorized_testlist(alltests, ucat)

    # A bare -l stores "" which is falsy, so compare against None to
    # tell "option absent" apart from "list everything".
    if args.list is not None:
        if len(args.list) == 0:
            list_test_cases(alltests)
            exit(0)
        if args.list not in ucat:
            print("Unknown category " + args.list)
            print("Available categories:")
            print_sll(ucat)
            exit(1)
        list_test_cases(testcases[args.list])
        exit(0)

    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        exit(1)

    # Work out what to run before creating the namespace, so that a bad
    # ID or category exits without leaving the namespace behind.
    if len(target_category) == 0:
        if len(target_id) > 0:
            alltests = [t for t in alltests if target_id in t['id']]
            if len(alltests) == 0:
                print("Cannot find a test case with ID matching " + target_id)
                exit(1)
        runlist = alltests
        heading = "All test results: "
    else:
        if target_category not in ucat:
            print("Specified category is not present in this file.")
            exit(1)
        runlist = testcases[target_category]
        heading = "Category " + target_category

    ns_create()
    catresults = test_runner(runlist)
    print(heading + "\n\n" + catresults)
    ns_destroy()
395
396
def main():
    """
    Entry point: build the argument parser, read the command line,
    apply setting overrides and dispatch to the requested operation.
    """
    parser = set_args(args_parse())
    # Unrecognised arguments are tolerated and ignored
    args, _unparsed = parser.parse_known_args()

    check_default_settings(args)
    set_operation_mode(args)

    exit(0)
410
411
# Run the driver only when executed as a script, not when imported
if __name__ == "__main__":
    main()