blob: 786b5f093bc5c4d8bf2d01d6a2d7a238fd0215d0 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00005import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00006import contextlib
7from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00008from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +00009import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000010if _have_ssl:
11 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000012
# Timeout handed to the NNTP client class when connecting to a live server
# (presumably seconds -- confirm against the nntplib documentation).
TIMEOUT = 30
14
# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000019
20
class NetworkedNNTPTestsMixin:
    """Checks that are run against a live NNTP server.

    The methods read ``self.server`` (the client connection under test)
    as well as the ``NNTP_HOST``, ``GROUP_NAME`` and ``GROUP_PAT``
    attributes; concrete test cases have to supply all of them.
    """

    def _check_group_list(self, groups):
        """Spot-check the first entry of a group listing, if there is one."""
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        self._check_group_list(groups)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        self._check_group_list(groups)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        self._check_group_list(groups)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(resp.startswith(("215 ", "282 ")), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check (deliberately loose bounds)
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, 2030)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields; the keys view must be a superset
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        if not lines:
            # Same policy as test_xover: nothing to inspect, so skip rather
            # than fail with an IndexError.
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        """Sanity-check an ArticleInfo returned by HEAD / BODY / ARTICLE."""
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        self.assertEqual(article.lines, head.lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    if _have_ssl:
        def test_starttls(self):
            file = self.server.file
            sock = self.server.sock
            try:
                self.server.starttls()
            except nntplib.NNTPPermanentError:
                self.skipTest("STARTTLS not supported by server.")
            else:
                # Check that the socket and internal pseudo-file really were
                # changed.
                self.assertNotEqual(file, self.server.file)
                self.assertNotEqual(sock, self.server.sock)
                # Check that the new socket really is an SSL one
                self.assertIsInstance(self.server.sock, ssl.SSLSocket)
                # Check that trying starttls when it's already active fails.
                self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Tell tearDownClass() that the connection is already gone.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Run every ``test_*`` callable inside transient_internet()."""
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

NetworkedNNTPTestsMixin.wrap_methods()
255
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000256
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked checks through the client class in NNTP_CLASS."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    # Client class instantiated by setUpClass()
    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        """Open the connection stored on the class as ``server``."""
        support.requires("network")
        host = cls.NNTP_HOST
        with support.transient_internet(host):
            cls.server = cls.NNTP_CLASS(host, timeout=TIMEOUT, usenetrc=False)

    @classmethod
    def tearDownClass(cls):
        """Send QUIT unless ``server`` has already been reset to None."""
        connection = cls.server
        if connection is None:
            return
        connection.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000275
276
if _have_ssl:
    class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
        """Same networked checks, run through nntplib.NNTP_SSL."""

        # Technical limits for this public NNTP server (see http://www.aioe.org):
        # "Only two concurrent connections per IP address are allowed and
        # 400 connections per day are accepted from each IP address."

        NNTP_HOST = 'nntp.aioe.org'
        GROUP_NAME = 'comp.lang.python'
        GROUP_PAT = 'comp.lang.*'

        NNTP_CLASS = nntplib.NNTP_SSL

        # Disabled as it produces too much data
        test_list = None

        # Disabled as the connection will already be encrypted.
        test_starttls = None
295
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000296
#
# Non-networked tests using a local server (or something mocking it).
#
300
301class _NNTPServerIO(io.RawIOBase):
302 """A raw IO object allowing NNTP commands to be received and processed
303 by a handler. The handler can push responses which can then be read
304 from the IO object."""
305
306 def __init__(self, handler):
307 io.RawIOBase.__init__(self)
308 # The channel from the client
309 self.c2s = io.BytesIO()
310 # The channel to the client
311 self.s2c = io.BytesIO()
312 self.handler = handler
313 self.handler.start(self.c2s.readline, self.push_data)
314
315 def readable(self):
316 return True
317
318 def writable(self):
319 return True
320
321 def push_data(self, data):
322 """Push (buffer) some data to send to the client."""
323 pos = self.s2c.tell()
324 self.s2c.seek(0, 2)
325 self.s2c.write(data)
326 self.s2c.seek(pos)
327
328 def write(self, b):
329 """The client sends us some data"""
330 pos = self.c2s.tell()
331 self.c2s.write(b)
332 self.c2s.seek(pos)
333 self.handler.process_pending()
334 return len(b)
335
336 def readinto(self, buf):
337 """The client wants to read a response"""
338 self.handler.process_pending()
339 b = self.s2c.read(len(buf))
340 n = len(b)
341 buf[:n] = b
342 return n
343
344
class MockedNNTPTestsMixin:
    """Fixture mixin wiring an nntplib client to an in-process handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        """Run the parent setUp, then build the mocked server."""
        super().setUp()
        self.make_server()

    def tearDown(self):
        """Run the parent tearDown, then drop the ``server`` attribute."""
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Instantiate the handler and a client talking to it.

        Extra positional and keyword arguments are forwarded to
        nntplib._NNTPBase.  The new client is stored as ``self.server`` and
        also returned.
        """
        handler = self.handler_class()
        self.handler = handler
        raw = _NNTPServerIO(handler)
        self.sio = raw
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        rw_file = io.BufferedRWPair(raw, raw)
        self.server = nntplib._NNTPBase(rw_file, 'test.server', *args, **kwargs)
        return self.server
365
366
class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its transport and send the greeting.

        *readline* returns the next line sent by the client (empty when
        nothing is pending); *push_data* queues bytes for the client.
        """
        self.in_body = False
        self.body = None
        self.body_callback = None
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume everything the client has sent so far.

        Lines are dispatched to the matching ``handle_<COMMAND>`` method.
        When a handler calls expect_body(), the lines that follow are
        collected as the request body (up to the lone "." line) and handed
        back to that same handler through its ``body`` keyword argument.

        Raises ValueError for a command line not terminated by CRLF or when
        a command handler fails.
        """
        while True:
            if self.in_body:
                if not self._collect_body():
                    # Terminator not received yet: wait for more data.
                    return
                continue
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            self._dispatch(line[:-2])

    def _collect_body(self):
        """Accumulate body lines; return True once the body was delivered."""
        while True:
            line = self._readline()
            if not line:
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def _dispatch(self, line):
        """Run the handler for one command line (CRLF already stripped)."""
        words = line.split()
        if not words:
            # A blank line is not a command.
            self.handle_unknown()
            return
        cmd, *tokens = words
        meth = getattr(self, "handle_" + cmd.upper(), None)
        if meth is None:
            self.handle_unknown()
            return
        try:
            meth(*tokens)
        except Exception as e:
            raise ValueError("command failed: {!r}".format(line)) from e
        if self.in_body:
            # The handler asked for a body: remember whom to give it to.
            self.body_callback = meth, tokens
            self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal.

        The text is dedented, its line endings are turned into CRLF (with a
        final CRLF added) and the result is sent UTF-8 encoded.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # NOTE(review): the three command lines are indented relative to the
        # rest of the reply; the expected output in test_help carries the
        # same leading whitespace.
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per posting: first without a body (the command
        # itself), then again with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST().
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    # The samples share one indentation level so that push_lit() can dedent
    # them, separately or concatenated.
    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")
667 self.push_lit(".")
668
669
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        """Reply with a fixed capability list."""
        capability_reply = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            AUTHINFO USER
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""
        self.push_lit(capability_reply)

    def handle_OVER(self, message_spec=None):
        """Answer OVER exactly like XOVER."""
        result = self.handle_XOVER(message_spec)
        return result
687 return self.handle_XOVER(message_spec)
688
689
690class NNTPv1v2TestsMixin:
691
692 def setUp(self):
693 super().setUp()
694
695 def test_welcome(self):
696 self.assertEqual(self.server.welcome, self.handler.welcome)
697
698 def test_date(self):
699 resp, date = self.server.date()
700 self.assertEqual(resp, "111 20100914001155")
701 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
702
703 def test_quit(self):
704 self.assertFalse(self.sio.closed)
705 resp = self.server.quit()
706 self.assertEqual(resp, "205 Bye!")
707 self.assertTrue(self.sio.closed)
708
709 def test_help(self):
710 resp, help = self.server.help()
711 self.assertEqual(resp, "100 Legal commands")
712 self.assertEqual(help, [
713 ' authinfo user Name|pass Password|generic <prog> <args>',
714 ' date',
715 ' help',
716 'Report problems to <root@example.org>',
717 ])
718
719 def test_list(self):
720 resp, groups = self.server.list()
721 self.assertEqual(len(groups), 6)
722 g = groups[1]
723 self.assertEqual(g,
724 GroupInfo("comp.lang.python.announce", "0000001153",
725 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000726 resp, groups = self.server.list("*distutils*")
727 self.assertEqual(len(groups), 2)
728 g = groups[0]
729 self.assertEqual(g,
730 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
731 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000732
733 def test_stat(self):
734 resp, art_num, message_id = self.server.stat(3000234)
735 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
736 self.assertEqual(art_num, 3000234)
737 self.assertEqual(message_id, "<45223423@example.com>")
738 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
739 self.assertEqual(resp, "223 0 <45223423@example.com>")
740 self.assertEqual(art_num, 0)
741 self.assertEqual(message_id, "<45223423@example.com>")
742 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
743 self.server.stat("<non.existent.id>")
744 self.assertEqual(cm.exception.response, "430 No Such Article Found")
745 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
746 self.server.stat()
747 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
748
749 def test_next(self):
750 resp, art_num, message_id = self.server.next()
751 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
752 self.assertEqual(art_num, 3000237)
753 self.assertEqual(message_id, "<668929@example.org>")
754
755 def test_last(self):
756 resp, art_num, message_id = self.server.last()
757 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
758 self.assertEqual(art_num, 3000234)
759 self.assertEqual(message_id, "<45223423@example.com>")
760
761 def test_description(self):
762 desc = self.server.description("comp.lang.python")
763 self.assertEqual(desc, "The Python computer language.")
764 desc = self.server.description("comp.lang.pythonx")
765 self.assertEqual(desc, "")
766
767 def test_descriptions(self):
768 resp, groups = self.server.descriptions("comp.lang.python")
769 self.assertEqual(resp, '215 Descriptions in form "group description".')
770 self.assertEqual(groups, {
771 "comp.lang.python": "The Python computer language.",
772 })
773 resp, groups = self.server.descriptions("comp.lang.python*")
774 self.assertEqual(groups, {
775 "comp.lang.python": "The Python computer language.",
776 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
777 })
778 resp, groups = self.server.descriptions("comp.lang.pythonx")
779 self.assertEqual(groups, {})
780
781 def test_group(self):
782 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
783 self.assertTrue(resp.startswith("211 "), resp)
784 self.assertEqual(first, 761)
785 self.assertEqual(last, 1265)
786 self.assertEqual(count, 486)
787 self.assertEqual(group, "fr.comp.lang.python")
788 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
789 self.server.group("comp.lang.python.devel")
790 exc = cm.exception
791 self.assertTrue(exc.response.startswith("411 No such group"),
792 exc.response)
793
794 def test_newnews(self):
795 # NEWNEWS comp.lang.python [20]100913 082004
796 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
797 resp, ids = self.server.newnews("comp.lang.python", dt)
798 expected = (
799 "230 list of newsarticles (NNTP v{0}) "
800 "created after Mon Sep 13 08:20:04 2010 follows"
801 ).format(self.nntp_version)
802 self.assertEqual(resp, expected)
803 self.assertEqual(ids, [
804 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
805 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
806 ])
807 # NEWNEWS fr.comp.lang.python [20]100913 082004
808 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
809 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
810 self.assertEqual(resp, "230 An empty list of newsarticles follows")
811 self.assertEqual(ids, [])
812
813 def _check_article_body(self, lines):
814 self.assertEqual(len(lines), 4)
815 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
816 self.assertEqual(lines[-2], b"")
817 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
818 self.assertEqual(lines[-4], b"This is just a test article.")
819
820 def _check_article_head(self, lines):
821 self.assertEqual(len(lines), 4)
822 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
823 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
824
825 def _check_article_data(self, lines):
826 self.assertEqual(len(lines), 9)
827 self._check_article_head(lines[:4])
828 self._check_article_body(lines[-4:])
829 self.assertEqual(lines[4], b"")
830
831 def test_article(self):
832 # ARTICLE
833 resp, info = self.server.article()
834 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
835 art_num, message_id, lines = info
836 self.assertEqual(art_num, 3000237)
837 self.assertEqual(message_id, "<45223423@example.com>")
838 self._check_article_data(lines)
839 # ARTICLE num
840 resp, info = self.server.article(3000234)
841 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
842 art_num, message_id, lines = info
843 self.assertEqual(art_num, 3000234)
844 self.assertEqual(message_id, "<45223423@example.com>")
845 self._check_article_data(lines)
846 # ARTICLE id
847 resp, info = self.server.article("<45223423@example.com>")
848 self.assertEqual(resp, "220 0 <45223423@example.com>")
849 art_num, message_id, lines = info
850 self.assertEqual(art_num, 0)
851 self.assertEqual(message_id, "<45223423@example.com>")
852 self._check_article_data(lines)
853 # Non-existent id
854 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
855 self.server.article("<non-existent@example.com>")
856 self.assertEqual(cm.exception.response, "430 No Such Article Found")
857
858 def test_article_file(self):
859 # With a "file" argument
860 f = io.BytesIO()
861 resp, info = self.server.article(file=f)
862 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
863 art_num, message_id, lines = info
864 self.assertEqual(art_num, 3000237)
865 self.assertEqual(message_id, "<45223423@example.com>")
866 self.assertEqual(lines, [])
867 data = f.getvalue()
868 self.assertTrue(data.startswith(
869 b'From: "Demo User" <nobody@example.net>\r\n'
870 b'Subject: I am just a test article\r\n'
871 ), ascii(data))
872 self.assertTrue(data.endswith(
873 b'This is just a test article.\r\n'
874 b'.Here is a dot-starting line.\r\n'
875 b'\r\n'
876 b'-- Signed by Andr\xc3\xa9.\r\n'
877 ), ascii(data))
878
879 def test_head(self):
880 # HEAD
881 resp, info = self.server.head()
882 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
883 art_num, message_id, lines = info
884 self.assertEqual(art_num, 3000237)
885 self.assertEqual(message_id, "<45223423@example.com>")
886 self._check_article_head(lines)
887 # HEAD num
888 resp, info = self.server.head(3000234)
889 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
890 art_num, message_id, lines = info
891 self.assertEqual(art_num, 3000234)
892 self.assertEqual(message_id, "<45223423@example.com>")
893 self._check_article_head(lines)
894 # HEAD id
895 resp, info = self.server.head("<45223423@example.com>")
896 self.assertEqual(resp, "221 0 <45223423@example.com>")
897 art_num, message_id, lines = info
898 self.assertEqual(art_num, 0)
899 self.assertEqual(message_id, "<45223423@example.com>")
900 self._check_article_head(lines)
901 # Non-existent id
902 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
903 self.server.head("<non-existent@example.com>")
904 self.assertEqual(cm.exception.response, "430 No Such Article Found")
905
906 def test_body(self):
907 # BODY
908 resp, info = self.server.body()
909 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
910 art_num, message_id, lines = info
911 self.assertEqual(art_num, 3000237)
912 self.assertEqual(message_id, "<45223423@example.com>")
913 self._check_article_body(lines)
914 # BODY num
915 resp, info = self.server.body(3000234)
916 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
917 art_num, message_id, lines = info
918 self.assertEqual(art_num, 3000234)
919 self.assertEqual(message_id, "<45223423@example.com>")
920 self._check_article_body(lines)
921 # BODY id
922 resp, info = self.server.body("<45223423@example.com>")
923 self.assertEqual(resp, "222 0 <45223423@example.com>")
924 art_num, message_id, lines = info
925 self.assertEqual(art_num, 0)
926 self.assertEqual(message_id, "<45223423@example.com>")
927 self._check_article_body(lines)
928 # Non-existent id
929 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
930 self.server.body("<non-existent@example.com>")
931 self.assertEqual(cm.exception.response, "430 No Such Article Found")
932
def check_over_xover_resp(self, resp, overviews):
    """Check the result of an OVER / XOVER request for articles 57-59.

    *resp* is the response line and must start with "224 ".
    *overviews* must be a sequence of three (article number, fields dict)
    pairs: the first is compared in full, the second must have no "xref"
    value and the third must carry a non-ASCII subject.
    """
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)
    art_num, over = overviews[0]
    self.assertEqual(art_num, 57)
    self.assertEqual(over, {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.org gmane.comp.python.authors:57",
    })
    # Only the fields matter for the remaining entries; the article number
    # is unpacked (so each entry must still be a pair) but not inspected.
    _, over = overviews[1]
    # Identity check: the value must be None itself, not just equal to it.
    self.assertIsNone(over["xref"])
    _, over = overviews[2]
    self.assertEqual(over["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
953
954 def test_xover(self):
955 resp, overviews = self.server.xover(57, 59)
956 self.check_over_xover_resp(resp, overviews)
957
958 def test_over(self):
959 # In NNTP "v1", this will fallback on XOVER
960 resp, overviews = self.server.over((57, 59))
961 self.check_over_xover_resp(resp, overviews)
962
# Raw article used by the POST / IHAVE tests: four headers, an empty
# separator line and a four-line body, every line terminated by CRLF.
sample_post = b''.join([
    b'From: "Demo User" <nobody@example.net>\r\n',
    b'Subject: I am just a test article\r\n',
    b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n',
    b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n',
    b'\r\n',
    b'This is just a test article.\r\n',
    b'.Here is a dot-starting line.\r\n',
    b'\r\n',
    b'-- Signed by Andr\xc3\xa9.\r\n',
])
974
975 def _check_posted_body(self):
976 # Check the raw body as received by the server
977 lines = self.handler.posted_body
978 # One additional line for the "." terminator
979 self.assertEqual(len(lines), 10)
980 self.assertEqual(lines[-1], b'.\r\n')
981 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
982 self.assertEqual(lines[-3], b'\r\n')
983 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
984 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
985
986 def _check_post_ihave_sub(self, func, *args, file_factory):
987 # First the prepared post with CRLF endings
988 post = self.sample_post
989 func_args = args + (file_factory(post),)
990 self.handler.posted_body = None
991 resp = func(*func_args)
992 self._check_posted_body()
993 # Then the same post with "normal" line endings - they should be
994 # converted by NNTP.post and NNTP.ihave.
995 post = self.sample_post.replace(b"\r\n", b"\n")
996 func_args = args + (file_factory(post),)
997 self.handler.posted_body = None
998 resp = func(*func_args)
999 self._check_posted_body()
1000 return resp
1001
def check_post_ihave(self, func, success_resp, *args):
    """Exercise *func* with every supported kind of article source.

    For each source kind the sample post is sent via
    ``_check_post_ihave_sub`` and the response must equal *success_resp*.
    """
    def terminated_lines(data):
        return iter(data.splitlines(True))

    def bare_lines(data):
        return iter(data.splitlines(False))

    factories = (
        bytes,              # With a bytes object
        bytearray,          # With a bytearray object
        io.BytesIO,         # With a file object
        terminated_lines,   # With an iterable of terminated lines
        bare_lines,         # With an iterable of non-terminated lines
    )
    for factory in factories:
        resp = self._check_post_ihave_sub(func, *args, file_factory=factory)
        self.assertEqual(resp, success_resp)
1022
1023 def test_post(self):
1024 self.check_post_ihave(self.server.post, "240 Article received OK")
1025 self.handler.allow_posting = False
1026 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1027 self.server.post(self.sample_post)
1028 self.assertEqual(cm.exception.response,
1029 "440 Posting not permitted")
1030
1031 def test_ihave(self):
1032 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1033 "<i.am.an.article.you.will.want@example.com>")
1034 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1035 self.server.ihave("<another.message.id>", self.sample_post)
1036 self.assertEqual(cm.exception.response,
1037 "435 Article not wanted")
1038
1039
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        """Expect empty capabilities, version 1 and no implementation string."""
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        # Identity check: the attribute must be None itself, not merely
        # something that compares equal to it.
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001051
1052
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        """Compare the reported capabilities and the attributes set from them."""
        server = self.server
        advertised = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(server.getcapabilities(), advertised)
        # VERSION lists '2' and '3'; the client is expected to report 3.
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001074
1075
class MiscTests(unittest.TestCase):
    """Tests for nntplib helper functions that need no server connection."""

    def test_decode_header(self):
        """decode_header() decodes encoded words and keeps plain text as is."""
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        """_parse_overview_fmt() turns LIST OVERVIEW.FMT lines into field names."""
        default = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]

        def gives(lines, expected):
            self.assertEqual(nntplib._parse_overview_fmt(lines), expected)

        # The minimal (default) response
        gives(["Subject:", "From:", "Date:", "Message-ID:",
               "References:", ":bytes", ":lines"],
              default)
        # The minimal response using alternative names
        gives(["Subject:", "From:", "Date:", "Message-ID:",
               "References:", "Bytes:", "Lines:"],
              default)
        # Variations in casing
        gives(["subject:", "FROM:", "DaTe:", "message-ID:",
               "References:", "BYTES:", "Lines:"],
              default)
        # First example from RFC 3977
        gives(["Subject:", "From:", "Date:", "Message-ID:",
               "References:", ":bytes", ":lines", "Xref:full",
               "Distribution:full"],
              default + ["xref", "distribution"])
        # Second example from RFC 3977
        gives(["Subject:", "From:", "Date:", "Message-ID:",
               "References:", "Bytes:", "Lines:", "Xref:FULL",
               "Distribution:FULL"],
              default + ["xref", "distribution"])
        # A classic response from INN
        gives(["Subject:", "From:", "Date:", "Message-ID:",
               "References:", "Bytes:", "Lines:", "Xref:full"],
              default + ["xref"])

    def test_parse_overview(self):
        """_parse_overview() splits an overview line into number and fields."""
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]

        def parse_single(line):
            # Exactly one (article number, fields) pair is expected back.
            (art_num, fields), = nntplib._parse_overview([line], fmt)
            return art_num, fields

        # First example from RFC 3977
        art_num, fields = parse_single(
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363')
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        art_num, fields = parse_single(
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t')
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        art_num, fields = parse_single(
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t')
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        """_parse_datetime() accepts 14-, 8- and 6-digit date strings."""
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        """_unparse_datetime() in non-legacy mode, the default and explicit."""
        # Test non-legacy mode
        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives_datetime(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "20100605", "010203")

        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives_date(1999, 6, 23, "19990623", "000000")
        gives_date(2000, 6, 23, "20000623", "000000")
        gives_date(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        """_unparse_datetime() in legacy mode yields two-digit years."""
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives_datetime(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "100605", "010203")

        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives_date(1999, 6, 23, "990623", "000000")
        gives_date(2000, 6, 23, "000623", "000000")
        gives_date(2010, 6, 5, "100605", "000000")
1232
1233
def test_main():
    """Run this module's test classes, plus the SSL one when _have_ssl is set."""
    suites = (MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests)
    if _have_ssl:
        suites += (NetworkedNNTP_SSLTests,)
    support.run_unittest(*suites)


if __name__ == "__main__":
    test_main()