#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for the tokens module."""

import datetime
import io
import logging
import unittest

from pw_tokenizer import tokens
from pw_tokenizer.tokens import default_hash, _LOG

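# Each CSV row is a zero-padded, eight-digit hexadecimal token; the date the
# string was marked removed (blank if it is still present); and the string
# itself in double quotes, with embedded quotes doubled.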
CSV_DATABASE = '''\
00000000,2019-06-10,""
141c35d5,          ,"The answer: ""%s"""
2db1515f,          ,"%u%d%02x%X%hu%hhu%d%ld%lu%lld%llu%c%c%c"
2e668cd6,2019-06-11,"Jello, world!"
31631781,          ,"%d"
61fd1e26,          ,"%ld"
68ab92da,          ,"%s there are %x (%.2f) of them%c"
7b940e2a,          ,"Hello %s! %hd %e"
851beeb6,          ,"%u %d"
881436a0,          ,"The answer is: %s"
ad002c97,          ,"%llx"
b3653e13,2019-06-12,"Jello!"
b912567b,          ,"%x%lld%1.2f%s"
cc6d3131,2020-01-01,"Jello?"
e13b0f94,          ,"%llu"
e65aefef,2019-06-10,"Won't fit : %s%d"
'''

# The date 2019-06-10 is 07E3-06-0A in hex. In database order, it's 0A 06 E3 07.
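# BINARY_DATABASE begins with a 16-byte header that starts with the 'TOKENS'
# magic and contains the entry count (0x10). Each 8-byte record that follows
# holds a little-endian token and its removal date (FF FF FF FF if the entry
# was never removed); the null-terminated strings follow in record order.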
BINARY_DATABASE = (
    b'TOKENS\x00\x00\x10\x00\x00\x00\0\0\0\0'  # header (0x10 entries)
    b'\x00\x00\x00\x00\x0a\x06\xe3\x07'  # 0x01
    b'\xd5\x35\x1c\x14\xff\xff\xff\xff'  # 0x02
    b'\x5f\x51\xb1\x2d\xff\xff\xff\xff'  # 0x03
    b'\xd6\x8c\x66\x2e\x0b\x06\xe3\x07'  # 0x04
    b'\x81\x17\x63\x31\xff\xff\xff\xff'  # 0x05
    b'\x26\x1e\xfd\x61\xff\xff\xff\xff'  # 0x06
    b'\xda\x92\xab\x68\xff\xff\xff\xff'  # 0x07
    b'\x2a\x0e\x94\x7b\xff\xff\xff\xff'  # 0x08
    b'\xb6\xee\x1b\x85\xff\xff\xff\xff'  # 0x09
    b'\xa0\x36\x14\x88\xff\xff\xff\xff'  # 0x0a
    b'\x97\x2c\x00\xad\xff\xff\xff\xff'  # 0x0b
    b'\x13\x3e\x65\xb3\x0c\x06\xe3\x07'  # 0x0c
    b'\x7b\x56\x12\xb9\xff\xff\xff\xff'  # 0x0d
    b'\x31\x31\x6d\xcc\x01\x01\xe4\x07'  # 0x0e
    b'\x94\x0f\x3b\xe1\xff\xff\xff\xff'  # 0x0f
    b'\xef\xef\x5a\xe6\x0a\x06\xe3\x07'  # 0x10
    b'\x00'
    b'The answer: "%s"\x00'
    b'%u%d%02x%X%hu%hhu%d%ld%lu%lld%llu%c%c%c\x00'
    b'Jello, world!\x00'
    b'%d\x00'
    b'%ld\x00'
    b'%s there are %x (%.2f) of them%c\x00'
    b'Hello %s! %hd %e\x00'
    b'%u %d\x00'
    b'The answer is: %s\x00'
    b'%llx\x00'
    b'Jello!\x00'
    b'%x%lld%1.2f%s\x00'
    b'Jello?\x00'
    b'%llu\x00'
    b'Won\'t fit : %s%d\x00')

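# Rows 2, 4, and 6 below are malformed: an unparsable date, a missing token,
# and a missing field, respectively.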
INVALID_CSV = """\
1,,"Whoa there!"
2,this is totally invalid,"Whoa there!"
3,,"This one's OK"
,,"Also broken"
5,1845-2-2,"I'm %s fine"
6,"Missing fields"
"""


def read_db_from_csv(csv_str):
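    """Builds a tokens.Database from a CSV-formatted string."""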
    with io.StringIO(csv_str) as csv_db:
        return tokens.Database(tokens.parse_csv(csv_db))


class TokenDatabaseTest(unittest.TestCase):
    """Tests the token database class."""
    def test_csv(self):
        db = read_db_from_csv(CSV_DATABASE)
        self.assertEqual(str(db), CSV_DATABASE)

        db = read_db_from_csv('')
        self.assertEqual(str(db), '')

    def test_csv_formatting(self):
        db = read_db_from_csv('')
        self.assertEqual(str(db), '')

        db = read_db_from_csv('abc123,2048-4-1,Fake string\n')
        self.assertEqual(str(db), '00abc123,2048-04-01,"Fake string"\n')

        db = read_db_from_csv('1,1990-01-01,"Quotes"""\n'
                              '0,1990-02-01,"Commas,"",,"\n')
        self.assertEqual(str(db), ('00000000,1990-02-01,"Commas,"",,"\n'
                                   '00000001,1990-01-01,"Quotes"""\n'))

    def test_bad_csv(self):
        with self.assertLogs(_LOG, logging.ERROR) as logs:
            db = read_db_from_csv(INVALID_CSV)

        self.assertGreaterEqual(len(logs.output), 3)
        self.assertEqual(len(db.token_to_entries), 3)

        self.assertEqual(db.token_to_entries[1][0].string, 'Whoa there!')
        self.assertFalse(db.token_to_entries[2])
        self.assertEqual(db.token_to_entries[3][0].string, "This one's OK")
        self.assertFalse(db.token_to_entries[4])
        self.assertEqual(db.token_to_entries[5][0].string, "I'm %s fine")
        self.assertFalse(db.token_to_entries[6])

    def test_lookup(self):
        db = read_db_from_csv(CSV_DATABASE)
        self.assertEqual(db.token_to_entries[0x9999], [])

        matches = db.token_to_entries[0x2e668cd6]
        self.assertEqual(len(matches), 1)
        jello = matches[0]

        self.assertEqual(jello.token, 0x2e668cd6)
        self.assertEqual(jello.string, 'Jello, world!')
        self.assertEqual(jello.date_removed, datetime.datetime(2019, 6, 11))

        matches = db.token_to_entries[0xe13b0f94]
        self.assertEqual(len(matches), 1)
        llu = matches[0]
        self.assertEqual(llu.token, 0xe13b0f94)
        self.assertEqual(llu.string, '%llu')
        self.assertIsNone(llu.date_removed)

        answer, = db.token_to_entries[0x141c35d5]
        self.assertEqual(answer.string, 'The answer: "%s"')

    def test_collisions(self):
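        # 'o000' and '0Q1Q' are known to hash to the same token with the
        # 96-character fixed-length hash.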
        hash_1 = tokens.pw_tokenizer_65599_fixed_length_hash('o000', 96)
        hash_2 = tokens.pw_tokenizer_65599_fixed_length_hash('0Q1Q', 96)
        self.assertEqual(hash_1, hash_2)

        db = tokens.Database.from_strings(['o000', '0Q1Q'])

        self.assertEqual(len(db.token_to_entries[hash_1]), 2)
        self.assertCountEqual(
            [entry.string for entry in db.token_to_entries[hash_1]],
            ['o000', '0Q1Q'])

    def test_purge(self):
        db = read_db_from_csv(CSV_DATABASE)
        original_length = len(db.token_to_entries)

        self.assertEqual(db.token_to_entries[0][0].string, '')
        self.assertEqual(db.token_to_entries[0x31631781][0].string, '%d')
        self.assertEqual(db.token_to_entries[0x2e668cd6][0].string,
                         'Jello, world!')
        self.assertEqual(db.token_to_entries[0xb3653e13][0].string, 'Jello!')
        self.assertEqual(db.token_to_entries[0xcc6d3131][0].string, 'Jello?')
        self.assertEqual(db.token_to_entries[0xe65aefef][0].string,
                         "Won't fit : %s%d")

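        # Purging as of 2019-06-11 drops every entry removed on or before that
        # date and keeps the rest.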
        db.purge(datetime.datetime(2019, 6, 11))
        self.assertLess(len(db.token_to_entries), original_length)

        self.assertFalse(db.token_to_entries[0])
        self.assertEqual(db.token_to_entries[0x31631781][0].string, '%d')
        self.assertFalse(db.token_to_entries[0x2e668cd6])
        self.assertEqual(db.token_to_entries[0xb3653e13][0].string, 'Jello!')
        self.assertEqual(db.token_to_entries[0xcc6d3131][0].string, 'Jello?')
        self.assertFalse(db.token_to_entries[0xe65aefef])

    def test_merge(self):
        """Tests the tokens.Database merge method."""

        db = tokens.Database()

        # Test basic merging into an empty database.
        db.merge(
            tokens.Database([
                tokens.TokenizedStringEntry(
                    1, 'one', date_removed=datetime.datetime.min),
                tokens.TokenizedStringEntry(
                    2, 'two', date_removed=datetime.datetime.min),
            ]))
        self.assertEqual({str(e) for e in db.entries()}, {'one', 'two'})
        self.assertEqual(db.token_to_entries[1][0].date_removed,
                         datetime.datetime.min)
        self.assertEqual(db.token_to_entries[2][0].date_removed,
                         datetime.datetime.min)

        # Test merging in an entry with a removal date.
        db.merge(
            tokens.Database([
                tokens.TokenizedStringEntry(3, 'three'),
                tokens.TokenizedStringEntry(
                    4, 'four', date_removed=datetime.datetime.min),
            ]))
        self.assertEqual({str(e)
                          for e in db.entries()},
                         {'one', 'two', 'three', 'four'})
        self.assertIsNone(db.token_to_entries[3][0].date_removed)
        self.assertEqual(db.token_to_entries[4][0].date_removed,
                         datetime.datetime.min)

        # Test merging in one entry.
        db.merge(tokens.Database([
            tokens.TokenizedStringEntry(5, 'five'),
        ]))
        self.assertEqual({str(e)
                          for e in db.entries()},
                         {'one', 'two', 'three', 'four', 'five'})
        self.assertEqual(db.token_to_entries[4][0].date_removed,
                         datetime.datetime.min)
        self.assertIsNone(db.token_to_entries[5][0].date_removed)

        # Merge in repeated entries with different removal dates.
        db.merge(
            tokens.Database([
                tokens.TokenizedStringEntry(
                    4, 'four', date_removed=datetime.datetime.max),
                tokens.TokenizedStringEntry(
                    5, 'five', date_removed=datetime.datetime.max),
            ]))
        self.assertEqual(len(db.entries()), 5)
        self.assertEqual({str(e)
                          for e in db.entries()},
                         {'one', 'two', 'three', 'four', 'five'})
        self.assertEqual(db.token_to_entries[4][0].date_removed,
                         datetime.datetime.max)
        self.assertIsNone(db.token_to_entries[5][0].date_removed)

        # Merge in the same repeated entries, now without removal dates.
        db.merge(
            tokens.Database([
                tokens.TokenizedStringEntry(4, 'four'),
                tokens.TokenizedStringEntry(5, 'five')
            ]))
        self.assertEqual(len(db.entries()), 5)
        self.assertEqual({str(e)
                          for e in db.entries()},
                         {'one', 'two', 'three', 'four', 'five'})
        self.assertIsNone(db.token_to_entries[4][0].date_removed)
        self.assertIsNone(db.token_to_entries[5][0].date_removed)

        # Merge in an empty database.
        db.merge(tokens.Database([]))
        self.assertEqual({str(e)
                          for e in db.entries()},
                         {'one', 'two', 'three', 'four', 'five'})

    def test_merge_multiple_databases_in_one_call(self):
        """Tests the merge and merged methods with multiple databases."""
        db = tokens.Database.merged(
            tokens.Database([
                tokens.TokenizedStringEntry(1,
                                            'one',
                                            date_removed=datetime.datetime.max)
            ]),
            tokens.Database([
                tokens.TokenizedStringEntry(2,
                                            'two',
                                            date_removed=datetime.datetime.min)
            ]),
            tokens.Database([
                tokens.TokenizedStringEntry(1,
                                            'one',
                                            date_removed=datetime.datetime.min)
            ]))
        self.assertEqual({str(e) for e in db.entries()}, {'one', 'two'})

        db.merge(
            tokens.Database([
                tokens.TokenizedStringEntry(4,
                                            'four',
                                            date_removed=datetime.datetime.max)
            ]),
            tokens.Database([
                tokens.TokenizedStringEntry(2,
                                            'two',
                                            date_removed=datetime.datetime.max)
            ]),
            tokens.Database([
                tokens.TokenizedStringEntry(3,
                                            'three',
                                            date_removed=datetime.datetime.min)
            ]))
        self.assertEqual({str(e)
                          for e in db.entries()},
                         {'one', 'two', 'three', 'four'})

    def test_entry_counts(self):
        self.assertEqual(len(CSV_DATABASE.splitlines()), 16)

        db = read_db_from_csv(CSV_DATABASE)
        self.assertEqual(len(db.entries()), 16)
        self.assertEqual(len(db.token_to_entries), 16)

        # Add two strings with the same hash.
        db.add(['o000', '0Q1Q'])

        self.assertEqual(len(db.entries()), 18)
        self.assertEqual(len(db.token_to_entries), 17)

    def test_mark_removals(self):
        """Tests that the date_removed field is set by mark_removals."""
        db = tokens.Database.from_strings(
            ['MILK', 'apples', 'oranges', 'CHEESE', 'pears'])

        self.assertTrue(
            all(entry.date_removed is None for entry in db.entries()))
        date_1 = datetime.datetime(1, 2, 3)

        db.mark_removals(['apples', 'oranges', 'pears'], date_1)

        self.assertEqual(
            db.token_to_entries[default_hash('MILK')][0].date_removed, date_1)
        self.assertEqual(
            db.token_to_entries[default_hash('CHEESE')][0].date_removed,
            date_1)

        now = datetime.datetime.now()
        db.mark_removals(['MILK', 'CHEESE', 'pears'])

        # New strings are not added or re-added in mark_removals().
        self.assertGreaterEqual(
            db.token_to_entries[default_hash('MILK')][0].date_removed, date_1)
        self.assertGreaterEqual(
            db.token_to_entries[default_hash('CHEESE')][0].date_removed,
            date_1)

        # These strings were removed.
        self.assertGreaterEqual(
            db.token_to_entries[default_hash('apples')][0].date_removed, now)
        self.assertGreaterEqual(
            db.token_to_entries[default_hash('oranges')][0].date_removed, now)
        self.assertIsNone(
            db.token_to_entries[default_hash('pears')][0].date_removed)

    def test_add(self):
        db = tokens.Database()
        db.add(['MILK', 'apples'])
        self.assertEqual({e.string for e in db.entries()}, {'MILK', 'apples'})

        db.add(['oranges', 'CHEESE', 'pears'])
        self.assertEqual(len(db.entries()), 5)

        db.add(['MILK', 'apples', 'only this one is new'])
        self.assertEqual(len(db.entries()), 6)

        db.add(['MILK'])
        self.assertEqual({e.string
                          for e in db.entries()}, {
                              'MILK', 'apples', 'oranges', 'CHEESE', 'pears',
                              'only this one is new'
                          })

    def test_binary_format_write(self):
        db = read_db_from_csv(CSV_DATABASE)

        with io.BytesIO() as fd:
            tokens.write_binary(db, fd)
            binary_db = fd.getvalue()

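        # Writing the database back out in the binary format should reproduce
        # BINARY_DATABASE byte for byte.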
        self.assertEqual(BINARY_DATABASE, binary_db)

    def test_binary_format_parse(self):
        with io.BytesIO(BINARY_DATABASE) as binary_db:
            db = tokens.Database(tokens.parse_binary(binary_db))

        self.assertEqual(str(db), CSV_DATABASE)


class TestFilter(unittest.TestCase):
    """Tests the filtering functionality."""
    def setUp(self):
        self.db = tokens.Database([
            tokens.TokenizedStringEntry(1, 'Luke'),
            tokens.TokenizedStringEntry(2, 'Leia'),
            tokens.TokenizedStringEntry(2, 'Darth Vader'),
            tokens.TokenizedStringEntry(2, 'Emperor Palpatine'),
            tokens.TokenizedStringEntry(3, 'Han'),
            tokens.TokenizedStringEntry(4, 'Chewbacca'),
            tokens.TokenizedStringEntry(5, 'Darth Maul'),
            tokens.TokenizedStringEntry(6, 'Han Solo'),
        ])

    def test_filter_include_single_regex(self):
        self.db.filter(include=[' '])  # anything with a space
        self.assertEqual(
            set(e.string for e in self.db.entries()),
            {'Darth Vader', 'Emperor Palpatine', 'Darth Maul', 'Han Solo'})

    def test_filter_include_multiple_regexes(self):
        self.db.filter(include=['Darth', 'cc', '^Han$'])
        self.assertEqual(set(e.string for e in self.db.entries()),
                         {'Darth Vader', 'Darth Maul', 'Han', 'Chewbacca'})

    def test_filter_include_no_matches(self):
        self.db.filter(include=['Gandalf'])
        self.assertFalse(self.db.entries())

    def test_filter_exclude_single_regex(self):
        self.db.filter(exclude=['^[^L]'])
        self.assertEqual(set(e.string for e in self.db.entries()),
                         {'Luke', 'Leia'})

    def test_filter_exclude_multiple_regexes(self):
        self.db.filter(exclude=[' ', 'Han', 'Chewbacca'])
        self.assertEqual(set(e.string for e in self.db.entries()),
                         {'Luke', 'Leia'})

    def test_filter_exclude_no_matches(self):
        self.db.filter(exclude=['.*'])
        self.assertFalse(self.db.entries())

    def test_filter_include_and_exclude(self):
        self.db.filter(include=[' '], exclude=['Darth', 'Emperor'])
        self.assertEqual(set(e.string for e in self.db.entries()),
                         {'Han Solo'})

    def test_filter_neither_include_nor_exclude(self):
        self.db.filter()
        self.assertEqual(
            set(e.string for e in self.db.entries()), {
                'Luke', 'Leia', 'Darth Vader', 'Emperor Palpatine', 'Han',
                'Chewbacca', 'Darth Maul', 'Han Solo'
            })


if __name__ == '__main__':
    unittest.main()