blob: 512bcd585cddfdaedeb8c9e1b83c3769da5f684f [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
5import contextlib
6from test import support
7from nntplib import NNTP, GroupInfo
8import nntplib
9
# Timeout passed to the NNTP() constructor by the networked tests.
TIMEOUT = 30
11
12# TODO:
13# - test the `file` arg to more commands
14# - test error conditions
15
16
class NetworkedNNTPTestsMixin:
    """Sanity tests meant to be run against a live NNTP server.

    The concrete test case must set ``self.server`` to a connected client
    object and define the ``GROUP_NAME`` and ``GROUP_PAT`` attributes.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, help_lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in help_lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock (plus one
        # year of slack) so that the test does not expire at a fixed year.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
        )
        for v in art_dict.values():
            self.assertIsInstance(v, str)

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last, last)
        art_num, art_dict = lines[0]
        self.assertEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Ask for (at most) the last ten articles, but never start before
        # the first article number the server reported for the group.
        start = max(first, last - 10)
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        self.assertEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        """Check the ArticleInfo returned by article(), head() or body().

        *art_num*, when given, is the article number the response must
        carry.  *resp* is accepted for symmetry with the callers but is
        not inspected here.
        """
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # An empty line list has no last line to inspect.
        if article.lines:
            # XXX this could exceptionally happen...
            self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, head = self.server.head(last)
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, last)
        resp, body = self.server.body(last)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, last)
        resp, article = self.server.article(last)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, last)
        self.assertEqual(article.lines, head.lines + [b''] + body.lines)

    def test_quit(self):
        self.server.quit()
        # The connection is gone; leave no server object behind for the
        # test case's tearDown() to quit a second time.
        self.server = None
157
158
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Runs the networked tests against the server named by NNTP_HOST."""

    NNTP_HOST = 'news.gmane.org'
    GROUP_NAME = 'gmane.comp.python.devel'
    GROUP_PAT = 'gmane.comp.python.d*'

    # Disabled with gmane as it produces too much data
    test_list = None

    def setUp(self):
        support.requires("network")
        with support.transient_internet(self.NNTP_HOST):
            self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT)

    def tearDown(self):
        # A test may already have closed the connection and reset the
        # attribute to None (see test_quit in the mixin).
        if self.server is None:
            return
        self.server.quit()

    def test_capabilities(self):
        # As of this writing, gmane implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def verify(caps):
            list_args = caps['LIST']
            self.assertIsInstance(list_args, (list, tuple))
            self.assertIn('OVERVIEW.FMT', list_args)

        self.assertGreaterEqual(self.server.nntp_version, 2)
        verify(self.server.getcapabilities())
        # This re-emits the command
        resp, fresh_caps = self.server.capabilities()
        verify(fresh_caps)
189
190
191#
192# Non-networked tests using a local server (or something mocking it).
193#
194
class _NNTPServerIO(io.RawIOBase):
    """Raw IO object playing the role of the connection to an NNTP server.

    Whatever the client writes is handed to *handler* for processing; the
    responses pushed by the handler are buffered until the client reads
    them back from this object.
    """

    def __init__(self, handler):
        super().__init__()
        # Client -> server channel
        self.c2s = io.BytesIO()
        # Server -> client channel
        self.s2c = io.BytesIO()
        self.handler = handler
        handler.start(self.c2s.readline, self.push_data)

    def readable(self):
        return True

    def writable(self):
        return True

    def push_data(self, data):
        """Buffer *data* for the client, keeping the read position."""
        outgoing = self.s2c
        read_pos = outgoing.tell()
        outgoing.seek(0, io.SEEK_END)
        outgoing.write(data)
        outgoing.seek(read_pos)

    def write(self, b):
        """Accept data sent by the client and let the handler process it."""
        incoming = self.c2s
        read_pos = incoming.tell()
        incoming.write(b)
        # Rewind so that the handler's readline() sees what was just written
        incoming.seek(read_pos)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """Copy pending response data into *buf*; return the byte count."""
        self.handler.process_pending()
        chunk = self.s2c.read(len(buf))
        size = len(chunk)
        buf[:size] = chunk
        return size
237
238
class MockedNNTPTestsMixin:
    """Mixin connecting ``self.server`` to an in-process mock NNTP server."""

    # Handler type instantiated by make_server(); override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Build the handler, the IO object and the client; return the client.

        Extra arguments are forwarded to the client constructor.
        """
        handler = self.handler = self.handler_class()
        sio = self.sio = _NNTPServerIO(handler)
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        file = io.BufferedRWPair(sio, sio)
        server = self.server = nntplib._NNTPBase(file, *args, **kwargs)
        return server
259
260
class NNTPv1Handler:
    """A handler for RFC 977

    The handler is driven through start() and process_pending().  Each
    command line read from the client is dispatched to the method named
    ``handle_<COMMAND>``; responses are sent with push_lit()/push_data().
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its IO channels and send the welcome.

        *readline* returns the next line sent by the client (or an empty
        bytes object when nothing is pending); *push_data* queues bytes
        for the client.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Process everything the client has sent so far.

        Returns as soon as the input is exhausted; an article body whose
        terminator has not arrived yet is resumed on the next call.
        """
        while True:
            if self.in_body:
                if not self._read_body():
                    # Terminator not received yet: wait for more input.
                    return
                self._finish_body()
            elif not self._process_command():
                return

    def _read_body(self):
        """Collect pending body lines; return True once "." was read."""
        for line in iter(self._readline, b""):
            self.body.append(line)
            if line == b".\r\n":
                return True
        return False

    def _finish_body(self):
        """Hand the complete body to the command that requested it."""
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False

    def _process_command(self):
        """Read and dispatch one command line; return False if none."""
        line = self._decode(self._readline())
        if not line:
            return False
        if not line.endswith("\r\n"):
            raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
        line = line[:-2]
        words = line.split()
        if not words:
            # A blank line is not a command.
            self.handle_unknown()
            return True
        cmd, *tokens = words
        meth = getattr(self, "handle_" + cmd.upper(), None)
        if meth is None:
            self.handle_unknown()
            return True
        try:
            meth(*tokens)
        except Exception as e:
            raise ValueError("command failed: {!r}".format(line)) from e
        if self.in_body:
            # The command asked for a body: remember whom to call back.
            # process_pending() switches to body mode right away, so body
            # lines that are already pending are not mistaken for commands.
            self.body_callback = meth, tokens
            self.body = []
        return True

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal

        The text is dedented, its lines are terminated with CRLF and the
        result is sent encoded as UTF-8.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # The command lines are indented relative to the other lines: that
        # leading space survives the dedent and is part of the help text.
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:58"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per post: first without a body (the command line),
        # then again with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST().
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")
551
552
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # Status line, one capability per line, then the "." terminator
        response = (
            "101 Capability list:",
            "VERSION 2",
            "IMPLEMENTATION INN 2.5.1",
            "AUTHINFO USER",
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        )
        self.push_lit("\n".join(response))

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER
        return self.handle_XOVER(message_spec)
571
572
class NNTPv1v2TestsMixin:
    """Tests common to the mocked NNTP "v1" and "v2" servers.

    The concrete test case supplies ``self.server``, ``self.handler`` and
    ``self.sio`` as well as the ``nntp_version`` class attribute.
    """

    def setUp(self):
        super().setUp()

    def test_welcome(self):
        self.assertEqual(self.server.welcome, self.handler.welcome)

    def test_date(self):
        resp, date = self.server.date()
        self.assertEqual(resp, "111 20100914001155")
        expected = datetime.datetime(2010, 9, 14, 0, 11, 55)
        self.assertEqual(date, expected)

    def test_quit(self):
        # quit() is expected to close the underlying IO object
        self.assertFalse(self.sio.closed)
        resp = self.server.quit()
        self.assertEqual(resp, "205 Bye!")
        self.assertTrue(self.sio.closed)

    def test_help(self):
        resp, help_lines = self.server.help()
        self.assertEqual(resp, "100 Legal commands")
        expected = [
            ' authinfo user Name|pass Password|generic <prog> <args>',
            ' date',
            ' help',
            'Report problems to <root@example.org>',
        ]
        self.assertEqual(help_lines, expected)

    def test_list(self):
        resp, groups = self.server.list()
        self.assertEqual(len(groups), 6)
        announce = GroupInfo("comp.lang.python.announce", "0000001153",
                             "0000000993", "m")
        self.assertEqual(groups[1], announce)

    def test_stat(self):
        known_id = "<45223423@example.com>"
        lookups = (
            (3000234, "223 3000234 <45223423@example.com>", 3000234),
            (known_id, "223 0 <45223423@example.com>", 0),
        )
        for spec, expected_resp, expected_num in lookups:
            resp, art_num, message_id = self.server.stat(spec)
            self.assertEqual(resp, expected_resp)
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, known_id)
        failures = (
            (("<non.existent.id>",), "430 No Such Article Found"),
            ((), "412 No newsgroup selected"),
        )
        for args, expected_resp in failures:
            with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
                self.server.stat(*args)
            self.assertEqual(cm.exception.response, expected_resp)

    def test_next(self):
        resp, art_num, message_id = self.server.next()
        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<668929@example.org>")

    def test_last(self):
        resp, art_num, message_id = self.server.last()
        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")

    def test_description(self):
        describe = self.server.description
        self.assertEqual(describe("comp.lang.python"),
                         "The Python computer language.")
        self.assertEqual(describe("comp.lang.pythonx"), "")

    def test_descriptions(self):
        python_desc = {"comp.lang.python": "The Python computer language."}
        resp, groups = self.server.descriptions("comp.lang.python")
        self.assertEqual(resp, '215 Descriptions in form "group description".')
        self.assertEqual(groups, python_desc)
        resp, groups = self.server.descriptions("comp.lang.python*")
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
            })
        resp, groups = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(groups, {})

    def test_group(self):
        result = self.server.group("fr.comp.lang.python")
        resp, count, first, last, group = result
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, "fr.comp.lang.python")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.group("comp.lang.python.devel")
        response = cm.exception.response
        self.assertTrue(response.startswith("411 No such group"), response)

    def test_newnews(self):
        when = datetime.datetime(2010, 9, 13, 8, 20, 4)
        # NEWNEWS comp.lang.python [20]100913 082004
        resp, ids = self.server.newnews("comp.lang.python", when)
        expected = (
            "230 list of newsarticles (NNTP v{0}) "
            "created after Mon Sep 13 08:20:04 2010 follows"
            ).format(self.nntp_version)
        self.assertEqual(resp, expected)
        self.assertEqual(ids, [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
            ])
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        resp, ids = self.server.newnews("fr.comp.lang.python", when)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        """Check *lines* against the body of the handler's sample article."""
        self.assertEqual(len(lines), 4)
        signature, blank, dotted, first = reversed(lines)
        self.assertEqual(signature.decode('utf8'), "-- Signed by André.")
        self.assertEqual(blank, b"")
        self.assertEqual(dotted, b".Here is a dot-starting line.")
        self.assertEqual(first, b"This is just a test article.")

    def _check_article_head(self, lines):
        """Check *lines* against the headers of the sample article."""
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        """Check *lines* against the whole sample article."""
        self.assertEqual(len(lines), 9)
        head, separator, body = lines[:4], lines[4], lines[-4:]
        self._check_article_head(head)
        self._check_article_body(body)
        self.assertEqual(separator, b"")

    def _check_fetch(self, fetch, expected, check_lines):
        """Run *fetch* for each entry of *expected*, then for an unknown id.

        Each entry is an ``(args, response, article number)`` tuple;
        *check_lines* validates the lines returned for a known article.
        """
        for args, expected_resp, expected_num in expected:
            resp, info = fetch(*args)
            self.assertEqual(resp, expected_resp)
            art_num, message_id, lines = info
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, "<45223423@example.com>")
            check_lines(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            fetch("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_article(self):
        self._check_fetch(self.server.article, (
            # ARTICLE
            ((), "220 3000237 <45223423@example.com>", 3000237),
            # ARTICLE num
            ((3000234,), "220 3000234 <45223423@example.com>", 3000234),
            # ARTICLE id
            (("<45223423@example.com>",), "220 0 <45223423@example.com>", 0),
        ), self._check_article_data)

    def test_article_file(self):
        # With a "file" argument
        sink = io.BytesIO()
        resp, info = self.server.article(file=sink)
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        expected_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            )
        expected_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            )
        self.assertTrue(data.startswith(expected_start), ascii(data))
        self.assertTrue(data.endswith(expected_end), ascii(data))

    def test_head(self):
        self._check_fetch(self.server.head, (
            # HEAD
            ((), "221 3000237 <45223423@example.com>", 3000237),
            # HEAD num
            ((3000234,), "221 3000234 <45223423@example.com>", 3000234),
            # HEAD id
            (("<45223423@example.com>",), "221 0 <45223423@example.com>", 0),
        ), self._check_article_head)

    def test_body(self):
        self._check_fetch(self.server.body, (
            # BODY
            ((), "222 3000237 <45223423@example.com>", 3000237),
            # BODY num
            ((3000234,), "222 3000234 <45223423@example.com>", 3000234),
            # BODY id
            (("<45223423@example.com>",), "222 0 <45223423@example.com>", 0),
        ), self._check_article_body)

    def check_over_xover_resp(self, resp, overviews):
        """Check the overview data the mock server returns for 57-59."""
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        art_num, over = overviews[0]
        self.assertEqual(art_num, 57)
        expected = {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.org gmane.comp.python.authors:57"
            }
        self.assertEqual(over, expected)
        art_num, over = overviews[2]
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        resp, overviews = self.server.xover(57, 59)
        self.check_over_xover_resp(resp, overviews)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        resp, overviews = self.server.over((57, 59))
        self.check_over_xover_resp(resp, overviews)

    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        # Check the raw body as received by the server
        received = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(received), 10)
        self.assertEqual(received[-1], b'.\r\n')
        self.assertEqual(received[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(received[-3], b'\r\n')
        self.assertEqual(received[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(received[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        """Send the sample post twice through *func*; return the last reply.

        *file_factory* wraps the raw bytes into the object handed to
        *func* as its last positional argument.
        """
        posts = (
            # First the prepared post with CRLF endings
            self.sample_post,
            # Then the same post with "normal" line endings - they should be
            # converted by NNTP.post and NNTP.ihave.
            self.sample_post.replace(b"\r\n", b"\n"),
        )
        resp = None
        for post in posts:
            func_args = args + (file_factory(post),)
            self.handler.posted_body = None
            resp = func(*func_args)
            self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        """Check *func* with every supported kind of article source."""
        def terminated_lines(b):
            return iter(b.splitlines(True))

        def bare_lines(b):
            return iter(b.splitlines(False))

        factories = (
            bytes,              # With a bytes object
            bytearray,          # With a bytearray object
            io.BytesIO,         # With a file object
            terminated_lines,   # With an iterable of terminated lines
            bare_lines,         # With an iterable of non-terminated lines
        )
        for factory in factories:
            resp = self._check_post_ihave_sub(func, *args, file_factory=factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response,
                         "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response,
                         "435 Article not wanted")
913
914
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # The v1 handler has no CAPABILITIES command: the client is expected
        # to report an empty capability set and protocol version 1.
        self.assertEqual(self.server.getcapabilities(), {})
        self.assertEqual(self.server.nntp_version, 1)
925
926
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    # Protocol version and handler class selected for this test case.
    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # The capability mapping the server is expected to report.
        expected_caps = {
            'VERSION': ['2'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        # Capabilities are queried before nntp_version is read.
        server = self.server
        self.assertEqual(server.getcapabilities(), expected_caps)
        self.assertEqual(server.nntp_version, 2)
947
948
class MiscTests(unittest.TestCase):
    """Checks for nntplib's module-level helper functions.

    None of these tests touches ``self.server``; they call
    ``nntplib.decode_header`` and the private ``_parse_*`` /
    ``_unparse_datetime`` helpers directly.
    """

    def test_decode_header(self):
        # (raw header, expected decoded text) pairs.
        samples = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in samples:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        # Field names expected for the seven mandatory overview fields.
        minimal = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]
        # (LIST OVERVIEW.FMT lines, expected parsed field names) pairs.
        samples = [
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             minimal),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             minimal),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             minimal),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             minimal + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             minimal + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             minimal + ["xref"]),
        ]
        for fmt_lines, expected in samples:
            self.assertEqual(nntplib._parse_overview_fmt(fmt_lines), expected)

    def test_parse_overview(self):
        field_names = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        raw_lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        # Exactly one (article number, fields) pair must come back.
        [(art_num, fields)] = nntplib._parse_overview(raw_lines, field_names)
        self.assertEqual(art_num, 3000234)
        expected_fields = {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        }
        self.assertEqual(fields, expected_fields)

    def test_parse_datetime(self):
        # (date string, time string, expected datetime components) triples.
        samples = [
            # Output of DATE command
            ("19990623135624", None, (1999, 6, 23, 13, 56, 24)),
            # Variations
            ("19990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("090623", "135624", (2009, 6, 23, 13, 56, 24)),
        ]
        for date_str, time_str, parts in samples:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             datetime.datetime(*parts))

    def test_unparse_datetime(self):
        # Test non-legacy mode: omitting the flag and passing False must
        # give the same (date_str, time_str) pair.
        samples = [
            # 1) with a datetime
            (datetime.datetime(1999, 6, 23, 13, 56, 24), ("19990623", "135624")),
            (datetime.datetime(2000, 6, 23, 13, 56, 24), ("20000623", "135624")),
            (datetime.datetime(2010, 6, 5, 1, 2, 3), ("20100605", "010203")),
            # 2) with a date
            (datetime.date(1999, 6, 23), ("19990623", "000000")),
            (datetime.date(2000, 6, 23), ("20000623", "000000")),
            (datetime.date(2010, 6, 5), ("20100605", "000000")),
        ]
        for moment, expected in samples:
            self.assertEqual(nntplib._unparse_datetime(moment), expected)
            self.assertEqual(nntplib._unparse_datetime(moment, False), expected)

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        samples = [
            # 1) with a datetime
            (datetime.datetime(1999, 6, 23, 13, 56, 24), ("990623", "135624")),
            (datetime.datetime(2000, 6, 23, 13, 56, 24), ("000623", "135624")),
            (datetime.datetime(2010, 6, 5, 1, 2, 3), ("100605", "010203")),
            # 2) with a date
            (datetime.date(1999, 6, 23), ("990623", "000000")),
            (datetime.date(2000, 6, 23), ("000623", "000000")),
            (datetime.date(2010, 6, 5), ("100605", "000000")),
        ]
        for moment, expected in samples:
            self.assertEqual(nntplib._unparse_datetime(moment, True), expected)
1082
1083
def test_main():
    """Run this module's test case classes through ``support.run_unittest``."""
    test_classes = (
        MiscTests,
        NNTPv1Tests,
        NNTPv2Tests,
        NetworkedNNTPTests,
    )
    support.run_unittest(*test_classes)
1088
1089
# Run the tests only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    test_main()