blob: e97b9ccbcd9f8b2c1e88a4d358acd55f71bedd5e [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
5import contextlib
6from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00007from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +00008import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00009if _have_ssl:
10 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000011
12TIMEOUT = 30
13
14# TODO:
15# - test the `file` arg to more commands
16# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000017# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000018
19
20class NetworkedNNTPTestsMixin:
21
22 def test_welcome(self):
23 welcome = self.server.getwelcome()
24 self.assertEqual(str, type(welcome))
25
26 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000027 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000028 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000029 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000030 self.assertEqual(str, type(line))
31
32 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000033 resp, groups = self.server.list()
34 if len(groups) > 0:
35 self.assertEqual(GroupInfo, type(groups[0]))
36 self.assertEqual(str, type(groups[0].group))
37
38 def test_list_active(self):
39 resp, groups = self.server.list(self.GROUP_PAT)
40 if len(groups) > 0:
41 self.assertEqual(GroupInfo, type(groups[0]))
42 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000043
44 def test_unknown_command(self):
45 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
46 self.server._shortcmd("XYZZY")
47 resp = cm.exception.response
48 self.assertTrue(resp.startswith("500 "), resp)
49
50 def test_newgroups(self):
51 # gmane gets a constant influx of new groups. In order not to stress
52 # the server too much, we choose a recent date in the past.
53 dt = datetime.date.today() - datetime.timedelta(days=7)
54 resp, groups = self.server.newgroups(dt)
55 if len(groups) > 0:
56 self.assertIsInstance(groups[0], GroupInfo)
57 self.assertIsInstance(groups[0].group, str)
58
59 def test_description(self):
60 def _check_desc(desc):
61 # Sanity checks
62 self.assertIsInstance(desc, str)
63 self.assertNotIn(self.GROUP_NAME, desc)
64 desc = self.server.description(self.GROUP_NAME)
65 _check_desc(desc)
66 # Another sanity check
67 self.assertIn("Python", desc)
68 # With a pattern
69 desc = self.server.description(self.GROUP_PAT)
70 _check_desc(desc)
71 # Shouldn't exist
72 desc = self.server.description("zk.brrtt.baz")
73 self.assertEqual(desc, '')
74
75 def test_descriptions(self):
76 resp, descs = self.server.descriptions(self.GROUP_PAT)
77 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
78 self.assertTrue(
79 resp.startswith("215 ") or resp.startswith("282 "), resp)
80 self.assertIsInstance(descs, dict)
81 desc = descs[self.GROUP_NAME]
82 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
83
84 def test_group(self):
85 result = self.server.group(self.GROUP_NAME)
86 self.assertEqual(5, len(result))
87 resp, count, first, last, group = result
88 self.assertEqual(group, self.GROUP_NAME)
89 self.assertIsInstance(count, int)
90 self.assertIsInstance(first, int)
91 self.assertIsInstance(last, int)
92 self.assertLessEqual(first, last)
93 self.assertTrue(resp.startswith("211 "), resp)
94
95 def test_date(self):
96 resp, date = self.server.date()
97 self.assertIsInstance(date, datetime.datetime)
98 # Sanity check
99 self.assertGreaterEqual(date.year, 1995)
100 self.assertLessEqual(date.year, 2030)
101
102 def _check_art_dict(self, art_dict):
103 # Some sanity checks for a field dictionary returned by OVER / XOVER
104 self.assertIsInstance(art_dict, dict)
105 # NNTP has 7 mandatory fields
106 self.assertGreaterEqual(art_dict.keys(),
107 {"subject", "from", "date", "message-id",
108 "references", ":bytes", ":lines"}
109 )
110 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000111 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000112
113 def test_xover(self):
114 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
115 resp, lines = self.server.xover(last, last)
116 art_num, art_dict = lines[0]
117 self.assertEqual(art_num, last)
118 self._check_art_dict(art_dict)
119
120 def test_over(self):
121 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
122 start = last - 10
123 # The "start-" article range form
124 resp, lines = self.server.over((start, None))
125 art_num, art_dict = lines[0]
126 self._check_art_dict(art_dict)
127 # The "start-end" article range form
128 resp, lines = self.server.over((start, last))
129 art_num, art_dict = lines[-1]
130 self.assertEqual(art_num, last)
131 self._check_art_dict(art_dict)
132 # XXX The "message_id" form is unsupported by gmane
133 # 503 Overview by message-ID unsupported
134
135 def test_xhdr(self):
136 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
137 resp, lines = self.server.xhdr('subject', last)
138 for line in lines:
139 self.assertEqual(str, type(line[1]))
140
141 def check_article_resp(self, resp, article, art_num=None):
142 self.assertIsInstance(article, nntplib.ArticleInfo)
143 if art_num is not None:
144 self.assertEqual(article.number, art_num)
145 for line in article.lines:
146 self.assertIsInstance(line, bytes)
147 # XXX this could exceptionally happen...
148 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
149
150 def test_article_head_body(self):
151 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
152 resp, head = self.server.head(last)
153 self.assertTrue(resp.startswith("221 "), resp)
154 self.check_article_resp(resp, head, last)
155 resp, body = self.server.body(last)
156 self.assertTrue(resp.startswith("222 "), resp)
157 self.check_article_resp(resp, body, last)
158 resp, article = self.server.article(last)
159 self.assertTrue(resp.startswith("220 "), resp)
160 self.check_article_resp(resp, article, last)
161 self.assertEqual(article.lines, head.lines + [b''] + body.lines)
162
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000163 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000164 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000165 # couple of well-known capabilities. Just sanity check that we
166 # got them.
167 def _check_caps(caps):
168 caps_list = caps['LIST']
169 self.assertIsInstance(caps_list, (list, tuple))
170 self.assertIn('OVERVIEW.FMT', caps_list)
171 self.assertGreaterEqual(self.server.nntp_version, 2)
172 _check_caps(self.server.getcapabilities())
173 # This re-emits the command
174 resp, caps = self.server.capabilities()
175 _check_caps(caps)
176
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000177 if _have_ssl:
178 def test_starttls(self):
179 file = self.server.file
180 sock = self.server.sock
181 try:
182 self.server.starttls()
183 except nntplib.NNTPPermanentError:
184 self.skipTest("STARTTLS not supported by server.")
185 else:
186 # Check that the socket and internal pseudo-file really were
187 # changed.
188 self.assertNotEqual(file, self.server.file)
189 self.assertNotEqual(sock, self.server.sock)
190 # Check that the new socket really is an SSL one
191 self.assertIsInstance(self.server.sock, ssl.SSLSocket)
192 # Check that trying starttls when it's already active fails.
193 self.assertRaises(ValueError, self.server.starttls)
194
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000195 def test_zlogin(self):
196 # This test must be the penultimate because further commands will be
197 # refused.
198 baduser = "notarealuser"
199 badpw = "notarealpassword"
200 # Check that bogus credentials cause failure
201 self.assertRaises(nntplib.NNTPError, self.server.login,
202 user=baduser, password=badpw, usenetrc=False)
203 # FIXME: We should check that correct credentials succeed, but that
204 # would require valid details for some server somewhere to be in the
205 # test suite, I think. Gmane is anonymous, at least as used for the
206 # other tests.
207
208 def test_zzquit(self):
209 # This test must be called last, hence the name
210 cls = type(self)
211 self.server.quit()
212 cls.server = None
213
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000214
215class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
216 # This server supports STARTTLS (gmane doesn't)
217 NNTP_HOST = 'news.trigofacile.com'
218 GROUP_NAME = 'fr.comp.lang.python'
219 GROUP_PAT = 'fr.comp.lang.*'
220
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000221 @classmethod
222 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000223 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000224 with support.transient_internet(cls.NNTP_HOST):
225 cls.server = NNTP(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000226
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000227 @classmethod
228 def tearDownClass(cls):
229 if cls.server is not None:
230 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000231
232
233if _have_ssl:
234 class NetworkedNNTP_SSLTests(NetworkedNNTPTestsMixin, unittest.TestCase):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000235
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000236 # Technical limits for this public NNTP server (see http://www.aioe.org):
237 # "Only two concurrent connections per IP address are allowed and
238 # 400 connections per day are accepted from each IP address."
239
240 NNTP_HOST = 'nntp.aioe.org'
241 GROUP_NAME = 'comp.lang.python'
242 GROUP_PAT = 'comp.lang.*'
243
244 @classmethod
245 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000246 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000247 with support.transient_internet(cls.NNTP_HOST):
248 cls.server = nntplib.NNTP_SSL(cls.NNTP_HOST, timeout=TIMEOUT,
249 usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000250
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000251 @classmethod
252 def tearDownClass(cls):
253 if cls.server is not None:
254 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000255
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000256 # Disabled as it produces too much data
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000257 test_list = None
258
259 # Disabled as the connection will already be encrypted.
260 test_starttls = None
261
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000262
263#
264# Non-networked tests using a local server (or something mocking it).
265#
266
267class _NNTPServerIO(io.RawIOBase):
268 """A raw IO object allowing NNTP commands to be received and processed
269 by a handler. The handler can push responses which can then be read
270 from the IO object."""
271
272 def __init__(self, handler):
273 io.RawIOBase.__init__(self)
274 # The channel from the client
275 self.c2s = io.BytesIO()
276 # The channel to the client
277 self.s2c = io.BytesIO()
278 self.handler = handler
279 self.handler.start(self.c2s.readline, self.push_data)
280
281 def readable(self):
282 return True
283
284 def writable(self):
285 return True
286
287 def push_data(self, data):
288 """Push (buffer) some data to send to the client."""
289 pos = self.s2c.tell()
290 self.s2c.seek(0, 2)
291 self.s2c.write(data)
292 self.s2c.seek(pos)
293
294 def write(self, b):
295 """The client sends us some data"""
296 pos = self.c2s.tell()
297 self.c2s.write(b)
298 self.c2s.seek(pos)
299 self.handler.process_pending()
300 return len(b)
301
302 def readinto(self, buf):
303 """The client wants to read a response"""
304 self.handler.process_pending()
305 b = self.s2c.read(len(buf))
306 n = len(b)
307 buf[:n] = b
308 return n
309
310
311class MockedNNTPTestsMixin:
312 # Override in derived classes
313 handler_class = None
314
315 def setUp(self):
316 super().setUp()
317 self.make_server()
318
319 def tearDown(self):
320 super().tearDown()
321 del self.server
322
323 def make_server(self, *args, **kwargs):
324 self.handler = self.handler_class()
325 self.sio = _NNTPServerIO(self.handler)
326 # Using BufferedRWPair instead of BufferedRandom ensures the file
327 # isn't seekable.
328 file = io.BufferedRWPair(self.sio, self.sio)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000329 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000330 return self.server
331
332
333class NNTPv1Handler:
334 """A handler for RFC 977"""
335
336 welcome = "200 NNTP mock server"
337
338 def start(self, readline, push_data):
339 self.in_body = False
340 self.allow_posting = True
341 self._readline = readline
342 self._push_data = push_data
343 # Our welcome
344 self.handle_welcome()
345
346 def _decode(self, data):
347 return str(data, "utf-8", "surrogateescape")
348
349 def process_pending(self):
350 if self.in_body:
351 while True:
352 line = self._readline()
353 if not line:
354 return
355 self.body.append(line)
356 if line == b".\r\n":
357 break
358 try:
359 meth, tokens = self.body_callback
360 meth(*tokens, body=self.body)
361 finally:
362 self.body_callback = None
363 self.body = None
364 self.in_body = False
365 while True:
366 line = self._decode(self._readline())
367 if not line:
368 return
369 if not line.endswith("\r\n"):
370 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
371 line = line[:-2]
372 cmd, *tokens = line.split()
373 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
374 meth = getattr(self, "handle_" + cmd.upper(), None)
375 if meth is None:
376 self.handle_unknown()
377 else:
378 try:
379 meth(*tokens)
380 except Exception as e:
381 raise ValueError("command failed: {!r}".format(line)) from e
382 else:
383 if self.in_body:
384 self.body_callback = meth, tokens
385 self.body = []
386
387 def expect_body(self):
388 """Flag that the client is expected to post a request body"""
389 self.in_body = True
390
391 def push_data(self, data):
392 """Push some binary data"""
393 self._push_data(data)
394
395 def push_lit(self, lit):
396 """Push a string literal"""
397 lit = textwrap.dedent(lit)
398 lit = "\r\n".join(lit.splitlines()) + "\r\n"
399 lit = lit.encode('utf-8')
400 self.push_data(lit)
401
402 def handle_unknown(self):
403 self.push_lit("500 What?")
404
405 def handle_welcome(self):
406 self.push_lit(self.welcome)
407
408 def handle_QUIT(self):
409 self.push_lit("205 Bye!")
410
411 def handle_DATE(self):
412 self.push_lit("111 20100914001155")
413
414 def handle_GROUP(self, group):
415 if group == "fr.comp.lang.python":
416 self.push_lit("211 486 761 1265 fr.comp.lang.python")
417 else:
418 self.push_lit("411 No such group {}".format(group))
419
420 def handle_HELP(self):
421 self.push_lit("""\
422 100 Legal commands
423 authinfo user Name|pass Password|generic <prog> <args>
424 date
425 help
426 Report problems to <root@example.org>
427 .""")
428
429 def handle_STAT(self, message_spec=None):
430 if message_spec is None:
431 self.push_lit("412 No newsgroup selected")
432 elif message_spec == "3000234":
433 self.push_lit("223 3000234 <45223423@example.com>")
434 elif message_spec == "<45223423@example.com>":
435 self.push_lit("223 0 <45223423@example.com>")
436 else:
437 self.push_lit("430 No Such Article Found")
438
439 def handle_NEXT(self):
440 self.push_lit("223 3000237 <668929@example.org> retrieved")
441
442 def handle_LAST(self):
443 self.push_lit("223 3000234 <45223423@example.com> retrieved")
444
445 def handle_LIST(self, action=None, param=None):
446 if action is None:
447 self.push_lit("""\
448 215 Newsgroups in form "group high low flags".
449 comp.lang.python 0000052340 0000002828 y
450 comp.lang.python.announce 0000001153 0000000993 m
451 free.it.comp.lang.python 0000000002 0000000002 y
452 fr.comp.lang.python 0000001254 0000000760 y
453 free.it.comp.lang.python.learner 0000000000 0000000001 y
454 tw.bbs.comp.lang.python 0000000304 0000000304 y
455 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000456 elif action == "ACTIVE":
457 if param == "*distutils*":
458 self.push_lit("""\
459 215 Newsgroups in form "group high low flags"
460 gmane.comp.python.distutils.devel 0000014104 0000000001 m
461 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
462 .""")
463 else:
464 self.push_lit("""\
465 215 Newsgroups in form "group high low flags"
466 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000467 elif action == "OVERVIEW.FMT":
468 self.push_lit("""\
469 215 Order of fields in overview database.
470 Subject:
471 From:
472 Date:
473 Message-ID:
474 References:
475 Bytes:
476 Lines:
477 Xref:full
478 .""")
479 elif action == "NEWSGROUPS":
480 assert param is not None
481 if param == "comp.lang.python":
482 self.push_lit("""\
483 215 Descriptions in form "group description".
484 comp.lang.python\tThe Python computer language.
485 .""")
486 elif param == "comp.lang.python*":
487 self.push_lit("""\
488 215 Descriptions in form "group description".
489 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
490 comp.lang.python\tThe Python computer language.
491 .""")
492 else:
493 self.push_lit("""\
494 215 Descriptions in form "group description".
495 .""")
496 else:
497 self.push_lit('501 Unknown LIST keyword')
498
499 def handle_NEWNEWS(self, group, date_str, time_str):
500 # We hard code different return messages depending on passed
501 # argument and date syntax.
502 if (group == "comp.lang.python" and date_str == "20100913"
503 and time_str == "082004"):
504 # Date was passed in RFC 3977 format (NNTP "v2")
505 self.push_lit("""\
506 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
507 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
508 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
509 .""")
510 elif (group == "comp.lang.python" and date_str == "100913"
511 and time_str == "082004"):
512 # Date was passed in RFC 977 format (NNTP "v1")
513 self.push_lit("""\
514 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
515 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
516 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
517 .""")
518 else:
519 self.push_lit("""\
520 230 An empty list of newsarticles follows
521 .""")
522 # (Note for experiments: many servers disable NEWNEWS.
523 # As of this writing, sicinfo3.epfl.ch doesn't.)
524
525 def handle_XOVER(self, message_spec):
526 if message_spec == "57-59":
527 self.push_lit(
528 "224 Overview information for 57-58 follows\n"
529 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
530 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
531 "\tSat, 19 Jun 2010 18:04:08 -0400"
532 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
533 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
534 "\tXref: news.gmane.org gmane.comp.python.authors:57"
535 "\n"
536 "58\tLooking for a few good bloggers"
537 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
538 "\tThu, 22 Jul 2010 09:14:14 -0400"
539 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
540 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000541 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000542 "\n"
543 # An UTF-8 overview line from fr.comp.lang.python
544 "59\tRe: Message d'erreur incompréhensible (par moi)"
545 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
546 "\tWed, 15 Sep 2010 18:09:15 +0200"
547 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
548 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
549 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
550 "\n"
551 ".\n")
552 else:
553 self.push_lit("""\
554 224 No articles
555 .""")
556
557 def handle_POST(self, *, body=None):
558 if body is None:
559 if self.allow_posting:
560 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
561 self.expect_body()
562 else:
563 self.push_lit("440 Posting not permitted")
564 else:
565 assert self.allow_posting
566 self.push_lit("240 Article received OK")
567 self.posted_body = body
568
569 def handle_IHAVE(self, message_id, *, body=None):
570 if body is None:
571 if (self.allow_posting and
572 message_id == "<i.am.an.article.you.will.want@example.com>"):
573 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
574 self.expect_body()
575 else:
576 self.push_lit("435 Article not wanted")
577 else:
578 assert self.allow_posting
579 self.push_lit("235 Article transferred OK")
580 self.posted_body = body
581
582 sample_head = """\
583 From: "Demo User" <nobody@example.net>
584 Subject: I am just a test article
585 Content-Type: text/plain; charset=UTF-8; format=flowed
586 Message-ID: <i.am.an.article.you.will.want@example.com>"""
587
588 sample_body = """\
589 This is just a test article.
590 ..Here is a dot-starting line.
591
592 -- Signed by Andr\xe9."""
593
594 sample_article = sample_head + "\n\n" + sample_body
595
596 def handle_ARTICLE(self, message_spec=None):
597 if message_spec is None:
598 self.push_lit("220 3000237 <45223423@example.com>")
599 elif message_spec == "<45223423@example.com>":
600 self.push_lit("220 0 <45223423@example.com>")
601 elif message_spec == "3000234":
602 self.push_lit("220 3000234 <45223423@example.com>")
603 else:
604 self.push_lit("430 No Such Article Found")
605 return
606 self.push_lit(self.sample_article)
607 self.push_lit(".")
608
609 def handle_HEAD(self, message_spec=None):
610 if message_spec is None:
611 self.push_lit("221 3000237 <45223423@example.com>")
612 elif message_spec == "<45223423@example.com>":
613 self.push_lit("221 0 <45223423@example.com>")
614 elif message_spec == "3000234":
615 self.push_lit("221 3000234 <45223423@example.com>")
616 else:
617 self.push_lit("430 No Such Article Found")
618 return
619 self.push_lit(self.sample_head)
620 self.push_lit(".")
621
622 def handle_BODY(self, message_spec=None):
623 if message_spec is None:
624 self.push_lit("222 3000237 <45223423@example.com>")
625 elif message_spec == "<45223423@example.com>":
626 self.push_lit("222 0 <45223423@example.com>")
627 elif message_spec == "3000234":
628 self.push_lit("222 3000234 <45223423@example.com>")
629 else:
630 self.push_lit("430 No Such Article Found")
631 return
632 self.push_lit(self.sample_body)
633 self.push_lit(".")
634
635
636class NNTPv2Handler(NNTPv1Handler):
637 """A handler for RFC 3977 (NNTP "v2")"""
638
639 def handle_CAPABILITIES(self):
640 self.push_lit("""\
641 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000642 VERSION 2 3
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000643 IMPLEMENTATION INN 2.5.1
644 AUTHINFO USER
645 HDR
646 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
647 OVER
648 POST
649 READER
650 .""")
651
652 def handle_OVER(self, message_spec=None):
653 return self.handle_XOVER(message_spec)
654
655
656class NNTPv1v2TestsMixin:
657
658 def setUp(self):
659 super().setUp()
660
661 def test_welcome(self):
662 self.assertEqual(self.server.welcome, self.handler.welcome)
663
664 def test_date(self):
665 resp, date = self.server.date()
666 self.assertEqual(resp, "111 20100914001155")
667 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
668
669 def test_quit(self):
670 self.assertFalse(self.sio.closed)
671 resp = self.server.quit()
672 self.assertEqual(resp, "205 Bye!")
673 self.assertTrue(self.sio.closed)
674
675 def test_help(self):
676 resp, help = self.server.help()
677 self.assertEqual(resp, "100 Legal commands")
678 self.assertEqual(help, [
679 ' authinfo user Name|pass Password|generic <prog> <args>',
680 ' date',
681 ' help',
682 'Report problems to <root@example.org>',
683 ])
684
685 def test_list(self):
686 resp, groups = self.server.list()
687 self.assertEqual(len(groups), 6)
688 g = groups[1]
689 self.assertEqual(g,
690 GroupInfo("comp.lang.python.announce", "0000001153",
691 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000692 resp, groups = self.server.list("*distutils*")
693 self.assertEqual(len(groups), 2)
694 g = groups[0]
695 self.assertEqual(g,
696 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
697 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000698
699 def test_stat(self):
700 resp, art_num, message_id = self.server.stat(3000234)
701 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
702 self.assertEqual(art_num, 3000234)
703 self.assertEqual(message_id, "<45223423@example.com>")
704 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
705 self.assertEqual(resp, "223 0 <45223423@example.com>")
706 self.assertEqual(art_num, 0)
707 self.assertEqual(message_id, "<45223423@example.com>")
708 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
709 self.server.stat("<non.existent.id>")
710 self.assertEqual(cm.exception.response, "430 No Such Article Found")
711 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
712 self.server.stat()
713 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
714
715 def test_next(self):
716 resp, art_num, message_id = self.server.next()
717 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
718 self.assertEqual(art_num, 3000237)
719 self.assertEqual(message_id, "<668929@example.org>")
720
721 def test_last(self):
722 resp, art_num, message_id = self.server.last()
723 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
724 self.assertEqual(art_num, 3000234)
725 self.assertEqual(message_id, "<45223423@example.com>")
726
727 def test_description(self):
728 desc = self.server.description("comp.lang.python")
729 self.assertEqual(desc, "The Python computer language.")
730 desc = self.server.description("comp.lang.pythonx")
731 self.assertEqual(desc, "")
732
733 def test_descriptions(self):
734 resp, groups = self.server.descriptions("comp.lang.python")
735 self.assertEqual(resp, '215 Descriptions in form "group description".')
736 self.assertEqual(groups, {
737 "comp.lang.python": "The Python computer language.",
738 })
739 resp, groups = self.server.descriptions("comp.lang.python*")
740 self.assertEqual(groups, {
741 "comp.lang.python": "The Python computer language.",
742 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
743 })
744 resp, groups = self.server.descriptions("comp.lang.pythonx")
745 self.assertEqual(groups, {})
746
747 def test_group(self):
748 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
749 self.assertTrue(resp.startswith("211 "), resp)
750 self.assertEqual(first, 761)
751 self.assertEqual(last, 1265)
752 self.assertEqual(count, 486)
753 self.assertEqual(group, "fr.comp.lang.python")
754 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
755 self.server.group("comp.lang.python.devel")
756 exc = cm.exception
757 self.assertTrue(exc.response.startswith("411 No such group"),
758 exc.response)
759
760 def test_newnews(self):
761 # NEWNEWS comp.lang.python [20]100913 082004
762 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
763 resp, ids = self.server.newnews("comp.lang.python", dt)
764 expected = (
765 "230 list of newsarticles (NNTP v{0}) "
766 "created after Mon Sep 13 08:20:04 2010 follows"
767 ).format(self.nntp_version)
768 self.assertEqual(resp, expected)
769 self.assertEqual(ids, [
770 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
771 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
772 ])
773 # NEWNEWS fr.comp.lang.python [20]100913 082004
774 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
775 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
776 self.assertEqual(resp, "230 An empty list of newsarticles follows")
777 self.assertEqual(ids, [])
778
779 def _check_article_body(self, lines):
780 self.assertEqual(len(lines), 4)
781 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
782 self.assertEqual(lines[-2], b"")
783 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
784 self.assertEqual(lines[-4], b"This is just a test article.")
785
786 def _check_article_head(self, lines):
787 self.assertEqual(len(lines), 4)
788 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
789 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
790
791 def _check_article_data(self, lines):
792 self.assertEqual(len(lines), 9)
793 self._check_article_head(lines[:4])
794 self._check_article_body(lines[-4:])
795 self.assertEqual(lines[4], b"")
796
797 def test_article(self):
798 # ARTICLE
799 resp, info = self.server.article()
800 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
801 art_num, message_id, lines = info
802 self.assertEqual(art_num, 3000237)
803 self.assertEqual(message_id, "<45223423@example.com>")
804 self._check_article_data(lines)
805 # ARTICLE num
806 resp, info = self.server.article(3000234)
807 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
808 art_num, message_id, lines = info
809 self.assertEqual(art_num, 3000234)
810 self.assertEqual(message_id, "<45223423@example.com>")
811 self._check_article_data(lines)
812 # ARTICLE id
813 resp, info = self.server.article("<45223423@example.com>")
814 self.assertEqual(resp, "220 0 <45223423@example.com>")
815 art_num, message_id, lines = info
816 self.assertEqual(art_num, 0)
817 self.assertEqual(message_id, "<45223423@example.com>")
818 self._check_article_data(lines)
819 # Non-existent id
820 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
821 self.server.article("<non-existent@example.com>")
822 self.assertEqual(cm.exception.response, "430 No Such Article Found")
823
824 def test_article_file(self):
825 # With a "file" argument
826 f = io.BytesIO()
827 resp, info = self.server.article(file=f)
828 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
829 art_num, message_id, lines = info
830 self.assertEqual(art_num, 3000237)
831 self.assertEqual(message_id, "<45223423@example.com>")
832 self.assertEqual(lines, [])
833 data = f.getvalue()
834 self.assertTrue(data.startswith(
835 b'From: "Demo User" <nobody@example.net>\r\n'
836 b'Subject: I am just a test article\r\n'
837 ), ascii(data))
838 self.assertTrue(data.endswith(
839 b'This is just a test article.\r\n'
840 b'.Here is a dot-starting line.\r\n'
841 b'\r\n'
842 b'-- Signed by Andr\xc3\xa9.\r\n'
843 ), ascii(data))
844
845 def test_head(self):
846 # HEAD
847 resp, info = self.server.head()
848 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
849 art_num, message_id, lines = info
850 self.assertEqual(art_num, 3000237)
851 self.assertEqual(message_id, "<45223423@example.com>")
852 self._check_article_head(lines)
853 # HEAD num
854 resp, info = self.server.head(3000234)
855 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
856 art_num, message_id, lines = info
857 self.assertEqual(art_num, 3000234)
858 self.assertEqual(message_id, "<45223423@example.com>")
859 self._check_article_head(lines)
860 # HEAD id
861 resp, info = self.server.head("<45223423@example.com>")
862 self.assertEqual(resp, "221 0 <45223423@example.com>")
863 art_num, message_id, lines = info
864 self.assertEqual(art_num, 0)
865 self.assertEqual(message_id, "<45223423@example.com>")
866 self._check_article_head(lines)
867 # Non-existent id
868 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
869 self.server.head("<non-existent@example.com>")
870 self.assertEqual(cm.exception.response, "430 No Such Article Found")
871
872 def test_body(self):
873 # BODY
874 resp, info = self.server.body()
875 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
876 art_num, message_id, lines = info
877 self.assertEqual(art_num, 3000237)
878 self.assertEqual(message_id, "<45223423@example.com>")
879 self._check_article_body(lines)
880 # BODY num
881 resp, info = self.server.body(3000234)
882 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
883 art_num, message_id, lines = info
884 self.assertEqual(art_num, 3000234)
885 self.assertEqual(message_id, "<45223423@example.com>")
886 self._check_article_body(lines)
887 # BODY id
888 resp, info = self.server.body("<45223423@example.com>")
889 self.assertEqual(resp, "222 0 <45223423@example.com>")
890 art_num, message_id, lines = info
891 self.assertEqual(art_num, 0)
892 self.assertEqual(message_id, "<45223423@example.com>")
893 self._check_article_body(lines)
894 # Non-existent id
895 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
896 self.server.body("<non-existent@example.com>")
897 self.assertEqual(cm.exception.response, "430 No Such Article Found")
898
899 def check_over_xover_resp(self, resp, overviews):
900 self.assertTrue(resp.startswith("224 "), resp)
901 self.assertEqual(len(overviews), 3)
902 art_num, over = overviews[0]
903 self.assertEqual(art_num, 57)
904 self.assertEqual(over, {
905 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
906 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
907 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
908 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
909 "references": "<hvalf7$ort$1@dough.gmane.org>",
910 ":bytes": "7103",
911 ":lines": "16",
912 "xref": "news.gmane.org gmane.comp.python.authors:57"
913 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000914 art_num, over = overviews[1]
915 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000916 art_num, over = overviews[2]
917 self.assertEqual(over["subject"],
918 "Re: Message d'erreur incompréhensible (par moi)")
919
920 def test_xover(self):
921 resp, overviews = self.server.xover(57, 59)
922 self.check_over_xover_resp(resp, overviews)
923
924 def test_over(self):
925 # In NNTP "v1", this will fallback on XOVER
926 resp, overviews = self.server.over((57, 59))
927 self.check_over_xover_resp(resp, overviews)
928
929 sample_post = (
930 b'From: "Demo User" <nobody@example.net>\r\n'
931 b'Subject: I am just a test article\r\n'
932 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
933 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
934 b'\r\n'
935 b'This is just a test article.\r\n'
936 b'.Here is a dot-starting line.\r\n'
937 b'\r\n'
938 b'-- Signed by Andr\xc3\xa9.\r\n'
939 )
940
941 def _check_posted_body(self):
942 # Check the raw body as received by the server
943 lines = self.handler.posted_body
944 # One additional line for the "." terminator
945 self.assertEqual(len(lines), 10)
946 self.assertEqual(lines[-1], b'.\r\n')
947 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
948 self.assertEqual(lines[-3], b'\r\n')
949 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
950 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
951
952 def _check_post_ihave_sub(self, func, *args, file_factory):
953 # First the prepared post with CRLF endings
954 post = self.sample_post
955 func_args = args + (file_factory(post),)
956 self.handler.posted_body = None
957 resp = func(*func_args)
958 self._check_posted_body()
959 # Then the same post with "normal" line endings - they should be
960 # converted by NNTP.post and NNTP.ihave.
961 post = self.sample_post.replace(b"\r\n", b"\n")
962 func_args = args + (file_factory(post),)
963 self.handler.posted_body = None
964 resp = func(*func_args)
965 self._check_posted_body()
966 return resp
967
968 def check_post_ihave(self, func, success_resp, *args):
969 # With a bytes object
970 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
971 self.assertEqual(resp, success_resp)
972 # With a bytearray object
973 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
974 self.assertEqual(resp, success_resp)
975 # With a file object
976 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
977 self.assertEqual(resp, success_resp)
978 # With an iterable of terminated lines
979 def iterlines(b):
980 return iter(b.splitlines(True))
981 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
982 self.assertEqual(resp, success_resp)
983 # With an iterable of non-terminated lines
984 def iterlines(b):
985 return iter(b.splitlines(False))
986 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
987 self.assertEqual(resp, success_resp)
988
989 def test_post(self):
990 self.check_post_ihave(self.server.post, "240 Article received OK")
991 self.handler.allow_posting = False
992 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
993 self.server.post(self.sample_post)
994 self.assertEqual(cm.exception.response,
995 "440 Posting not permitted")
996
997 def test_ihave(self):
998 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
999 "<i.am.an.article.you.will.want@example.com>")
1000 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1001 self.server.ihave("<another.message.id>", self.sample_post)
1002 self.assertEqual(cm.exception.response,
1003 "435 Article not wanted")
1004
1005
1006class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1007 """Tests an NNTP v1 server (no capabilities)."""
1008
1009 nntp_version = 1
1010 handler_class = NNTPv1Handler
1011
1012 def test_caps(self):
1013 caps = self.server.getcapabilities()
1014 self.assertEqual(caps, {})
1015 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001016 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001017
1018
1019class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1020 """Tests an NNTP v2 server (with capabilities)."""
1021
1022 nntp_version = 2
1023 handler_class = NNTPv2Handler
1024
1025 def test_caps(self):
1026 caps = self.server.getcapabilities()
1027 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001028 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001029 'IMPLEMENTATION': ['INN', '2.5.1'],
1030 'AUTHINFO': ['USER'],
1031 'HDR': [],
1032 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1033 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1034 'OVER': [],
1035 'POST': [],
1036 'READER': [],
1037 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001038 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001039 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001040
1041
1042class MiscTests(unittest.TestCase):
1043
1044 def test_decode_header(self):
1045 def gives(a, b):
1046 self.assertEqual(nntplib.decode_header(a), b)
1047 gives("" , "")
1048 gives("a plain header", "a plain header")
1049 gives(" with extra spaces ", " with extra spaces ")
1050 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1051 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1052 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1053 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1054 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1055 "Re: problème de matrice")
1056 # A natively utf-8 header (found in the real world!)
1057 gives("Re: Message d'erreur incompréhensible (par moi)",
1058 "Re: Message d'erreur incompréhensible (par moi)")
1059
1060 def test_parse_overview_fmt(self):
1061 # The minimal (default) response
1062 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1063 "References:", ":bytes", ":lines"]
1064 self.assertEqual(nntplib._parse_overview_fmt(lines),
1065 ["subject", "from", "date", "message-id", "references",
1066 ":bytes", ":lines"])
1067 # The minimal response using alternative names
1068 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1069 "References:", "Bytes:", "Lines:"]
1070 self.assertEqual(nntplib._parse_overview_fmt(lines),
1071 ["subject", "from", "date", "message-id", "references",
1072 ":bytes", ":lines"])
1073 # Variations in casing
1074 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1075 "References:", "BYTES:", "Lines:"]
1076 self.assertEqual(nntplib._parse_overview_fmt(lines),
1077 ["subject", "from", "date", "message-id", "references",
1078 ":bytes", ":lines"])
1079 # First example from RFC 3977
1080 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1081 "References:", ":bytes", ":lines", "Xref:full",
1082 "Distribution:full"]
1083 self.assertEqual(nntplib._parse_overview_fmt(lines),
1084 ["subject", "from", "date", "message-id", "references",
1085 ":bytes", ":lines", "xref", "distribution"])
1086 # Second example from RFC 3977
1087 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1088 "References:", "Bytes:", "Lines:", "Xref:FULL",
1089 "Distribution:FULL"]
1090 self.assertEqual(nntplib._parse_overview_fmt(lines),
1091 ["subject", "from", "date", "message-id", "references",
1092 ":bytes", ":lines", "xref", "distribution"])
1093 # A classic response from INN
1094 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1095 "References:", "Bytes:", "Lines:", "Xref:full"]
1096 self.assertEqual(nntplib._parse_overview_fmt(lines),
1097 ["subject", "from", "date", "message-id", "references",
1098 ":bytes", ":lines", "xref"])
1099
1100 def test_parse_overview(self):
1101 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1102 # First example from RFC 3977
1103 lines = [
1104 '3000234\tI am just a test article\t"Demo User" '
1105 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1106 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1107 '17\tXref: news.example.com misc.test:3000363',
1108 ]
1109 overview = nntplib._parse_overview(lines, fmt)
1110 (art_num, fields), = overview
1111 self.assertEqual(art_num, 3000234)
1112 self.assertEqual(fields, {
1113 'subject': 'I am just a test article',
1114 'from': '"Demo User" <nobody@example.com>',
1115 'date': '6 Oct 1998 04:38:40 -0500',
1116 'message-id': '<45223423@example.com>',
1117 'references': '<45454@example.net>',
1118 ':bytes': '1234',
1119 ':lines': '17',
1120 'xref': 'news.example.com misc.test:3000363',
1121 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001122 # Second example; here the "Xref" field is totally absent (including
1123 # the header name) and comes out as None
1124 lines = [
1125 '3000234\tI am just a test article\t"Demo User" '
1126 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1127 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1128 '17\t\t',
1129 ]
1130 overview = nntplib._parse_overview(lines, fmt)
1131 (art_num, fields), = overview
1132 self.assertEqual(fields['xref'], None)
1133 # Third example; the "Xref" is an empty string, while "references"
1134 # is a single space.
1135 lines = [
1136 '3000234\tI am just a test article\t"Demo User" '
1137 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1138 '<45223423@example.com>\t \t1234\t'
1139 '17\tXref: \t',
1140 ]
1141 overview = nntplib._parse_overview(lines, fmt)
1142 (art_num, fields), = overview
1143 self.assertEqual(fields['references'], ' ')
1144 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001145
1146 def test_parse_datetime(self):
1147 def gives(a, b, *c):
1148 self.assertEqual(nntplib._parse_datetime(a, b),
1149 datetime.datetime(*c))
1150 # Output of DATE command
1151 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1152 # Variations
1153 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1154 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1155 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1156
1157 def test_unparse_datetime(self):
1158 # Test non-legacy mode
1159 # 1) with a datetime
1160 def gives(y, M, d, h, m, s, date_str, time_str):
1161 dt = datetime.datetime(y, M, d, h, m, s)
1162 self.assertEqual(nntplib._unparse_datetime(dt),
1163 (date_str, time_str))
1164 self.assertEqual(nntplib._unparse_datetime(dt, False),
1165 (date_str, time_str))
1166 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1167 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1168 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1169 # 2) with a date
1170 def gives(y, M, d, date_str, time_str):
1171 dt = datetime.date(y, M, d)
1172 self.assertEqual(nntplib._unparse_datetime(dt),
1173 (date_str, time_str))
1174 self.assertEqual(nntplib._unparse_datetime(dt, False),
1175 (date_str, time_str))
1176 gives(1999, 6, 23, "19990623", "000000")
1177 gives(2000, 6, 23, "20000623", "000000")
1178 gives(2010, 6, 5, "20100605", "000000")
1179
1180 def test_unparse_datetime_legacy(self):
1181 # Test legacy mode (RFC 977)
1182 # 1) with a datetime
1183 def gives(y, M, d, h, m, s, date_str, time_str):
1184 dt = datetime.datetime(y, M, d, h, m, s)
1185 self.assertEqual(nntplib._unparse_datetime(dt, True),
1186 (date_str, time_str))
1187 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1188 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1189 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1190 # 2) with a date
1191 def gives(y, M, d, date_str, time_str):
1192 dt = datetime.date(y, M, d)
1193 self.assertEqual(nntplib._unparse_datetime(dt, True),
1194 (date_str, time_str))
1195 gives(1999, 6, 23, "990623", "000000")
1196 gives(2000, 6, 23, "000623", "000000")
1197 gives(2010, 6, 5, "100605", "000000")
1198
1199
1200def test_main():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00001201 tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests]
1202 if _have_ssl:
1203 tests.append(NetworkedNNTP_SSLTests)
1204 support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001205
1206
1207if __name__ == "__main__":
1208 test_main()