blob: 1df64fa7c6b00b2272be8032fce3cd7335ae4379 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Dong-hee Naaa92a7c2020-05-16 19:31:54 +09008import nntplib
Martin Panter8f19e8e2016-01-19 01:10:58 +00009import os.path
Gregory P. Smith2cc02232019-05-06 17:54:06 -040010import re
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020011import threading
12
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013from test import support
Serhiy Storchaka16994912020-04-25 10:06:29 +030014from test.support import socket_helper
Serhiy Storchaka43767632013-11-03 21:31:38 +020015from nntplib import NNTP, GroupInfo
Serhiy Storchaka52027c32015-03-21 09:40:26 +020016from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020017try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000018 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020019except ImportError:
20 ssl = None
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020021
Antoine Pitrou69ab9512010-09-29 15:03:40 +000022
Martin Panter8f19e8e2016-01-19 01:10:58 +000023certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
Antoine Pitrou69ab9512010-09-29 15:03:40 +000024
Gregory P. Smith2cc02232019-05-06 17:54:06 -040025if ssl is not None:
26 SSLError = ssl.SSLError
27else:
28 class SSLError(Exception):
29 """Non-existent exception class when we lack SSL support."""
30 reason = "This will never be raised."
31
Antoine Pitrou69ab9512010-09-29 15:03:40 +000032# TODO:
33# - test the `file` arg to more commands
34# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000035# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000036
37
38class NetworkedNNTPTestsMixin:
39
40 def test_welcome(self):
41 welcome = self.server.getwelcome()
42 self.assertEqual(str, type(welcome))
43
44 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000045 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000046 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000047 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000048 self.assertEqual(str, type(line))
49
50 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000051 resp, groups = self.server.list()
52 if len(groups) > 0:
53 self.assertEqual(GroupInfo, type(groups[0]))
54 self.assertEqual(str, type(groups[0].group))
55
56 def test_list_active(self):
57 resp, groups = self.server.list(self.GROUP_PAT)
58 if len(groups) > 0:
59 self.assertEqual(GroupInfo, type(groups[0]))
60 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000061
62 def test_unknown_command(self):
63 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
64 self.server._shortcmd("XYZZY")
65 resp = cm.exception.response
66 self.assertTrue(resp.startswith("500 "), resp)
67
68 def test_newgroups(self):
69 # gmane gets a constant influx of new groups. In order not to stress
70 # the server too much, we choose a recent date in the past.
71 dt = datetime.date.today() - datetime.timedelta(days=7)
72 resp, groups = self.server.newgroups(dt)
73 if len(groups) > 0:
74 self.assertIsInstance(groups[0], GroupInfo)
75 self.assertIsInstance(groups[0].group, str)
76
77 def test_description(self):
78 def _check_desc(desc):
79 # Sanity checks
80 self.assertIsInstance(desc, str)
81 self.assertNotIn(self.GROUP_NAME, desc)
82 desc = self.server.description(self.GROUP_NAME)
83 _check_desc(desc)
84 # Another sanity check
85 self.assertIn("Python", desc)
86 # With a pattern
87 desc = self.server.description(self.GROUP_PAT)
88 _check_desc(desc)
89 # Shouldn't exist
90 desc = self.server.description("zk.brrtt.baz")
91 self.assertEqual(desc, '')
92
93 def test_descriptions(self):
94 resp, descs = self.server.descriptions(self.GROUP_PAT)
95 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
96 self.assertTrue(
97 resp.startswith("215 ") or resp.startswith("282 "), resp)
98 self.assertIsInstance(descs, dict)
99 desc = descs[self.GROUP_NAME]
100 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
101
102 def test_group(self):
103 result = self.server.group(self.GROUP_NAME)
104 self.assertEqual(5, len(result))
105 resp, count, first, last, group = result
106 self.assertEqual(group, self.GROUP_NAME)
107 self.assertIsInstance(count, int)
108 self.assertIsInstance(first, int)
109 self.assertIsInstance(last, int)
110 self.assertLessEqual(first, last)
111 self.assertTrue(resp.startswith("211 "), resp)
112
113 def test_date(self):
114 resp, date = self.server.date()
115 self.assertIsInstance(date, datetime.datetime)
116 # Sanity check
117 self.assertGreaterEqual(date.year, 1995)
118 self.assertLessEqual(date.year, 2030)
119
120 def _check_art_dict(self, art_dict):
121 # Some sanity checks for a field dictionary returned by OVER / XOVER
122 self.assertIsInstance(art_dict, dict)
123 # NNTP has 7 mandatory fields
124 self.assertGreaterEqual(art_dict.keys(),
125 {"subject", "from", "date", "message-id",
126 "references", ":bytes", ":lines"}
127 )
128 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000129 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000130
131 def test_xover(self):
132 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000133 resp, lines = self.server.xover(last - 5, last)
134 if len(lines) == 0:
135 self.skipTest("no articles retrieved")
136 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000137 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000138 self.assertGreaterEqual(art_num, last - 5)
139 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000140 self._check_art_dict(art_dict)
141
Xavier de Gayeac13bee2016-12-16 20:49:10 +0100142 @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
143 ' is found for issue #28971')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000144 def test_over(self):
145 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
146 start = last - 10
147 # The "start-" article range form
148 resp, lines = self.server.over((start, None))
149 art_num, art_dict = lines[0]
150 self._check_art_dict(art_dict)
151 # The "start-end" article range form
152 resp, lines = self.server.over((start, last))
153 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000154 # The 'last' article is not necessarily part of the output (cancelled?)
155 self.assertGreaterEqual(art_num, start)
156 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000157 self._check_art_dict(art_dict)
158 # XXX The "message_id" form is unsupported by gmane
159 # 503 Overview by message-ID unsupported
160
161 def test_xhdr(self):
162 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
163 resp, lines = self.server.xhdr('subject', last)
164 for line in lines:
165 self.assertEqual(str, type(line[1]))
166
167 def check_article_resp(self, resp, article, art_num=None):
168 self.assertIsInstance(article, nntplib.ArticleInfo)
169 if art_num is not None:
170 self.assertEqual(article.number, art_num)
171 for line in article.lines:
172 self.assertIsInstance(line, bytes)
173 # XXX this could exceptionally happen...
174 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
175
Victor Stinner706cb312017-11-25 02:42:18 +0100176 @unittest.skipIf(True, "FIXME: see bpo-32128")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000177 def test_article_head_body(self):
178 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000179 # Try to find an available article
180 for art_num in (last, first, last - 1):
181 try:
182 resp, head = self.server.head(art_num)
183 except nntplib.NNTPTemporaryError as e:
184 if not e.response.startswith("423 "):
185 raise
186 # "423 No such article" => choose another one
187 continue
188 break
189 else:
190 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000191 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000192 self.check_article_resp(resp, head, art_num)
193 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000194 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000195 self.check_article_resp(resp, body, art_num)
196 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000197 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000198 self.check_article_resp(resp, article, art_num)
Nick Coghlan14d99a12012-06-17 21:27:18 +1000199 # Tolerate running the tests from behind a NNTP virus checker
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200200 blacklist = lambda line: line.startswith(b'X-Antivirus')
201 filtered_head_lines = [line for line in head.lines
202 if not blacklist(line)]
Nick Coghlan14d99a12012-06-17 21:27:18 +1000203 filtered_lines = [line for line in article.lines
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200204 if not blacklist(line)]
205 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000206
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000207 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000208 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000209 # couple of well-known capabilities. Just sanity check that we
210 # got them.
211 def _check_caps(caps):
212 caps_list = caps['LIST']
213 self.assertIsInstance(caps_list, (list, tuple))
214 self.assertIn('OVERVIEW.FMT', caps_list)
215 self.assertGreaterEqual(self.server.nntp_version, 2)
216 _check_caps(self.server.getcapabilities())
217 # This re-emits the command
218 resp, caps = self.server.capabilities()
219 _check_caps(caps)
220
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000221 def test_zlogin(self):
222 # This test must be the penultimate because further commands will be
223 # refused.
224 baduser = "notarealuser"
225 badpw = "notarealpassword"
226 # Check that bogus credentials cause failure
227 self.assertRaises(nntplib.NNTPError, self.server.login,
228 user=baduser, password=badpw, usenetrc=False)
229 # FIXME: We should check that correct credentials succeed, but that
230 # would require valid details for some server somewhere to be in the
231 # test suite, I think. Gmane is anonymous, at least as used for the
232 # other tests.
233
234 def test_zzquit(self):
235 # This test must be called last, hence the name
236 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000237 try:
238 self.server.quit()
239 finally:
240 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000241
Antoine Pitroude609182010-11-18 17:29:23 +0000242 @classmethod
243 def wrap_methods(cls):
244 # Wrap all methods in a transient_internet() exception catcher
245 # XXX put a generic version in test.support?
246 def wrap_meth(meth):
247 @functools.wraps(meth)
248 def wrapped(self):
Serhiy Storchakabfb1cf42020-04-29 10:36:20 +0300249 with socket_helper.transient_internet(self.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000250 meth(self)
251 return wrapped
252 for name in dir(cls):
253 if not name.startswith('test_'):
254 continue
255 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200256 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000257 continue
258 # Need to use a closure so that meth remains bound to its current
259 # value
260 setattr(cls, name, wrap_meth(meth))
261
Dong-hee Na1b335ae2020-01-12 02:39:15 +0900262 def test_timeout(self):
263 with self.assertRaises(ValueError):
264 self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)
265
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000266 def test_with_statement(self):
267 def is_connected():
268 if not hasattr(server, 'file'):
269 return False
270 try:
271 server.help()
Andrew Svetlov0832af62012-12-18 23:10:48 +0200272 except (OSError, EOFError):
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000273 return False
274 return True
275
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400276 try:
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100277 server = self.NNTP_CLASS(self.NNTP_HOST,
278 timeout=support.INTERNET_TIMEOUT,
279 usenetrc=False)
280 with server:
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400281 self.assertTrue(is_connected())
282 self.assertTrue(server.help())
283 self.assertFalse(is_connected())
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000284
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100285 server = self.NNTP_CLASS(self.NNTP_HOST,
286 timeout=support.INTERNET_TIMEOUT,
287 usenetrc=False)
288 with server:
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400289 server.quit()
290 self.assertFalse(is_connected())
291 except SSLError as ssl_err:
292 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
293 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
294 raise unittest.SkipTest(f"Got {ssl_err} connecting "
295 f"to {self.NNTP_HOST!r}")
296 raise
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000297
298
Antoine Pitroude609182010-11-18 17:29:23 +0000299NetworkedNNTPTestsMixin.wrap_methods()
300
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000301
INADA Naoki067931d2017-07-26 23:43:22 +0900302EOF_ERRORS = (EOFError,)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200303if ssl is not None:
INADA Naoki067931d2017-07-26 23:43:22 +0900304 EOF_ERRORS += (ssl.SSLEOFError,)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200305
306
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000307class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
308 # This server supports STARTTLS (gmane doesn't)
309 NNTP_HOST = 'news.trigofacile.com'
310 GROUP_NAME = 'fr.comp.lang.python'
311 GROUP_PAT = 'fr.comp.lang.*'
312
Antoine Pitroude609182010-11-18 17:29:23 +0000313 NNTP_CLASS = NNTP
314
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000315 @classmethod
316 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000317 support.requires("network")
Serhiy Storchakabfb1cf42020-04-29 10:36:20 +0300318 with socket_helper.transient_internet(cls.NNTP_HOST):
Victor Stinner5bccca52017-04-27 17:30:13 +0200319 try:
Victor Stinner1d0f9b32019-12-10 22:09:23 +0100320 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST,
321 timeout=support.INTERNET_TIMEOUT,
Victor Stinner5bccca52017-04-27 17:30:13 +0200322 usenetrc=False)
Gregory P. Smith2cc02232019-05-06 17:54:06 -0400323 except SSLError as ssl_err:
324 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
325 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
326 raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
327 f"to {cls.NNTP_HOST!r}")
328 raise
Victor Stinner5b4feb72017-07-24 17:41:02 +0200329 except EOF_ERRORS:
Victor Stinner5bccca52017-04-27 17:30:13 +0200330 raise unittest.SkipTest(f"{cls} got EOF error on connecting "
331 f"to {cls.NNTP_HOST!r}")
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000332
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000333 @classmethod
334 def tearDownClass(cls):
335 if cls.server is not None:
336 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000337
Serhiy Storchaka43767632013-11-03 21:31:38 +0200338@unittest.skipUnless(ssl, 'requires SSL support')
339class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000340
Serhiy Storchaka43767632013-11-03 21:31:38 +0200341 # Technical limits for this public NNTP server (see http://www.aioe.org):
342 # "Only two concurrent connections per IP address are allowed and
343 # 400 connections per day are accepted from each IP address."
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000344
Serhiy Storchaka43767632013-11-03 21:31:38 +0200345 NNTP_HOST = 'nntp.aioe.org'
346 GROUP_NAME = 'comp.lang.python'
347 GROUP_PAT = 'comp.lang.*'
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000348
Serhiy Storchaka43767632013-11-03 21:31:38 +0200349 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000350
Serhiy Storchaka43767632013-11-03 21:31:38 +0200351 # Disabled as it produces too much data
352 test_list = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000353
Serhiy Storchaka43767632013-11-03 21:31:38 +0200354 # Disabled as the connection will already be encrypted.
355 test_starttls = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000356
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000357
358#
359# Non-networked tests using a local server (or something mocking it).
360#
361
362class _NNTPServerIO(io.RawIOBase):
363 """A raw IO object allowing NNTP commands to be received and processed
364 by a handler. The handler can push responses which can then be read
365 from the IO object."""
366
367 def __init__(self, handler):
368 io.RawIOBase.__init__(self)
369 # The channel from the client
370 self.c2s = io.BytesIO()
371 # The channel to the client
372 self.s2c = io.BytesIO()
373 self.handler = handler
374 self.handler.start(self.c2s.readline, self.push_data)
375
376 def readable(self):
377 return True
378
379 def writable(self):
380 return True
381
382 def push_data(self, data):
383 """Push (buffer) some data to send to the client."""
384 pos = self.s2c.tell()
385 self.s2c.seek(0, 2)
386 self.s2c.write(data)
387 self.s2c.seek(pos)
388
389 def write(self, b):
390 """The client sends us some data"""
391 pos = self.c2s.tell()
392 self.c2s.write(b)
393 self.c2s.seek(pos)
394 self.handler.process_pending()
395 return len(b)
396
397 def readinto(self, buf):
398 """The client wants to read a response"""
399 self.handler.process_pending()
400 b = self.s2c.read(len(buf))
401 n = len(b)
402 buf[:n] = b
403 return n
404
405
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200406def make_mock_file(handler):
407 sio = _NNTPServerIO(handler)
408 # Using BufferedRWPair instead of BufferedRandom ensures the file
409 # isn't seekable.
410 file = io.BufferedRWPair(sio, sio)
411 return (sio, file)
412
413
Dong-hee Naaa92a7c2020-05-16 19:31:54 +0900414class NNTPServer(nntplib.NNTP):
415
416 def __init__(self, f, host, readermode=None):
417 self.file = f
418 self.host = host
419 self._base_init(readermode)
420
421 def _close(self):
422 self.file.close()
423 del self.file
424
425
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000426class MockedNNTPTestsMixin:
427 # Override in derived classes
428 handler_class = None
429
430 def setUp(self):
431 super().setUp()
432 self.make_server()
433
434 def tearDown(self):
435 super().tearDown()
436 del self.server
437
438 def make_server(self, *args, **kwargs):
439 self.handler = self.handler_class()
Serhiy Storchaka52027c32015-03-21 09:40:26 +0200440 self.sio, file = make_mock_file(self.handler)
Dong-hee Naaa92a7c2020-05-16 19:31:54 +0900441 self.server = NNTPServer(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000442 return self.server
443
444
Antoine Pitrou71135622012-02-14 23:29:34 +0100445class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
446 def setUp(self):
447 super().setUp()
448 self.make_server(readermode=True)
449
450
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000451class NNTPv1Handler:
452 """A handler for RFC 977"""
453
454 welcome = "200 NNTP mock server"
455
456 def start(self, readline, push_data):
457 self.in_body = False
458 self.allow_posting = True
459 self._readline = readline
460 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100461 self._logged_in = False
462 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000463 # Our welcome
464 self.handle_welcome()
465
466 def _decode(self, data):
467 return str(data, "utf-8", "surrogateescape")
468
469 def process_pending(self):
470 if self.in_body:
471 while True:
472 line = self._readline()
473 if not line:
474 return
475 self.body.append(line)
476 if line == b".\r\n":
477 break
478 try:
479 meth, tokens = self.body_callback
480 meth(*tokens, body=self.body)
481 finally:
482 self.body_callback = None
483 self.body = None
484 self.in_body = False
485 while True:
486 line = self._decode(self._readline())
487 if not line:
488 return
489 if not line.endswith("\r\n"):
490 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
491 line = line[:-2]
492 cmd, *tokens = line.split()
493 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
494 meth = getattr(self, "handle_" + cmd.upper(), None)
495 if meth is None:
496 self.handle_unknown()
497 else:
498 try:
499 meth(*tokens)
500 except Exception as e:
501 raise ValueError("command failed: {!r}".format(line)) from e
502 else:
503 if self.in_body:
504 self.body_callback = meth, tokens
505 self.body = []
506
507 def expect_body(self):
508 """Flag that the client is expected to post a request body"""
509 self.in_body = True
510
511 def push_data(self, data):
512 """Push some binary data"""
513 self._push_data(data)
514
515 def push_lit(self, lit):
516 """Push a string literal"""
517 lit = textwrap.dedent(lit)
518 lit = "\r\n".join(lit.splitlines()) + "\r\n"
519 lit = lit.encode('utf-8')
520 self.push_data(lit)
521
522 def handle_unknown(self):
523 self.push_lit("500 What?")
524
525 def handle_welcome(self):
526 self.push_lit(self.welcome)
527
528 def handle_QUIT(self):
529 self.push_lit("205 Bye!")
530
531 def handle_DATE(self):
532 self.push_lit("111 20100914001155")
533
534 def handle_GROUP(self, group):
535 if group == "fr.comp.lang.python":
536 self.push_lit("211 486 761 1265 fr.comp.lang.python")
537 else:
538 self.push_lit("411 No such group {}".format(group))
539
540 def handle_HELP(self):
541 self.push_lit("""\
542 100 Legal commands
543 authinfo user Name|pass Password|generic <prog> <args>
544 date
545 help
546 Report problems to <root@example.org>
547 .""")
548
549 def handle_STAT(self, message_spec=None):
550 if message_spec is None:
551 self.push_lit("412 No newsgroup selected")
552 elif message_spec == "3000234":
553 self.push_lit("223 3000234 <45223423@example.com>")
554 elif message_spec == "<45223423@example.com>":
555 self.push_lit("223 0 <45223423@example.com>")
556 else:
557 self.push_lit("430 No Such Article Found")
558
559 def handle_NEXT(self):
560 self.push_lit("223 3000237 <668929@example.org> retrieved")
561
562 def handle_LAST(self):
563 self.push_lit("223 3000234 <45223423@example.com> retrieved")
564
565 def handle_LIST(self, action=None, param=None):
566 if action is None:
567 self.push_lit("""\
568 215 Newsgroups in form "group high low flags".
569 comp.lang.python 0000052340 0000002828 y
570 comp.lang.python.announce 0000001153 0000000993 m
571 free.it.comp.lang.python 0000000002 0000000002 y
572 fr.comp.lang.python 0000001254 0000000760 y
573 free.it.comp.lang.python.learner 0000000000 0000000001 y
574 tw.bbs.comp.lang.python 0000000304 0000000304 y
575 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000576 elif action == "ACTIVE":
577 if param == "*distutils*":
578 self.push_lit("""\
579 215 Newsgroups in form "group high low flags"
580 gmane.comp.python.distutils.devel 0000014104 0000000001 m
581 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
582 .""")
583 else:
584 self.push_lit("""\
585 215 Newsgroups in form "group high low flags"
586 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000587 elif action == "OVERVIEW.FMT":
588 self.push_lit("""\
589 215 Order of fields in overview database.
590 Subject:
591 From:
592 Date:
593 Message-ID:
594 References:
595 Bytes:
596 Lines:
597 Xref:full
598 .""")
599 elif action == "NEWSGROUPS":
600 assert param is not None
601 if param == "comp.lang.python":
602 self.push_lit("""\
603 215 Descriptions in form "group description".
604 comp.lang.python\tThe Python computer language.
605 .""")
606 elif param == "comp.lang.python*":
607 self.push_lit("""\
608 215 Descriptions in form "group description".
609 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
610 comp.lang.python\tThe Python computer language.
611 .""")
612 else:
613 self.push_lit("""\
614 215 Descriptions in form "group description".
615 .""")
616 else:
617 self.push_lit('501 Unknown LIST keyword')
618
619 def handle_NEWNEWS(self, group, date_str, time_str):
620 # We hard code different return messages depending on passed
621 # argument and date syntax.
622 if (group == "comp.lang.python" and date_str == "20100913"
623 and time_str == "082004"):
624 # Date was passed in RFC 3977 format (NNTP "v2")
625 self.push_lit("""\
626 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
627 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
628 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
629 .""")
630 elif (group == "comp.lang.python" and date_str == "100913"
631 and time_str == "082004"):
632 # Date was passed in RFC 977 format (NNTP "v1")
633 self.push_lit("""\
634 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
635 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
636 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
637 .""")
Georg Brandl28e78412013-10-27 07:29:47 +0100638 elif (group == 'comp.lang.python' and
639 date_str in ('20100101', '100101') and
640 time_str == '090000'):
641 self.push_lit('too long line' * 3000 +
642 '\n.')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000643 else:
644 self.push_lit("""\
645 230 An empty list of newsarticles follows
646 .""")
647 # (Note for experiments: many servers disable NEWNEWS.
648 # As of this writing, sicinfo3.epfl.ch doesn't.)
649
650 def handle_XOVER(self, message_spec):
651 if message_spec == "57-59":
652 self.push_lit(
653 "224 Overview information for 57-58 follows\n"
654 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
655 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
656 "\tSat, 19 Jun 2010 18:04:08 -0400"
657 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
658 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
Dong-hee Na2e6a8ef2020-01-09 00:29:34 +0900659 "\tXref: news.gmane.io gmane.comp.python.authors:57"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000660 "\n"
661 "58\tLooking for a few good bloggers"
662 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
663 "\tThu, 22 Jul 2010 09:14:14 -0400"
664 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
665 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000666 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000667 "\n"
Martin Panter6245cb32016-04-15 02:14:19 +0000668 # A UTF-8 overview line from fr.comp.lang.python
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000669 "59\tRe: Message d'erreur incompréhensible (par moi)"
670 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
671 "\tWed, 15 Sep 2010 18:09:15 +0200"
672 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
673 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
674 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
675 "\n"
676 ".\n")
677 else:
678 self.push_lit("""\
679 224 No articles
680 .""")
681
682 def handle_POST(self, *, body=None):
683 if body is None:
684 if self.allow_posting:
685 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
686 self.expect_body()
687 else:
688 self.push_lit("440 Posting not permitted")
689 else:
690 assert self.allow_posting
691 self.push_lit("240 Article received OK")
692 self.posted_body = body
693
694 def handle_IHAVE(self, message_id, *, body=None):
695 if body is None:
696 if (self.allow_posting and
697 message_id == "<i.am.an.article.you.will.want@example.com>"):
698 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
699 self.expect_body()
700 else:
701 self.push_lit("435 Article not wanted")
702 else:
703 assert self.allow_posting
704 self.push_lit("235 Article transferred OK")
705 self.posted_body = body
706
707 sample_head = """\
708 From: "Demo User" <nobody@example.net>
709 Subject: I am just a test article
710 Content-Type: text/plain; charset=UTF-8; format=flowed
711 Message-ID: <i.am.an.article.you.will.want@example.com>"""
712
713 sample_body = """\
714 This is just a test article.
715 ..Here is a dot-starting line.
716
717 -- Signed by Andr\xe9."""
718
719 sample_article = sample_head + "\n\n" + sample_body
720
721 def handle_ARTICLE(self, message_spec=None):
722 if message_spec is None:
723 self.push_lit("220 3000237 <45223423@example.com>")
724 elif message_spec == "<45223423@example.com>":
725 self.push_lit("220 0 <45223423@example.com>")
726 elif message_spec == "3000234":
727 self.push_lit("220 3000234 <45223423@example.com>")
728 else:
729 self.push_lit("430 No Such Article Found")
730 return
731 self.push_lit(self.sample_article)
732 self.push_lit(".")
733
734 def handle_HEAD(self, message_spec=None):
735 if message_spec is None:
736 self.push_lit("221 3000237 <45223423@example.com>")
737 elif message_spec == "<45223423@example.com>":
738 self.push_lit("221 0 <45223423@example.com>")
739 elif message_spec == "3000234":
740 self.push_lit("221 3000234 <45223423@example.com>")
741 else:
742 self.push_lit("430 No Such Article Found")
743 return
744 self.push_lit(self.sample_head)
745 self.push_lit(".")
746
747 def handle_BODY(self, message_spec=None):
748 if message_spec is None:
749 self.push_lit("222 3000237 <45223423@example.com>")
750 elif message_spec == "<45223423@example.com>":
751 self.push_lit("222 0 <45223423@example.com>")
752 elif message_spec == "3000234":
753 self.push_lit("222 3000234 <45223423@example.com>")
754 else:
755 self.push_lit("430 No Such Article Found")
756 return
757 self.push_lit(self.sample_body)
758 self.push_lit(".")
759
Antoine Pitrou54411c12012-02-12 19:14:17 +0100760 def handle_AUTHINFO(self, cred_type, data):
761 if self._logged_in:
762 self.push_lit('502 Already Logged In')
763 elif cred_type == 'user':
764 if self._user_sent:
765 self.push_lit('482 User Credential Already Sent')
766 else:
767 self.push_lit('381 Password Required')
768 self._user_sent = True
769 elif cred_type == 'pass':
770 self.push_lit('281 Login Successful')
771 self._logged_in = True
772 else:
773 raise Exception('Unknown cred type {}'.format(cred_type))
774
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000775
776class NNTPv2Handler(NNTPv1Handler):
777 """A handler for RFC 3977 (NNTP "v2")"""
778
779 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100780 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000781 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000782 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100783 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000784 HDR
785 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
786 OVER
787 POST
788 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100789 ."""
790
791 if not self._logged_in:
792 self.push_lit(fmt.format('\n AUTHINFO USER'))
793 else:
794 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000795
Antoine Pitrou71135622012-02-14 23:29:34 +0100796 def handle_MODE(self, _):
797 raise Exception('MODE READER sent despite READER has been advertised')
798
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000799 def handle_OVER(self, message_spec=None):
800 return self.handle_XOVER(message_spec)
801
802
Antoine Pitrou54411c12012-02-12 19:14:17 +0100803class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
804 """A handler that allows CAPABILITIES only after login"""
805
806 def handle_CAPABILITIES(self):
807 if not self._logged_in:
808 self.push_lit('480 You must log in.')
809 else:
810 super().handle_CAPABILITIES()
811
812
Antoine Pitrou71135622012-02-14 23:29:34 +0100813class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
814 """A server that starts in transit mode"""
815
816 def __init__(self):
817 self._switched = False
818
819 def handle_CAPABILITIES(self):
820 fmt = """\
821 101 Capability list:
822 VERSION 2 3
823 IMPLEMENTATION INN 2.5.1
824 HDR
825 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
826 OVER
827 POST
828 {}READER
829 ."""
830 if self._switched:
831 self.push_lit(fmt.format(''))
832 else:
833 self.push_lit(fmt.format('MODE-'))
834
835 def handle_MODE(self, what):
836 assert not self._switched and what == 'reader'
837 self._switched = True
838 self.push_lit('200 Posting allowed')
839
840
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000841class NNTPv1v2TestsMixin:
842
843 def setUp(self):
844 super().setUp()
845
846 def test_welcome(self):
847 self.assertEqual(self.server.welcome, self.handler.welcome)
848
Antoine Pitrou54411c12012-02-12 19:14:17 +0100849 def test_authinfo(self):
850 if self.nntp_version == 2:
851 self.assertIn('AUTHINFO', self.server._caps)
852 self.server.login('testuser', 'testpw')
853 # if AUTHINFO is gone from _caps we also know that getcapabilities()
854 # has been called after login as it should
855 self.assertNotIn('AUTHINFO', self.server._caps)
856
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000857 def test_date(self):
858 resp, date = self.server.date()
859 self.assertEqual(resp, "111 20100914001155")
860 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
861
862 def test_quit(self):
863 self.assertFalse(self.sio.closed)
864 resp = self.server.quit()
865 self.assertEqual(resp, "205 Bye!")
866 self.assertTrue(self.sio.closed)
867
868 def test_help(self):
869 resp, help = self.server.help()
870 self.assertEqual(resp, "100 Legal commands")
871 self.assertEqual(help, [
872 ' authinfo user Name|pass Password|generic <prog> <args>',
873 ' date',
874 ' help',
875 'Report problems to <root@example.org>',
876 ])
877
878 def test_list(self):
879 resp, groups = self.server.list()
880 self.assertEqual(len(groups), 6)
881 g = groups[1]
882 self.assertEqual(g,
883 GroupInfo("comp.lang.python.announce", "0000001153",
884 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000885 resp, groups = self.server.list("*distutils*")
886 self.assertEqual(len(groups), 2)
887 g = groups[0]
888 self.assertEqual(g,
889 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
890 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000891
892 def test_stat(self):
893 resp, art_num, message_id = self.server.stat(3000234)
894 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
895 self.assertEqual(art_num, 3000234)
896 self.assertEqual(message_id, "<45223423@example.com>")
897 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
898 self.assertEqual(resp, "223 0 <45223423@example.com>")
899 self.assertEqual(art_num, 0)
900 self.assertEqual(message_id, "<45223423@example.com>")
901 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
902 self.server.stat("<non.existent.id>")
903 self.assertEqual(cm.exception.response, "430 No Such Article Found")
904 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
905 self.server.stat()
906 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
907
908 def test_next(self):
909 resp, art_num, message_id = self.server.next()
910 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
911 self.assertEqual(art_num, 3000237)
912 self.assertEqual(message_id, "<668929@example.org>")
913
914 def test_last(self):
915 resp, art_num, message_id = self.server.last()
916 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
917 self.assertEqual(art_num, 3000234)
918 self.assertEqual(message_id, "<45223423@example.com>")
919
920 def test_description(self):
921 desc = self.server.description("comp.lang.python")
922 self.assertEqual(desc, "The Python computer language.")
923 desc = self.server.description("comp.lang.pythonx")
924 self.assertEqual(desc, "")
925
926 def test_descriptions(self):
927 resp, groups = self.server.descriptions("comp.lang.python")
928 self.assertEqual(resp, '215 Descriptions in form "group description".')
929 self.assertEqual(groups, {
930 "comp.lang.python": "The Python computer language.",
931 })
932 resp, groups = self.server.descriptions("comp.lang.python*")
933 self.assertEqual(groups, {
934 "comp.lang.python": "The Python computer language.",
935 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
936 })
937 resp, groups = self.server.descriptions("comp.lang.pythonx")
938 self.assertEqual(groups, {})
939
940 def test_group(self):
941 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
942 self.assertTrue(resp.startswith("211 "), resp)
943 self.assertEqual(first, 761)
944 self.assertEqual(last, 1265)
945 self.assertEqual(count, 486)
946 self.assertEqual(group, "fr.comp.lang.python")
947 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
948 self.server.group("comp.lang.python.devel")
949 exc = cm.exception
950 self.assertTrue(exc.response.startswith("411 No such group"),
951 exc.response)
952
953 def test_newnews(self):
954 # NEWNEWS comp.lang.python [20]100913 082004
955 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
956 resp, ids = self.server.newnews("comp.lang.python", dt)
957 expected = (
958 "230 list of newsarticles (NNTP v{0}) "
959 "created after Mon Sep 13 08:20:04 2010 follows"
960 ).format(self.nntp_version)
961 self.assertEqual(resp, expected)
962 self.assertEqual(ids, [
963 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
964 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
965 ])
966 # NEWNEWS fr.comp.lang.python [20]100913 082004
967 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
968 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
969 self.assertEqual(resp, "230 An empty list of newsarticles follows")
970 self.assertEqual(ids, [])
971
972 def _check_article_body(self, lines):
973 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000974 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000975 self.assertEqual(lines[-2], b"")
976 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
977 self.assertEqual(lines[-4], b"This is just a test article.")
978
979 def _check_article_head(self, lines):
980 self.assertEqual(len(lines), 4)
981 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
982 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
983
984 def _check_article_data(self, lines):
985 self.assertEqual(len(lines), 9)
986 self._check_article_head(lines[:4])
987 self._check_article_body(lines[-4:])
988 self.assertEqual(lines[4], b"")
989
990 def test_article(self):
991 # ARTICLE
992 resp, info = self.server.article()
993 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
994 art_num, message_id, lines = info
995 self.assertEqual(art_num, 3000237)
996 self.assertEqual(message_id, "<45223423@example.com>")
997 self._check_article_data(lines)
998 # ARTICLE num
999 resp, info = self.server.article(3000234)
1000 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
1001 art_num, message_id, lines = info
1002 self.assertEqual(art_num, 3000234)
1003 self.assertEqual(message_id, "<45223423@example.com>")
1004 self._check_article_data(lines)
1005 # ARTICLE id
1006 resp, info = self.server.article("<45223423@example.com>")
1007 self.assertEqual(resp, "220 0 <45223423@example.com>")
1008 art_num, message_id, lines = info
1009 self.assertEqual(art_num, 0)
1010 self.assertEqual(message_id, "<45223423@example.com>")
1011 self._check_article_data(lines)
1012 # Non-existent id
1013 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1014 self.server.article("<non-existent@example.com>")
1015 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1016
1017 def test_article_file(self):
1018 # With a "file" argument
1019 f = io.BytesIO()
1020 resp, info = self.server.article(file=f)
1021 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
1022 art_num, message_id, lines = info
1023 self.assertEqual(art_num, 3000237)
1024 self.assertEqual(message_id, "<45223423@example.com>")
1025 self.assertEqual(lines, [])
1026 data = f.getvalue()
1027 self.assertTrue(data.startswith(
1028 b'From: "Demo User" <nobody@example.net>\r\n'
1029 b'Subject: I am just a test article\r\n'
1030 ), ascii(data))
1031 self.assertTrue(data.endswith(
1032 b'This is just a test article.\r\n'
1033 b'.Here is a dot-starting line.\r\n'
1034 b'\r\n'
1035 b'-- Signed by Andr\xc3\xa9.\r\n'
1036 ), ascii(data))
1037
1038 def test_head(self):
1039 # HEAD
1040 resp, info = self.server.head()
1041 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1042 art_num, message_id, lines = info
1043 self.assertEqual(art_num, 3000237)
1044 self.assertEqual(message_id, "<45223423@example.com>")
1045 self._check_article_head(lines)
1046 # HEAD num
1047 resp, info = self.server.head(3000234)
1048 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
1049 art_num, message_id, lines = info
1050 self.assertEqual(art_num, 3000234)
1051 self.assertEqual(message_id, "<45223423@example.com>")
1052 self._check_article_head(lines)
1053 # HEAD id
1054 resp, info = self.server.head("<45223423@example.com>")
1055 self.assertEqual(resp, "221 0 <45223423@example.com>")
1056 art_num, message_id, lines = info
1057 self.assertEqual(art_num, 0)
1058 self.assertEqual(message_id, "<45223423@example.com>")
1059 self._check_article_head(lines)
1060 # Non-existent id
1061 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1062 self.server.head("<non-existent@example.com>")
1063 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1064
Antoine Pitrou2640b522012-02-15 18:53:18 +01001065 def test_head_file(self):
1066 f = io.BytesIO()
1067 resp, info = self.server.head(file=f)
1068 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1069 art_num, message_id, lines = info
1070 self.assertEqual(art_num, 3000237)
1071 self.assertEqual(message_id, "<45223423@example.com>")
1072 self.assertEqual(lines, [])
1073 data = f.getvalue()
1074 self.assertTrue(data.startswith(
1075 b'From: "Demo User" <nobody@example.net>\r\n'
1076 b'Subject: I am just a test article\r\n'
1077 ), ascii(data))
1078 self.assertFalse(data.endswith(
1079 b'This is just a test article.\r\n'
1080 b'.Here is a dot-starting line.\r\n'
1081 b'\r\n'
1082 b'-- Signed by Andr\xc3\xa9.\r\n'
1083 ), ascii(data))
1084
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001085 def test_body(self):
1086 # BODY
1087 resp, info = self.server.body()
1088 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1089 art_num, message_id, lines = info
1090 self.assertEqual(art_num, 3000237)
1091 self.assertEqual(message_id, "<45223423@example.com>")
1092 self._check_article_body(lines)
1093 # BODY num
1094 resp, info = self.server.body(3000234)
1095 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1096 art_num, message_id, lines = info
1097 self.assertEqual(art_num, 3000234)
1098 self.assertEqual(message_id, "<45223423@example.com>")
1099 self._check_article_body(lines)
1100 # BODY id
1101 resp, info = self.server.body("<45223423@example.com>")
1102 self.assertEqual(resp, "222 0 <45223423@example.com>")
1103 art_num, message_id, lines = info
1104 self.assertEqual(art_num, 0)
1105 self.assertEqual(message_id, "<45223423@example.com>")
1106 self._check_article_body(lines)
1107 # Non-existent id
1108 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1109 self.server.body("<non-existent@example.com>")
1110 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1111
Antoine Pitrou2640b522012-02-15 18:53:18 +01001112 def test_body_file(self):
1113 f = io.BytesIO()
1114 resp, info = self.server.body(file=f)
1115 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1116 art_num, message_id, lines = info
1117 self.assertEqual(art_num, 3000237)
1118 self.assertEqual(message_id, "<45223423@example.com>")
1119 self.assertEqual(lines, [])
1120 data = f.getvalue()
1121 self.assertFalse(data.startswith(
1122 b'From: "Demo User" <nobody@example.net>\r\n'
1123 b'Subject: I am just a test article\r\n'
1124 ), ascii(data))
1125 self.assertTrue(data.endswith(
1126 b'This is just a test article.\r\n'
1127 b'.Here is a dot-starting line.\r\n'
1128 b'\r\n'
1129 b'-- Signed by Andr\xc3\xa9.\r\n'
1130 ), ascii(data))
1131
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001132 def check_over_xover_resp(self, resp, overviews):
1133 self.assertTrue(resp.startswith("224 "), resp)
1134 self.assertEqual(len(overviews), 3)
1135 art_num, over = overviews[0]
1136 self.assertEqual(art_num, 57)
1137 self.assertEqual(over, {
1138 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1139 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1140 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1141 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1142 "references": "<hvalf7$ort$1@dough.gmane.org>",
1143 ":bytes": "7103",
1144 ":lines": "16",
Dong-hee Na2e6a8ef2020-01-09 00:29:34 +09001145 "xref": "news.gmane.io gmane.comp.python.authors:57"
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001146 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001147 art_num, over = overviews[1]
1148 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001149 art_num, over = overviews[2]
1150 self.assertEqual(over["subject"],
1151 "Re: Message d'erreur incompréhensible (par moi)")
1152
1153 def test_xover(self):
1154 resp, overviews = self.server.xover(57, 59)
1155 self.check_over_xover_resp(resp, overviews)
1156
1157 def test_over(self):
1158 # In NNTP "v1", this will fallback on XOVER
1159 resp, overviews = self.server.over((57, 59))
1160 self.check_over_xover_resp(resp, overviews)
1161
1162 sample_post = (
1163 b'From: "Demo User" <nobody@example.net>\r\n'
1164 b'Subject: I am just a test article\r\n'
1165 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1166 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1167 b'\r\n'
1168 b'This is just a test article.\r\n'
1169 b'.Here is a dot-starting line.\r\n'
1170 b'\r\n'
1171 b'-- Signed by Andr\xc3\xa9.\r\n'
1172 )
1173
1174 def _check_posted_body(self):
1175 # Check the raw body as received by the server
1176 lines = self.handler.posted_body
1177 # One additional line for the "." terminator
1178 self.assertEqual(len(lines), 10)
1179 self.assertEqual(lines[-1], b'.\r\n')
1180 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1181 self.assertEqual(lines[-3], b'\r\n')
1182 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1183 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1184
1185 def _check_post_ihave_sub(self, func, *args, file_factory):
1186 # First the prepared post with CRLF endings
1187 post = self.sample_post
1188 func_args = args + (file_factory(post),)
1189 self.handler.posted_body = None
1190 resp = func(*func_args)
1191 self._check_posted_body()
1192 # Then the same post with "normal" line endings - they should be
1193 # converted by NNTP.post and NNTP.ihave.
1194 post = self.sample_post.replace(b"\r\n", b"\n")
1195 func_args = args + (file_factory(post),)
1196 self.handler.posted_body = None
1197 resp = func(*func_args)
1198 self._check_posted_body()
1199 return resp
1200
1201 def check_post_ihave(self, func, success_resp, *args):
1202 # With a bytes object
1203 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1204 self.assertEqual(resp, success_resp)
1205 # With a bytearray object
1206 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1207 self.assertEqual(resp, success_resp)
1208 # With a file object
1209 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1210 self.assertEqual(resp, success_resp)
1211 # With an iterable of terminated lines
1212 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001213 return iter(b.splitlines(keepends=True))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001214 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1215 self.assertEqual(resp, success_resp)
1216 # With an iterable of non-terminated lines
1217 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001218 return iter(b.splitlines(keepends=False))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001219 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1220 self.assertEqual(resp, success_resp)
1221
1222 def test_post(self):
1223 self.check_post_ihave(self.server.post, "240 Article received OK")
1224 self.handler.allow_posting = False
1225 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1226 self.server.post(self.sample_post)
1227 self.assertEqual(cm.exception.response,
1228 "440 Posting not permitted")
1229
1230 def test_ihave(self):
1231 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1232 "<i.am.an.article.you.will.want@example.com>")
1233 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1234 self.server.ihave("<another.message.id>", self.sample_post)
1235 self.assertEqual(cm.exception.response,
1236 "435 Article not wanted")
1237
Georg Brandl28e78412013-10-27 07:29:47 +01001238 def test_too_long_lines(self):
1239 dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1240 self.assertRaises(nntplib.NNTPDataError,
1241 self.server.newnews, "comp.lang.python", dt)
1242
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001243
1244class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1245 """Tests an NNTP v1 server (no capabilities)."""
1246
1247 nntp_version = 1
1248 handler_class = NNTPv1Handler
1249
1250 def test_caps(self):
1251 caps = self.server.getcapabilities()
1252 self.assertEqual(caps, {})
1253 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001254 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001255
1256
1257class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1258 """Tests an NNTP v2 server (with capabilities)."""
1259
1260 nntp_version = 2
1261 handler_class = NNTPv2Handler
1262
1263 def test_caps(self):
1264 caps = self.server.getcapabilities()
1265 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001266 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001267 'IMPLEMENTATION': ['INN', '2.5.1'],
1268 'AUTHINFO': ['USER'],
1269 'HDR': [],
1270 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1271 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1272 'OVER': [],
1273 'POST': [],
1274 'READER': [],
1275 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001276 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001277 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001278
1279
Antoine Pitrou54411c12012-02-12 19:14:17 +01001280class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1281 """Tests a probably NNTP v2 server with capabilities only after login."""
1282
1283 nntp_version = 2
1284 handler_class = CapsAfterLoginNNTPv2Handler
1285
1286 def test_caps_only_after_login(self):
1287 self.assertEqual(self.server._caps, {})
1288 self.server.login('testuser', 'testpw')
1289 self.assertIn('VERSION', self.server._caps)
1290
1291
Antoine Pitrou71135622012-02-14 23:29:34 +01001292class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1293 unittest.TestCase):
1294 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1295 that isn't in READER mode by default."""
1296
1297 nntp_version = 2
1298 handler_class = ModeSwitchingNNTPv2Handler
1299
1300 def test_we_are_in_reader_mode_after_connect(self):
1301 self.assertIn('READER', self.server._caps)
1302
1303
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001304class MiscTests(unittest.TestCase):
1305
1306 def test_decode_header(self):
1307 def gives(a, b):
1308 self.assertEqual(nntplib.decode_header(a), b)
1309 gives("" , "")
1310 gives("a plain header", "a plain header")
1311 gives(" with extra spaces ", " with extra spaces ")
1312 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1313 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1314 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1315 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1316 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1317 "Re: problème de matrice")
1318 # A natively utf-8 header (found in the real world!)
1319 gives("Re: Message d'erreur incompréhensible (par moi)",
1320 "Re: Message d'erreur incompréhensible (par moi)")
1321
1322 def test_parse_overview_fmt(self):
1323 # The minimal (default) response
1324 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1325 "References:", ":bytes", ":lines"]
1326 self.assertEqual(nntplib._parse_overview_fmt(lines),
1327 ["subject", "from", "date", "message-id", "references",
1328 ":bytes", ":lines"])
1329 # The minimal response using alternative names
1330 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1331 "References:", "Bytes:", "Lines:"]
1332 self.assertEqual(nntplib._parse_overview_fmt(lines),
1333 ["subject", "from", "date", "message-id", "references",
1334 ":bytes", ":lines"])
1335 # Variations in casing
1336 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1337 "References:", "BYTES:", "Lines:"]
1338 self.assertEqual(nntplib._parse_overview_fmt(lines),
1339 ["subject", "from", "date", "message-id", "references",
1340 ":bytes", ":lines"])
1341 # First example from RFC 3977
1342 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1343 "References:", ":bytes", ":lines", "Xref:full",
1344 "Distribution:full"]
1345 self.assertEqual(nntplib._parse_overview_fmt(lines),
1346 ["subject", "from", "date", "message-id", "references",
1347 ":bytes", ":lines", "xref", "distribution"])
1348 # Second example from RFC 3977
1349 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1350 "References:", "Bytes:", "Lines:", "Xref:FULL",
1351 "Distribution:FULL"]
1352 self.assertEqual(nntplib._parse_overview_fmt(lines),
1353 ["subject", "from", "date", "message-id", "references",
1354 ":bytes", ":lines", "xref", "distribution"])
1355 # A classic response from INN
1356 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1357 "References:", "Bytes:", "Lines:", "Xref:full"]
1358 self.assertEqual(nntplib._parse_overview_fmt(lines),
1359 ["subject", "from", "date", "message-id", "references",
1360 ":bytes", ":lines", "xref"])
1361
1362 def test_parse_overview(self):
1363 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1364 # First example from RFC 3977
1365 lines = [
1366 '3000234\tI am just a test article\t"Demo User" '
1367 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1368 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1369 '17\tXref: news.example.com misc.test:3000363',
1370 ]
1371 overview = nntplib._parse_overview(lines, fmt)
1372 (art_num, fields), = overview
1373 self.assertEqual(art_num, 3000234)
1374 self.assertEqual(fields, {
1375 'subject': 'I am just a test article',
1376 'from': '"Demo User" <nobody@example.com>',
1377 'date': '6 Oct 1998 04:38:40 -0500',
1378 'message-id': '<45223423@example.com>',
1379 'references': '<45454@example.net>',
1380 ':bytes': '1234',
1381 ':lines': '17',
1382 'xref': 'news.example.com misc.test:3000363',
1383 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001384 # Second example; here the "Xref" field is totally absent (including
1385 # the header name) and comes out as None
1386 lines = [
1387 '3000234\tI am just a test article\t"Demo User" '
1388 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1389 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1390 '17\t\t',
1391 ]
1392 overview = nntplib._parse_overview(lines, fmt)
1393 (art_num, fields), = overview
1394 self.assertEqual(fields['xref'], None)
1395 # Third example; the "Xref" is an empty string, while "references"
1396 # is a single space.
1397 lines = [
1398 '3000234\tI am just a test article\t"Demo User" '
1399 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1400 '<45223423@example.com>\t \t1234\t'
1401 '17\tXref: \t',
1402 ]
1403 overview = nntplib._parse_overview(lines, fmt)
1404 (art_num, fields), = overview
1405 self.assertEqual(fields['references'], ' ')
1406 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001407
1408 def test_parse_datetime(self):
1409 def gives(a, b, *c):
1410 self.assertEqual(nntplib._parse_datetime(a, b),
1411 datetime.datetime(*c))
1412 # Output of DATE command
1413 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1414 # Variations
1415 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1416 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1417 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1418
1419 def test_unparse_datetime(self):
1420 # Test non-legacy mode
1421 # 1) with a datetime
1422 def gives(y, M, d, h, m, s, date_str, time_str):
1423 dt = datetime.datetime(y, M, d, h, m, s)
1424 self.assertEqual(nntplib._unparse_datetime(dt),
1425 (date_str, time_str))
1426 self.assertEqual(nntplib._unparse_datetime(dt, False),
1427 (date_str, time_str))
1428 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1429 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1430 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1431 # 2) with a date
1432 def gives(y, M, d, date_str, time_str):
1433 dt = datetime.date(y, M, d)
1434 self.assertEqual(nntplib._unparse_datetime(dt),
1435 (date_str, time_str))
1436 self.assertEqual(nntplib._unparse_datetime(dt, False),
1437 (date_str, time_str))
1438 gives(1999, 6, 23, "19990623", "000000")
1439 gives(2000, 6, 23, "20000623", "000000")
1440 gives(2010, 6, 5, "20100605", "000000")
1441
1442 def test_unparse_datetime_legacy(self):
1443 # Test legacy mode (RFC 977)
1444 # 1) with a datetime
1445 def gives(y, M, d, h, m, s, date_str, time_str):
1446 dt = datetime.datetime(y, M, d, h, m, s)
1447 self.assertEqual(nntplib._unparse_datetime(dt, True),
1448 (date_str, time_str))
1449 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1450 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1451 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1452 # 2) with a date
1453 def gives(y, M, d, date_str, time_str):
1454 dt = datetime.date(y, M, d)
1455 self.assertEqual(nntplib._unparse_datetime(dt, True),
1456 (date_str, time_str))
1457 gives(1999, 6, 23, "990623", "000000")
1458 gives(2000, 6, 23, "000623", "000000")
1459 gives(2010, 6, 5, "100605", "000000")
1460
Serhiy Storchaka43767632013-11-03 21:31:38 +02001461 @unittest.skipUnless(ssl, 'requires SSL support')
1462 def test_ssl_support(self):
1463 self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001464
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001465
Berker Peksag96756b62014-09-20 08:53:05 +03001466class PublicAPITests(unittest.TestCase):
1467 """Ensures that the correct values are exposed in the public API."""
1468
1469 def test_module_all_attribute(self):
1470 self.assertTrue(hasattr(nntplib, '__all__'))
1471 target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
1472 'NNTPTemporaryError', 'NNTPPermanentError',
1473 'NNTPProtocolError', 'NNTPDataError', 'decode_header']
1474 if ssl is not None:
1475 target_api.append('NNTP_SSL')
1476 self.assertEqual(set(nntplib.__all__), set(target_api))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001477
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001478class MockSocketTests(unittest.TestCase):
1479 """Tests involving a mock socket object
1480
1481 Used where the _NNTPServerIO file object is not enough."""
1482
1483 nntp_class = nntplib.NNTP
1484
1485 def check_constructor_error_conditions(
1486 self, handler_class,
1487 expected_error_type, expected_error_msg,
1488 login=None, password=None):
1489
1490 class mock_socket_module:
1491 def create_connection(address, timeout):
1492 return MockSocket()
1493
1494 class MockSocket:
1495 def close(self):
1496 nonlocal socket_closed
1497 socket_closed = True
1498
1499 def makefile(socket, mode):
1500 handler = handler_class()
1501 _, file = make_mock_file(handler)
1502 files.append(file)
1503 return file
1504
1505 socket_closed = False
1506 files = []
1507 with patch('nntplib.socket', mock_socket_module), \
1508 self.assertRaisesRegex(expected_error_type, expected_error_msg):
1509 self.nntp_class('dummy', user=login, password=password)
1510 self.assertTrue(socket_closed)
1511 for f in files:
1512 self.assertTrue(f.closed)
1513
1514 def test_bad_welcome(self):
1515 #Test a bad welcome message
1516 class Handler(NNTPv1Handler):
1517 welcome = 'Bad Welcome'
1518 self.check_constructor_error_conditions(
1519 Handler, nntplib.NNTPProtocolError, Handler.welcome)
1520
1521 def test_service_temporarily_unavailable(self):
1522 #Test service temporarily unavailable
1523 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001524 welcome = '400 Service temporarily unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001525 self.check_constructor_error_conditions(
1526 Handler, nntplib.NNTPTemporaryError, Handler.welcome)
1527
1528 def test_service_permanently_unavailable(self):
1529 #Test service permanently unavailable
1530 class Handler(NNTPv1Handler):
Martin Pantereb995702016-07-28 01:11:04 +00001531 welcome = '502 Service permanently unavailable'
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001532 self.check_constructor_error_conditions(
1533 Handler, nntplib.NNTPPermanentError, Handler.welcome)
1534
1535 def test_bad_capabilities(self):
1536 #Test a bad capabilities response
1537 class Handler(NNTPv1Handler):
1538 def handle_CAPABILITIES(self):
1539 self.push_lit(capabilities_response)
1540 capabilities_response = '201 bad capability'
1541 self.check_constructor_error_conditions(
1542 Handler, nntplib.NNTPReplyError, capabilities_response)
1543
1544 def test_login_aborted(self):
1545 #Test a bad authinfo response
1546 login = 't@e.com'
1547 password = 'python'
1548 class Handler(NNTPv1Handler):
1549 def handle_AUTHINFO(self, *args):
1550 self.push_lit(authinfo_response)
1551 authinfo_response = '503 Mechanism not recognized'
1552 self.check_constructor_error_conditions(
1553 Handler, nntplib.NNTPPermanentError, authinfo_response,
1554 login, password)
1555
Serhiy Storchaka80774342015-04-03 15:02:20 +03001556class bypass_context:
1557 """Bypass encryption and actual SSL module"""
1558 def wrap_socket(sock, **args):
1559 return sock
1560
1561@unittest.skipUnless(ssl, 'requires SSL support')
1562class MockSslTests(MockSocketTests):
1563 @staticmethod
1564 def nntp_class(*pos, **kw):
1565 return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
Victor Stinner8c9bba02015-04-03 11:06:40 +02001566
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001567
Martin Panter8f19e8e2016-01-19 01:10:58 +00001568class LocalServerTests(unittest.TestCase):
1569 def setUp(self):
1570 sock = socket.socket()
Serhiy Storchaka16994912020-04-25 10:06:29 +03001571 port = socket_helper.bind_port(sock)
Martin Panter8f19e8e2016-01-19 01:10:58 +00001572 sock.listen()
1573 self.background = threading.Thread(
1574 target=self.run_server, args=(sock,))
1575 self.background.start()
1576 self.addCleanup(self.background.join)
1577
Serhiy Storchaka16994912020-04-25 10:06:29 +03001578 self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__()
Martin Panter8f19e8e2016-01-19 01:10:58 +00001579 self.addCleanup(self.nntp.__exit__, None, None, None)
1580
1581 def run_server(self, sock):
1582 # Could be generalized to handle more commands in separate methods
1583 with sock:
1584 [client, _] = sock.accept()
1585 with contextlib.ExitStack() as cleanup:
1586 cleanup.enter_context(client)
1587 reader = cleanup.enter_context(client.makefile('rb'))
1588 client.sendall(b'200 Server ready\r\n')
1589 while True:
1590 cmd = reader.readline()
1591 if cmd == b'CAPABILITIES\r\n':
1592 client.sendall(
1593 b'101 Capability list:\r\n'
1594 b'VERSION 2\r\n'
1595 b'STARTTLS\r\n'
1596 b'.\r\n'
1597 )
1598 elif cmd == b'STARTTLS\r\n':
1599 reader.close()
1600 client.sendall(b'382 Begin TLS negotiation now\r\n')
Christian Heimesd0486372016-09-10 23:23:33 +02001601 context = ssl.SSLContext()
1602 context.load_cert_chain(certfile)
1603 client = context.wrap_socket(
1604 client, server_side=True)
Martin Panter8f19e8e2016-01-19 01:10:58 +00001605 cleanup.enter_context(client)
1606 reader = cleanup.enter_context(client.makefile('rb'))
1607 elif cmd == b'QUIT\r\n':
1608 client.sendall(b'205 Bye!\r\n')
1609 break
1610 else:
1611 raise ValueError('Unexpected command {!r}'.format(cmd))
1612
1613 @unittest.skipUnless(ssl, 'requires SSL support')
1614 def test_starttls(self):
1615 file = self.nntp.file
1616 sock = self.nntp.sock
1617 self.nntp.starttls()
1618 # Check that the socket and internal pseudo-file really were
1619 # changed.
1620 self.assertNotEqual(file, self.nntp.file)
1621 self.assertNotEqual(sock, self.nntp.sock)
1622 # Check that the new socket really is an SSL one
1623 self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
1624 # Check that trying starttls when it's already active fails.
1625 self.assertRaises(ValueError, self.nntp.starttls)
1626
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001627
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001628if __name__ == "__main__":
Berker Peksag96756b62014-09-20 08:53:05 +03001629 unittest.main()