blob: 9e88ddbda7b9de169d5a2ef813a30be1a66c7400 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
8from test import support
Serhiy Storchaka43767632013-11-03 21:31:38 +02009from nntplib import NNTP, GroupInfo
Antoine Pitrou69ab9512010-09-29 15:03:40 +000010import nntplib
Serhiy Storchaka52027c32015-03-21 09:40:26 +020011from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020012try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000013 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020014except ImportError:
15 ssl = None
Antoine Pitrou69ab9512010-09-29 15:03:40 +000016
# Value passed as the `timeout` argument to every NNTP connection the
# networked tests below open against a real server.
TIMEOUT = 30
18
19# TODO:
20# - test the `file` arg to more commands
21# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000022# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000023
24
class NetworkedNNTPTestsMixin:
    """Tests exercising an NNTP client against a real, remote server.

    The concrete test class is expected to provide the ``NNTP_HOST``,
    ``GROUP_NAME``, ``GROUP_PAT`` and ``NNTP_CLASS`` attributes and to store
    the client shared by all tests in its ``server`` class attribute.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertEqual(str, type(welcome))

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertEqual(str, type(line))

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(resp.startswith(("215 ", "282 ")), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local calendar once it
        # passes the historical hard-coded limit, so that this test does not
        # start failing merely because time went by.
        latest_year = max(2030, datetime.date.today().year + 1)
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, latest_year)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        if not lines:
            # Same policy as test_xover: an empty answer is reported as a
            # skip instead of crashing on the indexing below.
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertEqual(str, type(line[1]))

    def check_article_resp(self, resp, article, art_num=None):
        # Shared checks for the result of HEAD / BODY / ARTICLE.  `resp` is
        # accepted for the callers' convenience but is not inspected here.
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def blacklist(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not blacklist(line)]
        filtered_lines = [line for line in article.lines
                          if not blacklist(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        file = self.server.file
        sock = self.server.sock
        try:
            self.server.starttls()
        except nntplib.NNTPPermanentError:
            self.skipTest("STARTTLS not supported by server.")
        else:
            # Check that the socket and internal pseudo-file really were
            # changed.
            self.assertNotEqual(file, self.server.file)
            self.assertNotEqual(sock, self.server.sock)
            # Check that the new socket really is an SSL one
            self.assertIsInstance(self.server.sock, ssl.SSLSocket)
            # Check that trying starttls when it's already active fails.
            self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Cleared on the class so that later code can tell the shared
            # connection has already been shut down.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_with_statement(self):
        def is_connected():
            # `server` is the name bound by the `with` statements below.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            self.assertTrue(is_connected())
            self.assertTrue(server.help())
        self.assertFalse(is_connected())

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            server.quit()
        self.assertFalse(is_connected())
282
283
# Replace every callable test_* attribute of the mixin with a version that
# runs inside support.transient_internet(self.NNTP_HOST).
NetworkedNNTPTestsMixin.wrap_methods()
285
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000286
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Networked tests run through the plain ``NNTP`` client class."""

    NNTP_CLASS = NNTP

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    @classmethod
    def setUpClass(cls):
        # One connection is opened here and shared by every test.
        support.requires("network")
        host = cls.NNTP_HOST
        with support.transient_internet(host):
            cls.server = cls.NNTP_CLASS(host, timeout=TIMEOUT, usenetrc=False)

    @classmethod
    def tearDownClass(cls):
        # test_zzquit() sets `server` to None after quitting the connection.
        if cls.server is None:
            return
        cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000305
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    """Networked tests run through ``nntplib.NNTP_SSL``."""

    # getattr() with a None default keeps this class definable when nntplib
    # has no NNTP_SSL attribute.
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."
    NNTP_HOST = 'nntp.aioe.org'
    GROUP_NAME = 'comp.lang.python'
    GROUP_PAT = 'comp.lang.*'

    # Disabled as it produces too much data
    test_list = None

    # Disabled as the connection will already be encrypted.
    test_starttls = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000324
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000325
326#
327# Non-networked tests using a local server (or something mocking it).
328#
329
330class _NNTPServerIO(io.RawIOBase):
331 """A raw IO object allowing NNTP commands to be received and processed
332 by a handler. The handler can push responses which can then be read
333 from the IO object."""
334
335 def __init__(self, handler):
336 io.RawIOBase.__init__(self)
337 # The channel from the client
338 self.c2s = io.BytesIO()
339 # The channel to the client
340 self.s2c = io.BytesIO()
341 self.handler = handler
342 self.handler.start(self.c2s.readline, self.push_data)
343
344 def readable(self):
345 return True
346
347 def writable(self):
348 return True
349
350 def push_data(self, data):
351 """Push (buffer) some data to send to the client."""
352 pos = self.s2c.tell()
353 self.s2c.seek(0, 2)
354 self.s2c.write(data)
355 self.s2c.seek(pos)
356
357 def write(self, b):
358 """The client sends us some data"""
359 pos = self.c2s.tell()
360 self.c2s.write(b)
361 self.c2s.seek(pos)
362 self.handler.process_pending()
363 return len(b)
364
365 def readinto(self, buf):
366 """The client wants to read a response"""
367 self.handler.process_pending()
368 b = self.s2c.read(len(buf))
369 n = len(b)
370 buf[:n] = b
371 return n
372
373
def make_mock_file(handler):
    """Return ``(raw, file)``: a mock server IO object driven by *handler*
    and a buffered file object wrapping it in both directions."""
    raw = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    buffered = io.BufferedRWPair(raw, raw)
    return (raw, buffered)
380
381
class MockedNNTPTestsMixin:
    """Give each test an NNTP client talking to an in-process mock handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create a fresh handler and client; extra arguments are forwarded
        to ``nntplib._NNTPBase``.  Returns the new client."""
        handler = self.handler_class()
        self.handler = handler
        self.sio, mock_file = make_mock_file(handler)
        self.server = nntplib._NNTPBase(mock_file, 'test.server',
                                        *args, **kwargs)
        return self.server
399
400
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Same as MockedNNTPTestsMixin, with a client built with
    ``readermode=True``."""

    def setUp(self):
        # The parent creates a default server first; it is then replaced by
        # one constructed with reader mode requested.
        super().setUp()
        self.make_server(readermode=True)
405
406
class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its I/O callables and send the greeting.

        *readline* returns the next line of bytes sent by the client (empty
        when nothing is pending); *push_data* queues bytes for the client.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # State of a multi-line body being received (see process_pending())
        self.body = None
        self.body_callback = None
        # Last body accepted by POST / IHAVE, None until one was received
        self.posted_body = None
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume everything the client has sent so far.

        Raises ValueError for a line not terminated by CRLF or when a
        command handler fails.
        """
        if self.in_body:
            # A previous command asked for a body: collect lines up to the
            # terminating "." before handing them to that command again.
            while True:
                line = self._readline()
                if not line:
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            words = line.split()
            if not words:
                # A blank line carries no command name at all.
                self.handle_unknown()
                continue
            cmd, *tokens = words
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
                continue
            try:
                meth(*tokens)
            except Exception as e:
                raise ValueError("command failed: {!r}".format(line)) from e
            if self.in_body:
                # Remember which command to call back once the body is in.
                self.body_callback = meth, tokens
                self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # The command lines sit one column to the right of the status line,
        # so they keep a leading space after push_lit() dedents the text.
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # Deliberately oversized single line
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called once without a body for the command itself, then again by
        # process_pending() with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST().
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_sample(self, code, message_spec, payload):
        # Shared by ARTICLE / HEAD / BODY: push the status line starting
        # with *code* for a known article followed by *payload* and the
        # terminating dot, or a 430 reply for anything else.
        if message_spec is None:
            self.push_lit("{} 3000237 <45223423@example.com>".format(code))
        elif message_spec == "<45223423@example.com>":
            self.push_lit("{} 0 <45223423@example.com>".format(code))
        elif message_spec == "3000234":
            self.push_lit("{} 3000234 <45223423@example.com>".format(code))
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_sample("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_sample("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_sample("222", message_spec, self.sample_body)

    def handle_AUTHINFO(self, cred_type, data):
        # `data` (the user name or password itself) is accepted unchecked.
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
730
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000731
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""
        # AUTHINFO is advertised only while nobody is logged in.  The extra
        # line is indented like the template so that push_lit() dedents the
        # whole reply uniformly.
        if self._logged_in:
            extra = ''
        else:
            extra = '\n            AUTHINFO USER'
        self.push_lit(template.format(extra))

    def handle_MODE(self, _):
        # READER is already part of the capability list above.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
757
758
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if self._logged_in:
            super().handle_CAPABILITIES()
            return
        # Not authenticated yet: refuse instead of listing capabilities.
        self.push_lit('480 You must log in.')
767
768
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes True once the client has sent MODE READER.
        self._switched = False

    def handle_CAPABILITIES(self):
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        # MODE-READER is advertised before the switch, READER afterwards.
        prefix = '' if self._switched else 'MODE-'
        self.push_lit(template.format(prefix))

    def handle_MODE(self, what):
        # Only a single switch, to reader mode, is expected.
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
795
796
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000797class NNTPv1v2TestsMixin:
798
def setUp(self):
    # Nothing specific to prepare here: defer to the next class in the MRO.
    super().setUp()
801
802 def test_welcome(self):
803 self.assertEqual(self.server.welcome, self.handler.welcome)
804
Antoine Pitrou54411c12012-02-12 19:14:17 +0100805 def test_authinfo(self):
806 if self.nntp_version == 2:
807 self.assertIn('AUTHINFO', self.server._caps)
808 self.server.login('testuser', 'testpw')
809 # if AUTHINFO is gone from _caps we also know that getcapabilities()
810 # has been called after login as it should
811 self.assertNotIn('AUTHINFO', self.server._caps)
812
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000813 def test_date(self):
814 resp, date = self.server.date()
815 self.assertEqual(resp, "111 20100914001155")
816 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
817
818 def test_quit(self):
819 self.assertFalse(self.sio.closed)
820 resp = self.server.quit()
821 self.assertEqual(resp, "205 Bye!")
822 self.assertTrue(self.sio.closed)
823
824 def test_help(self):
825 resp, help = self.server.help()
826 self.assertEqual(resp, "100 Legal commands")
827 self.assertEqual(help, [
828 ' authinfo user Name|pass Password|generic <prog> <args>',
829 ' date',
830 ' help',
831 'Report problems to <root@example.org>',
832 ])
833
def test_list(self):
    # Plain LIST: every group known to the mock handler
    resp, groups = self.server.list()
    self.assertEqual(len(groups), 6)
    second = groups[1]
    announce = GroupInfo("comp.lang.python.announce", "0000001153",
                         "0000000993", "m")
    self.assertEqual(second, announce)
    # LIST restricted by a pattern
    resp, groups = self.server.list("*distutils*")
    self.assertEqual(len(groups), 2)
    first = groups[0]
    devel = GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
                      "0000000001", "m")
    self.assertEqual(first, devel)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000847
def test_stat(self):
    known_id = "<45223423@example.com>"
    # Lookup by article number
    resp, art_num, message_id = self.server.stat(3000234)
    self.assertEqual(resp, "223 3000234 <45223423@example.com>")
    self.assertEqual(art_num, 3000234)
    self.assertEqual(message_id, known_id)
    # Lookup by message id
    resp, art_num, message_id = self.server.stat("<45223423@example.com>")
    self.assertEqual(resp, "223 0 <45223423@example.com>")
    self.assertEqual(art_num, 0)
    self.assertEqual(message_id, known_id)
    # Unknown article
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.stat("<non.existent.id>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
    # No argument at all
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.stat()
    self.assertEqual(cm.exception.response, "412 No newsgroup selected")
863
864 def test_next(self):
865 resp, art_num, message_id = self.server.next()
866 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
867 self.assertEqual(art_num, 3000237)
868 self.assertEqual(message_id, "<668929@example.org>")
869
870 def test_last(self):
871 resp, art_num, message_id = self.server.last()
872 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
873 self.assertEqual(art_num, 3000234)
874 self.assertEqual(message_id, "<45223423@example.com>")
875
876 def test_description(self):
877 desc = self.server.description("comp.lang.python")
878 self.assertEqual(desc, "The Python computer language.")
879 desc = self.server.description("comp.lang.pythonx")
880 self.assertEqual(desc, "")
881
882 def test_descriptions(self):
883 resp, groups = self.server.descriptions("comp.lang.python")
884 self.assertEqual(resp, '215 Descriptions in form "group description".')
885 self.assertEqual(groups, {
886 "comp.lang.python": "The Python computer language.",
887 })
888 resp, groups = self.server.descriptions("comp.lang.python*")
889 self.assertEqual(groups, {
890 "comp.lang.python": "The Python computer language.",
891 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
892 })
893 resp, groups = self.server.descriptions("comp.lang.pythonx")
894 self.assertEqual(groups, {})
895
896 def test_group(self):
897 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
898 self.assertTrue(resp.startswith("211 "), resp)
899 self.assertEqual(first, 761)
900 self.assertEqual(last, 1265)
901 self.assertEqual(count, 486)
902 self.assertEqual(group, "fr.comp.lang.python")
903 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
904 self.server.group("comp.lang.python.devel")
905 exc = cm.exception
906 self.assertTrue(exc.response.startswith("411 No such group"),
907 exc.response)
908
909 def test_newnews(self):
910 # NEWNEWS comp.lang.python [20]100913 082004
911 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
912 resp, ids = self.server.newnews("comp.lang.python", dt)
913 expected = (
914 "230 list of newsarticles (NNTP v{0}) "
915 "created after Mon Sep 13 08:20:04 2010 follows"
916 ).format(self.nntp_version)
917 self.assertEqual(resp, expected)
918 self.assertEqual(ids, [
919 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
920 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
921 ])
922 # NEWNEWS fr.comp.lang.python [20]100913 082004
923 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
924 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
925 self.assertEqual(resp, "230 An empty list of newsarticles follows")
926 self.assertEqual(ids, [])
927
928 def _check_article_body(self, lines):
929 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000930 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000931 self.assertEqual(lines[-2], b"")
932 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
933 self.assertEqual(lines[-4], b"This is just a test article.")
934
935 def _check_article_head(self, lines):
936 self.assertEqual(len(lines), 4)
937 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
938 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
939
940 def _check_article_data(self, lines):
941 self.assertEqual(len(lines), 9)
942 self._check_article_head(lines[:4])
943 self._check_article_body(lines[-4:])
944 self.assertEqual(lines[4], b"")
945
946 def test_article(self):
947 # ARTICLE
948 resp, info = self.server.article()
949 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
950 art_num, message_id, lines = info
951 self.assertEqual(art_num, 3000237)
952 self.assertEqual(message_id, "<45223423@example.com>")
953 self._check_article_data(lines)
954 # ARTICLE num
955 resp, info = self.server.article(3000234)
956 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
957 art_num, message_id, lines = info
958 self.assertEqual(art_num, 3000234)
959 self.assertEqual(message_id, "<45223423@example.com>")
960 self._check_article_data(lines)
961 # ARTICLE id
962 resp, info = self.server.article("<45223423@example.com>")
963 self.assertEqual(resp, "220 0 <45223423@example.com>")
964 art_num, message_id, lines = info
965 self.assertEqual(art_num, 0)
966 self.assertEqual(message_id, "<45223423@example.com>")
967 self._check_article_data(lines)
968 # Non-existent id
969 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
970 self.server.article("<non-existent@example.com>")
971 self.assertEqual(cm.exception.response, "430 No Such Article Found")
972
973 def test_article_file(self):
974 # With a "file" argument
975 f = io.BytesIO()
976 resp, info = self.server.article(file=f)
977 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
978 art_num, message_id, lines = info
979 self.assertEqual(art_num, 3000237)
980 self.assertEqual(message_id, "<45223423@example.com>")
981 self.assertEqual(lines, [])
982 data = f.getvalue()
983 self.assertTrue(data.startswith(
984 b'From: "Demo User" <nobody@example.net>\r\n'
985 b'Subject: I am just a test article\r\n'
986 ), ascii(data))
987 self.assertTrue(data.endswith(
988 b'This is just a test article.\r\n'
989 b'.Here is a dot-starting line.\r\n'
990 b'\r\n'
991 b'-- Signed by Andr\xc3\xa9.\r\n'
992 ), ascii(data))
993
994 def test_head(self):
995 # HEAD
996 resp, info = self.server.head()
997 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
998 art_num, message_id, lines = info
999 self.assertEqual(art_num, 3000237)
1000 self.assertEqual(message_id, "<45223423@example.com>")
1001 self._check_article_head(lines)
1002 # HEAD num
1003 resp, info = self.server.head(3000234)
1004 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
1005 art_num, message_id, lines = info
1006 self.assertEqual(art_num, 3000234)
1007 self.assertEqual(message_id, "<45223423@example.com>")
1008 self._check_article_head(lines)
1009 # HEAD id
1010 resp, info = self.server.head("<45223423@example.com>")
1011 self.assertEqual(resp, "221 0 <45223423@example.com>")
1012 art_num, message_id, lines = info
1013 self.assertEqual(art_num, 0)
1014 self.assertEqual(message_id, "<45223423@example.com>")
1015 self._check_article_head(lines)
1016 # Non-existent id
1017 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1018 self.server.head("<non-existent@example.com>")
1019 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1020
Antoine Pitrou2640b522012-02-15 18:53:18 +01001021 def test_head_file(self):
1022 f = io.BytesIO()
1023 resp, info = self.server.head(file=f)
1024 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1025 art_num, message_id, lines = info
1026 self.assertEqual(art_num, 3000237)
1027 self.assertEqual(message_id, "<45223423@example.com>")
1028 self.assertEqual(lines, [])
1029 data = f.getvalue()
1030 self.assertTrue(data.startswith(
1031 b'From: "Demo User" <nobody@example.net>\r\n'
1032 b'Subject: I am just a test article\r\n'
1033 ), ascii(data))
1034 self.assertFalse(data.endswith(
1035 b'This is just a test article.\r\n'
1036 b'.Here is a dot-starting line.\r\n'
1037 b'\r\n'
1038 b'-- Signed by Andr\xc3\xa9.\r\n'
1039 ), ascii(data))
1040
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001041 def test_body(self):
1042 # BODY
1043 resp, info = self.server.body()
1044 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1045 art_num, message_id, lines = info
1046 self.assertEqual(art_num, 3000237)
1047 self.assertEqual(message_id, "<45223423@example.com>")
1048 self._check_article_body(lines)
1049 # BODY num
1050 resp, info = self.server.body(3000234)
1051 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1052 art_num, message_id, lines = info
1053 self.assertEqual(art_num, 3000234)
1054 self.assertEqual(message_id, "<45223423@example.com>")
1055 self._check_article_body(lines)
1056 # BODY id
1057 resp, info = self.server.body("<45223423@example.com>")
1058 self.assertEqual(resp, "222 0 <45223423@example.com>")
1059 art_num, message_id, lines = info
1060 self.assertEqual(art_num, 0)
1061 self.assertEqual(message_id, "<45223423@example.com>")
1062 self._check_article_body(lines)
1063 # Non-existent id
1064 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1065 self.server.body("<non-existent@example.com>")
1066 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1067
Antoine Pitrou2640b522012-02-15 18:53:18 +01001068 def test_body_file(self):
1069 f = io.BytesIO()
1070 resp, info = self.server.body(file=f)
1071 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1072 art_num, message_id, lines = info
1073 self.assertEqual(art_num, 3000237)
1074 self.assertEqual(message_id, "<45223423@example.com>")
1075 self.assertEqual(lines, [])
1076 data = f.getvalue()
1077 self.assertFalse(data.startswith(
1078 b'From: "Demo User" <nobody@example.net>\r\n'
1079 b'Subject: I am just a test article\r\n'
1080 ), ascii(data))
1081 self.assertTrue(data.endswith(
1082 b'This is just a test article.\r\n'
1083 b'.Here is a dot-starting line.\r\n'
1084 b'\r\n'
1085 b'-- Signed by Andr\xc3\xa9.\r\n'
1086 ), ascii(data))
1087
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001088 def check_over_xover_resp(self, resp, overviews):
1089 self.assertTrue(resp.startswith("224 "), resp)
1090 self.assertEqual(len(overviews), 3)
1091 art_num, over = overviews[0]
1092 self.assertEqual(art_num, 57)
1093 self.assertEqual(over, {
1094 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1095 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1096 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1097 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1098 "references": "<hvalf7$ort$1@dough.gmane.org>",
1099 ":bytes": "7103",
1100 ":lines": "16",
1101 "xref": "news.gmane.org gmane.comp.python.authors:57"
1102 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001103 art_num, over = overviews[1]
1104 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001105 art_num, over = overviews[2]
1106 self.assertEqual(over["subject"],
1107 "Re: Message d'erreur incompréhensible (par moi)")
1108
1109 def test_xover(self):
1110 resp, overviews = self.server.xover(57, 59)
1111 self.check_over_xover_resp(resp, overviews)
1112
1113 def test_over(self):
1114 # In NNTP "v1", this will fallback on XOVER
1115 resp, overviews = self.server.over((57, 59))
1116 self.check_over_xover_resp(resp, overviews)
1117
1118 sample_post = (
1119 b'From: "Demo User" <nobody@example.net>\r\n'
1120 b'Subject: I am just a test article\r\n'
1121 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1122 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1123 b'\r\n'
1124 b'This is just a test article.\r\n'
1125 b'.Here is a dot-starting line.\r\n'
1126 b'\r\n'
1127 b'-- Signed by Andr\xc3\xa9.\r\n'
1128 )
1129
1130 def _check_posted_body(self):
1131 # Check the raw body as received by the server
1132 lines = self.handler.posted_body
1133 # One additional line for the "." terminator
1134 self.assertEqual(len(lines), 10)
1135 self.assertEqual(lines[-1], b'.\r\n')
1136 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1137 self.assertEqual(lines[-3], b'\r\n')
1138 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1139 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1140
1141 def _check_post_ihave_sub(self, func, *args, file_factory):
1142 # First the prepared post with CRLF endings
1143 post = self.sample_post
1144 func_args = args + (file_factory(post),)
1145 self.handler.posted_body = None
1146 resp = func(*func_args)
1147 self._check_posted_body()
1148 # Then the same post with "normal" line endings - they should be
1149 # converted by NNTP.post and NNTP.ihave.
1150 post = self.sample_post.replace(b"\r\n", b"\n")
1151 func_args = args + (file_factory(post),)
1152 self.handler.posted_body = None
1153 resp = func(*func_args)
1154 self._check_posted_body()
1155 return resp
1156
1157 def check_post_ihave(self, func, success_resp, *args):
1158 # With a bytes object
1159 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1160 self.assertEqual(resp, success_resp)
1161 # With a bytearray object
1162 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1163 self.assertEqual(resp, success_resp)
1164 # With a file object
1165 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1166 self.assertEqual(resp, success_resp)
1167 # With an iterable of terminated lines
1168 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001169 return iter(b.splitlines(keepends=True))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001170 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1171 self.assertEqual(resp, success_resp)
1172 # With an iterable of non-terminated lines
1173 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001174 return iter(b.splitlines(keepends=False))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001175 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1176 self.assertEqual(resp, success_resp)
1177
1178 def test_post(self):
1179 self.check_post_ihave(self.server.post, "240 Article received OK")
1180 self.handler.allow_posting = False
1181 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1182 self.server.post(self.sample_post)
1183 self.assertEqual(cm.exception.response,
1184 "440 Posting not permitted")
1185
1186 def test_ihave(self):
1187 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1188 "<i.am.an.article.you.will.want@example.com>")
1189 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1190 self.server.ihave("<another.message.id>", self.sample_post)
1191 self.assertEqual(cm.exception.response,
1192 "435 Article not wanted")
1193
Georg Brandl28e78412013-10-27 07:29:47 +01001194 def test_too_long_lines(self):
1195 dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1196 self.assertRaises(nntplib.NNTPDataError,
1197 self.server.newnews, "comp.lang.python", dt)
1198
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001199
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # This server is expected to report no capabilities at all.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        # assertIsNone checks identity and gives a clearer failure message
        # than comparing against None with assertEqual.
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001211
1212
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Runs the shared test suite against a capabilities-aware (v2) server."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # Capability table the client is expected to report for this handler.
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        server = self.server
        self.assertEqual(server.getcapabilities(), expected_caps)
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001234
1235
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Checks a server whose capabilities only show up once logged in."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        server = self.server
        # Before login the cached capabilities are empty ...
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        # ... and afterwards they contain at least VERSION.
        self.assertIn('VERSION', server._caps)
1246
1247
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2, but NNTP is told to send MODE READER to a
    server that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        # READER must already be among the cached capabilities.
        cached_caps = self.server._caps
        self.assertIn('READER', cached_caps)
1258
1259
class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helpers; no server is involved."""

    def test_decode_header(self):
        cases = [
            ("" , ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in cases:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        common = ["Subject:", "From:", "Date:", "Message-ID:", "References:"]
        base = ["subject", "from", "date", "message-id", "references",
                ":bytes", ":lines"]
        cases = [
            # The minimal (default) response
            (common + [":bytes", ":lines"], base),
            # The minimal response using alternative names
            (common + ["Bytes:", "Lines:"], base),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"], base),
            # First example from RFC 3977
            (common + [":bytes", ":lines", "Xref:full", "Distribution:full"],
             base + ["xref", "distribution"]),
            # Second example from RFC 3977
            (common + ["Bytes:", "Lines:", "Xref:FULL", "Distribution:FULL"],
             base + ["xref", "distribution"]),
            # A classic response from INN
            (common + ["Bytes:", "Lines:", "Xref:full"], base + ["xref"]),
        ]
        for lines, expected in cases:
            self.assertEqual(nntplib._parse_overview_fmt(lines), expected)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # Article number, subject, from and date are the same in every example.
        prefix = (
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
        )

        def parse_single(line):
            # Exactly one overview entry must come back for one input line.
            (art_num, fields), = nntplib._parse_overview([line], fmt)
            return art_num, fields

        # First example from RFC 3977
        art_num, fields = parse_single(
            prefix +
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363')
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        art_num, fields = parse_single(
            prefix +
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t')
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        art_num, fields = parse_single(
            prefix +
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t')
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        expected = datetime.datetime(1999, 6, 23, 13, 56, 24)
        # Output of DATE command
        self.assertEqual(nntplib._parse_datetime("19990623135624", None),
                         expected)
        # Variations
        self.assertEqual(nntplib._parse_datetime("19990623", "135624"),
                         expected)
        self.assertEqual(nntplib._parse_datetime("990623", "135624"),
                         expected)
        self.assertEqual(nntplib._parse_datetime("090623", "135624"),
                         datetime.datetime(2009, 6, 23, 13, 56, 24))

    def test_unparse_datetime(self):
        # Test non-legacy mode
        def check(dt, date_str, time_str):
            # Non-legacy is both the default and the explicit False mode.
            expected = (date_str, time_str)
            self.assertEqual(nntplib._unparse_datetime(dt), expected)
            self.assertEqual(nntplib._unparse_datetime(dt, False), expected)

        # 1) with a datetime
        check(datetime.datetime(1999, 6, 23, 13, 56, 24), "19990623", "135624")
        check(datetime.datetime(2000, 6, 23, 13, 56, 24), "20000623", "135624")
        check(datetime.datetime(2010, 6, 5, 1, 2, 3), "20100605", "010203")
        # 2) with a date
        check(datetime.date(1999, 6, 23), "19990623", "000000")
        check(datetime.date(2000, 6, 23), "20000623", "000000")
        check(datetime.date(2010, 6, 5), "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        def check(dt, date_str, time_str):
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))

        # 1) with a datetime
        check(datetime.datetime(1999, 6, 23, 13, 56, 24), "990623", "135624")
        check(datetime.datetime(2000, 6, 23, 13, 56, 24), "000623", "135624")
        check(datetime.datetime(2010, 6, 5, 1, 2, 3), "100605", "010203")
        # 2) with a date
        check(datetime.date(1999, 6, 23), "990623", "000000")
        check(datetime.date(2000, 6, 23), "000623", "000000")
        check(datetime.date(2010, 6, 5), "100605", "000000")

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001420
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001421
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
                      'NNTPTemporaryError', 'NNTPPermanentError',
                      'NNTPProtocolError', 'NNTPDataError', 'decode_header']
        # NNTP_SSL is only expected when the ssl module could be imported.
        if ssl is not None:
            target_api.append('NNTP_SSL')
        # Order-insensitive like a set comparison, but also fails when a
        # name is listed more than once in __all__.
        self.assertCountEqual(nntplib.__all__, target_api)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001433
class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        """Check that a failing constructor raises and releases its resources.

        ``self.nntp_class`` is instantiated while ``nntplib.socket`` is
        patched with a fake module whose files are backed by
        *handler_class*.  The call must raise *expected_error_type* with
        *expected_error_msg* appearing literally in its text; afterwards the
        fake socket and every file made from it must have been closed.
        *login* and *password* are forwarded as ``user`` and ``password``.
        """
        # Local import: only needed to quote the expected message so that it
        # is matched as literal text rather than as a regular expression.
        import re

        socket_closed = False
        files = []

        class MockSocket:
            def close(self):
                nonlocal socket_closed
                socket_closed = True

            def makefile(self, mode):
                handler = handler_class()
                _, file = make_mock_file(handler)
                files.append(file)
                return file

        class mock_socket_module:
            # Looked up on the class itself (it stands in for a module), so
            # there is no instance argument.
            @staticmethod
            def create_connection(address, timeout):
                return MockSocket()

        with patch('nntplib.socket', mock_socket_module), \
             self.assertRaisesRegex(expected_error_type,
                                    re.escape(expected_error_msg)):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for f in files:
            self.assertTrue(f.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavilable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavilable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        login = 't@e.com'
        password = 'python'
        authinfo_response = '503 Mechanism not recognized'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            login, password)
1511
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Repeats the mock-socket tests through NNTP_SSL with encryption bypassed."""

    @staticmethod
    def nntp_class(*pos, **kw):
        """Build an NNTP_SSL client whose SSL context leaves sockets unwrapped.

        This is a factory rather than an NNTP_SSL subclass so that
        ``nntplib.NNTP_SSL`` is only looked up when a test actually runs.
        Naming it as a base class would evaluate it while this class body
        executes, which the skip decorator does not prevent.
        """
        class bypass_context:
            """Bypass encryption and actual SSL module"""
            # Used as a class, not an instance, so there is no self argument.
            def wrap_socket(sock, **args):
                return sock
        return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
1521
if __name__ == "__main__":
    # Executed as a script: let unittest discover and run this module's tests.
    unittest.main()