blob: 4f0592188f84438cb863062dd12b1c2bf6622aa4 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Dong-hee Naaa92a7c2020-05-16 19:31:54 +09008import nntplib
Martin Panter8f19e8e2016-01-19 01:10:58 +00009import os.path
Gregory P. Smith2cc02232019-05-06 17:54:06 -040010import re
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020011import threading
12
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013from test import support
Serhiy Storchaka16994912020-04-25 10:06:29 +030014from test.support import socket_helper
Serhiy Storchaka43767632013-11-03 21:31:38 +020015from nntplib import NNTP, GroupInfo
Serhiy Storchaka52027c32015-03-21 09:40:26 +020016from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020017try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000018 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020019except ImportError:
20 ssl = None
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020021
Antoine Pitrou69ab9512010-09-29 15:03:40 +000022
# Certificate/key bundle looked up in the same directory as this test module.
certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')

# Always expose an ``SSLError`` name so that ``except SSLError`` clauses are
# valid even when the ssl module could not be imported.
if ssl is None:
    class SSLError(Exception):
        """Non-existent exception class when we lack SSL support."""
        reason = "This will never be raised."
else:
    SSLError = ssl.SSLError
31
Antoine Pitrou69ab9512010-09-29 15:03:40 +000032# TODO:
33# - test the `file` arg to more commands
34# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000035# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000036
37
class NetworkedNNTPTestsMixin:
    """Tests that talk to a live NNTP server.

    The tests read ``self.server`` (the connected client), ``NNTP_HOST``,
    ``NNTP_CLASS``, ``GROUP_NAME``, ``GROUP_PAT`` and ``DESC``; the concrete
    test case mixing this class in has to provide them.  Test methods are
    wrapped by wrap_methods() so that they run inside
    socket_helper.transient_internet().
    """

    # Optional SSL context passed to NNTP_CLASS; None means "do not pass one".
    ssl_context = None

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn(self.DESC, desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock (with one
        # year of slack) instead of a hard-coded year, so the test does not
        # start failing once that year has passed.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
                           ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        """Check the shape of an ArticleInfo returned by head/body/article."""
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skipIf(True, "FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        # Tolerate running the tests from behind a NNTP virus checker
        denylist = lambda line: line.startswith(b'X-Antivirus')
        filtered_head_lines = [line for line in head.lines
                               if not denylist(line)]
        filtered_lines = [line for line in article.lines
                          if not denylist(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Mark the shared connection as gone even if QUIT failed.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every callable ``test_*`` attribute of *cls* so that it runs
        inside socket_helper.transient_internet(self.NNTP_HOST)."""
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with socket_helper.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_timeout(self):
        with self.assertRaises(ValueError):
            self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)

    def test_with_statement(self):
        def is_connected():
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if self.ssl_context is not None:
            kwargs["ssl_context"] = self.ssl_context

        try:
            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            # ``reason`` may be None; fall back to '' so that re.search()
            # does not replace the real error with a TypeError.
            if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason or ''):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise


NetworkedNNTPTestsMixin.wrap_methods()
305
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000306
# Exceptions treated as "the peer closed the connection"; the TLS variant is
# only listed when the ssl module is available.
EOF_ERRORS = (EOFError,) if ssl is None else (EOFError, ssl.SSLEOFError)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200310
311
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests over a plain connection made with NNTP_CLASS.

    One connection is opened in setUpClass() and shared, as ``cls.server``,
    by all the tests of the class.
    """
    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'
    DESC = 'Python'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        """Connect to NNTP_HOST, skipping the class on known server issues."""
        support.requires("network")
        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if cls.ssl_context is not None:
            kwargs["ssl_context"] = cls.ssl_context
        with socket_helper.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, **kwargs)
            except EOF_ERRORS:
                # Checked before SSLError: ssl.SSLEOFError is a subclass of
                # ssl.SSLError, so with the opposite order an EOF during the
                # TLS handshake would never reach this skip.
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                # ``reason`` may be None; fall back to '' so that re.search()
                # does not replace the real error with a TypeError.
                if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason or ''):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                # Show which host failed before propagating the error.
                print(cls.NNTP_HOST)
                raise

    @classmethod
    def tearDownClass(cls):
        # test_zzquit() resets cls.server to None after sending QUIT.
        if cls.server is not None:
            cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000348
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    """The networked tests again, over a connection made with NNTP_SSL."""

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."
    NNTP_HOST = 'nntp.aioe.org'

    # bpo-42794: aioe.test is one of the official groups on this server
    # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
    GROUP_NAME = 'aioe.test'
    GROUP_PAT = 'aioe.*'
    DESC = 'test'

    # None when nntplib has no NNTP_SSL attribute.
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Disabled as it produces too much data
    test_list = None

    # Disabled as the connection will already be encrypted.
    test_starttls = None

    if ssl is not None:
        # Unverified context with the "DEFAULT" cipher list, capped at
        # TLS 1.2.
        ssl_context = ssl._create_unverified_context()
        ssl_context.set_ciphers("DEFAULT")
        ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000375
376#
377# Non-networked tests using a local server (or something mocking it).
378#
379
380class _NNTPServerIO(io.RawIOBase):
381 """A raw IO object allowing NNTP commands to be received and processed
382 by a handler. The handler can push responses which can then be read
383 from the IO object."""
384
385 def __init__(self, handler):
386 io.RawIOBase.__init__(self)
387 # The channel from the client
388 self.c2s = io.BytesIO()
389 # The channel to the client
390 self.s2c = io.BytesIO()
391 self.handler = handler
392 self.handler.start(self.c2s.readline, self.push_data)
393
394 def readable(self):
395 return True
396
397 def writable(self):
398 return True
399
400 def push_data(self, data):
401 """Push (buffer) some data to send to the client."""
402 pos = self.s2c.tell()
403 self.s2c.seek(0, 2)
404 self.s2c.write(data)
405 self.s2c.seek(pos)
406
407 def write(self, b):
408 """The client sends us some data"""
409 pos = self.c2s.tell()
410 self.c2s.write(b)
411 self.c2s.seek(pos)
412 self.handler.process_pending()
413 return len(b)
414
415 def readinto(self, buf):
416 """The client wants to read a response"""
417 self.handler.process_pending()
418 b = self.s2c.read(len(buf))
419 n = len(b)
420 buf[:n] = b
421 return n
422
423
def make_mock_file(handler):
    """Return ``(raw, file)``: a _NNTPServerIO driven by *handler* and a
    buffered reader/writer layered on top of it."""
    raw = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    return (raw, io.BufferedRWPair(raw, raw))
430
431
class NNTPServer(nntplib.NNTP):
    """NNTP client bound to an already-open file object instead of a host
    it connects to itself."""

    def __init__(self, f, host, readermode=None):
        # Deliberately does not call NNTP.__init__(): the attributes are set
        # by hand and only the shared _base_init() step is run.
        self.file = f
        self.host = host
        self._base_init(readermode)

    def _close(self):
        """Close the file object and drop the reference to it."""
        self.file.close()
        del self.file
442
443
class MockedNNTPTestsMixin:
    """Give each test a client (``self.server``) wired to a mock handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create a fresh handler and client; extra arguments are passed on
        to NNTPServer.  The new client is stored as ``self.server`` and
        returned."""
        self.handler = self.handler_class()
        raw, mock_file = make_mock_file(self.handler)
        self.sio = raw
        self.server = NNTPServer(mock_file, 'test.server', *args, **kwargs)
        return self.server
461
462
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Same fixture as the base mixin, but the client used by the tests is
    created with ``readermode=True``."""

    def setUp(self):
        super().setUp()
        # Replace the client built by the base setUp().
        self.make_server(readermode=True)
467
468
class NNTPv1Handler:
    """A handler for RFC 977

    The handler is driven by start() and process_pending(): command lines are
    pulled from the ``readline`` callable and dispatched to the matching
    ``handle_<COMMAND>`` method, which pushes canned responses through the
    ``push_data`` callable.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Remember the I/O callables, reset the state, send the greeting."""
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume and answer everything the client has sent so far.

        Raises ValueError when a command line is not CRLF-terminated or when
        a handler method fails.
        """
        if self.in_body:
            # A previous command (POST/IHAVE) asked for a body: collect lines
            # up to the terminating "." and hand them to that command again.
            while True:
                line = self._readline()
                if not line:
                    # Body not complete yet; resume on the next call.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            words = line.split()
            if not words:
                # A blank line has no command word; answer it like any other
                # unknown command instead of failing on the unpacking below.
                self.handle_unknown()
                continue
            cmd, *tokens = words
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # The command called expect_body(): remember how to
                        # call it back once the body has arrived.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal

        The literal is dedented, its lines are joined and terminated with
        CRLF, and the result is sent UTF-8 encoded.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # A deliberately oversized response line.
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.io gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                # (the e-acute is written as an escape so that it survives
                # any re-encoding of this source file)
                "59\tRe: Message d'erreur incompr\xe9hensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        if body is None:
            # First call: the command line itself.
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            # Second call, made by process_pending() with the collected body.
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        if body is None:
            # First call: the command line itself.
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            # Second call, made by process_pending() with the collected body.
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_part(self, code, message_spec, payload):
        """Answer ARTICLE/HEAD/BODY: status line *code*, *payload*, then "."."""
        if message_spec is None:
            self.push_lit("{} 3000237 <45223423@example.com>".format(code))
        elif message_spec == "<45223423@example.com>":
            self.push_lit("{} 0 <45223423@example.com>".format(code))
        elif message_spec == "3000234":
            self.push_lit("{} 3000234 <45223423@example.com>".format(code))
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_article_part("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_article_part("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_article_part("222", message_spec, self.sample_body)

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            # Any password is accepted.
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
792
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000793
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # "AUTHINFO USER" is only advertised while the client is not logged
        # in.  The template is dedented up front so that the inserted line
        # does not have to mimic the indentation of this source file.
        auth_line = '' if self._logged_in else '\nAUTHINFO USER'
        template = textwrap.dedent("""\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            .""")
        self.push_lit(template.format(auth_line))

    def handle_MODE(self, _):
        # READER is already in the capability list, so a client sending
        # MODE is treated as an error.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
819
820
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        """Push the parent's capability list once logged in, else a 480."""
        if self._logged_in:
            super().handle_CAPABILITIES()
            return
        self.push_lit('480 You must log in.')
829
830
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes True once a MODE command has been handled.
        self._switched = False

    def handle_CAPABILITIES(self):
        """Advertise MODE-READER before the switch and READER after it."""
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        prefix = '' if self._switched else 'MODE-'
        self.push_lit(template.format(prefix))

    def handle_MODE(self, what):
        """Switch to reader mode; only one 'reader' switch is expected."""
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
857
858
class NNTPv1v2TestsMixin:
    # Tests shared by the NNTP v1 and v2 test cases.  They read
    # self.server, self.handler, self.sio and self.nntp_version, none of
    # which is set here -- presumably the classes this is mixed with
    # provide them.

    def setUp(self):
        super().setUp()

    def test_welcome(self):
        self.assertEqual(self.server.welcome, self.handler.welcome)

    def test_authinfo(self):
        server = self.server
        if self.nntp_version == 2:
            self.assertIn('AUTHINFO', server._caps)
        server.login('testuser', 'testpw')
        # if AUTHINFO is gone from _caps we also know that getcapabilities()
        # has been called after login as it should
        self.assertNotIn('AUTHINFO', server._caps)

    def test_date(self):
        response, when = self.server.date()
        self.assertEqual(response, "111 20100914001155")
        self.assertEqual(when, datetime.datetime(2010, 9, 14, 0, 11, 55))

    def test_quit(self):
        self.assertFalse(self.sio.closed)
        response = self.server.quit()
        self.assertEqual(response, "205 Bye!")
        self.assertTrue(self.sio.closed)

    def test_help(self):
        response, help_lines = self.server.help()
        self.assertEqual(response, "100 Legal commands")
        self.assertEqual(help_lines, [
            ' authinfo user Name|pass Password|generic <prog> <args>',
            ' date',
            ' help',
            'Report problems to <root@example.org>',
        ])

    def test_list(self):
        # Unfiltered listing
        _, all_groups = self.server.list()
        self.assertEqual(len(all_groups), 6)
        self.assertEqual(
            all_groups[1],
            GroupInfo("comp.lang.python.announce", "0000001153",
                      "0000000993", "m"))
        # Listing restricted by a pattern
        _, matching = self.server.list("*distutils*")
        self.assertEqual(len(matching), 2)
        self.assertEqual(
            matching[0],
            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
                      "0000000001", "m"))

    def test_stat(self):
        known_id = "<45223423@example.com>"
        # (argument to stat(), article number expected in the reply)
        for spec, number in ((3000234, 3000234), (known_id, 0)):
            resp, art_num, message_id = self.server.stat(spec)
            self.assertEqual(resp, "223 {} {}".format(number, known_id))
            self.assertEqual(art_num, number)
            self.assertEqual(message_id, known_id)
        # (arguments to stat(), error response expected)
        failures = (
            (("<non.existent.id>",), "430 No Such Article Found"),
            ((), "412 No newsgroup selected"),
        )
        for args, error in failures:
            with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
                self.server.stat(*args)
            self.assertEqual(cm.exception.response, error)

    def test_next(self):
        response, number, msg_id = self.server.next()
        self.assertEqual(response, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(number, 3000237)
        self.assertEqual(msg_id, "<668929@example.org>")

    def test_last(self):
        response, number, msg_id = self.server.last()
        self.assertEqual(response, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(number, 3000234)
        self.assertEqual(msg_id, "<45223423@example.com>")

    def test_description(self):
        describe = self.server.description
        self.assertEqual(describe("comp.lang.python"),
                         "The Python computer language.")
        self.assertEqual(describe("comp.lang.pythonx"), "")

    def test_descriptions(self):
        python_desc = "The Python computer language."
        resp, groups = self.server.descriptions("comp.lang.python")
        self.assertEqual(resp, '215 Descriptions in form "group description".')
        self.assertEqual(groups, {"comp.lang.python": python_desc})
        _, groups = self.server.descriptions("comp.lang.python*")
        self.assertEqual(groups, {
            "comp.lang.python": python_desc,
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
        })
        _, groups = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(groups, {})

    def test_group(self):
        reply = self.server.group("fr.comp.lang.python")
        resp, count, first, last, group = reply
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, "fr.comp.lang.python")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.group("comp.lang.python.devel")
        error_response = cm.exception.response
        self.assertTrue(error_response.startswith("411 No such group"),
                        error_response)

    def test_newnews(self):
        since = datetime.datetime(2010, 9, 13, 8, 20, 4)
        # NEWNEWS comp.lang.python [20]100913 082004
        resp, ids = self.server.newnews("comp.lang.python", since)
        self.assertEqual(
            resp,
            ("230 list of newsarticles (NNTP v{0}) "
             "created after Mon Sep 13 08:20:04 2010 follows"
             ).format(self.nntp_version))
        self.assertEqual(ids, [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
        ])
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        resp, ids = self.server.newnews("fr.comp.lang.python", since)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        # The sample body is four lines, checked here from the end backwards.
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
        self.assertEqual(lines[-2], b"")
        self.assertEqual(lines[-3], b".Here is a dot-starting line.")
        self.assertEqual(lines[-4], b"This is just a test article.")

    def _check_article_head(self, lines):
        # Only the first and last of the four header lines are compared.
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        # Four header lines, one empty separator line, four body lines.
        self.assertEqual(len(lines), 9)
        self._check_article_head(lines[:4])
        self._check_article_body(lines[-4:])
        self.assertEqual(lines[4], b"")

    def test_article(self):
        known_id = "<45223423@example.com>"
        # In turn: ARTICLE, ARTICLE num, ARTICLE id
        # (arguments to article(), article number expected in the reply)
        for args, number in (((), 3000237), ((3000234,), 3000234),
                             ((known_id,), 0)):
            resp, info = self.server.article(*args)
            self.assertEqual(resp, "220 {} {}".format(number, known_id))
            art_num, message_id, lines = info
            self.assertEqual(art_num, number)
            self.assertEqual(message_id, known_id)
            self._check_article_data(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.article("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_article_file(self):
        # With a "file" argument
        buf = io.BytesIO()
        resp, (art_num, message_id, lines) = self.server.article(file=buf)
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = buf.getvalue()
        head_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
        )
        body_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
        )
        self.assertTrue(data.startswith(head_start), ascii(data))
        self.assertTrue(data.endswith(body_end), ascii(data))

    def test_head(self):
        known_id = "<45223423@example.com>"
        # In turn: HEAD, HEAD num, HEAD id
        # (arguments to head(), article number expected in the reply)
        for args, number in (((), 3000237), ((3000234,), 3000234),
                             ((known_id,), 0)):
            resp, info = self.server.head(*args)
            self.assertEqual(resp, "221 {} {}".format(number, known_id))
            art_num, message_id, lines = info
            self.assertEqual(art_num, number)
            self.assertEqual(message_id, known_id)
            self._check_article_head(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.head("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_head_file(self):
        buf = io.BytesIO()
        resp, (art_num, message_id, lines) = self.server.head(file=buf)
        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = buf.getvalue()
        head_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
        )
        body_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
        )
        # Headers only: the data must not finish with the article body.
        self.assertTrue(data.startswith(head_start), ascii(data))
        self.assertFalse(data.endswith(body_end), ascii(data))

    def test_body(self):
        known_id = "<45223423@example.com>"
        # In turn: BODY, BODY num, BODY id
        # (arguments to body(), article number expected in the reply)
        for args, number in (((), 3000237), ((3000234,), 3000234),
                             ((known_id,), 0)):
            resp, info = self.server.body(*args)
            self.assertEqual(resp, "222 {} {}".format(number, known_id))
            art_num, message_id, lines = info
            self.assertEqual(art_num, number)
            self.assertEqual(message_id, known_id)
            self._check_article_body(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.body("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_body_file(self):
        buf = io.BytesIO()
        resp, (art_num, message_id, lines) = self.server.body(file=buf)
        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = buf.getvalue()
        head_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
        )
        body_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
        )
        # Body only: the data must not begin with the article headers.
        self.assertFalse(data.startswith(head_start), ascii(data))
        self.assertTrue(data.endswith(body_end), ascii(data))

    def check_over_xover_resp(self, resp, overviews):
        # Shared checks for the OVER and XOVER replies.
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        first, second, third = overviews
        art_num, over = first
        self.assertEqual(art_num, 57)
        self.assertEqual(over, {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.io gmane.comp.python.authors:57"
        })
        art_num, over = second
        self.assertEqual(over["xref"], None)
        art_num, over = third
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        self.check_over_xover_resp(*self.server.xover(57, 59))

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        self.check_over_xover_resp(*self.server.over((57, 59)))

    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        # Check the raw body as received by the server
        received = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(received), 10)
        self.assertEqual(received[-1], b'.\r\n')
        self.assertEqual(received[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(received[-3], b'\r\n')
        # The dot-starting line is expected with its dot doubled.
        self.assertEqual(received[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(received[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        # First the prepared post with CRLF endings, then the same post
        # with "normal" line endings - they should be converted by
        # NNTP.post and NNTP.ihave.
        variants = (
            self.sample_post,
            self.sample_post.replace(b"\r\n", b"\n"),
        )
        for post in variants:
            payload = file_factory(post)
            self.handler.posted_body = None
            resp = func(*args, payload)
            self._check_posted_body()
        # Only the response to the second (LF-only) post is returned.
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        def terminated_lines(b):
            return iter(b.splitlines(keepends=True))

        def bare_lines(b):
            return iter(b.splitlines(keepends=False))

        factories = (
            bytes,             # With a bytes object
            bytearray,         # With a bytearray object
            io.BytesIO,        # With a file object
            terminated_lines,  # With an iterable of terminated lines
            bare_lines,        # With an iterable of non-terminated lines
        )
        for factory in factories:
            resp = self._check_post_ihave_sub(func, *args,
                                              file_factory=factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        # Posting must be refused once the handler disallows it.
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response, "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response, "435 Article not wanted")

    def test_too_long_lines(self):
        since = datetime.datetime(2010, 1, 1, 9, 0, 0)
        with self.assertRaises(nntplib.NNTPDataError):
            self.server.newnews("comp.lang.python", since)
1260
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001261
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # No capabilities, protocol version 1, no implementation string.
        server = self.server
        self.assertEqual(server.getcapabilities(), {})
        self.assertEqual(server.nntp_version, 1)
        self.assertEqual(server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001273
1274
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        server = self.server
        self.assertEqual(server.getcapabilities(), expected_caps)
        # VERSION lists both 2 and 3; the client is expected to report 3.
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001296
1297
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        server = self.server
        # Nothing is known before login ...
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        # ... and the capability list is filled in afterwards.
        self.assertIn('VERSION', server._caps)
1308
1309
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        caps = self.server._caps
        self.assertIn('READER', caps)
1320
1321
class MiscTests(unittest.TestCase):
    # Tests of nntplib's module-level helpers; no server is involved.

    def test_decode_header(self):
        # (raw header, expected decoded text)
        cases = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in cases:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        minimal = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]
        # (LIST OVERVIEW.FMT lines, expected field names)
        cases = [
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             minimal),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             minimal),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             minimal),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             minimal + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             minimal + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             minimal + ["xref"]),
        ]
        for fmt_lines, expected in cases:
            self.assertEqual(nntplib._parse_overview_fmt(fmt_lines), expected)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        raw = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        [(art_num, fields)] = nntplib._parse_overview(raw, fmt)
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        raw = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        [(art_num, fields)] = nntplib._parse_overview(raw, fmt)
        self.assertEqual(fields['xref'], None)
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        raw = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        [(art_num, fields)] = nntplib._parse_overview(raw, fmt)
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        # (date string, time string or None, expected datetime fields)
        cases = [
            # Output of DATE command
            ("19990623135624", None, (1999, 6, 23, 13, 56, 24)),
            # Variations
            ("19990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("090623", "135624", (2009, 6, 23, 13, 56, 24)),
        ]
        for date_str, time_str, fields in cases:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             datetime.datetime(*fields))

    def test_unparse_datetime(self):
        # Test non-legacy mode
        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("19990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("20000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("20100605", "010203")),
        ]
        for fields, expected in datetime_cases:
            dt = datetime.datetime(*fields)
            self.assertEqual(nntplib._unparse_datetime(dt), expected)
            self.assertEqual(nntplib._unparse_datetime(dt, False), expected)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("19990623", "000000")),
            ((2000, 6, 23), ("20000623", "000000")),
            ((2010, 6, 5), ("20100605", "000000")),
        ]
        for fields, expected in date_cases:
            day = datetime.date(*fields)
            self.assertEqual(nntplib._unparse_datetime(day), expected)
            self.assertEqual(nntplib._unparse_datetime(day, False), expected)

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("100605", "010203")),
        ]
        for fields, expected in datetime_cases:
            dt = datetime.datetime(*fields)
            self.assertEqual(nntplib._unparse_datetime(dt, True), expected)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("990623", "000000")),
            ((2000, 6, 23), ("000623", "000000")),
            ((2010, 6, 5), ("100605", "000000")),
        ]
        for fields, expected in date_cases:
            day = datetime.date(*fields)
            self.assertEqual(nntplib._unparse_datetime(day, True), expected)

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001482
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001483
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        expected = {
            'NNTP', 'NNTPError', 'NNTPReplyError',
            'NNTPTemporaryError', 'NNTPPermanentError',
            'NNTPProtocolError', 'NNTPDataError', 'decode_header',
        }
        # NNTP_SSL is only expected when the ssl module could be imported.
        if ssl is not None:
            expected.add('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), expected)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001495
class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        # Build self.nntp_class against a fake socket served by
        # handler_class, expect the constructor to raise
        # expected_error_type matching expected_error_msg, then verify
        # that the socket and every file made from it were closed.

        socket_closed = False
        opened_files = []

        class MockSocket:
            def close(self):
                nonlocal socket_closed
                socket_closed = True

            def makefile(self, mode):
                # A fresh handler backs each file made from the socket.
                _, file = make_mock_file(handler_class())
                opened_files.append(file)
                return file

        class mock_socket_module:
            # Stands in for the socket module, so this is called on the
            # class itself and takes no self.
            def create_connection(address, timeout):
                return MockSocket()

        with patch('nntplib.socket', mock_socket_module), \
             self.assertRaisesRegex(expected_error_type, expected_error_msg):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for file in opened_files:
            self.assertTrue(file.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        authinfo_response = '503 Mechanism not recognized'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            't@e.com', 'python')
1573
class bypass_context:
    """Bypass encryption and actual SSL module.

    Stand-in for an SSL context: ``wrap_socket`` hands the socket back
    untouched, so no TLS handshake takes place.
    """

    # Declared static so the call works both on the class itself and on
    # an instance; without it, an instance call would bind the instance
    # to ``sock`` and fail on the extra positional argument.
    @staticmethod
    def wrap_socket(sock, **kwargs):
        """Return *sock* unchanged, ignoring all keyword options."""
        return sock
1578
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Run the inherited MockSocketTests cases through ``NNTP_SSL``."""

    @staticmethod
    def nntp_class(*args, **kwargs):
        """Create an NNTP_SSL client that uses ``bypass_context``."""
        return nntplib.NNTP_SSL(*args, ssl_context=bypass_context, **kwargs)
Victor Stinner8c9bba02015-04-03 11:06:40 +02001584
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001585
class LocalServerTests(unittest.TestCase):
    """Run the NNTP client against a small server in a background thread."""

    def setUp(self):
        """Start the server thread and connect ``self.nntp`` to it."""
        listener = socket.socket()
        port = socket_helper.bind_port(listener)
        listener.listen()

        server_thread = threading.Thread(
            target=self.run_server, args=(listener,))
        self.background = server_thread
        server_thread.start()
        self.addCleanup(server_thread.join)

        connection = NNTP(socket_helper.HOST, port, usenetrc=False)
        self.nntp = connection.__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        """Accept one client on *sock* and answer its commands.

        CAPABILITIES, STARTTLS and QUIT are understood; any other input
        line raises ValueError.  The loop ends after QUIT.
        """
        # Could be generalized to handle more commands in separate methods
        capability_reply = (
            b'101 Capability list:\r\n'
            b'VERSION 2\r\n'
            b'STARTTLS\r\n'
            b'.\r\n'
        )
        with sock:
            conn = sock.accept()[0]
            with contextlib.ExitStack() as cleanup:
                cleanup.enter_context(conn)
                lines = cleanup.enter_context(conn.makefile('rb'))
                conn.sendall(b'200 Server ready\r\n')
                while True:
                    cmd = lines.readline()
                    if cmd == b'QUIT\r\n':
                        conn.sendall(b'205 Bye!\r\n')
                        break
                    if cmd == b'CAPABILITIES\r\n':
                        conn.sendall(capability_reply)
                        continue
                    if cmd == b'STARTTLS\r\n':
                        # The plain-text reader is closed before the
                        # socket is wrapped; a new reader is then made
                        # on top of the TLS socket.
                        lines.close()
                        conn.sendall(b'382 Begin TLS negotiation now\r\n')
                        tls = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                        tls.load_cert_chain(certfile)
                        conn = tls.wrap_socket(conn, server_side=True)
                        cleanup.enter_context(conn)
                        lines = cleanup.enter_context(conn.makefile('rb'))
                        continue
                    raise ValueError(f'Unexpected command {cmd!r}')

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        client = self.nntp
        plain_file, plain_sock = client.file, client.sock
        client.starttls()
        # Both the pseudo-file and the socket must have been replaced.
        self.assertNotEqual(plain_file, client.file)
        self.assertNotEqual(plain_sock, client.sock)
        # The replacement socket must be an SSL one.
        self.assertIsInstance(client.sock, ssl.SSLSocket)
        # A second starttls on an already-upgraded connection must fail.
        with self.assertRaises(ValueError):
            client.starttls()
1644
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001645
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()