blob: 2ef6d02c2bffbe5b25e72851e3d8fb33c4e9acf8 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Martin Panter8f19e8e2016-01-19 01:10:58 +00008import os.path
Antoine Pitrou69ab9512010-09-29 15:03:40 +00009from test import support
Serhiy Storchaka43767632013-11-03 21:31:38 +020010from nntplib import NNTP, GroupInfo
Antoine Pitrou69ab9512010-09-29 15:03:40 +000011import nntplib
Serhiy Storchaka52027c32015-03-21 09:40:26 +020012from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020013try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000014 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020015except ImportError:
16 ssl = None
Martin Panter8f19e8e2016-01-19 01:10:58 +000017try:
18 import threading
19except ImportError:
20 threading = None
Antoine Pitrou69ab9512010-09-29 15:03:40 +000021
22TIMEOUT = 30
Martin Panter8f19e8e2016-01-19 01:10:58 +000023certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
Antoine Pitrou69ab9512010-09-29 15:03:40 +000024
25# TODO:
26# - test the `file` arg to more commands
27# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000028# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000029
30
31class NetworkedNNTPTestsMixin:
32
33 def test_welcome(self):
34 welcome = self.server.getwelcome()
35 self.assertEqual(str, type(welcome))
36
37 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000038 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000039 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000040 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000041 self.assertEqual(str, type(line))
42
43 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000044 resp, groups = self.server.list()
45 if len(groups) > 0:
46 self.assertEqual(GroupInfo, type(groups[0]))
47 self.assertEqual(str, type(groups[0].group))
48
49 def test_list_active(self):
50 resp, groups = self.server.list(self.GROUP_PAT)
51 if len(groups) > 0:
52 self.assertEqual(GroupInfo, type(groups[0]))
53 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000054
55 def test_unknown_command(self):
56 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
57 self.server._shortcmd("XYZZY")
58 resp = cm.exception.response
59 self.assertTrue(resp.startswith("500 "), resp)
60
61 def test_newgroups(self):
62 # gmane gets a constant influx of new groups. In order not to stress
63 # the server too much, we choose a recent date in the past.
64 dt = datetime.date.today() - datetime.timedelta(days=7)
65 resp, groups = self.server.newgroups(dt)
66 if len(groups) > 0:
67 self.assertIsInstance(groups[0], GroupInfo)
68 self.assertIsInstance(groups[0].group, str)
69
70 def test_description(self):
71 def _check_desc(desc):
72 # Sanity checks
73 self.assertIsInstance(desc, str)
74 self.assertNotIn(self.GROUP_NAME, desc)
75 desc = self.server.description(self.GROUP_NAME)
76 _check_desc(desc)
77 # Another sanity check
78 self.assertIn("Python", desc)
79 # With a pattern
80 desc = self.server.description(self.GROUP_PAT)
81 _check_desc(desc)
82 # Shouldn't exist
83 desc = self.server.description("zk.brrtt.baz")
84 self.assertEqual(desc, '')
85
86 def test_descriptions(self):
87 resp, descs = self.server.descriptions(self.GROUP_PAT)
88 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
89 self.assertTrue(
90 resp.startswith("215 ") or resp.startswith("282 "), resp)
91 self.assertIsInstance(descs, dict)
92 desc = descs[self.GROUP_NAME]
93 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
94
95 def test_group(self):
96 result = self.server.group(self.GROUP_NAME)
97 self.assertEqual(5, len(result))
98 resp, count, first, last, group = result
99 self.assertEqual(group, self.GROUP_NAME)
100 self.assertIsInstance(count, int)
101 self.assertIsInstance(first, int)
102 self.assertIsInstance(last, int)
103 self.assertLessEqual(first, last)
104 self.assertTrue(resp.startswith("211 "), resp)
105
106 def test_date(self):
107 resp, date = self.server.date()
108 self.assertIsInstance(date, datetime.datetime)
109 # Sanity check
110 self.assertGreaterEqual(date.year, 1995)
111 self.assertLessEqual(date.year, 2030)
112
113 def _check_art_dict(self, art_dict):
114 # Some sanity checks for a field dictionary returned by OVER / XOVER
115 self.assertIsInstance(art_dict, dict)
116 # NNTP has 7 mandatory fields
117 self.assertGreaterEqual(art_dict.keys(),
118 {"subject", "from", "date", "message-id",
119 "references", ":bytes", ":lines"}
120 )
121 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000122 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000123
124 def test_xover(self):
125 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000126 resp, lines = self.server.xover(last - 5, last)
127 if len(lines) == 0:
128 self.skipTest("no articles retrieved")
129 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000130 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000131 self.assertGreaterEqual(art_num, last - 5)
132 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000133 self._check_art_dict(art_dict)
134
135 def test_over(self):
136 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
137 start = last - 10
138 # The "start-" article range form
139 resp, lines = self.server.over((start, None))
140 art_num, art_dict = lines[0]
141 self._check_art_dict(art_dict)
142 # The "start-end" article range form
143 resp, lines = self.server.over((start, last))
144 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000145 # The 'last' article is not necessarily part of the output (cancelled?)
146 self.assertGreaterEqual(art_num, start)
147 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000148 self._check_art_dict(art_dict)
149 # XXX The "message_id" form is unsupported by gmane
150 # 503 Overview by message-ID unsupported
151
152 def test_xhdr(self):
153 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
154 resp, lines = self.server.xhdr('subject', last)
155 for line in lines:
156 self.assertEqual(str, type(line[1]))
157
158 def check_article_resp(self, resp, article, art_num=None):
159 self.assertIsInstance(article, nntplib.ArticleInfo)
160 if art_num is not None:
161 self.assertEqual(article.number, art_num)
162 for line in article.lines:
163 self.assertIsInstance(line, bytes)
164 # XXX this could exceptionally happen...
165 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
166
167 def test_article_head_body(self):
168 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000169 # Try to find an available article
170 for art_num in (last, first, last - 1):
171 try:
172 resp, head = self.server.head(art_num)
173 except nntplib.NNTPTemporaryError as e:
174 if not e.response.startswith("423 "):
175 raise
176 # "423 No such article" => choose another one
177 continue
178 break
179 else:
180 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000181 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000182 self.check_article_resp(resp, head, art_num)
183 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000184 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000185 self.check_article_resp(resp, body, art_num)
186 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000187 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000188 self.check_article_resp(resp, article, art_num)
Nick Coghlan14d99a12012-06-17 21:27:18 +1000189 # Tolerate running the tests from behind a NNTP virus checker
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200190 blacklist = lambda line: line.startswith(b'X-Antivirus')
191 filtered_head_lines = [line for line in head.lines
192 if not blacklist(line)]
Nick Coghlan14d99a12012-06-17 21:27:18 +1000193 filtered_lines = [line for line in article.lines
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200194 if not blacklist(line)]
195 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000196
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000197 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000198 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000199 # couple of well-known capabilities. Just sanity check that we
200 # got them.
201 def _check_caps(caps):
202 caps_list = caps['LIST']
203 self.assertIsInstance(caps_list, (list, tuple))
204 self.assertIn('OVERVIEW.FMT', caps_list)
205 self.assertGreaterEqual(self.server.nntp_version, 2)
206 _check_caps(self.server.getcapabilities())
207 # This re-emits the command
208 resp, caps = self.server.capabilities()
209 _check_caps(caps)
210
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000211 def test_zlogin(self):
212 # This test must be the penultimate because further commands will be
213 # refused.
214 baduser = "notarealuser"
215 badpw = "notarealpassword"
216 # Check that bogus credentials cause failure
217 self.assertRaises(nntplib.NNTPError, self.server.login,
218 user=baduser, password=badpw, usenetrc=False)
219 # FIXME: We should check that correct credentials succeed, but that
220 # would require valid details for some server somewhere to be in the
221 # test suite, I think. Gmane is anonymous, at least as used for the
222 # other tests.
223
224 def test_zzquit(self):
225 # This test must be called last, hence the name
226 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000227 try:
228 self.server.quit()
229 finally:
230 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000231
Antoine Pitroude609182010-11-18 17:29:23 +0000232 @classmethod
233 def wrap_methods(cls):
234 # Wrap all methods in a transient_internet() exception catcher
235 # XXX put a generic version in test.support?
236 def wrap_meth(meth):
237 @functools.wraps(meth)
238 def wrapped(self):
239 with support.transient_internet(self.NNTP_HOST):
240 meth(self)
241 return wrapped
242 for name in dir(cls):
243 if not name.startswith('test_'):
244 continue
245 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200246 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000247 continue
248 # Need to use a closure so that meth remains bound to its current
249 # value
250 setattr(cls, name, wrap_meth(meth))
251
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000252 def test_with_statement(self):
253 def is_connected():
254 if not hasattr(server, 'file'):
255 return False
256 try:
257 server.help()
Andrew Svetlov0832af62012-12-18 23:10:48 +0200258 except (OSError, EOFError):
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000259 return False
260 return True
261
262 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
263 self.assertTrue(is_connected())
264 self.assertTrue(server.help())
265 self.assertFalse(is_connected())
266
267 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
268 server.quit()
269 self.assertFalse(is_connected())
270
271
Antoine Pitroude609182010-11-18 17:29:23 +0000272NetworkedNNTPTestsMixin.wrap_methods()
273
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000274
275class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
276 # This server supports STARTTLS (gmane doesn't)
277 NNTP_HOST = 'news.trigofacile.com'
278 GROUP_NAME = 'fr.comp.lang.python'
279 GROUP_PAT = 'fr.comp.lang.*'
280
Antoine Pitroude609182010-11-18 17:29:23 +0000281 NNTP_CLASS = NNTP
282
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000283 @classmethod
284 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000285 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000286 with support.transient_internet(cls.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000287 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000288
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000289 @classmethod
290 def tearDownClass(cls):
291 if cls.server is not None:
292 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000293
Serhiy Storchaka43767632013-11-03 21:31:38 +0200294@unittest.skipUnless(ssl, 'requires SSL support')
295class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000296
Serhiy Storchaka43767632013-11-03 21:31:38 +0200297 # Technical limits for this public NNTP server (see http://www.aioe.org):
298 # "Only two concurrent connections per IP address are allowed and
299 # 400 connections per day are accepted from each IP address."
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000300
Serhiy Storchaka43767632013-11-03 21:31:38 +0200301 NNTP_HOST = 'nntp.aioe.org'
302 GROUP_NAME = 'comp.lang.python'
303 GROUP_PAT = 'comp.lang.*'
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000304
Serhiy Storchaka43767632013-11-03 21:31:38 +0200305 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000306
Serhiy Storchaka43767632013-11-03 21:31:38 +0200307 # Disabled as it produces too much data
308 test_list = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000309
Serhiy Storchaka43767632013-11-03 21:31:38 +0200310 # Disabled as the connection will already be encrypted.
311 test_starttls = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000312
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000313
314#
315# Non-networked tests using a local server (or something mocking it).
316#
317
318class _NNTPServerIO(io.RawIOBase):
319 """A raw IO object allowing NNTP commands to be received and processed
320 by a handler. The handler can push responses which can then be read
321 from the IO object."""
322
323 def __init__(self, handler):
324 io.RawIOBase.__init__(self)
325 # The channel from the client
326 self.c2s = io.BytesIO()
327 # The channel to the client
328 self.s2c = io.BytesIO()
329 self.handler = handler
330 self.handler.start(self.c2s.readline, self.push_data)
331
332 def readable(self):
333 return True
334
335 def writable(self):
336 return True
337
338 def push_data(self, data):
339 """Push (buffer) some data to send to the client."""
340 pos = self.s2c.tell()
341 self.s2c.seek(0, 2)
342 self.s2c.write(data)
343 self.s2c.seek(pos)
344
345 def write(self, b):
346 """The client sends us some data"""
347 pos = self.c2s.tell()
348 self.c2s.write(b)
349 self.c2s.seek(pos)
350 self.handler.process_pending()
351 return len(b)
352
353 def readinto(self, buf):
354 """The client wants to read a response"""
355 self.handler.process_pending()
356 b = self.s2c.read(len(buf))
357 n = len(b)
358 buf[:n] = b
359 return n
360
361
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200362def make_mock_file(handler):
363 sio = _NNTPServerIO(handler)
364 # Using BufferedRWPair instead of BufferedRandom ensures the file
365 # isn't seekable.
366 file = io.BufferedRWPair(sio, sio)
367 return (sio, file)
368
369
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000370class MockedNNTPTestsMixin:
371 # Override in derived classes
372 handler_class = None
373
374 def setUp(self):
375 super().setUp()
376 self.make_server()
377
378 def tearDown(self):
379 super().tearDown()
380 del self.server
381
382 def make_server(self, *args, **kwargs):
383 self.handler = self.handler_class()
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200384 self.sio, file = make_mock_file(self.handler)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000385 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000386 return self.server
387
388
Antoine Pitrou71135622012-02-14 23:29:34 +0100389class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
390 def setUp(self):
391 super().setUp()
392 self.make_server(readermode=True)
393
394
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000395class NNTPv1Handler:
396 """A handler for RFC 977"""
397
398 welcome = "200 NNTP mock server"
399
400 def start(self, readline, push_data):
401 self.in_body = False
402 self.allow_posting = True
403 self._readline = readline
404 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100405 self._logged_in = False
406 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000407 # Our welcome
408 self.handle_welcome()
409
410 def _decode(self, data):
411 return str(data, "utf-8", "surrogateescape")
412
413 def process_pending(self):
414 if self.in_body:
415 while True:
416 line = self._readline()
417 if not line:
418 return
419 self.body.append(line)
420 if line == b".\r\n":
421 break
422 try:
423 meth, tokens = self.body_callback
424 meth(*tokens, body=self.body)
425 finally:
426 self.body_callback = None
427 self.body = None
428 self.in_body = False
429 while True:
430 line = self._decode(self._readline())
431 if not line:
432 return
433 if not line.endswith("\r\n"):
434 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
435 line = line[:-2]
436 cmd, *tokens = line.split()
437 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
438 meth = getattr(self, "handle_" + cmd.upper(), None)
439 if meth is None:
440 self.handle_unknown()
441 else:
442 try:
443 meth(*tokens)
444 except Exception as e:
445 raise ValueError("command failed: {!r}".format(line)) from e
446 else:
447 if self.in_body:
448 self.body_callback = meth, tokens
449 self.body = []
450
451 def expect_body(self):
452 """Flag that the client is expected to post a request body"""
453 self.in_body = True
454
455 def push_data(self, data):
456 """Push some binary data"""
457 self._push_data(data)
458
459 def push_lit(self, lit):
460 """Push a string literal"""
461 lit = textwrap.dedent(lit)
462 lit = "\r\n".join(lit.splitlines()) + "\r\n"
463 lit = lit.encode('utf-8')
464 self.push_data(lit)
465
466 def handle_unknown(self):
467 self.push_lit("500 What?")
468
469 def handle_welcome(self):
470 self.push_lit(self.welcome)
471
472 def handle_QUIT(self):
473 self.push_lit("205 Bye!")
474
475 def handle_DATE(self):
476 self.push_lit("111 20100914001155")
477
478 def handle_GROUP(self, group):
479 if group == "fr.comp.lang.python":
480 self.push_lit("211 486 761 1265 fr.comp.lang.python")
481 else:
482 self.push_lit("411 No such group {}".format(group))
483
484 def handle_HELP(self):
485 self.push_lit("""\
486 100 Legal commands
487 authinfo user Name|pass Password|generic <prog> <args>
488 date
489 help
490 Report problems to <root@example.org>
491 .""")
492
493 def handle_STAT(self, message_spec=None):
494 if message_spec is None:
495 self.push_lit("412 No newsgroup selected")
496 elif message_spec == "3000234":
497 self.push_lit("223 3000234 <45223423@example.com>")
498 elif message_spec == "<45223423@example.com>":
499 self.push_lit("223 0 <45223423@example.com>")
500 else:
501 self.push_lit("430 No Such Article Found")
502
503 def handle_NEXT(self):
504 self.push_lit("223 3000237 <668929@example.org> retrieved")
505
506 def handle_LAST(self):
507 self.push_lit("223 3000234 <45223423@example.com> retrieved")
508
509 def handle_LIST(self, action=None, param=None):
510 if action is None:
511 self.push_lit("""\
512 215 Newsgroups in form "group high low flags".
513 comp.lang.python 0000052340 0000002828 y
514 comp.lang.python.announce 0000001153 0000000993 m
515 free.it.comp.lang.python 0000000002 0000000002 y
516 fr.comp.lang.python 0000001254 0000000760 y
517 free.it.comp.lang.python.learner 0000000000 0000000001 y
518 tw.bbs.comp.lang.python 0000000304 0000000304 y
519 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000520 elif action == "ACTIVE":
521 if param == "*distutils*":
522 self.push_lit("""\
523 215 Newsgroups in form "group high low flags"
524 gmane.comp.python.distutils.devel 0000014104 0000000001 m
525 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
526 .""")
527 else:
528 self.push_lit("""\
529 215 Newsgroups in form "group high low flags"
530 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000531 elif action == "OVERVIEW.FMT":
532 self.push_lit("""\
533 215 Order of fields in overview database.
534 Subject:
535 From:
536 Date:
537 Message-ID:
538 References:
539 Bytes:
540 Lines:
541 Xref:full
542 .""")
543 elif action == "NEWSGROUPS":
544 assert param is not None
545 if param == "comp.lang.python":
546 self.push_lit("""\
547 215 Descriptions in form "group description".
548 comp.lang.python\tThe Python computer language.
549 .""")
550 elif param == "comp.lang.python*":
551 self.push_lit("""\
552 215 Descriptions in form "group description".
553 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
554 comp.lang.python\tThe Python computer language.
555 .""")
556 else:
557 self.push_lit("""\
558 215 Descriptions in form "group description".
559 .""")
560 else:
561 self.push_lit('501 Unknown LIST keyword')
562
563 def handle_NEWNEWS(self, group, date_str, time_str):
564 # We hard code different return messages depending on passed
565 # argument and date syntax.
566 if (group == "comp.lang.python" and date_str == "20100913"
567 and time_str == "082004"):
568 # Date was passed in RFC 3977 format (NNTP "v2")
569 self.push_lit("""\
570 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
571 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
572 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
573 .""")
574 elif (group == "comp.lang.python" and date_str == "100913"
575 and time_str == "082004"):
576 # Date was passed in RFC 977 format (NNTP "v1")
577 self.push_lit("""\
578 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
579 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
580 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
581 .""")
Georg Brandl28e78412013-10-27 07:29:47 +0100582 elif (group == 'comp.lang.python' and
583 date_str in ('20100101', '100101') and
584 time_str == '090000'):
585 self.push_lit('too long line' * 3000 +
586 '\n.')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000587 else:
588 self.push_lit("""\
589 230 An empty list of newsarticles follows
590 .""")
591 # (Note for experiments: many servers disable NEWNEWS.
592 # As of this writing, sicinfo3.epfl.ch doesn't.)
593
594 def handle_XOVER(self, message_spec):
595 if message_spec == "57-59":
596 self.push_lit(
597 "224 Overview information for 57-58 follows\n"
598 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
599 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
600 "\tSat, 19 Jun 2010 18:04:08 -0400"
601 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
602 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
603 "\tXref: news.gmane.org gmane.comp.python.authors:57"
604 "\n"
605 "58\tLooking for a few good bloggers"
606 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
607 "\tThu, 22 Jul 2010 09:14:14 -0400"
608 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
609 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000610 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000611 "\n"
Martin Panter6245cb32016-04-15 02:14:19 +0000612 # A UTF-8 overview line from fr.comp.lang.python
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000613 "59\tRe: Message d'erreur incompréhensible (par moi)"
614 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
615 "\tWed, 15 Sep 2010 18:09:15 +0200"
616 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
617 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
618 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
619 "\n"
620 ".\n")
621 else:
622 self.push_lit("""\
623 224 No articles
624 .""")
625
626 def handle_POST(self, *, body=None):
627 if body is None:
628 if self.allow_posting:
629 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
630 self.expect_body()
631 else:
632 self.push_lit("440 Posting not permitted")
633 else:
634 assert self.allow_posting
635 self.push_lit("240 Article received OK")
636 self.posted_body = body
637
638 def handle_IHAVE(self, message_id, *, body=None):
639 if body is None:
640 if (self.allow_posting and
641 message_id == "<i.am.an.article.you.will.want@example.com>"):
642 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
643 self.expect_body()
644 else:
645 self.push_lit("435 Article not wanted")
646 else:
647 assert self.allow_posting
648 self.push_lit("235 Article transferred OK")
649 self.posted_body = body
650
651 sample_head = """\
652 From: "Demo User" <nobody@example.net>
653 Subject: I am just a test article
654 Content-Type: text/plain; charset=UTF-8; format=flowed
655 Message-ID: <i.am.an.article.you.will.want@example.com>"""
656
657 sample_body = """\
658 This is just a test article.
659 ..Here is a dot-starting line.
660
661 -- Signed by Andr\xe9."""
662
663 sample_article = sample_head + "\n\n" + sample_body
664
665 def handle_ARTICLE(self, message_spec=None):
666 if message_spec is None:
667 self.push_lit("220 3000237 <45223423@example.com>")
668 elif message_spec == "<45223423@example.com>":
669 self.push_lit("220 0 <45223423@example.com>")
670 elif message_spec == "3000234":
671 self.push_lit("220 3000234 <45223423@example.com>")
672 else:
673 self.push_lit("430 No Such Article Found")
674 return
675 self.push_lit(self.sample_article)
676 self.push_lit(".")
677
678 def handle_HEAD(self, message_spec=None):
679 if message_spec is None:
680 self.push_lit("221 3000237 <45223423@example.com>")
681 elif message_spec == "<45223423@example.com>":
682 self.push_lit("221 0 <45223423@example.com>")
683 elif message_spec == "3000234":
684 self.push_lit("221 3000234 <45223423@example.com>")
685 else:
686 self.push_lit("430 No Such Article Found")
687 return
688 self.push_lit(self.sample_head)
689 self.push_lit(".")
690
691 def handle_BODY(self, message_spec=None):
692 if message_spec is None:
693 self.push_lit("222 3000237 <45223423@example.com>")
694 elif message_spec == "<45223423@example.com>":
695 self.push_lit("222 0 <45223423@example.com>")
696 elif message_spec == "3000234":
697 self.push_lit("222 3000234 <45223423@example.com>")
698 else:
699 self.push_lit("430 No Such Article Found")
700 return
701 self.push_lit(self.sample_body)
702 self.push_lit(".")
703
Antoine Pitrou54411c12012-02-12 19:14:17 +0100704 def handle_AUTHINFO(self, cred_type, data):
705 if self._logged_in:
706 self.push_lit('502 Already Logged In')
707 elif cred_type == 'user':
708 if self._user_sent:
709 self.push_lit('482 User Credential Already Sent')
710 else:
711 self.push_lit('381 Password Required')
712 self._user_sent = True
713 elif cred_type == 'pass':
714 self.push_lit('281 Login Successful')
715 self._logged_in = True
716 else:
717 raise Exception('Unknown cred type {}'.format(cred_type))
718
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000719
720class NNTPv2Handler(NNTPv1Handler):
721 """A handler for RFC 3977 (NNTP "v2")"""
722
723 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100724 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000725 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000726 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100727 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000728 HDR
729 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
730 OVER
731 POST
732 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100733 ."""
734
735 if not self._logged_in:
736 self.push_lit(fmt.format('\n AUTHINFO USER'))
737 else:
738 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000739
Antoine Pitrou71135622012-02-14 23:29:34 +0100740 def handle_MODE(self, _):
741 raise Exception('MODE READER sent despite READER has been advertised')
742
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000743 def handle_OVER(self, message_spec=None):
744 return self.handle_XOVER(message_spec)
745
746
Antoine Pitrou54411c12012-02-12 19:14:17 +0100747class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
748 """A handler that allows CAPABILITIES only after login"""
749
750 def handle_CAPABILITIES(self):
751 if not self._logged_in:
752 self.push_lit('480 You must log in.')
753 else:
754 super().handle_CAPABILITIES()
755
756
Antoine Pitrou71135622012-02-14 23:29:34 +0100757class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
758 """A server that starts in transit mode"""
759
760 def __init__(self):
761 self._switched = False
762
763 def handle_CAPABILITIES(self):
764 fmt = """\
765 101 Capability list:
766 VERSION 2 3
767 IMPLEMENTATION INN 2.5.1
768 HDR
769 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
770 OVER
771 POST
772 {}READER
773 ."""
774 if self._switched:
775 self.push_lit(fmt.format(''))
776 else:
777 self.push_lit(fmt.format('MODE-'))
778
779 def handle_MODE(self, what):
780 assert not self._switched and what == 'reader'
781 self._switched = True
782 self.push_lit('200 Posting allowed')
783
784
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000785class NNTPv1v2TestsMixin:
786
787 def setUp(self):
788 super().setUp()
789
790 def test_welcome(self):
791 self.assertEqual(self.server.welcome, self.handler.welcome)
792
Antoine Pitrou54411c12012-02-12 19:14:17 +0100793 def test_authinfo(self):
794 if self.nntp_version == 2:
795 self.assertIn('AUTHINFO', self.server._caps)
796 self.server.login('testuser', 'testpw')
797 # if AUTHINFO is gone from _caps we also know that getcapabilities()
798 # has been called after login as it should
799 self.assertNotIn('AUTHINFO', self.server._caps)
800
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000801 def test_date(self):
802 resp, date = self.server.date()
803 self.assertEqual(resp, "111 20100914001155")
804 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
805
806 def test_quit(self):
807 self.assertFalse(self.sio.closed)
808 resp = self.server.quit()
809 self.assertEqual(resp, "205 Bye!")
810 self.assertTrue(self.sio.closed)
811
812 def test_help(self):
813 resp, help = self.server.help()
814 self.assertEqual(resp, "100 Legal commands")
815 self.assertEqual(help, [
816 ' authinfo user Name|pass Password|generic <prog> <args>',
817 ' date',
818 ' help',
819 'Report problems to <root@example.org>',
820 ])
821
822 def test_list(self):
823 resp, groups = self.server.list()
824 self.assertEqual(len(groups), 6)
825 g = groups[1]
826 self.assertEqual(g,
827 GroupInfo("comp.lang.python.announce", "0000001153",
828 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000829 resp, groups = self.server.list("*distutils*")
830 self.assertEqual(len(groups), 2)
831 g = groups[0]
832 self.assertEqual(g,
833 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
834 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000835
836 def test_stat(self):
837 resp, art_num, message_id = self.server.stat(3000234)
838 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
839 self.assertEqual(art_num, 3000234)
840 self.assertEqual(message_id, "<45223423@example.com>")
841 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
842 self.assertEqual(resp, "223 0 <45223423@example.com>")
843 self.assertEqual(art_num, 0)
844 self.assertEqual(message_id, "<45223423@example.com>")
845 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
846 self.server.stat("<non.existent.id>")
847 self.assertEqual(cm.exception.response, "430 No Such Article Found")
848 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
849 self.server.stat()
850 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
851
852 def test_next(self):
853 resp, art_num, message_id = self.server.next()
854 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
855 self.assertEqual(art_num, 3000237)
856 self.assertEqual(message_id, "<668929@example.org>")
857
858 def test_last(self):
859 resp, art_num, message_id = self.server.last()
860 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
861 self.assertEqual(art_num, 3000234)
862 self.assertEqual(message_id, "<45223423@example.com>")
863
864 def test_description(self):
865 desc = self.server.description("comp.lang.python")
866 self.assertEqual(desc, "The Python computer language.")
867 desc = self.server.description("comp.lang.pythonx")
868 self.assertEqual(desc, "")
869
870 def test_descriptions(self):
871 resp, groups = self.server.descriptions("comp.lang.python")
872 self.assertEqual(resp, '215 Descriptions in form "group description".')
873 self.assertEqual(groups, {
874 "comp.lang.python": "The Python computer language.",
875 })
876 resp, groups = self.server.descriptions("comp.lang.python*")
877 self.assertEqual(groups, {
878 "comp.lang.python": "The Python computer language.",
879 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
880 })
881 resp, groups = self.server.descriptions("comp.lang.pythonx")
882 self.assertEqual(groups, {})
883
884 def test_group(self):
885 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
886 self.assertTrue(resp.startswith("211 "), resp)
887 self.assertEqual(first, 761)
888 self.assertEqual(last, 1265)
889 self.assertEqual(count, 486)
890 self.assertEqual(group, "fr.comp.lang.python")
891 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
892 self.server.group("comp.lang.python.devel")
893 exc = cm.exception
894 self.assertTrue(exc.response.startswith("411 No such group"),
895 exc.response)
896
897 def test_newnews(self):
898 # NEWNEWS comp.lang.python [20]100913 082004
899 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
900 resp, ids = self.server.newnews("comp.lang.python", dt)
901 expected = (
902 "230 list of newsarticles (NNTP v{0}) "
903 "created after Mon Sep 13 08:20:04 2010 follows"
904 ).format(self.nntp_version)
905 self.assertEqual(resp, expected)
906 self.assertEqual(ids, [
907 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
908 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
909 ])
910 # NEWNEWS fr.comp.lang.python [20]100913 082004
911 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
912 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
913 self.assertEqual(resp, "230 An empty list of newsarticles follows")
914 self.assertEqual(ids, [])
915
916 def _check_article_body(self, lines):
917 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000918 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000919 self.assertEqual(lines[-2], b"")
920 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
921 self.assertEqual(lines[-4], b"This is just a test article.")
922
923 def _check_article_head(self, lines):
924 self.assertEqual(len(lines), 4)
925 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
926 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
927
928 def _check_article_data(self, lines):
929 self.assertEqual(len(lines), 9)
930 self._check_article_head(lines[:4])
931 self._check_article_body(lines[-4:])
932 self.assertEqual(lines[4], b"")
933
934 def test_article(self):
935 # ARTICLE
936 resp, info = self.server.article()
937 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
938 art_num, message_id, lines = info
939 self.assertEqual(art_num, 3000237)
940 self.assertEqual(message_id, "<45223423@example.com>")
941 self._check_article_data(lines)
942 # ARTICLE num
943 resp, info = self.server.article(3000234)
944 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
945 art_num, message_id, lines = info
946 self.assertEqual(art_num, 3000234)
947 self.assertEqual(message_id, "<45223423@example.com>")
948 self._check_article_data(lines)
949 # ARTICLE id
950 resp, info = self.server.article("<45223423@example.com>")
951 self.assertEqual(resp, "220 0 <45223423@example.com>")
952 art_num, message_id, lines = info
953 self.assertEqual(art_num, 0)
954 self.assertEqual(message_id, "<45223423@example.com>")
955 self._check_article_data(lines)
956 # Non-existent id
957 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
958 self.server.article("<non-existent@example.com>")
959 self.assertEqual(cm.exception.response, "430 No Such Article Found")
960
961 def test_article_file(self):
962 # With a "file" argument
963 f = io.BytesIO()
964 resp, info = self.server.article(file=f)
965 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
966 art_num, message_id, lines = info
967 self.assertEqual(art_num, 3000237)
968 self.assertEqual(message_id, "<45223423@example.com>")
969 self.assertEqual(lines, [])
970 data = f.getvalue()
971 self.assertTrue(data.startswith(
972 b'From: "Demo User" <nobody@example.net>\r\n'
973 b'Subject: I am just a test article\r\n'
974 ), ascii(data))
975 self.assertTrue(data.endswith(
976 b'This is just a test article.\r\n'
977 b'.Here is a dot-starting line.\r\n'
978 b'\r\n'
979 b'-- Signed by Andr\xc3\xa9.\r\n'
980 ), ascii(data))
981
982 def test_head(self):
983 # HEAD
984 resp, info = self.server.head()
985 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
986 art_num, message_id, lines = info
987 self.assertEqual(art_num, 3000237)
988 self.assertEqual(message_id, "<45223423@example.com>")
989 self._check_article_head(lines)
990 # HEAD num
991 resp, info = self.server.head(3000234)
992 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
993 art_num, message_id, lines = info
994 self.assertEqual(art_num, 3000234)
995 self.assertEqual(message_id, "<45223423@example.com>")
996 self._check_article_head(lines)
997 # HEAD id
998 resp, info = self.server.head("<45223423@example.com>")
999 self.assertEqual(resp, "221 0 <45223423@example.com>")
1000 art_num, message_id, lines = info
1001 self.assertEqual(art_num, 0)
1002 self.assertEqual(message_id, "<45223423@example.com>")
1003 self._check_article_head(lines)
1004 # Non-existent id
1005 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1006 self.server.head("<non-existent@example.com>")
1007 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1008
Antoine Pitrou2640b522012-02-15 18:53:18 +01001009 def test_head_file(self):
1010 f = io.BytesIO()
1011 resp, info = self.server.head(file=f)
1012 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1013 art_num, message_id, lines = info
1014 self.assertEqual(art_num, 3000237)
1015 self.assertEqual(message_id, "<45223423@example.com>")
1016 self.assertEqual(lines, [])
1017 data = f.getvalue()
1018 self.assertTrue(data.startswith(
1019 b'From: "Demo User" <nobody@example.net>\r\n'
1020 b'Subject: I am just a test article\r\n'
1021 ), ascii(data))
1022 self.assertFalse(data.endswith(
1023 b'This is just a test article.\r\n'
1024 b'.Here is a dot-starting line.\r\n'
1025 b'\r\n'
1026 b'-- Signed by Andr\xc3\xa9.\r\n'
1027 ), ascii(data))
1028
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001029 def test_body(self):
1030 # BODY
1031 resp, info = self.server.body()
1032 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1033 art_num, message_id, lines = info
1034 self.assertEqual(art_num, 3000237)
1035 self.assertEqual(message_id, "<45223423@example.com>")
1036 self._check_article_body(lines)
1037 # BODY num
1038 resp, info = self.server.body(3000234)
1039 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1040 art_num, message_id, lines = info
1041 self.assertEqual(art_num, 3000234)
1042 self.assertEqual(message_id, "<45223423@example.com>")
1043 self._check_article_body(lines)
1044 # BODY id
1045 resp, info = self.server.body("<45223423@example.com>")
1046 self.assertEqual(resp, "222 0 <45223423@example.com>")
1047 art_num, message_id, lines = info
1048 self.assertEqual(art_num, 0)
1049 self.assertEqual(message_id, "<45223423@example.com>")
1050 self._check_article_body(lines)
1051 # Non-existent id
1052 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1053 self.server.body("<non-existent@example.com>")
1054 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1055
Antoine Pitrou2640b522012-02-15 18:53:18 +01001056 def test_body_file(self):
1057 f = io.BytesIO()
1058 resp, info = self.server.body(file=f)
1059 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1060 art_num, message_id, lines = info
1061 self.assertEqual(art_num, 3000237)
1062 self.assertEqual(message_id, "<45223423@example.com>")
1063 self.assertEqual(lines, [])
1064 data = f.getvalue()
1065 self.assertFalse(data.startswith(
1066 b'From: "Demo User" <nobody@example.net>\r\n'
1067 b'Subject: I am just a test article\r\n'
1068 ), ascii(data))
1069 self.assertTrue(data.endswith(
1070 b'This is just a test article.\r\n'
1071 b'.Here is a dot-starting line.\r\n'
1072 b'\r\n'
1073 b'-- Signed by Andr\xc3\xa9.\r\n'
1074 ), ascii(data))
1075
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001076 def check_over_xover_resp(self, resp, overviews):
1077 self.assertTrue(resp.startswith("224 "), resp)
1078 self.assertEqual(len(overviews), 3)
1079 art_num, over = overviews[0]
1080 self.assertEqual(art_num, 57)
1081 self.assertEqual(over, {
1082 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1083 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1084 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1085 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1086 "references": "<hvalf7$ort$1@dough.gmane.org>",
1087 ":bytes": "7103",
1088 ":lines": "16",
1089 "xref": "news.gmane.org gmane.comp.python.authors:57"
1090 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001091 art_num, over = overviews[1]
1092 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001093 art_num, over = overviews[2]
1094 self.assertEqual(over["subject"],
1095 "Re: Message d'erreur incompréhensible (par moi)")
1096
1097 def test_xover(self):
1098 resp, overviews = self.server.xover(57, 59)
1099 self.check_over_xover_resp(resp, overviews)
1100
1101 def test_over(self):
1102 # In NNTP "v1", this will fallback on XOVER
1103 resp, overviews = self.server.over((57, 59))
1104 self.check_over_xover_resp(resp, overviews)
1105
1106 sample_post = (
1107 b'From: "Demo User" <nobody@example.net>\r\n'
1108 b'Subject: I am just a test article\r\n'
1109 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1110 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1111 b'\r\n'
1112 b'This is just a test article.\r\n'
1113 b'.Here is a dot-starting line.\r\n'
1114 b'\r\n'
1115 b'-- Signed by Andr\xc3\xa9.\r\n'
1116 )
1117
1118 def _check_posted_body(self):
1119 # Check the raw body as received by the server
1120 lines = self.handler.posted_body
1121 # One additional line for the "." terminator
1122 self.assertEqual(len(lines), 10)
1123 self.assertEqual(lines[-1], b'.\r\n')
1124 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1125 self.assertEqual(lines[-3], b'\r\n')
1126 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1127 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1128
1129 def _check_post_ihave_sub(self, func, *args, file_factory):
1130 # First the prepared post with CRLF endings
1131 post = self.sample_post
1132 func_args = args + (file_factory(post),)
1133 self.handler.posted_body = None
1134 resp = func(*func_args)
1135 self._check_posted_body()
1136 # Then the same post with "normal" line endings - they should be
1137 # converted by NNTP.post and NNTP.ihave.
1138 post = self.sample_post.replace(b"\r\n", b"\n")
1139 func_args = args + (file_factory(post),)
1140 self.handler.posted_body = None
1141 resp = func(*func_args)
1142 self._check_posted_body()
1143 return resp
1144
1145 def check_post_ihave(self, func, success_resp, *args):
1146 # With a bytes object
1147 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1148 self.assertEqual(resp, success_resp)
1149 # With a bytearray object
1150 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1151 self.assertEqual(resp, success_resp)
1152 # With a file object
1153 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1154 self.assertEqual(resp, success_resp)
1155 # With an iterable of terminated lines
1156 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001157 return iter(b.splitlines(keepends=True))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001158 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1159 self.assertEqual(resp, success_resp)
1160 # With an iterable of non-terminated lines
1161 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001162 return iter(b.splitlines(keepends=False))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001163 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1164 self.assertEqual(resp, success_resp)
1165
1166 def test_post(self):
1167 self.check_post_ihave(self.server.post, "240 Article received OK")
1168 self.handler.allow_posting = False
1169 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1170 self.server.post(self.sample_post)
1171 self.assertEqual(cm.exception.response,
1172 "440 Posting not permitted")
1173
1174 def test_ihave(self):
1175 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1176 "<i.am.an.article.you.will.want@example.com>")
1177 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1178 self.server.ihave("<another.message.id>", self.sample_post)
1179 self.assertEqual(cm.exception.response,
1180 "435 Article not wanted")
1181
Georg Brandl28e78412013-10-27 07:29:47 +01001182 def test_too_long_lines(self):
1183 dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1184 self.assertRaises(nntplib.NNTPDataError,
1185 self.server.newnews, "comp.lang.python", dt)
1186
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001187
1188class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1189 """Tests an NNTP v1 server (no capabilities)."""
1190
1191 nntp_version = 1
1192 handler_class = NNTPv1Handler
1193
1194 def test_caps(self):
1195 caps = self.server.getcapabilities()
1196 self.assertEqual(caps, {})
1197 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001198 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001199
1200
1201class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1202 """Tests an NNTP v2 server (with capabilities)."""
1203
1204 nntp_version = 2
1205 handler_class = NNTPv2Handler
1206
1207 def test_caps(self):
1208 caps = self.server.getcapabilities()
1209 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001210 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001211 'IMPLEMENTATION': ['INN', '2.5.1'],
1212 'AUTHINFO': ['USER'],
1213 'HDR': [],
1214 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1215 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1216 'OVER': [],
1217 'POST': [],
1218 'READER': [],
1219 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001220 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001221 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001222
1223
Antoine Pitrou54411c12012-02-12 19:14:17 +01001224class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1225 """Tests a probably NNTP v2 server with capabilities only after login."""
1226
1227 nntp_version = 2
1228 handler_class = CapsAfterLoginNNTPv2Handler
1229
1230 def test_caps_only_after_login(self):
1231 self.assertEqual(self.server._caps, {})
1232 self.server.login('testuser', 'testpw')
1233 self.assertIn('VERSION', self.server._caps)
1234
1235
Antoine Pitrou71135622012-02-14 23:29:34 +01001236class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1237 unittest.TestCase):
1238 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1239 that isn't in READER mode by default."""
1240
1241 nntp_version = 2
1242 handler_class = ModeSwitchingNNTPv2Handler
1243
1244 def test_we_are_in_reader_mode_after_connect(self):
1245 self.assertIn('READER', self.server._caps)
1246
1247
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001248class MiscTests(unittest.TestCase):
1249
1250 def test_decode_header(self):
1251 def gives(a, b):
1252 self.assertEqual(nntplib.decode_header(a), b)
1253 gives("" , "")
1254 gives("a plain header", "a plain header")
1255 gives(" with extra spaces ", " with extra spaces ")
1256 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "DĂ©buter en Python")
1257 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1258 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1259 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1260 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1261 "Re: problème de matrice")
1262 # A natively utf-8 header (found in the real world!)
1263 gives("Re: Message d'erreur incompréhensible (par moi)",
1264 "Re: Message d'erreur incompréhensible (par moi)")
1265
1266 def test_parse_overview_fmt(self):
1267 # The minimal (default) response
1268 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1269 "References:", ":bytes", ":lines"]
1270 self.assertEqual(nntplib._parse_overview_fmt(lines),
1271 ["subject", "from", "date", "message-id", "references",
1272 ":bytes", ":lines"])
1273 # The minimal response using alternative names
1274 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1275 "References:", "Bytes:", "Lines:"]
1276 self.assertEqual(nntplib._parse_overview_fmt(lines),
1277 ["subject", "from", "date", "message-id", "references",
1278 ":bytes", ":lines"])
1279 # Variations in casing
1280 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1281 "References:", "BYTES:", "Lines:"]
1282 self.assertEqual(nntplib._parse_overview_fmt(lines),
1283 ["subject", "from", "date", "message-id", "references",
1284 ":bytes", ":lines"])
1285 # First example from RFC 3977
1286 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1287 "References:", ":bytes", ":lines", "Xref:full",
1288 "Distribution:full"]
1289 self.assertEqual(nntplib._parse_overview_fmt(lines),
1290 ["subject", "from", "date", "message-id", "references",
1291 ":bytes", ":lines", "xref", "distribution"])
1292 # Second example from RFC 3977
1293 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1294 "References:", "Bytes:", "Lines:", "Xref:FULL",
1295 "Distribution:FULL"]
1296 self.assertEqual(nntplib._parse_overview_fmt(lines),
1297 ["subject", "from", "date", "message-id", "references",
1298 ":bytes", ":lines", "xref", "distribution"])
1299 # A classic response from INN
1300 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1301 "References:", "Bytes:", "Lines:", "Xref:full"]
1302 self.assertEqual(nntplib._parse_overview_fmt(lines),
1303 ["subject", "from", "date", "message-id", "references",
1304 ":bytes", ":lines", "xref"])
1305
1306 def test_parse_overview(self):
1307 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1308 # First example from RFC 3977
1309 lines = [
1310 '3000234\tI am just a test article\t"Demo User" '
1311 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1312 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1313 '17\tXref: news.example.com misc.test:3000363',
1314 ]
1315 overview = nntplib._parse_overview(lines, fmt)
1316 (art_num, fields), = overview
1317 self.assertEqual(art_num, 3000234)
1318 self.assertEqual(fields, {
1319 'subject': 'I am just a test article',
1320 'from': '"Demo User" <nobody@example.com>',
1321 'date': '6 Oct 1998 04:38:40 -0500',
1322 'message-id': '<45223423@example.com>',
1323 'references': '<45454@example.net>',
1324 ':bytes': '1234',
1325 ':lines': '17',
1326 'xref': 'news.example.com misc.test:3000363',
1327 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001328 # Second example; here the "Xref" field is totally absent (including
1329 # the header name) and comes out as None
1330 lines = [
1331 '3000234\tI am just a test article\t"Demo User" '
1332 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1333 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1334 '17\t\t',
1335 ]
1336 overview = nntplib._parse_overview(lines, fmt)
1337 (art_num, fields), = overview
1338 self.assertEqual(fields['xref'], None)
1339 # Third example; the "Xref" is an empty string, while "references"
1340 # is a single space.
1341 lines = [
1342 '3000234\tI am just a test article\t"Demo User" '
1343 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1344 '<45223423@example.com>\t \t1234\t'
1345 '17\tXref: \t',
1346 ]
1347 overview = nntplib._parse_overview(lines, fmt)
1348 (art_num, fields), = overview
1349 self.assertEqual(fields['references'], ' ')
1350 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001351
1352 def test_parse_datetime(self):
1353 def gives(a, b, *c):
1354 self.assertEqual(nntplib._parse_datetime(a, b),
1355 datetime.datetime(*c))
1356 # Output of DATE command
1357 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1358 # Variations
1359 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1360 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1361 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1362
1363 def test_unparse_datetime(self):
1364 # Test non-legacy mode
1365 # 1) with a datetime
1366 def gives(y, M, d, h, m, s, date_str, time_str):
1367 dt = datetime.datetime(y, M, d, h, m, s)
1368 self.assertEqual(nntplib._unparse_datetime(dt),
1369 (date_str, time_str))
1370 self.assertEqual(nntplib._unparse_datetime(dt, False),
1371 (date_str, time_str))
1372 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1373 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1374 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1375 # 2) with a date
1376 def gives(y, M, d, date_str, time_str):
1377 dt = datetime.date(y, M, d)
1378 self.assertEqual(nntplib._unparse_datetime(dt),
1379 (date_str, time_str))
1380 self.assertEqual(nntplib._unparse_datetime(dt, False),
1381 (date_str, time_str))
1382 gives(1999, 6, 23, "19990623", "000000")
1383 gives(2000, 6, 23, "20000623", "000000")
1384 gives(2010, 6, 5, "20100605", "000000")
1385
1386 def test_unparse_datetime_legacy(self):
1387 # Test legacy mode (RFC 977)
1388 # 1) with a datetime
1389 def gives(y, M, d, h, m, s, date_str, time_str):
1390 dt = datetime.datetime(y, M, d, h, m, s)
1391 self.assertEqual(nntplib._unparse_datetime(dt, True),
1392 (date_str, time_str))
1393 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1394 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1395 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1396 # 2) with a date
1397 def gives(y, M, d, date_str, time_str):
1398 dt = datetime.date(y, M, d)
1399 self.assertEqual(nntplib._unparse_datetime(dt, True),
1400 (date_str, time_str))
1401 gives(1999, 6, 23, "990623", "000000")
1402 gives(2000, 6, 23, "000623", "000000")
1403 gives(2010, 6, 5, "100605", "000000")
1404
Serhiy Storchaka43767632013-11-03 21:31:38 +02001405 @unittest.skipUnless(ssl, 'requires SSL support')
1406 def test_ssl_support(self):
1407 self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001408
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001409
Berker Peksag96756b62014-09-20 08:53:05 +03001410class PublicAPITests(unittest.TestCase):
1411 """Ensures that the correct values are exposed in the public API."""
1412
1413 def test_module_all_attribute(self):
1414 self.assertTrue(hasattr(nntplib, '__all__'))
1415 target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
1416 'NNTPTemporaryError', 'NNTPPermanentError',
1417 'NNTPProtocolError', 'NNTPDataError', 'decode_header']
1418 if ssl is not None:
1419 target_api.append('NNTP_SSL')
1420 self.assertEqual(set(nntplib.__all__), set(target_api))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001421
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001422class MockSocketTests(unittest.TestCase):
1423 """Tests involving a mock socket object
1424
1425 Used where the _NNTPServerIO file object is not enough."""
1426
1427 nntp_class = nntplib.NNTP
1428
1429 def check_constructor_error_conditions(
1430 self, handler_class,
1431 expected_error_type, expected_error_msg,
1432 login=None, password=None):
1433
1434 class mock_socket_module:
1435 def create_connection(address, timeout):
1436 return MockSocket()
1437
1438 class MockSocket:
1439 def close(self):
1440 nonlocal socket_closed
1441 socket_closed = True
1442
1443 def makefile(socket, mode):
1444 handler = handler_class()
1445 _, file = make_mock_file(handler)
1446 files.append(file)
1447 return file
1448
1449 socket_closed = False
1450 files = []
1451 with patch('nntplib.socket', mock_socket_module), \
1452 self.assertRaisesRegex(expected_error_type, expected_error_msg):
1453 self.nntp_class('dummy', user=login, password=password)
1454 self.assertTrue(socket_closed)
1455 for f in files:
1456 self.assertTrue(f.closed)
1457
1458 def test_bad_welcome(self):
1459 #Test a bad welcome message
1460 class Handler(NNTPv1Handler):
1461 welcome = 'Bad Welcome'
1462 self.check_constructor_error_conditions(
1463 Handler, nntplib.NNTPProtocolError, Handler.welcome)
1464
1465 def test_service_temporarily_unavailable(self):
1466 #Test service temporarily unavailable
1467 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001468 welcome = '400 Service temporarily unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001469 self.check_constructor_error_conditions(
1470 Handler, nntplib.NNTPTemporaryError, Handler.welcome)
1471
1472 def test_service_permanently_unavailable(self):
1473 #Test service permanently unavailable
1474 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001475 welcome = '502 Service permanently unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001476 self.check_constructor_error_conditions(
1477 Handler, nntplib.NNTPPermanentError, Handler.welcome)
1478
1479 def test_bad_capabilities(self):
1480 #Test a bad capabilities response
1481 class Handler(NNTPv1Handler):
1482 def handle_CAPABILITIES(self):
1483 self.push_lit(capabilities_response)
1484 capabilities_response = '201 bad capability'
1485 self.check_constructor_error_conditions(
1486 Handler, nntplib.NNTPReplyError, capabilities_response)
1487
1488 def test_login_aborted(self):
1489 #Test a bad authinfo response
1490 login = 't@e.com'
1491 password = 'python'
1492 class Handler(NNTPv1Handler):
1493 def handle_AUTHINFO(self, *args):
1494 self.push_lit(authinfo_response)
1495 authinfo_response = '503 Mechanism not recognized'
1496 self.check_constructor_error_conditions(
1497 Handler, nntplib.NNTPPermanentError, authinfo_response,
1498 login, password)
1499
Serhiy Storchaka80774342015-04-03 15:02:20 +03001500class bypass_context:
1501 """Bypass encryption and actual SSL module"""
1502 def wrap_socket(sock, **args):
1503 return sock
1504
1505@unittest.skipUnless(ssl, 'requires SSL support')
1506class MockSslTests(MockSocketTests):
1507 @staticmethod
1508 def nntp_class(*pos, **kw):
1509 return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
Victor Stinner8c9bba02015-04-03 11:06:40 +02001510
Martin Panter8f19e8e2016-01-19 01:10:58 +00001511@unittest.skipUnless(threading, 'requires multithreading')
1512class LocalServerTests(unittest.TestCase):
1513 def setUp(self):
1514 sock = socket.socket()
1515 port = support.bind_port(sock)
1516 sock.listen()
1517 self.background = threading.Thread(
1518 target=self.run_server, args=(sock,))
1519 self.background.start()
1520 self.addCleanup(self.background.join)
1521
1522 self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
1523 self.addCleanup(self.nntp.__exit__, None, None, None)
1524
1525 def run_server(self, sock):
1526 # Could be generalized to handle more commands in separate methods
1527 with sock:
1528 [client, _] = sock.accept()
1529 with contextlib.ExitStack() as cleanup:
1530 cleanup.enter_context(client)
1531 reader = cleanup.enter_context(client.makefile('rb'))
1532 client.sendall(b'200 Server ready\r\n')
1533 while True:
1534 cmd = reader.readline()
1535 if cmd == b'CAPABILITIES\r\n':
1536 client.sendall(
1537 b'101 Capability list:\r\n'
1538 b'VERSION 2\r\n'
1539 b'STARTTLS\r\n'
1540 b'.\r\n'
1541 )
1542 elif cmd == b'STARTTLS\r\n':
1543 reader.close()
1544 client.sendall(b'382 Begin TLS negotiation now\r\n')
1545 client = ssl.wrap_socket(
1546 client, server_side=True, certfile=certfile)
1547 cleanup.enter_context(client)
1548 reader = cleanup.enter_context(client.makefile('rb'))
1549 elif cmd == b'QUIT\r\n':
1550 client.sendall(b'205 Bye!\r\n')
1551 break
1552 else:
1553 raise ValueError('Unexpected command {!r}'.format(cmd))
1554
1555 @unittest.skipUnless(ssl, 'requires SSL support')
1556 def test_starttls(self):
1557 file = self.nntp.file
1558 sock = self.nntp.sock
1559 self.nntp.starttls()
1560 # Check that the socket and internal pseudo-file really were
1561 # changed.
1562 self.assertNotEqual(file, self.nntp.file)
1563 self.assertNotEqual(sock, self.nntp.sock)
1564 # Check that the new socket really is an SSL one
1565 self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
1566 # Check that trying starttls when it's already active fails.
1567 self.assertRaises(ValueError, self.nntp.starttls)
1568
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001569
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001570if __name__ == "__main__":
Berker Peksag96756b62014-09-20 08:53:05 +03001571 unittest.main()