blob: dd100ef62c5ab98d2a97ebe2cb0e0c060d9087ce [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
5import contextlib
6from test import support
7from nntplib import NNTP, GroupInfo
8import nntplib
9
# Timeout handed to the NNTP constructor when the networked tests connect.
TIMEOUT = 30
11
# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000016
17
class NetworkedNNTPTestsMixin:
    """Sanity tests meant to be run against a live NNTP server.

    The concrete test case is expected to provide ``self.server`` (the client
    object under test) together with the ``GROUP_NAME`` and ``GROUP_PAT``
    class attributes naming the group, and the group pattern, to query.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        # Named `lines` (not `list`) so the builtin isn't shadowed.
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        # Only inspect the first entry; an empty listing is acceptable.
        if len(groups) > 0:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if len(groups) > 0:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, 2030)

    def _check_art_dict(self, art_dict):
        """Sanity-check a field dictionary returned by OVER / XOVER."""
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields; the keys view must be a superset.
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, str)

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last, last)
        art_num, art_dict = lines[0]
        self.assertEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        self.assertEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        """Check the article object returned by HEAD / BODY / ARTICLE.

        When *art_num* is given, the article number must match it.
        """
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, head = self.server.head(last)
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, last)
        resp, body = self.server.body(last)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, last)
        resp, article = self.server.article(last)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, last)
        # The full article is the head, an empty separator line, then the body.
        self.assertEqual(article.lines, head.lines + [b''] + body.lines)

    def test_quit(self):
        self.server.quit()
        # Cleared so that a subclass tearDown can tell the connection is gone.
        self.server = None
158
159
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    # Runs the networked sanity tests against the gmane server named below.
    NNTP_HOST = 'news.gmane.org'
    GROUP_NAME = 'gmane.comp.python.devel'
    GROUP_PAT = 'gmane.comp.python.d*'

    # Disabled with gmane as it produces too much data
    test_list = None

    def setUp(self):
        support.requires("network")
        with support.transient_internet(self.NNTP_HOST):
            self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)

    def tearDown(self):
        # self.server is None once a test has already closed the connection.
        if self.server is None:
            return
        self.server.quit()

    def test_capabilities(self):
        # As of this writing, gmane implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        def verify(capabilities):
            list_caps = capabilities['LIST']
            self.assertIsInstance(list_caps, (list, tuple))
            self.assertIn('OVERVIEW.FMT', list_caps)

        self.assertGreaterEqual(self.server.nntp_version, 2)
        verify(self.server.getcapabilities())
        # This re-emits the command
        _, refreshed = self.server.capabilities()
        verify(refreshed)
190
191
#
# Non-networked tests using a local server (or something mocking it).
#
195
class _NNTPServerIO(io.RawIOBase):
    """Raw in-memory stream sitting between an NNTP client and a handler.

    Whatever the client writes is queued for the handler, which is then asked
    to process it; whatever the handler pushes back is what the client reads.
    """

    def __init__(self, handler):
        super().__init__()
        # Client-to-server and server-to-client channels, respectively.
        self.c2s = io.BytesIO()
        self.s2c = io.BytesIO()
        self.handler = handler
        handler.start(self.c2s.readline, self.push_data)

    def readable(self):
        return True

    def writable(self):
        return True

    def push_data(self, data):
        """Buffer *data* for the client without moving the read position."""
        read_pos = self.s2c.tell()
        self.s2c.seek(0, io.SEEK_END)
        self.s2c.write(data)
        self.s2c.seek(read_pos)

    def write(self, b):
        """Queue the bytes sent by the client and let the handler run."""
        read_pos = self.c2s.tell()
        self.c2s.write(b)
        # Rewind so the handler's readline() sees what was just written.
        self.c2s.seek(read_pos)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """Fill *buf* with buffered response bytes; return how many."""
        self.handler.process_pending()
        chunk = self.s2c.read(len(buf))
        size = len(chunk)
        buf[:size] = chunk
        return size
238
239
class MockedNNTPTestsMixin:
    """Fixture mixin connecting a client object to an in-memory handler.

    After ``setUp``, ``self.handler``, ``self.sio`` and ``self.server`` are
    available to the tests.
    """

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create the handler, its IO object and the client; return the client.

        Extra positional and keyword arguments are forwarded to the client
        constructor; ``usenetrc`` defaults to False unless given.
        """
        kwargs.setdefault('usenetrc', False)
        self.handler = self.handler_class()
        self.sio = _NNTPServerIO(self.handler)
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        rw_pair = io.BufferedRWPair(self.sio, self.sio)
        server = nntplib._NNTPBase(rw_pair, 'test.server', *args, **kwargs)
        self.server = server
        return server
261
262
class NNTPv1Handler:
    """A handler for RFC 977

    The handler reads command lines through the ``readline`` callable given
    to start() and answers with canned responses through ``push_data``.
    Each command ``FOO`` is dispatched to a ``handle_FOO`` method; commands
    without such a method get a ``500`` reply.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the IO callables, reset the state and push the welcome."""
        self.in_body = False
        self.allow_posting = True
        # Body collection state, only meaningful while in_body is true.
        self.body = None
        self.body_callback = None
        # Raw lines of the last article received through POST / IHAVE.
        self.posted_body = None
        self._readline = readline
        self._push_data = push_data
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def _consume_body(self):
        """Collect pending body lines; return True once the body is complete.

        When the terminating "." line has been read, the handler method that
        requested the body is called again with ``body=<list of raw lines>``
        and the body state is reset.  False means more data is needed.
        """
        while True:
            line = self._readline()
            if not line:
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def process_pending(self):
        """Process everything the client has sent so far.

        Raises ValueError if a command line isn't terminated by CRLF, or if
        a command handler fails (the original exception is chained).
        """
        if self.in_body and not self._consume_body():
            # Still waiting for the rest of the body.
            return
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            words = line.split()
            if not words:
                # A blank command line has no command name to dispatch on.
                self.handle_unknown()
                continue
            cmd, *tokens = words
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        self.body_callback = meth, tokens
                        self.body = []
                        # Lines already pending belong to the body, not to
                        # the command stream.
                        if not self._consume_body():
                            return

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # The literal is dedented, its lines are CRLF-terminated and the
        # result is sent UTF-8 encoded.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:58"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called once without a body (the command itself) and, when posting
        # is allowed, a second time with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST; only one message id is
        # accepted.
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    # The samples below are dedented by push_lit() before being sent.
    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")
553
554
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # Canned capability list, one entry per line, ended by a lone ".".
        reply = (
            "101 Capability list:",
            "VERSION 2",
            "IMPLEMENTATION INN 2.5.1",
            "AUTHINFO USER",
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        )
        self.push_lit("\n".join(reply))

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
573
574
class NNTPv1v2TestsMixin:
    """Tests shared by the mocked NNTP "v1" and "v2" server test cases.

    The concrete test case provides ``self.server``, ``self.handler`` and
    ``self.sio``, as well as the ``nntp_version`` class attribute.
    """

    def setUp(self):
        super().setUp()

    def test_welcome(self):
        self.assertEqual(self.server.welcome, self.handler.welcome)

    def test_date(self):
        resp, when = self.server.date()
        self.assertEqual(resp, "111 20100914001155")
        self.assertEqual(when, datetime.datetime(2010, 9, 14, 0, 11, 55))

    def test_quit(self):
        self.assertFalse(self.sio.closed)
        resp = self.server.quit()
        self.assertEqual(resp, "205 Bye!")
        self.assertTrue(self.sio.closed)

    def test_help(self):
        resp, help_lines = self.server.help()
        self.assertEqual(resp, "100 Legal commands")
        expected = [
            ' authinfo user Name|pass Password|generic <prog> <args>',
            ' date',
            ' help',
            'Report problems to <root@example.org>',
            ]
        self.assertEqual(help_lines, expected)

    def test_list(self):
        resp, groups = self.server.list()
        self.assertEqual(len(groups), 6)
        expected = GroupInfo("comp.lang.python.announce", "0000001153",
                             "0000000993", "m")
        self.assertEqual(groups[1], expected)

    def test_stat(self):
        message_id = "<45223423@example.com>"
        # (argument, expected response, expected article number)
        found = (
            (3000234, "223 3000234 <45223423@example.com>", 3000234),
            (message_id, "223 0 <45223423@example.com>", 0),
        )
        for spec, expected_resp, expected_num in found:
            resp, art_num, art_id = self.server.stat(spec)
            self.assertEqual(resp, expected_resp)
            self.assertEqual(art_num, expected_num)
            self.assertEqual(art_id, message_id)
        # (positional arguments, expected error response)
        failing = (
            (("<non.existent.id>",), "430 No Such Article Found"),
            ((), "412 No newsgroup selected"),
        )
        for spec, expected_resp in failing:
            with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
                self.server.stat(*spec)
            self.assertEqual(cm.exception.response, expected_resp)

    def test_next(self):
        resp, art_num, art_id = self.server.next()
        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(art_id, "<668929@example.org>")

    def test_last(self):
        resp, art_num, art_id = self.server.last()
        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(art_id, "<45223423@example.com>")

    def test_description(self):
        known = self.server.description("comp.lang.python")
        self.assertEqual(known, "The Python computer language.")
        unknown = self.server.description("comp.lang.pythonx")
        self.assertEqual(unknown, "")

    def test_descriptions(self):
        python_desc = "The Python computer language."
        resp, groups = self.server.descriptions("comp.lang.python")
        self.assertEqual(resp, '215 Descriptions in form "group description".')
        self.assertEqual(groups, {"comp.lang.python": python_desc})
        resp, groups = self.server.descriptions("comp.lang.python*")
        self.assertEqual(groups, {
            "comp.lang.python": python_desc,
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
            })
        resp, groups = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(groups, {})

    def test_group(self):
        wanted = "fr.comp.lang.python"
        resp, count, first, last, group = self.server.group(wanted)
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, wanted)
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.group("comp.lang.python.devel")
        failure = cm.exception.response
        self.assertTrue(failure.startswith("411 No such group"), failure)

    def test_newnews(self):
        when = datetime.datetime(2010, 9, 13, 8, 20, 4)
        # NEWNEWS comp.lang.python [20]100913 082004
        resp, ids = self.server.newnews("comp.lang.python", when)
        template = (
            "230 list of newsarticles (NNTP v{0}) "
            "created after Mon Sep 13 08:20:04 2010 follows"
            )
        self.assertEqual(resp, template.format(self.nntp_version))
        self.assertEqual(ids, [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
            ])
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        when = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("fr.comp.lang.python", when)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        """Check the four body lines of the sample article."""
        self.assertEqual(len(lines), 4)
        opening, dotted, blank, signature = lines
        self.assertEqual(signature.decode('utf8'), "-- Signed by André.")
        self.assertEqual(blank, b"")
        self.assertEqual(dotted, b".Here is a dot-starting line.")
        self.assertEqual(opening, b"This is just a test article.")

    def _check_article_head(self, lines):
        """Check the four header lines of the sample article."""
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        """Check a whole sample article: head, empty line, then body."""
        self.assertEqual(len(lines), 9)
        head, separator, body = lines[:4], lines[4], lines[-4:]
        self._check_article_head(head)
        self._check_article_body(body)
        self.assertEqual(separator, b"")

    def _check_retrieval(self, fetch, check_lines, cases):
        """Drive one of the article / head / body client methods.

        *cases* holds ``(positional args, expected response, expected article
        number)`` tuples; the returned lines are handed to *check_lines*.  A
        non-existent message id must then raise a temporary error.
        """
        for spec, expected_resp, expected_num in cases:
            resp, info = fetch(*spec)
            self.assertEqual(resp, expected_resp)
            art_num, art_id, lines = info
            self.assertEqual(art_num, expected_num)
            self.assertEqual(art_id, "<45223423@example.com>")
            check_lines(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            fetch("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_article(self):
        # ARTICLE, ARTICLE num, ARTICLE id
        self._check_retrieval(self.server.article, self._check_article_data, [
            ((), "220 3000237 <45223423@example.com>", 3000237),
            ((3000234,), "220 3000234 <45223423@example.com>", 3000234),
            (("<45223423@example.com>",), "220 0 <45223423@example.com>", 0),
            ])

    def test_article_file(self):
        # With a "file" argument
        sink = io.BytesIO()
        resp, info = self.server.article(file=sink)
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        art_num, art_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(art_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        written = sink.getvalue()
        expected_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            )
        expected_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            )
        self.assertTrue(written.startswith(expected_start), ascii(written))
        self.assertTrue(written.endswith(expected_end), ascii(written))

    def test_head(self):
        # HEAD, HEAD num, HEAD id
        self._check_retrieval(self.server.head, self._check_article_head, [
            ((), "221 3000237 <45223423@example.com>", 3000237),
            ((3000234,), "221 3000234 <45223423@example.com>", 3000234),
            (("<45223423@example.com>",), "221 0 <45223423@example.com>", 0),
            ])

    def test_body(self):
        # BODY, BODY num, BODY id
        self._check_retrieval(self.server.body, self._check_article_body, [
            ((), "222 3000237 <45223423@example.com>", 3000237),
            ((3000234,), "222 3000234 <45223423@example.com>", 3000234),
            (("<45223423@example.com>",), "222 0 <45223423@example.com>", 0),
            ])

    def check_over_xover_resp(self, resp, overviews):
        """Check the canned OVER / XOVER answer for articles 57 to 59."""
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        art_num, over = overviews[0]
        self.assertEqual(art_num, 57)
        expected = {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.org gmane.comp.python.authors:57"
            }
        self.assertEqual(over, expected)
        _, over = overviews[2]
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        resp, overviews = self.server.xover(57, 59)
        self.check_over_xover_resp(resp, overviews)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        resp, overviews = self.server.over((57, 59))
        self.check_over_xover_resp(resp, overviews)

    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        """Check the raw body as received by the server."""
        received = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(received), 10)
        self.assertEqual(received[-1], b'.\r\n')
        self.assertEqual(received[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(received[-3], b'\r\n')
        self.assertEqual(received[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(received[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        """Post the sample article twice through *func*; return the last reply.

        The first pass uses the prepared post with CRLF endings, the second
        one the same post with "normal" line endings - they should be
        converted by NNTP.post and NNTP.ihave.
        """
        crlf_post = self.sample_post
        lf_post = self.sample_post.replace(b"\r\n", b"\n")
        for post in (crlf_post, lf_post):
            payload = file_factory(post)
            self.handler.posted_body = None
            resp = func(*args, payload)
            self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        """Check *func* with every supported kind of article argument."""
        def terminated_lines(data):
            return iter(data.splitlines(True))

        def bare_lines(data):
            return iter(data.splitlines(False))

        factories = (
            bytes,              # With a bytes object
            bytearray,          # With a bytearray object
            io.BytesIO,         # With a file object
            terminated_lines,   # With an iterable of terminated lines
            bare_lines,         # With an iterable of non-terminated lines
        )
        for factory in factories:
            resp = self._check_post_ihave_sub(func, *args, file_factory=factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response,
                         "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response,
                         "435 Article not wanted")
915
916
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # The capability mapping is expected to be empty and the client
        # should report protocol version 1.
        self.assertEqual(self.server.getcapabilities(), {})
        detected_version = self.server.nntp_version
        self.assertEqual(detected_version, 1)
927
928
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # The capability mapping this test expects from the server.
        expected_caps = {
            'VERSION': ['2'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(self.server.getcapabilities(), expected_caps)
        detected_version = self.server.nntp_version
        self.assertEqual(detected_version, 2)
949
950
class MiscTests(unittest.TestCase):
    """Checks for nntplib's module-level helpers; no server is involved."""

    def test_decode_header(self):
        # Each pair is (raw header value, expected decoded text).
        samples = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in samples:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        parse = nntplib._parse_overview_fmt
        # Header lines and parsed names shared by most of the cases below.
        headers = ["Subject:", "From:", "Date:", "Message-ID:", "References:"]
        names = ["subject", "from", "date", "message-id", "references",
                 ":bytes", ":lines"]
        # Each pair is (server response lines, expected field names).
        cases = [
            # The minimal (default) response
            (headers + [":bytes", ":lines"], names),
            # The minimal response using alternative names
            (headers + ["Bytes:", "Lines:"], names),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"], names),
            # First example from RFC 3977
            (headers + [":bytes", ":lines", "Xref:full", "Distribution:full"],
             names + ["xref", "distribution"]),
            # Second example from RFC 3977
            (headers + ["Bytes:", "Lines:", "Xref:FULL", "Distribution:FULL"],
             names + ["xref", "distribution"]),
            # A classic response from INN
            (headers + ["Bytes:", "Lines:", "Xref:full"], names + ["xref"]),
        ]
        for lines, expected in cases:
            self.assertEqual(parse(lines), expected)

    def test_parse_overview(self):
        overview_fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        response_lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        parsed = nntplib._parse_overview(response_lines, overview_fmt)
        # Exactly one (article number, fields) entry is expected.
        [(art_num, fields)] = parsed
        self.assertEqual(art_num, 3000234)
        expected_fields = {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        }
        self.assertEqual(fields, expected_fields)

    def test_parse_datetime(self):
        # Each entry is ((date_str, time_str), expected datetime fields).
        cases = [
            # Output of DATE command
            (("19990623135624", None), (1999, 6, 23, 13, 56, 24)),
            # Variations
            (("19990623", "135624"), (1999, 6, 23, 13, 56, 24)),
            (("990623", "135624"), (1999, 6, 23, 13, 56, 24)),
            (("090623", "135624"), (2009, 6, 23, 13, 56, 24)),
        ]
        for (date_str, time_str), parts in cases:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             datetime.datetime(*parts))

    def test_unparse_datetime(self):
        # Test non-legacy mode
        unparse = nntplib._unparse_datetime
        samples = [
            # 1) with a datetime
            (datetime.datetime(1999, 6, 23, 13, 56, 24), "19990623", "135624"),
            (datetime.datetime(2000, 6, 23, 13, 56, 24), "20000623", "135624"),
            (datetime.datetime(2010, 6, 5, 1, 2, 3), "20100605", "010203"),
            # 2) with a date
            (datetime.date(1999, 6, 23), "19990623", "000000"),
            (datetime.date(2000, 6, 23), "20000623", "000000"),
            (datetime.date(2010, 6, 5), "20100605", "000000"),
        ]
        for value, date_str, time_str in samples:
            # Omitting the second argument and passing False must agree.
            self.assertEqual(unparse(value), (date_str, time_str))
            self.assertEqual(unparse(value, False), (date_str, time_str))

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        unparse = nntplib._unparse_datetime
        samples = [
            # 1) with a datetime
            (datetime.datetime(1999, 6, 23, 13, 56, 24), "990623", "135624"),
            (datetime.datetime(2000, 6, 23, 13, 56, 24), "000623", "135624"),
            (datetime.datetime(2010, 6, 5, 1, 2, 3), "100605", "010203"),
            # 2) with a date
            (datetime.date(1999, 6, 23), "990623", "000000"),
            (datetime.date(2000, 6, 23), "000623", "000000"),
            (datetime.date(2010, 6, 5), "100605", "000000"),
        ]
        for value, date_str, time_str in samples:
            self.assertEqual(unparse(value, True), (date_str, time_str))
1084
1085
def test_main():
    """Run the test case classes listed below via support.run_unittest()."""
    test_classes = (
        MiscTests,
        NNTPv1Tests,
        NNTPv2Tests,
        NetworkedNNTPTests,
    )
    support.run_unittest(*test_classes)
1090
1091
# Script entry point: run this module's tests when it is executed directly.
if __name__ == "__main__":
    test_main()