blob: 88c54f4e6f37d8ef65f0cd3dadc4575d8e2cfa44 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Martin Panter8f19e8e2016-01-19 01:10:58 +00008import os.path
Gregory P. Smith2cc02232019-05-06 17:54:06 -04009import re
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020010import threading
11
Antoine Pitrou69ab9512010-09-29 15:03:40 +000012from test import support
Serhiy Storchaka43767632013-11-03 21:31:38 +020013from nntplib import NNTP, GroupInfo
Antoine Pitrou69ab9512010-09-29 15:03:40 +000014import nntplib
Serhiy Storchaka52027c32015-03-21 09:40:26 +020015from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020016try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000017 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020018except ImportError:
19 ssl = None
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020020
Antoine Pitrou69ab9512010-09-29 15:03:40 +000021
# Path of the keycert3.pem file located in the same directory as this module.
certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')

# Bind SSLError to the real exception when the ssl module is importable,
# otherwise to a placeholder so that ``except SSLError`` clauses stay valid.
if ssl is None:
    class SSLError(Exception):
        """Non-existent exception class when we lack SSL support."""
        reason = "This will never be raised."
else:
    SSLError = ssl.SSLError
30
Antoine Pitrou69ab9512010-09-29 15:03:40 +000031# TODO:
32# - test the `file` arg to more commands
33# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000034# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000035
36
class NetworkedNNTPTestsMixin:
    """Tests exercised against a live NNTP server.

    The host class is expected to provide ``NNTP_HOST``, ``GROUP_NAME``,
    ``GROUP_PAT`` and ``NNTP_CLASS``, and to expose a connected client as
    ``server``.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound is relative to the local clock
        # (plus one year of slack) so that the test does not start failing
        # once a hard-coded year has passed.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skip('temporarily skipped until a permanent solution'
                   ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skip("FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def is_antivirus_header(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not is_antivirus_header(line)]
        filtered_lines = [line for line in article.lines
                          if not is_antivirus_header(line)]
        self.assertEqual(filtered_lines,
                         filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Reset the class-level reference even if QUIT failed.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every ``test_*`` method in ``support.transient_internet()``."""
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    return meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_with_statement(self):
        def is_connected():
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        try:
            server = self.NNTP_CLASS(self.NNTP_HOST,
                                     timeout=support.INTERNET_TIMEOUT,
                                     usenetrc=False)
            with server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            server = self.NNTP_CLASS(self.NNTP_HOST,
                                     timeout=support.INTERNET_TIMEOUT,
                                     usenetrc=False)
            with server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            # ``or ''`` guards re.search() against a reason that is not a
            # string.
            if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason or ''):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise


NetworkedNNTPTestsMixin.wrap_methods()
295
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000296
# Exception types handled as an EOF when connecting; ssl.SSLEOFError is
# included only when the ssl module is available.
EOF_ERRORS = (EOFError,) if ssl is None else (EOFError, ssl.SSLEOFError)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200300
301
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Networked tests run through ``NNTP_CLASS`` against ``NNTP_HOST``."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        """Connect to ``NNTP_HOST`` and store the client in ``cls.server``.

        Raises unittest.SkipTest when the connection fails with an EOF error
        or with an SSL "key too small" error; other errors propagate.
        """
        support.requires("network")
        with support.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST,
                                            timeout=support.INTERNET_TIMEOUT,
                                            usenetrc=False)
            except EOF_ERRORS:
                # Must precede the SSLError handler: ssl.SSLEOFError is a
                # subclass of ssl.SSLError, so the other order would make
                # this skip unreachable for SSL EOFs.
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                # ``or ''`` guards re.search() against a reason that is not
                # a string.
                if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason or ''):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                raise

    @classmethod
    def tearDownClass(cls):
        """Send QUIT unless a test already cleared ``cls.server``."""
        if cls.server is not None:
            cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000332
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    """Same networked tests, run with ``nntplib.NNTP_SSL`` when it exists."""

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."
    NNTP_HOST = 'nntp.aioe.org'
    GROUP_PAT = 'comp.lang.*'
    GROUP_NAME = 'comp.lang.python'

    # None when nntplib was built without SSL support.
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Disabled as the connection will already be encrypted.
    test_starttls = None

    # Disabled as it produces too much data
    test_list = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000351
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000352
353#
354# Non-networked tests using a local server (or something mocking it).
355#
356
357class _NNTPServerIO(io.RawIOBase):
358 """A raw IO object allowing NNTP commands to be received and processed
359 by a handler. The handler can push responses which can then be read
360 from the IO object."""
361
362 def __init__(self, handler):
363 io.RawIOBase.__init__(self)
364 # The channel from the client
365 self.c2s = io.BytesIO()
366 # The channel to the client
367 self.s2c = io.BytesIO()
368 self.handler = handler
369 self.handler.start(self.c2s.readline, self.push_data)
370
371 def readable(self):
372 return True
373
374 def writable(self):
375 return True
376
377 def push_data(self, data):
378 """Push (buffer) some data to send to the client."""
379 pos = self.s2c.tell()
380 self.s2c.seek(0, 2)
381 self.s2c.write(data)
382 self.s2c.seek(pos)
383
384 def write(self, b):
385 """The client sends us some data"""
386 pos = self.c2s.tell()
387 self.c2s.write(b)
388 self.c2s.seek(pos)
389 self.handler.process_pending()
390 return len(b)
391
392 def readinto(self, buf):
393 """The client wants to read a response"""
394 self.handler.process_pending()
395 b = self.s2c.read(len(buf))
396 n = len(b)
397 buf[:n] = b
398 return n
399
400
def make_mock_file(handler):
    """Return ``(raw, buffered)``: an _NNTPServerIO for *handler* and a
    buffered read/write wrapper around it."""
    raw = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    buffered = io.BufferedRWPair(raw, raw)
    return (raw, buffered)
407
408
class MockedNNTPTestsMixin:
    """Fixture mixin that builds an nntplib client talking to a mock handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create a fresh handler and client; extra arguments are forwarded
        to ``nntplib._NNTPBase``.  Returns the new client."""
        self.handler = self.handler_class()
        streams = make_mock_file(self.handler)
        self.sio = streams[0]
        self.server = nntplib._NNTPBase(streams[1], 'test.server',
                                        *args, **kwargs)
        return self.server
426
427
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Mocked fixture whose client is created with ``readermode=True``."""

    def setUp(self):
        # The parent setUp() builds a default server first; it is then
        # replaced by one created in reader mode.
        super().setUp()
        self.make_server(readermode=True)
432
433
class NNTPv1Handler:
    """A handler for RFC 977

    Commands are read line by line through the ``readline`` callable given
    to start() and dispatched to ``handle_<COMMAND>`` methods; canned
    responses are sent back through the ``push_data`` callable.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Reset the session state and push the welcome banner."""
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Process every complete line currently readable from the client.

        Raises ValueError for a line not terminated by CRLF, for an empty
        command line, and (chained) when a command handler fails.
        """
        while True:
            # Checked on every iteration, not only on entry, so that an
            # article body already buffered right after POST/IHAVE is read
            # as a body instead of being parsed as commands.
            if self.in_body and not self._consume_body():
                return
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError(f"line doesn't end with \\r\\n: {line!r}")
            line = line[:-2]
            words = line.split()
            if not words:
                raise ValueError("empty command line")
            cmd, *tokens = words
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
                continue
            try:
                meth(*tokens)
            except Exception as e:
                raise ValueError(f"command failed: {line!r}") from e
            if self.in_body:
                # The handler called expect_body(): remember whom to call
                # back once the whole body has been received.
                self.body_callback = meth, tokens
                self.body = []

    def _consume_body(self):
        """Collect body lines up to the terminating "." line, then invoke
        the pending callback with ``body=<lines>``.

        Return False if the terminator has not arrived yet (the lines read
        so far are kept), True once the body has been handled.
        """
        while True:
            line = self._readline()
            if not line:
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal

        The text is dedented, its lines are joined with CRLF (plus a final
        CRLF) and the result is sent UTF-8 encoded.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # Deliberately oversized response line.
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.io gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per article: first without a body (the command
        # itself), then again with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST.
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
757
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000758
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # The optional AUTHINFO line is advertised only before login.  Its
        # leading spaces match the template's indentation because push_lit()
        # dedents the text after formatting.
        if self._logged_in:
            auth_line = ''
        else:
            auth_line = '\n            AUTHINFO USER'
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""
        self.push_lit(template.format(auth_line))

    def handle_MODE(self, _):
        # READER is already advertised, so any MODE command is an error.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
784
785
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        # Before login, answer 480 instead of the capability list.
        if self._logged_in:
            super().handle_CAPABILITIES()
            return
        self.push_lit('480 You must log in.')
794
795
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes True once handle_MODE() has been called.
        self._switched = False

    def handle_CAPABILITIES(self):
        # Advertise MODE-READER until the switch happened, READER afterwards.
        prefix = '' if self._switched else 'MODE-'
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        self.push_lit(template.format(prefix))

    def handle_MODE(self, what):
        # Only a single "MODE reader" is expected per session.
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
822
823
class NNTPv1v2TestsMixin:
    """Test methods shared by the NNTP v1 and v2 mocked-server test cases.

    The methods read ``self.server``, ``self.handler``, ``self.sio`` and
    ``self.nntp_version``; those are expected to be supplied by the concrete
    test class (presumably through the mocked-server mixin it is combined
    with -- verify against the rest of the file).
    """

    def setUp(self):
        super().setUp()

    def test_welcome(self):
        self.assertEqual(self.server.welcome, self.handler.welcome)

    def test_authinfo(self):
        if self.nntp_version == 2:
            self.assertIn('AUTHINFO', self.server._caps)
        self.server.login('testuser', 'testpw')
        # AUTHINFO disappearing from _caps also proves that
        # getcapabilities() was called again after login, as it should be.
        self.assertNotIn('AUTHINFO', self.server._caps)

    def test_date(self):
        resp, date = self.server.date()
        self.assertEqual(resp, "111 20100914001155")
        self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))

    def test_quit(self):
        self.assertFalse(self.sio.closed)
        resp = self.server.quit()
        self.assertEqual(resp, "205 Bye!")
        self.assertTrue(self.sio.closed)

    def test_help(self):
        resp, help = self.server.help()
        self.assertEqual(resp, "100 Legal commands")
        expected_help = [
            ' authinfo user Name|pass Password|generic <prog> <args>',
            ' date',
            ' help',
            'Report problems to <root@example.org>',
        ]
        self.assertEqual(help, expected_help)

    def test_list(self):
        # Unfiltered LIST
        resp, groups = self.server.list()
        self.assertEqual(len(groups), 6)
        announce = GroupInfo("comp.lang.python.announce", "0000001153",
                             "0000000993", "m")
        self.assertEqual(groups[1], announce)
        # LIST with a wildmat pattern
        resp, groups = self.server.list("*distutils*")
        self.assertEqual(len(groups), 2)
        distutils_devel = GroupInfo("gmane.comp.python.distutils.devel",
                                    "0000014104", "0000000001", "m")
        self.assertEqual(groups[0], distutils_devel)

    def test_stat(self):
        # STAT num
        resp, art_num, message_id = self.server.stat(3000234)
        self.assertEqual(resp, "223 3000234 <45223423@example.com>")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        # STAT id
        resp, art_num, message_id = self.server.stat("<45223423@example.com>")
        self.assertEqual(resp, "223 0 <45223423@example.com>")
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        # Unknown message id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat("<non.existent.id>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")
        # No argument at all
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat()
        self.assertEqual(cm.exception.response, "412 No newsgroup selected")

    def test_next(self):
        resp, art_num, message_id = self.server.next()
        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<668929@example.org>")

    def test_last(self):
        resp, art_num, message_id = self.server.last()
        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")

    def test_description(self):
        known = self.server.description("comp.lang.python")
        self.assertEqual(known, "The Python computer language.")
        unknown = self.server.description("comp.lang.pythonx")
        self.assertEqual(unknown, "")

    def test_descriptions(self):
        python_desc = "The Python computer language."
        # Exact group name
        resp, groups = self.server.descriptions("comp.lang.python")
        self.assertEqual(resp, '215 Descriptions in form "group description".')
        self.assertEqual(groups, {"comp.lang.python": python_desc})
        # Wildcard matching two groups
        resp, groups = self.server.descriptions("comp.lang.python*")
        self.assertEqual(groups, {
            "comp.lang.python": python_desc,
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
        })
        # No match
        resp, groups = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(groups, {})

    def test_group(self):
        resp, count, first, last, group = self.server.group("fr.comp.lang.python")
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, "fr.comp.lang.python")
        # Selecting an unknown group is a temporary error
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.group("comp.lang.python.devel")
        failure = cm.exception.response
        self.assertTrue(failure.startswith("411 No such group"), failure)

    def test_newnews(self):
        since = datetime.datetime(2010, 9, 13, 8, 20, 4)
        # NEWNEWS comp.lang.python [20]100913 082004
        resp, ids = self.server.newnews("comp.lang.python", since)
        expected = (
            "230 list of newsarticles (NNTP v{0}) "
            "created after Mon Sep 13 08:20:04 2010 follows"
        ).format(self.nntp_version)
        self.assertEqual(resp, expected)
        self.assertEqual(ids, [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
        ])
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        resp, ids = self.server.newnews("fr.comp.lang.python", since)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        """Check the four decoded body lines of the sample article."""
        self.assertEqual(len(lines), 4)
        first, dotted, blank, signature = lines
        self.assertEqual(signature.decode('utf-8'), "-- Signed by André.")
        self.assertEqual(blank, b"")
        self.assertEqual(dotted, b".Here is a dot-starting line.")
        self.assertEqual(first, b"This is just a test article.")

    def _check_article_head(self, lines):
        """Check the first and last of the four sample header lines."""
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        """Check a full article: 4 header lines, a blank line, 4 body lines."""
        self.assertEqual(len(lines), 9)
        self._check_article_head(lines[:4])
        self._check_article_body(lines[-4:])
        self.assertEqual(lines[4], b"")

    def test_article(self):
        msg_id = "<45223423@example.com>"
        # In order: ARTICLE, ARTICLE num, ARTICLE id
        requests = (
            ((), 3000237),
            ((3000234,), 3000234),
            ((msg_id,), 0),
        )
        for call_args, expected_num in requests:
            resp, (art_num, message_id, lines) = self.server.article(*call_args)
            self.assertEqual(resp, "220 {} {}".format(expected_num, msg_id))
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, msg_id)
            self._check_article_data(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.article("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_article_file(self):
        # With a "file" argument the lines go to the file, not the result
        sink = io.BytesIO()
        resp, (art_num, message_id, lines) = self.server.article(file=sink)
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        self.assertTrue(data.startswith(
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            ), ascii(data))
        self.assertTrue(data.endswith(
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            ), ascii(data))

    def test_head(self):
        msg_id = "<45223423@example.com>"
        # In order: HEAD, HEAD num, HEAD id
        requests = (
            ((), 3000237),
            ((3000234,), 3000234),
            ((msg_id,), 0),
        )
        for call_args, expected_num in requests:
            resp, (art_num, message_id, lines) = self.server.head(*call_args)
            self.assertEqual(resp, "221 {} {}".format(expected_num, msg_id))
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, msg_id)
            self._check_article_head(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.head("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_head_file(self):
        sink = io.BytesIO()
        resp, (art_num, message_id, lines) = self.server.head(file=sink)
        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        # The headers are present ...
        self.assertTrue(data.startswith(
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            ), ascii(data))
        # ... but the body is not.
        self.assertFalse(data.endswith(
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            ), ascii(data))

    def test_body(self):
        msg_id = "<45223423@example.com>"
        # In order: BODY, BODY num, BODY id
        requests = (
            ((), 3000237),
            ((3000234,), 3000234),
            ((msg_id,), 0),
        )
        for call_args, expected_num in requests:
            resp, (art_num, message_id, lines) = self.server.body(*call_args)
            self.assertEqual(resp, "222 {} {}".format(expected_num, msg_id))
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, msg_id)
            self._check_article_body(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.body("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_body_file(self):
        sink = io.BytesIO()
        resp, (art_num, message_id, lines) = self.server.body(file=sink)
        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        # The headers are absent ...
        self.assertFalse(data.startswith(
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            ), ascii(data))
        # ... and the body is present.
        self.assertTrue(data.endswith(
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            ), ascii(data))

    def check_over_xover_resp(self, resp, overviews):
        """Validate the three overview entries returned by OVER / XOVER."""
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        first_entry, second_entry, third_entry = overviews
        art_num, over = first_entry
        self.assertEqual(art_num, 57)
        self.assertEqual(over, {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.io gmane.comp.python.authors:57"
            })
        art_num, over = second_entry
        self.assertIsNone(over["xref"])
        art_num, over = third_entry
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        resp, overviews = self.server.xover(57, 59)
        self.check_over_xover_resp(resp, overviews)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        resp, overviews = self.server.over((57, 59))
        self.check_over_xover_resp(resp, overviews)

    # Article used by the POST / IHAVE tests, with CRLF line endings.
    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        """Check the raw article lines as received by the mock handler."""
        received = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(received), 10)
        self.assertEqual(received[-1], b'.\r\n')
        self.assertEqual(received[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(received[-3], b'\r\n')
        # The leading dot must have been doubled on the wire.
        self.assertEqual(received[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(received[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        """Post the sample article twice through *func* and check both.

        The first pass uses the prepared CRLF article, the second the same
        article with bare LF endings, which NNTP.post / NNTP.ihave should
        convert.  *file_factory* wraps the bytes into the object handed to
        *func*.  Returns the response of the second call.
        """
        variants = (
            self.sample_post,
            self.sample_post.replace(b"\r\n", b"\n"),
        )
        resp = None
        for post in variants:
            payload = file_factory(post)
            self.handler.posted_body = None
            resp = func(*args, payload)
            self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        """Run *func* against every supported kind of article data."""
        def terminated_lines(b):
            return iter(b.splitlines(keepends=True))

        def bare_lines(b):
            return iter(b.splitlines(keepends=False))

        # In order: a bytes object, a bytearray, a file object, an iterable
        # of terminated lines, an iterable of non-terminated lines.
        factories = (bytes, bytearray, io.BytesIO,
                     terminated_lines, bare_lines)
        for factory in factories:
            resp = self._check_post_ihave_sub(func, *args,
                                              file_factory=factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        # Once posting is disabled on the handler, POST must be refused.
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response,
                         "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response,
                         "435 Article not wanted")

    def test_too_long_lines(self):
        since = datetime.datetime(2010, 1, 1, 9, 0, 0)
        with self.assertRaises(nntplib.NNTPDataError):
            self.server.newnews("comp.lang.python", since)
1225
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001226
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Run the shared tests against a mocked NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # A v1 server yields an empty capability dict and no
        # implementation string.
        self.assertEqual(self.server.getcapabilities(), {})
        self.assertEqual(self.server.nntp_version, 1)
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001238
1239
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Run the shared tests against a mocked NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(self.server.getcapabilities(), expected_caps)
        # "VERSION 2 3" is advertised; the client reports 3.
        self.assertEqual(self.server.nntp_version, 3)
        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001261
1262
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a (probably) NNTP v2 server that reveals capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        # Nothing is known about the server before authentication ...
        caps_before_login = self.server._caps
        self.assertEqual(caps_before_login, {})
        # ... and the capabilities are populated once logged in.
        self.server.login('testuser', 'testpw')
        self.assertIn('VERSION', self.server._caps)
1273
1274
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Connect to a v2 server that is not in READER mode by default, with
    the client told to send MODE READER."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        capabilities = self.server._caps
        self.assertIn('READER', capabilities)
1285
1286
class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helper functions."""

    def test_decode_header(self):
        samples = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in samples:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        minimal = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]
        cases = [
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             minimal),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             minimal),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             minimal),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             minimal + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             minimal + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             minimal + ["xref"]),
        ]
        for response_lines, expected_fields in cases:
            self.assertEqual(nntplib._parse_overview_fmt(response_lines),
                             expected_fields)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        raw = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        [(art_num, fields)] = nntplib._parse_overview(raw, fmt)
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        raw = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        [(art_num, fields)] = nntplib._parse_overview(raw, fmt)
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        raw = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        [(art_num, fields)] = nntplib._parse_overview(raw, fmt)
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        cases = [
            # Output of DATE command
            ("19990623135624", None, (1999, 6, 23, 13, 56, 24)),
            # Variations
            ("19990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("090623", "135624", (2009, 6, 23, 13, 56, 24)),
        ]
        for date_str, time_str, parts in cases:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             datetime.datetime(*parts))

    def test_unparse_datetime(self):
        # Test non-legacy mode, both implicitly and with legacy=False
        def check(value, expected):
            self.assertEqual(nntplib._unparse_datetime(value), expected)
            self.assertEqual(nntplib._unparse_datetime(value, False),
                             expected)

        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("19990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("20000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("20100605", "010203")),
        ]
        for parts, expected in datetime_cases:
            check(datetime.datetime(*parts), expected)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("19990623", "000000")),
            ((2000, 6, 23), ("20000623", "000000")),
            ((2010, 6, 5), ("20100605", "000000")),
        ]
        for parts, expected in date_cases:
            check(datetime.date(*parts), expected)

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        def check(value, expected):
            self.assertEqual(nntplib._unparse_datetime(value, True),
                             expected)

        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("100605", "010203")),
        ]
        for parts, expected in datetime_cases:
            check(datetime.datetime(*parts), expected)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("990623", "000000")),
            ((2000, 6, 23), ("000623", "000000")),
            ((2010, 6, 5), ("100605", "000000")),
        ]
        for parts, expected in date_cases:
            check(datetime.date(*parts), expected)

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001447
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001448
class PublicAPITests(unittest.TestCase):
    """Checks that nntplib exposes exactly the expected public names."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        expected_names = {
            'NNTP', 'NNTPError', 'NNTPReplyError',
            'NNTPTemporaryError', 'NNTPPermanentError',
            'NNTPProtocolError', 'NNTPDataError', 'decode_header',
        }
        # NNTP_SSL is only expected when the ssl module is available.
        if ssl is not None:
            expected_names.add('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), expected_names)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001460
class MockSocketTests(unittest.TestCase):
    """Constructor failure tests that need a mock socket object.

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        """Check that a failing constructor raises and cleans up after itself.

        ``nntplib.socket`` is patched so that connecting yields a mock socket
        whose files are served by a fresh *handler_class* instance.  The
        constructor must raise *expected_error_type* matching the regex
        *expected_error_msg*, close the socket, and close every file it
        obtained from the socket.
        """
        opened_files = []
        socket_closed = False

        class MockSocket:
            def close(self):
                nonlocal socket_closed
                socket_closed = True

            def makefile(self, mode):
                _, file = make_mock_file(handler_class())
                opened_files.append(file)
                return file

        class mock_socket_module:
            # Stands in for the socket module, hence a plain function
            # looked up on the class rather than on an instance.
            @staticmethod
            def create_connection(address, timeout):
                return MockSocket()

        expect_failure = self.assertRaisesRegex(expected_error_type,
                                                expected_error_msg)
        with patch('nntplib.socket', mock_socket_module), expect_failure:
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for opened in opened_files:
            self.assertTrue(opened.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        authinfo_response = '503 Mechanism not recognized'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            login='t@e.com', password='python')
1538
class bypass_context:
    """Stand-in SSL context that skips encryption and the real ssl module.

    The class itself is used as the context object; it is never
    instantiated in this file.
    """

    @staticmethod
    def wrap_socket(sock, **args):
        """Return *sock* untouched, ignoring every keyword option."""
        return sock
1543
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Repeat the mock-socket constructor tests through NNTP_SSL."""

    @staticmethod
    def nntp_class(*pos, **kw):
        """Build an NNTP_SSL client that uses the pass-through context."""
        client = nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
        return client
Victor Stinner8c9bba02015-04-03 11:06:40 +02001549
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001550
class LocalServerTests(unittest.TestCase):
    """Tests run against a minimal NNTP server on a real local socket.

    The server lives in a background thread started by setUp() and
    understands just enough of the protocol (CAPABILITIES, STARTTLS, QUIT)
    for the tests in this class.
    """

    def setUp(self):
        sock = socket.socket()
        port = support.bind_port(sock)
        sock.listen()
        self.background = threading.Thread(
            target=self.run_server, args=(sock,))
        self.background.start()
        self.addCleanup(self.background.join)

        # Cleanups run in reverse order: the client is closed first (which
        # sends QUIT and lets the server loop end), then the thread is
        # joined.
        self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        """Serve a single client on listening socket *sock* until QUIT.

        Raises:
            ValueError: if the client sends any command other than
                CAPABILITIES, STARTTLS or QUIT.
        """
        # Could be generalized to handle more commands in separate methods
        with sock:
            [client, _] = sock.accept()
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(client)
            reader = cleanup.enter_context(client.makefile('rb'))
            client.sendall(b'200 Server ready\r\n')
            while True:
                cmd = reader.readline()
                if cmd == b'CAPABILITIES\r\n':
                    client.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                elif cmd == b'STARTTLS\r\n':
                    # The plain-text reader must not be used once TLS starts.
                    reader.close()
                    client.sendall(b'382 Begin TLS negotiation now\r\n')
                    # Name the server-side protocol explicitly: creating an
                    # SSLContext without a protocol argument is deprecated
                    # since Python 3.10.
                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                    context.load_cert_chain(certfile)
                    client = context.wrap_socket(
                        client, server_side=True)
                    cleanup.enter_context(client)
                    reader = cleanup.enter_context(client.makefile('rb'))
                elif cmd == b'QUIT\r\n':
                    client.sendall(b'205 Bye!\r\n')
                    break
                else:
                    raise ValueError('Unexpected command {!r}'.format(cmd))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        file = self.nntp.file
        sock = self.nntp.sock
        self.nntp.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(file, self.nntp.file)
        self.assertNotEqual(sock, self.nntp.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, self.nntp.starttls)
1609
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001610
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()