blob: 2a5a0b97eea63457ae8bd892486c91ad2cf8d143 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Martin Panter8f19e8e2016-01-19 01:10:58 +00008import os.path
Gregory P. Smith2cc02232019-05-06 17:54:06 -04009import re
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020010import threading
11
Antoine Pitrou69ab9512010-09-29 15:03:40 +000012from test import support
Serhiy Storchaka16994912020-04-25 10:06:29 +030013from test.support import socket_helper
Serhiy Storchaka43767632013-11-03 21:31:38 +020014from nntplib import NNTP, GroupInfo
Antoine Pitrou69ab9512010-09-29 15:03:40 +000015import nntplib
Serhiy Storchaka52027c32015-03-21 09:40:26 +020016from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020017try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000018 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020019except ImportError:
20 ssl = None
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020021
Antoine Pitrou69ab9512010-09-29 15:03:40 +000022
Martin Panter8f19e8e2016-01-19 01:10:58 +000023certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
Antoine Pitrou69ab9512010-09-29 15:03:40 +000024
Gregory P. Smith2cc02232019-05-06 17:54:06 -040025if ssl is not None:
26 SSLError = ssl.SSLError
27else:
28 class SSLError(Exception):
29 """Non-existent exception class when we lack SSL support."""
30 reason = "This will never be raised."
31
Antoine Pitrou69ab9512010-09-29 15:03:40 +000032# TODO:
33# - test the `file` arg to more commands
34# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000035# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000036
37
38class NetworkedNNTPTestsMixin:
39
40 def test_welcome(self):
41 welcome = self.server.getwelcome()
42 self.assertEqual(str, type(welcome))
43
44 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000045 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000046 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000047 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000048 self.assertEqual(str, type(line))
49
50 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000051 resp, groups = self.server.list()
52 if len(groups) > 0:
53 self.assertEqual(GroupInfo, type(groups[0]))
54 self.assertEqual(str, type(groups[0].group))
55
56 def test_list_active(self):
57 resp, groups = self.server.list(self.GROUP_PAT)
58 if len(groups) > 0:
59 self.assertEqual(GroupInfo, type(groups[0]))
60 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000061
62 def test_unknown_command(self):
63 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
64 self.server._shortcmd("XYZZY")
65 resp = cm.exception.response
66 self.assertTrue(resp.startswith("500 "), resp)
67
68 def test_newgroups(self):
69 # gmane gets a constant influx of new groups. In order not to stress
70 # the server too much, we choose a recent date in the past.
71 dt = datetime.date.today() - datetime.timedelta(days=7)
72 resp, groups = self.server.newgroups(dt)
73 if len(groups) > 0:
74 self.assertIsInstance(groups[0], GroupInfo)
75 self.assertIsInstance(groups[0].group, str)
76
77 def test_description(self):
78 def _check_desc(desc):
79 # Sanity checks
80 self.assertIsInstance(desc, str)
81 self.assertNotIn(self.GROUP_NAME, desc)
82 desc = self.server.description(self.GROUP_NAME)
83 _check_desc(desc)
84 # Another sanity check
85 self.assertIn("Python", desc)
86 # With a pattern
87 desc = self.server.description(self.GROUP_PAT)
88 _check_desc(desc)
89 # Shouldn't exist
90 desc = self.server.description("zk.brrtt.baz")
91 self.assertEqual(desc, '')
92
93 def test_descriptions(self):
94 resp, descs = self.server.descriptions(self.GROUP_PAT)
95 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
96 self.assertTrue(
97 resp.startswith("215 ") or resp.startswith("282 "), resp)
98 self.assertIsInstance(descs, dict)
99 desc = descs[self.GROUP_NAME]
100 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
101
102 def test_group(self):
103 result = self.server.group(self.GROUP_NAME)
104 self.assertEqual(5, len(result))
105 resp, count, first, last, group = result
106 self.assertEqual(group, self.GROUP_NAME)
107 self.assertIsInstance(count, int)
108 self.assertIsInstance(first, int)
109 self.assertIsInstance(last, int)
110 self.assertLessEqual(first, last)
111 self.assertTrue(resp.startswith("211 "), resp)
112
113 def test_date(self):
114 resp, date = self.server.date()
115 self.assertIsInstance(date, datetime.datetime)
116 # Sanity check
117 self.assertGreaterEqual(date.year, 1995)
118 self.assertLessEqual(date.year, 2030)
119
120 def _check_art_dict(self, art_dict):
121 # Some sanity checks for a field dictionary returned by OVER / XOVER
122 self.assertIsInstance(art_dict, dict)
123 # NNTP has 7 mandatory fields
124 self.assertGreaterEqual(art_dict.keys(),
125 {"subject", "from", "date", "message-id",
126 "references", ":bytes", ":lines"}
127 )
128 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000129 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000130
131 def test_xover(self):
132 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000133 resp, lines = self.server.xover(last - 5, last)
134 if len(lines) == 0:
135 self.skipTest("no articles retrieved")
136 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000137 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000138 self.assertGreaterEqual(art_num, last - 5)
139 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000140 self._check_art_dict(art_dict)
141
Xavier de Gayeac13bee2016-12-16 20:49:10 +0100142 @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
143 ' is found for issue #28971')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000144 def test_over(self):
145 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
146 start = last - 10
147 # The "start-" article range form
148 resp, lines = self.server.over((start, None))
149 art_num, art_dict = lines[0]
150 self._check_art_dict(art_dict)
151 # The "start-end" article range form
152 resp, lines = self.server.over((start, last))
153 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000154 # The 'last' article is not necessarily part of the output (cancelled?)
155 self.assertGreaterEqual(art_num, start)
156 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000157 self._check_art_dict(art_dict)
158 # XXX The "message_id" form is unsupported by gmane
159 # 503 Overview by message-ID unsupported
160
161 def test_xhdr(self):
162 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
163 resp, lines = self.server.xhdr('subject', last)
164 for line in lines:
165 self.assertEqual(str, type(line[1]))
166
167 def check_article_resp(self, resp, article, art_num=None):
168 self.assertIsInstance(article, nntplib.ArticleInfo)
169 if art_num is not None:
170 self.assertEqual(article.number, art_num)
171 for line in article.lines:
172 self.assertIsInstance(line, bytes)
173 # XXX this could exceptionally happen...
174 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
175
Victor Stinner706cb312017-11-25 02:42:18 +0100176 @unittest.skipIf(True, "FIXME: see bpo-32128")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000177 def test_article_head_body(self):
178 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000179 # Try to find an available article
180 for art_num in (last, first, last - 1):
181 try:
182 resp, head = self.server.head(art_num)
183 except nntplib.NNTPTemporaryError as e:
184 if not e.response.startswith("423 "):
185 raise
186 # "423 No such article" => choose another one
187 continue
188 break
189 else:
190 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000191 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000192 self.check_article_resp(resp, head, art_num)
193 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000194 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000195 self.check_article_resp(resp, body, art_num)
196 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000197 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000198 self.check_article_resp(resp, article, art_num)
Nick Coghlan14d99a12012-06-17 21:27:18 +1000199 # Tolerate running the tests from behind a NNTP virus checker
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200200 blacklist = lambda line: line.startswith(b'X-Antivirus')
201 filtered_head_lines = [line for line in head.lines
202 if not blacklist(line)]
Nick Coghlan14d99a12012-06-17 21:27:18 +1000203 filtered_lines = [line for line in article.lines
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200204 if not blacklist(line)]
205 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000206
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000207 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000208 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000209 # couple of well-known capabilities. Just sanity check that we
210 # got them.
211 def _check_caps(caps):
212 caps_list = caps['LIST']
213 self.assertIsInstance(caps_list, (list, tuple))
214 self.assertIn('OVERVIEW.FMT', caps_list)
215 self.assertGreaterEqual(self.server.nntp_version, 2)
216 _check_caps(self.server.getcapabilities())
217 # This re-emits the command
218 resp, caps = self.server.capabilities()
219 _check_caps(caps)
220
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000221 def test_zlogin(self):
222 # This test must be the penultimate because further commands will be
223 # refused.
224 baduser = "notarealuser"
225 badpw = "notarealpassword"
226 # Check that bogus credentials cause failure
227 self.assertRaises(nntplib.NNTPError, self.server.login,
228 user=baduser, password=badpw, usenetrc=False)
229 # FIXME: We should check that correct credentials succeed, but that
230 # would require valid details for some server somewhere to be in the
231 # test suite, I think. Gmane is anonymous, at least as used for the
232 # other tests.
233
234 def test_zzquit(self):
235 # This test must be called last, hence the name
236 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000237 try:
238 self.server.quit()
239 finally:
240 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000241
Antoine Pitroude609182010-11-18 17:29:23 +0000242 @classmethod
243 def wrap_methods(cls):
244 # Wrap all methods in a transient_internet() exception catcher
245 # XXX put a generic version in test.support?
246 def wrap_meth(meth):
247 @functools.wraps(meth)
248 def wrapped(self):
249 with support.transient_internet(self.NNTP_HOST):
250 meth(self)
251 return wrapped
252 for name in dir(cls):
253 if not name.startswith('test_'):
254 continue
255 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200256 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000257 continue
258 # Need to use a closure so that meth remains bound to its current
259 # value
260 setattr(cls, name, wrap_meth(meth))
261
Dong-hee Na1b335ae2020-01-12 02:39:15 +0900262 def test_timeout(self):
263 with self.assertRaises(ValueError):
264 self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)
265
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000266 def test_with_statement(self):
267 def is_connected():
268 if not hasattr(server, 'file'):
269 return False
270 try:
271 server.help()
Andrew Svetlov0832af62012-12-18 23:10:48 +0200272 except (OSError, EOFError):
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000273 return False
274 return True
275
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400276 try:
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100277 server = self.NNTP_CLASS(self.NNTP_HOST,
278 timeout=support.INTERNET_TIMEOUT,
279 usenetrc=False)
280 with server:
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400281 self.assertTrue(is_connected())
282 self.assertTrue(server.help())
283 self.assertFalse(is_connected())
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000284
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100285 server = self.NNTP_CLASS(self.NNTP_HOST,
286 timeout=support.INTERNET_TIMEOUT,
287 usenetrc=False)
288 with server:
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400289 server.quit()
290 self.assertFalse(is_connected())
291 except SSLError as ssl_err:
292 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
293 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
294 raise unittest.SkipTest(f"Got {ssl_err} connecting "
295 f"to {self.NNTP_HOST!r}")
296 raise
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000297
298
Antoine Pitroude609182010-11-18 17:29:23 +0000299NetworkedNNTPTestsMixin.wrap_methods()
300
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000301
INADA Naoki067931d2017-07-26 23:43:22 +0900302EOF_ERRORS = (EOFError,)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200303if ssl is not None:
INADA Naoki067931d2017-07-26 23:43:22 +0900304 EOF_ERRORS += (ssl.SSLEOFError,)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200305
306
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000307class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
308 # This server supports STARTTLS (gmane doesn't)
309 NNTP_HOST = 'news.trigofacile.com'
310 GROUP_NAME = 'fr.comp.lang.python'
311 GROUP_PAT = 'fr.comp.lang.*'
312
Antoine Pitroude609182010-11-18 17:29:23 +0000313 NNTP_CLASS = NNTP
314
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000315 @classmethod
316 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000317 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000318 with support.transient_internet(cls.NNTP_HOST):
Victor Stinner5bccca52017-04-27 17:30:13 +0200319 try:
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100320 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST,
321 timeout=support.INTERNET_TIMEOUT,
Victor Stinner5bccca52017-04-27 17:30:13 +0200322 usenetrc=False)
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400323 except SSLError as ssl_err:
324 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
325 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
326 raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
327 f"to {cls.NNTP_HOST!r}")
328 raise
Victor Stinner5b4feb72017-07-24 17:41:02 +0200329 except EOF_ERRORS:
Victor Stinner5bccca52017-04-27 17:30:13 +0200330 raise unittest.SkipTest(f"{cls} got EOF error on connecting "
331 f"to {cls.NNTP_HOST!r}")
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000332
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000333 @classmethod
334 def tearDownClass(cls):
335 if cls.server is not None:
336 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000337
Serhiy Storchaka43767632013-11-03 21:31:38 +0200338@unittest.skipUnless(ssl, 'requires SSL support')
339class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000340
Serhiy Storchaka43767632013-11-03 21:31:38 +0200341 # Technical limits for this public NNTP server (see http://www.aioe.org):
342 # "Only two concurrent connections per IP address are allowed and
343 # 400 connections per day are accepted from each IP address."
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000344
Serhiy Storchaka43767632013-11-03 21:31:38 +0200345 NNTP_HOST = 'nntp.aioe.org'
346 GROUP_NAME = 'comp.lang.python'
347 GROUP_PAT = 'comp.lang.*'
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000348
Serhiy Storchaka43767632013-11-03 21:31:38 +0200349 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000350
Serhiy Storchaka43767632013-11-03 21:31:38 +0200351 # Disabled as it produces too much data
352 test_list = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000353
Serhiy Storchaka43767632013-11-03 21:31:38 +0200354 # Disabled as the connection will already be encrypted.
355 test_starttls = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000356
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000357
358#
359# Non-networked tests using a local server (or something mocking it).
360#
361
362class _NNTPServerIO(io.RawIOBase):
363 """A raw IO object allowing NNTP commands to be received and processed
364 by a handler. The handler can push responses which can then be read
365 from the IO object."""
366
367 def __init__(self, handler):
368 io.RawIOBase.__init__(self)
369 # The channel from the client
370 self.c2s = io.BytesIO()
371 # The channel to the client
372 self.s2c = io.BytesIO()
373 self.handler = handler
374 self.handler.start(self.c2s.readline, self.push_data)
375
376 def readable(self):
377 return True
378
379 def writable(self):
380 return True
381
382 def push_data(self, data):
383 """Push (buffer) some data to send to the client."""
384 pos = self.s2c.tell()
385 self.s2c.seek(0, 2)
386 self.s2c.write(data)
387 self.s2c.seek(pos)
388
389 def write(self, b):
390 """The client sends us some data"""
391 pos = self.c2s.tell()
392 self.c2s.write(b)
393 self.c2s.seek(pos)
394 self.handler.process_pending()
395 return len(b)
396
397 def readinto(self, buf):
398 """The client wants to read a response"""
399 self.handler.process_pending()
400 b = self.s2c.read(len(buf))
401 n = len(b)
402 buf[:n] = b
403 return n
404
405
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200406def make_mock_file(handler):
407 sio = _NNTPServerIO(handler)
408 # Using BufferedRWPair instead of BufferedRandom ensures the file
409 # isn't seekable.
410 file = io.BufferedRWPair(sio, sio)
411 return (sio, file)
412
413
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000414class MockedNNTPTestsMixin:
415 # Override in derived classes
416 handler_class = None
417
418 def setUp(self):
419 super().setUp()
420 self.make_server()
421
422 def tearDown(self):
423 super().tearDown()
424 del self.server
425
426 def make_server(self, *args, **kwargs):
427 self.handler = self.handler_class()
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200428 self.sio, file = make_mock_file(self.handler)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000429 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000430 return self.server
431
432
Antoine Pitrou71135622012-02-14 23:29:34 +0100433class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
434 def setUp(self):
435 super().setUp()
436 self.make_server(readermode=True)
437
438
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000439class NNTPv1Handler:
440 """A handler for RFC 977"""
441
442 welcome = "200 NNTP mock server"
443
444 def start(self, readline, push_data):
445 self.in_body = False
446 self.allow_posting = True
447 self._readline = readline
448 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100449 self._logged_in = False
450 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000451 # Our welcome
452 self.handle_welcome()
453
454 def _decode(self, data):
455 return str(data, "utf-8", "surrogateescape")
456
457 def process_pending(self):
458 if self.in_body:
459 while True:
460 line = self._readline()
461 if not line:
462 return
463 self.body.append(line)
464 if line == b".\r\n":
465 break
466 try:
467 meth, tokens = self.body_callback
468 meth(*tokens, body=self.body)
469 finally:
470 self.body_callback = None
471 self.body = None
472 self.in_body = False
473 while True:
474 line = self._decode(self._readline())
475 if not line:
476 return
477 if not line.endswith("\r\n"):
478 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
479 line = line[:-2]
480 cmd, *tokens = line.split()
481 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
482 meth = getattr(self, "handle_" + cmd.upper(), None)
483 if meth is None:
484 self.handle_unknown()
485 else:
486 try:
487 meth(*tokens)
488 except Exception as e:
489 raise ValueError("command failed: {!r}".format(line)) from e
490 else:
491 if self.in_body:
492 self.body_callback = meth, tokens
493 self.body = []
494
495 def expect_body(self):
496 """Flag that the client is expected to post a request body"""
497 self.in_body = True
498
499 def push_data(self, data):
500 """Push some binary data"""
501 self._push_data(data)
502
503 def push_lit(self, lit):
504 """Push a string literal"""
505 lit = textwrap.dedent(lit)
506 lit = "\r\n".join(lit.splitlines()) + "\r\n"
507 lit = lit.encode('utf-8')
508 self.push_data(lit)
509
510 def handle_unknown(self):
511 self.push_lit("500 What?")
512
513 def handle_welcome(self):
514 self.push_lit(self.welcome)
515
516 def handle_QUIT(self):
517 self.push_lit("205 Bye!")
518
519 def handle_DATE(self):
520 self.push_lit("111 20100914001155")
521
522 def handle_GROUP(self, group):
523 if group == "fr.comp.lang.python":
524 self.push_lit("211 486 761 1265 fr.comp.lang.python")
525 else:
526 self.push_lit("411 No such group {}".format(group))
527
528 def handle_HELP(self):
529 self.push_lit("""\
530 100 Legal commands
531 authinfo user Name|pass Password|generic <prog> <args>
532 date
533 help
534 Report problems to <root@example.org>
535 .""")
536
537 def handle_STAT(self, message_spec=None):
538 if message_spec is None:
539 self.push_lit("412 No newsgroup selected")
540 elif message_spec == "3000234":
541 self.push_lit("223 3000234 <45223423@example.com>")
542 elif message_spec == "<45223423@example.com>":
543 self.push_lit("223 0 <45223423@example.com>")
544 else:
545 self.push_lit("430 No Such Article Found")
546
547 def handle_NEXT(self):
548 self.push_lit("223 3000237 <668929@example.org> retrieved")
549
550 def handle_LAST(self):
551 self.push_lit("223 3000234 <45223423@example.com> retrieved")
552
553 def handle_LIST(self, action=None, param=None):
554 if action is None:
555 self.push_lit("""\
556 215 Newsgroups in form "group high low flags".
557 comp.lang.python 0000052340 0000002828 y
558 comp.lang.python.announce 0000001153 0000000993 m
559 free.it.comp.lang.python 0000000002 0000000002 y
560 fr.comp.lang.python 0000001254 0000000760 y
561 free.it.comp.lang.python.learner 0000000000 0000000001 y
562 tw.bbs.comp.lang.python 0000000304 0000000304 y
563 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000564 elif action == "ACTIVE":
565 if param == "*distutils*":
566 self.push_lit("""\
567 215 Newsgroups in form "group high low flags"
568 gmane.comp.python.distutils.devel 0000014104 0000000001 m
569 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
570 .""")
571 else:
572 self.push_lit("""\
573 215 Newsgroups in form "group high low flags"
574 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000575 elif action == "OVERVIEW.FMT":
576 self.push_lit("""\
577 215 Order of fields in overview database.
578 Subject:
579 From:
580 Date:
581 Message-ID:
582 References:
583 Bytes:
584 Lines:
585 Xref:full
586 .""")
587 elif action == "NEWSGROUPS":
588 assert param is not None
589 if param == "comp.lang.python":
590 self.push_lit("""\
591 215 Descriptions in form "group description".
592 comp.lang.python\tThe Python computer language.
593 .""")
594 elif param == "comp.lang.python*":
595 self.push_lit("""\
596 215 Descriptions in form "group description".
597 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
598 comp.lang.python\tThe Python computer language.
599 .""")
600 else:
601 self.push_lit("""\
602 215 Descriptions in form "group description".
603 .""")
604 else:
605 self.push_lit('501 Unknown LIST keyword')
606
607 def handle_NEWNEWS(self, group, date_str, time_str):
608 # We hard code different return messages depending on passed
609 # argument and date syntax.
610 if (group == "comp.lang.python" and date_str == "20100913"
611 and time_str == "082004"):
612 # Date was passed in RFC 3977 format (NNTP "v2")
613 self.push_lit("""\
614 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
615 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
616 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
617 .""")
618 elif (group == "comp.lang.python" and date_str == "100913"
619 and time_str == "082004"):
620 # Date was passed in RFC 977 format (NNTP "v1")
621 self.push_lit("""\
622 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
623 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
624 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
625 .""")
Georg Brandl28e78412013-10-27 07:29:47 +0100626 elif (group == 'comp.lang.python' and
627 date_str in ('20100101', '100101') and
628 time_str == '090000'):
629 self.push_lit('too long line' * 3000 +
630 '\n.')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000631 else:
632 self.push_lit("""\
633 230 An empty list of newsarticles follows
634 .""")
635 # (Note for experiments: many servers disable NEWNEWS.
636 # As of this writing, sicinfo3.epfl.ch doesn't.)
637
638 def handle_XOVER(self, message_spec):
639 if message_spec == "57-59":
640 self.push_lit(
641 "224 Overview information for 57-58 follows\n"
642 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
643 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
644 "\tSat, 19 Jun 2010 18:04:08 -0400"
645 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
646 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
Dong-hee Na2e6a8ef2020-01-09 00:29:34 +0900647 "\tXref: news.gmane.io gmane.comp.python.authors:57"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000648 "\n"
649 "58\tLooking for a few good bloggers"
650 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
651 "\tThu, 22 Jul 2010 09:14:14 -0400"
652 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
653 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000654 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000655 "\n"
Martin Panter6245cb32016-04-15 02:14:19 +0000656 # A UTF-8 overview line from fr.comp.lang.python
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000657 "59\tRe: Message d'erreur incompréhensible (par moi)"
658 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
659 "\tWed, 15 Sep 2010 18:09:15 +0200"
660 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
661 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
662 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
663 "\n"
664 ".\n")
665 else:
666 self.push_lit("""\
667 224 No articles
668 .""")
669
670 def handle_POST(self, *, body=None):
671 if body is None:
672 if self.allow_posting:
673 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
674 self.expect_body()
675 else:
676 self.push_lit("440 Posting not permitted")
677 else:
678 assert self.allow_posting
679 self.push_lit("240 Article received OK")
680 self.posted_body = body
681
682 def handle_IHAVE(self, message_id, *, body=None):
683 if body is None:
684 if (self.allow_posting and
685 message_id == "<i.am.an.article.you.will.want@example.com>"):
686 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
687 self.expect_body()
688 else:
689 self.push_lit("435 Article not wanted")
690 else:
691 assert self.allow_posting
692 self.push_lit("235 Article transferred OK")
693 self.posted_body = body
694
695 sample_head = """\
696 From: "Demo User" <nobody@example.net>
697 Subject: I am just a test article
698 Content-Type: text/plain; charset=UTF-8; format=flowed
699 Message-ID: <i.am.an.article.you.will.want@example.com>"""
700
701 sample_body = """\
702 This is just a test article.
703 ..Here is a dot-starting line.
704
705 -- Signed by Andr\xe9."""
706
707 sample_article = sample_head + "\n\n" + sample_body
708
709 def handle_ARTICLE(self, message_spec=None):
710 if message_spec is None:
711 self.push_lit("220 3000237 <45223423@example.com>")
712 elif message_spec == "<45223423@example.com>":
713 self.push_lit("220 0 <45223423@example.com>")
714 elif message_spec == "3000234":
715 self.push_lit("220 3000234 <45223423@example.com>")
716 else:
717 self.push_lit("430 No Such Article Found")
718 return
719 self.push_lit(self.sample_article)
720 self.push_lit(".")
721
722 def handle_HEAD(self, message_spec=None):
723 if message_spec is None:
724 self.push_lit("221 3000237 <45223423@example.com>")
725 elif message_spec == "<45223423@example.com>":
726 self.push_lit("221 0 <45223423@example.com>")
727 elif message_spec == "3000234":
728 self.push_lit("221 3000234 <45223423@example.com>")
729 else:
730 self.push_lit("430 No Such Article Found")
731 return
732 self.push_lit(self.sample_head)
733 self.push_lit(".")
734
735 def handle_BODY(self, message_spec=None):
736 if message_spec is None:
737 self.push_lit("222 3000237 <45223423@example.com>")
738 elif message_spec == "<45223423@example.com>":
739 self.push_lit("222 0 <45223423@example.com>")
740 elif message_spec == "3000234":
741 self.push_lit("222 3000234 <45223423@example.com>")
742 else:
743 self.push_lit("430 No Such Article Found")
744 return
745 self.push_lit(self.sample_body)
746 self.push_lit(".")
747
Antoine Pitrou54411c12012-02-12 19:14:17 +0100748 def handle_AUTHINFO(self, cred_type, data):
749 if self._logged_in:
750 self.push_lit('502 Already Logged In')
751 elif cred_type == 'user':
752 if self._user_sent:
753 self.push_lit('482 User Credential Already Sent')
754 else:
755 self.push_lit('381 Password Required')
756 self._user_sent = True
757 elif cred_type == 'pass':
758 self.push_lit('281 Login Successful')
759 self._logged_in = True
760 else:
761 raise Exception('Unknown cred type {}'.format(cred_type))
762
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000763
764class NNTPv2Handler(NNTPv1Handler):
765 """A handler for RFC 3977 (NNTP "v2")"""
766
767 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100768 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000769 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000770 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100771 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000772 HDR
773 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
774 OVER
775 POST
776 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100777 ."""
778
779 if not self._logged_in:
780 self.push_lit(fmt.format('\n AUTHINFO USER'))
781 else:
782 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000783
Antoine Pitrou71135622012-02-14 23:29:34 +0100784 def handle_MODE(self, _):
785 raise Exception('MODE READER sent despite READER has been advertised')
786
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000787 def handle_OVER(self, message_spec=None):
788 return self.handle_XOVER(message_spec)
789
790
Antoine Pitrou54411c12012-02-12 19:14:17 +0100791class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
792 """A handler that allows CAPABILITIES only after login"""
793
794 def handle_CAPABILITIES(self):
795 if not self._logged_in:
796 self.push_lit('480 You must log in.')
797 else:
798 super().handle_CAPABILITIES()
799
800
Antoine Pitrou71135622012-02-14 23:29:34 +0100801class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
802 """A server that starts in transit mode"""
803
804 def __init__(self):
805 self._switched = False
806
807 def handle_CAPABILITIES(self):
808 fmt = """\
809 101 Capability list:
810 VERSION 2 3
811 IMPLEMENTATION INN 2.5.1
812 HDR
813 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
814 OVER
815 POST
816 {}READER
817 ."""
818 if self._switched:
819 self.push_lit(fmt.format(''))
820 else:
821 self.push_lit(fmt.format('MODE-'))
822
823 def handle_MODE(self, what):
824 assert not self._switched and what == 'reader'
825 self._switched = True
826 self.push_lit('200 Posting allowed')
827
828
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000829class NNTPv1v2TestsMixin:
830
831 def setUp(self):
832 super().setUp()
833
834 def test_welcome(self):
835 self.assertEqual(self.server.welcome, self.handler.welcome)
836
Antoine Pitrou54411c12012-02-12 19:14:17 +0100837 def test_authinfo(self):
838 if self.nntp_version == 2:
839 self.assertIn('AUTHINFO', self.server._caps)
840 self.server.login('testuser', 'testpw')
841 # if AUTHINFO is gone from _caps we also know that getcapabilities()
842 # has been called after login as it should
843 self.assertNotIn('AUTHINFO', self.server._caps)
844
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000845 def test_date(self):
846 resp, date = self.server.date()
847 self.assertEqual(resp, "111 20100914001155")
848 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
849
850 def test_quit(self):
851 self.assertFalse(self.sio.closed)
852 resp = self.server.quit()
853 self.assertEqual(resp, "205 Bye!")
854 self.assertTrue(self.sio.closed)
855
856 def test_help(self):
857 resp, help = self.server.help()
858 self.assertEqual(resp, "100 Legal commands")
859 self.assertEqual(help, [
860 ' authinfo user Name|pass Password|generic <prog> <args>',
861 ' date',
862 ' help',
863 'Report problems to <root@example.org>',
864 ])
865
866 def test_list(self):
867 resp, groups = self.server.list()
868 self.assertEqual(len(groups), 6)
869 g = groups[1]
870 self.assertEqual(g,
871 GroupInfo("comp.lang.python.announce", "0000001153",
872 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000873 resp, groups = self.server.list("*distutils*")
874 self.assertEqual(len(groups), 2)
875 g = groups[0]
876 self.assertEqual(g,
877 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
878 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000879
880 def test_stat(self):
881 resp, art_num, message_id = self.server.stat(3000234)
882 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
883 self.assertEqual(art_num, 3000234)
884 self.assertEqual(message_id, "<45223423@example.com>")
885 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
886 self.assertEqual(resp, "223 0 <45223423@example.com>")
887 self.assertEqual(art_num, 0)
888 self.assertEqual(message_id, "<45223423@example.com>")
889 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
890 self.server.stat("<non.existent.id>")
891 self.assertEqual(cm.exception.response, "430 No Such Article Found")
892 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
893 self.server.stat()
894 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
895
896 def test_next(self):
897 resp, art_num, message_id = self.server.next()
898 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
899 self.assertEqual(art_num, 3000237)
900 self.assertEqual(message_id, "<668929@example.org>")
901
902 def test_last(self):
903 resp, art_num, message_id = self.server.last()
904 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
905 self.assertEqual(art_num, 3000234)
906 self.assertEqual(message_id, "<45223423@example.com>")
907
908 def test_description(self):
909 desc = self.server.description("comp.lang.python")
910 self.assertEqual(desc, "The Python computer language.")
911 desc = self.server.description("comp.lang.pythonx")
912 self.assertEqual(desc, "")
913
914 def test_descriptions(self):
915 resp, groups = self.server.descriptions("comp.lang.python")
916 self.assertEqual(resp, '215 Descriptions in form "group description".')
917 self.assertEqual(groups, {
918 "comp.lang.python": "The Python computer language.",
919 })
920 resp, groups = self.server.descriptions("comp.lang.python*")
921 self.assertEqual(groups, {
922 "comp.lang.python": "The Python computer language.",
923 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
924 })
925 resp, groups = self.server.descriptions("comp.lang.pythonx")
926 self.assertEqual(groups, {})
927
928 def test_group(self):
929 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
930 self.assertTrue(resp.startswith("211 "), resp)
931 self.assertEqual(first, 761)
932 self.assertEqual(last, 1265)
933 self.assertEqual(count, 486)
934 self.assertEqual(group, "fr.comp.lang.python")
935 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
936 self.server.group("comp.lang.python.devel")
937 exc = cm.exception
938 self.assertTrue(exc.response.startswith("411 No such group"),
939 exc.response)
940
941 def test_newnews(self):
942 # NEWNEWS comp.lang.python [20]100913 082004
943 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
944 resp, ids = self.server.newnews("comp.lang.python", dt)
945 expected = (
946 "230 list of newsarticles (NNTP v{0}) "
947 "created after Mon Sep 13 08:20:04 2010 follows"
948 ).format(self.nntp_version)
949 self.assertEqual(resp, expected)
950 self.assertEqual(ids, [
951 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
952 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
953 ])
954 # NEWNEWS fr.comp.lang.python [20]100913 082004
955 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
956 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
957 self.assertEqual(resp, "230 An empty list of newsarticles follows")
958 self.assertEqual(ids, [])
959
960 def _check_article_body(self, lines):
961 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000962 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000963 self.assertEqual(lines[-2], b"")
964 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
965 self.assertEqual(lines[-4], b"This is just a test article.")
966
967 def _check_article_head(self, lines):
968 self.assertEqual(len(lines), 4)
969 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
970 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
971
972 def _check_article_data(self, lines):
973 self.assertEqual(len(lines), 9)
974 self._check_article_head(lines[:4])
975 self._check_article_body(lines[-4:])
976 self.assertEqual(lines[4], b"")
977
978 def test_article(self):
979 # ARTICLE
980 resp, info = self.server.article()
981 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
982 art_num, message_id, lines = info
983 self.assertEqual(art_num, 3000237)
984 self.assertEqual(message_id, "<45223423@example.com>")
985 self._check_article_data(lines)
986 # ARTICLE num
987 resp, info = self.server.article(3000234)
988 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
989 art_num, message_id, lines = info
990 self.assertEqual(art_num, 3000234)
991 self.assertEqual(message_id, "<45223423@example.com>")
992 self._check_article_data(lines)
993 # ARTICLE id
994 resp, info = self.server.article("<45223423@example.com>")
995 self.assertEqual(resp, "220 0 <45223423@example.com>")
996 art_num, message_id, lines = info
997 self.assertEqual(art_num, 0)
998 self.assertEqual(message_id, "<45223423@example.com>")
999 self._check_article_data(lines)
1000 # Non-existent id
1001 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1002 self.server.article("<non-existent@example.com>")
1003 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1004
1005 def test_article_file(self):
1006 # With a "file" argument
1007 f = io.BytesIO()
1008 resp, info = self.server.article(file=f)
1009 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
1010 art_num, message_id, lines = info
1011 self.assertEqual(art_num, 3000237)
1012 self.assertEqual(message_id, "<45223423@example.com>")
1013 self.assertEqual(lines, [])
1014 data = f.getvalue()
1015 self.assertTrue(data.startswith(
1016 b'From: "Demo User" <nobody@example.net>\r\n'
1017 b'Subject: I am just a test article\r\n'
1018 ), ascii(data))
1019 self.assertTrue(data.endswith(
1020 b'This is just a test article.\r\n'
1021 b'.Here is a dot-starting line.\r\n'
1022 b'\r\n'
1023 b'-- Signed by Andr\xc3\xa9.\r\n'
1024 ), ascii(data))
1025
1026 def test_head(self):
1027 # HEAD
1028 resp, info = self.server.head()
1029 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1030 art_num, message_id, lines = info
1031 self.assertEqual(art_num, 3000237)
1032 self.assertEqual(message_id, "<45223423@example.com>")
1033 self._check_article_head(lines)
1034 # HEAD num
1035 resp, info = self.server.head(3000234)
1036 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
1037 art_num, message_id, lines = info
1038 self.assertEqual(art_num, 3000234)
1039 self.assertEqual(message_id, "<45223423@example.com>")
1040 self._check_article_head(lines)
1041 # HEAD id
1042 resp, info = self.server.head("<45223423@example.com>")
1043 self.assertEqual(resp, "221 0 <45223423@example.com>")
1044 art_num, message_id, lines = info
1045 self.assertEqual(art_num, 0)
1046 self.assertEqual(message_id, "<45223423@example.com>")
1047 self._check_article_head(lines)
1048 # Non-existent id
1049 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1050 self.server.head("<non-existent@example.com>")
1051 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1052
Antoine Pitrou2640b522012-02-15 18:53:18 +01001053 def test_head_file(self):
1054 f = io.BytesIO()
1055 resp, info = self.server.head(file=f)
1056 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1057 art_num, message_id, lines = info
1058 self.assertEqual(art_num, 3000237)
1059 self.assertEqual(message_id, "<45223423@example.com>")
1060 self.assertEqual(lines, [])
1061 data = f.getvalue()
1062 self.assertTrue(data.startswith(
1063 b'From: "Demo User" <nobody@example.net>\r\n'
1064 b'Subject: I am just a test article\r\n'
1065 ), ascii(data))
1066 self.assertFalse(data.endswith(
1067 b'This is just a test article.\r\n'
1068 b'.Here is a dot-starting line.\r\n'
1069 b'\r\n'
1070 b'-- Signed by Andr\xc3\xa9.\r\n'
1071 ), ascii(data))
1072
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001073 def test_body(self):
1074 # BODY
1075 resp, info = self.server.body()
1076 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1077 art_num, message_id, lines = info
1078 self.assertEqual(art_num, 3000237)
1079 self.assertEqual(message_id, "<45223423@example.com>")
1080 self._check_article_body(lines)
1081 # BODY num
1082 resp, info = self.server.body(3000234)
1083 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1084 art_num, message_id, lines = info
1085 self.assertEqual(art_num, 3000234)
1086 self.assertEqual(message_id, "<45223423@example.com>")
1087 self._check_article_body(lines)
1088 # BODY id
1089 resp, info = self.server.body("<45223423@example.com>")
1090 self.assertEqual(resp, "222 0 <45223423@example.com>")
1091 art_num, message_id, lines = info
1092 self.assertEqual(art_num, 0)
1093 self.assertEqual(message_id, "<45223423@example.com>")
1094 self._check_article_body(lines)
1095 # Non-existent id
1096 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1097 self.server.body("<non-existent@example.com>")
1098 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1099
Antoine Pitrou2640b522012-02-15 18:53:18 +01001100 def test_body_file(self):
1101 f = io.BytesIO()
1102 resp, info = self.server.body(file=f)
1103 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1104 art_num, message_id, lines = info
1105 self.assertEqual(art_num, 3000237)
1106 self.assertEqual(message_id, "<45223423@example.com>")
1107 self.assertEqual(lines, [])
1108 data = f.getvalue()
1109 self.assertFalse(data.startswith(
1110 b'From: "Demo User" <nobody@example.net>\r\n'
1111 b'Subject: I am just a test article\r\n'
1112 ), ascii(data))
1113 self.assertTrue(data.endswith(
1114 b'This is just a test article.\r\n'
1115 b'.Here is a dot-starting line.\r\n'
1116 b'\r\n'
1117 b'-- Signed by Andr\xc3\xa9.\r\n'
1118 ), ascii(data))
1119
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001120 def check_over_xover_resp(self, resp, overviews):
1121 self.assertTrue(resp.startswith("224 "), resp)
1122 self.assertEqual(len(overviews), 3)
1123 art_num, over = overviews[0]
1124 self.assertEqual(art_num, 57)
1125 self.assertEqual(over, {
1126 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1127 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1128 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1129 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1130 "references": "<hvalf7$ort$1@dough.gmane.org>",
1131 ":bytes": "7103",
1132 ":lines": "16",
Dong-hee Na2e6a8ef2020-01-09 00:29:34 +09001133 "xref": "news.gmane.io gmane.comp.python.authors:57"
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001134 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001135 art_num, over = overviews[1]
1136 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001137 art_num, over = overviews[2]
1138 self.assertEqual(over["subject"],
1139 "Re: Message d'erreur incompréhensible (par moi)")
1140
1141 def test_xover(self):
1142 resp, overviews = self.server.xover(57, 59)
1143 self.check_over_xover_resp(resp, overviews)
1144
1145 def test_over(self):
1146 # In NNTP "v1", this will fallback on XOVER
1147 resp, overviews = self.server.over((57, 59))
1148 self.check_over_xover_resp(resp, overviews)
1149
1150 sample_post = (
1151 b'From: "Demo User" <nobody@example.net>\r\n'
1152 b'Subject: I am just a test article\r\n'
1153 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1154 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1155 b'\r\n'
1156 b'This is just a test article.\r\n'
1157 b'.Here is a dot-starting line.\r\n'
1158 b'\r\n'
1159 b'-- Signed by Andr\xc3\xa9.\r\n'
1160 )
1161
1162 def _check_posted_body(self):
1163 # Check the raw body as received by the server
1164 lines = self.handler.posted_body
1165 # One additional line for the "." terminator
1166 self.assertEqual(len(lines), 10)
1167 self.assertEqual(lines[-1], b'.\r\n')
1168 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1169 self.assertEqual(lines[-3], b'\r\n')
1170 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1171 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1172
1173 def _check_post_ihave_sub(self, func, *args, file_factory):
1174 # First the prepared post with CRLF endings
1175 post = self.sample_post
1176 func_args = args + (file_factory(post),)
1177 self.handler.posted_body = None
1178 resp = func(*func_args)
1179 self._check_posted_body()
1180 # Then the same post with "normal" line endings - they should be
1181 # converted by NNTP.post and NNTP.ihave.
1182 post = self.sample_post.replace(b"\r\n", b"\n")
1183 func_args = args + (file_factory(post),)
1184 self.handler.posted_body = None
1185 resp = func(*func_args)
1186 self._check_posted_body()
1187 return resp
1188
1189 def check_post_ihave(self, func, success_resp, *args):
1190 # With a bytes object
1191 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1192 self.assertEqual(resp, success_resp)
1193 # With a bytearray object
1194 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1195 self.assertEqual(resp, success_resp)
1196 # With a file object
1197 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1198 self.assertEqual(resp, success_resp)
1199 # With an iterable of terminated lines
1200 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001201 return iter(b.splitlines(keepends=True))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001202 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1203 self.assertEqual(resp, success_resp)
1204 # With an iterable of non-terminated lines
1205 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001206 return iter(b.splitlines(keepends=False))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001207 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1208 self.assertEqual(resp, success_resp)
1209
1210 def test_post(self):
1211 self.check_post_ihave(self.server.post, "240 Article received OK")
1212 self.handler.allow_posting = False
1213 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1214 self.server.post(self.sample_post)
1215 self.assertEqual(cm.exception.response,
1216 "440 Posting not permitted")
1217
1218 def test_ihave(self):
1219 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1220 "<i.am.an.article.you.will.want@example.com>")
1221 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1222 self.server.ihave("<another.message.id>", self.sample_post)
1223 self.assertEqual(cm.exception.response,
1224 "435 Article not wanted")
1225
Georg Brandl28e78412013-10-27 07:29:47 +01001226 def test_too_long_lines(self):
1227 dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1228 self.assertRaises(nntplib.NNTPDataError,
1229 self.server.newnews, "comp.lang.python", dt)
1230
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001231
1232class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1233 """Tests an NNTP v1 server (no capabilities)."""
1234
1235 nntp_version = 1
1236 handler_class = NNTPv1Handler
1237
1238 def test_caps(self):
1239 caps = self.server.getcapabilities()
1240 self.assertEqual(caps, {})
1241 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001242 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001243
1244
1245class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1246 """Tests an NNTP v2 server (with capabilities)."""
1247
1248 nntp_version = 2
1249 handler_class = NNTPv2Handler
1250
1251 def test_caps(self):
1252 caps = self.server.getcapabilities()
1253 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001254 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001255 'IMPLEMENTATION': ['INN', '2.5.1'],
1256 'AUTHINFO': ['USER'],
1257 'HDR': [],
1258 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1259 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1260 'OVER': [],
1261 'POST': [],
1262 'READER': [],
1263 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001264 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001265 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001266
1267
Antoine Pitrou54411c12012-02-12 19:14:17 +01001268class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1269 """Tests a probably NNTP v2 server with capabilities only after login."""
1270
1271 nntp_version = 2
1272 handler_class = CapsAfterLoginNNTPv2Handler
1273
1274 def test_caps_only_after_login(self):
1275 self.assertEqual(self.server._caps, {})
1276 self.server.login('testuser', 'testpw')
1277 self.assertIn('VERSION', self.server._caps)
1278
1279
Antoine Pitrou71135622012-02-14 23:29:34 +01001280class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1281 unittest.TestCase):
1282 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1283 that isn't in READER mode by default."""
1284
1285 nntp_version = 2
1286 handler_class = ModeSwitchingNNTPv2Handler
1287
1288 def test_we_are_in_reader_mode_after_connect(self):
1289 self.assertIn('READER', self.server._caps)
1290
1291
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001292class MiscTests(unittest.TestCase):
1293
1294 def test_decode_header(self):
1295 def gives(a, b):
1296 self.assertEqual(nntplib.decode_header(a), b)
1297 gives("" , "")
1298 gives("a plain header", "a plain header")
1299 gives(" with extra spaces ", " with extra spaces ")
1300 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1301 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1302 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1303 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1304 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1305 "Re: problème de matrice")
1306 # A natively utf-8 header (found in the real world!)
1307 gives("Re: Message d'erreur incompréhensible (par moi)",
1308 "Re: Message d'erreur incompréhensible (par moi)")
1309
1310 def test_parse_overview_fmt(self):
1311 # The minimal (default) response
1312 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1313 "References:", ":bytes", ":lines"]
1314 self.assertEqual(nntplib._parse_overview_fmt(lines),
1315 ["subject", "from", "date", "message-id", "references",
1316 ":bytes", ":lines"])
1317 # The minimal response using alternative names
1318 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1319 "References:", "Bytes:", "Lines:"]
1320 self.assertEqual(nntplib._parse_overview_fmt(lines),
1321 ["subject", "from", "date", "message-id", "references",
1322 ":bytes", ":lines"])
1323 # Variations in casing
1324 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1325 "References:", "BYTES:", "Lines:"]
1326 self.assertEqual(nntplib._parse_overview_fmt(lines),
1327 ["subject", "from", "date", "message-id", "references",
1328 ":bytes", ":lines"])
1329 # First example from RFC 3977
1330 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1331 "References:", ":bytes", ":lines", "Xref:full",
1332 "Distribution:full"]
1333 self.assertEqual(nntplib._parse_overview_fmt(lines),
1334 ["subject", "from", "date", "message-id", "references",
1335 ":bytes", ":lines", "xref", "distribution"])
1336 # Second example from RFC 3977
1337 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1338 "References:", "Bytes:", "Lines:", "Xref:FULL",
1339 "Distribution:FULL"]
1340 self.assertEqual(nntplib._parse_overview_fmt(lines),
1341 ["subject", "from", "date", "message-id", "references",
1342 ":bytes", ":lines", "xref", "distribution"])
1343 # A classic response from INN
1344 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1345 "References:", "Bytes:", "Lines:", "Xref:full"]
1346 self.assertEqual(nntplib._parse_overview_fmt(lines),
1347 ["subject", "from", "date", "message-id", "references",
1348 ":bytes", ":lines", "xref"])
1349
1350 def test_parse_overview(self):
1351 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1352 # First example from RFC 3977
1353 lines = [
1354 '3000234\tI am just a test article\t"Demo User" '
1355 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1356 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1357 '17\tXref: news.example.com misc.test:3000363',
1358 ]
1359 overview = nntplib._parse_overview(lines, fmt)
1360 (art_num, fields), = overview
1361 self.assertEqual(art_num, 3000234)
1362 self.assertEqual(fields, {
1363 'subject': 'I am just a test article',
1364 'from': '"Demo User" <nobody@example.com>',
1365 'date': '6 Oct 1998 04:38:40 -0500',
1366 'message-id': '<45223423@example.com>',
1367 'references': '<45454@example.net>',
1368 ':bytes': '1234',
1369 ':lines': '17',
1370 'xref': 'news.example.com misc.test:3000363',
1371 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001372 # Second example; here the "Xref" field is totally absent (including
1373 # the header name) and comes out as None
1374 lines = [
1375 '3000234\tI am just a test article\t"Demo User" '
1376 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1377 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1378 '17\t\t',
1379 ]
1380 overview = nntplib._parse_overview(lines, fmt)
1381 (art_num, fields), = overview
1382 self.assertEqual(fields['xref'], None)
1383 # Third example; the "Xref" is an empty string, while "references"
1384 # is a single space.
1385 lines = [
1386 '3000234\tI am just a test article\t"Demo User" '
1387 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1388 '<45223423@example.com>\t \t1234\t'
1389 '17\tXref: \t',
1390 ]
1391 overview = nntplib._parse_overview(lines, fmt)
1392 (art_num, fields), = overview
1393 self.assertEqual(fields['references'], ' ')
1394 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001395
1396 def test_parse_datetime(self):
1397 def gives(a, b, *c):
1398 self.assertEqual(nntplib._parse_datetime(a, b),
1399 datetime.datetime(*c))
1400 # Output of DATE command
1401 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1402 # Variations
1403 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1404 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1405 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1406
1407 def test_unparse_datetime(self):
1408 # Test non-legacy mode
1409 # 1) with a datetime
1410 def gives(y, M, d, h, m, s, date_str, time_str):
1411 dt = datetime.datetime(y, M, d, h, m, s)
1412 self.assertEqual(nntplib._unparse_datetime(dt),
1413 (date_str, time_str))
1414 self.assertEqual(nntplib._unparse_datetime(dt, False),
1415 (date_str, time_str))
1416 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1417 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1418 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1419 # 2) with a date
1420 def gives(y, M, d, date_str, time_str):
1421 dt = datetime.date(y, M, d)
1422 self.assertEqual(nntplib._unparse_datetime(dt),
1423 (date_str, time_str))
1424 self.assertEqual(nntplib._unparse_datetime(dt, False),
1425 (date_str, time_str))
1426 gives(1999, 6, 23, "19990623", "000000")
1427 gives(2000, 6, 23, "20000623", "000000")
1428 gives(2010, 6, 5, "20100605", "000000")
1429
1430 def test_unparse_datetime_legacy(self):
1431 # Test legacy mode (RFC 977)
1432 # 1) with a datetime
1433 def gives(y, M, d, h, m, s, date_str, time_str):
1434 dt = datetime.datetime(y, M, d, h, m, s)
1435 self.assertEqual(nntplib._unparse_datetime(dt, True),
1436 (date_str, time_str))
1437 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1438 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1439 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1440 # 2) with a date
1441 def gives(y, M, d, date_str, time_str):
1442 dt = datetime.date(y, M, d)
1443 self.assertEqual(nntplib._unparse_datetime(dt, True),
1444 (date_str, time_str))
1445 gives(1999, 6, 23, "990623", "000000")
1446 gives(2000, 6, 23, "000623", "000000")
1447 gives(2010, 6, 5, "100605", "000000")
1448
Serhiy Storchaka43767632013-11-03 21:31:38 +02001449 @unittest.skipUnless(ssl, 'requires SSL support')
1450 def test_ssl_support(self):
1451 self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001452
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001453
Berker Peksag96756b62014-09-20 08:53:05 +03001454class PublicAPITests(unittest.TestCase):
1455 """Ensures that the correct values are exposed in the public API."""
1456
1457 def test_module_all_attribute(self):
1458 self.assertTrue(hasattr(nntplib, '__all__'))
1459 target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
1460 'NNTPTemporaryError', 'NNTPPermanentError',
1461 'NNTPProtocolError', 'NNTPDataError', 'decode_header']
1462 if ssl is not None:
1463 target_api.append('NNTP_SSL')
1464 self.assertEqual(set(nntplib.__all__), set(target_api))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001465
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001466class MockSocketTests(unittest.TestCase):
1467 """Tests involving a mock socket object
1468
1469 Used where the _NNTPServerIO file object is not enough."""
1470
1471 nntp_class = nntplib.NNTP
1472
1473 def check_constructor_error_conditions(
1474 self, handler_class,
1475 expected_error_type, expected_error_msg,
1476 login=None, password=None):
1477
1478 class mock_socket_module:
1479 def create_connection(address, timeout):
1480 return MockSocket()
1481
1482 class MockSocket:
1483 def close(self):
1484 nonlocal socket_closed
1485 socket_closed = True
1486
1487 def makefile(socket, mode):
1488 handler = handler_class()
1489 _, file = make_mock_file(handler)
1490 files.append(file)
1491 return file
1492
1493 socket_closed = False
1494 files = []
1495 with patch('nntplib.socket', mock_socket_module), \
1496 self.assertRaisesRegex(expected_error_type, expected_error_msg):
1497 self.nntp_class('dummy', user=login, password=password)
1498 self.assertTrue(socket_closed)
1499 for f in files:
1500 self.assertTrue(f.closed)
1501
1502 def test_bad_welcome(self):
1503 #Test a bad welcome message
1504 class Handler(NNTPv1Handler):
1505 welcome = 'Bad Welcome'
1506 self.check_constructor_error_conditions(
1507 Handler, nntplib.NNTPProtocolError, Handler.welcome)
1508
1509 def test_service_temporarily_unavailable(self):
1510 #Test service temporarily unavailable
1511 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001512 welcome = '400 Service temporarily unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001513 self.check_constructor_error_conditions(
1514 Handler, nntplib.NNTPTemporaryError, Handler.welcome)
1515
1516 def test_service_permanently_unavailable(self):
1517 #Test service permanently unavailable
1518 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001519 welcome = '502 Service permanently unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001520 self.check_constructor_error_conditions(
1521 Handler, nntplib.NNTPPermanentError, Handler.welcome)
1522
1523 def test_bad_capabilities(self):
1524 #Test a bad capabilities response
1525 class Handler(NNTPv1Handler):
1526 def handle_CAPABILITIES(self):
1527 self.push_lit(capabilities_response)
1528 capabilities_response = '201 bad capability'
1529 self.check_constructor_error_conditions(
1530 Handler, nntplib.NNTPReplyError, capabilities_response)
1531
1532 def test_login_aborted(self):
1533 #Test a bad authinfo response
1534 login = 't@e.com'
1535 password = 'python'
1536 class Handler(NNTPv1Handler):
1537 def handle_AUTHINFO(self, *args):
1538 self.push_lit(authinfo_response)
1539 authinfo_response = '503 Mechanism not recognized'
1540 self.check_constructor_error_conditions(
1541 Handler, nntplib.NNTPPermanentError, authinfo_response,
1542 login, password)
1543
Serhiy Storchaka80774342015-04-03 15:02:20 +03001544class bypass_context:
1545 """Bypass encryption and actual SSL module"""
1546 def wrap_socket(sock, **args):
1547 return sock
1548
1549@unittest.skipUnless(ssl, 'requires SSL support')
1550class MockSslTests(MockSocketTests):
1551 @staticmethod
1552 def nntp_class(*pos, **kw):
1553 return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
Victor Stinner8c9bba02015-04-03 11:06:40 +02001554
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001555
Martin Panter8f19e8e2016-01-19 01:10:58 +00001556class LocalServerTests(unittest.TestCase):
1557 def setUp(self):
1558 sock = socket.socket()
Serhiy Storchaka16994912020-04-25 10:06:29 +03001559 port = socket_helper.bind_port(sock)
Martin Panter8f19e8e2016-01-19 01:10:58 +00001560 sock.listen()
1561 self.background = threading.Thread(
1562 target=self.run_server, args=(sock,))
1563 self.background.start()
1564 self.addCleanup(self.background.join)
1565
Serhiy Storchaka16994912020-04-25 10:06:29 +03001566 self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__()
Martin Panter8f19e8e2016-01-19 01:10:58 +00001567 self.addCleanup(self.nntp.__exit__, None, None, None)
1568
1569 def run_server(self, sock):
1570 # Could be generalized to handle more commands in separate methods
1571 with sock:
1572 [client, _] = sock.accept()
1573 with contextlib.ExitStack() as cleanup:
1574 cleanup.enter_context(client)
1575 reader = cleanup.enter_context(client.makefile('rb'))
1576 client.sendall(b'200 Server ready\r\n')
1577 while True:
1578 cmd = reader.readline()
1579 if cmd == b'CAPABILITIES\r\n':
1580 client.sendall(
1581 b'101 Capability list:\r\n'
1582 b'VERSION 2\r\n'
1583 b'STARTTLS\r\n'
1584 b'.\r\n'
1585 )
1586 elif cmd == b'STARTTLS\r\n':
1587 reader.close()
1588 client.sendall(b'382 Begin TLS negotiation now\r\n')
Christian Heimesd0486372016-09-10 23:23:33 +02001589 context = ssl.SSLContext()
1590 context.load_cert_chain(certfile)
1591 client = context.wrap_socket(
1592 client, server_side=True)
Martin Panter8f19e8e2016-01-19 01:10:58 +00001593 cleanup.enter_context(client)
1594 reader = cleanup.enter_context(client.makefile('rb'))
1595 elif cmd == b'QUIT\r\n':
1596 client.sendall(b'205 Bye!\r\n')
1597 break
1598 else:
1599 raise ValueError('Unexpected command {!r}'.format(cmd))
1600
1601 @unittest.skipUnless(ssl, 'requires SSL support')
1602 def test_starttls(self):
1603 file = self.nntp.file
1604 sock = self.nntp.sock
1605 self.nntp.starttls()
1606 # Check that the socket and internal pseudo-file really were
1607 # changed.
1608 self.assertNotEqual(file, self.nntp.file)
1609 self.assertNotEqual(sock, self.nntp.sock)
1610 # Check that the new socket really is an SSL one
1611 self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
1612 # Check that trying starttls when it's already active fails.
1613 self.assertRaises(ValueError, self.nntp.starttls)
1614
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001615
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001616if __name__ == "__main__":
Berker Peksag96756b62014-09-20 08:53:05 +03001617 unittest.main()