pw_tokenizer: Update Python for tokenized entry format
- Update the token database code to support parsing new-style tokenized
  string entries (see the layout sketch below). Maintain support for legacy
  tokenized binaries.
- Update the report format to use JSON to make it easier to test and
extend.
- Add the domain to the TokenizedStringEntry class so that it stores all
information from tokenized entries in ELF files.
- Update database tests to test a new-style binary in addition to the
legacy binary.
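For reference, each entry in the new .pw_tokenizer.entries sections, as
parsed by _read_tokenized_entries() in database.py, is laid out as:

  magic number (0xBAA98DEE), token, domain length, string length
      (four little-endian uint32s)
  domain bytes (domain length bytes, including a trailing null)
  string bytes (string length bytes, including a trailing null)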
Change-Id: I974cfa56d9b7261ea237bc900d29ba5b1e3b8dc8
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/21981
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_tokenizer/py/BUILD.gn b/pw_tokenizer/py/BUILD.gn
index eca6abc..a8a1c1b 100644
--- a/pw_tokenizer/py/BUILD.gn
+++ b/pw_tokenizer/py/BUILD.gn
@@ -42,4 +42,8 @@
"encode_test.py",
"tokens_test.py",
]
+ inputs = [
+ "example_binary_with_tokenized_strings.elf",
+ "example_legacy_binary_with_tokenized_strings.elf",
+ ]
}
diff --git a/pw_tokenizer/py/database_test.py b/pw_tokenizer/py/database_test.py
index 0049c68..18530b8 100755
--- a/pw_tokenizer/py/database_test.py
+++ b/pw_tokenizer/py/database_test.py
@@ -14,8 +14,8 @@
# the License.
"""Tests for the database module."""
+import copy
import io
+import json
-import os
from pathlib import Path
import shutil
import sys
@@ -25,11 +25,21 @@
from pw_tokenizer import database
-ELF = Path(__file__).parent / 'example_binary_with_tokenized_strings.elf'
+# These are ELF files with only the pw_tokenizer sections. They were created
+# from tokenize_test binaries built for the STM32F429i Discovery board. The
+# pw_tokenizer sections were extracted with this command:
+#
+# arm-none-eabi-objcopy -S --only-section ".pw_tokenize*" <ELF> <OUTPUT>
+#
+TOKENIZED_ENTRIES_ELF = Path(
+ __file__).parent / 'example_binary_with_tokenized_strings.elf'
+LEGACY_PLAIN_STRING_ELF = Path(
+ __file__).parent / 'example_legacy_binary_with_tokenized_strings.elf'
CSV_DEFAULT_DOMAIN = '''\
00000000, ,""
141c35d5, ,"The answer: ""%s"""
+29aef586, ,"1234"
2b78825f, ,"[:-)"
2e668cd6, ,"Jello, world!"
31631781, ,"%d"
@@ -37,9 +47,13 @@
68ab92da, ,"%s there are %x (%.2f) of them%c"
7b940e2a, ,"Hello %s! %hd %e"
7da55d52, ,">:-[]"
+7f35a9a5, ,"TestName"
851beeb6, ,"%u %d"
881436a0, ,"The answer is: %s"
88808930, ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c"
+92723f44, ,"???"
+a09d6698, ,"won-won-won-wonderful"
+aa9ffa66, ,"void pw::tokenizer::{anonymous}::TestName()"
ad002c97, ,"%llx"
b3653e13, ,"Jello!"
cc6d3131, ,"Jello?"
@@ -47,15 +61,20 @@
e65aefef, ,"Won't fit : %s%d"
'''
-CSV_TEST_DOMAIN = '''\
-00000000, ,""
+CSV_TEST_DOMAIN = """\
+17fa86d3, ,"hello"
+18c5017c, ,"yes"
59b2701c, ,"The answer was: %s"
881436a0, ,"The answer is: %s"
-'''
+d18ada0f, ,"something"
+"""
CSV_ALL_DOMAINS = '''\
00000000, ,""
141c35d5, ,"The answer: ""%s"""
+17fa86d3, ,"hello"
+18c5017c, ,"yes"
+29aef586, ,"1234"
2b78825f, ,"[:-)"
2e668cd6, ,"Jello, world!"
31631781, ,"%d"
@@ -64,16 +83,40 @@
68ab92da, ,"%s there are %x (%.2f) of them%c"
7b940e2a, ,"Hello %s! %hd %e"
7da55d52, ,">:-[]"
+7f35a9a5, ,"TestName"
851beeb6, ,"%u %d"
881436a0, ,"The answer is: %s"
88808930, ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c"
+92723f44, ,"???"
+a09d6698, ,"won-won-won-wonderful"
+aa9ffa66, ,"void pw::tokenizer::{anonymous}::TestName()"
ad002c97, ,"%llx"
b3653e13, ,"Jello!"
cc6d3131, ,"Jello?"
+d18ada0f, ,"something"
e13b0f94, ,"%llu"
e65aefef, ,"Won't fit : %s%d"
'''
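+# Expected output of the `report` command for the new-style ELF, as a dict
+# matching the JSON that database.py prints.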
+EXPECTED_REPORT = {
+ str(TOKENIZED_ENTRIES_ELF): {
+ '': {
+ 'present_entries': 22,
+ 'present_size_bytes': 289,
+ 'total_entries': 22,
+ 'total_size_bytes': 289,
+ 'collisions': 0
+ },
+ 'TEST_DOMAIN': {
+ 'present_entries': 5,
+ 'present_size_bytes': 57,
+ 'total_entries': 5,
+ 'total_size_bytes': 57,
+ 'collisions': 0
+ }
+ }
+}
+
def run_cli(*args):
original_argv = sys.argv
@@ -96,112 +139,140 @@
return io.TextIOWrapper(output, write_through=True)
-REPORT_DEFAULT_DOMAIN = '''\
-example_binary_with_tokenized_strings.elf]
- Domain: default
- Entries present: 17
- Size of strings: 205 B
- Total entries: 17
- Total size of strings: 205 B
- Collisions: 0 tokens
-'''.replace('\n', os.linesep).encode()
-
-REPORT_TEST_DOMAIN = '''\
-example_binary_with_tokenized_strings.elf]
- Domain: TEST_DOMAIN
- Entries present: 3
- Size of strings: 38 B
- Total entries: 3
- Total size of strings: 38 B
- Collisions: 0 tokens
-'''.replace('\n', os.linesep).encode()
-
-
class DatabaseCommandLineTest(unittest.TestCase):
"""Tests the database.py command line interface."""
def setUp(self):
self._dir = Path(tempfile.mkdtemp('_pw_tokenizer_test'))
self._csv = self._dir / 'db.csv'
+ self._elf = TOKENIZED_ENTRIES_ELF
+
+ self._csv_test_domain = CSV_TEST_DOMAIN
def tearDown(self):
shutil.rmtree(self._dir)
def test_create_csv(self):
- run_cli('create', '--database', self._csv, ELF)
+ run_cli('create', '--database', self._csv, self._elf)
- self.assertEqual(CSV_DEFAULT_DOMAIN, self._csv.read_text())
+ self.assertEqual(CSV_DEFAULT_DOMAIN.splitlines(),
+ self._csv.read_text().splitlines())
def test_create_csv_test_domain(self):
- run_cli('create', '--database', self._csv, f'{ELF}#TEST_DOMAIN')
+ run_cli('create', '--database', self._csv, f'{self._elf}#TEST_DOMAIN')
- self.assertEqual(CSV_TEST_DOMAIN, self._csv.read_text())
+ self.assertEqual(self._csv_test_domain.splitlines(),
+ self._csv.read_text().splitlines())
def test_create_csv_all_domains(self):
- run_cli('create', '--database', self._csv, f'{ELF}#.*')
+ run_cli('create', '--database', self._csv, f'{self._elf}#.*')
- self.assertEqual(CSV_ALL_DOMAINS, self._csv.read_text())
+ self.assertEqual(CSV_ALL_DOMAINS.splitlines(),
+ self._csv.read_text().splitlines())
def test_create_force(self):
self._csv.write_text(CSV_ALL_DOMAINS)
with self.assertRaises(FileExistsError):
- run_cli('create', '--database', self._csv, ELF)
+ run_cli('create', '--database', self._csv, self._elf)
- run_cli('create', '--force', '--database', self._csv, ELF)
+ run_cli('create', '--force', '--database', self._csv, self._elf)
def test_create_binary(self):
binary = self._dir / 'db.bin'
- run_cli('create', '--type', 'binary', '--database', binary, ELF)
+ run_cli('create', '--type', 'binary', '--database', binary, self._elf)
# Write the binary database as CSV to verify its contents.
run_cli('create', '--database', self._csv, binary)
- self.assertEqual(CSV_DEFAULT_DOMAIN, self._csv.read_text())
+ self.assertEqual(CSV_DEFAULT_DOMAIN.splitlines(),
+ self._csv.read_text().splitlines())
def test_add(self):
self._csv.write_text(CSV_ALL_DOMAINS)
- run_cli('add', '--database', self._csv, f'{ELF}#TEST_DOMAIN')
- self.assertEqual(CSV_ALL_DOMAINS, self._csv.read_text())
+ run_cli('add', '--database', self._csv, f'{self._elf}#TEST_DOMAIN')
+ self.assertEqual(CSV_ALL_DOMAINS.splitlines(),
+ self._csv.read_text().splitlines())
def test_mark_removals(self):
self._csv.write_text(CSV_ALL_DOMAINS)
run_cli('mark_removals', '--database', self._csv, '--date',
- '1998-09-04', f'{ELF}#default')
+ '1998-09-04', self._elf)
- # Add the removal date to the token not in the default domain
- new_csv = CSV_ALL_DOMAINS.replace('59b2701c, ,',
- '59b2701c,1998-09-04,')
+ # Add the removal date to the four tokens not in the default domain
+ new_csv = CSV_ALL_DOMAINS
+ new_csv = new_csv.replace('17fa86d3, ,"hello"',
+ '17fa86d3,1998-09-04,"hello"')
+ new_csv = new_csv.replace('18c5017c, ,"yes"',
+ '18c5017c,1998-09-04,"yes"')
+ new_csv = new_csv.replace('59b2701c, ,"The answer was: %s"',
+ '59b2701c,1998-09-04,"The answer was: %s"')
+ new_csv = new_csv.replace('d18ada0f, ,"something"',
+ 'd18ada0f,1998-09-04,"something"')
self.assertNotEqual(CSV_ALL_DOMAINS, new_csv)
- self.assertEqual(new_csv, self._csv.read_text())
+ self.assertEqual(new_csv.splitlines(),
+ self._csv.read_text().splitlines())
def test_purge(self):
self._csv.write_text(CSV_ALL_DOMAINS)
# Mark everything not in TEST_DOMAIN as removed.
- run_cli('mark_removals', '--database', self._csv, f'{ELF}#TEST_DOMAIN')
+ run_cli('mark_removals', '--database', self._csv,
+ f'{self._elf}#TEST_DOMAIN')
# Delete all entries except those in TEST_DOMAIN.
run_cli('purge', '--database', self._csv)
- self.assertEqual(CSV_TEST_DOMAIN, self._csv.read_text())
+ self.assertEqual(self._csv_test_domain.splitlines(),
+ self._csv.read_text().splitlines())
@mock.patch('sys.stdout', new_callable=_mock_output)
def test_report(self, mock_stdout):
- run_cli('report', ELF)
- self.assertIn(REPORT_DEFAULT_DOMAIN, mock_stdout.buffer.getvalue())
- self.assertIn(REPORT_TEST_DOMAIN, mock_stdout.buffer.getvalue())
+ run_cli('report', self._elf)
+
+ self.assertEqual(json.loads(mock_stdout.buffer.getvalue()),
+ EXPECTED_REPORT)
def test_replace(self):
sub = 'replace/ment'
- run_cli('create', '--database', self._csv, ELF, '--replace',
+ run_cli('create', '--database', self._csv, self._elf, '--replace',
r'(?i)\b[jh]ello\b/' + sub)
self.assertEqual(
CSV_DEFAULT_DOMAIN.replace('Jello', sub).replace('Hello', sub),
self._csv.read_text())
+class LegacyDatabaseCommandLineTest(DatabaseCommandLineTest):
+ """Test an ELF with the legacy plain string storage format."""
+ def setUp(self):
+ super().setUp()
+ self._elf = LEGACY_PLAIN_STRING_ELF
+
+ # The legacy approach for storing tokenized strings in an ELF always
+ # adds an entry for "", even if the empty string was never tokenized.
+ self._csv_test_domain = '00000000, ,""\n' + CSV_TEST_DOMAIN
+
+ @mock.patch('sys.stdout', new_callable=_mock_output)
+ def test_report(self, mock_stdout):
+ run_cli('report', self._elf)
+
+ # Deep copy so that updating the expected counts below does not
+ # modify the shared EXPECTED_REPORT dict.
+ report = copy.deepcopy(EXPECTED_REPORT[str(TOKENIZED_ENTRIES_ELF)])
+
+ # Count the implicitly added "" entry in TEST_DOMAIN.
+ report['TEST_DOMAIN']['present_entries'] += 1
+ report['TEST_DOMAIN']['present_size_bytes'] += 1
+ report['TEST_DOMAIN']['total_entries'] += 1
+ report['TEST_DOMAIN']['total_size_bytes'] += 1
+
+ # Rename "" to the legacy name "default"
+ report['default'] = report['']
+ del report['']
+
+ self.assertEqual({str(LEGACY_PLAIN_STRING_ELF): report},
+ json.loads(mock_stdout.buffer.getvalue()))
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/pw_tokenizer/py/detokenize_test.py b/pw_tokenizer/py/detokenize_test.py
index a472385..7158102 100755
--- a/pw_tokenizer/py/detokenize_test.py
+++ b/pw_tokenizer/py/detokenize_test.py
@@ -18,6 +18,7 @@
import datetime as dt
import io
import os
+from pathlib import Path
import struct
import tempfile
import unittest
@@ -82,45 +83,16 @@
b'\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x00\x00\x00')
-# This is an ELF file with only .pw_tokenized and .pw_tokenizer_info sections.
-# It was created from the ELF file for tokenize_test.cc with the command:
+# This is an ELF file with only the pw_tokenizer sections. It was created
+# from a tokenize_test binary built for the STM32F429i Discovery board. The
+# pw_tokenizer sections were extracted with this command:
#
-# arm-none-eabi-objcopy -S --only-section ".pw_tokenize*" <ELF> <OUTPUT>
+# arm-none-eabi-objcopy -S --only-section ".pw_tokenizer*" <ELF> <OUTPUT>
#
-# The resulting ELF was converted to a Python binary string using
-# path_to_byte_string function above. The file is also included in the repo as
-# example_binary_with_tokenized_strings.elf.
-ELF_WITH_TOKENIZER_SECTIONS = (
- b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00(\x00\x01'
- b'\x00\x00\x00!G\x00\x084\x00\x00\x00\xd4\x02\x00\x00\x00\x04\x00\x054\x00'
- b' \x00\x04\x00(\x00\x04\x00\x03\x00\x01\x00\x00\x00\xb4\x00\x00\x00\x00'
- b'\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00'
- b'\x00\x00\x00\x01\x00\x01\x00\x00\x00\xb4\x00\x00\x00\x00\x02\x00\x08\x00'
- b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x00\x00\x01'
- b'\x00\x01\x00\x00\x00\xb4\x00\x00\x00\x00\x00\x00 \x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00'
- b'\xb4\x00\x00\x00\x18D\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x06\x00\x00\x00\x00\x00\x01\x00Hello %s! %hd %e\x00\x00\x00\x00%u'
- b'%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c\x00%u%d%02x%X%hu%hhd%d%ld%lu%lld'
- b'%llu%c%c%c\x00Won\'t fit : %s%d\x00\x00\x00\x00%llx\x00\x00\x00\x00%ld'
- b'\x00%d\x00\x00%ld\x00The answer is: %s\x00\x00\x00The answer is: %s\x00'
- b'\x00\x00The answer is: %s\x00\x00\x00The answer is: %s\x00\x00\x00The '
- b'answer is: %s\x00\x00\x00The answer is: %s\x00\x00\x00The answer is: %'
- b's\x00\x00\x00The answer is: %s\x00\x00\x00%u %d\x00\x00\x00The answer:'
- b' "%s"\x00\x00\x00\x00Jello, world!\x00\x00\x00Jello!\x00\x00Jello?\x00'
- b'\x00%s there are %x (%.2f) of them%c\x00\x00\x00\x00The answer is: %s\x00'
- b'\x00\x00\x00\x00\x00\x00[:-)\x00\x00\x00\x00>:-[]\x00\x00\x00%llu\x00\x00'
- b'\x00\x00The answer was: %s\x00\x00The answer is: %s\x00\x00.shstrtab\x00'
- b'.pw_tokenized.default\x00.pw_tokenized.TEST_DOMAIN\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x0b\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\xb4\x00\x00\x00\xb9\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04'
- b'\x00\x00\x00\x00\x00\x00\x00!\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00p\x02\x00\x00&\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x00\x00\x96\x02\x00\x00;\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00')
+ELF_WITH_TOKENIZER_SECTIONS = Path(__file__).parent.joinpath(
+ 'example_binary_with_tokenized_strings.elf').read_bytes()
+
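+# Number of entries in the default domain of the example ELF above.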
+TOKENS_IN_ELF = 22
# 0x2e668cd6 is 'Jello, world!' (which is also used in database_test.py).
JELLO_WORLD_TOKEN = b'\xd6\x8c\x66\x2e'
@@ -131,8 +103,9 @@
def test_simple(self):
detok = detokenize.Detokenizer(
tokens.Database([
- tokens.TokenizedStringEntry(0xcdab, '%02d %s %c%%',
- dt.datetime.now())
+ tokens.TokenizedStringEntry(0xcdab,
+ '%02d %s %c%%',
+ date_removed=dt.datetime.now())
]))
self.assertEqual(str(detok.detokenize(b'\xab\xcd\0\0\x02\x03Two\x66')),
'01 Two 3%')
@@ -140,7 +113,9 @@
def test_detokenize_extra_data_is_unsuccessful(self):
detok = detokenize.Detokenizer(
tokens.Database([
- tokens.TokenizedStringEntry(1, 'no args', dt.datetime(1, 1, 1))
+ tokens.TokenizedStringEntry(1,
+ 'no args',
+ date_removed=dt.datetime(1, 1, 1))
]))
result = detok.detokenize(b'\x01\0\0\0\x04args')
@@ -154,8 +129,11 @@
def test_detokenize_missing_data_is_unsuccessful(self):
detok = detokenize.Detokenizer(
- tokens.Database(
- [tokens.TokenizedStringEntry(2, '%s', dt.datetime(1, 1, 1))]))
+ tokens.Database([
+ tokens.TokenizedStringEntry(2,
+ '%s',
+ date_removed=dt.datetime(1, 1, 1))
+ ]))
result = detok.detokenize(b'\x02\0\0\0')
string, args, remaining = result.failures[0]
@@ -166,8 +144,11 @@
self.assertEqual('%s', str(result))
def test_detokenize_missing_data_with_errors_is_unsuccessful(self):
- detok = detokenize.Detokenizer(tokens.Database(
- [tokens.TokenizedStringEntry(2, '%s', dt.datetime(1, 1, 1))]),
+ detok = detokenize.Detokenizer(tokens.Database([
+ tokens.TokenizedStringEntry(2,
+ '%s',
+ date_removed=dt.datetime(1, 1, 1))
+ ]),
show_errors=True)
result = detok.detokenize(b'\x02\0\0\0')
@@ -181,8 +162,10 @@
def test_unparsed_data(self):
detok = detokenize.Detokenizer(
tokens.Database([
- tokens.TokenizedStringEntry(1, 'no args',
- dt.datetime(100, 1, 1)),
+ tokens.TokenizedStringEntry(1,
+ 'no args',
+ date_removed=dt.datetime(
+ 100, 1, 1)),
]))
result = detok.detokenize(b'\x01\0\0\0o_o')
self.assertFalse(result.ok())
@@ -289,7 +272,7 @@
expected_tokens = frozenset(detok.database.token_to_entries.keys())
csv_database = str(detok.database)
- self.assertEqual(len(csv_database.splitlines()), 17)
+ self.assertEqual(len(csv_database.splitlines()), TOKENS_IN_ELF)
csv_file = tempfile.NamedTemporaryFile('w', delete=False)
try:
@@ -327,10 +310,13 @@
# Database with several conflicting tokens.
self.detok = detokenize.Detokenizer(tokens.Database([
- tokens.TokenizedStringEntry(token, 'REMOVED', dt.datetime(9, 1, 1)),
+ tokens.TokenizedStringEntry(
+ token, 'REMOVED', date_removed=dt.datetime(9, 1, 1)),
tokens.TokenizedStringEntry(token, 'newer'),
- tokens.TokenizedStringEntry(token, 'A: %d', dt.datetime(30, 5, 9)),
- tokens.TokenizedStringEntry(token, 'B: %c', dt.datetime(30, 5, 10)),
+ tokens.TokenizedStringEntry(
+ token, 'A: %d', date_removed=dt.datetime(30, 5, 9)),
+ tokens.TokenizedStringEntry(
+ token, 'B: %c', date_removed=dt.datetime(30, 5, 10)),
tokens.TokenizedStringEntry(token, 'C: %s'),
tokens.TokenizedStringEntry(token, '%d%u'),
tokens.TokenizedStringEntry(token, '%s%u %d'),
@@ -400,7 +386,7 @@
db = database.load_token_database(
io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
- self.assertEqual(len(db), 17)
+ self.assertEqual(len(db), TOKENS_IN_ELF)
the_time = [100]
diff --git a/pw_tokenizer/py/elf_reader_test.py b/pw_tokenizer/py/elf_reader_test.py
index a65587f..2473182 100755
--- a/pw_tokenizer/py/elf_reader_test.py
+++ b/pw_tokenizer/py/elf_reader_test.py
@@ -125,9 +125,9 @@
self.assertEqual(section.size, size)
def test_dump_single_section(self):
- self.assertEqual(self._elf.dump_sections(r'\.test_section_1'),
+ self.assertEqual(self._elf.dump_section_contents(r'\.test_section_1'),
b'You cannot pass\0')
- self.assertEqual(self._elf.dump_sections(r'\.test_section_2'),
+ self.assertEqual(self._elf.dump_section_contents(r'\.test_section_2'),
b'\xef\xbe\xed\xfe')
def test_dump_multiple_sections(self):
@@ -137,7 +137,8 @@
else:
contents = b'\xef\xbe\xed\xfeYou cannot pass\0'
- self.assertIn(self._elf.dump_sections(r'.test_section_\d'), contents)
+ self.assertIn(self._elf.dump_section_contents(r'.test_section_\d'),
+ contents)
def test_read_values(self):
address = self._section('.test_section_1').address
@@ -247,9 +248,9 @@
def test_elf_reader_dump_single_section(self):
elf = elf_reader.Elf(self._archive)
- self.assertEqual(elf.dump_sections(r'\.test_section_1'),
+ self.assertEqual(elf.dump_section_contents(r'\.test_section_1'),
b'You cannot pass\0')
- self.assertEqual(elf.dump_sections(r'\.test_section_2'),
+ self.assertEqual(elf.dump_section_contents(r'\.test_section_2'),
b'\xef\xbe\xed\xfe')
def test_elf_reader_read_values(self):
diff --git a/pw_tokenizer/py/example_binary_with_tokenized_strings.elf b/pw_tokenizer/py/example_binary_with_tokenized_strings.elf
old mode 100644
new mode 100755
index 7313906..118c05a
--- a/pw_tokenizer/py/example_binary_with_tokenized_strings.elf
+++ b/pw_tokenizer/py/example_binary_with_tokenized_strings.elf
Binary files differ
diff --git a/pw_tokenizer/py/example_legacy_binary_with_tokenized_strings.elf b/pw_tokenizer/py/example_legacy_binary_with_tokenized_strings.elf
new file mode 100755
index 0000000..0fe2e60
--- /dev/null
+++ b/pw_tokenizer/py/example_legacy_binary_with_tokenized_strings.elf
Binary files differ
diff --git a/pw_tokenizer/py/pw_tokenizer/database.py b/pw_tokenizer/py/pw_tokenizer/database.py
index 3a41f34..5e23b6c 100755
--- a/pw_tokenizer/py/pw_tokenizer/database.py
+++ b/pw_tokenizer/py/pw_tokenizer/database.py
@@ -21,13 +21,15 @@
import argparse
from datetime import datetime
import glob
+import json
import logging
import os
from pathlib import Path
import re
import struct
import sys
-from typing import Callable, Dict, Iterable, List, Pattern, Set, Tuple
+from typing import (Callable, Dict, Iterable, Iterator, List, Pattern, Set,
+ TextIO, Tuple, Union)
try:
from pw_tokenizer import elf_reader, tokens
@@ -40,36 +42,117 @@
_LOG = logging.getLogger('pw_tokenizer')
-DEFAULT_DOMAIN = 'default'
-
def _elf_reader(elf) -> elf_reader.Elf:
return elf if isinstance(elf, elf_reader.Elf) else elf_reader.Elf(elf)
-def _read_strings_from_elf(elf, domain: str) -> Iterable[str]:
+# Magic number used to indicate the beginning of a tokenized string entry. This
+# value MUST match the value of _PW_TOKENIZER_ENTRY_MAGIC in
+# pw_tokenizer/public/pw_tokenizer/internal/tokenize_string.h.
+_TOKENIZED_ENTRY_MAGIC = 0xBAA98DEE
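+# Each entry header is four little-endian uint32s: the magic number, the
+# token, the domain length, and the string length. The null-terminated
+# domain and string follow; both lengths include the terminator.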
+_ENTRY = struct.Struct('<4I')
+_TOKENIZED_ENTRY_SECTIONS = re.compile(
+ r'^\.pw_tokenizer\.entries(?:\.[_\d]+)?$')
+
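+# Legacy binaries stored tokenized strings as plain null-terminated strings
+# in per-domain .pw_tokenized.<domain> sections.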
+_LEGACY_STRING_SECTIONS = re.compile(
+ r'^\.pw_tokenized\.(?P<domain>[^.]+)(?:\.\d+)?$')
+
+_ERROR_HANDLER = 'surrogateescape' # How to deal with UTF-8 decoding errors
+
+
+class Error(Exception):
+ """Failed to extract token entries from an ELF file."""
+
+
+def _read_tokenized_entries(
+ data: bytes,
+ domain: Pattern[str]) -> Iterator[tokens.TokenizedStringEntry]:
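+ """Parses TokenizedStringEntry objects from a tokenized entry section."""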
+ index = 0
+
+ while index + _ENTRY.size <= len(data):
+ magic, token, domain_len, string_len = _ENTRY.unpack_from(data, index)
+
+ if magic != _TOKENIZED_ENTRY_MAGIC:
+ raise Error(
+ f'Expected magic number 0x{_TOKENIZED_ENTRY_MAGIC:08x}, '
+ f'found 0x{magic:08x}')
+
+ start = index + _ENTRY.size
+ index = start + domain_len + string_len
+
+ # Create the entries, trimming null terminators.
+ entry = tokens.TokenizedStringEntry(
+ token,
+ data[start + domain_len:index - 1].decode(errors=_ERROR_HANDLER),
+ data[start:start + domain_len - 1].decode(errors=_ERROR_HANDLER),
+ )
+
+ if data[start + domain_len - 1] != 0:
+ raise Error(
+ f'Domain {entry.domain} for {entry.string} not null terminated'
+ )
+
+ if data[index - 1] != 0:
+ raise Error(f'String {entry.string} is not null terminated')
+
+ if domain.fullmatch(entry.domain):
+ yield entry
+
+
+def _read_tokenized_strings(sections: Dict[str, bytes],
+ domain: Pattern[str]) -> Iterator[tokens.Database]:
+ # Legacy ELF files used "default" as the default domain instead of "". Remap
+ # the default if necessary.
+ if domain.pattern == tokens.DEFAULT_DOMAIN:
+ domain = re.compile('default')
+
+ for section, data in sections.items():
+ match = _LEGACY_STRING_SECTIONS.match(section)
+ if match and domain.match(match.group('domain')):
+ yield tokens.Database.from_strings(
+ (s.decode(errors=_ERROR_HANDLER) for s in data.split(b'\0')),
+ match.group('domain'))
+
+
+def _database_from_elf(elf, domain: Pattern[str]) -> tokens.Database:
"""Reads the tokenized strings from an elf_reader.Elf or ELF file object."""
_LOG.debug('Reading tokenized strings in domain "%s" from %s', domain, elf)
- sections = _elf_reader(elf).dump_sections(
- rf'^\.pw_tokenized\.{domain}(?:\.\d+)?$')
- if sections is not None:
- for string in sections.split(b'\0'):
- yield string.decode()
+ reader = _elf_reader(elf)
+
+ # Read tokenized string entries.
+ section_data = reader.dump_section_contents(_TOKENIZED_ENTRY_SECTIONS)
+ if section_data is not None:
+ return tokens.Database(_read_tokenized_entries(section_data, domain))
+
+ # Read legacy null-terminated string entries.
+ sections = reader.dump_sections(_LEGACY_STRING_SECTIONS)
+ if sections:
+ return tokens.Database.merged(
+ *_read_tokenized_strings(sections, domain))
+
+ return tokens.Database([])
-def tokenization_domains(elf) -> Iterable[str]:
+def tokenization_domains(elf) -> Iterator[str]:
"""Lists all tokenization domains in an ELF file."""
- tokenized_section = re.compile(r'\.pw_tokenized\.(?P<domain>.+)(?:\.\d+)?')
- for section in _elf_reader(elf).sections:
- match = tokenized_section.match(section.name)
- if match:
- yield match.group('domain')
+ reader = _elf_reader(elf)
+ section_data = reader.dump_section_contents(_TOKENIZED_ENTRY_SECTIONS)
+ if section_data is not None:
+ yield from frozenset(
+ e.domain
+ for e in _read_tokenized_entries(section_data, re.compile('.*')))
+ else: # Check for the legacy domain sections
+ for section in reader.sections:
+ match = _LEGACY_STRING_SECTIONS.match(section.name)
+ if match:
+ yield match.group('domain')
def read_tokenizer_metadata(elf) -> Dict[str, int]:
"""Reads the metadata entries from an ELF."""
- sections = _elf_reader(elf).dump_sections(r'\.pw_tokenizer_info')
+ sections = _elf_reader(elf).dump_section_contents(r'\.pw_tokenizer\.info')
metadata: Dict[str, int] = {}
if sections is not None:
@@ -83,7 +166,7 @@
return metadata
-def _load_token_database(db, domain: str) -> tokens.Database:
+def _load_token_database(db, domain: Pattern[str]) -> tokens.Database:
"""Loads a Database from a database object, ELF, CSV, or binary database."""
if db is None:
return tokens.Database()
@@ -92,7 +175,7 @@
return db
if isinstance(db, elf_reader.Elf):
- return tokens.Database.from_strings(_read_strings_from_elf(db, domain))
+ return _database_from_elf(db, domain)
# If it's a str, it might be a path. Check if it's an ELF or CSV.
if isinstance(db, (str, Path)):
@@ -103,15 +186,14 @@
# Read the path as an ELF file.
with open(db, 'rb') as fd:
if elf_reader.compatible_file(fd):
- return tokens.Database.from_strings(
- _read_strings_from_elf(fd, domain))
+ return _database_from_elf(fd, domain)
# Read the path as a packed binary or CSV file.
return tokens.DatabaseFile(db)
# Assume that it's a file object and check if it's an ELF.
if elf_reader.compatible_file(db):
- return tokens.Database.from_strings(_read_strings_from_elf(db, domain))
+ return _database_from_elf(db, domain)
# Read the database as CSV or packed binary from a file object's path.
if hasattr(db, 'name') and os.path.exists(db.name):
@@ -121,14 +203,17 @@
return tokens.Database(tokens.parse_csv(db))
-def load_token_database(*databases,
- domain: str = DEFAULT_DOMAIN) -> tokens.Database:
+def load_token_database(
+ *databases,
+ domain: Union[str,
+ Pattern[str]] = tokens.DEFAULT_DOMAIN) -> tokens.Database:
"""Loads a Database from database objects, ELFs, CSVs, or binary files."""
+ domain = re.compile(domain)
return tokens.Database.merged(*(_load_token_database(db, domain)
for db in databases))
-def generate_report(db: tokens.Database) -> Dict[str, int]:
+def database_summary(db: tokens.Database) -> Dict[str, int]:
"""Returns a simple report of properties of the database."""
present = [entry for entry in db.entries() if not entry.date_removed]
@@ -143,6 +228,31 @@
}
+_DatabaseReport = Dict[str, Dict[str, Dict[str, int]]]
+
+
+def generate_reports(paths: Iterable[Path]) -> _DatabaseReport:
+ """Returns a dictionary with information about the provided databases."""
+ reports: _DatabaseReport = {}
+
+ for path in paths:
+ with path.open('rb') as file:
+ if elf_reader.compatible_file(file):
+ domains = list(tokenization_domains(file))
+ else:
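+ # Non-ELF databases do not record domains; report under the default.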
+ domains = ['']
+
+ domain_reports = {}
+
+ for domain in domains:
+ domain_reports[domain] = database_summary(
+ load_token_database(path, domain=domain))
+
+ reports[str(path)] = domain_reports
+
+ return reports
+
+
def _handle_create(databases, database, force, output_type, include, exclude,
replace):
"""Creates a token database file from one or more ELF files."""
@@ -202,27 +312,9 @@
_LOG.info('Removed %d entries from %s', len(purged), token_database.path)
-def _handle_report(token_database_or_elf, output):
- for path in token_database_or_elf:
- with path.open('rb') as file:
- if elf_reader.compatible_file(file):
- domains = list(tokenization_domains(file))
- else:
- domains = [path.name]
-
- for domain in domains:
- output.write(
- '[{name}]\n'
- ' Domain: {domain}\n'
- ' Entries present: {present_entries}\n'
- ' Size of strings: {present_size_bytes} B\n'
- ' Total entries: {total_entries}\n'
- ' Total size of strings: {total_size_bytes} B\n'
- ' Collisions: {collisions} tokens\n'.format(
- name=path,
- domain=domain,
- **generate_report(load_token_database(path,
- domain=domain))))
+def _handle_report(token_database_or_elf: List[Path], output: TextIO) -> None:
+ json.dump(generate_reports(token_database_or_elf), output, indent=2)
+ output.write('\n')
def expand_paths_or_globs(*paths_or_globs: str) -> Iterable[Path]:
@@ -250,15 +342,15 @@
setattr(namespace, self.dest, list(expand_paths_or_globs(*values)))
-def _read_elf_with_domain(elf: str, domain: str) -> Iterable[tokens.Database]:
+def _read_elf_with_domain(elf: str,
+ domain: Pattern[str]) -> Iterable[tokens.Database]:
for path in expand_paths_or_globs(elf):
with path.open('rb') as file:
if not elf_reader.compatible_file(file):
raise ValueError(f'{elf} is not an ELF file, '
f'but the "{domain}" domain was specified')
- yield tokens.Database.from_strings(
- _read_strings_from_elf(file, domain))
+ yield _database_from_elf(file, domain)
class LoadTokenDatabases(argparse.Action):
@@ -274,7 +366,9 @@
try:
for value in values:
if value.count('#') == 1:
- databases.extend(_read_elf_with_domain(*value.split('#')))
+ path, domain = value.split('#')
+ domain = re.compile(domain)
+ databases.extend(_read_elf_with_domain(path, domain))
else:
paths.update(expand_paths_or_globs(value))
@@ -305,8 +399,8 @@
'tokens. For ELF files, the tokenization domain to read from '
'may be specified after the path as #domain_name (e.g. '
'foo.elf#TEST_DOMAIN). Unless specified, only the default '
- 'domain is read from ELF files; .* reads all domains. Globs are '
- 'expanded to compatible database files.'))
+ 'domain ("") is read from ELF files; .* reads all domains. '
+ 'Globs are expanded to compatible database files.'))
return parser
diff --git a/pw_tokenizer/py/pw_tokenizer/elf_reader.py b/pw_tokenizer/py/pw_tokenizer/elf_reader.py
index 2a3ac3b..a917c5b 100755
--- a/pw_tokenizer/py/pw_tokenizer/elf_reader.py
+++ b/pw_tokenizer/py/pw_tokenizer/elf_reader.py
@@ -304,17 +304,23 @@
return self._elf.read(size)
- def dump_sections(self, name: Union[str, Pattern[str]]) -> Optional[bytes]:
+ def dump_sections(self, name: Union[str,
+ Pattern[str]]) -> Dict[str, bytes]:
"""Dumps a binary string containing the sections matching the regex."""
name_regex = re.compile(name)
- sections = []
+ sections: Dict[str, bytes] = {}
for section in self.sections:
if name_regex.match(section.name):
self._elf.seek(section.file_offset + section.offset)
- sections.append(self._elf.read(section.size))
+ sections[section.name] = self._elf.read(section.size)
- return b''.join(sections) if sections else None
+ return sections
+
+ def dump_section_contents(
+ self, name: Union[str, Pattern[str]]) -> Optional[bytes]:
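+ """Returns the concatenated contents of matching sections, if any."""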
+ sections = self.dump_sections(name)
+ return b''.join(sections.values()) if sections else None
def summary(self) -> str:
return '\n'.join(
@@ -342,7 +348,7 @@
return
for section_pattern in sections:
- output(elf.dump_sections(section_pattern))
+ output(elf.dump_section_contents(section_pattern))
def _parse_args() -> argparse.Namespace:
diff --git a/pw_tokenizer/py/pw_tokenizer/tokens.py b/pw_tokenizer/py/pw_tokenizer/tokens.py
index 15d5c13..a0c9b66 100644
--- a/pw_tokenizer/py/pw_tokenizer/tokens.py
+++ b/pw_tokenizer/py/pw_tokenizer/tokens.py
@@ -15,6 +15,7 @@
import collections
import csv
+from dataclasses import dataclass
from datetime import datetime
import io
import logging
@@ -25,10 +26,16 @@
-from typing import Optional, Pattern, Tuple, Union, ValuesView
+from typing import NamedTuple, Optional, Pattern, Tuple, Union, ValuesView
DATE_FORMAT = '%Y-%m-%d'
+DEFAULT_DOMAIN = ''
-# The default hash length to use. This MUST match the default value of
-# PW_TOKENIZER_CFG_HASH_LENGTH in pw_tokenizer/public/pw_tokenizer/config.h.
-DEFAULT_HASH_LENGTH = 128
+# The default hash length to use. This value only applies when hashing strings
+# from a legacy-style ELF with plain strings. New tokenized string entries
+# include the token alongside the string.
+#
+# This MUST match the default value of PW_TOKENIZER_CFG_C_HASH_LENGTH in
+# pw_tokenizer/public/pw_tokenizer/config.h.
+DEFAULT_C_HASH_LENGTH = 128
+
TOKENIZER_HASH_CONSTANT = 65599
_LOG = logging.getLogger('pw_tokenizer')
@@ -40,7 +47,11 @@
def pw_tokenizer_65599_fixed_length_hash(string: Union[str, bytes],
hash_length: int) -> int:
- """Hashes the provided string."""
+ """Hashes the provided string.
+
+ This hash function is only used when adding tokens from legacy-style
+ tokenized strings in an ELF, which do not include the token.
+ """
hash_value = len(string)
coefficient = TOKENIZER_HASH_CONSTANT
@@ -52,25 +63,26 @@
def default_hash(string: Union[str, bytes]) -> int:
- return pw_tokenizer_65599_fixed_length_hash(string, DEFAULT_HASH_LENGTH)
+ return pw_tokenizer_65599_fixed_length_hash(string, DEFAULT_C_HASH_LENGTH)
-_EntryKey = Tuple[int, str] # Key for uniquely referring to an entry
+class _EntryKey(NamedTuple):
+ """Uniquely refers to an entry."""
+ token: int
+ string: str
+@dataclass(eq=True, order=False)
class TokenizedStringEntry:
"""A tokenized string with its metadata."""
- def __init__(self,
- token: int,
- string: str,
- date_removed: Optional[datetime] = None):
- self.token = token
- self.string = string
- self.date_removed = date_removed
+ token: int
+ string: str
+ domain: str = DEFAULT_DOMAIN
+ date_removed: Optional[datetime] = None
def key(self) -> _EntryKey:
"""The key determines uniqueness for a tokenized string."""
- return self.token, self.string
+ return _EntryKey(self.token, self.string)
def update_date_removed(self,
new_date_removed: Optional[datetime]) -> None:
@@ -98,22 +110,16 @@
def __str__(self) -> str:
return self.string
- def __repr__(self) -> str:
- return '{}({!r})'.format(type(self).__name__, self.string)
-
class Database:
"""Database of tokenized strings stored as TokenizedStringEntry objects."""
- def __init__(self,
- entries: Iterable[TokenizedStringEntry] = (),
- tokenize: Callable[[str], int] = default_hash):
+ def __init__(self, entries: Iterable[TokenizedStringEntry] = ()):
"""Creates a token database."""
# The database dict stores each unique (token, string) entry.
self._database: Dict[_EntryKey, TokenizedStringEntry] = {
entry.key(): entry
for entry in entries
}
- self.tokenize = tokenize
# This is a cache for fast token lookup that is built as needed.
self._cache: Optional[Dict[int, List[TokenizedStringEntry]]] = None
@@ -122,10 +128,11 @@
def from_strings(
cls,
strings: Iterable[str],
+ domain: str = DEFAULT_DOMAIN,
tokenize: Callable[[str], int] = default_hash) -> 'Database':
"""Creates a Database from an iterable of strings."""
- return cls((TokenizedStringEntry(tokenize(string), string)
- for string in strings), tokenize)
+ return cls((TokenizedStringEntry(tokenize(string), string, domain)
+ for string in strings))
@classmethod
def merged(cls, *databases: 'Database') -> 'Database':
@@ -164,7 +171,7 @@
The strings are assumed to represent the complete set of strings for the
database. Strings currently in the database not present in the provided
strings are marked with a removal date but remain in the database.
Strings in all_strings missing from the database are NOT added; call the
add function to add these strings.
Args:
@@ -194,20 +201,29 @@
return removed
- def add(self, strings: Iterable[str]) -> None:
- """Adds new strings to the database."""
+ def add(self,
+ entries: Iterable[Union[str, TokenizedStringEntry]],
+ tokenize: Callable[[str], int] = default_hash) -> None:
+ """Adds new entries or strings to the database."""
self._cache = None
# Add new and update previously removed entries.
- for string in strings:
- key = self.tokenize(string), string
+ for new_entry in entries:
+ # Handle legacy plain string entries, which need to be hashed.
+ if isinstance(new_entry, str):
+ key = _EntryKey(tokenize(new_entry), new_entry)
+ domain = DEFAULT_DOMAIN
+ else:
+ key = _EntryKey(new_entry.token, new_entry.string)
+ domain = new_entry.domain
try:
entry = self._database[key]
if entry.date_removed:
entry.date_removed = None
except KeyError:
- self._database[key] = TokenizedStringEntry(key[0], string)
+ self._database[key] = TokenizedStringEntry(
+ key.token, key.string, domain)
def purge(
self,
@@ -250,11 +266,11 @@
) -> None:
"""Filters the database using regular expressions (strings or compiled).
- Args:
- include: iterable of regexes; only entries matching at least one are kept
- exclude: iterable of regexes; entries matching any of these are removed
- replace: iterable of (regex, str); replaces matching terms in all entries
- """
+ Args:
+ include: regexes; only entries matching at least one are kept
+ exclude: regexes; entries matching any of these are removed
+ replace: (regex, str) tuples; replaces matching terms in all entries
+ """
self._cache = None
to_delete: List[_EntryKey] = []
@@ -300,7 +316,8 @@
date = (datetime.strptime(date_str, DATE_FORMAT)
if date_str.strip() else None)
- yield TokenizedStringEntry(token, string_literal, date)
+ yield TokenizedStringEntry(token, string_literal, DEFAULT_DOMAIN,
+ date)
except (ValueError, UnicodeDecodeError) as err:
_LOG.error('Failed to parse tokenized string entry %s: %s', line,
err)
@@ -373,7 +390,7 @@
offset = 0
for token, removed in entries:
string, offset = read_string(offset)
- yield TokenizedStringEntry(token, string, removed)
+ yield TokenizedStringEntry(token, string, DEFAULT_DOMAIN, removed)
def write_binary(database: Database, fd: BinaryIO) -> None:
@@ -410,9 +427,9 @@
class DatabaseFile(Database):
"""A token database that is associated with a particular file.
- This class adds the write_to_file() method that writes to file from which it
- was created in the correct format (CSV or binary).
- """
+ This class adds the write_to_file() method that writes to the file from
+ which it was created in the correct format (CSV or binary).
+ """
def __init__(self, path: Union[Path, str]):
self.path = Path(path)
diff --git a/pw_tokenizer/py/tokens_test.py b/pw_tokenizer/py/tokens_test.py
index c80a9f7..0154ca7 100755
--- a/pw_tokenizer/py/tokens_test.py
+++ b/pw_tokenizer/py/tokens_test.py
@@ -20,7 +20,7 @@
import unittest
from pw_tokenizer import tokens
-from pw_tokenizer.tokens import _LOG
+from pw_tokenizer.tokens import default_hash, _LOG
CSV_DATABASE = '''\
00000000,2019-06-10,""
@@ -192,8 +192,10 @@
# Test basic merging into an empty database.
db.merge(
tokens.Database([
- tokens.TokenizedStringEntry(1, 'one', datetime.datetime.min),
- tokens.TokenizedStringEntry(2, 'two', datetime.datetime.min),
+ tokens.TokenizedStringEntry(
+ 1, 'one', date_removed=datetime.datetime.min),
+ tokens.TokenizedStringEntry(
+ 2, 'two', date_removed=datetime.datetime.min),
]))
self.assertEqual({str(e) for e in db.entries()}, {'one', 'two'})
self.assertEqual(db.token_to_entries[1][0].date_removed,
@@ -205,7 +207,8 @@
db.merge(
tokens.Database([
tokens.TokenizedStringEntry(3, 'three'),
- tokens.TokenizedStringEntry(4, 'four', datetime.datetime.min),
+ tokens.TokenizedStringEntry(
+ 4, 'four', date_removed=datetime.datetime.min),
]))
self.assertEqual({str(e)
for e in db.entries()},
@@ -228,8 +231,10 @@
# Merge in repeated entries different removal dates.
db.merge(
tokens.Database([
- tokens.TokenizedStringEntry(4, 'four', datetime.datetime.max),
- tokens.TokenizedStringEntry(5, 'five', datetime.datetime.max),
+ tokens.TokenizedStringEntry(
+ 4, 'four', date_removed=datetime.datetime.max),
+ tokens.TokenizedStringEntry(
+ 5, 'five', date_removed=datetime.datetime.max),
]))
self.assertEqual(len(db.entries()), 5)
self.assertEqual({str(e)
@@ -258,28 +263,41 @@
for e in db.entries()},
{'one', 'two', 'three', 'four', 'five'})
- def test_merge_multiple(self):
+ def test_merge_multiple_databases_in_one_call(self):
+ """Tests the merge and merged methods with multiple databases."""
db = tokens.Database.merged(
- tokens.Database(
- [tokens.TokenizedStringEntry(1, 'one',
- datetime.datetime.max)]),
- tokens.Database(
- [tokens.TokenizedStringEntry(2, 'two',
- datetime.datetime.min)]),
- tokens.Database(
- [tokens.TokenizedStringEntry(1, 'one',
- datetime.datetime.min)]))
+ tokens.Database([
+ tokens.TokenizedStringEntry(1,
+ 'one',
+ date_removed=datetime.datetime.max)
+ ]),
+ tokens.Database([
+ tokens.TokenizedStringEntry(2,
+ 'two',
+ date_removed=datetime.datetime.min)
+ ]),
+ tokens.Database([
+ tokens.TokenizedStringEntry(1,
+ 'one',
+ date_removed=datetime.datetime.min)
+ ]))
self.assertEqual({str(e) for e in db.entries()}, {'one', 'two'})
db.merge(
tokens.Database([
- tokens.TokenizedStringEntry(4, 'four', datetime.datetime.max)
+ tokens.TokenizedStringEntry(4,
+ 'four',
+ date_removed=datetime.datetime.max)
]),
- tokens.Database(
- [tokens.TokenizedStringEntry(2, 'two',
- datetime.datetime.max)]),
tokens.Database([
- tokens.TokenizedStringEntry(3, 'three', datetime.datetime.min)
+ tokens.TokenizedStringEntry(2,
+ 'two',
+ date_removed=datetime.datetime.max)
+ ]),
+ tokens.Database([
+ tokens.TokenizedStringEntry(3,
+ 'three',
+ date_removed=datetime.datetime.min)
]))
self.assertEqual({str(e)
for e in db.entries()},
@@ -299,6 +317,7 @@
self.assertEqual(len(db.token_to_entries), 17)
def test_mark_removals(self):
+ """Tests that date_removed field is set by mark_removals."""
db = tokens.Database.from_strings(
['MILK', 'apples', 'oranges', 'CHEESE', 'pears'])
@@ -309,26 +328,28 @@
db.mark_removals(['apples', 'oranges', 'pears'], date_1)
self.assertEqual(
- db.token_to_entries[db.tokenize('MILK')][0].date_removed, date_1)
+ db.token_to_entries[default_hash('MILK')][0].date_removed, date_1)
self.assertEqual(
- db.token_to_entries[db.tokenize('CHEESE')][0].date_removed, date_1)
+ db.token_to_entries[default_hash('CHEESE')][0].date_removed,
+ date_1)
now = datetime.datetime.now()
db.mark_removals(['MILK', 'CHEESE', 'pears'])
# New strings are not added or re-added in mark_removed().
self.assertGreaterEqual(
- db.token_to_entries[db.tokenize('MILK')][0].date_removed, date_1)
+ db.token_to_entries[default_hash('MILK')][0].date_removed, date_1)
self.assertGreaterEqual(
- db.token_to_entries[db.tokenize('CHEESE')][0].date_removed, date_1)
+ db.token_to_entries[default_hash('CHEESE')][0].date_removed,
+ date_1)
# These strings were removed.
self.assertGreaterEqual(
- db.token_to_entries[db.tokenize('apples')][0].date_removed, now)
+ db.token_to_entries[default_hash('apples')][0].date_removed, now)
self.assertGreaterEqual(
- db.token_to_entries[db.tokenize('oranges')][0].date_removed, now)
+ db.token_to_entries[default_hash('oranges')][0].date_removed, now)
self.assertIsNone(
- db.token_to_entries[db.tokenize('pears')][0].date_removed)
+ db.token_to_entries[default_hash('pears')][0].date_removed)
def test_add(self):
db = tokens.Database()
@@ -367,8 +388,6 @@
class TestFilter(unittest.TestCase):
"""Tests the filtering functionality."""
def setUp(self):
- super().setUp()
-
self.db = tokens.Database([
tokens.TokenizedStringEntry(1, 'Luke'),
tokens.TokenizedStringEntry(2, 'Leia'),