blob: fdd76f9e9b3559c3a60a585c21005b54128d0b6d [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Martin Panter8f19e8e2016-01-19 01:10:58 +00008import os.path
Gregory P. Smith2cc02232019-05-06 17:54:06 -04009import re
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020010import threading
11
Antoine Pitrou69ab9512010-09-29 15:03:40 +000012from test import support
Serhiy Storchaka43767632013-11-03 21:31:38 +020013from nntplib import NNTP, GroupInfo
Antoine Pitrou69ab9512010-09-29 15:03:40 +000014import nntplib
Serhiy Storchaka52027c32015-03-21 09:40:26 +020015from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020016try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000017 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020018except ImportError:
19 ssl = None
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020020
Antoine Pitrou69ab9512010-09-29 15:03:40 +000021
Martin Panter8f19e8e2016-01-19 01:10:58 +000022certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
Antoine Pitrou69ab9512010-09-29 15:03:40 +000023
Gregory P. Smith2cc02232019-05-06 17:54:06 -040024if ssl is not None:
25 SSLError = ssl.SSLError
26else:
27 class SSLError(Exception):
28 """Non-existent exception class when we lack SSL support."""
29 reason = "This will never be raised."
30
Antoine Pitrou69ab9512010-09-29 15:03:40 +000031# TODO:
32# - test the `file` arg to more commands
33# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000034# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000035
36
37class NetworkedNNTPTestsMixin:
38
39 def test_welcome(self):
40 welcome = self.server.getwelcome()
41 self.assertEqual(str, type(welcome))
42
43 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000044 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000045 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000046 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000047 self.assertEqual(str, type(line))
48
49 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000050 resp, groups = self.server.list()
51 if len(groups) > 0:
52 self.assertEqual(GroupInfo, type(groups[0]))
53 self.assertEqual(str, type(groups[0].group))
54
55 def test_list_active(self):
56 resp, groups = self.server.list(self.GROUP_PAT)
57 if len(groups) > 0:
58 self.assertEqual(GroupInfo, type(groups[0]))
59 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000060
61 def test_unknown_command(self):
62 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
63 self.server._shortcmd("XYZZY")
64 resp = cm.exception.response
65 self.assertTrue(resp.startswith("500 "), resp)
66
67 def test_newgroups(self):
68 # gmane gets a constant influx of new groups. In order not to stress
69 # the server too much, we choose a recent date in the past.
70 dt = datetime.date.today() - datetime.timedelta(days=7)
71 resp, groups = self.server.newgroups(dt)
72 if len(groups) > 0:
73 self.assertIsInstance(groups[0], GroupInfo)
74 self.assertIsInstance(groups[0].group, str)
75
76 def test_description(self):
77 def _check_desc(desc):
78 # Sanity checks
79 self.assertIsInstance(desc, str)
80 self.assertNotIn(self.GROUP_NAME, desc)
81 desc = self.server.description(self.GROUP_NAME)
82 _check_desc(desc)
83 # Another sanity check
84 self.assertIn("Python", desc)
85 # With a pattern
86 desc = self.server.description(self.GROUP_PAT)
87 _check_desc(desc)
88 # Shouldn't exist
89 desc = self.server.description("zk.brrtt.baz")
90 self.assertEqual(desc, '')
91
92 def test_descriptions(self):
93 resp, descs = self.server.descriptions(self.GROUP_PAT)
94 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
95 self.assertTrue(
96 resp.startswith("215 ") or resp.startswith("282 "), resp)
97 self.assertIsInstance(descs, dict)
98 desc = descs[self.GROUP_NAME]
99 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
100
101 def test_group(self):
102 result = self.server.group(self.GROUP_NAME)
103 self.assertEqual(5, len(result))
104 resp, count, first, last, group = result
105 self.assertEqual(group, self.GROUP_NAME)
106 self.assertIsInstance(count, int)
107 self.assertIsInstance(first, int)
108 self.assertIsInstance(last, int)
109 self.assertLessEqual(first, last)
110 self.assertTrue(resp.startswith("211 "), resp)
111
112 def test_date(self):
113 resp, date = self.server.date()
114 self.assertIsInstance(date, datetime.datetime)
115 # Sanity check
116 self.assertGreaterEqual(date.year, 1995)
117 self.assertLessEqual(date.year, 2030)
118
119 def _check_art_dict(self, art_dict):
120 # Some sanity checks for a field dictionary returned by OVER / XOVER
121 self.assertIsInstance(art_dict, dict)
122 # NNTP has 7 mandatory fields
123 self.assertGreaterEqual(art_dict.keys(),
124 {"subject", "from", "date", "message-id",
125 "references", ":bytes", ":lines"}
126 )
127 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000128 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000129
130 def test_xover(self):
131 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000132 resp, lines = self.server.xover(last - 5, last)
133 if len(lines) == 0:
134 self.skipTest("no articles retrieved")
135 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000136 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000137 self.assertGreaterEqual(art_num, last - 5)
138 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000139 self._check_art_dict(art_dict)
140
Xavier de Gayeac13bee2016-12-16 20:49:10 +0100141 @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
142 ' is found for issue #28971')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000143 def test_over(self):
144 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
145 start = last - 10
146 # The "start-" article range form
147 resp, lines = self.server.over((start, None))
148 art_num, art_dict = lines[0]
149 self._check_art_dict(art_dict)
150 # The "start-end" article range form
151 resp, lines = self.server.over((start, last))
152 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000153 # The 'last' article is not necessarily part of the output (cancelled?)
154 self.assertGreaterEqual(art_num, start)
155 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000156 self._check_art_dict(art_dict)
157 # XXX The "message_id" form is unsupported by gmane
158 # 503 Overview by message-ID unsupported
159
160 def test_xhdr(self):
161 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
162 resp, lines = self.server.xhdr('subject', last)
163 for line in lines:
164 self.assertEqual(str, type(line[1]))
165
166 def check_article_resp(self, resp, article, art_num=None):
167 self.assertIsInstance(article, nntplib.ArticleInfo)
168 if art_num is not None:
169 self.assertEqual(article.number, art_num)
170 for line in article.lines:
171 self.assertIsInstance(line, bytes)
172 # XXX this could exceptionally happen...
173 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
174
Victor Stinner706cb312017-11-25 02:42:18 +0100175 @unittest.skipIf(True, "FIXME: see bpo-32128")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000176 def test_article_head_body(self):
177 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000178 # Try to find an available article
179 for art_num in (last, first, last - 1):
180 try:
181 resp, head = self.server.head(art_num)
182 except nntplib.NNTPTemporaryError as e:
183 if not e.response.startswith("423 "):
184 raise
185 # "423 No such article" => choose another one
186 continue
187 break
188 else:
189 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000190 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000191 self.check_article_resp(resp, head, art_num)
192 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000193 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000194 self.check_article_resp(resp, body, art_num)
195 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000196 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000197 self.check_article_resp(resp, article, art_num)
Nick Coghlan14d99a12012-06-17 21:27:18 +1000198 # Tolerate running the tests from behind a NNTP virus checker
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200199 blacklist = lambda line: line.startswith(b'X-Antivirus')
200 filtered_head_lines = [line for line in head.lines
201 if not blacklist(line)]
Nick Coghlan14d99a12012-06-17 21:27:18 +1000202 filtered_lines = [line for line in article.lines
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200203 if not blacklist(line)]
204 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000205
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000206 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000207 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000208 # couple of well-known capabilities. Just sanity check that we
209 # got them.
210 def _check_caps(caps):
211 caps_list = caps['LIST']
212 self.assertIsInstance(caps_list, (list, tuple))
213 self.assertIn('OVERVIEW.FMT', caps_list)
214 self.assertGreaterEqual(self.server.nntp_version, 2)
215 _check_caps(self.server.getcapabilities())
216 # This re-emits the command
217 resp, caps = self.server.capabilities()
218 _check_caps(caps)
219
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000220 def test_zlogin(self):
221 # This test must be the penultimate because further commands will be
222 # refused.
223 baduser = "notarealuser"
224 badpw = "notarealpassword"
225 # Check that bogus credentials cause failure
226 self.assertRaises(nntplib.NNTPError, self.server.login,
227 user=baduser, password=badpw, usenetrc=False)
228 # FIXME: We should check that correct credentials succeed, but that
229 # would require valid details for some server somewhere to be in the
230 # test suite, I think. Gmane is anonymous, at least as used for the
231 # other tests.
232
233 def test_zzquit(self):
234 # This test must be called last, hence the name
235 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000236 try:
237 self.server.quit()
238 finally:
239 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000240
Antoine Pitroude609182010-11-18 17:29:23 +0000241 @classmethod
242 def wrap_methods(cls):
243 # Wrap all methods in a transient_internet() exception catcher
244 # XXX put a generic version in test.support?
245 def wrap_meth(meth):
246 @functools.wraps(meth)
247 def wrapped(self):
248 with support.transient_internet(self.NNTP_HOST):
249 meth(self)
250 return wrapped
251 for name in dir(cls):
252 if not name.startswith('test_'):
253 continue
254 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200255 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000256 continue
257 # Need to use a closure so that meth remains bound to its current
258 # value
259 setattr(cls, name, wrap_meth(meth))
260
Dong-hee Na1b335ae2020-01-12 02:39:15 +0900261 def test_timeout(self):
262 with self.assertRaises(ValueError):
263 self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)
264
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000265 def test_with_statement(self):
266 def is_connected():
267 if not hasattr(server, 'file'):
268 return False
269 try:
270 server.help()
Andrew Svetlov0832af62012-12-18 23:10:48 +0200271 except (OSError, EOFError):
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000272 return False
273 return True
274
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400275 try:
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100276 server = self.NNTP_CLASS(self.NNTP_HOST,
277 timeout=support.INTERNET_TIMEOUT,
278 usenetrc=False)
279 with server:
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400280 self.assertTrue(is_connected())
281 self.assertTrue(server.help())
282 self.assertFalse(is_connected())
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000283
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100284 server = self.NNTP_CLASS(self.NNTP_HOST,
285 timeout=support.INTERNET_TIMEOUT,
286 usenetrc=False)
287 with server:
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400288 server.quit()
289 self.assertFalse(is_connected())
290 except SSLError as ssl_err:
291 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
292 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
293 raise unittest.SkipTest(f"Got {ssl_err} connecting "
294 f"to {self.NNTP_HOST!r}")
295 raise
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000296
297
Antoine Pitroude609182010-11-18 17:29:23 +0000298NetworkedNNTPTestsMixin.wrap_methods()
299
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000300
INADA Naoki067931d2017-07-26 23:43:22 +0900301EOF_ERRORS = (EOFError,)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200302if ssl is not None:
INADA Naoki067931d2017-07-26 23:43:22 +0900303 EOF_ERRORS += (ssl.SSLEOFError,)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200304
305
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000306class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
307 # This server supports STARTTLS (gmane doesn't)
308 NNTP_HOST = 'news.trigofacile.com'
309 GROUP_NAME = 'fr.comp.lang.python'
310 GROUP_PAT = 'fr.comp.lang.*'
311
Antoine Pitroude609182010-11-18 17:29:23 +0000312 NNTP_CLASS = NNTP
313
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000314 @classmethod
315 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000316 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000317 with support.transient_internet(cls.NNTP_HOST):
Victor Stinner5bccca52017-04-27 17:30:13 +0200318 try:
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100319 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST,
320 timeout=support.INTERNET_TIMEOUT,
Victor Stinner5bccca52017-04-27 17:30:13 +0200321 usenetrc=False)
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400322 except SSLError as ssl_err:
323 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
324 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
325 raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
326 f"to {cls.NNTP_HOST!r}")
327 raise
Victor Stinner5b4feb72017-07-24 17:41:02 +0200328 except EOF_ERRORS:
Victor Stinner5bccca52017-04-27 17:30:13 +0200329 raise unittest.SkipTest(f"{cls} got EOF error on connecting "
330 f"to {cls.NNTP_HOST!r}")
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000331
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000332 @classmethod
333 def tearDownClass(cls):
334 if cls.server is not None:
335 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000336
Serhiy Storchaka43767632013-11-03 21:31:38 +0200337@unittest.skipUnless(ssl, 'requires SSL support')
338class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000339
Serhiy Storchaka43767632013-11-03 21:31:38 +0200340 # Technical limits for this public NNTP server (see http://www.aioe.org):
341 # "Only two concurrent connections per IP address are allowed and
342 # 400 connections per day are accepted from each IP address."
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000343
Serhiy Storchaka43767632013-11-03 21:31:38 +0200344 NNTP_HOST = 'nntp.aioe.org'
345 GROUP_NAME = 'comp.lang.python'
346 GROUP_PAT = 'comp.lang.*'
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000347
Serhiy Storchaka43767632013-11-03 21:31:38 +0200348 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000349
Serhiy Storchaka43767632013-11-03 21:31:38 +0200350 # Disabled as it produces too much data
351 test_list = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000352
Serhiy Storchaka43767632013-11-03 21:31:38 +0200353 # Disabled as the connection will already be encrypted.
354 test_starttls = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000355
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000356
357#
358# Non-networked tests using a local server (or something mocking it).
359#
360
361class _NNTPServerIO(io.RawIOBase):
362 """A raw IO object allowing NNTP commands to be received and processed
363 by a handler. The handler can push responses which can then be read
364 from the IO object."""
365
366 def __init__(self, handler):
367 io.RawIOBase.__init__(self)
368 # The channel from the client
369 self.c2s = io.BytesIO()
370 # The channel to the client
371 self.s2c = io.BytesIO()
372 self.handler = handler
373 self.handler.start(self.c2s.readline, self.push_data)
374
375 def readable(self):
376 return True
377
378 def writable(self):
379 return True
380
381 def push_data(self, data):
382 """Push (buffer) some data to send to the client."""
383 pos = self.s2c.tell()
384 self.s2c.seek(0, 2)
385 self.s2c.write(data)
386 self.s2c.seek(pos)
387
388 def write(self, b):
389 """The client sends us some data"""
390 pos = self.c2s.tell()
391 self.c2s.write(b)
392 self.c2s.seek(pos)
393 self.handler.process_pending()
394 return len(b)
395
396 def readinto(self, buf):
397 """The client wants to read a response"""
398 self.handler.process_pending()
399 b = self.s2c.read(len(buf))
400 n = len(b)
401 buf[:n] = b
402 return n
403
404
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200405def make_mock_file(handler):
406 sio = _NNTPServerIO(handler)
407 # Using BufferedRWPair instead of BufferedRandom ensures the file
408 # isn't seekable.
409 file = io.BufferedRWPair(sio, sio)
410 return (sio, file)
411
412
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000413class MockedNNTPTestsMixin:
414 # Override in derived classes
415 handler_class = None
416
417 def setUp(self):
418 super().setUp()
419 self.make_server()
420
421 def tearDown(self):
422 super().tearDown()
423 del self.server
424
425 def make_server(self, *args, **kwargs):
426 self.handler = self.handler_class()
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200427 self.sio, file = make_mock_file(self.handler)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000428 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000429 return self.server
430
431
Antoine Pitrou71135622012-02-14 23:29:34 +0100432class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
433 def setUp(self):
434 super().setUp()
435 self.make_server(readermode=True)
436
437
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000438class NNTPv1Handler:
439 """A handler for RFC 977"""
440
441 welcome = "200 NNTP mock server"
442
443 def start(self, readline, push_data):
444 self.in_body = False
445 self.allow_posting = True
446 self._readline = readline
447 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100448 self._logged_in = False
449 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000450 # Our welcome
451 self.handle_welcome()
452
453 def _decode(self, data):
454 return str(data, "utf-8", "surrogateescape")
455
456 def process_pending(self):
457 if self.in_body:
458 while True:
459 line = self._readline()
460 if not line:
461 return
462 self.body.append(line)
463 if line == b".\r\n":
464 break
465 try:
466 meth, tokens = self.body_callback
467 meth(*tokens, body=self.body)
468 finally:
469 self.body_callback = None
470 self.body = None
471 self.in_body = False
472 while True:
473 line = self._decode(self._readline())
474 if not line:
475 return
476 if not line.endswith("\r\n"):
477 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
478 line = line[:-2]
479 cmd, *tokens = line.split()
480 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
481 meth = getattr(self, "handle_" + cmd.upper(), None)
482 if meth is None:
483 self.handle_unknown()
484 else:
485 try:
486 meth(*tokens)
487 except Exception as e:
488 raise ValueError("command failed: {!r}".format(line)) from e
489 else:
490 if self.in_body:
491 self.body_callback = meth, tokens
492 self.body = []
493
494 def expect_body(self):
495 """Flag that the client is expected to post a request body"""
496 self.in_body = True
497
498 def push_data(self, data):
499 """Push some binary data"""
500 self._push_data(data)
501
502 def push_lit(self, lit):
503 """Push a string literal"""
504 lit = textwrap.dedent(lit)
505 lit = "\r\n".join(lit.splitlines()) + "\r\n"
506 lit = lit.encode('utf-8')
507 self.push_data(lit)
508
509 def handle_unknown(self):
510 self.push_lit("500 What?")
511
512 def handle_welcome(self):
513 self.push_lit(self.welcome)
514
515 def handle_QUIT(self):
516 self.push_lit("205 Bye!")
517
518 def handle_DATE(self):
519 self.push_lit("111 20100914001155")
520
521 def handle_GROUP(self, group):
522 if group == "fr.comp.lang.python":
523 self.push_lit("211 486 761 1265 fr.comp.lang.python")
524 else:
525 self.push_lit("411 No such group {}".format(group))
526
527 def handle_HELP(self):
528 self.push_lit("""\
529 100 Legal commands
530 authinfo user Name|pass Password|generic <prog> <args>
531 date
532 help
533 Report problems to <root@example.org>
534 .""")
535
536 def handle_STAT(self, message_spec=None):
537 if message_spec is None:
538 self.push_lit("412 No newsgroup selected")
539 elif message_spec == "3000234":
540 self.push_lit("223 3000234 <45223423@example.com>")
541 elif message_spec == "<45223423@example.com>":
542 self.push_lit("223 0 <45223423@example.com>")
543 else:
544 self.push_lit("430 No Such Article Found")
545
546 def handle_NEXT(self):
547 self.push_lit("223 3000237 <668929@example.org> retrieved")
548
549 def handle_LAST(self):
550 self.push_lit("223 3000234 <45223423@example.com> retrieved")
551
552 def handle_LIST(self, action=None, param=None):
553 if action is None:
554 self.push_lit("""\
555 215 Newsgroups in form "group high low flags".
556 comp.lang.python 0000052340 0000002828 y
557 comp.lang.python.announce 0000001153 0000000993 m
558 free.it.comp.lang.python 0000000002 0000000002 y
559 fr.comp.lang.python 0000001254 0000000760 y
560 free.it.comp.lang.python.learner 0000000000 0000000001 y
561 tw.bbs.comp.lang.python 0000000304 0000000304 y
562 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000563 elif action == "ACTIVE":
564 if param == "*distutils*":
565 self.push_lit("""\
566 215 Newsgroups in form "group high low flags"
567 gmane.comp.python.distutils.devel 0000014104 0000000001 m
568 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
569 .""")
570 else:
571 self.push_lit("""\
572 215 Newsgroups in form "group high low flags"
573 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000574 elif action == "OVERVIEW.FMT":
575 self.push_lit("""\
576 215 Order of fields in overview database.
577 Subject:
578 From:
579 Date:
580 Message-ID:
581 References:
582 Bytes:
583 Lines:
584 Xref:full
585 .""")
586 elif action == "NEWSGROUPS":
587 assert param is not None
588 if param == "comp.lang.python":
589 self.push_lit("""\
590 215 Descriptions in form "group description".
591 comp.lang.python\tThe Python computer language.
592 .""")
593 elif param == "comp.lang.python*":
594 self.push_lit("""\
595 215 Descriptions in form "group description".
596 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
597 comp.lang.python\tThe Python computer language.
598 .""")
599 else:
600 self.push_lit("""\
601 215 Descriptions in form "group description".
602 .""")
603 else:
604 self.push_lit('501 Unknown LIST keyword')
605
606 def handle_NEWNEWS(self, group, date_str, time_str):
607 # We hard code different return messages depending on passed
608 # argument and date syntax.
609 if (group == "comp.lang.python" and date_str == "20100913"
610 and time_str == "082004"):
611 # Date was passed in RFC 3977 format (NNTP "v2")
612 self.push_lit("""\
613 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
614 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
615 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
616 .""")
617 elif (group == "comp.lang.python" and date_str == "100913"
618 and time_str == "082004"):
619 # Date was passed in RFC 977 format (NNTP "v1")
620 self.push_lit("""\
621 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
622 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
623 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
624 .""")
Georg Brandl28e78412013-10-27 07:29:47 +0100625 elif (group == 'comp.lang.python' and
626 date_str in ('20100101', '100101') and
627 time_str == '090000'):
628 self.push_lit('too long line' * 3000 +
629 '\n.')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000630 else:
631 self.push_lit("""\
632 230 An empty list of newsarticles follows
633 .""")
634 # (Note for experiments: many servers disable NEWNEWS.
635 # As of this writing, sicinfo3.epfl.ch doesn't.)
636
637 def handle_XOVER(self, message_spec):
638 if message_spec == "57-59":
639 self.push_lit(
640 "224 Overview information for 57-58 follows\n"
641 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
642 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
643 "\tSat, 19 Jun 2010 18:04:08 -0400"
644 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
645 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
Dong-hee Na2e6a8ef2020-01-09 00:29:34 +0900646 "\tXref: news.gmane.io gmane.comp.python.authors:57"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000647 "\n"
648 "58\tLooking for a few good bloggers"
649 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
650 "\tThu, 22 Jul 2010 09:14:14 -0400"
651 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
652 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000653 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000654 "\n"
Martin Panter6245cb32016-04-15 02:14:19 +0000655 # A UTF-8 overview line from fr.comp.lang.python
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000656 "59\tRe: Message d'erreur incompréhensible (par moi)"
657 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
658 "\tWed, 15 Sep 2010 18:09:15 +0200"
659 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
660 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
661 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
662 "\n"
663 ".\n")
664 else:
665 self.push_lit("""\
666 224 No articles
667 .""")
668
669 def handle_POST(self, *, body=None):
670 if body is None:
671 if self.allow_posting:
672 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
673 self.expect_body()
674 else:
675 self.push_lit("440 Posting not permitted")
676 else:
677 assert self.allow_posting
678 self.push_lit("240 Article received OK")
679 self.posted_body = body
680
681 def handle_IHAVE(self, message_id, *, body=None):
682 if body is None:
683 if (self.allow_posting and
684 message_id == "<i.am.an.article.you.will.want@example.com>"):
685 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
686 self.expect_body()
687 else:
688 self.push_lit("435 Article not wanted")
689 else:
690 assert self.allow_posting
691 self.push_lit("235 Article transferred OK")
692 self.posted_body = body
693
694 sample_head = """\
695 From: "Demo User" <nobody@example.net>
696 Subject: I am just a test article
697 Content-Type: text/plain; charset=UTF-8; format=flowed
698 Message-ID: <i.am.an.article.you.will.want@example.com>"""
699
700 sample_body = """\
701 This is just a test article.
702 ..Here is a dot-starting line.
703
704 -- Signed by Andr\xe9."""
705
706 sample_article = sample_head + "\n\n" + sample_body
707
708 def handle_ARTICLE(self, message_spec=None):
709 if message_spec is None:
710 self.push_lit("220 3000237 <45223423@example.com>")
711 elif message_spec == "<45223423@example.com>":
712 self.push_lit("220 0 <45223423@example.com>")
713 elif message_spec == "3000234":
714 self.push_lit("220 3000234 <45223423@example.com>")
715 else:
716 self.push_lit("430 No Such Article Found")
717 return
718 self.push_lit(self.sample_article)
719 self.push_lit(".")
720
721 def handle_HEAD(self, message_spec=None):
722 if message_spec is None:
723 self.push_lit("221 3000237 <45223423@example.com>")
724 elif message_spec == "<45223423@example.com>":
725 self.push_lit("221 0 <45223423@example.com>")
726 elif message_spec == "3000234":
727 self.push_lit("221 3000234 <45223423@example.com>")
728 else:
729 self.push_lit("430 No Such Article Found")
730 return
731 self.push_lit(self.sample_head)
732 self.push_lit(".")
733
734 def handle_BODY(self, message_spec=None):
735 if message_spec is None:
736 self.push_lit("222 3000237 <45223423@example.com>")
737 elif message_spec == "<45223423@example.com>":
738 self.push_lit("222 0 <45223423@example.com>")
739 elif message_spec == "3000234":
740 self.push_lit("222 3000234 <45223423@example.com>")
741 else:
742 self.push_lit("430 No Such Article Found")
743 return
744 self.push_lit(self.sample_body)
745 self.push_lit(".")
746
Antoine Pitrou54411c12012-02-12 19:14:17 +0100747 def handle_AUTHINFO(self, cred_type, data):
748 if self._logged_in:
749 self.push_lit('502 Already Logged In')
750 elif cred_type == 'user':
751 if self._user_sent:
752 self.push_lit('482 User Credential Already Sent')
753 else:
754 self.push_lit('381 Password Required')
755 self._user_sent = True
756 elif cred_type == 'pass':
757 self.push_lit('281 Login Successful')
758 self._logged_in = True
759 else:
760 raise Exception('Unknown cred type {}'.format(cred_type))
761
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000762
763class NNTPv2Handler(NNTPv1Handler):
764 """A handler for RFC 3977 (NNTP "v2")"""
765
766 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100767 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000768 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000769 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100770 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000771 HDR
772 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
773 OVER
774 POST
775 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100776 ."""
777
778 if not self._logged_in:
779 self.push_lit(fmt.format('\n AUTHINFO USER'))
780 else:
781 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000782
Antoine Pitrou71135622012-02-14 23:29:34 +0100783 def handle_MODE(self, _):
784 raise Exception('MODE READER sent despite READER has been advertised')
785
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000786 def handle_OVER(self, message_spec=None):
787 return self.handle_XOVER(message_spec)
788
789
Antoine Pitrou54411c12012-02-12 19:14:17 +0100790class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
791 """A handler that allows CAPABILITIES only after login"""
792
793 def handle_CAPABILITIES(self):
794 if not self._logged_in:
795 self.push_lit('480 You must log in.')
796 else:
797 super().handle_CAPABILITIES()
798
799
Antoine Pitrou71135622012-02-14 23:29:34 +0100800class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
801 """A server that starts in transit mode"""
802
803 def __init__(self):
804 self._switched = False
805
806 def handle_CAPABILITIES(self):
807 fmt = """\
808 101 Capability list:
809 VERSION 2 3
810 IMPLEMENTATION INN 2.5.1
811 HDR
812 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
813 OVER
814 POST
815 {}READER
816 ."""
817 if self._switched:
818 self.push_lit(fmt.format(''))
819 else:
820 self.push_lit(fmt.format('MODE-'))
821
822 def handle_MODE(self, what):
823 assert not self._switched and what == 'reader'
824 self._switched = True
825 self.push_lit('200 Posting allowed')
826
827
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000828class NNTPv1v2TestsMixin:
829
830 def setUp(self):
831 super().setUp()
832
833 def test_welcome(self):
834 self.assertEqual(self.server.welcome, self.handler.welcome)
835
Antoine Pitrou54411c12012-02-12 19:14:17 +0100836 def test_authinfo(self):
837 if self.nntp_version == 2:
838 self.assertIn('AUTHINFO', self.server._caps)
839 self.server.login('testuser', 'testpw')
840 # if AUTHINFO is gone from _caps we also know that getcapabilities()
841 # has been called after login as it should
842 self.assertNotIn('AUTHINFO', self.server._caps)
843
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000844 def test_date(self):
845 resp, date = self.server.date()
846 self.assertEqual(resp, "111 20100914001155")
847 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
848
849 def test_quit(self):
850 self.assertFalse(self.sio.closed)
851 resp = self.server.quit()
852 self.assertEqual(resp, "205 Bye!")
853 self.assertTrue(self.sio.closed)
854
855 def test_help(self):
856 resp, help = self.server.help()
857 self.assertEqual(resp, "100 Legal commands")
858 self.assertEqual(help, [
859 ' authinfo user Name|pass Password|generic <prog> <args>',
860 ' date',
861 ' help',
862 'Report problems to <root@example.org>',
863 ])
864
865 def test_list(self):
866 resp, groups = self.server.list()
867 self.assertEqual(len(groups), 6)
868 g = groups[1]
869 self.assertEqual(g,
870 GroupInfo("comp.lang.python.announce", "0000001153",
871 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000872 resp, groups = self.server.list("*distutils*")
873 self.assertEqual(len(groups), 2)
874 g = groups[0]
875 self.assertEqual(g,
876 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
877 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000878
879 def test_stat(self):
880 resp, art_num, message_id = self.server.stat(3000234)
881 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
882 self.assertEqual(art_num, 3000234)
883 self.assertEqual(message_id, "<45223423@example.com>")
884 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
885 self.assertEqual(resp, "223 0 <45223423@example.com>")
886 self.assertEqual(art_num, 0)
887 self.assertEqual(message_id, "<45223423@example.com>")
888 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
889 self.server.stat("<non.existent.id>")
890 self.assertEqual(cm.exception.response, "430 No Such Article Found")
891 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
892 self.server.stat()
893 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
894
895 def test_next(self):
896 resp, art_num, message_id = self.server.next()
897 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
898 self.assertEqual(art_num, 3000237)
899 self.assertEqual(message_id, "<668929@example.org>")
900
901 def test_last(self):
902 resp, art_num, message_id = self.server.last()
903 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
904 self.assertEqual(art_num, 3000234)
905 self.assertEqual(message_id, "<45223423@example.com>")
906
907 def test_description(self):
908 desc = self.server.description("comp.lang.python")
909 self.assertEqual(desc, "The Python computer language.")
910 desc = self.server.description("comp.lang.pythonx")
911 self.assertEqual(desc, "")
912
913 def test_descriptions(self):
914 resp, groups = self.server.descriptions("comp.lang.python")
915 self.assertEqual(resp, '215 Descriptions in form "group description".')
916 self.assertEqual(groups, {
917 "comp.lang.python": "The Python computer language.",
918 })
919 resp, groups = self.server.descriptions("comp.lang.python*")
920 self.assertEqual(groups, {
921 "comp.lang.python": "The Python computer language.",
922 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
923 })
924 resp, groups = self.server.descriptions("comp.lang.pythonx")
925 self.assertEqual(groups, {})
926
927 def test_group(self):
928 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
929 self.assertTrue(resp.startswith("211 "), resp)
930 self.assertEqual(first, 761)
931 self.assertEqual(last, 1265)
932 self.assertEqual(count, 486)
933 self.assertEqual(group, "fr.comp.lang.python")
934 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
935 self.server.group("comp.lang.python.devel")
936 exc = cm.exception
937 self.assertTrue(exc.response.startswith("411 No such group"),
938 exc.response)
939
940 def test_newnews(self):
941 # NEWNEWS comp.lang.python [20]100913 082004
942 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
943 resp, ids = self.server.newnews("comp.lang.python", dt)
944 expected = (
945 "230 list of newsarticles (NNTP v{0}) "
946 "created after Mon Sep 13 08:20:04 2010 follows"
947 ).format(self.nntp_version)
948 self.assertEqual(resp, expected)
949 self.assertEqual(ids, [
950 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
951 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
952 ])
953 # NEWNEWS fr.comp.lang.python [20]100913 082004
954 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
955 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
956 self.assertEqual(resp, "230 An empty list of newsarticles follows")
957 self.assertEqual(ids, [])
958
959 def _check_article_body(self, lines):
960 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000961 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000962 self.assertEqual(lines[-2], b"")
963 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
964 self.assertEqual(lines[-4], b"This is just a test article.")
965
966 def _check_article_head(self, lines):
967 self.assertEqual(len(lines), 4)
968 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
969 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
970
971 def _check_article_data(self, lines):
972 self.assertEqual(len(lines), 9)
973 self._check_article_head(lines[:4])
974 self._check_article_body(lines[-4:])
975 self.assertEqual(lines[4], b"")
976
977 def test_article(self):
978 # ARTICLE
979 resp, info = self.server.article()
980 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
981 art_num, message_id, lines = info
982 self.assertEqual(art_num, 3000237)
983 self.assertEqual(message_id, "<45223423@example.com>")
984 self._check_article_data(lines)
985 # ARTICLE num
986 resp, info = self.server.article(3000234)
987 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
988 art_num, message_id, lines = info
989 self.assertEqual(art_num, 3000234)
990 self.assertEqual(message_id, "<45223423@example.com>")
991 self._check_article_data(lines)
992 # ARTICLE id
993 resp, info = self.server.article("<45223423@example.com>")
994 self.assertEqual(resp, "220 0 <45223423@example.com>")
995 art_num, message_id, lines = info
996 self.assertEqual(art_num, 0)
997 self.assertEqual(message_id, "<45223423@example.com>")
998 self._check_article_data(lines)
999 # Non-existent id
1000 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1001 self.server.article("<non-existent@example.com>")
1002 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1003
1004 def test_article_file(self):
1005 # With a "file" argument
1006 f = io.BytesIO()
1007 resp, info = self.server.article(file=f)
1008 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
1009 art_num, message_id, lines = info
1010 self.assertEqual(art_num, 3000237)
1011 self.assertEqual(message_id, "<45223423@example.com>")
1012 self.assertEqual(lines, [])
1013 data = f.getvalue()
1014 self.assertTrue(data.startswith(
1015 b'From: "Demo User" <nobody@example.net>\r\n'
1016 b'Subject: I am just a test article\r\n'
1017 ), ascii(data))
1018 self.assertTrue(data.endswith(
1019 b'This is just a test article.\r\n'
1020 b'.Here is a dot-starting line.\r\n'
1021 b'\r\n'
1022 b'-- Signed by Andr\xc3\xa9.\r\n'
1023 ), ascii(data))
1024
1025 def test_head(self):
1026 # HEAD
1027 resp, info = self.server.head()
1028 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1029 art_num, message_id, lines = info
1030 self.assertEqual(art_num, 3000237)
1031 self.assertEqual(message_id, "<45223423@example.com>")
1032 self._check_article_head(lines)
1033 # HEAD num
1034 resp, info = self.server.head(3000234)
1035 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
1036 art_num, message_id, lines = info
1037 self.assertEqual(art_num, 3000234)
1038 self.assertEqual(message_id, "<45223423@example.com>")
1039 self._check_article_head(lines)
1040 # HEAD id
1041 resp, info = self.server.head("<45223423@example.com>")
1042 self.assertEqual(resp, "221 0 <45223423@example.com>")
1043 art_num, message_id, lines = info
1044 self.assertEqual(art_num, 0)
1045 self.assertEqual(message_id, "<45223423@example.com>")
1046 self._check_article_head(lines)
1047 # Non-existent id
1048 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1049 self.server.head("<non-existent@example.com>")
1050 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1051
Antoine Pitrou2640b522012-02-15 18:53:18 +01001052 def test_head_file(self):
1053 f = io.BytesIO()
1054 resp, info = self.server.head(file=f)
1055 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1056 art_num, message_id, lines = info
1057 self.assertEqual(art_num, 3000237)
1058 self.assertEqual(message_id, "<45223423@example.com>")
1059 self.assertEqual(lines, [])
1060 data = f.getvalue()
1061 self.assertTrue(data.startswith(
1062 b'From: "Demo User" <nobody@example.net>\r\n'
1063 b'Subject: I am just a test article\r\n'
1064 ), ascii(data))
1065 self.assertFalse(data.endswith(
1066 b'This is just a test article.\r\n'
1067 b'.Here is a dot-starting line.\r\n'
1068 b'\r\n'
1069 b'-- Signed by Andr\xc3\xa9.\r\n'
1070 ), ascii(data))
1071
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001072 def test_body(self):
1073 # BODY
1074 resp, info = self.server.body()
1075 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1076 art_num, message_id, lines = info
1077 self.assertEqual(art_num, 3000237)
1078 self.assertEqual(message_id, "<45223423@example.com>")
1079 self._check_article_body(lines)
1080 # BODY num
1081 resp, info = self.server.body(3000234)
1082 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1083 art_num, message_id, lines = info
1084 self.assertEqual(art_num, 3000234)
1085 self.assertEqual(message_id, "<45223423@example.com>")
1086 self._check_article_body(lines)
1087 # BODY id
1088 resp, info = self.server.body("<45223423@example.com>")
1089 self.assertEqual(resp, "222 0 <45223423@example.com>")
1090 art_num, message_id, lines = info
1091 self.assertEqual(art_num, 0)
1092 self.assertEqual(message_id, "<45223423@example.com>")
1093 self._check_article_body(lines)
1094 # Non-existent id
1095 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1096 self.server.body("<non-existent@example.com>")
1097 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1098
Antoine Pitrou2640b522012-02-15 18:53:18 +01001099 def test_body_file(self):
1100 f = io.BytesIO()
1101 resp, info = self.server.body(file=f)
1102 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1103 art_num, message_id, lines = info
1104 self.assertEqual(art_num, 3000237)
1105 self.assertEqual(message_id, "<45223423@example.com>")
1106 self.assertEqual(lines, [])
1107 data = f.getvalue()
1108 self.assertFalse(data.startswith(
1109 b'From: "Demo User" <nobody@example.net>\r\n'
1110 b'Subject: I am just a test article\r\n'
1111 ), ascii(data))
1112 self.assertTrue(data.endswith(
1113 b'This is just a test article.\r\n'
1114 b'.Here is a dot-starting line.\r\n'
1115 b'\r\n'
1116 b'-- Signed by Andr\xc3\xa9.\r\n'
1117 ), ascii(data))
1118
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001119 def check_over_xover_resp(self, resp, overviews):
1120 self.assertTrue(resp.startswith("224 "), resp)
1121 self.assertEqual(len(overviews), 3)
1122 art_num, over = overviews[0]
1123 self.assertEqual(art_num, 57)
1124 self.assertEqual(over, {
1125 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1126 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1127 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1128 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1129 "references": "<hvalf7$ort$1@dough.gmane.org>",
1130 ":bytes": "7103",
1131 ":lines": "16",
Dong-hee Na2e6a8ef2020-01-09 00:29:34 +09001132 "xref": "news.gmane.io gmane.comp.python.authors:57"
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001133 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001134 art_num, over = overviews[1]
1135 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001136 art_num, over = overviews[2]
1137 self.assertEqual(over["subject"],
1138 "Re: Message d'erreur incompréhensible (par moi)")
1139
1140 def test_xover(self):
1141 resp, overviews = self.server.xover(57, 59)
1142 self.check_over_xover_resp(resp, overviews)
1143
1144 def test_over(self):
1145 # In NNTP "v1", this will fallback on XOVER
1146 resp, overviews = self.server.over((57, 59))
1147 self.check_over_xover_resp(resp, overviews)
1148
1149 sample_post = (
1150 b'From: "Demo User" <nobody@example.net>\r\n'
1151 b'Subject: I am just a test article\r\n'
1152 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1153 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1154 b'\r\n'
1155 b'This is just a test article.\r\n'
1156 b'.Here is a dot-starting line.\r\n'
1157 b'\r\n'
1158 b'-- Signed by Andr\xc3\xa9.\r\n'
1159 )
1160
1161 def _check_posted_body(self):
1162 # Check the raw body as received by the server
1163 lines = self.handler.posted_body
1164 # One additional line for the "." terminator
1165 self.assertEqual(len(lines), 10)
1166 self.assertEqual(lines[-1], b'.\r\n')
1167 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1168 self.assertEqual(lines[-3], b'\r\n')
1169 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1170 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1171
1172 def _check_post_ihave_sub(self, func, *args, file_factory):
1173 # First the prepared post with CRLF endings
1174 post = self.sample_post
1175 func_args = args + (file_factory(post),)
1176 self.handler.posted_body = None
1177 resp = func(*func_args)
1178 self._check_posted_body()
1179 # Then the same post with "normal" line endings - they should be
1180 # converted by NNTP.post and NNTP.ihave.
1181 post = self.sample_post.replace(b"\r\n", b"\n")
1182 func_args = args + (file_factory(post),)
1183 self.handler.posted_body = None
1184 resp = func(*func_args)
1185 self._check_posted_body()
1186 return resp
1187
1188 def check_post_ihave(self, func, success_resp, *args):
1189 # With a bytes object
1190 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1191 self.assertEqual(resp, success_resp)
1192 # With a bytearray object
1193 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1194 self.assertEqual(resp, success_resp)
1195 # With a file object
1196 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1197 self.assertEqual(resp, success_resp)
1198 # With an iterable of terminated lines
1199 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001200 return iter(b.splitlines(keepends=True))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001201 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1202 self.assertEqual(resp, success_resp)
1203 # With an iterable of non-terminated lines
1204 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001205 return iter(b.splitlines(keepends=False))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001206 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1207 self.assertEqual(resp, success_resp)
1208
1209 def test_post(self):
1210 self.check_post_ihave(self.server.post, "240 Article received OK")
1211 self.handler.allow_posting = False
1212 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1213 self.server.post(self.sample_post)
1214 self.assertEqual(cm.exception.response,
1215 "440 Posting not permitted")
1216
1217 def test_ihave(self):
1218 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1219 "<i.am.an.article.you.will.want@example.com>")
1220 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1221 self.server.ihave("<another.message.id>", self.sample_post)
1222 self.assertEqual(cm.exception.response,
1223 "435 Article not wanted")
1224
Georg Brandl28e78412013-10-27 07:29:47 +01001225 def test_too_long_lines(self):
1226 dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1227 self.assertRaises(nntplib.NNTPDataError,
1228 self.server.newnews, "comp.lang.python", dt)
1229
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001230
1231class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1232 """Tests an NNTP v1 server (no capabilities)."""
1233
1234 nntp_version = 1
1235 handler_class = NNTPv1Handler
1236
1237 def test_caps(self):
1238 caps = self.server.getcapabilities()
1239 self.assertEqual(caps, {})
1240 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001241 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001242
1243
1244class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1245 """Tests an NNTP v2 server (with capabilities)."""
1246
1247 nntp_version = 2
1248 handler_class = NNTPv2Handler
1249
1250 def test_caps(self):
1251 caps = self.server.getcapabilities()
1252 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001253 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001254 'IMPLEMENTATION': ['INN', '2.5.1'],
1255 'AUTHINFO': ['USER'],
1256 'HDR': [],
1257 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1258 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1259 'OVER': [],
1260 'POST': [],
1261 'READER': [],
1262 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001263 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001264 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001265
1266
Antoine Pitrou54411c12012-02-12 19:14:17 +01001267class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1268 """Tests a probably NNTP v2 server with capabilities only after login."""
1269
1270 nntp_version = 2
1271 handler_class = CapsAfterLoginNNTPv2Handler
1272
1273 def test_caps_only_after_login(self):
1274 self.assertEqual(self.server._caps, {})
1275 self.server.login('testuser', 'testpw')
1276 self.assertIn('VERSION', self.server._caps)
1277
1278
Antoine Pitrou71135622012-02-14 23:29:34 +01001279class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1280 unittest.TestCase):
1281 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1282 that isn't in READER mode by default."""
1283
1284 nntp_version = 2
1285 handler_class = ModeSwitchingNNTPv2Handler
1286
1287 def test_we_are_in_reader_mode_after_connect(self):
1288 self.assertIn('READER', self.server._caps)
1289
1290
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001291class MiscTests(unittest.TestCase):
1292
1293 def test_decode_header(self):
1294 def gives(a, b):
1295 self.assertEqual(nntplib.decode_header(a), b)
1296 gives("" , "")
1297 gives("a plain header", "a plain header")
1298 gives(" with extra spaces ", " with extra spaces ")
1299 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1300 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1301 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1302 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1303 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1304 "Re: problème de matrice")
1305 # A natively utf-8 header (found in the real world!)
1306 gives("Re: Message d'erreur incompréhensible (par moi)",
1307 "Re: Message d'erreur incompréhensible (par moi)")
1308
1309 def test_parse_overview_fmt(self):
1310 # The minimal (default) response
1311 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1312 "References:", ":bytes", ":lines"]
1313 self.assertEqual(nntplib._parse_overview_fmt(lines),
1314 ["subject", "from", "date", "message-id", "references",
1315 ":bytes", ":lines"])
1316 # The minimal response using alternative names
1317 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1318 "References:", "Bytes:", "Lines:"]
1319 self.assertEqual(nntplib._parse_overview_fmt(lines),
1320 ["subject", "from", "date", "message-id", "references",
1321 ":bytes", ":lines"])
1322 # Variations in casing
1323 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1324 "References:", "BYTES:", "Lines:"]
1325 self.assertEqual(nntplib._parse_overview_fmt(lines),
1326 ["subject", "from", "date", "message-id", "references",
1327 ":bytes", ":lines"])
1328 # First example from RFC 3977
1329 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1330 "References:", ":bytes", ":lines", "Xref:full",
1331 "Distribution:full"]
1332 self.assertEqual(nntplib._parse_overview_fmt(lines),
1333 ["subject", "from", "date", "message-id", "references",
1334 ":bytes", ":lines", "xref", "distribution"])
1335 # Second example from RFC 3977
1336 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1337 "References:", "Bytes:", "Lines:", "Xref:FULL",
1338 "Distribution:FULL"]
1339 self.assertEqual(nntplib._parse_overview_fmt(lines),
1340 ["subject", "from", "date", "message-id", "references",
1341 ":bytes", ":lines", "xref", "distribution"])
1342 # A classic response from INN
1343 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1344 "References:", "Bytes:", "Lines:", "Xref:full"]
1345 self.assertEqual(nntplib._parse_overview_fmt(lines),
1346 ["subject", "from", "date", "message-id", "references",
1347 ":bytes", ":lines", "xref"])
1348
1349 def test_parse_overview(self):
1350 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1351 # First example from RFC 3977
1352 lines = [
1353 '3000234\tI am just a test article\t"Demo User" '
1354 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1355 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1356 '17\tXref: news.example.com misc.test:3000363',
1357 ]
1358 overview = nntplib._parse_overview(lines, fmt)
1359 (art_num, fields), = overview
1360 self.assertEqual(art_num, 3000234)
1361 self.assertEqual(fields, {
1362 'subject': 'I am just a test article',
1363 'from': '"Demo User" <nobody@example.com>',
1364 'date': '6 Oct 1998 04:38:40 -0500',
1365 'message-id': '<45223423@example.com>',
1366 'references': '<45454@example.net>',
1367 ':bytes': '1234',
1368 ':lines': '17',
1369 'xref': 'news.example.com misc.test:3000363',
1370 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001371 # Second example; here the "Xref" field is totally absent (including
1372 # the header name) and comes out as None
1373 lines = [
1374 '3000234\tI am just a test article\t"Demo User" '
1375 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1376 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1377 '17\t\t',
1378 ]
1379 overview = nntplib._parse_overview(lines, fmt)
1380 (art_num, fields), = overview
1381 self.assertEqual(fields['xref'], None)
1382 # Third example; the "Xref" is an empty string, while "references"
1383 # is a single space.
1384 lines = [
1385 '3000234\tI am just a test article\t"Demo User" '
1386 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1387 '<45223423@example.com>\t \t1234\t'
1388 '17\tXref: \t',
1389 ]
1390 overview = nntplib._parse_overview(lines, fmt)
1391 (art_num, fields), = overview
1392 self.assertEqual(fields['references'], ' ')
1393 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001394
1395 def test_parse_datetime(self):
1396 def gives(a, b, *c):
1397 self.assertEqual(nntplib._parse_datetime(a, b),
1398 datetime.datetime(*c))
1399 # Output of DATE command
1400 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1401 # Variations
1402 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1403 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1404 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1405
1406 def test_unparse_datetime(self):
1407 # Test non-legacy mode
1408 # 1) with a datetime
1409 def gives(y, M, d, h, m, s, date_str, time_str):
1410 dt = datetime.datetime(y, M, d, h, m, s)
1411 self.assertEqual(nntplib._unparse_datetime(dt),
1412 (date_str, time_str))
1413 self.assertEqual(nntplib._unparse_datetime(dt, False),
1414 (date_str, time_str))
1415 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1416 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1417 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1418 # 2) with a date
1419 def gives(y, M, d, date_str, time_str):
1420 dt = datetime.date(y, M, d)
1421 self.assertEqual(nntplib._unparse_datetime(dt),
1422 (date_str, time_str))
1423 self.assertEqual(nntplib._unparse_datetime(dt, False),
1424 (date_str, time_str))
1425 gives(1999, 6, 23, "19990623", "000000")
1426 gives(2000, 6, 23, "20000623", "000000")
1427 gives(2010, 6, 5, "20100605", "000000")
1428
1429 def test_unparse_datetime_legacy(self):
1430 # Test legacy mode (RFC 977)
1431 # 1) with a datetime
1432 def gives(y, M, d, h, m, s, date_str, time_str):
1433 dt = datetime.datetime(y, M, d, h, m, s)
1434 self.assertEqual(nntplib._unparse_datetime(dt, True),
1435 (date_str, time_str))
1436 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1437 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1438 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1439 # 2) with a date
1440 def gives(y, M, d, date_str, time_str):
1441 dt = datetime.date(y, M, d)
1442 self.assertEqual(nntplib._unparse_datetime(dt, True),
1443 (date_str, time_str))
1444 gives(1999, 6, 23, "990623", "000000")
1445 gives(2000, 6, 23, "000623", "000000")
1446 gives(2010, 6, 5, "100605", "000000")
1447
Serhiy Storchaka43767632013-11-03 21:31:38 +02001448 @unittest.skipUnless(ssl, 'requires SSL support')
1449 def test_ssl_support(self):
1450 self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001451
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001452
Berker Peksag96756b62014-09-20 08:53:05 +03001453class PublicAPITests(unittest.TestCase):
1454 """Ensures that the correct values are exposed in the public API."""
1455
1456 def test_module_all_attribute(self):
1457 self.assertTrue(hasattr(nntplib, '__all__'))
1458 target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
1459 'NNTPTemporaryError', 'NNTPPermanentError',
1460 'NNTPProtocolError', 'NNTPDataError', 'decode_header']
1461 if ssl is not None:
1462 target_api.append('NNTP_SSL')
1463 self.assertEqual(set(nntplib.__all__), set(target_api))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001464
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001465class MockSocketTests(unittest.TestCase):
1466 """Tests involving a mock socket object
1467
1468 Used where the _NNTPServerIO file object is not enough."""
1469
1470 nntp_class = nntplib.NNTP
1471
1472 def check_constructor_error_conditions(
1473 self, handler_class,
1474 expected_error_type, expected_error_msg,
1475 login=None, password=None):
1476
1477 class mock_socket_module:
1478 def create_connection(address, timeout):
1479 return MockSocket()
1480
1481 class MockSocket:
1482 def close(self):
1483 nonlocal socket_closed
1484 socket_closed = True
1485
1486 def makefile(socket, mode):
1487 handler = handler_class()
1488 _, file = make_mock_file(handler)
1489 files.append(file)
1490 return file
1491
1492 socket_closed = False
1493 files = []
1494 with patch('nntplib.socket', mock_socket_module), \
1495 self.assertRaisesRegex(expected_error_type, expected_error_msg):
1496 self.nntp_class('dummy', user=login, password=password)
1497 self.assertTrue(socket_closed)
1498 for f in files:
1499 self.assertTrue(f.closed)
1500
1501 def test_bad_welcome(self):
1502 #Test a bad welcome message
1503 class Handler(NNTPv1Handler):
1504 welcome = 'Bad Welcome'
1505 self.check_constructor_error_conditions(
1506 Handler, nntplib.NNTPProtocolError, Handler.welcome)
1507
1508 def test_service_temporarily_unavailable(self):
1509 #Test service temporarily unavailable
1510 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001511 welcome = '400 Service temporarily unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001512 self.check_constructor_error_conditions(
1513 Handler, nntplib.NNTPTemporaryError, Handler.welcome)
1514
1515 def test_service_permanently_unavailable(self):
1516 #Test service permanently unavailable
1517 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001518 welcome = '502 Service permanently unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001519 self.check_constructor_error_conditions(
1520 Handler, nntplib.NNTPPermanentError, Handler.welcome)
1521
1522 def test_bad_capabilities(self):
1523 #Test a bad capabilities response
1524 class Handler(NNTPv1Handler):
1525 def handle_CAPABILITIES(self):
1526 self.push_lit(capabilities_response)
1527 capabilities_response = '201 bad capability'
1528 self.check_constructor_error_conditions(
1529 Handler, nntplib.NNTPReplyError, capabilities_response)
1530
1531 def test_login_aborted(self):
1532 #Test a bad authinfo response
1533 login = 't@e.com'
1534 password = 'python'
1535 class Handler(NNTPv1Handler):
1536 def handle_AUTHINFO(self, *args):
1537 self.push_lit(authinfo_response)
1538 authinfo_response = '503 Mechanism not recognized'
1539 self.check_constructor_error_conditions(
1540 Handler, nntplib.NNTPPermanentError, authinfo_response,
1541 login, password)
1542
Serhiy Storchaka80774342015-04-03 15:02:20 +03001543class bypass_context:
1544 """Bypass encryption and actual SSL module"""
1545 def wrap_socket(sock, **args):
1546 return sock
1547
1548@unittest.skipUnless(ssl, 'requires SSL support')
1549class MockSslTests(MockSocketTests):
1550 @staticmethod
1551 def nntp_class(*pos, **kw):
1552 return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
Victor Stinner8c9bba02015-04-03 11:06:40 +02001553
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001554
Martin Panter8f19e8e2016-01-19 01:10:58 +00001555class LocalServerTests(unittest.TestCase):
1556 def setUp(self):
1557 sock = socket.socket()
1558 port = support.bind_port(sock)
1559 sock.listen()
1560 self.background = threading.Thread(
1561 target=self.run_server, args=(sock,))
1562 self.background.start()
1563 self.addCleanup(self.background.join)
1564
1565 self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
1566 self.addCleanup(self.nntp.__exit__, None, None, None)
1567
1568 def run_server(self, sock):
1569 # Could be generalized to handle more commands in separate methods
1570 with sock:
1571 [client, _] = sock.accept()
1572 with contextlib.ExitStack() as cleanup:
1573 cleanup.enter_context(client)
1574 reader = cleanup.enter_context(client.makefile('rb'))
1575 client.sendall(b'200 Server ready\r\n')
1576 while True:
1577 cmd = reader.readline()
1578 if cmd == b'CAPABILITIES\r\n':
1579 client.sendall(
1580 b'101 Capability list:\r\n'
1581 b'VERSION 2\r\n'
1582 b'STARTTLS\r\n'
1583 b'.\r\n'
1584 )
1585 elif cmd == b'STARTTLS\r\n':
1586 reader.close()
1587 client.sendall(b'382 Begin TLS negotiation now\r\n')
Christian Heimesd0486372016-09-10 23:23:33 +02001588 context = ssl.SSLContext()
1589 context.load_cert_chain(certfile)
1590 client = context.wrap_socket(
1591 client, server_side=True)
Martin Panter8f19e8e2016-01-19 01:10:58 +00001592 cleanup.enter_context(client)
1593 reader = cleanup.enter_context(client.makefile('rb'))
1594 elif cmd == b'QUIT\r\n':
1595 client.sendall(b'205 Bye!\r\n')
1596 break
1597 else:
1598 raise ValueError('Unexpected command {!r}'.format(cmd))
1599
1600 @unittest.skipUnless(ssl, 'requires SSL support')
1601 def test_starttls(self):
1602 file = self.nntp.file
1603 sock = self.nntp.sock
1604 self.nntp.starttls()
1605 # Check that the socket and internal pseudo-file really were
1606 # changed.
1607 self.assertNotEqual(file, self.nntp.file)
1608 self.assertNotEqual(sock, self.nntp.sock)
1609 # Check that the new socket really is an SSL one
1610 self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
1611 # Check that trying starttls when it's already active fails.
1612 self.assertRaises(ValueError, self.nntp.starttls)
1613
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001614
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001615if __name__ == "__main__":
Berker Peksag96756b62014-09-20 08:53:05 +03001616 unittest.main()