blob: ec790ada52a7585917b58cbc936c80e1e4883323 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Raymond Hettinger57d1a882011-02-23 00:46:28 +00008import collections.abc
Antoine Pitrou69ab9512010-09-29 15:03:40 +00009from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000010from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000011import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000012if _have_ssl:
13 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000014
15TIMEOUT = 30
16
17# TODO:
18# - test the `file` arg to more commands
19# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000020# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000021
22
23class NetworkedNNTPTestsMixin:
24
25 def test_welcome(self):
26 welcome = self.server.getwelcome()
27 self.assertEqual(str, type(welcome))
28
29 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000030 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000031 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000032 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000033 self.assertEqual(str, type(line))
34
35 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000036 resp, groups = self.server.list()
37 if len(groups) > 0:
38 self.assertEqual(GroupInfo, type(groups[0]))
39 self.assertEqual(str, type(groups[0].group))
40
41 def test_list_active(self):
42 resp, groups = self.server.list(self.GROUP_PAT)
43 if len(groups) > 0:
44 self.assertEqual(GroupInfo, type(groups[0]))
45 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000046
47 def test_unknown_command(self):
48 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
49 self.server._shortcmd("XYZZY")
50 resp = cm.exception.response
51 self.assertTrue(resp.startswith("500 "), resp)
52
53 def test_newgroups(self):
54 # gmane gets a constant influx of new groups. In order not to stress
55 # the server too much, we choose a recent date in the past.
56 dt = datetime.date.today() - datetime.timedelta(days=7)
57 resp, groups = self.server.newgroups(dt)
58 if len(groups) > 0:
59 self.assertIsInstance(groups[0], GroupInfo)
60 self.assertIsInstance(groups[0].group, str)
61
62 def test_description(self):
63 def _check_desc(desc):
64 # Sanity checks
65 self.assertIsInstance(desc, str)
66 self.assertNotIn(self.GROUP_NAME, desc)
67 desc = self.server.description(self.GROUP_NAME)
68 _check_desc(desc)
69 # Another sanity check
70 self.assertIn("Python", desc)
71 # With a pattern
72 desc = self.server.description(self.GROUP_PAT)
73 _check_desc(desc)
74 # Shouldn't exist
75 desc = self.server.description("zk.brrtt.baz")
76 self.assertEqual(desc, '')
77
78 def test_descriptions(self):
79 resp, descs = self.server.descriptions(self.GROUP_PAT)
80 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
81 self.assertTrue(
82 resp.startswith("215 ") or resp.startswith("282 "), resp)
83 self.assertIsInstance(descs, dict)
84 desc = descs[self.GROUP_NAME]
85 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
86
87 def test_group(self):
88 result = self.server.group(self.GROUP_NAME)
89 self.assertEqual(5, len(result))
90 resp, count, first, last, group = result
91 self.assertEqual(group, self.GROUP_NAME)
92 self.assertIsInstance(count, int)
93 self.assertIsInstance(first, int)
94 self.assertIsInstance(last, int)
95 self.assertLessEqual(first, last)
96 self.assertTrue(resp.startswith("211 "), resp)
97
98 def test_date(self):
99 resp, date = self.server.date()
100 self.assertIsInstance(date, datetime.datetime)
101 # Sanity check
102 self.assertGreaterEqual(date.year, 1995)
103 self.assertLessEqual(date.year, 2030)
104
105 def _check_art_dict(self, art_dict):
106 # Some sanity checks for a field dictionary returned by OVER / XOVER
107 self.assertIsInstance(art_dict, dict)
108 # NNTP has 7 mandatory fields
109 self.assertGreaterEqual(art_dict.keys(),
110 {"subject", "from", "date", "message-id",
111 "references", ":bytes", ":lines"}
112 )
113 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000114 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000115
116 def test_xover(self):
117 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000118 resp, lines = self.server.xover(last - 5, last)
119 if len(lines) == 0:
120 self.skipTest("no articles retrieved")
121 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000122 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000123 self.assertGreaterEqual(art_num, last - 5)
124 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000125 self._check_art_dict(art_dict)
126
127 def test_over(self):
128 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
129 start = last - 10
130 # The "start-" article range form
131 resp, lines = self.server.over((start, None))
132 art_num, art_dict = lines[0]
133 self._check_art_dict(art_dict)
134 # The "start-end" article range form
135 resp, lines = self.server.over((start, last))
136 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000137 # The 'last' article is not necessarily part of the output (cancelled?)
138 self.assertGreaterEqual(art_num, start)
139 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000140 self._check_art_dict(art_dict)
141 # XXX The "message_id" form is unsupported by gmane
142 # 503 Overview by message-ID unsupported
143
144 def test_xhdr(self):
145 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
146 resp, lines = self.server.xhdr('subject', last)
147 for line in lines:
148 self.assertEqual(str, type(line[1]))
149
150 def check_article_resp(self, resp, article, art_num=None):
151 self.assertIsInstance(article, nntplib.ArticleInfo)
152 if art_num is not None:
153 self.assertEqual(article.number, art_num)
154 for line in article.lines:
155 self.assertIsInstance(line, bytes)
156 # XXX this could exceptionally happen...
157 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
158
159 def test_article_head_body(self):
160 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000161 # Try to find an available article
162 for art_num in (last, first, last - 1):
163 try:
164 resp, head = self.server.head(art_num)
165 except nntplib.NNTPTemporaryError as e:
166 if not e.response.startswith("423 "):
167 raise
168 # "423 No such article" => choose another one
169 continue
170 break
171 else:
172 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000173 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000174 self.check_article_resp(resp, head, art_num)
175 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000176 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000177 self.check_article_resp(resp, body, art_num)
178 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000179 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000180 self.check_article_resp(resp, article, art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000181 self.assertEqual(article.lines, head.lines + [b''] + body.lines)
182
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000183 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000184 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000185 # couple of well-known capabilities. Just sanity check that we
186 # got them.
187 def _check_caps(caps):
188 caps_list = caps['LIST']
189 self.assertIsInstance(caps_list, (list, tuple))
190 self.assertIn('OVERVIEW.FMT', caps_list)
191 self.assertGreaterEqual(self.server.nntp_version, 2)
192 _check_caps(self.server.getcapabilities())
193 # This re-emits the command
194 resp, caps = self.server.capabilities()
195 _check_caps(caps)
196
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000197 if _have_ssl:
198 def test_starttls(self):
199 file = self.server.file
200 sock = self.server.sock
201 try:
202 self.server.starttls()
203 except nntplib.NNTPPermanentError:
204 self.skipTest("STARTTLS not supported by server.")
205 else:
206 # Check that the socket and internal pseudo-file really were
207 # changed.
208 self.assertNotEqual(file, self.server.file)
209 self.assertNotEqual(sock, self.server.sock)
210 # Check that the new socket really is an SSL one
211 self.assertIsInstance(self.server.sock, ssl.SSLSocket)
212 # Check that trying starttls when it's already active fails.
213 self.assertRaises(ValueError, self.server.starttls)
214
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000215 def test_zlogin(self):
216 # This test must be the penultimate because further commands will be
217 # refused.
218 baduser = "notarealuser"
219 badpw = "notarealpassword"
220 # Check that bogus credentials cause failure
221 self.assertRaises(nntplib.NNTPError, self.server.login,
222 user=baduser, password=badpw, usenetrc=False)
223 # FIXME: We should check that correct credentials succeed, but that
224 # would require valid details for some server somewhere to be in the
225 # test suite, I think. Gmane is anonymous, at least as used for the
226 # other tests.
227
228 def test_zzquit(self):
229 # This test must be called last, hence the name
230 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000231 try:
232 self.server.quit()
233 finally:
234 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000235
Antoine Pitroude609182010-11-18 17:29:23 +0000236 @classmethod
237 def wrap_methods(cls):
238 # Wrap all methods in a transient_internet() exception catcher
239 # XXX put a generic version in test.support?
240 def wrap_meth(meth):
241 @functools.wraps(meth)
242 def wrapped(self):
243 with support.transient_internet(self.NNTP_HOST):
244 meth(self)
245 return wrapped
246 for name in dir(cls):
247 if not name.startswith('test_'):
248 continue
249 meth = getattr(cls, name)
Raymond Hettinger57d1a882011-02-23 00:46:28 +0000250 if not isinstance(meth, collections.abc.Callable):
Antoine Pitroude609182010-11-18 17:29:23 +0000251 continue
252 # Need to use a closure so that meth remains bound to its current
253 # value
254 setattr(cls, name, wrap_meth(meth))
255
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000256 def test_with_statement(self):
257 def is_connected():
258 if not hasattr(server, 'file'):
259 return False
260 try:
261 server.help()
262 except (socket.error, EOFError):
263 return False
264 return True
265
266 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
267 self.assertTrue(is_connected())
268 self.assertTrue(server.help())
269 self.assertFalse(is_connected())
270
271 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
272 server.quit()
273 self.assertFalse(is_connected())
274
275
Antoine Pitroude609182010-11-18 17:29:23 +0000276NetworkedNNTPTestsMixin.wrap_methods()
277
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000278
279class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
280 # This server supports STARTTLS (gmane doesn't)
281 NNTP_HOST = 'news.trigofacile.com'
282 GROUP_NAME = 'fr.comp.lang.python'
283 GROUP_PAT = 'fr.comp.lang.*'
284
Antoine Pitroude609182010-11-18 17:29:23 +0000285 NNTP_CLASS = NNTP
286
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000287 @classmethod
288 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000289 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000290 with support.transient_internet(cls.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000291 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000292
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000293 @classmethod
294 def tearDownClass(cls):
295 if cls.server is not None:
296 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000297
298
299if _have_ssl:
Antoine Pitroude609182010-11-18 17:29:23 +0000300 class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000301
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000302 # Technical limits for this public NNTP server (see http://www.aioe.org):
303 # "Only two concurrent connections per IP address are allowed and
304 # 400 connections per day are accepted from each IP address."
305
306 NNTP_HOST = 'nntp.aioe.org'
307 GROUP_NAME = 'comp.lang.python'
308 GROUP_PAT = 'comp.lang.*'
309
Antoine Pitroude609182010-11-18 17:29:23 +0000310 NNTP_CLASS = nntplib.NNTP_SSL
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000311
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000312 # Disabled as it produces too much data
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000313 test_list = None
314
315 # Disabled as the connection will already be encrypted.
316 test_starttls = None
317
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000318
319#
320# Non-networked tests using a local server (or something mocking it).
321#
322
323class _NNTPServerIO(io.RawIOBase):
324 """A raw IO object allowing NNTP commands to be received and processed
325 by a handler. The handler can push responses which can then be read
326 from the IO object."""
327
328 def __init__(self, handler):
329 io.RawIOBase.__init__(self)
330 # The channel from the client
331 self.c2s = io.BytesIO()
332 # The channel to the client
333 self.s2c = io.BytesIO()
334 self.handler = handler
335 self.handler.start(self.c2s.readline, self.push_data)
336
337 def readable(self):
338 return True
339
340 def writable(self):
341 return True
342
343 def push_data(self, data):
344 """Push (buffer) some data to send to the client."""
345 pos = self.s2c.tell()
346 self.s2c.seek(0, 2)
347 self.s2c.write(data)
348 self.s2c.seek(pos)
349
350 def write(self, b):
351 """The client sends us some data"""
352 pos = self.c2s.tell()
353 self.c2s.write(b)
354 self.c2s.seek(pos)
355 self.handler.process_pending()
356 return len(b)
357
358 def readinto(self, buf):
359 """The client wants to read a response"""
360 self.handler.process_pending()
361 b = self.s2c.read(len(buf))
362 n = len(b)
363 buf[:n] = b
364 return n
365
366
367class MockedNNTPTestsMixin:
368 # Override in derived classes
369 handler_class = None
370
371 def setUp(self):
372 super().setUp()
373 self.make_server()
374
375 def tearDown(self):
376 super().tearDown()
377 del self.server
378
379 def make_server(self, *args, **kwargs):
380 self.handler = self.handler_class()
381 self.sio = _NNTPServerIO(self.handler)
382 # Using BufferedRWPair instead of BufferedRandom ensures the file
383 # isn't seekable.
384 file = io.BufferedRWPair(self.sio, self.sio)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000385 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000386 return self.server
387
388
389class NNTPv1Handler:
390 """A handler for RFC 977"""
391
392 welcome = "200 NNTP mock server"
393
394 def start(self, readline, push_data):
395 self.in_body = False
396 self.allow_posting = True
397 self._readline = readline
398 self._push_data = push_data
399 # Our welcome
400 self.handle_welcome()
401
402 def _decode(self, data):
403 return str(data, "utf-8", "surrogateescape")
404
405 def process_pending(self):
406 if self.in_body:
407 while True:
408 line = self._readline()
409 if not line:
410 return
411 self.body.append(line)
412 if line == b".\r\n":
413 break
414 try:
415 meth, tokens = self.body_callback
416 meth(*tokens, body=self.body)
417 finally:
418 self.body_callback = None
419 self.body = None
420 self.in_body = False
421 while True:
422 line = self._decode(self._readline())
423 if not line:
424 return
425 if not line.endswith("\r\n"):
426 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
427 line = line[:-2]
428 cmd, *tokens = line.split()
429 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
430 meth = getattr(self, "handle_" + cmd.upper(), None)
431 if meth is None:
432 self.handle_unknown()
433 else:
434 try:
435 meth(*tokens)
436 except Exception as e:
437 raise ValueError("command failed: {!r}".format(line)) from e
438 else:
439 if self.in_body:
440 self.body_callback = meth, tokens
441 self.body = []
442
443 def expect_body(self):
444 """Flag that the client is expected to post a request body"""
445 self.in_body = True
446
447 def push_data(self, data):
448 """Push some binary data"""
449 self._push_data(data)
450
451 def push_lit(self, lit):
452 """Push a string literal"""
453 lit = textwrap.dedent(lit)
454 lit = "\r\n".join(lit.splitlines()) + "\r\n"
455 lit = lit.encode('utf-8')
456 self.push_data(lit)
457
458 def handle_unknown(self):
459 self.push_lit("500 What?")
460
461 def handle_welcome(self):
462 self.push_lit(self.welcome)
463
464 def handle_QUIT(self):
465 self.push_lit("205 Bye!")
466
467 def handle_DATE(self):
468 self.push_lit("111 20100914001155")
469
470 def handle_GROUP(self, group):
471 if group == "fr.comp.lang.python":
472 self.push_lit("211 486 761 1265 fr.comp.lang.python")
473 else:
474 self.push_lit("411 No such group {}".format(group))
475
476 def handle_HELP(self):
477 self.push_lit("""\
478 100 Legal commands
479 authinfo user Name|pass Password|generic <prog> <args>
480 date
481 help
482 Report problems to <root@example.org>
483 .""")
484
485 def handle_STAT(self, message_spec=None):
486 if message_spec is None:
487 self.push_lit("412 No newsgroup selected")
488 elif message_spec == "3000234":
489 self.push_lit("223 3000234 <45223423@example.com>")
490 elif message_spec == "<45223423@example.com>":
491 self.push_lit("223 0 <45223423@example.com>")
492 else:
493 self.push_lit("430 No Such Article Found")
494
495 def handle_NEXT(self):
496 self.push_lit("223 3000237 <668929@example.org> retrieved")
497
498 def handle_LAST(self):
499 self.push_lit("223 3000234 <45223423@example.com> retrieved")
500
501 def handle_LIST(self, action=None, param=None):
502 if action is None:
503 self.push_lit("""\
504 215 Newsgroups in form "group high low flags".
505 comp.lang.python 0000052340 0000002828 y
506 comp.lang.python.announce 0000001153 0000000993 m
507 free.it.comp.lang.python 0000000002 0000000002 y
508 fr.comp.lang.python 0000001254 0000000760 y
509 free.it.comp.lang.python.learner 0000000000 0000000001 y
510 tw.bbs.comp.lang.python 0000000304 0000000304 y
511 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000512 elif action == "ACTIVE":
513 if param == "*distutils*":
514 self.push_lit("""\
515 215 Newsgroups in form "group high low flags"
516 gmane.comp.python.distutils.devel 0000014104 0000000001 m
517 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
518 .""")
519 else:
520 self.push_lit("""\
521 215 Newsgroups in form "group high low flags"
522 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000523 elif action == "OVERVIEW.FMT":
524 self.push_lit("""\
525 215 Order of fields in overview database.
526 Subject:
527 From:
528 Date:
529 Message-ID:
530 References:
531 Bytes:
532 Lines:
533 Xref:full
534 .""")
535 elif action == "NEWSGROUPS":
536 assert param is not None
537 if param == "comp.lang.python":
538 self.push_lit("""\
539 215 Descriptions in form "group description".
540 comp.lang.python\tThe Python computer language.
541 .""")
542 elif param == "comp.lang.python*":
543 self.push_lit("""\
544 215 Descriptions in form "group description".
545 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
546 comp.lang.python\tThe Python computer language.
547 .""")
548 else:
549 self.push_lit("""\
550 215 Descriptions in form "group description".
551 .""")
552 else:
553 self.push_lit('501 Unknown LIST keyword')
554
555 def handle_NEWNEWS(self, group, date_str, time_str):
556 # We hard code different return messages depending on passed
557 # argument and date syntax.
558 if (group == "comp.lang.python" and date_str == "20100913"
559 and time_str == "082004"):
560 # Date was passed in RFC 3977 format (NNTP "v2")
561 self.push_lit("""\
562 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
563 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
564 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
565 .""")
566 elif (group == "comp.lang.python" and date_str == "100913"
567 and time_str == "082004"):
568 # Date was passed in RFC 977 format (NNTP "v1")
569 self.push_lit("""\
570 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
571 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
572 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
573 .""")
574 else:
575 self.push_lit("""\
576 230 An empty list of newsarticles follows
577 .""")
578 # (Note for experiments: many servers disable NEWNEWS.
579 # As of this writing, sicinfo3.epfl.ch doesn't.)
580
581 def handle_XOVER(self, message_spec):
582 if message_spec == "57-59":
583 self.push_lit(
584 "224 Overview information for 57-58 follows\n"
585 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
586 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
587 "\tSat, 19 Jun 2010 18:04:08 -0400"
588 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
589 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
590 "\tXref: news.gmane.org gmane.comp.python.authors:57"
591 "\n"
592 "58\tLooking for a few good bloggers"
593 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
594 "\tThu, 22 Jul 2010 09:14:14 -0400"
595 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
596 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000597 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000598 "\n"
599 # An UTF-8 overview line from fr.comp.lang.python
600 "59\tRe: Message d'erreur incompréhensible (par moi)"
601 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
602 "\tWed, 15 Sep 2010 18:09:15 +0200"
603 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
604 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
605 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
606 "\n"
607 ".\n")
608 else:
609 self.push_lit("""\
610 224 No articles
611 .""")
612
613 def handle_POST(self, *, body=None):
614 if body is None:
615 if self.allow_posting:
616 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
617 self.expect_body()
618 else:
619 self.push_lit("440 Posting not permitted")
620 else:
621 assert self.allow_posting
622 self.push_lit("240 Article received OK")
623 self.posted_body = body
624
625 def handle_IHAVE(self, message_id, *, body=None):
626 if body is None:
627 if (self.allow_posting and
628 message_id == "<i.am.an.article.you.will.want@example.com>"):
629 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
630 self.expect_body()
631 else:
632 self.push_lit("435 Article not wanted")
633 else:
634 assert self.allow_posting
635 self.push_lit("235 Article transferred OK")
636 self.posted_body = body
637
638 sample_head = """\
639 From: "Demo User" <nobody@example.net>
640 Subject: I am just a test article
641 Content-Type: text/plain; charset=UTF-8; format=flowed
642 Message-ID: <i.am.an.article.you.will.want@example.com>"""
643
644 sample_body = """\
645 This is just a test article.
646 ..Here is a dot-starting line.
647
648 -- Signed by Andr\xe9."""
649
650 sample_article = sample_head + "\n\n" + sample_body
651
652 def handle_ARTICLE(self, message_spec=None):
653 if message_spec is None:
654 self.push_lit("220 3000237 <45223423@example.com>")
655 elif message_spec == "<45223423@example.com>":
656 self.push_lit("220 0 <45223423@example.com>")
657 elif message_spec == "3000234":
658 self.push_lit("220 3000234 <45223423@example.com>")
659 else:
660 self.push_lit("430 No Such Article Found")
661 return
662 self.push_lit(self.sample_article)
663 self.push_lit(".")
664
665 def handle_HEAD(self, message_spec=None):
666 if message_spec is None:
667 self.push_lit("221 3000237 <45223423@example.com>")
668 elif message_spec == "<45223423@example.com>":
669 self.push_lit("221 0 <45223423@example.com>")
670 elif message_spec == "3000234":
671 self.push_lit("221 3000234 <45223423@example.com>")
672 else:
673 self.push_lit("430 No Such Article Found")
674 return
675 self.push_lit(self.sample_head)
676 self.push_lit(".")
677
678 def handle_BODY(self, message_spec=None):
679 if message_spec is None:
680 self.push_lit("222 3000237 <45223423@example.com>")
681 elif message_spec == "<45223423@example.com>":
682 self.push_lit("222 0 <45223423@example.com>")
683 elif message_spec == "3000234":
684 self.push_lit("222 3000234 <45223423@example.com>")
685 else:
686 self.push_lit("430 No Such Article Found")
687 return
688 self.push_lit(self.sample_body)
689 self.push_lit(".")
690
691
692class NNTPv2Handler(NNTPv1Handler):
693 """A handler for RFC 3977 (NNTP "v2")"""
694
695 def handle_CAPABILITIES(self):
696 self.push_lit("""\
697 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000698 VERSION 2 3
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000699 IMPLEMENTATION INN 2.5.1
700 AUTHINFO USER
701 HDR
702 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
703 OVER
704 POST
705 READER
706 .""")
707
708 def handle_OVER(self, message_spec=None):
709 return self.handle_XOVER(message_spec)
710
711
712class NNTPv1v2TestsMixin:
713
714 def setUp(self):
715 super().setUp()
716
717 def test_welcome(self):
718 self.assertEqual(self.server.welcome, self.handler.welcome)
719
720 def test_date(self):
721 resp, date = self.server.date()
722 self.assertEqual(resp, "111 20100914001155")
723 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
724
725 def test_quit(self):
726 self.assertFalse(self.sio.closed)
727 resp = self.server.quit()
728 self.assertEqual(resp, "205 Bye!")
729 self.assertTrue(self.sio.closed)
730
731 def test_help(self):
732 resp, help = self.server.help()
733 self.assertEqual(resp, "100 Legal commands")
734 self.assertEqual(help, [
735 ' authinfo user Name|pass Password|generic <prog> <args>',
736 ' date',
737 ' help',
738 'Report problems to <root@example.org>',
739 ])
740
741 def test_list(self):
742 resp, groups = self.server.list()
743 self.assertEqual(len(groups), 6)
744 g = groups[1]
745 self.assertEqual(g,
746 GroupInfo("comp.lang.python.announce", "0000001153",
747 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000748 resp, groups = self.server.list("*distutils*")
749 self.assertEqual(len(groups), 2)
750 g = groups[0]
751 self.assertEqual(g,
752 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
753 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000754
755 def test_stat(self):
756 resp, art_num, message_id = self.server.stat(3000234)
757 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
758 self.assertEqual(art_num, 3000234)
759 self.assertEqual(message_id, "<45223423@example.com>")
760 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
761 self.assertEqual(resp, "223 0 <45223423@example.com>")
762 self.assertEqual(art_num, 0)
763 self.assertEqual(message_id, "<45223423@example.com>")
764 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
765 self.server.stat("<non.existent.id>")
766 self.assertEqual(cm.exception.response, "430 No Such Article Found")
767 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
768 self.server.stat()
769 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
770
771 def test_next(self):
772 resp, art_num, message_id = self.server.next()
773 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
774 self.assertEqual(art_num, 3000237)
775 self.assertEqual(message_id, "<668929@example.org>")
776
777 def test_last(self):
778 resp, art_num, message_id = self.server.last()
779 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
780 self.assertEqual(art_num, 3000234)
781 self.assertEqual(message_id, "<45223423@example.com>")
782
783 def test_description(self):
784 desc = self.server.description("comp.lang.python")
785 self.assertEqual(desc, "The Python computer language.")
786 desc = self.server.description("comp.lang.pythonx")
787 self.assertEqual(desc, "")
788
789 def test_descriptions(self):
790 resp, groups = self.server.descriptions("comp.lang.python")
791 self.assertEqual(resp, '215 Descriptions in form "group description".')
792 self.assertEqual(groups, {
793 "comp.lang.python": "The Python computer language.",
794 })
795 resp, groups = self.server.descriptions("comp.lang.python*")
796 self.assertEqual(groups, {
797 "comp.lang.python": "The Python computer language.",
798 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
799 })
800 resp, groups = self.server.descriptions("comp.lang.pythonx")
801 self.assertEqual(groups, {})
802
803 def test_group(self):
804 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
805 self.assertTrue(resp.startswith("211 "), resp)
806 self.assertEqual(first, 761)
807 self.assertEqual(last, 1265)
808 self.assertEqual(count, 486)
809 self.assertEqual(group, "fr.comp.lang.python")
810 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
811 self.server.group("comp.lang.python.devel")
812 exc = cm.exception
813 self.assertTrue(exc.response.startswith("411 No such group"),
814 exc.response)
815
816 def test_newnews(self):
817 # NEWNEWS comp.lang.python [20]100913 082004
818 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
819 resp, ids = self.server.newnews("comp.lang.python", dt)
820 expected = (
821 "230 list of newsarticles (NNTP v{0}) "
822 "created after Mon Sep 13 08:20:04 2010 follows"
823 ).format(self.nntp_version)
824 self.assertEqual(resp, expected)
825 self.assertEqual(ids, [
826 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
827 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
828 ])
829 # NEWNEWS fr.comp.lang.python [20]100913 082004
830 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
831 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
832 self.assertEqual(resp, "230 An empty list of newsarticles follows")
833 self.assertEqual(ids, [])
834
835 def _check_article_body(self, lines):
836 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000837 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000838 self.assertEqual(lines[-2], b"")
839 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
840 self.assertEqual(lines[-4], b"This is just a test article.")
841
842 def _check_article_head(self, lines):
843 self.assertEqual(len(lines), 4)
844 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
845 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
846
847 def _check_article_data(self, lines):
848 self.assertEqual(len(lines), 9)
849 self._check_article_head(lines[:4])
850 self._check_article_body(lines[-4:])
851 self.assertEqual(lines[4], b"")
852
853 def test_article(self):
854 # ARTICLE
855 resp, info = self.server.article()
856 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
857 art_num, message_id, lines = info
858 self.assertEqual(art_num, 3000237)
859 self.assertEqual(message_id, "<45223423@example.com>")
860 self._check_article_data(lines)
861 # ARTICLE num
862 resp, info = self.server.article(3000234)
863 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
864 art_num, message_id, lines = info
865 self.assertEqual(art_num, 3000234)
866 self.assertEqual(message_id, "<45223423@example.com>")
867 self._check_article_data(lines)
868 # ARTICLE id
869 resp, info = self.server.article("<45223423@example.com>")
870 self.assertEqual(resp, "220 0 <45223423@example.com>")
871 art_num, message_id, lines = info
872 self.assertEqual(art_num, 0)
873 self.assertEqual(message_id, "<45223423@example.com>")
874 self._check_article_data(lines)
875 # Non-existent id
876 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
877 self.server.article("<non-existent@example.com>")
878 self.assertEqual(cm.exception.response, "430 No Such Article Found")
879
880 def test_article_file(self):
881 # With a "file" argument
882 f = io.BytesIO()
883 resp, info = self.server.article(file=f)
884 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
885 art_num, message_id, lines = info
886 self.assertEqual(art_num, 3000237)
887 self.assertEqual(message_id, "<45223423@example.com>")
888 self.assertEqual(lines, [])
889 data = f.getvalue()
890 self.assertTrue(data.startswith(
891 b'From: "Demo User" <nobody@example.net>\r\n'
892 b'Subject: I am just a test article\r\n'
893 ), ascii(data))
894 self.assertTrue(data.endswith(
895 b'This is just a test article.\r\n'
896 b'.Here is a dot-starting line.\r\n'
897 b'\r\n'
898 b'-- Signed by Andr\xc3\xa9.\r\n'
899 ), ascii(data))
900
901 def test_head(self):
902 # HEAD
903 resp, info = self.server.head()
904 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
905 art_num, message_id, lines = info
906 self.assertEqual(art_num, 3000237)
907 self.assertEqual(message_id, "<45223423@example.com>")
908 self._check_article_head(lines)
909 # HEAD num
910 resp, info = self.server.head(3000234)
911 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
912 art_num, message_id, lines = info
913 self.assertEqual(art_num, 3000234)
914 self.assertEqual(message_id, "<45223423@example.com>")
915 self._check_article_head(lines)
916 # HEAD id
917 resp, info = self.server.head("<45223423@example.com>")
918 self.assertEqual(resp, "221 0 <45223423@example.com>")
919 art_num, message_id, lines = info
920 self.assertEqual(art_num, 0)
921 self.assertEqual(message_id, "<45223423@example.com>")
922 self._check_article_head(lines)
923 # Non-existent id
924 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
925 self.server.head("<non-existent@example.com>")
926 self.assertEqual(cm.exception.response, "430 No Such Article Found")
927
928 def test_body(self):
929 # BODY
930 resp, info = self.server.body()
931 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
932 art_num, message_id, lines = info
933 self.assertEqual(art_num, 3000237)
934 self.assertEqual(message_id, "<45223423@example.com>")
935 self._check_article_body(lines)
936 # BODY num
937 resp, info = self.server.body(3000234)
938 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
939 art_num, message_id, lines = info
940 self.assertEqual(art_num, 3000234)
941 self.assertEqual(message_id, "<45223423@example.com>")
942 self._check_article_body(lines)
943 # BODY id
944 resp, info = self.server.body("<45223423@example.com>")
945 self.assertEqual(resp, "222 0 <45223423@example.com>")
946 art_num, message_id, lines = info
947 self.assertEqual(art_num, 0)
948 self.assertEqual(message_id, "<45223423@example.com>")
949 self._check_article_body(lines)
950 # Non-existent id
951 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
952 self.server.body("<non-existent@example.com>")
953 self.assertEqual(cm.exception.response, "430 No Such Article Found")
954
955 def check_over_xover_resp(self, resp, overviews):
956 self.assertTrue(resp.startswith("224 "), resp)
957 self.assertEqual(len(overviews), 3)
958 art_num, over = overviews[0]
959 self.assertEqual(art_num, 57)
960 self.assertEqual(over, {
961 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
962 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
963 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
964 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
965 "references": "<hvalf7$ort$1@dough.gmane.org>",
966 ":bytes": "7103",
967 ":lines": "16",
968 "xref": "news.gmane.org gmane.comp.python.authors:57"
969 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000970 art_num, over = overviews[1]
971 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000972 art_num, over = overviews[2]
973 self.assertEqual(over["subject"],
974 "Re: Message d'erreur incompréhensible (par moi)")
975
976 def test_xover(self):
977 resp, overviews = self.server.xover(57, 59)
978 self.check_over_xover_resp(resp, overviews)
979
980 def test_over(self):
981 # In NNTP "v1", this will fallback on XOVER
982 resp, overviews = self.server.over((57, 59))
983 self.check_over_xover_resp(resp, overviews)
984
985 sample_post = (
986 b'From: "Demo User" <nobody@example.net>\r\n'
987 b'Subject: I am just a test article\r\n'
988 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
989 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
990 b'\r\n'
991 b'This is just a test article.\r\n'
992 b'.Here is a dot-starting line.\r\n'
993 b'\r\n'
994 b'-- Signed by Andr\xc3\xa9.\r\n'
995 )
996
997 def _check_posted_body(self):
998 # Check the raw body as received by the server
999 lines = self.handler.posted_body
1000 # One additional line for the "." terminator
1001 self.assertEqual(len(lines), 10)
1002 self.assertEqual(lines[-1], b'.\r\n')
1003 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1004 self.assertEqual(lines[-3], b'\r\n')
1005 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1006 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1007
1008 def _check_post_ihave_sub(self, func, *args, file_factory):
1009 # First the prepared post with CRLF endings
1010 post = self.sample_post
1011 func_args = args + (file_factory(post),)
1012 self.handler.posted_body = None
1013 resp = func(*func_args)
1014 self._check_posted_body()
1015 # Then the same post with "normal" line endings - they should be
1016 # converted by NNTP.post and NNTP.ihave.
1017 post = self.sample_post.replace(b"\r\n", b"\n")
1018 func_args = args + (file_factory(post),)
1019 self.handler.posted_body = None
1020 resp = func(*func_args)
1021 self._check_posted_body()
1022 return resp
1023
1024 def check_post_ihave(self, func, success_resp, *args):
1025 # With a bytes object
1026 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1027 self.assertEqual(resp, success_resp)
1028 # With a bytearray object
1029 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1030 self.assertEqual(resp, success_resp)
1031 # With a file object
1032 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1033 self.assertEqual(resp, success_resp)
1034 # With an iterable of terminated lines
1035 def iterlines(b):
1036 return iter(b.splitlines(True))
1037 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1038 self.assertEqual(resp, success_resp)
1039 # With an iterable of non-terminated lines
1040 def iterlines(b):
1041 return iter(b.splitlines(False))
1042 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1043 self.assertEqual(resp, success_resp)
1044
1045 def test_post(self):
1046 self.check_post_ihave(self.server.post, "240 Article received OK")
1047 self.handler.allow_posting = False
1048 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1049 self.server.post(self.sample_post)
1050 self.assertEqual(cm.exception.response,
1051 "440 Posting not permitted")
1052
1053 def test_ihave(self):
1054 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1055 "<i.am.an.article.you.will.want@example.com>")
1056 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1057 self.server.ihave("<another.message.id>", self.sample_post)
1058 self.assertEqual(cm.exception.response,
1059 "435 Article not wanted")
1060
1061
1062class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1063 """Tests an NNTP v1 server (no capabilities)."""
1064
1065 nntp_version = 1
1066 handler_class = NNTPv1Handler
1067
1068 def test_caps(self):
1069 caps = self.server.getcapabilities()
1070 self.assertEqual(caps, {})
1071 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001072 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001073
1074
1075class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1076 """Tests an NNTP v2 server (with capabilities)."""
1077
1078 nntp_version = 2
1079 handler_class = NNTPv2Handler
1080
1081 def test_caps(self):
1082 caps = self.server.getcapabilities()
1083 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001084 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001085 'IMPLEMENTATION': ['INN', '2.5.1'],
1086 'AUTHINFO': ['USER'],
1087 'HDR': [],
1088 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1089 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1090 'OVER': [],
1091 'POST': [],
1092 'READER': [],
1093 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001094 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001095 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001096
1097
1098class MiscTests(unittest.TestCase):
1099
1100 def test_decode_header(self):
1101 def gives(a, b):
1102 self.assertEqual(nntplib.decode_header(a), b)
1103 gives("" , "")
1104 gives("a plain header", "a plain header")
1105 gives(" with extra spaces ", " with extra spaces ")
1106 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "DĂ©buter en Python")
1107 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1108 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1109 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1110 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1111 "Re: problème de matrice")
1112 # A natively utf-8 header (found in the real world!)
1113 gives("Re: Message d'erreur incompréhensible (par moi)",
1114 "Re: Message d'erreur incompréhensible (par moi)")
1115
1116 def test_parse_overview_fmt(self):
1117 # The minimal (default) response
1118 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1119 "References:", ":bytes", ":lines"]
1120 self.assertEqual(nntplib._parse_overview_fmt(lines),
1121 ["subject", "from", "date", "message-id", "references",
1122 ":bytes", ":lines"])
1123 # The minimal response using alternative names
1124 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1125 "References:", "Bytes:", "Lines:"]
1126 self.assertEqual(nntplib._parse_overview_fmt(lines),
1127 ["subject", "from", "date", "message-id", "references",
1128 ":bytes", ":lines"])
1129 # Variations in casing
1130 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1131 "References:", "BYTES:", "Lines:"]
1132 self.assertEqual(nntplib._parse_overview_fmt(lines),
1133 ["subject", "from", "date", "message-id", "references",
1134 ":bytes", ":lines"])
1135 # First example from RFC 3977
1136 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1137 "References:", ":bytes", ":lines", "Xref:full",
1138 "Distribution:full"]
1139 self.assertEqual(nntplib._parse_overview_fmt(lines),
1140 ["subject", "from", "date", "message-id", "references",
1141 ":bytes", ":lines", "xref", "distribution"])
1142 # Second example from RFC 3977
1143 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1144 "References:", "Bytes:", "Lines:", "Xref:FULL",
1145 "Distribution:FULL"]
1146 self.assertEqual(nntplib._parse_overview_fmt(lines),
1147 ["subject", "from", "date", "message-id", "references",
1148 ":bytes", ":lines", "xref", "distribution"])
1149 # A classic response from INN
1150 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1151 "References:", "Bytes:", "Lines:", "Xref:full"]
1152 self.assertEqual(nntplib._parse_overview_fmt(lines),
1153 ["subject", "from", "date", "message-id", "references",
1154 ":bytes", ":lines", "xref"])
1155
1156 def test_parse_overview(self):
1157 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1158 # First example from RFC 3977
1159 lines = [
1160 '3000234\tI am just a test article\t"Demo User" '
1161 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1162 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1163 '17\tXref: news.example.com misc.test:3000363',
1164 ]
1165 overview = nntplib._parse_overview(lines, fmt)
1166 (art_num, fields), = overview
1167 self.assertEqual(art_num, 3000234)
1168 self.assertEqual(fields, {
1169 'subject': 'I am just a test article',
1170 'from': '"Demo User" <nobody@example.com>',
1171 'date': '6 Oct 1998 04:38:40 -0500',
1172 'message-id': '<45223423@example.com>',
1173 'references': '<45454@example.net>',
1174 ':bytes': '1234',
1175 ':lines': '17',
1176 'xref': 'news.example.com misc.test:3000363',
1177 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001178 # Second example; here the "Xref" field is totally absent (including
1179 # the header name) and comes out as None
1180 lines = [
1181 '3000234\tI am just a test article\t"Demo User" '
1182 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1183 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1184 '17\t\t',
1185 ]
1186 overview = nntplib._parse_overview(lines, fmt)
1187 (art_num, fields), = overview
1188 self.assertEqual(fields['xref'], None)
1189 # Third example; the "Xref" is an empty string, while "references"
1190 # is a single space.
1191 lines = [
1192 '3000234\tI am just a test article\t"Demo User" '
1193 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1194 '<45223423@example.com>\t \t1234\t'
1195 '17\tXref: \t',
1196 ]
1197 overview = nntplib._parse_overview(lines, fmt)
1198 (art_num, fields), = overview
1199 self.assertEqual(fields['references'], ' ')
1200 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001201
1202 def test_parse_datetime(self):
1203 def gives(a, b, *c):
1204 self.assertEqual(nntplib._parse_datetime(a, b),
1205 datetime.datetime(*c))
1206 # Output of DATE command
1207 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1208 # Variations
1209 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1210 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1211 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1212
1213 def test_unparse_datetime(self):
1214 # Test non-legacy mode
1215 # 1) with a datetime
1216 def gives(y, M, d, h, m, s, date_str, time_str):
1217 dt = datetime.datetime(y, M, d, h, m, s)
1218 self.assertEqual(nntplib._unparse_datetime(dt),
1219 (date_str, time_str))
1220 self.assertEqual(nntplib._unparse_datetime(dt, False),
1221 (date_str, time_str))
1222 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1223 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1224 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1225 # 2) with a date
1226 def gives(y, M, d, date_str, time_str):
1227 dt = datetime.date(y, M, d)
1228 self.assertEqual(nntplib._unparse_datetime(dt),
1229 (date_str, time_str))
1230 self.assertEqual(nntplib._unparse_datetime(dt, False),
1231 (date_str, time_str))
1232 gives(1999, 6, 23, "19990623", "000000")
1233 gives(2000, 6, 23, "20000623", "000000")
1234 gives(2010, 6, 5, "20100605", "000000")
1235
1236 def test_unparse_datetime_legacy(self):
1237 # Test legacy mode (RFC 977)
1238 # 1) with a datetime
1239 def gives(y, M, d, h, m, s, date_str, time_str):
1240 dt = datetime.datetime(y, M, d, h, m, s)
1241 self.assertEqual(nntplib._unparse_datetime(dt, True),
1242 (date_str, time_str))
1243 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1244 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1245 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1246 # 2) with a date
1247 def gives(y, M, d, date_str, time_str):
1248 dt = datetime.date(y, M, d)
1249 self.assertEqual(nntplib._unparse_datetime(dt, True),
1250 (date_str, time_str))
1251 gives(1999, 6, 23, "990623", "000000")
1252 gives(2000, 6, 23, "000623", "000000")
1253 gives(2010, 6, 5, "100605", "000000")
1254
1255
1256def test_main():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00001257 tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests]
1258 if _have_ssl:
1259 tests.append(NetworkedNNTP_SSLTests)
1260 support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001261
1262
1263if __name__ == "__main__":
1264 test_main()