blob: 19509463a2ba1322275498758a0ac041e3cf9b16 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Dong-hee Naaa92a7c2020-05-16 19:31:54 +09008import nntplib
Martin Panter8f19e8e2016-01-19 01:10:58 +00009import os.path
Gregory P. Smith2cc02232019-05-06 17:54:06 -040010import re
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020011import threading
12
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013from test import support
Serhiy Storchaka16994912020-04-25 10:06:29 +030014from test.support import socket_helper
Serhiy Storchaka43767632013-11-03 21:31:38 +020015from nntplib import NNTP, GroupInfo
Serhiy Storchaka52027c32015-03-21 09:40:26 +020016from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020017try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000018 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020019except ImportError:
20 ssl = None
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020021
Antoine Pitrou69ab9512010-09-29 15:03:40 +000022
Martin Panter8f19e8e2016-01-19 01:10:58 +000023certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
Antoine Pitrou69ab9512010-09-29 15:03:40 +000024
Gregory P. Smith2cc02232019-05-06 17:54:06 -040025if ssl is not None:
26 SSLError = ssl.SSLError
27else:
28 class SSLError(Exception):
29 """Non-existent exception class when we lack SSL support."""
30 reason = "This will never be raised."
31
Antoine Pitrou69ab9512010-09-29 15:03:40 +000032# TODO:
33# - test the `file` arg to more commands
34# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000035# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000036
37
38class NetworkedNNTPTestsMixin:
39
Christian Heimese9832522021-05-01 20:53:10 +020040 ssl_context = None
41
Antoine Pitrou69ab9512010-09-29 15:03:40 +000042 def test_welcome(self):
43 welcome = self.server.getwelcome()
44 self.assertEqual(str, type(welcome))
45
46 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000047 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000048 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000049 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000050 self.assertEqual(str, type(line))
51
52 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000053 resp, groups = self.server.list()
54 if len(groups) > 0:
55 self.assertEqual(GroupInfo, type(groups[0]))
56 self.assertEqual(str, type(groups[0].group))
57
58 def test_list_active(self):
59 resp, groups = self.server.list(self.GROUP_PAT)
60 if len(groups) > 0:
61 self.assertEqual(GroupInfo, type(groups[0]))
62 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000063
64 def test_unknown_command(self):
65 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
66 self.server._shortcmd("XYZZY")
67 resp = cm.exception.response
68 self.assertTrue(resp.startswith("500 "), resp)
69
70 def test_newgroups(self):
71 # gmane gets a constant influx of new groups. In order not to stress
72 # the server too much, we choose a recent date in the past.
73 dt = datetime.date.today() - datetime.timedelta(days=7)
74 resp, groups = self.server.newgroups(dt)
75 if len(groups) > 0:
76 self.assertIsInstance(groups[0], GroupInfo)
77 self.assertIsInstance(groups[0].group, str)
78
79 def test_description(self):
80 def _check_desc(desc):
81 # Sanity checks
82 self.assertIsInstance(desc, str)
83 self.assertNotIn(self.GROUP_NAME, desc)
84 desc = self.server.description(self.GROUP_NAME)
85 _check_desc(desc)
86 # Another sanity check
Dong-hee Naec316532021-01-01 23:20:33 +090087 self.assertIn(self.DESC, desc)
Antoine Pitrou69ab9512010-09-29 15:03:40 +000088 # With a pattern
89 desc = self.server.description(self.GROUP_PAT)
90 _check_desc(desc)
91 # Shouldn't exist
92 desc = self.server.description("zk.brrtt.baz")
93 self.assertEqual(desc, '')
94
95 def test_descriptions(self):
96 resp, descs = self.server.descriptions(self.GROUP_PAT)
97 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
98 self.assertTrue(
99 resp.startswith("215 ") or resp.startswith("282 "), resp)
100 self.assertIsInstance(descs, dict)
101 desc = descs[self.GROUP_NAME]
102 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
103
104 def test_group(self):
105 result = self.server.group(self.GROUP_NAME)
106 self.assertEqual(5, len(result))
107 resp, count, first, last, group = result
108 self.assertEqual(group, self.GROUP_NAME)
109 self.assertIsInstance(count, int)
110 self.assertIsInstance(first, int)
111 self.assertIsInstance(last, int)
112 self.assertLessEqual(first, last)
113 self.assertTrue(resp.startswith("211 "), resp)
114
115 def test_date(self):
116 resp, date = self.server.date()
117 self.assertIsInstance(date, datetime.datetime)
118 # Sanity check
119 self.assertGreaterEqual(date.year, 1995)
120 self.assertLessEqual(date.year, 2030)
121
122 def _check_art_dict(self, art_dict):
123 # Some sanity checks for a field dictionary returned by OVER / XOVER
124 self.assertIsInstance(art_dict, dict)
125 # NNTP has 7 mandatory fields
126 self.assertGreaterEqual(art_dict.keys(),
127 {"subject", "from", "date", "message-id",
128 "references", ":bytes", ":lines"}
129 )
130 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000131 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000132
133 def test_xover(self):
134 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000135 resp, lines = self.server.xover(last - 5, last)
136 if len(lines) == 0:
137 self.skipTest("no articles retrieved")
138 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000139 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000140 self.assertGreaterEqual(art_num, last - 5)
141 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000142 self._check_art_dict(art_dict)
143
Xavier de Gayeac13bee2016-12-16 20:49:10 +0100144 @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
145 ' is found for issue #28971')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000146 def test_over(self):
147 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
148 start = last - 10
149 # The "start-" article range form
150 resp, lines = self.server.over((start, None))
151 art_num, art_dict = lines[0]
152 self._check_art_dict(art_dict)
153 # The "start-end" article range form
154 resp, lines = self.server.over((start, last))
155 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000156 # The 'last' article is not necessarily part of the output (cancelled?)
157 self.assertGreaterEqual(art_num, start)
158 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000159 self._check_art_dict(art_dict)
160 # XXX The "message_id" form is unsupported by gmane
161 # 503 Overview by message-ID unsupported
162
163 def test_xhdr(self):
164 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
165 resp, lines = self.server.xhdr('subject', last)
166 for line in lines:
167 self.assertEqual(str, type(line[1]))
168
169 def check_article_resp(self, resp, article, art_num=None):
170 self.assertIsInstance(article, nntplib.ArticleInfo)
171 if art_num is not None:
172 self.assertEqual(article.number, art_num)
173 for line in article.lines:
174 self.assertIsInstance(line, bytes)
175 # XXX this could exceptionally happen...
176 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
177
Victor Stinner706cb312017-11-25 02:42:18 +0100178 @unittest.skipIf(True, "FIXME: see bpo-32128")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000179 def test_article_head_body(self):
180 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000181 # Try to find an available article
182 for art_num in (last, first, last - 1):
183 try:
184 resp, head = self.server.head(art_num)
185 except nntplib.NNTPTemporaryError as e:
186 if not e.response.startswith("423 "):
187 raise
188 # "423 No such article" => choose another one
189 continue
190 break
191 else:
192 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000193 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000194 self.check_article_resp(resp, head, art_num)
195 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000196 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000197 self.check_article_resp(resp, body, art_num)
198 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000199 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000200 self.check_article_resp(resp, article, art_num)
Nick Coghlan14d99a12012-06-17 21:27:18 +1000201 # Tolerate running the tests from behind a NNTP virus checker
Victor Stinnerfabd7bb2020-08-11 15:26:59 +0200202 denylist = lambda line: line.startswith(b'X-Antivirus')
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200203 filtered_head_lines = [line for line in head.lines
Victor Stinnerfabd7bb2020-08-11 15:26:59 +0200204 if not denylist(line)]
Nick Coghlan14d99a12012-06-17 21:27:18 +1000205 filtered_lines = [line for line in article.lines
Victor Stinnerfabd7bb2020-08-11 15:26:59 +0200206 if not denylist(line)]
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200207 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000208
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000209 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000210 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000211 # couple of well-known capabilities. Just sanity check that we
212 # got them.
213 def _check_caps(caps):
214 caps_list = caps['LIST']
215 self.assertIsInstance(caps_list, (list, tuple))
216 self.assertIn('OVERVIEW.FMT', caps_list)
217 self.assertGreaterEqual(self.server.nntp_version, 2)
218 _check_caps(self.server.getcapabilities())
219 # This re-emits the command
220 resp, caps = self.server.capabilities()
221 _check_caps(caps)
222
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000223 def test_zlogin(self):
224 # This test must be the penultimate because further commands will be
225 # refused.
226 baduser = "notarealuser"
227 badpw = "notarealpassword"
228 # Check that bogus credentials cause failure
229 self.assertRaises(nntplib.NNTPError, self.server.login,
230 user=baduser, password=badpw, usenetrc=False)
231 # FIXME: We should check that correct credentials succeed, but that
232 # would require valid details for some server somewhere to be in the
233 # test suite, I think. Gmane is anonymous, at least as used for the
234 # other tests.
235
236 def test_zzquit(self):
237 # This test must be called last, hence the name
238 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000239 try:
240 self.server.quit()
241 finally:
242 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000243
Antoine Pitroude609182010-11-18 17:29:23 +0000244 @classmethod
245 def wrap_methods(cls):
246 # Wrap all methods in a transient_internet() exception catcher
247 # XXX put a generic version in test.support?
248 def wrap_meth(meth):
249 @functools.wraps(meth)
250 def wrapped(self):
Serhiy Storchakabfb1cf42020-04-29 10:36:20 +0300251 with socket_helper.transient_internet(self.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000252 meth(self)
253 return wrapped
254 for name in dir(cls):
255 if not name.startswith('test_'):
256 continue
257 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200258 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000259 continue
260 # Need to use a closure so that meth remains bound to its current
261 # value
262 setattr(cls, name, wrap_meth(meth))
263
Dong-hee Na1b335ae2020-01-12 02:39:15 +0900264 def test_timeout(self):
265 with self.assertRaises(ValueError):
266 self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)
267
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000268 def test_with_statement(self):
269 def is_connected():
270 if not hasattr(server, 'file'):
271 return False
272 try:
273 server.help()
Andrew Svetlov0832af62012-12-18 23:10:48 +0200274 except (OSError, EOFError):
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000275 return False
276 return True
277
Christian Heimese9832522021-05-01 20:53:10 +0200278 kwargs = dict(
279 timeout=support.INTERNET_TIMEOUT,
280 usenetrc=False
281 )
282 if self.ssl_context is not None:
283 kwargs["ssl_context"] = self.ssl_context
284
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400285 try:
Christian Heimese9832522021-05-01 20:53:10 +0200286 server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100287 with server:
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400288 self.assertTrue(is_connected())
289 self.assertTrue(server.help())
290 self.assertFalse(is_connected())
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000291
Christian Heimese9832522021-05-01 20:53:10 +0200292 server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100293 with server:
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400294 server.quit()
295 self.assertFalse(is_connected())
296 except SSLError as ssl_err:
297 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
298 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
299 raise unittest.SkipTest(f"Got {ssl_err} connecting "
300 f"to {self.NNTP_HOST!r}")
301 raise
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000302
303
Antoine Pitroude609182010-11-18 17:29:23 +0000304NetworkedNNTPTestsMixin.wrap_methods()
305
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000306
INADA Naoki067931d2017-07-26 23:43:22 +0900307EOF_ERRORS = (EOFError,)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200308if ssl is not None:
INADA Naoki067931d2017-07-26 23:43:22 +0900309 EOF_ERRORS += (ssl.SSLEOFError,)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200310
311
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000312class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
313 # This server supports STARTTLS (gmane doesn't)
314 NNTP_HOST = 'news.trigofacile.com'
315 GROUP_NAME = 'fr.comp.lang.python'
316 GROUP_PAT = 'fr.comp.lang.*'
Dong-hee Naec316532021-01-01 23:20:33 +0900317 DESC = 'Python'
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000318
Antoine Pitroude609182010-11-18 17:29:23 +0000319 NNTP_CLASS = NNTP
320
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000321 @classmethod
322 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000323 support.requires("network")
Christian Heimese9832522021-05-01 20:53:10 +0200324 kwargs = dict(
325 timeout=support.INTERNET_TIMEOUT,
326 usenetrc=False
327 )
328 if cls.ssl_context is not None:
329 kwargs["ssl_context"] = cls.ssl_context
Serhiy Storchakabfb1cf42020-04-29 10:36:20 +0300330 with socket_helper.transient_internet(cls.NNTP_HOST):
Victor Stinner5bccca52017-04-27 17:30:13 +0200331 try:
Christian Heimese9832522021-05-01 20:53:10 +0200332 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, **kwargs)
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400333 except SSLError as ssl_err:
334 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
335 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
336 raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
337 f"to {cls.NNTP_HOST!r}")
Christian Heimese9832522021-05-01 20:53:10 +0200338 print(cls.NNTP_HOST)
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400339 raise
Victor Stinner5b4feb72017-07-24 17:41:02 +0200340 except EOF_ERRORS:
Victor Stinner5bccca52017-04-27 17:30:13 +0200341 raise unittest.SkipTest(f"{cls} got EOF error on connecting "
342 f"to {cls.NNTP_HOST!r}")
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000343
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000344 @classmethod
345 def tearDownClass(cls):
346 if cls.server is not None:
347 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000348
Serhiy Storchaka43767632013-11-03 21:31:38 +0200349@unittest.skipUnless(ssl, 'requires SSL support')
350class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000351
Serhiy Storchaka43767632013-11-03 21:31:38 +0200352 # Technical limits for this public NNTP server (see http://www.aioe.org):
353 # "Only two concurrent connections per IP address are allowed and
354 # 400 connections per day are accepted from each IP address."
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000355
Serhiy Storchaka43767632013-11-03 21:31:38 +0200356 NNTP_HOST = 'nntp.aioe.org'
Dong-hee Naec316532021-01-01 23:20:33 +0900357 # bpo-42794: aioe.test is one of the official groups on this server
358 # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
359 GROUP_NAME = 'aioe.test'
360 GROUP_PAT = 'aioe.*'
361 DESC = 'test'
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000362
Serhiy Storchaka43767632013-11-03 21:31:38 +0200363 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000364
Serhiy Storchaka43767632013-11-03 21:31:38 +0200365 # Disabled as it produces too much data
366 test_list = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000367
Serhiy Storchaka43767632013-11-03 21:31:38 +0200368 # Disabled as the connection will already be encrypted.
369 test_starttls = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000370
Christian Heimese9832522021-05-01 20:53:10 +0200371 ssl_context = ssl._create_unverified_context()
372 ssl_context.set_ciphers("DEFAULT")
373 ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000374
375#
376# Non-networked tests using a local server (or something mocking it).
377#
378
379class _NNTPServerIO(io.RawIOBase):
380 """A raw IO object allowing NNTP commands to be received and processed
381 by a handler. The handler can push responses which can then be read
382 from the IO object."""
383
384 def __init__(self, handler):
385 io.RawIOBase.__init__(self)
386 # The channel from the client
387 self.c2s = io.BytesIO()
388 # The channel to the client
389 self.s2c = io.BytesIO()
390 self.handler = handler
391 self.handler.start(self.c2s.readline, self.push_data)
392
393 def readable(self):
394 return True
395
396 def writable(self):
397 return True
398
399 def push_data(self, data):
400 """Push (buffer) some data to send to the client."""
401 pos = self.s2c.tell()
402 self.s2c.seek(0, 2)
403 self.s2c.write(data)
404 self.s2c.seek(pos)
405
406 def write(self, b):
407 """The client sends us some data"""
408 pos = self.c2s.tell()
409 self.c2s.write(b)
410 self.c2s.seek(pos)
411 self.handler.process_pending()
412 return len(b)
413
414 def readinto(self, buf):
415 """The client wants to read a response"""
416 self.handler.process_pending()
417 b = self.s2c.read(len(buf))
418 n = len(b)
419 buf[:n] = b
420 return n
421
422
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200423def make_mock_file(handler):
424 sio = _NNTPServerIO(handler)
425 # Using BufferedRWPair instead of BufferedRandom ensures the file
426 # isn't seekable.
427 file = io.BufferedRWPair(sio, sio)
428 return (sio, file)
429
430
Dong-hee Naaa92a7c2020-05-16 19:31:54 +0900431class NNTPServer(nntplib.NNTP):
432
433 def __init__(self, f, host, readermode=None):
434 self.file = f
435 self.host = host
436 self._base_init(readermode)
437
438 def _close(self):
439 self.file.close()
440 del self.file
441
442
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000443class MockedNNTPTestsMixin:
444 # Override in derived classes
445 handler_class = None
446
447 def setUp(self):
448 super().setUp()
449 self.make_server()
450
451 def tearDown(self):
452 super().tearDown()
453 del self.server
454
455 def make_server(self, *args, **kwargs):
456 self.handler = self.handler_class()
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200457 self.sio, file = make_mock_file(self.handler)
Dong-hee Naaa92a7c2020-05-16 19:31:54 +0900458 self.server = NNTPServer(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000459 return self.server
460
461
Antoine Pitrou71135622012-02-14 23:29:34 +0100462class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
463 def setUp(self):
464 super().setUp()
465 self.make_server(readermode=True)
466
467
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000468class NNTPv1Handler:
469 """A handler for RFC 977"""
470
471 welcome = "200 NNTP mock server"
472
473 def start(self, readline, push_data):
474 self.in_body = False
475 self.allow_posting = True
476 self._readline = readline
477 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100478 self._logged_in = False
479 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000480 # Our welcome
481 self.handle_welcome()
482
483 def _decode(self, data):
484 return str(data, "utf-8", "surrogateescape")
485
486 def process_pending(self):
487 if self.in_body:
488 while True:
489 line = self._readline()
490 if not line:
491 return
492 self.body.append(line)
493 if line == b".\r\n":
494 break
495 try:
496 meth, tokens = self.body_callback
497 meth(*tokens, body=self.body)
498 finally:
499 self.body_callback = None
500 self.body = None
501 self.in_body = False
502 while True:
503 line = self._decode(self._readline())
504 if not line:
505 return
506 if not line.endswith("\r\n"):
507 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
508 line = line[:-2]
509 cmd, *tokens = line.split()
510 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
511 meth = getattr(self, "handle_" + cmd.upper(), None)
512 if meth is None:
513 self.handle_unknown()
514 else:
515 try:
516 meth(*tokens)
517 except Exception as e:
518 raise ValueError("command failed: {!r}".format(line)) from e
519 else:
520 if self.in_body:
521 self.body_callback = meth, tokens
522 self.body = []
523
524 def expect_body(self):
525 """Flag that the client is expected to post a request body"""
526 self.in_body = True
527
528 def push_data(self, data):
529 """Push some binary data"""
530 self._push_data(data)
531
532 def push_lit(self, lit):
533 """Push a string literal"""
534 lit = textwrap.dedent(lit)
535 lit = "\r\n".join(lit.splitlines()) + "\r\n"
536 lit = lit.encode('utf-8')
537 self.push_data(lit)
538
539 def handle_unknown(self):
540 self.push_lit("500 What?")
541
542 def handle_welcome(self):
543 self.push_lit(self.welcome)
544
545 def handle_QUIT(self):
546 self.push_lit("205 Bye!")
547
548 def handle_DATE(self):
549 self.push_lit("111 20100914001155")
550
551 def handle_GROUP(self, group):
552 if group == "fr.comp.lang.python":
553 self.push_lit("211 486 761 1265 fr.comp.lang.python")
554 else:
555 self.push_lit("411 No such group {}".format(group))
556
557 def handle_HELP(self):
558 self.push_lit("""\
559 100 Legal commands
560 authinfo user Name|pass Password|generic <prog> <args>
561 date
562 help
563 Report problems to <root@example.org>
564 .""")
565
566 def handle_STAT(self, message_spec=None):
567 if message_spec is None:
568 self.push_lit("412 No newsgroup selected")
569 elif message_spec == "3000234":
570 self.push_lit("223 3000234 <45223423@example.com>")
571 elif message_spec == "<45223423@example.com>":
572 self.push_lit("223 0 <45223423@example.com>")
573 else:
574 self.push_lit("430 No Such Article Found")
575
576 def handle_NEXT(self):
577 self.push_lit("223 3000237 <668929@example.org> retrieved")
578
579 def handle_LAST(self):
580 self.push_lit("223 3000234 <45223423@example.com> retrieved")
581
582 def handle_LIST(self, action=None, param=None):
583 if action is None:
584 self.push_lit("""\
585 215 Newsgroups in form "group high low flags".
586 comp.lang.python 0000052340 0000002828 y
587 comp.lang.python.announce 0000001153 0000000993 m
588 free.it.comp.lang.python 0000000002 0000000002 y
589 fr.comp.lang.python 0000001254 0000000760 y
590 free.it.comp.lang.python.learner 0000000000 0000000001 y
591 tw.bbs.comp.lang.python 0000000304 0000000304 y
592 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000593 elif action == "ACTIVE":
594 if param == "*distutils*":
595 self.push_lit("""\
596 215 Newsgroups in form "group high low flags"
597 gmane.comp.python.distutils.devel 0000014104 0000000001 m
598 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
599 .""")
600 else:
601 self.push_lit("""\
602 215 Newsgroups in form "group high low flags"
603 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000604 elif action == "OVERVIEW.FMT":
605 self.push_lit("""\
606 215 Order of fields in overview database.
607 Subject:
608 From:
609 Date:
610 Message-ID:
611 References:
612 Bytes:
613 Lines:
614 Xref:full
615 .""")
616 elif action == "NEWSGROUPS":
617 assert param is not None
618 if param == "comp.lang.python":
619 self.push_lit("""\
620 215 Descriptions in form "group description".
621 comp.lang.python\tThe Python computer language.
622 .""")
623 elif param == "comp.lang.python*":
624 self.push_lit("""\
625 215 Descriptions in form "group description".
626 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
627 comp.lang.python\tThe Python computer language.
628 .""")
629 else:
630 self.push_lit("""\
631 215 Descriptions in form "group description".
632 .""")
633 else:
634 self.push_lit('501 Unknown LIST keyword')
635
636 def handle_NEWNEWS(self, group, date_str, time_str):
637 # We hard code different return messages depending on passed
638 # argument and date syntax.
639 if (group == "comp.lang.python" and date_str == "20100913"
640 and time_str == "082004"):
641 # Date was passed in RFC 3977 format (NNTP "v2")
642 self.push_lit("""\
643 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
644 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
645 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
646 .""")
647 elif (group == "comp.lang.python" and date_str == "100913"
648 and time_str == "082004"):
649 # Date was passed in RFC 977 format (NNTP "v1")
650 self.push_lit("""\
651 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
652 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
653 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
654 .""")
Georg Brandl28e78412013-10-27 07:29:47 +0100655 elif (group == 'comp.lang.python' and
656 date_str in ('20100101', '100101') and
657 time_str == '090000'):
658 self.push_lit('too long line' * 3000 +
659 '\n.')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000660 else:
661 self.push_lit("""\
662 230 An empty list of newsarticles follows
663 .""")
664 # (Note for experiments: many servers disable NEWNEWS.
665 # As of this writing, sicinfo3.epfl.ch doesn't.)
666
667 def handle_XOVER(self, message_spec):
668 if message_spec == "57-59":
669 self.push_lit(
670 "224 Overview information for 57-58 follows\n"
671 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
672 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
673 "\tSat, 19 Jun 2010 18:04:08 -0400"
674 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
675 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
Dong-hee Na2e6a8ef2020-01-09 00:29:34 +0900676 "\tXref: news.gmane.io gmane.comp.python.authors:57"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000677 "\n"
678 "58\tLooking for a few good bloggers"
679 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
680 "\tThu, 22 Jul 2010 09:14:14 -0400"
681 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
682 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000683 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000684 "\n"
Martin Panter6245cb32016-04-15 02:14:19 +0000685 # A UTF-8 overview line from fr.comp.lang.python
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000686 "59\tRe: Message d'erreur incompréhensible (par moi)"
687 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
688 "\tWed, 15 Sep 2010 18:09:15 +0200"
689 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
690 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
691 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
692 "\n"
693 ".\n")
694 else:
695 self.push_lit("""\
696 224 No articles
697 .""")
698
699 def handle_POST(self, *, body=None):
700 if body is None:
701 if self.allow_posting:
702 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
703 self.expect_body()
704 else:
705 self.push_lit("440 Posting not permitted")
706 else:
707 assert self.allow_posting
708 self.push_lit("240 Article received OK")
709 self.posted_body = body
710
711 def handle_IHAVE(self, message_id, *, body=None):
712 if body is None:
713 if (self.allow_posting and
714 message_id == "<i.am.an.article.you.will.want@example.com>"):
715 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
716 self.expect_body()
717 else:
718 self.push_lit("435 Article not wanted")
719 else:
720 assert self.allow_posting
721 self.push_lit("235 Article transferred OK")
722 self.posted_body = body
723
724 sample_head = """\
725 From: "Demo User" <nobody@example.net>
726 Subject: I am just a test article
727 Content-Type: text/plain; charset=UTF-8; format=flowed
728 Message-ID: <i.am.an.article.you.will.want@example.com>"""
729
730 sample_body = """\
731 This is just a test article.
732 ..Here is a dot-starting line.
733
734 -- Signed by Andr\xe9."""
735
736 sample_article = sample_head + "\n\n" + sample_body
737
738 def handle_ARTICLE(self, message_spec=None):
739 if message_spec is None:
740 self.push_lit("220 3000237 <45223423@example.com>")
741 elif message_spec == "<45223423@example.com>":
742 self.push_lit("220 0 <45223423@example.com>")
743 elif message_spec == "3000234":
744 self.push_lit("220 3000234 <45223423@example.com>")
745 else:
746 self.push_lit("430 No Such Article Found")
747 return
748 self.push_lit(self.sample_article)
749 self.push_lit(".")
750
751 def handle_HEAD(self, message_spec=None):
752 if message_spec is None:
753 self.push_lit("221 3000237 <45223423@example.com>")
754 elif message_spec == "<45223423@example.com>":
755 self.push_lit("221 0 <45223423@example.com>")
756 elif message_spec == "3000234":
757 self.push_lit("221 3000234 <45223423@example.com>")
758 else:
759 self.push_lit("430 No Such Article Found")
760 return
761 self.push_lit(self.sample_head)
762 self.push_lit(".")
763
764 def handle_BODY(self, message_spec=None):
765 if message_spec is None:
766 self.push_lit("222 3000237 <45223423@example.com>")
767 elif message_spec == "<45223423@example.com>":
768 self.push_lit("222 0 <45223423@example.com>")
769 elif message_spec == "3000234":
770 self.push_lit("222 3000234 <45223423@example.com>")
771 else:
772 self.push_lit("430 No Such Article Found")
773 return
774 self.push_lit(self.sample_body)
775 self.push_lit(".")
776
Antoine Pitrou54411c12012-02-12 19:14:17 +0100777 def handle_AUTHINFO(self, cred_type, data):
778 if self._logged_in:
779 self.push_lit('502 Already Logged In')
780 elif cred_type == 'user':
781 if self._user_sent:
782 self.push_lit('482 User Credential Already Sent')
783 else:
784 self.push_lit('381 Password Required')
785 self._user_sent = True
786 elif cred_type == 'pass':
787 self.push_lit('281 Login Successful')
788 self._logged_in = True
789 else:
790 raise Exception('Unknown cred type {}'.format(cred_type))
791
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000792
793class NNTPv2Handler(NNTPv1Handler):
794 """A handler for RFC 3977 (NNTP "v2")"""
795
796 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100797 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000798 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000799 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100800 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000801 HDR
802 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
803 OVER
804 POST
805 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100806 ."""
807
808 if not self._logged_in:
809 self.push_lit(fmt.format('\n AUTHINFO USER'))
810 else:
811 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000812
Antoine Pitrou71135622012-02-14 23:29:34 +0100813 def handle_MODE(self, _):
814 raise Exception('MODE READER sent despite READER has been advertised')
815
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000816 def handle_OVER(self, message_spec=None):
817 return self.handle_XOVER(message_spec)
818
819
Antoine Pitrou54411c12012-02-12 19:14:17 +0100820class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
821 """A handler that allows CAPABILITIES only after login"""
822
823 def handle_CAPABILITIES(self):
824 if not self._logged_in:
825 self.push_lit('480 You must log in.')
826 else:
827 super().handle_CAPABILITIES()
828
829
Antoine Pitrou71135622012-02-14 23:29:34 +0100830class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
831 """A server that starts in transit mode"""
832
833 def __init__(self):
834 self._switched = False
835
836 def handle_CAPABILITIES(self):
837 fmt = """\
838 101 Capability list:
839 VERSION 2 3
840 IMPLEMENTATION INN 2.5.1
841 HDR
842 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
843 OVER
844 POST
845 {}READER
846 ."""
847 if self._switched:
848 self.push_lit(fmt.format(''))
849 else:
850 self.push_lit(fmt.format('MODE-'))
851
852 def handle_MODE(self, what):
853 assert not self._switched and what == 'reader'
854 self._switched = True
855 self.push_lit('200 Posting allowed')
856
857
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000858class NNTPv1v2TestsMixin:
859
860 def setUp(self):
861 super().setUp()
862
863 def test_welcome(self):
864 self.assertEqual(self.server.welcome, self.handler.welcome)
865
Antoine Pitrou54411c12012-02-12 19:14:17 +0100866 def test_authinfo(self):
867 if self.nntp_version == 2:
868 self.assertIn('AUTHINFO', self.server._caps)
869 self.server.login('testuser', 'testpw')
870 # if AUTHINFO is gone from _caps we also know that getcapabilities()
871 # has been called after login as it should
872 self.assertNotIn('AUTHINFO', self.server._caps)
873
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000874 def test_date(self):
875 resp, date = self.server.date()
876 self.assertEqual(resp, "111 20100914001155")
877 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
878
879 def test_quit(self):
880 self.assertFalse(self.sio.closed)
881 resp = self.server.quit()
882 self.assertEqual(resp, "205 Bye!")
883 self.assertTrue(self.sio.closed)
884
885 def test_help(self):
886 resp, help = self.server.help()
887 self.assertEqual(resp, "100 Legal commands")
888 self.assertEqual(help, [
889 ' authinfo user Name|pass Password|generic <prog> <args>',
890 ' date',
891 ' help',
892 'Report problems to <root@example.org>',
893 ])
894
895 def test_list(self):
896 resp, groups = self.server.list()
897 self.assertEqual(len(groups), 6)
898 g = groups[1]
899 self.assertEqual(g,
900 GroupInfo("comp.lang.python.announce", "0000001153",
901 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000902 resp, groups = self.server.list("*distutils*")
903 self.assertEqual(len(groups), 2)
904 g = groups[0]
905 self.assertEqual(g,
906 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
907 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000908
909 def test_stat(self):
910 resp, art_num, message_id = self.server.stat(3000234)
911 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
912 self.assertEqual(art_num, 3000234)
913 self.assertEqual(message_id, "<45223423@example.com>")
914 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
915 self.assertEqual(resp, "223 0 <45223423@example.com>")
916 self.assertEqual(art_num, 0)
917 self.assertEqual(message_id, "<45223423@example.com>")
918 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
919 self.server.stat("<non.existent.id>")
920 self.assertEqual(cm.exception.response, "430 No Such Article Found")
921 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
922 self.server.stat()
923 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
924
925 def test_next(self):
926 resp, art_num, message_id = self.server.next()
927 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
928 self.assertEqual(art_num, 3000237)
929 self.assertEqual(message_id, "<668929@example.org>")
930
931 def test_last(self):
932 resp, art_num, message_id = self.server.last()
933 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
934 self.assertEqual(art_num, 3000234)
935 self.assertEqual(message_id, "<45223423@example.com>")
936
937 def test_description(self):
938 desc = self.server.description("comp.lang.python")
939 self.assertEqual(desc, "The Python computer language.")
940 desc = self.server.description("comp.lang.pythonx")
941 self.assertEqual(desc, "")
942
943 def test_descriptions(self):
944 resp, groups = self.server.descriptions("comp.lang.python")
945 self.assertEqual(resp, '215 Descriptions in form "group description".')
946 self.assertEqual(groups, {
947 "comp.lang.python": "The Python computer language.",
948 })
949 resp, groups = self.server.descriptions("comp.lang.python*")
950 self.assertEqual(groups, {
951 "comp.lang.python": "The Python computer language.",
952 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
953 })
954 resp, groups = self.server.descriptions("comp.lang.pythonx")
955 self.assertEqual(groups, {})
956
957 def test_group(self):
958 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
959 self.assertTrue(resp.startswith("211 "), resp)
960 self.assertEqual(first, 761)
961 self.assertEqual(last, 1265)
962 self.assertEqual(count, 486)
963 self.assertEqual(group, "fr.comp.lang.python")
964 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
965 self.server.group("comp.lang.python.devel")
966 exc = cm.exception
967 self.assertTrue(exc.response.startswith("411 No such group"),
968 exc.response)
969
970 def test_newnews(self):
971 # NEWNEWS comp.lang.python [20]100913 082004
972 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
973 resp, ids = self.server.newnews("comp.lang.python", dt)
974 expected = (
975 "230 list of newsarticles (NNTP v{0}) "
976 "created after Mon Sep 13 08:20:04 2010 follows"
977 ).format(self.nntp_version)
978 self.assertEqual(resp, expected)
979 self.assertEqual(ids, [
980 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
981 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
982 ])
983 # NEWNEWS fr.comp.lang.python [20]100913 082004
984 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
985 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
986 self.assertEqual(resp, "230 An empty list of newsarticles follows")
987 self.assertEqual(ids, [])
988
989 def _check_article_body(self, lines):
990 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000991 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000992 self.assertEqual(lines[-2], b"")
993 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
994 self.assertEqual(lines[-4], b"This is just a test article.")
995
996 def _check_article_head(self, lines):
997 self.assertEqual(len(lines), 4)
998 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
999 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
1000
1001 def _check_article_data(self, lines):
1002 self.assertEqual(len(lines), 9)
1003 self._check_article_head(lines[:4])
1004 self._check_article_body(lines[-4:])
1005 self.assertEqual(lines[4], b"")
1006
1007 def test_article(self):
1008 # ARTICLE
1009 resp, info = self.server.article()
1010 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
1011 art_num, message_id, lines = info
1012 self.assertEqual(art_num, 3000237)
1013 self.assertEqual(message_id, "<45223423@example.com>")
1014 self._check_article_data(lines)
1015 # ARTICLE num
1016 resp, info = self.server.article(3000234)
1017 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
1018 art_num, message_id, lines = info
1019 self.assertEqual(art_num, 3000234)
1020 self.assertEqual(message_id, "<45223423@example.com>")
1021 self._check_article_data(lines)
1022 # ARTICLE id
1023 resp, info = self.server.article("<45223423@example.com>")
1024 self.assertEqual(resp, "220 0 <45223423@example.com>")
1025 art_num, message_id, lines = info
1026 self.assertEqual(art_num, 0)
1027 self.assertEqual(message_id, "<45223423@example.com>")
1028 self._check_article_data(lines)
1029 # Non-existent id
1030 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1031 self.server.article("<non-existent@example.com>")
1032 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1033
1034 def test_article_file(self):
1035 # With a "file" argument
1036 f = io.BytesIO()
1037 resp, info = self.server.article(file=f)
1038 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
1039 art_num, message_id, lines = info
1040 self.assertEqual(art_num, 3000237)
1041 self.assertEqual(message_id, "<45223423@example.com>")
1042 self.assertEqual(lines, [])
1043 data = f.getvalue()
1044 self.assertTrue(data.startswith(
1045 b'From: "Demo User" <nobody@example.net>\r\n'
1046 b'Subject: I am just a test article\r\n'
1047 ), ascii(data))
1048 self.assertTrue(data.endswith(
1049 b'This is just a test article.\r\n'
1050 b'.Here is a dot-starting line.\r\n'
1051 b'\r\n'
1052 b'-- Signed by Andr\xc3\xa9.\r\n'
1053 ), ascii(data))
1054
1055 def test_head(self):
1056 # HEAD
1057 resp, info = self.server.head()
1058 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1059 art_num, message_id, lines = info
1060 self.assertEqual(art_num, 3000237)
1061 self.assertEqual(message_id, "<45223423@example.com>")
1062 self._check_article_head(lines)
1063 # HEAD num
1064 resp, info = self.server.head(3000234)
1065 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
1066 art_num, message_id, lines = info
1067 self.assertEqual(art_num, 3000234)
1068 self.assertEqual(message_id, "<45223423@example.com>")
1069 self._check_article_head(lines)
1070 # HEAD id
1071 resp, info = self.server.head("<45223423@example.com>")
1072 self.assertEqual(resp, "221 0 <45223423@example.com>")
1073 art_num, message_id, lines = info
1074 self.assertEqual(art_num, 0)
1075 self.assertEqual(message_id, "<45223423@example.com>")
1076 self._check_article_head(lines)
1077 # Non-existent id
1078 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1079 self.server.head("<non-existent@example.com>")
1080 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1081
Antoine Pitrou2640b522012-02-15 18:53:18 +01001082 def test_head_file(self):
1083 f = io.BytesIO()
1084 resp, info = self.server.head(file=f)
1085 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1086 art_num, message_id, lines = info
1087 self.assertEqual(art_num, 3000237)
1088 self.assertEqual(message_id, "<45223423@example.com>")
1089 self.assertEqual(lines, [])
1090 data = f.getvalue()
1091 self.assertTrue(data.startswith(
1092 b'From: "Demo User" <nobody@example.net>\r\n'
1093 b'Subject: I am just a test article\r\n'
1094 ), ascii(data))
1095 self.assertFalse(data.endswith(
1096 b'This is just a test article.\r\n'
1097 b'.Here is a dot-starting line.\r\n'
1098 b'\r\n'
1099 b'-- Signed by Andr\xc3\xa9.\r\n'
1100 ), ascii(data))
1101
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001102 def test_body(self):
1103 # BODY
1104 resp, info = self.server.body()
1105 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1106 art_num, message_id, lines = info
1107 self.assertEqual(art_num, 3000237)
1108 self.assertEqual(message_id, "<45223423@example.com>")
1109 self._check_article_body(lines)
1110 # BODY num
1111 resp, info = self.server.body(3000234)
1112 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1113 art_num, message_id, lines = info
1114 self.assertEqual(art_num, 3000234)
1115 self.assertEqual(message_id, "<45223423@example.com>")
1116 self._check_article_body(lines)
1117 # BODY id
1118 resp, info = self.server.body("<45223423@example.com>")
1119 self.assertEqual(resp, "222 0 <45223423@example.com>")
1120 art_num, message_id, lines = info
1121 self.assertEqual(art_num, 0)
1122 self.assertEqual(message_id, "<45223423@example.com>")
1123 self._check_article_body(lines)
1124 # Non-existent id
1125 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1126 self.server.body("<non-existent@example.com>")
1127 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1128
Antoine Pitrou2640b522012-02-15 18:53:18 +01001129 def test_body_file(self):
1130 f = io.BytesIO()
1131 resp, info = self.server.body(file=f)
1132 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1133 art_num, message_id, lines = info
1134 self.assertEqual(art_num, 3000237)
1135 self.assertEqual(message_id, "<45223423@example.com>")
1136 self.assertEqual(lines, [])
1137 data = f.getvalue()
1138 self.assertFalse(data.startswith(
1139 b'From: "Demo User" <nobody@example.net>\r\n'
1140 b'Subject: I am just a test article\r\n'
1141 ), ascii(data))
1142 self.assertTrue(data.endswith(
1143 b'This is just a test article.\r\n'
1144 b'.Here is a dot-starting line.\r\n'
1145 b'\r\n'
1146 b'-- Signed by Andr\xc3\xa9.\r\n'
1147 ), ascii(data))
1148
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001149 def check_over_xover_resp(self, resp, overviews):
1150 self.assertTrue(resp.startswith("224 "), resp)
1151 self.assertEqual(len(overviews), 3)
1152 art_num, over = overviews[0]
1153 self.assertEqual(art_num, 57)
1154 self.assertEqual(over, {
1155 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1156 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1157 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1158 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1159 "references": "<hvalf7$ort$1@dough.gmane.org>",
1160 ":bytes": "7103",
1161 ":lines": "16",
Dong-hee Na2e6a8ef2020-01-09 00:29:34 +09001162 "xref": "news.gmane.io gmane.comp.python.authors:57"
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001163 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001164 art_num, over = overviews[1]
1165 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001166 art_num, over = overviews[2]
1167 self.assertEqual(over["subject"],
1168 "Re: Message d'erreur incompréhensible (par moi)")
1169
1170 def test_xover(self):
1171 resp, overviews = self.server.xover(57, 59)
1172 self.check_over_xover_resp(resp, overviews)
1173
1174 def test_over(self):
1175 # In NNTP "v1", this will fallback on XOVER
1176 resp, overviews = self.server.over((57, 59))
1177 self.check_over_xover_resp(resp, overviews)
1178
1179 sample_post = (
1180 b'From: "Demo User" <nobody@example.net>\r\n'
1181 b'Subject: I am just a test article\r\n'
1182 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1183 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1184 b'\r\n'
1185 b'This is just a test article.\r\n'
1186 b'.Here is a dot-starting line.\r\n'
1187 b'\r\n'
1188 b'-- Signed by Andr\xc3\xa9.\r\n'
1189 )
1190
1191 def _check_posted_body(self):
1192 # Check the raw body as received by the server
1193 lines = self.handler.posted_body
1194 # One additional line for the "." terminator
1195 self.assertEqual(len(lines), 10)
1196 self.assertEqual(lines[-1], b'.\r\n')
1197 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1198 self.assertEqual(lines[-3], b'\r\n')
1199 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1200 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1201
1202 def _check_post_ihave_sub(self, func, *args, file_factory):
1203 # First the prepared post with CRLF endings
1204 post = self.sample_post
1205 func_args = args + (file_factory(post),)
1206 self.handler.posted_body = None
1207 resp = func(*func_args)
1208 self._check_posted_body()
1209 # Then the same post with "normal" line endings - they should be
1210 # converted by NNTP.post and NNTP.ihave.
1211 post = self.sample_post.replace(b"\r\n", b"\n")
1212 func_args = args + (file_factory(post),)
1213 self.handler.posted_body = None
1214 resp = func(*func_args)
1215 self._check_posted_body()
1216 return resp
1217
1218 def check_post_ihave(self, func, success_resp, *args):
1219 # With a bytes object
1220 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1221 self.assertEqual(resp, success_resp)
1222 # With a bytearray object
1223 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1224 self.assertEqual(resp, success_resp)
1225 # With a file object
1226 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1227 self.assertEqual(resp, success_resp)
1228 # With an iterable of terminated lines
1229 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001230 return iter(b.splitlines(keepends=True))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001231 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1232 self.assertEqual(resp, success_resp)
1233 # With an iterable of non-terminated lines
1234 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001235 return iter(b.splitlines(keepends=False))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001236 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1237 self.assertEqual(resp, success_resp)
1238
1239 def test_post(self):
1240 self.check_post_ihave(self.server.post, "240 Article received OK")
1241 self.handler.allow_posting = False
1242 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1243 self.server.post(self.sample_post)
1244 self.assertEqual(cm.exception.response,
1245 "440 Posting not permitted")
1246
1247 def test_ihave(self):
1248 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1249 "<i.am.an.article.you.will.want@example.com>")
1250 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1251 self.server.ihave("<another.message.id>", self.sample_post)
1252 self.assertEqual(cm.exception.response,
1253 "435 Article not wanted")
1254
Georg Brandl28e78412013-10-27 07:29:47 +01001255 def test_too_long_lines(self):
1256 dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1257 self.assertRaises(nntplib.NNTPDataError,
1258 self.server.newnews, "comp.lang.python", dt)
1259
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001260
1261class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1262 """Tests an NNTP v1 server (no capabilities)."""
1263
1264 nntp_version = 1
1265 handler_class = NNTPv1Handler
1266
1267 def test_caps(self):
1268 caps = self.server.getcapabilities()
1269 self.assertEqual(caps, {})
1270 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001271 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001272
1273
1274class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1275 """Tests an NNTP v2 server (with capabilities)."""
1276
1277 nntp_version = 2
1278 handler_class = NNTPv2Handler
1279
1280 def test_caps(self):
1281 caps = self.server.getcapabilities()
1282 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001283 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001284 'IMPLEMENTATION': ['INN', '2.5.1'],
1285 'AUTHINFO': ['USER'],
1286 'HDR': [],
1287 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1288 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1289 'OVER': [],
1290 'POST': [],
1291 'READER': [],
1292 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001293 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001294 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001295
1296
Antoine Pitrou54411c12012-02-12 19:14:17 +01001297class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1298 """Tests a probably NNTP v2 server with capabilities only after login."""
1299
1300 nntp_version = 2
1301 handler_class = CapsAfterLoginNNTPv2Handler
1302
1303 def test_caps_only_after_login(self):
1304 self.assertEqual(self.server._caps, {})
1305 self.server.login('testuser', 'testpw')
1306 self.assertIn('VERSION', self.server._caps)
1307
1308
Antoine Pitrou71135622012-02-14 23:29:34 +01001309class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1310 unittest.TestCase):
1311 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1312 that isn't in READER mode by default."""
1313
1314 nntp_version = 2
1315 handler_class = ModeSwitchingNNTPv2Handler
1316
1317 def test_we_are_in_reader_mode_after_connect(self):
1318 self.assertIn('READER', self.server._caps)
1319
1320
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001321class MiscTests(unittest.TestCase):
1322
1323 def test_decode_header(self):
1324 def gives(a, b):
1325 self.assertEqual(nntplib.decode_header(a), b)
1326 gives("" , "")
1327 gives("a plain header", "a plain header")
1328 gives(" with extra spaces ", " with extra spaces ")
1329 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1330 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1331 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1332 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1333 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1334 "Re: problème de matrice")
1335 # A natively utf-8 header (found in the real world!)
1336 gives("Re: Message d'erreur incompréhensible (par moi)",
1337 "Re: Message d'erreur incompréhensible (par moi)")
1338
1339 def test_parse_overview_fmt(self):
1340 # The minimal (default) response
1341 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1342 "References:", ":bytes", ":lines"]
1343 self.assertEqual(nntplib._parse_overview_fmt(lines),
1344 ["subject", "from", "date", "message-id", "references",
1345 ":bytes", ":lines"])
1346 # The minimal response using alternative names
1347 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1348 "References:", "Bytes:", "Lines:"]
1349 self.assertEqual(nntplib._parse_overview_fmt(lines),
1350 ["subject", "from", "date", "message-id", "references",
1351 ":bytes", ":lines"])
1352 # Variations in casing
1353 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1354 "References:", "BYTES:", "Lines:"]
1355 self.assertEqual(nntplib._parse_overview_fmt(lines),
1356 ["subject", "from", "date", "message-id", "references",
1357 ":bytes", ":lines"])
1358 # First example from RFC 3977
1359 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1360 "References:", ":bytes", ":lines", "Xref:full",
1361 "Distribution:full"]
1362 self.assertEqual(nntplib._parse_overview_fmt(lines),
1363 ["subject", "from", "date", "message-id", "references",
1364 ":bytes", ":lines", "xref", "distribution"])
1365 # Second example from RFC 3977
1366 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1367 "References:", "Bytes:", "Lines:", "Xref:FULL",
1368 "Distribution:FULL"]
1369 self.assertEqual(nntplib._parse_overview_fmt(lines),
1370 ["subject", "from", "date", "message-id", "references",
1371 ":bytes", ":lines", "xref", "distribution"])
1372 # A classic response from INN
1373 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1374 "References:", "Bytes:", "Lines:", "Xref:full"]
1375 self.assertEqual(nntplib._parse_overview_fmt(lines),
1376 ["subject", "from", "date", "message-id", "references",
1377 ":bytes", ":lines", "xref"])
1378
1379 def test_parse_overview(self):
1380 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1381 # First example from RFC 3977
1382 lines = [
1383 '3000234\tI am just a test article\t"Demo User" '
1384 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1385 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1386 '17\tXref: news.example.com misc.test:3000363',
1387 ]
1388 overview = nntplib._parse_overview(lines, fmt)
1389 (art_num, fields), = overview
1390 self.assertEqual(art_num, 3000234)
1391 self.assertEqual(fields, {
1392 'subject': 'I am just a test article',
1393 'from': '"Demo User" <nobody@example.com>',
1394 'date': '6 Oct 1998 04:38:40 -0500',
1395 'message-id': '<45223423@example.com>',
1396 'references': '<45454@example.net>',
1397 ':bytes': '1234',
1398 ':lines': '17',
1399 'xref': 'news.example.com misc.test:3000363',
1400 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001401 # Second example; here the "Xref" field is totally absent (including
1402 # the header name) and comes out as None
1403 lines = [
1404 '3000234\tI am just a test article\t"Demo User" '
1405 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1406 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1407 '17\t\t',
1408 ]
1409 overview = nntplib._parse_overview(lines, fmt)
1410 (art_num, fields), = overview
1411 self.assertEqual(fields['xref'], None)
1412 # Third example; the "Xref" is an empty string, while "references"
1413 # is a single space.
1414 lines = [
1415 '3000234\tI am just a test article\t"Demo User" '
1416 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1417 '<45223423@example.com>\t \t1234\t'
1418 '17\tXref: \t',
1419 ]
1420 overview = nntplib._parse_overview(lines, fmt)
1421 (art_num, fields), = overview
1422 self.assertEqual(fields['references'], ' ')
1423 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001424
1425 def test_parse_datetime(self):
1426 def gives(a, b, *c):
1427 self.assertEqual(nntplib._parse_datetime(a, b),
1428 datetime.datetime(*c))
1429 # Output of DATE command
1430 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1431 # Variations
1432 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1433 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1434 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1435
1436 def test_unparse_datetime(self):
1437 # Test non-legacy mode
1438 # 1) with a datetime
1439 def gives(y, M, d, h, m, s, date_str, time_str):
1440 dt = datetime.datetime(y, M, d, h, m, s)
1441 self.assertEqual(nntplib._unparse_datetime(dt),
1442 (date_str, time_str))
1443 self.assertEqual(nntplib._unparse_datetime(dt, False),
1444 (date_str, time_str))
1445 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1446 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1447 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1448 # 2) with a date
1449 def gives(y, M, d, date_str, time_str):
1450 dt = datetime.date(y, M, d)
1451 self.assertEqual(nntplib._unparse_datetime(dt),
1452 (date_str, time_str))
1453 self.assertEqual(nntplib._unparse_datetime(dt, False),
1454 (date_str, time_str))
1455 gives(1999, 6, 23, "19990623", "000000")
1456 gives(2000, 6, 23, "20000623", "000000")
1457 gives(2010, 6, 5, "20100605", "000000")
1458
1459 def test_unparse_datetime_legacy(self):
1460 # Test legacy mode (RFC 977)
1461 # 1) with a datetime
1462 def gives(y, M, d, h, m, s, date_str, time_str):
1463 dt = datetime.datetime(y, M, d, h, m, s)
1464 self.assertEqual(nntplib._unparse_datetime(dt, True),
1465 (date_str, time_str))
1466 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1467 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1468 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1469 # 2) with a date
1470 def gives(y, M, d, date_str, time_str):
1471 dt = datetime.date(y, M, d)
1472 self.assertEqual(nntplib._unparse_datetime(dt, True),
1473 (date_str, time_str))
1474 gives(1999, 6, 23, "990623", "000000")
1475 gives(2000, 6, 23, "000623", "000000")
1476 gives(2010, 6, 5, "100605", "000000")
1477
Serhiy Storchaka43767632013-11-03 21:31:38 +02001478 @unittest.skipUnless(ssl, 'requires SSL support')
1479 def test_ssl_support(self):
1480 self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001481
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001482
Berker Peksag96756b62014-09-20 08:53:05 +03001483class PublicAPITests(unittest.TestCase):
1484 """Ensures that the correct values are exposed in the public API."""
1485
1486 def test_module_all_attribute(self):
1487 self.assertTrue(hasattr(nntplib, '__all__'))
1488 target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
1489 'NNTPTemporaryError', 'NNTPPermanentError',
1490 'NNTPProtocolError', 'NNTPDataError', 'decode_header']
1491 if ssl is not None:
1492 target_api.append('NNTP_SSL')
1493 self.assertEqual(set(nntplib.__all__), set(target_api))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001494
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001495class MockSocketTests(unittest.TestCase):
1496 """Tests involving a mock socket object
1497
1498 Used where the _NNTPServerIO file object is not enough."""
1499
1500 nntp_class = nntplib.NNTP
1501
1502 def check_constructor_error_conditions(
1503 self, handler_class,
1504 expected_error_type, expected_error_msg,
1505 login=None, password=None):
1506
1507 class mock_socket_module:
1508 def create_connection(address, timeout):
1509 return MockSocket()
1510
1511 class MockSocket:
1512 def close(self):
1513 nonlocal socket_closed
1514 socket_closed = True
1515
1516 def makefile(socket, mode):
1517 handler = handler_class()
1518 _, file = make_mock_file(handler)
1519 files.append(file)
1520 return file
1521
1522 socket_closed = False
1523 files = []
1524 with patch('nntplib.socket', mock_socket_module), \
1525 self.assertRaisesRegex(expected_error_type, expected_error_msg):
1526 self.nntp_class('dummy', user=login, password=password)
1527 self.assertTrue(socket_closed)
1528 for f in files:
1529 self.assertTrue(f.closed)
1530
1531 def test_bad_welcome(self):
1532 #Test a bad welcome message
1533 class Handler(NNTPv1Handler):
1534 welcome = 'Bad Welcome'
1535 self.check_constructor_error_conditions(
1536 Handler, nntplib.NNTPProtocolError, Handler.welcome)
1537
1538 def test_service_temporarily_unavailable(self):
1539 #Test service temporarily unavailable
1540 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001541 welcome = '400 Service temporarily unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001542 self.check_constructor_error_conditions(
1543 Handler, nntplib.NNTPTemporaryError, Handler.welcome)
1544
1545 def test_service_permanently_unavailable(self):
1546 #Test service permanently unavailable
1547 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001548 welcome = '502 Service permanently unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001549 self.check_constructor_error_conditions(
1550 Handler, nntplib.NNTPPermanentError, Handler.welcome)
1551
1552 def test_bad_capabilities(self):
1553 #Test a bad capabilities response
1554 class Handler(NNTPv1Handler):
1555 def handle_CAPABILITIES(self):
1556 self.push_lit(capabilities_response)
1557 capabilities_response = '201 bad capability'
1558 self.check_constructor_error_conditions(
1559 Handler, nntplib.NNTPReplyError, capabilities_response)
1560
1561 def test_login_aborted(self):
1562 #Test a bad authinfo response
1563 login = 't@e.com'
1564 password = 'python'
1565 class Handler(NNTPv1Handler):
1566 def handle_AUTHINFO(self, *args):
1567 self.push_lit(authinfo_response)
1568 authinfo_response = '503 Mechanism not recognized'
1569 self.check_constructor_error_conditions(
1570 Handler, nntplib.NNTPPermanentError, authinfo_response,
1571 login, password)
1572
Serhiy Storchaka80774342015-04-03 15:02:20 +03001573class bypass_context:
1574 """Bypass encryption and actual SSL module"""
1575 def wrap_socket(sock, **args):
1576 return sock
1577
1578@unittest.skipUnless(ssl, 'requires SSL support')
1579class MockSslTests(MockSocketTests):
1580 @staticmethod
1581 def nntp_class(*pos, **kw):
1582 return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
Victor Stinner8c9bba02015-04-03 11:06:40 +02001583
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001584
Martin Panter8f19e8e2016-01-19 01:10:58 +00001585class LocalServerTests(unittest.TestCase):
1586 def setUp(self):
1587 sock = socket.socket()
Serhiy Storchaka16994912020-04-25 10:06:29 +03001588 port = socket_helper.bind_port(sock)
Martin Panter8f19e8e2016-01-19 01:10:58 +00001589 sock.listen()
1590 self.background = threading.Thread(
1591 target=self.run_server, args=(sock,))
1592 self.background.start()
1593 self.addCleanup(self.background.join)
1594
Serhiy Storchaka16994912020-04-25 10:06:29 +03001595 self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__()
Martin Panter8f19e8e2016-01-19 01:10:58 +00001596 self.addCleanup(self.nntp.__exit__, None, None, None)
1597
1598 def run_server(self, sock):
1599 # Could be generalized to handle more commands in separate methods
1600 with sock:
1601 [client, _] = sock.accept()
1602 with contextlib.ExitStack() as cleanup:
1603 cleanup.enter_context(client)
1604 reader = cleanup.enter_context(client.makefile('rb'))
1605 client.sendall(b'200 Server ready\r\n')
1606 while True:
1607 cmd = reader.readline()
1608 if cmd == b'CAPABILITIES\r\n':
1609 client.sendall(
1610 b'101 Capability list:\r\n'
1611 b'VERSION 2\r\n'
1612 b'STARTTLS\r\n'
1613 b'.\r\n'
1614 )
1615 elif cmd == b'STARTTLS\r\n':
1616 reader.close()
1617 client.sendall(b'382 Begin TLS negotiation now\r\n')
Christian Heimes2875c602021-04-19 07:27:10 +02001618 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimesd0486372016-09-10 23:23:33 +02001619 context.load_cert_chain(certfile)
1620 client = context.wrap_socket(
1621 client, server_side=True)
Martin Panter8f19e8e2016-01-19 01:10:58 +00001622 cleanup.enter_context(client)
1623 reader = cleanup.enter_context(client.makefile('rb'))
1624 elif cmd == b'QUIT\r\n':
1625 client.sendall(b'205 Bye!\r\n')
1626 break
1627 else:
1628 raise ValueError('Unexpected command {!r}'.format(cmd))
1629
1630 @unittest.skipUnless(ssl, 'requires SSL support')
1631 def test_starttls(self):
1632 file = self.nntp.file
1633 sock = self.nntp.sock
1634 self.nntp.starttls()
1635 # Check that the socket and internal pseudo-file really were
1636 # changed.
1637 self.assertNotEqual(file, self.nntp.file)
1638 self.assertNotEqual(sock, self.nntp.sock)
1639 # Check that the new socket really is an SSL one
1640 self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
1641 # Check that trying starttls when it's already active fails.
1642 self.assertRaises(ValueError, self.nntp.starttls)
1643
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001644
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001645if __name__ == "__main__":
Berker Peksag96756b62014-09-20 08:53:05 +03001646 unittest.main()