blob: a387f61f1e57c14a4fdf772397f2b2cf14b7b23f [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00005import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00006import contextlib
Antoine Pitroude609182010-11-18 17:29:23 +00007import collections
Antoine Pitrou69ab9512010-09-29 15:03:40 +00008from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00009from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000010import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000011if _have_ssl:
12 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013
# Timeout, in seconds, handed to the NNTP client constructor by the
# networked tests below.
TIMEOUT = 30

# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000020
21
class NetworkedNNTPTestsMixin:
    """Tests exercising an NNTP client against a live server.

    The mixin opens no connection itself.  Its tests read ``self.server``
    (the connected client object), ``self.NNTP_HOST``, ``self.GROUP_NAME``
    and ``self.GROUP_PAT``; the concrete test class must provide them.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock (plus one
        # year of slack around New Year) instead of a hard-coded year that
        # would eventually make this test fail on its own.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        """Sanity-check a field dictionary returned by OVER / XOVER."""
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        if not lines:
            # Same policy as test_xover: an empty range is not a failure.
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        """Check the ArticleInfo returned by ARTICLE, HEAD or BODY.

        When *art_num* is given, the article number must match it.
        """
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        self.assertEqual(article.lines, head.lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    if _have_ssl:
        def test_starttls(self):
            file = self.server.file
            sock = self.server.sock
            try:
                self.server.starttls()
            except nntplib.NNTPPermanentError:
                self.skipTest("STARTTLS not supported by server.")
            else:
                # Check that the socket and internal pseudo-file really were
                # changed.
                self.assertNotEqual(file, self.server.file)
                self.assertNotEqual(sock, self.server.sock)
                # Check that the new socket really is an SSL one
                self.assertIsInstance(self.server.sock, ssl.SSLSocket)
                # Check that trying starttls when it's already active fails.
                self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Mark the shared connection as gone, even if QUIT failed.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every ``test_*`` method of *cls* in transient_internet().

        Each callable attribute whose name starts with ``test_`` is replaced
        by a wrapper that runs it inside
        ``support.transient_internet(self.NNTP_HOST)``.
        """
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            # The builtin callable() replaces collections.Callable, which
            # was removed from the collections namespace in Python 3.10.
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))


NetworkedNNTPTestsMixin.wrap_methods()
256
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000257
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests over one connection shared by the class."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        # Requires the "network" resource; the connection is opened once
        # and stored on the class.
        support.requires("network")
        host = cls.NNTP_HOST
        with support.transient_internet(host):
            cls.server = cls.NNTP_CLASS(host, timeout=TIMEOUT, usenetrc=False)

    @classmethod
    def tearDownClass(cls):
        # Nothing to close when the server attribute was already cleared.
        if cls.server is None:
            return
        cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000276
277
if _have_ssl:
    class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
        """The networked tests again, connecting with nntplib.NNTP_SSL."""

        # Technical limits for this public NNTP server (see http://www.aioe.org):
        # "Only two concurrent connections per IP address are allowed and
        # 400 connections per day are accepted from each IP address."

        NNTP_CLASS = nntplib.NNTP_SSL

        NNTP_HOST = 'nntp.aioe.org'
        GROUP_NAME = 'comp.lang.python'
        GROUP_PAT = 'comp.lang.*'

        # Disabled as it produces too much data
        test_list = None

        # Disabled as the connection will already be encrypted.
        test_starttls = None
296
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000297
298#
299# Non-networked tests using a local server (or something mocking it).
300#
301
302class _NNTPServerIO(io.RawIOBase):
303 """A raw IO object allowing NNTP commands to be received and processed
304 by a handler. The handler can push responses which can then be read
305 from the IO object."""
306
307 def __init__(self, handler):
308 io.RawIOBase.__init__(self)
309 # The channel from the client
310 self.c2s = io.BytesIO()
311 # The channel to the client
312 self.s2c = io.BytesIO()
313 self.handler = handler
314 self.handler.start(self.c2s.readline, self.push_data)
315
316 def readable(self):
317 return True
318
319 def writable(self):
320 return True
321
322 def push_data(self, data):
323 """Push (buffer) some data to send to the client."""
324 pos = self.s2c.tell()
325 self.s2c.seek(0, 2)
326 self.s2c.write(data)
327 self.s2c.seek(pos)
328
329 def write(self, b):
330 """The client sends us some data"""
331 pos = self.c2s.tell()
332 self.c2s.write(b)
333 self.c2s.seek(pos)
334 self.handler.process_pending()
335 return len(b)
336
337 def readinto(self, buf):
338 """The client wants to read a response"""
339 self.handler.process_pending()
340 b = self.s2c.read(len(buf))
341 n = len(b)
342 buf[:n] = b
343 return n
344
345
class MockedNNTPTestsMixin:
    """Give each test an NNTP client wired to an in-process mock handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create the handler, its IO object and the client; return the client.

        Extra positional and keyword arguments are forwarded to
        nntplib._NNTPBase.
        """
        self.handler = self.handler_class()
        self.sio = _NNTPServerIO(self.handler)
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        rw_pair = io.BufferedRWPair(self.sio, self.sio)
        server = nntplib._NNTPBase(rw_pair, 'test.server', *args, **kwargs)
        self.server = server
        return server
366
367
class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its channels and send the welcome line.

        *readline* returns the next line of bytes sent by the client (empty
        when nothing is pending); *push_data* queues bytes for the client.
        """
        self.in_body = False
        self.allow_posting = True
        # Body-collection state, filled in while a command awaits a body.
        self.body = None
        self.body_callback = None
        self._readline = readline
        self._push_data = push_data
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume everything the client has sent so far.

        Alternates between collecting a pending request body and
        dispatching command lines until the input is exhausted.  Going back
        to body collection right after a command asked for a body means
        that body lines arriving together with the command are not
        mistaken for commands.
        """
        while True:
            if self.in_body and not self._read_body():
                return
            if not self._read_commands():
                return

    def _read_body(self):
        """Collect body lines; return False if the input ran out first."""
        while True:
            line = self._readline()
            if not line:
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            # Call the original command handler again, now with the body.
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def _read_commands(self):
        """Dispatch command lines; return True if one now expects a body."""
        while True:
            line = self._decode(self._readline())
            if not line:
                return False
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            cmd, *tokens = line.split()
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
                continue
            try:
                meth(*tokens)
            except Exception as e:
                raise ValueError("command failed: {!r}".format(line)) from e
            if self.in_body:
                # Remember who to call back once the body is complete.
                self.body_callback = meth, tokens
                self.body = []
                return True

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # Dedent, normalise line endings to CRLF and encode as UTF-8.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per post: first without a body (reply 340 and wait
        # for one), then again with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as POST, but only one message id is wanted.
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_response(self, code, message_spec, payload):
        """Answer ARTICLE, HEAD or BODY, which differ only in code and payload.

        *code* is the success status code, *message_spec* the optional
        article number or message id given by the client, and *payload* the
        text sent after the status line.  Unknown specs get a 430 reply and
        no payload.
        """
        if message_spec is None:
            self.push_lit("{} 3000237 <45223423@example.com>".format(code))
        elif message_spec == "<45223423@example.com>":
            self.push_lit("{} 0 <45223423@example.com>".format(code))
        elif message_spec == "3000234":
            self.push_lit("{} 3000234 <45223423@example.com>".format(code))
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_article_response("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_article_response("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_article_response("222", message_spec, self.sample_body)
669
670
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # The canned capability list, one entry per response line.
        capability_lines = (
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
            "AUTHINFO USER",
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        )
        self.push_lit("\n".join(capability_lines))

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
689
690
691class NNTPv1v2TestsMixin:
692
    def setUp(self):
        # Only delegates to the next setUp() in the MRO.
        super().setUp()
695
696 def test_welcome(self):
697 self.assertEqual(self.server.welcome, self.handler.welcome)
698
699 def test_date(self):
700 resp, date = self.server.date()
701 self.assertEqual(resp, "111 20100914001155")
702 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
703
704 def test_quit(self):
705 self.assertFalse(self.sio.closed)
706 resp = self.server.quit()
707 self.assertEqual(resp, "205 Bye!")
708 self.assertTrue(self.sio.closed)
709
710 def test_help(self):
711 resp, help = self.server.help()
712 self.assertEqual(resp, "100 Legal commands")
713 self.assertEqual(help, [
714 ' authinfo user Name|pass Password|generic <prog> <args>',
715 ' date',
716 ' help',
717 'Report problems to <root@example.org>',
718 ])
719
720 def test_list(self):
721 resp, groups = self.server.list()
722 self.assertEqual(len(groups), 6)
723 g = groups[1]
724 self.assertEqual(g,
725 GroupInfo("comp.lang.python.announce", "0000001153",
726 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000727 resp, groups = self.server.list("*distutils*")
728 self.assertEqual(len(groups), 2)
729 g = groups[0]
730 self.assertEqual(g,
731 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
732 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000733
734 def test_stat(self):
735 resp, art_num, message_id = self.server.stat(3000234)
736 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
737 self.assertEqual(art_num, 3000234)
738 self.assertEqual(message_id, "<45223423@example.com>")
739 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
740 self.assertEqual(resp, "223 0 <45223423@example.com>")
741 self.assertEqual(art_num, 0)
742 self.assertEqual(message_id, "<45223423@example.com>")
743 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
744 self.server.stat("<non.existent.id>")
745 self.assertEqual(cm.exception.response, "430 No Such Article Found")
746 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
747 self.server.stat()
748 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
749
750 def test_next(self):
751 resp, art_num, message_id = self.server.next()
752 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
753 self.assertEqual(art_num, 3000237)
754 self.assertEqual(message_id, "<668929@example.org>")
755
756 def test_last(self):
757 resp, art_num, message_id = self.server.last()
758 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
759 self.assertEqual(art_num, 3000234)
760 self.assertEqual(message_id, "<45223423@example.com>")
761
762 def test_description(self):
763 desc = self.server.description("comp.lang.python")
764 self.assertEqual(desc, "The Python computer language.")
765 desc = self.server.description("comp.lang.pythonx")
766 self.assertEqual(desc, "")
767
768 def test_descriptions(self):
769 resp, groups = self.server.descriptions("comp.lang.python")
770 self.assertEqual(resp, '215 Descriptions in form "group description".')
771 self.assertEqual(groups, {
772 "comp.lang.python": "The Python computer language.",
773 })
774 resp, groups = self.server.descriptions("comp.lang.python*")
775 self.assertEqual(groups, {
776 "comp.lang.python": "The Python computer language.",
777 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
778 })
779 resp, groups = self.server.descriptions("comp.lang.pythonx")
780 self.assertEqual(groups, {})
781
782 def test_group(self):
783 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
784 self.assertTrue(resp.startswith("211 "), resp)
785 self.assertEqual(first, 761)
786 self.assertEqual(last, 1265)
787 self.assertEqual(count, 486)
788 self.assertEqual(group, "fr.comp.lang.python")
789 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
790 self.server.group("comp.lang.python.devel")
791 exc = cm.exception
792 self.assertTrue(exc.response.startswith("411 No such group"),
793 exc.response)
794
795 def test_newnews(self):
796 # NEWNEWS comp.lang.python [20]100913 082004
797 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
798 resp, ids = self.server.newnews("comp.lang.python", dt)
799 expected = (
800 "230 list of newsarticles (NNTP v{0}) "
801 "created after Mon Sep 13 08:20:04 2010 follows"
802 ).format(self.nntp_version)
803 self.assertEqual(resp, expected)
804 self.assertEqual(ids, [
805 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
806 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
807 ])
808 # NEWNEWS fr.comp.lang.python [20]100913 082004
809 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
810 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
811 self.assertEqual(resp, "230 An empty list of newsarticles follows")
812 self.assertEqual(ids, [])
813
814 def _check_article_body(self, lines):
815 self.assertEqual(len(lines), 4)
816 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
817 self.assertEqual(lines[-2], b"")
818 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
819 self.assertEqual(lines[-4], b"This is just a test article.")
820
821 def _check_article_head(self, lines):
822 self.assertEqual(len(lines), 4)
823 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
824 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
825
826 def _check_article_data(self, lines):
827 self.assertEqual(len(lines), 9)
828 self._check_article_head(lines[:4])
829 self._check_article_body(lines[-4:])
830 self.assertEqual(lines[4], b"")
831
832 def test_article(self):
833 # ARTICLE
834 resp, info = self.server.article()
835 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
836 art_num, message_id, lines = info
837 self.assertEqual(art_num, 3000237)
838 self.assertEqual(message_id, "<45223423@example.com>")
839 self._check_article_data(lines)
840 # ARTICLE num
841 resp, info = self.server.article(3000234)
842 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
843 art_num, message_id, lines = info
844 self.assertEqual(art_num, 3000234)
845 self.assertEqual(message_id, "<45223423@example.com>")
846 self._check_article_data(lines)
847 # ARTICLE id
848 resp, info = self.server.article("<45223423@example.com>")
849 self.assertEqual(resp, "220 0 <45223423@example.com>")
850 art_num, message_id, lines = info
851 self.assertEqual(art_num, 0)
852 self.assertEqual(message_id, "<45223423@example.com>")
853 self._check_article_data(lines)
854 # Non-existent id
855 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
856 self.server.article("<non-existent@example.com>")
857 self.assertEqual(cm.exception.response, "430 No Such Article Found")
858
def test_article_file(self):
    # With a "file" argument the article is written to the file object
    # and the returned list of lines is empty.
    sink = io.BytesIO()
    resp, info = self.server.article(file=sink)
    self.assertEqual(resp, "220 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self.assertEqual(lines, [])
    data = sink.getvalue()
    expected_start = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    expected_end = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
    # ascii(data) keeps the failure message printable.
    self.assertTrue(data.startswith(expected_start), ascii(data))
    self.assertTrue(data.endswith(expected_end), ascii(data))
879
def test_head(self):
    # HEAD is issued three ways: with no argument, with an article
    # number and with a message id.  The checks are the same each time.
    article_id = "<45223423@example.com>"
    cases = (
        # (arguments, expected response, expected article number)
        ((), "221 3000237 <45223423@example.com>", 3000237),
        ((3000234,), "221 3000234 <45223423@example.com>", 3000234),
        (("<45223423@example.com>",), "221 0 <45223423@example.com>", 0),
    )
    for call_args, expected_resp, expected_num in cases:
        resp, info = self.server.head(*call_args)
        self.assertEqual(resp, expected_resp)
        art_num, message_id, lines = info
        self.assertEqual(art_num, expected_num)
        self.assertEqual(message_id, article_id)
        self._check_article_head(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.head("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
906
def test_body(self):
    # BODY is issued three ways: with no argument, with an article
    # number and with a message id.  The checks are the same each time.
    article_id = "<45223423@example.com>"
    cases = (
        # (arguments, expected response, expected article number)
        ((), "222 3000237 <45223423@example.com>", 3000237),
        ((3000234,), "222 3000234 <45223423@example.com>", 3000234),
        (("<45223423@example.com>",), "222 0 <45223423@example.com>", 0),
    )
    for call_args, expected_resp, expected_num in cases:
        resp, info = self.server.body(*call_args)
        self.assertEqual(resp, expected_resp)
        art_num, message_id, lines = info
        self.assertEqual(art_num, expected_num)
        self.assertEqual(message_id, article_id)
        self._check_article_body(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.body("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
933
def check_over_xover_resp(self, resp, overviews):
    """Checks shared by test_over and test_xover on a three-entry overview."""
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)
    # First entry: every field is compared.
    expected_fields = {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.org gmane.comp.python.authors:57"
    }
    number, fields = overviews[0]
    self.assertEqual(number, 57)
    self.assertEqual(fields, expected_fields)
    # Second entry: the "xref" field is expected to be None.
    number, fields = overviews[1]
    self.assertEqual(fields["xref"], None)
    # Third entry: the subject contains non-ASCII characters.
    number, fields = overviews[2]
    self.assertEqual(fields["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
954
def test_xover(self):
    # XOVER takes the first and last article numbers as two arguments.
    response, entries = self.server.xover(57, 59)
    self.check_over_xover_resp(response, entries)
958
def test_over(self):
    # OVER takes the article range as one (first, last) tuple.
    # In NNTP "v1", this will fallback on XOVER
    response, entries = self.server.over((57, 59))
    self.check_over_xover_resp(response, entries)
963
# Raw article used by the posting tests: four header lines, an empty
# separator line and a four-line body, all terminated with CRLF.
sample_post = b"".join([
    b'From: "Demo User" <nobody@example.net>\r\n',
    b'Subject: I am just a test article\r\n',
    b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n',
    b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n',
    b'\r\n',
    b'This is just a test article.\r\n',
    b'.Here is a dot-starting line.\r\n',
    b'\r\n',
    b'-- Signed by Andr\xc3\xa9.\r\n',
])
975
976 def _check_posted_body(self):
977 # Check the raw body as received by the server
978 lines = self.handler.posted_body
979 # One additional line for the "." terminator
980 self.assertEqual(len(lines), 10)
981 self.assertEqual(lines[-1], b'.\r\n')
982 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
983 self.assertEqual(lines[-3], b'\r\n')
984 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
985 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
986
987 def _check_post_ihave_sub(self, func, *args, file_factory):
988 # First the prepared post with CRLF endings
989 post = self.sample_post
990 func_args = args + (file_factory(post),)
991 self.handler.posted_body = None
992 resp = func(*func_args)
993 self._check_posted_body()
994 # Then the same post with "normal" line endings - they should be
995 # converted by NNTP.post and NNTP.ihave.
996 post = self.sample_post.replace(b"\r\n", b"\n")
997 func_args = args + (file_factory(post),)
998 self.handler.posted_body = None
999 resp = func(*func_args)
1000 self._check_posted_body()
1001 return resp
1002
def check_post_ihave(self, func, success_resp, *args):
    """Run *func* with each supported kind of payload, expecting *success_resp*."""
    def terminated_lines(data):
        # An iterable of terminated lines
        return iter(data.splitlines(True))

    def bare_lines(data):
        # An iterable of non-terminated lines
        return iter(data.splitlines(False))

    payload_factories = (
        bytes,              # a bytes object
        bytearray,          # a bytearray object
        io.BytesIO,         # a file object
        terminated_lines,
        bare_lines,
    )
    for make_payload in payload_factories:
        resp = self._check_post_ihave_sub(
            func, *args, file_factory=make_payload)
        self.assertEqual(resp, success_resp)
1023
def test_post(self):
    # POST succeeds for every payload kind while posting is allowed.
    self.check_post_ihave(self.server.post, "240 Article received OK")
    # After posting is switched off on the handler, the next attempt
    # must be refused.
    self.handler.allow_posting = False
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.post(self.sample_post)
    refusal = caught.exception.response
    self.assertEqual(refusal, "440 Posting not permitted")
1031
def test_ihave(self):
    # IHAVE succeeds for the message id used by the sample post.
    wanted_id = "<i.am.an.article.you.will.want@example.com>"
    self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                          wanted_id)
    # A different message id is refused.
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.ihave("<another.message.id>", self.sample_post)
    refusal = caught.exception.response
    self.assertEqual(refusal, "435 Article not wanted")
1039
1040
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # A v1 server yields an empty capability dict, version 1 and no
        # implementation string.
        capabilities = self.server.getcapabilities()
        self.assertEqual(capabilities, {})
        server = self.server
        self.assertEqual(server.nntp_version, 1)
        self.assertEqual(server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001052
1053
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        capabilities = self.server.getcapabilities()
        expected_capabilities = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(capabilities, expected_capabilities)
        server = self.server
        # Versions 2 and 3 are both advertised; 3 is the one expected in
        # nntp_version.
        self.assertEqual(server.nntp_version, 3)
        # The implementation string is the IMPLEMENTATION words joined by
        # a space.
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001075
1076
class MiscTests(unittest.TestCase):
    """Tests for nntplib helper functions that need no server."""

    def test_decode_header(self):
        cases = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in cases:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        # Every case starts with the same seven normalized field names.
        minimal = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]
        cases = [
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             minimal),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             minimal),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             minimal),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             minimal + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             minimal + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             minimal + ["xref"]),
        ]
        for lines, expected in cases:
            self.assertEqual(nntplib._parse_overview_fmt(lines), expected)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]

        def parse_single(line):
            # Parse one overview line and return its (art_num, fields) pair.
            (entry,) = nntplib._parse_overview([line], fmt)
            return entry

        # All three examples begin with the same number, subject, sender,
        # date and message id.
        common = ('3000234\tI am just a test article\t"Demo User" '
                  '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
                  '<45223423@example.com>\t')

        # First example from RFC 3977
        art_num, fields = parse_single(
            common + '<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363')
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        art_num, fields = parse_single(
            common + '<45454@example.net>\t1234\t'
            '17\t\t')
        self.assertEqual(fields['xref'], None)
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        art_num, fields = parse_single(
            common + ' \t1234\t'
            '17\tXref: \t')
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        expected = datetime.datetime(1999, 6, 23, 13, 56, 24)
        cases = [
            # Output of DATE command
            ("19990623135624", None, expected),
            # Variations
            ("19990623", "135624", expected),
            ("990623", "135624", expected),
            ("090623", "135624", datetime.datetime(2009, 6, 23, 13, 56, 24)),
        ]
        for date_str, time_str, moment in cases:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             moment)

    def test_unparse_datetime(self):
        # Test non-legacy mode, both by default and when asked explicitly
        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("19990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("20000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("20100605", "010203")),
        ]
        for parts, expected in datetime_cases:
            moment = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(moment), expected)
            self.assertEqual(nntplib._unparse_datetime(moment, False),
                             expected)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("19990623", "000000")),
            ((2000, 6, 23), ("20000623", "000000")),
            ((2010, 6, 5), ("20100605", "000000")),
        ]
        for parts, expected in date_cases:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day), expected)
            self.assertEqual(nntplib._unparse_datetime(day, False),
                             expected)

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("100605", "010203")),
        ]
        for parts, expected in datetime_cases:
            moment = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(moment, True),
                             expected)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("990623", "000000")),
            ((2000, 6, 23), ("000623", "000000")),
            ((2010, 6, 5), ("100605", "000000")),
        ]
        for parts, expected in date_cases:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day, True), expected)
1233
1234
def test_main():
    """Run this module's test classes through support.run_unittest()."""
    # The SSL test class is only named when nntplib reports SSL support.
    ssl_classes = [NetworkedNNTP_SSLTests] if _have_ssl else []
    always_run = [MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests]
    support.run_unittest(*(always_run + ssl_classes))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001240
1241
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()