blob: 97179cf78a2d199084660dbfc33baed2b2e4a0d6 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
5import contextlib
6from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00007from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +00008import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00009if _have_ssl:
10 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000011
12TIMEOUT = 30
13
# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000018
19
class NetworkedNNTPTestsMixin:
    """Tests exercising an NNTP client against a live server.

    Concrete subclasses must provide three class attributes:

    * ``server``: the connected client object shared by every test;
    * ``GROUP_NAME``: a newsgroup that exists on that server;
    * ``GROUP_PAT``: a pattern whose matches include ``GROUP_NAME``.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)

        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(resp.startswith(("215 ", "282 ")), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, 2030)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        # Same guard as test_xover: an empty overview is not a client bug,
        # and indexing it would turn the test into an IndexError.
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        # Common checks for the result of HEAD / BODY / ARTICLE.
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        self.assertEqual(article.lines, head.lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)

        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    if _have_ssl:
        def test_starttls(self):
            file = self.server.file
            sock = self.server.sock
            try:
                self.server.starttls()
            except nntplib.NNTPPermanentError:
                self.skipTest("STARTTLS not supported by server.")
            else:
                # Check that the socket and internal pseudo-file really were
                # changed.
                self.assertNotEqual(file, self.server.file)
                self.assertNotEqual(sock, self.server.sock)
                # Check that the new socket really is an SSL one
                self.assertIsInstance(self.server.sock, ssl.SSLSocket)
                # Check that trying starttls when it's already active fails.
                self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        self.server.quit()
        # Tell tearDownClass that the connection is already closed.
        cls.server = None
230
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000231
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests over a plain (non-SSL) NNTP connection."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    @classmethod
    def setUpClass(cls):
        # One connection is opened for the whole class and stored on it.
        host = cls.NNTP_HOST
        support.requires("network")
        with support.transient_internet(host):
            cls.server = NNTP(host, timeout=TIMEOUT, usenetrc=False)

    @classmethod
    def tearDownClass(cls):
        # The server attribute is reset to None once a test has already quit.
        server = cls.server
        if server is None:
            return
        server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000248
249
if _have_ssl:
    class NetworkedNNTP_SSLTests(NetworkedNNTPTestsMixin, unittest.TestCase):
        """Run the networked tests over an NNTP_SSL connection."""

        # Technical limits for this public NNTP server (see http://www.aioe.org):
        # "Only two concurrent connections per IP address are allowed and
        # 400 connections per day are accepted from each IP address."

        NNTP_HOST = 'nntp.aioe.org'
        GROUP_NAME = 'comp.lang.python'
        GROUP_PAT = 'comp.lang.*'

        # Disabled as it produces too much data
        test_list = None

        # Disabled as the connection will already be encrypted.
        test_starttls = None

        @classmethod
        def setUpClass(cls):
            # One encrypted connection is opened for the whole class.
            host = cls.NNTP_HOST
            support.requires("network")
            with support.transient_internet(host):
                cls.server = nntplib.NNTP_SSL(host, timeout=TIMEOUT,
                                              usenetrc=False)

        @classmethod
        def tearDownClass(cls):
            # The server attribute is reset to None once a test has already quit.
            server = cls.server
            if server is None:
                return
            server.quit()
278
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000279
#
# Non-networked tests using a local server (or something mocking it).
#
283
284class _NNTPServerIO(io.RawIOBase):
285 """A raw IO object allowing NNTP commands to be received and processed
286 by a handler. The handler can push responses which can then be read
287 from the IO object."""
288
289 def __init__(self, handler):
290 io.RawIOBase.__init__(self)
291 # The channel from the client
292 self.c2s = io.BytesIO()
293 # The channel to the client
294 self.s2c = io.BytesIO()
295 self.handler = handler
296 self.handler.start(self.c2s.readline, self.push_data)
297
298 def readable(self):
299 return True
300
301 def writable(self):
302 return True
303
304 def push_data(self, data):
305 """Push (buffer) some data to send to the client."""
306 pos = self.s2c.tell()
307 self.s2c.seek(0, 2)
308 self.s2c.write(data)
309 self.s2c.seek(pos)
310
311 def write(self, b):
312 """The client sends us some data"""
313 pos = self.c2s.tell()
314 self.c2s.write(b)
315 self.c2s.seek(pos)
316 self.handler.process_pending()
317 return len(b)
318
319 def readinto(self, buf):
320 """The client wants to read a response"""
321 self.handler.process_pending()
322 b = self.s2c.read(len(buf))
323 n = len(b)
324 buf[:n] = b
325 return n
326
327
class MockedNNTPTestsMixin:
    """Fixture giving each test a client wired to an in-process handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        # Extra arguments are forwarded to the client constructor.
        handler = self.handler_class()
        self.handler = handler
        raw = _NNTPServerIO(handler)
        self.sio = raw
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        rw_pair = io.BufferedRWPair(raw, raw)
        server = nntplib._NNTPBase(rw_pair, 'test.server', *args, **kwargs)
        self.server = server
        return server
348
349
class NNTPv1Handler:
    """A handler for RFC 977

    The handler is driven through two callables given to start():
    ``readline()`` returns the next line of bytes sent by the client (or
    an empty bytes object when nothing is pending), and ``push_data(data)``
    queues bytes to be sent back.  Each ``handle_<COMMAND>`` method answers
    one command with canned data.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume everything the client has sent so far.

        Body mode is re-checked after every command, so a command that
        calls expect_body() has the following lines collected as its body
        even when they are already buffered.
        """
        while True:
            if self.in_body:
                if not self._process_body():
                    return
            elif not self._process_command():
                return

    def _process_body(self):
        """Collect body lines; return False if input ran out before the end."""
        while True:
            line = self._readline()
            if not line:
                # Body is incomplete: stay in body mode until more arrives.
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def _process_command(self):
        """Dispatch one command line; return False if no line is pending."""
        line = self._decode(self._readline())
        if not line:
            return False
        if not line.endswith("\r\n"):
            raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
        line = line[:-2]
        words = line.split()
        if not words:
            # A blank line carries no command name to dispatch on.
            self.handle_unknown()
            return True
        cmd, *tokens = words
        meth = getattr(self, "handle_" + cmd.upper(), None)
        if meth is None:
            self.handle_unknown()
            return True
        try:
            meth(*tokens)
        except Exception as e:
            raise ValueError("command failed: {!r}".format(line)) from e
        if self.in_body:
            # The command asked for a body: remember whom to hand it to.
            self.body_callback = meth, tokens
            self.body = []
        return True

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # The literal is dedented, its line endings are normalized to CRLF
        # and it is sent UTF-8 encoded.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called first without a body (the command itself), then again with
        # the collected body once the terminating "." line was received.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST.
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_part(self, code, message_spec, payload):
        """Answer ARTICLE / HEAD / BODY, which only differ by code and payload."""
        if message_spec is None:
            self.push_lit("{} 3000237 <45223423@example.com>".format(code))
        elif message_spec == "<45223423@example.com>":
            self.push_lit("{} 0 <45223423@example.com>".format(code))
        elif message_spec == "3000234":
            self.push_lit("{} 3000234 <45223423@example.com>".format(code))
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_article_part("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_article_part("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_article_part("222", message_spec, self.sample_body)
650 self.push_lit(".")
651
652
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # One entry per line of the reply, ending with the "." terminator.
        reply_lines = (
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
            "AUTHINFO USER",
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        )
        self.push_lit("\n".join(reply_lines))

    def handle_OVER(self, message_spec=None):
        # OVER is answered with the same canned data as XOVER.
        result = self.handle_XOVER(message_spec)
        return result
671
672
673class NNTPv1v2TestsMixin:
674
675 def setUp(self):
676 super().setUp()
677
678 def test_welcome(self):
679 self.assertEqual(self.server.welcome, self.handler.welcome)
680
681 def test_date(self):
682 resp, date = self.server.date()
683 self.assertEqual(resp, "111 20100914001155")
684 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
685
686 def test_quit(self):
687 self.assertFalse(self.sio.closed)
688 resp = self.server.quit()
689 self.assertEqual(resp, "205 Bye!")
690 self.assertTrue(self.sio.closed)
691
692 def test_help(self):
693 resp, help = self.server.help()
694 self.assertEqual(resp, "100 Legal commands")
695 self.assertEqual(help, [
696 ' authinfo user Name|pass Password|generic <prog> <args>',
697 ' date',
698 ' help',
699 'Report problems to <root@example.org>',
700 ])
701
702 def test_list(self):
703 resp, groups = self.server.list()
704 self.assertEqual(len(groups), 6)
705 g = groups[1]
706 self.assertEqual(g,
707 GroupInfo("comp.lang.python.announce", "0000001153",
708 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000709 resp, groups = self.server.list("*distutils*")
710 self.assertEqual(len(groups), 2)
711 g = groups[0]
712 self.assertEqual(g,
713 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
714 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000715
716 def test_stat(self):
717 resp, art_num, message_id = self.server.stat(3000234)
718 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
719 self.assertEqual(art_num, 3000234)
720 self.assertEqual(message_id, "<45223423@example.com>")
721 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
722 self.assertEqual(resp, "223 0 <45223423@example.com>")
723 self.assertEqual(art_num, 0)
724 self.assertEqual(message_id, "<45223423@example.com>")
725 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
726 self.server.stat("<non.existent.id>")
727 self.assertEqual(cm.exception.response, "430 No Such Article Found")
728 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
729 self.server.stat()
730 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
731
732 def test_next(self):
733 resp, art_num, message_id = self.server.next()
734 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
735 self.assertEqual(art_num, 3000237)
736 self.assertEqual(message_id, "<668929@example.org>")
737
738 def test_last(self):
739 resp, art_num, message_id = self.server.last()
740 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
741 self.assertEqual(art_num, 3000234)
742 self.assertEqual(message_id, "<45223423@example.com>")
743
744 def test_description(self):
745 desc = self.server.description("comp.lang.python")
746 self.assertEqual(desc, "The Python computer language.")
747 desc = self.server.description("comp.lang.pythonx")
748 self.assertEqual(desc, "")
749
750 def test_descriptions(self):
751 resp, groups = self.server.descriptions("comp.lang.python")
752 self.assertEqual(resp, '215 Descriptions in form "group description".')
753 self.assertEqual(groups, {
754 "comp.lang.python": "The Python computer language.",
755 })
756 resp, groups = self.server.descriptions("comp.lang.python*")
757 self.assertEqual(groups, {
758 "comp.lang.python": "The Python computer language.",
759 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
760 })
761 resp, groups = self.server.descriptions("comp.lang.pythonx")
762 self.assertEqual(groups, {})
763
764 def test_group(self):
765 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
766 self.assertTrue(resp.startswith("211 "), resp)
767 self.assertEqual(first, 761)
768 self.assertEqual(last, 1265)
769 self.assertEqual(count, 486)
770 self.assertEqual(group, "fr.comp.lang.python")
771 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
772 self.server.group("comp.lang.python.devel")
773 exc = cm.exception
774 self.assertTrue(exc.response.startswith("411 No such group"),
775 exc.response)
776
777 def test_newnews(self):
778 # NEWNEWS comp.lang.python [20]100913 082004
779 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
780 resp, ids = self.server.newnews("comp.lang.python", dt)
781 expected = (
782 "230 list of newsarticles (NNTP v{0}) "
783 "created after Mon Sep 13 08:20:04 2010 follows"
784 ).format(self.nntp_version)
785 self.assertEqual(resp, expected)
786 self.assertEqual(ids, [
787 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
788 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
789 ])
790 # NEWNEWS fr.comp.lang.python [20]100913 082004
791 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
792 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
793 self.assertEqual(resp, "230 An empty list of newsarticles follows")
794 self.assertEqual(ids, [])
795
796 def _check_article_body(self, lines):
797 self.assertEqual(len(lines), 4)
798 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
799 self.assertEqual(lines[-2], b"")
800 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
801 self.assertEqual(lines[-4], b"This is just a test article.")
802
803 def _check_article_head(self, lines):
804 self.assertEqual(len(lines), 4)
805 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
806 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
807
808 def _check_article_data(self, lines):
809 self.assertEqual(len(lines), 9)
810 self._check_article_head(lines[:4])
811 self._check_article_body(lines[-4:])
812 self.assertEqual(lines[4], b"")
813
814 def test_article(self):
815 # ARTICLE
816 resp, info = self.server.article()
817 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
818 art_num, message_id, lines = info
819 self.assertEqual(art_num, 3000237)
820 self.assertEqual(message_id, "<45223423@example.com>")
821 self._check_article_data(lines)
822 # ARTICLE num
823 resp, info = self.server.article(3000234)
824 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
825 art_num, message_id, lines = info
826 self.assertEqual(art_num, 3000234)
827 self.assertEqual(message_id, "<45223423@example.com>")
828 self._check_article_data(lines)
829 # ARTICLE id
830 resp, info = self.server.article("<45223423@example.com>")
831 self.assertEqual(resp, "220 0 <45223423@example.com>")
832 art_num, message_id, lines = info
833 self.assertEqual(art_num, 0)
834 self.assertEqual(message_id, "<45223423@example.com>")
835 self._check_article_data(lines)
836 # Non-existent id
837 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
838 self.server.article("<non-existent@example.com>")
839 self.assertEqual(cm.exception.response, "430 No Such Article Found")
840
841 def test_article_file(self):
842 # With a "file" argument
843 f = io.BytesIO()
844 resp, info = self.server.article(file=f)
845 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
846 art_num, message_id, lines = info
847 self.assertEqual(art_num, 3000237)
848 self.assertEqual(message_id, "<45223423@example.com>")
849 self.assertEqual(lines, [])
850 data = f.getvalue()
851 self.assertTrue(data.startswith(
852 b'From: "Demo User" <nobody@example.net>\r\n'
853 b'Subject: I am just a test article\r\n'
854 ), ascii(data))
855 self.assertTrue(data.endswith(
856 b'This is just a test article.\r\n'
857 b'.Here is a dot-starting line.\r\n'
858 b'\r\n'
859 b'-- Signed by Andr\xc3\xa9.\r\n'
860 ), ascii(data))
861
862 def test_head(self):
863 # HEAD
864 resp, info = self.server.head()
865 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
866 art_num, message_id, lines = info
867 self.assertEqual(art_num, 3000237)
868 self.assertEqual(message_id, "<45223423@example.com>")
869 self._check_article_head(lines)
870 # HEAD num
871 resp, info = self.server.head(3000234)
872 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
873 art_num, message_id, lines = info
874 self.assertEqual(art_num, 3000234)
875 self.assertEqual(message_id, "<45223423@example.com>")
876 self._check_article_head(lines)
877 # HEAD id
878 resp, info = self.server.head("<45223423@example.com>")
879 self.assertEqual(resp, "221 0 <45223423@example.com>")
880 art_num, message_id, lines = info
881 self.assertEqual(art_num, 0)
882 self.assertEqual(message_id, "<45223423@example.com>")
883 self._check_article_head(lines)
884 # Non-existent id
885 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
886 self.server.head("<non-existent@example.com>")
887 self.assertEqual(cm.exception.response, "430 No Such Article Found")
888
889 def test_body(self):
890 # BODY
891 resp, info = self.server.body()
892 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
893 art_num, message_id, lines = info
894 self.assertEqual(art_num, 3000237)
895 self.assertEqual(message_id, "<45223423@example.com>")
896 self._check_article_body(lines)
897 # BODY num
898 resp, info = self.server.body(3000234)
899 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
900 art_num, message_id, lines = info
901 self.assertEqual(art_num, 3000234)
902 self.assertEqual(message_id, "<45223423@example.com>")
903 self._check_article_body(lines)
904 # BODY id
905 resp, info = self.server.body("<45223423@example.com>")
906 self.assertEqual(resp, "222 0 <45223423@example.com>")
907 art_num, message_id, lines = info
908 self.assertEqual(art_num, 0)
909 self.assertEqual(message_id, "<45223423@example.com>")
910 self._check_article_body(lines)
911 # Non-existent id
912 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
913 self.server.body("<non-existent@example.com>")
914 self.assertEqual(cm.exception.response, "430 No Such Article Found")
915
def check_over_xover_resp(self, resp, overviews):
    """Validate the result of an OVER / XOVER request for articles 57-59.

    `resp` is the status line, which must start with "224 ".
    `overviews` is a sequence of exactly three (article number, fields
    dict) pairs: the first is compared in full, the second must carry
    a None "xref" value and the third must have the expected subject.
    """
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)
    art_num, over = overviews[0]
    self.assertEqual(art_num, 57)
    self.assertEqual(over, {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.org gmane.comp.python.authors:57"
    })
    # Only the fields matter for the remaining entries; their article
    # numbers are not checked here.
    _, over = overviews[1]
    # Identity check: the value must be None itself, not merely
    # something that compares equal to it.
    self.assertIsNone(over["xref"])
    _, over = overviews[2]
    self.assertEqual(over["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
936
937 def test_xover(self):
938 resp, overviews = self.server.xover(57, 59)
939 self.check_over_xover_resp(resp, overviews)
940
941 def test_over(self):
942 # In NNTP "v1", this will fallback on XOVER
943 resp, overviews = self.server.over((57, 59))
944 self.check_over_xover_resp(resp, overviews)
945
# Article submitted by the POST / IHAVE tests.  Every line ends with CRLF;
# the body contains a line starting with "." and a non-ASCII (UTF-8)
# character.
sample_post = b'\r\n'.join((
    b'From: "Demo User" <nobody@example.net>',
    b'Subject: I am just a test article',
    b'Content-Type: text/plain; charset=UTF-8; format=flowed',
    b'Message-ID: <i.am.an.article.you.will.want@example.com>',
    b'',
    b'This is just a test article.',
    b'.Here is a dot-starting line.',
    b'',
    b'-- Signed by Andr\xc3\xa9.',
    b'',    # yields the CRLF terminating the last line
))
957
def _check_posted_body(self):
    """Verify the raw article lines stored in ``self.handler.posted_body``.

    Ten lines are expected.  The last four and the first one are
    compared literally; the line that starts with a single "." in the
    sample post is expected with a doubled leading dot.
    """
    # Check the raw body as received by the server
    received = self.handler.posted_body
    # One additional line for the "." terminator
    self.assertEqual(len(received), 10)
    expected_at = (
        (-1, b'.\r\n'),
        (-2, b'-- Signed by Andr\xc3\xa9.\r\n'),
        (-3, b'\r\n'),
        (-4, b'..Here is a dot-starting line.\r\n'),
        (0, b'From: "Demo User" <nobody@example.net>\r\n'),
    )
    for index, expected in expected_at:
        self.assertEqual(received[index], expected)
968
def _check_post_ihave_sub(self, func, *args, file_factory):
    """Submit the sample post twice through `func` and check what arrived.

    `func` is called as ``func(*args, data)`` where ``data`` is
    ``file_factory(post)``.  The post is sent once with its CRLF line
    endings and once with bare LF endings; after each call the body
    recorded by the handler is verified with ``_check_posted_body``.

    Both submissions must produce the same response, which is returned.
    """
    posts = (
        # First the prepared post with CRLF endings
        self.sample_post,
        # Then the same post with "normal" line endings - they should be
        # converted by NNTP.post and NNTP.ihave.
        self.sample_post.replace(b"\r\n", b"\n"),
    )
    responses = []
    for post in posts:
        func_args = args + (file_factory(post),)
        # Reset so that a stale body from an earlier call cannot satisfy
        # the check below.
        self.handler.posted_body = None
        responses.append(func(*func_args))
        self._check_posted_body()
    crlf_resp, lf_resp = responses
    # The first response used to be silently discarded; line endings must
    # not influence the server's answer.
    self.assertEqual(crlf_resp, lf_resp)
    return lf_resp
984
def check_post_ihave(self, func, success_resp, *args):
    """Run `func` (a post or ihave style callable) with every input kind.

    The sample post is handed over as a bytes object, a bytearray, a
    binary file object, an iterator of terminated lines and an iterator
    of non-terminated lines.  Each time the response returned by
    ``_check_post_ihave_sub`` must equal `success_resp`.  Extra `args`
    are passed to `func` before the article data.
    """
    def terminated_lines(b):
        # Iterable of lines that keep their line endings
        return iter(b.splitlines(True))

    def bare_lines(b):
        # Iterable of lines without their line endings
        return iter(b.splitlines(False))

    factories = (
        bytes,              # With a bytes object
        bytearray,          # With a bytearray object
        io.BytesIO,         # With a file object
        terminated_lines,   # With an iterable of terminated lines
        bare_lines,         # With an iterable of non-terminated lines
    )
    for factory in factories:
        resp = self._check_post_ihave_sub(func, *args, file_factory=factory)
        self.assertEqual(resp, success_resp)
1005
1006 def test_post(self):
1007 self.check_post_ihave(self.server.post, "240 Article received OK")
1008 self.handler.allow_posting = False
1009 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1010 self.server.post(self.sample_post)
1011 self.assertEqual(cm.exception.response,
1012 "440 Posting not permitted")
1013
1014 def test_ihave(self):
1015 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1016 "<i.am.an.article.you.will.want@example.com>")
1017 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1018 self.server.ihave("<another.message.id>", self.sample_post)
1019 self.assertEqual(cm.exception.response,
1020 "435 Article not wanted")
1021
1022
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        """A v1 server yields no capabilities and no implementation name."""
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        # Identity check: the attribute must be None itself.
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001034
1035
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        """Check the parsed capabilities, protocol version and implementation."""
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        server = self.server
        self.assertEqual(server.getcapabilities(), expected_caps)
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001057
1058
class MiscTests(unittest.TestCase):
    """Tests for nntplib helper functions; no server is involved."""

    def test_decode_header(self):
        """decode_header() must decode encoded words and keep plain text."""
        cases = (
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        )
        for raw, decoded in cases:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        """_parse_overview_fmt() must normalize LIST OVERVIEW.FMT lines."""
        default_names = ["subject", "from", "date", "message-id",
                         "references", ":bytes", ":lines"]
        cases = (
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             default_names),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             default_names),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             default_names),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             default_names + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             default_names + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             default_names + ["xref"]),
        )
        for lines, expected in cases:
            self.assertEqual(nntplib._parse_overview_fmt(lines), expected)

    def test_parse_overview(self):
        """_parse_overview() must split overview lines into named fields."""
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        # The one-element unpacking also checks that exactly one overview
        # entry was produced.
        (art_num, fields), = nntplib._parse_overview(lines, fmt)
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        (_, fields), = nntplib._parse_overview(lines, fmt)
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        (_, fields), = nntplib._parse_overview(lines, fmt)
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        """_parse_datetime() must accept full and two-digit-year dates."""
        expected = datetime.datetime(1999, 6, 23, 13, 56, 24)
        cases = (
            # Output of DATE command
            ("19990623135624", None, expected),
            # Variations
            ("19990623", "135624", expected),
            ("990623", "135624", expected),
            ("090623", "135624", datetime.datetime(2009, 6, 23, 13, 56, 24)),
        )
        for date_str, time_str, parsed in cases:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             parsed)

    def test_unparse_datetime(self):
        """_unparse_datetime() in non-legacy mode yields four-digit years."""
        # Test non-legacy mode
        cases = (
            # 1) with a datetime
            (datetime.datetime(1999, 6, 23, 13, 56, 24), "19990623", "135624"),
            (datetime.datetime(2000, 6, 23, 13, 56, 24), "20000623", "135624"),
            (datetime.datetime(2010, 6, 5, 1, 2, 3), "20100605", "010203"),
            # 2) with a date
            (datetime.date(1999, 6, 23), "19990623", "000000"),
            (datetime.date(2000, 6, 23), "20000623", "000000"),
            (datetime.date(2010, 6, 5), "20100605", "000000"),
        )
        for dt, date_str, time_str in cases:
            # Non-legacy is checked both as the default and when explicit.
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))

    def test_unparse_datetime_legacy(self):
        """_unparse_datetime() in legacy mode yields two-digit years."""
        # Test legacy mode (RFC 977)
        cases = (
            # 1) with a datetime
            (datetime.datetime(1999, 6, 23, 13, 56, 24), "990623", "135624"),
            (datetime.datetime(2000, 6, 23, 13, 56, 24), "000623", "135624"),
            (datetime.datetime(2010, 6, 5, 1, 2, 3), "100605", "010203"),
            # 2) with a date
            (datetime.date(1999, 6, 23), "990623", "000000"),
            (datetime.date(2000, 6, 23), "000623", "000000"),
            (datetime.date(2010, 6, 5), "100605", "000000"),
        )
        for dt, date_str, time_str in cases:
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
1215
1216
def test_main():
    """Run every test class of this module through test.support.

    The SSL variant of the networked tests is only included when
    nntplib reports SSL support.
    """
    selected = [MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests]
    if _have_ssl:
        selected += [NetworkedNNTP_SSLTests]
    support.run_unittest(*selected)


if __name__ == "__main__":
    test_main()