blob: 983bd49a117763b699e45c32c51a0544c73ed0e8 [file] [log] [blame]
R David Murrayc27e5222012-05-25 15:01:48 -04001import io
R David Murray3edd22a2011-04-18 13:59:37 -04002import types
R David Murrayc27e5222012-05-25 15:01:48 -04003import textwrap
R David Murray3edd22a2011-04-18 13:59:37 -04004import unittest
5import email.policy
R David Murrayc27e5222012-05-25 15:01:48 -04006import email.parser
7import email.generator
R David Murrayea976682012-05-27 15:03:38 -04008from email import headerregistry
R David Murray0b6f6c82012-05-25 18:42:14 -04009
def make_defaults(base_defaults, differences):
    """Return a fresh dict holding *base_defaults* overlaid with *differences*.

    Neither argument is modified; keys present in *differences* replace the
    corresponding entries copied from *base_defaults*.
    """
    merged = base_defaults.copy()
    merged.update(differences)
    return merged
R David Murray3edd22a2011-04-18 13:59:37 -040014
class PolicyAPITests(unittest.TestCase):
    """Tests for the API of the policy objects exposed by email.policy."""

    longMessage = True

    # Base default values.
    compat32_defaults = {
        'max_line_length': 78,
        'linesep': '\n',
        'cte_type': '8bit',
        'raise_on_defect': False,
    }
    # These default values are the ones set on email.policy.default.
    # If any of these defaults change, the docs must be updated.
    policy_defaults = compat32_defaults.copy()
    policy_defaults.update({
        'raise_on_defect': False,
        'header_factory': email.policy.EmailPolicy.header_factory,
        'refold_source': 'long',
    })

    # For each policy under test, we give here what we expect the defaults to
    # be for that policy.  The second argument to make_defaults is the
    # difference between the base defaults and that for the particular policy.
    new_policy = email.policy.EmailPolicy()
    policies = {
        email.policy.compat32: make_defaults(compat32_defaults, {}),
        email.policy.default: make_defaults(policy_defaults, {}),
        email.policy.SMTP: make_defaults(policy_defaults,
                                         {'linesep': '\r\n'}),
        email.policy.HTTP: make_defaults(policy_defaults,
                                         {'linesep': '\r\n',
                                          'max_line_length': None}),
        email.policy.strict: make_defaults(policy_defaults,
                                           {'raise_on_defect': True}),
        new_policy: make_defaults(policy_defaults, {}),
    }
    # Creating a new policy creates a new header factory.  There is a test
    # later that proves this.
    policies[new_policy]['header_factory'] = new_policy.header_factory

    def test_defaults(self):
        """Every policy under test exposes exactly the expected defaults."""
        for policy, expected in self.policies.items():
            for attr, value in expected.items():
                self.assertEqual(getattr(policy, attr), value,
                                 ("change {} docs/docstrings if defaults have "
                                  "changed").format(policy))

    def test_all_attributes_covered(self):
        """Every public, non-method attribute has an expected default."""
        for policy, expected in self.policies.items():
            for attr in dir(policy):
                # Private names and plain methods are not policy settings.
                if (attr.startswith('_') or
                        isinstance(getattr(email.policy.EmailPolicy, attr),
                                   types.FunctionType)):
                    continue
                self.assertIn(attr, expected,
                              "{} is not fully tested".format(attr))

    def test_abc(self):
        """Instantiating the abstract Policy names every abstract method."""
        with self.assertRaises(TypeError) as cm:
            email.policy.Policy()
        msg = str(cm.exception)
        abstract_methods = ('fold',
                            'fold_binary',
                            'header_fetch_parse',
                            'header_source_parse',
                            'header_store_parse')
        for method in abstract_methods:
            self.assertIn(method, msg)

    def test_policy_is_immutable(self):
        """Existing attributes are read-only and new ones cannot be added."""
        for policy, defaults in self.policies.items():
            for attr in defaults:
                with self.assertRaisesRegex(AttributeError,
                                            attr + ".*read-only"):
                    setattr(policy, attr, None)
            with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'):
                policy.foo = None

    def test_set_policy_attrs_when_cloned(self):
        # None of the attributes has a default value of None, so we set them
        # all to None in the clone call and check that it worked.
        for policy, defaults in self.policies.items():
            testattrdict = {attr: None for attr in defaults}
            cloned = policy.clone(**testattrdict)
            for attr in defaults:
                self.assertIsNone(getattr(cloned, attr))

    def test_reject_non_policy_keyword_when_called(self):
        # self.policies is keyed by policy *instances*.  Calling an instance
        # would raise TypeError merely because it is not callable, whatever
        # keyword is passed, so call the instance's class instead: then the
        # TypeError has to come from the constructor rejecting the keyword.
        for policy in self.policies:
            policyclass = type(policy)
            with self.assertRaises(TypeError):
                policyclass(this_keyword_should_not_be_valid=None)
            # A near-miss spelling must be rejected too.
            with self.assertRaises(TypeError):
                policyclass(newtline=None)

    def test_policy_addition(self):
        """When two policies are added, the right-hand operand wins."""
        expected = self.policy_defaults.copy()
        p1 = email.policy.default.clone(max_line_length=100)
        p2 = email.policy.default.clone(max_line_length=50)
        added = p1 + p2
        expected.update(max_line_length=50)
        for attr, value in expected.items():
            self.assertEqual(getattr(added, attr), value)
        added = p2 + p1
        expected.update(max_line_length=100)
        for attr, value in expected.items():
            self.assertEqual(getattr(added, attr), value)
        # Adding the unmodified default must not reset the explicit override.
        added = added + email.policy.default
        for attr, value in expected.items():
            self.assertEqual(getattr(added, attr), value)

    def test_register_defect(self):
        """register_defect appends each defect to the object's defects list."""
        class Dummy:
            def __init__(self):
                self.defects = []
        obj = Dummy()
        defect = object()
        policy = email.policy.EmailPolicy()
        policy.register_defect(obj, defect)
        self.assertEqual(obj.defects, [defect])
        defect2 = object()
        policy.register_defect(obj, defect2)
        self.assertEqual(obj.defects, [defect, defect2])

    class MyObj:
        """Minimal stand-in for an object that can collect defects."""
        def __init__(self):
            self.defects = []

    class MyDefect(Exception):
        """Defect type used to check what handle_defect raises or records."""
        pass

    def test_handle_defect_raises_on_strict(self):
        foo = self.MyObj()
        defect = self.MyDefect("the telly is broken")
        with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
            email.policy.strict.handle_defect(foo, defect)

    def test_handle_defect_registers_defect(self):
        foo = self.MyObj()
        defect1 = self.MyDefect("one")
        email.policy.default.handle_defect(foo, defect1)
        self.assertEqual(foo.defects, [defect1])
        defect2 = self.MyDefect("two")
        email.policy.default.handle_defect(foo, defect2)
        self.assertEqual(foo.defects, [defect1, defect2])

    class MyPolicy(email.policy.EmailPolicy):
        """Policy that records defects on itself instead of on the object."""
        defects = None
        def __init__(self, *args, **kw):
            super().__init__(*args, defects=[], **kw)
        def register_defect(self, obj, defect):
            self.defects.append(defect)

    def test_overridden_register_defect_still_raises(self):
        foo = self.MyObj()
        defect = self.MyDefect("the telly is broken")
        with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
            self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect)

    def test_overridden_register_defect_works(self):
        foo = self.MyObj()
        defect1 = self.MyDefect("one")
        my_policy = self.MyPolicy()
        my_policy.handle_defect(foo, defect1)
        # The override stores defects on the policy, not on the object.
        self.assertEqual(my_policy.defects, [defect1])
        self.assertEqual(foo.defects, [])
        defect2 = self.MyDefect("two")
        my_policy.handle_defect(foo, defect2)
        self.assertEqual(my_policy.defects, [defect1, defect2])
        self.assertEqual(foo.defects, [])

    def test_default_header_factory(self):
        h = email.policy.default.header_factory('Test', 'test')
        self.assertEqual(h.name, 'Test')
        self.assertIsInstance(h, headerregistry.UnstructuredHeader)
        self.assertIsInstance(h, headerregistry.BaseHeader)

    class Foo:
        """Header type that can be mapped to a name in a header factory."""
        parse = headerregistry.UnstructuredHeader.parse

    def test_each_Policy_gets_unique_factory(self):
        policy1 = email.policy.EmailPolicy()
        policy2 = email.policy.EmailPolicy()
        policy1.header_factory.map_to_type('foo', self.Foo)
        h = policy1.header_factory('foo', 'test')
        self.assertIsInstance(h, self.Foo)
        self.assertNotIsInstance(h, headerregistry.UnstructuredHeader)
        # The mapping registered on policy1 must not leak into policy2.
        h = policy2.header_factory('foo', 'test')
        self.assertNotIsInstance(h, self.Foo)
        self.assertIsInstance(h, headerregistry.UnstructuredHeader)

    def test_clone_copies_factory(self):
        policy1 = email.policy.EmailPolicy()
        policy2 = policy1.clone()
        # Registered after cloning, yet visible through the clone as well.
        policy1.header_factory.map_to_type('foo', self.Foo)
        h = policy1.header_factory('foo', 'test')
        self.assertIsInstance(h, self.Foo)
        h = policy2.header_factory('foo', 'test')
        self.assertIsInstance(h, self.Foo)

    def test_new_factory_overrides_default(self):
        mypolicy = email.policy.EmailPolicy()
        myfactory = mypolicy.header_factory
        newpolicy = mypolicy + email.policy.strict
        self.assertEqual(newpolicy.header_factory, myfactory)
        newpolicy = email.policy.strict + mypolicy
        self.assertEqual(newpolicy.header_factory, myfactory)

    def test_adding_default_policies_preserves_default_factory(self):
        newpolicy = email.policy.default + email.policy.strict
        self.assertEqual(newpolicy.header_factory,
                         email.policy.EmailPolicy.header_factory)
        self.assertEqual(newpolicy.__dict__, {'raise_on_defect': True})

    # XXX: Need subclassing tests.
    # For adding subclassed objects, make sure the usual rules apply (subclass
    # wins), but that the order still works (right overrides left).
231
R David Murrayc27e5222012-05-25 15:01:48 -0400232
class TestPolicyPropagation(unittest.TestCase):
    """Check that a policy handed to the parsing API reaches every layer."""

    # The abstract methods are used by the parser but not by the wrapper
    # functions that call it, so if the exception gets raised we know that the
    # policy was actually propagated all the way to feedparser.
    class MyPolicy(email.policy.Policy):
        def badmethod(self, *args, **kw):
            raise Exception("test")
        # Override all five abstract methods of Policy with the raising stub.
        fold = fold_binary = header_fetch_parse = badmethod
        header_source_parse = header_store_parse = badmethod

    def test_message_from_string(self):
        with self.assertRaisesRegex(Exception, "^test$"):
            email.message_from_string("Subject: test\n\n",
                                      policy=self.MyPolicy)

    def test_message_from_bytes(self):
        with self.assertRaisesRegex(Exception, "^test$"):
            email.message_from_bytes(b"Subject: test\n\n",
                                     policy=self.MyPolicy)

    def test_message_from_file(self):
        f = io.StringIO('Subject: test\n\n')
        with self.assertRaisesRegex(Exception, "^test$"):
            email.message_from_file(f, policy=self.MyPolicy)

    def test_message_from_binary_file(self):
        f = io.BytesIO(b'Subject: test\n\n')
        with self.assertRaisesRegex(Exception, "^test$"):
            email.message_from_binary_file(f, policy=self.MyPolicy)

    # These are redundant, but we need them for black-box completeness.

    def test_parser(self):
        p = email.parser.Parser(policy=self.MyPolicy)
        with self.assertRaisesRegex(Exception, "^test$"):
            p.parsestr('Subject: test\n\n')

    def test_bytes_parser(self):
        p = email.parser.BytesParser(policy=self.MyPolicy)
        with self.assertRaisesRegex(Exception, "^test$"):
            p.parsebytes(b'Subject: test\n\n')

    # Now that we've established that all the parse methods get the
    # policy in to feedparser, we can use message_from_string for
    # the rest of the propagation tests.

    def _make_msg(self, source='Subject: test\n\n', policy=None):
        """Parse *source* with *policy* and remember the policy on self.

        When *policy* is None a fresh clone of email.policy.default is used,
        so identity checks against self.policy cannot pass by accident.
        """
        self.policy = email.policy.default.clone() if policy is None else policy
        return email.message_from_string(source, policy=self.policy)

    def test_parser_propagates_policy_to_message(self):
        msg = self._make_msg()
        self.assertIs(msg.policy, self.policy)

    def test_parser_propagates_policy_to_sub_messages(self):
        # Content-Type parameters are separated by ';'.  With any other
        # separator no boundary is found, the message is not split into
        # parts, and the loop below would only ever look at the root.
        msg = self._make_msg(textwrap.dedent("""\
            Subject: mime test
            MIME-Version: 1.0
            Content-Type: multipart/mixed; boundary="XXX"

            --XXX
            Content-Type: text/plain

            test
            --XXX
            Content-Type: text/plain

            test2
            --XXX--
            """))
        parts = list(msg.walk())
        # Root container plus the two text parts; guards against the check
        # passing vacuously because no sub-messages were parsed.
        self.assertTrue(msg.is_multipart())
        self.assertEqual(len(parts), 3)
        for part in parts:
            self.assertIs(part.policy, self.policy)

    def test_message_policy_propagates_to_generator(self):
        msg = self._make_msg("Subject: test\nTo: foo\n\n",
                             policy=email.policy.default.clone(linesep='X'))
        s = io.StringIO()
        g = email.generator.Generator(s)
        g.flatten(msg)
        self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")

    def test_message_policy_used_by_as_string(self):
        msg = self._make_msg("Subject: test\nTo: foo\n\n",
                             policy=email.policy.default.clone(linesep='X'))
        self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
319
320
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()