blob: f9a4cdcb522491c0cd7286abd9ab34fc85076c21 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
5import contextlib
6from test import support
7from nntplib import NNTP, GroupInfo
8import nntplib
9
# Timeout value handed to the NNTP() constructor by the networked tests.
TIMEOUT = 30
11
12# TODO:
13# - test the `file` arg to more commands
14# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000015# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000016
17
class NetworkedNNTPTestsMixin:
    """Sanity checks meant to be run against a live NNTP server.

    The concrete test case has to supply ``self.server`` (the connected
    client object) together with the ``GROUP_NAME`` and ``GROUP_PAT``
    attributes that several tests read.
    """

    def _check_group_infos(self, groups):
        # Only the first entry is inspected, and only if there is one.
        if groups:
            head = groups[0]
            self.assertEqual(GroupInfo, type(head))
            self.assertEqual(str, type(head.group))

    def _last_article_number(self):
        # Select the test group and hand back its highest article number.
        _resp, _count, _first, last, _name = self.server.group(self.GROUP_NAME)
        return last

    def _check_description(self, text):
        # Sanity checks
        self.assertIsInstance(text, str)
        self.assertNotIn(self.GROUP_NAME, text)

    def test_welcome(self):
        greeting = self.server.getwelcome()
        self.assertEqual(str, type(greeting))

    def test_help(self):
        status, help_lines = self.server.help()
        self.assertTrue(status.startswith("100 "), status)
        for entry in help_lines:
            self.assertEqual(str, type(entry))

    def test_list(self):
        _status, groups = self.server.list()
        self._check_group_infos(groups)

    def test_list_active(self):
        _status, groups = self.server.list(self.GROUP_PAT)
        self._check_group_infos(groups)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as caught:
            self.server._shortcmd("XYZZY")
        status = caught.exception.response
        self.assertTrue(status.startswith("500 "), status)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        one_week = datetime.timedelta(days=7)
        since = datetime.date.today() - one_week
        _status, groups = self.server.newgroups(since)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        by_name = self.server.description(self.GROUP_NAME)
        self._check_description(by_name)
        # Another sanity check
        self.assertIn("Python", by_name)
        # With a pattern
        by_pattern = self.server.description(self.GROUP_PAT)
        self._check_description(by_pattern)
        # Shouldn't exist
        missing = self.server.description("zk.brrtt.baz")
        self.assertEqual(missing, '')

    def test_descriptions(self):
        status, mapping = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(status.startswith(("215 ", "282 ")), status)
        self.assertIsInstance(mapping, dict)
        listed = mapping[self.GROUP_NAME]
        self.assertEqual(listed, self.server.description(self.GROUP_NAME))

    def test_group(self):
        outcome = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(outcome))
        status, total, lowest, highest, name = outcome
        self.assertEqual(name, self.GROUP_NAME)
        for number in (total, lowest, highest):
            self.assertIsInstance(number, int)
        self.assertLessEqual(lowest, highest)
        self.assertTrue(status.startswith("211 "), status)

    def test_date(self):
        _status, now = self.server.date()
        self.assertIsInstance(now, datetime.datetime)
        # Sanity check
        self.assertGreaterEqual(now.year, 1995)
        self.assertLessEqual(now.year, 2030)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        mandatory = {"subject", "from", "date", "message-id",
                     "references", ":bytes", ":lines"}
        self.assertGreaterEqual(art_dict.keys(), mandatory)
        for value in art_dict.values():
            self.assertIsInstance(value, str)

    def test_xover(self):
        newest = self._last_article_number()
        _status, overviews = self.server.xover(newest, newest)
        number, fields = overviews[0]
        self.assertEqual(number, newest)
        self._check_art_dict(fields)

    def test_over(self):
        newest = self._last_article_number()
        oldest = newest - 10
        # The "start-" article range form
        _status, overviews = self.server.over((oldest, None))
        _number, fields = overviews[0]
        self._check_art_dict(fields)
        # The "start-end" article range form
        _status, overviews = self.server.over((oldest, newest))
        number, fields = overviews[-1]
        self.assertEqual(number, newest)
        self._check_art_dict(fields)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        newest = self._last_article_number()
        _status, headers = self.server.xhdr('subject', newest)
        for entry in headers:
            self.assertEqual(str, type(entry[1]))

    def check_article_resp(self, resp, article, art_num=None):
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for raw_line in article.lines:
            self.assertIsInstance(raw_line, bytes)
        # XXX this could exceptionally happen...
        terminators = (b".", b".\n", b".\r\n")
        self.assertNotIn(article.lines[-1], terminators)

    def test_article_head_body(self):
        newest = self._last_article_number()
        fetched = {}
        # Same request order as always: HEAD, then BODY, then ARTICLE.
        for verb, code in (("head", "221 "), ("body", "222 "),
                           ("article", "220 ")):
            status, info = getattr(self.server, verb)(newest)
            self.assertTrue(status.startswith(code), status)
            self.check_article_resp(status, info, newest)
            fetched[verb] = info
        self.assertEqual(
            fetched["article"].lines,
            fetched["head"].lines + [b''] + fetched["body"].lines)

    def test_quit(self):
        self.server.quit()
        # Tells the concrete class' tearDown that nothing is left to close.
        self.server = None
164
165
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Runs the networked sanity checks against the gmane news server."""

    NNTP_HOST = 'news.gmane.org'
    GROUP_NAME = 'gmane.comp.python.devel'
    GROUP_PAT = 'gmane.comp.python.d*'

    # Disabled with gmane as it produces too much data
    test_list = None

    def setUp(self):
        support.requires("network")
        host = self.NNTP_HOST
        with support.transient_internet(host):
            self.server = NNTP(host, timeout=TIMEOUT, usenetrc=False)

    def tearDown(self):
        # test_quit() already closed the connection and cleared the attribute.
        if self.server is None:
            return
        self.server.quit()

    def _check_capabilities(self, caps):
        list_variants = caps['LIST']
        self.assertIsInstance(list_variants, (list, tuple))
        self.assertIn('OVERVIEW.FMT', list_variants)

    def test_capabilities(self):
        # As of this writing, gmane implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        self.assertGreaterEqual(self.server.nntp_version, 2)
        self._check_capabilities(self.server.getcapabilities())
        # This re-emits the command
        _status, fresh_caps = self.server.capabilities()
        self._check_capabilities(fresh_caps)
196
197
198#
199# Non-networked tests using a local server (or something mocking it).
200#
201
class _NNTPServerIO(io.RawIOBase):
    """A raw IO object allowing NNTP commands to be received and processed
    by a handler.  The handler can push responses which can then be read
    from the IO object.

    *handler* must provide ``start(readline, push_data)`` and
    ``process_pending()``; ``start()`` is called once from the constructor.
    """

    def __init__(self, handler):
        io.RawIOBase.__init__(self)
        # The channel from the client
        self.c2s = io.BytesIO()
        # The channel to the client
        self.s2c = io.BytesIO()
        self.handler = handler
        self.handler.start(self.c2s.readline, self.push_data)

    @staticmethod
    def _append(channel, data):
        """Add *data* at the end of *channel* without moving its read
        position."""
        read_pos = channel.tell()
        channel.seek(0, io.SEEK_END)
        channel.write(data)
        channel.seek(read_pos)

    def readable(self):
        return True

    def writable(self):
        return True

    def push_data(self, data):
        """Push (buffer) some data to send to the client."""
        self._append(self.s2c, data)

    def write(self, b):
        """The client sends us some data.

        The data is appended to the client-to-server channel and the handler
        is given a chance to process it.  Returns the number of bytes taken.
        """
        # Always append: writing at the current read position would clobber
        # any bytes the handler has not consumed yet.
        self._append(self.c2s, b)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """The client wants to read a response.

        Fills *buf* with as much buffered response data as fits and returns
        the number of bytes stored (0 when nothing is pending).
        """
        self.handler.process_pending()
        chunk = self.s2c.read(len(buf))
        count = len(chunk)
        buf[:count] = chunk
        return count
244
245
class MockedNNTPTestsMixin:
    """Wires an NNTP client object to an in-memory mock server for each test."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create the handler, its IO object and the client; return the client.

        Extra arguments are forwarded to the client constructor; ``usenetrc``
        is switched off unless the caller passes it explicitly.
        """
        if 'usenetrc' not in kwargs:
            kwargs['usenetrc'] = False
        handler = self.handler_class()
        self.handler = handler
        self.sio = _NNTPServerIO(handler)
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        stream = io.BufferedRWPair(self.sio, self.sio)
        client = nntplib._NNTPBase(stream, 'test.server', *args, **kwargs)
        self.server = client
        return client
267
268
class NNTPv1Handler:
    """A handler for RFC 977.

    The handler is driven through two callables given to start(): one that
    reads the next line sent by the client and one that pushes response bytes
    back.  Every ``handle_<COMMAND>`` method answers one command with canned
    data; process_pending() does the dispatching.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Remember the IO callables, reset the state and send the welcome."""
        self.in_body = False
        self.allow_posting = True
        self.body = None
        self.body_callback = None
        self._readline = readline
        self._push_data = push_data
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume everything the client has sent so far.

        Lines are dispatched as commands, except while a request body is
        expected: those lines are collected until the ``.`` terminator and
        then handed to the command that asked for them.  Returns as soon as
        no more input is available.

        Raises ValueError for a command line lacking the CRLF terminator or
        when a command handler fails.
        """
        while True:
            # Re-checked on every round: a command processed just below may
            # have switched us to body mode, in which case the following
            # lines are article data, not commands.
            if self.in_body:
                if not self._collect_body():
                    return
                continue
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            self._dispatch(line[:-2])

    def _collect_body(self):
        """Gather body lines; return False if the terminator hasn't arrived.

        Once the terminating line is seen, the pending command is called
        again with the collected lines as *body* and body mode is left.
        """
        while True:
            line = self._readline()
            if not line:
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def _dispatch(self, line):
        """Run the handler method matching the command on *line*."""
        cmd, *tokens = line.split()
        meth = getattr(self, "handle_" + cmd.upper(), None)
        if meth is None:
            self.handle_unknown()
            return
        try:
            meth(*tokens)
        except Exception as e:
            raise ValueError("command failed: {!r}".format(line)) from e
        if self.in_body:
            # The command asked for a body: remember whom to give it to.
            self.body_callback = meth, tokens
            self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal.

        The literal is dedented, its lines are terminated with CRLF and the
        result is sent UTF-8 encoded.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # The indented lines are compared verbatim by test_help.
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        """Answer POST; called a second time with *body* once it is complete."""
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            # Kept around so that tests can inspect what was received.
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        """Answer IHAVE; called a second time with *body* once it is complete."""
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            # Kept around so that tests can inspect what was received.
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")
570
571
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    # Response to CAPABILITIES, one protocol line per entry.
    _CAPABILITY_LINES = (
        "101 Capability list:",
        "VERSION 2 3",
        "IMPLEMENTATION INN 2.5.1",
        "AUTHINFO USER",
        "HDR",
        "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
        "OVER",
        "POST",
        "READER",
        ".",
    )

    def handle_CAPABILITIES(self):
        self.push_lit("\n".join(self._CAPABILITY_LINES))

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        answer = self.handle_XOVER(message_spec)
        return answer
590
591
592class NNTPv1v2TestsMixin:
593
594 def setUp(self):
595 super().setUp()
596
597 def test_welcome(self):
598 self.assertEqual(self.server.welcome, self.handler.welcome)
599
600 def test_date(self):
601 resp, date = self.server.date()
602 self.assertEqual(resp, "111 20100914001155")
603 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
604
605 def test_quit(self):
606 self.assertFalse(self.sio.closed)
607 resp = self.server.quit()
608 self.assertEqual(resp, "205 Bye!")
609 self.assertTrue(self.sio.closed)
610
611 def test_help(self):
612 resp, help = self.server.help()
613 self.assertEqual(resp, "100 Legal commands")
614 self.assertEqual(help, [
615 ' authinfo user Name|pass Password|generic <prog> <args>',
616 ' date',
617 ' help',
618 'Report problems to <root@example.org>',
619 ])
620
621 def test_list(self):
622 resp, groups = self.server.list()
623 self.assertEqual(len(groups), 6)
624 g = groups[1]
625 self.assertEqual(g,
626 GroupInfo("comp.lang.python.announce", "0000001153",
627 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000628 resp, groups = self.server.list("*distutils*")
629 self.assertEqual(len(groups), 2)
630 g = groups[0]
631 self.assertEqual(g,
632 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
633 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000634
635 def test_stat(self):
636 resp, art_num, message_id = self.server.stat(3000234)
637 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
638 self.assertEqual(art_num, 3000234)
639 self.assertEqual(message_id, "<45223423@example.com>")
640 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
641 self.assertEqual(resp, "223 0 <45223423@example.com>")
642 self.assertEqual(art_num, 0)
643 self.assertEqual(message_id, "<45223423@example.com>")
644 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
645 self.server.stat("<non.existent.id>")
646 self.assertEqual(cm.exception.response, "430 No Such Article Found")
647 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
648 self.server.stat()
649 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
650
651 def test_next(self):
652 resp, art_num, message_id = self.server.next()
653 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
654 self.assertEqual(art_num, 3000237)
655 self.assertEqual(message_id, "<668929@example.org>")
656
657 def test_last(self):
658 resp, art_num, message_id = self.server.last()
659 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
660 self.assertEqual(art_num, 3000234)
661 self.assertEqual(message_id, "<45223423@example.com>")
662
663 def test_description(self):
664 desc = self.server.description("comp.lang.python")
665 self.assertEqual(desc, "The Python computer language.")
666 desc = self.server.description("comp.lang.pythonx")
667 self.assertEqual(desc, "")
668
669 def test_descriptions(self):
670 resp, groups = self.server.descriptions("comp.lang.python")
671 self.assertEqual(resp, '215 Descriptions in form "group description".')
672 self.assertEqual(groups, {
673 "comp.lang.python": "The Python computer language.",
674 })
675 resp, groups = self.server.descriptions("comp.lang.python*")
676 self.assertEqual(groups, {
677 "comp.lang.python": "The Python computer language.",
678 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
679 })
680 resp, groups = self.server.descriptions("comp.lang.pythonx")
681 self.assertEqual(groups, {})
682
683 def test_group(self):
684 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
685 self.assertTrue(resp.startswith("211 "), resp)
686 self.assertEqual(first, 761)
687 self.assertEqual(last, 1265)
688 self.assertEqual(count, 486)
689 self.assertEqual(group, "fr.comp.lang.python")
690 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
691 self.server.group("comp.lang.python.devel")
692 exc = cm.exception
693 self.assertTrue(exc.response.startswith("411 No such group"),
694 exc.response)
695
696 def test_newnews(self):
697 # NEWNEWS comp.lang.python [20]100913 082004
698 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
699 resp, ids = self.server.newnews("comp.lang.python", dt)
700 expected = (
701 "230 list of newsarticles (NNTP v{0}) "
702 "created after Mon Sep 13 08:20:04 2010 follows"
703 ).format(self.nntp_version)
704 self.assertEqual(resp, expected)
705 self.assertEqual(ids, [
706 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
707 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
708 ])
709 # NEWNEWS fr.comp.lang.python [20]100913 082004
710 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
711 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
712 self.assertEqual(resp, "230 An empty list of newsarticles follows")
713 self.assertEqual(ids, [])
714
715 def _check_article_body(self, lines):
716 self.assertEqual(len(lines), 4)
717 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
718 self.assertEqual(lines[-2], b"")
719 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
720 self.assertEqual(lines[-4], b"This is just a test article.")
721
722 def _check_article_head(self, lines):
723 self.assertEqual(len(lines), 4)
724 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
725 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
726
727 def _check_article_data(self, lines):
728 self.assertEqual(len(lines), 9)
729 self._check_article_head(lines[:4])
730 self._check_article_body(lines[-4:])
731 self.assertEqual(lines[4], b"")
732
733 def test_article(self):
734 # ARTICLE
735 resp, info = self.server.article()
736 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
737 art_num, message_id, lines = info
738 self.assertEqual(art_num, 3000237)
739 self.assertEqual(message_id, "<45223423@example.com>")
740 self._check_article_data(lines)
741 # ARTICLE num
742 resp, info = self.server.article(3000234)
743 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
744 art_num, message_id, lines = info
745 self.assertEqual(art_num, 3000234)
746 self.assertEqual(message_id, "<45223423@example.com>")
747 self._check_article_data(lines)
748 # ARTICLE id
749 resp, info = self.server.article("<45223423@example.com>")
750 self.assertEqual(resp, "220 0 <45223423@example.com>")
751 art_num, message_id, lines = info
752 self.assertEqual(art_num, 0)
753 self.assertEqual(message_id, "<45223423@example.com>")
754 self._check_article_data(lines)
755 # Non-existent id
756 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
757 self.server.article("<non-existent@example.com>")
758 self.assertEqual(cm.exception.response, "430 No Such Article Found")
759
760 def test_article_file(self):
761 # With a "file" argument
762 f = io.BytesIO()
763 resp, info = self.server.article(file=f)
764 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
765 art_num, message_id, lines = info
766 self.assertEqual(art_num, 3000237)
767 self.assertEqual(message_id, "<45223423@example.com>")
768 self.assertEqual(lines, [])
769 data = f.getvalue()
770 self.assertTrue(data.startswith(
771 b'From: "Demo User" <nobody@example.net>\r\n'
772 b'Subject: I am just a test article\r\n'
773 ), ascii(data))
774 self.assertTrue(data.endswith(
775 b'This is just a test article.\r\n'
776 b'.Here is a dot-starting line.\r\n'
777 b'\r\n'
778 b'-- Signed by Andr\xc3\xa9.\r\n'
779 ), ascii(data))
780
781 def test_head(self):
782 # HEAD
783 resp, info = self.server.head()
784 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
785 art_num, message_id, lines = info
786 self.assertEqual(art_num, 3000237)
787 self.assertEqual(message_id, "<45223423@example.com>")
788 self._check_article_head(lines)
789 # HEAD num
790 resp, info = self.server.head(3000234)
791 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
792 art_num, message_id, lines = info
793 self.assertEqual(art_num, 3000234)
794 self.assertEqual(message_id, "<45223423@example.com>")
795 self._check_article_head(lines)
796 # HEAD id
797 resp, info = self.server.head("<45223423@example.com>")
798 self.assertEqual(resp, "221 0 <45223423@example.com>")
799 art_num, message_id, lines = info
800 self.assertEqual(art_num, 0)
801 self.assertEqual(message_id, "<45223423@example.com>")
802 self._check_article_head(lines)
803 # Non-existent id
804 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
805 self.server.head("<non-existent@example.com>")
806 self.assertEqual(cm.exception.response, "430 No Such Article Found")
807
808 def test_body(self):
809 # BODY
810 resp, info = self.server.body()
811 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
812 art_num, message_id, lines = info
813 self.assertEqual(art_num, 3000237)
814 self.assertEqual(message_id, "<45223423@example.com>")
815 self._check_article_body(lines)
816 # BODY num
817 resp, info = self.server.body(3000234)
818 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
819 art_num, message_id, lines = info
820 self.assertEqual(art_num, 3000234)
821 self.assertEqual(message_id, "<45223423@example.com>")
822 self._check_article_body(lines)
823 # BODY id
824 resp, info = self.server.body("<45223423@example.com>")
825 self.assertEqual(resp, "222 0 <45223423@example.com>")
826 art_num, message_id, lines = info
827 self.assertEqual(art_num, 0)
828 self.assertEqual(message_id, "<45223423@example.com>")
829 self._check_article_body(lines)
830 # Non-existent id
831 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
832 self.server.body("<non-existent@example.com>")
833 self.assertEqual(cm.exception.response, "430 No Such Article Found")
834
835 def check_over_xover_resp(self, resp, overviews):
836 self.assertTrue(resp.startswith("224 "), resp)
837 self.assertEqual(len(overviews), 3)
838 art_num, over = overviews[0]
839 self.assertEqual(art_num, 57)
840 self.assertEqual(over, {
841 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
842 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
843 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
844 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
845 "references": "<hvalf7$ort$1@dough.gmane.org>",
846 ":bytes": "7103",
847 ":lines": "16",
848 "xref": "news.gmane.org gmane.comp.python.authors:57"
849 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000850 art_num, over = overviews[1]
851 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000852 art_num, over = overviews[2]
853 self.assertEqual(over["subject"],
854 "Re: Message d'erreur incompréhensible (par moi)")
855
856 def test_xover(self):
857 resp, overviews = self.server.xover(57, 59)
858 self.check_over_xover_resp(resp, overviews)
859
860 def test_over(self):
861 # In NNTP "v1", this will fallback on XOVER
862 resp, overviews = self.server.over((57, 59))
863 self.check_over_xover_resp(resp, overviews)
864
865 sample_post = (
866 b'From: "Demo User" <nobody@example.net>\r\n'
867 b'Subject: I am just a test article\r\n'
868 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
869 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
870 b'\r\n'
871 b'This is just a test article.\r\n'
872 b'.Here is a dot-starting line.\r\n'
873 b'\r\n'
874 b'-- Signed by Andr\xc3\xa9.\r\n'
875 )
876
877 def _check_posted_body(self):
878 # Check the raw body as received by the server
879 lines = self.handler.posted_body
880 # One additional line for the "." terminator
881 self.assertEqual(len(lines), 10)
882 self.assertEqual(lines[-1], b'.\r\n')
883 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
884 self.assertEqual(lines[-3], b'\r\n')
885 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
886 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
887
888 def _check_post_ihave_sub(self, func, *args, file_factory):
889 # First the prepared post with CRLF endings
890 post = self.sample_post
891 func_args = args + (file_factory(post),)
892 self.handler.posted_body = None
893 resp = func(*func_args)
894 self._check_posted_body()
895 # Then the same post with "normal" line endings - they should be
896 # converted by NNTP.post and NNTP.ihave.
897 post = self.sample_post.replace(b"\r\n", b"\n")
898 func_args = args + (file_factory(post),)
899 self.handler.posted_body = None
900 resp = func(*func_args)
901 self._check_posted_body()
902 return resp
903
def check_post_ihave(self, func, success_resp, *args):
    """Exercise *func* with every supported form of article input.

    For each input form the sample article is submitted through
    ``_check_post_ihave_sub`` (with *args* passed along) and the response
    it returns must equal *success_resp*.
    """
    def terminated_lines(data):
        # Iterable of lines that keep their line terminators.
        return iter(data.splitlines(True))

    def bare_lines(data):
        # Iterable of lines with the terminators stripped.
        return iter(data.splitlines(False))

    # In order: a bytes object, a bytearray object, a file object, then the
    # two kinds of line iterables.
    factories = (bytes, bytearray, io.BytesIO, terminated_lines, bare_lines)
    for make_input in factories:
        resp = self._check_post_ihave_sub(func, *args, file_factory=make_input)
        self.assertEqual(resp, success_resp)
924
925 def test_post(self):
926 self.check_post_ihave(self.server.post, "240 Article received OK")
927 self.handler.allow_posting = False
928 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
929 self.server.post(self.sample_post)
930 self.assertEqual(cm.exception.response,
931 "440 Posting not permitted")
932
933 def test_ihave(self):
934 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
935 "<i.am.an.article.you.will.want@example.com>")
936 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
937 self.server.ihave("<another.message.id>", self.sample_post)
938 self.assertEqual(cm.exception.response,
939 "435 Article not wanted")
940
941
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    # Class-level settings; the mixins that consume them are defined
    # elsewhere in this file.
    handler_class = NNTPv1Handler
    nntp_version = 1

    def test_caps(self):
        # The capability map is expected to be empty and the client is
        # expected to report protocol version 1.
        self.assertEqual(self.server.getcapabilities(), {})
        detected_version = self.server.nntp_version
        self.assertEqual(detected_version, 1)
951 self.assertEqual(self.server.nntp_version, 1)
952
953
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    # Class-level settings; the mixins that consume them are defined
    # elsewhere in this file.
    handler_class = NNTPv2Handler
    nntp_version = 2

    def test_caps(self):
        advertised = self.server.getcapabilities()
        # Each capability maps to the list of arguments given for it.
        expected = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(advertised, expected)
        # VERSION lists both 2 and 3; the client is expected to report 3.
        detected_version = self.server.nntp_version
        self.assertEqual(detected_version, 3)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000974
975
class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helper functions.

    These tests call the helpers directly with literal inputs.  Test methods
    are described with comments rather than docstrings so that verbose
    unittest output keeps showing the method names.
    """

    def test_decode_header(self):
        # decode_header() must turn RFC 2047 encoded words into text and
        # leave everything else untouched.
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # Each case is (label, LIST OVERVIEW.FMT lines, expected field names).
        # The label is passed as the assertion message so that a failure
        # identifies the scenario.
        minimal = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]
        cases = [
            ("The minimal (default) response",
             ["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             minimal),
            ("The minimal response using alternative names",
             ["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             minimal),
            ("Variations in casing",
             ["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             minimal),
            ("First example from RFC 3977",
             ["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             minimal + ["xref", "distribution"]),
            ("Second example from RFC 3977",
             ["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             minimal + ["xref", "distribution"]),
            ("A classic response from INN",
             ["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             minimal + ["xref"]),
        ]
        for label, lines, expected in cases:
            self.assertEqual(nntplib._parse_overview_fmt(lines), expected,
                             label)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        # Exactly one (article number, fields) pair is expected.
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        # gives(date_str, time_str, *expected datetime constructor args)
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations: separate date and time strings, and two-digit years
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode; it must be the default and also what an
        # explicit False selects.
        def check(dt, date_str, time_str):
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))

        # 1) with a datetime
        def check_datetime(y, M, d, h, m, s, date_str, time_str):
            check(datetime.datetime(y, M, d, h, m, s), date_str, time_str)
        check_datetime(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        check_datetime(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        check_datetime(2010, 6, 5, 1, 2, 3, "20100605", "010203")

        # 2) with a date
        def check_date(y, M, d, date_str, time_str):
            check(datetime.date(y, M, d), date_str, time_str)
        check_date(1999, 6, 23, "19990623", "000000")
        check_date(2000, 6, 23, "20000623", "000000")
        check_date(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977), which uses two-digit years.
        def check(dt, date_str, time_str):
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))

        # 1) with a datetime
        def check_datetime(y, M, d, h, m, s, date_str, time_str):
            check(datetime.datetime(y, M, d, h, m, s), date_str, time_str)
        check_datetime(1999, 6, 23, 13, 56, 24, "990623", "135624")
        check_datetime(2000, 6, 23, 13, 56, 24, "000623", "135624")
        check_datetime(2010, 6, 5, 1, 2, 3, "100605", "010203")

        # 2) with a date
        def check_date(y, M, d, date_str, time_str):
            check(datetime.date(y, M, d), date_str, time_str)
        check_date(1999, 6, 23, "990623", "000000")
        check_date(2000, 6, 23, "000623", "000000")
        check_date(2010, 6, 5, "100605", "000000")
1132
1133
def test_main():
    # Hand the test case classes listed here to test.support's runner.
    cases = (MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests)
    support.run_unittest(*cases)


# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()