blob: 85334bbefc36af92eaece91915985a8ec1a76173 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
5import contextlib
6from test import support
7from nntplib import NNTP, GroupInfo
8import nntplib
9
10TIMEOUT = 30
11
12# TODO:
13# - test the `file` arg to more commands
14# - test error conditions
15
16
class NetworkedNNTPTestsMixin:
    """Sanity checks meant to be run against a live NNTP server.

    The methods talk to ``self.server`` and read the ``GROUP_NAME`` and
    ``GROUP_PAT`` attributes; the class mixing this in must provide them.
    """

    def test_welcome(self):
        banner = self.server.getwelcome()
        self.assertEqual(str, type(banner))

    def test_help(self):
        resp, help_lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for entry in help_lines:
            self.assertEqual(str, type(entry))

    def test_list(self):
        resp, groups = self.server.list()
        if len(groups) == 0:
            # Nothing to inspect on an empty listing.
            return
        self.assertEqual(GroupInfo, type(groups[0]))
        self.assertEqual(str, type(groups[0].group))

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as caught:
            self.server._shortcmd("XYZZY")
        response = caught.exception.response
        self.assertTrue(response.startswith("500 "), response)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        one_week = datetime.timedelta(days=7)
        since = datetime.date.today() - one_week
        resp, groups = self.server.newgroups(since)
        if len(groups) == 0:
            return
        self.assertIsInstance(groups[0], GroupInfo)
        self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def sanity_check(text):
            # The description must be text and must not repeat the group name.
            self.assertIsInstance(text, str)
            self.assertNotIn(self.GROUP_NAME, text)

        by_name = self.server.description(self.GROUP_NAME)
        sanity_check(by_name)
        # Another sanity check
        self.assertIn("Python", by_name)
        # With a pattern
        by_pattern = self.server.description(self.GROUP_PAT)
        sanity_check(by_pattern)
        # Shouldn't exist
        missing = self.server.description("zk.brrtt.baz")
        self.assertEqual(missing, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(resp.startswith(("215 ", "282 ")), resp)
        self.assertIsInstance(descs, dict)
        from_mapping = descs[self.GROUP_NAME]
        self.assertEqual(from_mapping,
                         self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        for number in (count, first, last):
            self.assertIsInstance(number, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, server_time = self.server.date()
        self.assertIsInstance(server_time, datetime.datetime)
        # Sanity check
        year = server_time.year
        self.assertGreaterEqual(year, 1995)
        self.assertLessEqual(year, 2030)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        mandatory = {
            "subject", "from", "date", "message-id",
            "references", ":bytes", ":lines",
        }
        self.assertGreaterEqual(art_dict.keys(), mandatory)
        for value in art_dict.values():
            self.assertIsInstance(value, str)

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, overviews = self.server.xover(last, last)
        art_num, art_dict = overviews[0]
        self.assertEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, overviews = self.server.over((start, None))
        art_num, art_dict = overviews[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, overviews = self.server.over((start, last))
        art_num, art_dict = overviews[-1]
        self.assertEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, headers = self.server.xhdr('subject', last)
        for header in headers:
            self.assertEqual(str, type(header[1]))

    def check_article_resp(self, resp, article, art_num=None):
        """Check the shape of an ``ArticleInfo`` returned by the server.

        When *art_num* is given, the article number must match it.
        """
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for raw_line in article.lines:
            self.assertIsInstance(raw_line, bytes)
        # XXX this could exceptionally happen...
        terminators = (b".", b".\n", b".\r\n")
        self.assertNotIn(article.lines[-1], terminators)

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)

        resp, head = self.server.head(last)
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, last)

        resp, body = self.server.body(last)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, last)

        resp, article = self.server.article(last)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, last)

        # A full article is its head, one blank separator line, then its body.
        self.assertEqual(article.lines, head.lines + [b''] + body.lines)

    def test_quit(self):
        self.server.quit()
        # Record that the connection is gone.
        self.server = None
156 self.server = None
157
158
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Runs the networked sanity tests against the host in ``NNTP_HOST``."""

    NNTP_HOST = 'news.gmane.org'
    GROUP_NAME = 'gmane.comp.python.devel'
    GROUP_PAT = 'gmane.comp.python.d*'

    def setUp(self):
        support.requires("network")
        host = self.NNTP_HOST
        with support.transient_internet(host):
            self.server = NNTP(host, timeout=TIMEOUT, usenetrc=False)

    def tearDown(self):
        if self.server is None:
            # A test already closed the connection and cleared the attribute.
            return
        self.server.quit()

    # Disabled with gmane as it produces too much data
    test_list = None

    def test_capabilities(self):
        # As of this writing, gmane implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def verify(caps):
            list_caps = caps['LIST']
            self.assertIsInstance(list_caps, (list, tuple))
            self.assertIn('OVERVIEW.FMT', list_caps)

        self.assertGreaterEqual(self.server.nntp_version, 2)
        verify(self.server.getcapabilities())
        # This re-emits the command
        resp, fresh_caps = self.server.capabilities()
        verify(fresh_caps)
189
190
191#
192# Non-networked tests using a local server (or something mocking it).
193#
194
class _NNTPServerIO(io.RawIOBase):
    """A raw IO object allowing NNTP commands to be received and processed
    by a handler.  The handler can push responses which can then be read
    from the IO object.

    Two in-memory channels are kept: ``c2s`` holds bytes written by the
    client (the handler reads them through ``c2s.readline``) and ``s2c``
    holds bytes pushed by the handler (the client reads them through
    ``readinto``).  Each channel's file position is its *read* cursor, so
    new data is always appended at the end without moving that cursor.
    """

    def __init__(self, handler):
        io.RawIOBase.__init__(self)
        # The channel from the client
        self.c2s = io.BytesIO()
        # The channel to the client
        self.s2c = io.BytesIO()
        self.handler = handler
        self.handler.start(self.c2s.readline, self.push_data)

    def readable(self):
        return True

    def writable(self):
        return True

    @staticmethod
    def _append(channel, data):
        """Append *data* at the end of *channel*, keeping its read cursor."""
        read_pos = channel.tell()
        channel.seek(0, io.SEEK_END)
        channel.write(data)
        channel.seek(read_pos)

    def push_data(self, data):
        """Push (buffer) some data to send to the client."""
        self._append(self.s2c, data)

    def write(self, b):
        """The client sends us some data.

        The bytes are appended after any data the handler has not consumed
        yet (writing at the read cursor would overwrite it), then the
        handler is asked to process whatever is pending.  Returns the number
        of bytes accepted, which is always ``len(b)``.
        """
        self._append(self.c2s, b)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """The client wants to read a response.

        Copies at most ``len(buf)`` buffered response bytes into *buf* and
        returns how many bytes were copied.
        """
        self.handler.process_pending()
        chunk = self.s2c.read(len(buf))
        count = len(chunk)
        buf[:count] = chunk
        return count
237
238
class MockedNNTPTestsMixin:
    """Test fixture that wires an NNTP client to an in-process handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create the handler, its IO object and the client; return the client.

        Extra positional and keyword arguments are forwarded to
        ``nntplib._NNTPBase``; ``usenetrc`` defaults to False unless given.
        """
        handler = self.handler_class()
        self.handler = handler
        self.sio = _NNTPServerIO(handler)
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        stream = io.BufferedRWPair(self.sio, self.sio)
        if 'usenetrc' not in kwargs:
            kwargs['usenetrc'] = False
        self.server = nntplib._NNTPBase(stream, *args, **kwargs)
        return self.server
260
261
class NNTPv1Handler:
    """A handler for RFC 977.

    The handler is driven through two callables given to ``start()``:
    ``readline`` returns the next pending line of client input as bytes
    (empty when nothing is pending) and ``push_data`` queues response bytes
    for the client.  Each NNTP command ``FOO`` is served by a ``handle_FOO``
    method receiving the command's whitespace-separated arguments.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Bind the IO callables, reset the state and send the welcome line."""
        self.in_body = False
        self.allow_posting = True
        # Lines of the article body being received, and the
        # ``(method, tokens)`` pair to call once it is complete.
        self.body = None
        self.body_callback = None
        self._readline = readline
        self._push_data = push_data
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume all pending client input.

        Input is read either as article body lines (after a handler called
        ``expect_body()``) or as command lines.  The mode is re-checked
        after every step, so body lines that are already buffered behind the
        command that requested them are not mistaken for commands.
        """
        while True:
            if self.in_body:
                progressed = self._collect_body()
            else:
                progressed = self._dispatch_command()
            if not progressed:
                return

    def _collect_body(self):
        """Read body lines up to the "." terminator and deliver the body.

        Returns False when input ran out before the terminator was seen (the
        partial body is kept for the next call), True once the body callback
        has been invoked.
        """
        while True:
            line = self._readline()
            if not line:
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            # Always leave body mode, even if the callback failed.
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def _dispatch_command(self):
        """Read one command line and run its handler.

        Returns False when no input is pending, True otherwise.  Raises
        ValueError for a line without CRLF, for an empty command line, and
        (chained to the original exception) when the handler itself fails.
        """
        line = self._decode(self._readline())
        if not line:
            return False
        if not line.endswith("\r\n"):
            raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
        line = line[:-2]
        words = line.split()
        if not words:
            raise ValueError("empty command line")
        cmd, *tokens = words
        meth = getattr(self, "handle_" + cmd.upper(), None)
        if meth is None:
            self.handle_unknown()
            return True
        try:
            meth(*tokens)
        except Exception as e:
            raise ValueError("command failed: {!r}".format(line)) from e
        if self.in_body:
            # The handler asked for a body: remember whom to call back.
            self.body_callback = meth, tokens
            self.body = []
        return True

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal.

        The text is dedented, its line endings are converted to CRLF (with a
        final CRLF added) and it is sent UTF-8 encoded.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # The three command lines carry extra leading whitespace relative to
        # the status line; it survives dedenting and test_help expects it.
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
                and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
                and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:58"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        """Serve POST: first call (no *body*) answers 340/440, the second
        call (with the received *body*) acknowledges and stores it."""
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        """Serve IHAVE: first call (no *body*) answers 335/435, the second
        call (with the received *body*) acknowledges and stores it."""
        if body is None:
            if (self.allow_posting and
                    message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_part(self, code, message_spec, payload):
        """Answer ARTICLE/HEAD/BODY for *message_spec*.

        Sends the status line starting with *code*, then *payload* and the
        "." terminator; an unrecognised *message_spec* gets a 430 response
        and nothing else.
        """
        if message_spec is None:
            art_num = "3000237"
        elif message_spec == "<45223423@example.com>":
            art_num = "0"
        elif message_spec == "3000234":
            art_num = "3000234"
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit("{} {} <45223423@example.com>".format(code, art_num))
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_article_part("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_article_part("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_article_part("222", message_spec, self.sample_body)
552
553
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # One entry per response line: status line first, "." terminator last.
        response_lines = (
            "101 Capability list:",
            "VERSION 2",
            "IMPLEMENTATION INN 2.5.1",
            "AUTHINFO USER",
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        )
        self.push_lit("\n".join(response_lines))

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
572
573
class NNTPv1v2TestsMixin:
    """Tests shared by the mocked NNTP v1 and v2 servers.

    The methods use ``self.server``, ``self.handler`` and ``self.sio`` and
    read a ``nntp_version`` attribute; the class mixing this in must
    provide them.
    """

    def setUp(self):
        super().setUp()

    def test_welcome(self):
        self.assertEqual(self.server.welcome, self.handler.welcome)

    def test_date(self):
        resp, server_time = self.server.date()
        self.assertEqual(resp, "111 20100914001155")
        self.assertEqual(server_time,
                         datetime.datetime(2010, 9, 14, 0, 11, 55))

    def test_quit(self):
        self.assertFalse(self.sio.closed)
        resp = self.server.quit()
        self.assertEqual(resp, "205 Bye!")
        self.assertTrue(self.sio.closed)

    def test_help(self):
        resp, help_lines = self.server.help()
        self.assertEqual(resp, "100 Legal commands")
        expected = [
            ' authinfo user Name|pass Password|generic <prog> <args>',
            ' date',
            ' help',
            'Report problems to <root@example.org>',
        ]
        self.assertEqual(help_lines, expected)

    def test_list(self):
        resp, groups = self.server.list()
        self.assertEqual(len(groups), 6)
        second = groups[1]
        expected = GroupInfo("comp.lang.python.announce", "0000001153",
                             "0000000993", "m")
        self.assertEqual(second, expected)

    def test_stat(self):
        # By article number
        resp, art_num, message_id = self.server.stat(3000234)
        self.assertEqual(resp, "223 3000234 <45223423@example.com>")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        # By message id
        resp, art_num, message_id = self.server.stat("<45223423@example.com>")
        self.assertEqual(resp, "223 0 <45223423@example.com>")
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        # Unknown message id
        with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
            self.server.stat("<non.existent.id>")
        self.assertEqual(caught.exception.response,
                         "430 No Such Article Found")
        # No argument at all
        with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
            self.server.stat()
        self.assertEqual(caught.exception.response,
                         "412 No newsgroup selected")

    def test_next(self):
        resp, art_num, message_id = self.server.next()
        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<668929@example.org>")

    def test_last(self):
        resp, art_num, message_id = self.server.last()
        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")

    def test_description(self):
        known = self.server.description("comp.lang.python")
        self.assertEqual(known, "The Python computer language.")
        unknown = self.server.description("comp.lang.pythonx")
        self.assertEqual(unknown, "")

    def test_descriptions(self):
        resp, groups = self.server.descriptions("comp.lang.python")
        self.assertEqual(resp, '215 Descriptions in form "group description".')
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
            })
        resp, groups = self.server.descriptions("comp.lang.python*")
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
            })
        resp, groups = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(groups, {})

    def test_group(self):
        resp, count, first, last, group = self.server.group("fr.comp.lang.python")
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, "fr.comp.lang.python")
        with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
            self.server.group("comp.lang.python.devel")
        response = caught.exception.response
        self.assertTrue(response.startswith("411 No such group"), response)

    def test_newnews(self):
        # NEWNEWS comp.lang.python [20]100913 082004
        since = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("comp.lang.python", since)
        template = (
            "230 list of newsarticles (NNTP v{0}) "
            "created after Mon Sep 13 08:20:04 2010 follows"
            )
        self.assertEqual(resp, template.format(self.nntp_version))
        self.assertEqual(ids, [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
            ])
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        since = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("fr.comp.lang.python", since)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        self.assertEqual(len(lines), 4)
        last, blank, dotted, first = lines[-1], lines[-2], lines[-3], lines[-4]
        self.assertEqual(last.decode('utf8'), "-- Signed by André.")
        self.assertEqual(blank, b"")
        self.assertEqual(dotted, b".Here is a dot-starting line.")
        self.assertEqual(first, b"This is just a test article.")

    def _check_article_head(self, lines):
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        # 4 header lines, 1 blank separator, 4 body lines.
        self.assertEqual(len(lines), 9)
        self._check_article_head(lines[:4])
        self._check_article_body(lines[-4:])
        self.assertEqual(lines[4], b"")

    def _check_retrieval(self, command, cases, check_lines):
        """Call *command* for each ``(args, response, number)`` case and
        check the reply, then check that an unknown id yields a 430 error."""
        for args, expected_resp, expected_num in cases:
            resp, info = command(*args)
            self.assertEqual(resp, expected_resp)
            art_num, message_id, lines = info
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, "<45223423@example.com>")
            check_lines(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
            command("<non-existent@example.com>")
        self.assertEqual(caught.exception.response,
                         "430 No Such Article Found")

    def test_article(self):
        cases = (
            # ARTICLE
            ((), "220 3000237 <45223423@example.com>", 3000237),
            # ARTICLE num
            ((3000234,), "220 3000234 <45223423@example.com>", 3000234),
            # ARTICLE id
            (("<45223423@example.com>",), "220 0 <45223423@example.com>", 0),
        )
        self._check_retrieval(self.server.article, cases,
                              self._check_article_data)

    def test_article_file(self):
        # With a "file" argument
        sink = io.BytesIO()
        resp, info = self.server.article(file=sink)
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        expected_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            )
        expected_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            )
        self.assertTrue(data.startswith(expected_start), ascii(data))
        self.assertTrue(data.endswith(expected_end), ascii(data))

    def test_head(self):
        cases = (
            # HEAD
            ((), "221 3000237 <45223423@example.com>", 3000237),
            # HEAD num
            ((3000234,), "221 3000234 <45223423@example.com>", 3000234),
            # HEAD id
            (("<45223423@example.com>",), "221 0 <45223423@example.com>", 0),
        )
        self._check_retrieval(self.server.head, cases,
                              self._check_article_head)

    def test_body(self):
        cases = (
            # BODY
            ((), "222 3000237 <45223423@example.com>", 3000237),
            # BODY num
            ((3000234,), "222 3000234 <45223423@example.com>", 3000234),
            # BODY id
            (("<45223423@example.com>",), "222 0 <45223423@example.com>", 0),
        )
        self._check_retrieval(self.server.body, cases,
                              self._check_article_body)

    def check_over_xover_resp(self, resp, overviews):
        """Check the response and parsed overviews for articles 57 to 59."""
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        art_num, over = overviews[0]
        self.assertEqual(art_num, 57)
        expected_first = {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.org gmane.comp.python.authors:57"
            }
        self.assertEqual(over, expected_first)
        art_num, over = overviews[2]
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        resp, overviews = self.server.xover(57, 59)
        self.check_over_xover_resp(resp, overviews)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        resp, overviews = self.server.over((57, 59))
        self.check_over_xover_resp(resp, overviews)

    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        # Check the raw body as received by the server
        received = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(received), 10)
        self.assertEqual(received[-1], b'.\r\n')
        self.assertEqual(received[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(received[-3], b'\r\n')
        self.assertEqual(received[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(received[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        """Send the sample post twice through *func* and return the last
        response: first with CRLF endings, then with "normal" line endings,
        which should be converted by NNTP.post and NNTP.ihave."""
        variants = (
            self.sample_post,
            self.sample_post.replace(b"\r\n", b"\n"),
        )
        for post in variants:
            func_args = args + (file_factory(post),)
            self.handler.posted_body = None
            resp = func(*func_args)
            self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        """Run *func* with every supported way of passing the article data."""
        def terminated_lines(b):
            return iter(b.splitlines(True))

        def bare_lines(b):
            return iter(b.splitlines(False))

        factories = (
            bytes,             # With a bytes object
            bytearray,         # With a bytearray object
            io.BytesIO,        # With a file object
            terminated_lines,  # With an iterable of terminated lines
            bare_lines,        # With an iterable of non-terminated lines
        )
        for factory in factories:
            resp = self._check_post_ihave_sub(func, *args,
                                              file_factory=factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
            self.server.post(self.sample_post)
        self.assertEqual(caught.exception.response,
                         "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(caught.exception.response,
                         "435 Article not wanted")
914
915
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # A v1 server advertises nothing, so the capability mapping is
        # empty and the client reports protocol version 1.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
926
927
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # The capability mapping must match this table exactly, and the
        # client must report protocol version 2.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {
            'VERSION': ['2'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
            })
        self.assertEqual(self.server.nntp_version, 2)
948
949
class MiscTests(unittest.TestCase):
    """Tests for nntplib helper functions that need no server."""

    def test_decode_header(self):
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref"])

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        # Exactly one (article number, fields) pair is expected.
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })

    def test_parse_datetime(self):
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode
        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            # Both the default and an explicit legacy=False are checked.
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives_datetime(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "20100605", "010203")

        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives_date(1999, 6, 23, "19990623", "000000")
        gives_date(2000, 6, 23, "20000623", "000000")
        gives_date(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives_datetime(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "100605", "010203")

        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives_date(1999, 6, 23, "990623", "000000")
        gives_date(2000, 6, 23, "000623", "000000")
        gives_date(2010, 6, 5, "100605", "000000")
1083
1084
def test_main():
    """Run the test classes listed below through ``support.run_unittest``."""
    support.run_unittest(
        MiscTests,
        NNTPv1Tests,
        NNTPv2Tests,
        NetworkedNNTPTests,
    )
1089
1090
# Run the suite only when this file is executed as a script.
if __name__ == "__main__":
    test_main()