blob: 4dbf941036f0992b68346714e6a5ee6cba94a047 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Dong-hee Naaa92a7c2020-05-16 19:31:54 +09008import nntplib
Martin Panter8f19e8e2016-01-19 01:10:58 +00009import os.path
Gregory P. Smith2cc02232019-05-06 17:54:06 -040010import re
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020011import threading
12
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013from test import support
Serhiy Storchaka16994912020-04-25 10:06:29 +030014from test.support import socket_helper
Serhiy Storchaka43767632013-11-03 21:31:38 +020015from nntplib import NNTP, GroupInfo
Serhiy Storchaka52027c32015-03-21 09:40:26 +020016from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020017try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000018 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020019except ImportError:
20 ssl = None
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020021
Antoine Pitrou69ab9512010-09-29 15:03:40 +000022
# Path to the 'keycert3.pem' file that sits in the same directory as this
# test module.
certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')

# Always expose an SSLError name so "except SSLError" clauses work whether or
# not the ssl module could be imported.
if ssl is None:
    class SSLError(Exception):
        """Non-existent exception class when we lack SSL support."""
        reason = "This will never be raised."
else:
    SSLError = ssl.SSLError
31
Antoine Pitrou69ab9512010-09-29 15:03:40 +000032# TODO:
33# - test the `file` arg to more commands
34# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000035# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000036
37
class NetworkedNNTPTestsMixin:
    """Tests that talk to a live NNTP server.

    The methods read the following attributes, which the concrete test
    class has to provide: ``server`` (a connected client object),
    ``NNTP_HOST``, ``NNTP_CLASS``, ``GROUP_NAME``, ``GROUP_PAT`` and
    ``DESC``.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertEqual(str, type(welcome))

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertEqual(str, type(line))

    def test_list(self):
        resp, groups = self.server.list()
        if len(groups) > 0:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if len(groups) > 0:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if len(groups) > 0:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn(self.DESC, desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock (with one
        # year of slack) so that the test does not start failing by itself
        # once the calendar moves past the historical limit of 2030.
        self.assertGreaterEqual(date.year, 1995)
        max_year = max(2030, datetime.date.today().year + 1)
        self.assertLessEqual(date.year, max_year)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if len(lines) == 0:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
                           ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertEqual(str, type(line[1]))

    def check_article_resp(self, resp, article, art_num=None):
        """Check the ArticleInfo returned by HEAD / BODY / ARTICLE."""
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skipIf(True, "FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        # Tolerate running the tests from behind a NNTP virus checker
        denylist = lambda line: line.startswith(b'X-Antivirus')
        filtered_head_lines = [line for line in head.lines
                               if not denylist(line)]
        filtered_lines = [line for line in article.lines
                          if not denylist(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Clear the class-level reference even when quit() raises.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Replace every callable ``test_*`` attribute with a guarded version.

        Each replacement runs the original method inside
        ``socket_helper.transient_internet(self.NNTP_HOST)``.
        """
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with socket_helper.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_timeout(self):
        with self.assertRaises(ValueError):
            self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)

    def test_with_statement(self):
        def is_connected():
            # 'server' is the enclosing local, rebound further down.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        try:
            server = self.NNTP_CLASS(self.NNTP_HOST,
                                     timeout=support.INTERNET_TIMEOUT,
                                     usenetrc=False)
            with server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            server = self.NNTP_CLASS(self.NNTP_HOST,
                                     timeout=support.INTERNET_TIMEOUT,
                                     usenetrc=False)
            with server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000297
298
# Wrap the mixin's test_* methods once, at import time.
NetworkedNNTPTestsMixin.wrap_methods()


# EOF-style exceptions; ssl.SSLEOFError is included only when the ssl module
# could be imported.
if ssl is None:
    EOF_ERRORS = (EOFError,)
else:
    EOF_ERRORS = (EOFError, ssl.SSLEOFError)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200305
306
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Networked tests run through ``NNTP_CLASS`` against ``NNTP_HOST``."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'
    DESC = 'Python'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        """Open the connection shared by all tests of the class.

        Raises unittest.SkipTest when the connection attempt ends with an
        EOF error or with an SSL "key too small" error.
        """
        support.requires("network")
        host = cls.NNTP_HOST
        with socket_helper.transient_internet(host):
            try:
                cls.server = cls.NNTP_CLASS(
                    host,
                    timeout=support.INTERNET_TIMEOUT,
                    usenetrc=False,
                )
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                if not re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
                    raise
                raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                        f"to {cls.NNTP_HOST!r}")
            except EOF_ERRORS:
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")

    @classmethod
    def tearDownClass(cls):
        """Quit the shared connection unless a test already cleared it."""
        server = cls.server
        if server is not None:
            server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000338
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    """The networked tests again, using the NNTP_SSL client class."""

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."
    NNTP_HOST = 'nntp.aioe.org'

    # bpo-42794: aioe.test is one of the official groups on this server
    # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
    GROUP_NAME = 'aioe.test'
    GROUP_PAT = 'aioe.*'
    DESC = 'test'

    # None when nntplib has no NNTP_SSL attribute.
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Disabled as it produces too much data
    test_list = None

    # Disabled as the connection will already be encrypted.
    test_starttls = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000360
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000361
362#
363# Non-networked tests using a local server (or something mocking it).
364#
365
366class _NNTPServerIO(io.RawIOBase):
367 """A raw IO object allowing NNTP commands to be received and processed
368 by a handler. The handler can push responses which can then be read
369 from the IO object."""
370
371 def __init__(self, handler):
372 io.RawIOBase.__init__(self)
373 # The channel from the client
374 self.c2s = io.BytesIO()
375 # The channel to the client
376 self.s2c = io.BytesIO()
377 self.handler = handler
378 self.handler.start(self.c2s.readline, self.push_data)
379
380 def readable(self):
381 return True
382
383 def writable(self):
384 return True
385
386 def push_data(self, data):
387 """Push (buffer) some data to send to the client."""
388 pos = self.s2c.tell()
389 self.s2c.seek(0, 2)
390 self.s2c.write(data)
391 self.s2c.seek(pos)
392
393 def write(self, b):
394 """The client sends us some data"""
395 pos = self.c2s.tell()
396 self.c2s.write(b)
397 self.c2s.seek(pos)
398 self.handler.process_pending()
399 return len(b)
400
401 def readinto(self, buf):
402 """The client wants to read a response"""
403 self.handler.process_pending()
404 b = self.s2c.read(len(buf))
405 n = len(b)
406 buf[:n] = b
407 return n
408
409
def make_mock_file(handler):
    """Return ``(raw, file)``: a raw mock server IO and a buffered wrapper.

    ``raw`` is the _NNTPServerIO driven by *handler*; ``file`` reads from
    and writes to that same object.
    """
    raw = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    buffered = io.BufferedRWPair(raw, raw)
    return (raw, buffered)
416
417
class NNTPServer(nntplib.NNTP):
    """NNTP client that talks over a caller-supplied file object."""

    def __init__(self, f, host, readermode=None):
        # Set the attributes directly, then run the shared initialisation.
        self.host = host
        self.file = f
        self._base_init(readermode)

    def _close(self):
        """Close the file and drop the attribute that referenced it."""
        stream = self.file
        stream.close()
        del self.file
428
429
class MockedNNTPTestsMixin:
    """Give every test a fresh NNTPServer wired to a mock handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create handler, mock file and server; store and return the server.

        Extra arguments are passed through to NNTPServer.
        """
        handler = self.handler_class()
        self.handler = handler
        self.sio, mock_file = make_mock_file(handler)
        server = NNTPServer(mock_file, 'test.server', *args, **kwargs)
        self.server = server
        return server
447
448
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Variant whose server is created with ``readermode=True``."""

    def setUp(self):
        # The parent setUp() builds a default server first; it is then
        # replaced by one created in reader mode.
        super().setUp()
        self.make_server(readermode=True)
453
454
class NNTPv1Handler:
    """A handler for RFC 977

    Commands are dispatched to ``handle_<COMMAND>`` methods, each of which
    pushes a canned reply.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Store the I/O callables, reset the state and push the welcome."""
        self._readline = readline
        self._push_data = push_data
        self.in_body = False
        self.allow_posting = True
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume every complete line currently available from the client."""
        if self.in_body:
            # Keep collecting body lines up to and including the terminator.
            while (chunk := self._readline()):
                self.body.append(chunk)
                if chunk == b".\r\n":
                    break
            else:
                # Ran out of input before the terminator showed up.
                return
            try:
                callback, args = self.body_callback
                callback(*args, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while (line := self._decode(self._readline())):
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            cmd, *tokens = line.split()
            handler = getattr(self, "handle_" + cmd.upper(), None)
            if handler is None:
                self.handle_unknown()
                continue
            try:
                handler(*tokens)
            except Exception as e:
                raise ValueError("command failed: {!r}".format(line)) from e
            if self.in_body:
                # The command asked for a body: remember who gets it.
                self.body_callback = handler, tokens
                self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        text = textwrap.dedent(lit)
        wire = "\r\n".join(text.splitlines()) + "\r\n"
        self.push_data(wire.encode('utf-8'))

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group != "fr.comp.lang.python":
            self.push_lit("411 No such group {}".format(group))
            return
        self.push_lit("211 486 761 1265 fr.comp.lang.python")

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            reply = "412 No newsgroup selected"
        elif message_spec == "3000234":
            reply = "223 3000234 <45223423@example.com>"
        elif message_spec == "<45223423@example.com>":
            reply = "223 0 <45223423@example.com>"
        else:
            reply = "430 No Such Article Found"
        self.push_lit(reply)

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            reply = """\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                ."""
        elif action == "ACTIVE":
            if param == "*distutils*":
                reply = """\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    ."""
            else:
                reply = """\
                    215 Newsgroups in form "group high low flags"
                    ."""
        elif action == "OVERVIEW.FMT":
            reply = """\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                ."""
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                reply = """\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    ."""
            elif param == "comp.lang.python*":
                reply = """\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    ."""
            else:
                reply = """\
                    215 Descriptions in form "group description".
                    ."""
        else:
            reply = '501 Unknown LIST keyword'
        self.push_lit(reply)

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        is_clp = group == "comp.lang.python"
        if is_clp and date_str == "20100913" and time_str == "082004":
            # Date was passed in RFC 3977 format (NNTP "v2")
            reply = """\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                ."""
        elif is_clp and date_str == "100913" and time_str == "082004":
            # Date was passed in RFC 977 format (NNTP "v1")
            reply = """\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                ."""
        elif (is_clp and date_str in ('20100101', '100101')
              and time_str == '090000'):
            reply = 'too long line' * 3000 + '\n.'
        else:
            reply = """\
                230 An empty list of newsarticles follows
                ."""
        self.push_lit(reply)
        # (Note for experiments: many servers disable NEWNEWS.
        # As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec != "57-59":
            self.push_lit("""\
                224 No articles
                .""")
            return
        self.push_lit(
            "224 Overview information for 57-58 follows\n"
            "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                "\tSat, 19 Jun 2010 18:04:08 -0400"
                "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                "\tXref: news.gmane.io gmane.comp.python.authors:57"
                "\n"
            "58\tLooking for a few good bloggers"
                "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                "\tThu, 22 Jul 2010 09:14:14 -0400"
                "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                "\t\t6683\t16"
                "\t"
                "\n"
            # A UTF-8 overview line from fr.comp.lang.python
            "59\tRe: Message d'erreur incompréhensible (par moi)"
                "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                "\tWed, 15 Sep 2010 18:09:15 +0200"
                "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                "\n"
            ".\n")

    def handle_POST(self, *, body=None):
        if body is not None:
            # Second call, made once the whole body has been collected.
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body
        elif self.allow_posting:
            self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
            self.expect_body()
        else:
            self.push_lit("440 Posting not permitted")

    def handle_IHAVE(self, message_id, *, body=None):
        if body is not None:
            # Second call, made once the whole body has been collected.
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body
        elif (self.allow_posting and
              message_id == "<i.am.an.article.you.will.want@example.com>"):
            self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
            self.expect_body()
        else:
            self.push_lit("435 Article not wanted")

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        status = {
            None: "220 3000237 <45223423@example.com>",
            "<45223423@example.com>": "220 0 <45223423@example.com>",
            "3000234": "220 3000234 <45223423@example.com>",
        }.get(message_spec)
        if status is None:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(status)
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        status = {
            None: "221 3000237 <45223423@example.com>",
            "<45223423@example.com>": "221 0 <45223423@example.com>",
            "3000234": "221 3000234 <45223423@example.com>",
        }.get(message_spec)
        if status is None:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(status)
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        status = {
            None: "222 3000237 <45223423@example.com>",
            "<45223423@example.com>": "222 0 <45223423@example.com>",
            "3000234": "222 3000234 <45223423@example.com>",
        }.get(message_spec)
        if status is None:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(status)
        self.push_lit(self.sample_body)
        self.push_lit(".")

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
            return
        if cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
778
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000779
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        """Push the capability list.

        ``AUTHINFO USER`` is advertised only while nobody is logged in.
        """
        # The reply is assembled from unindented lines, so the optional
        # AUTHINFO entry does not have to reproduce a template margin for
        # push_lit()'s dedent to work.
        lines = [
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
        ]
        if not self._logged_in:
            lines.append("AUTHINFO USER")
        lines += [
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        ]
        self.push_lit("\n".join(lines))

    def handle_MODE(self, _):
        # READER is already part of the capability list above, so a client
        # sending MODE at all is treated as an error.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        """Answer OVER with the same canned data as XOVER."""
        return self.handle_XOVER(message_spec)
805
806
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """Mock v2 handler that refuses CAPABILITIES until the client logs in."""

    def handle_CAPABILITIES(self):
        """Push 480 before login, the regular capability list afterwards."""
        if self._logged_in:
            super().handle_CAPABILITIES()
            return
        self.push_lit('480 You must log in.')
815
816
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode.

    Until MODE READER has been received, CAPABILITIES advertises
    MODE-READER instead of READER; afterwards it advertises READER.
    """

    def __init__(self):
        super().__init__()
        # Becomes True once the client has sent MODE READER.
        self._switched = False

    def handle_CAPABILITIES(self):
        """Push the capability list matching the current mode."""
        fmt = """\
 101 Capability list:
 VERSION 2 3
 IMPLEMENTATION INN 2.5.1
 HDR
 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
 OVER
 POST
 {}READER
 ."""
        if self._switched:
            self.push_lit(fmt.format(''))
        else:
            self.push_lit(fmt.format('MODE-'))

    def handle_MODE(self, what):
        """Switch to reader mode on the first (and only) MODE READER.

        Raises AssertionError if the mode was already switched or if the
        argument is not 'reader'.
        """
        # Raised explicitly rather than with ``assert`` so that the check
        # is not stripped when the test suite runs under -O.
        if self._switched or what != 'reader':
            raise AssertionError(
                'unexpected MODE {!r} (already switched: {})'.format(
                    what, self._switched))
        self._switched = True
        self.push_lit('200 Posting allowed')
843
844
class NNTPv1v2TestsMixin:
    """Tests shared by the mocked NNTP v1 and v2 test cases.

    The concrete test case is expected to provide ``self.server`` (the
    client under test), ``self.handler`` (the mock handler), ``self.sio``
    and the ``nntp_version`` class attribute, all of which are read below.
    """

    # Arguments accepted by the ARTICLE / HEAD / BODY commands of the mock
    # handler, paired with the article number echoed in the status line.
    _article_specs = (
        ((), 3000237),
        ((3000234,), 3000234),
        (("<45223423@example.com>",), 0),
    )

    # First header lines and complete body of the sample article, as they
    # are written to a ``file`` argument.
    _head_start = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    _body_end = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def setUp(self):
        super().setUp()

    def test_welcome(self):
        self.assertEqual(self.server.welcome, self.handler.welcome)

    def test_authinfo(self):
        if self.nntp_version == 2:
            self.assertIn('AUTHINFO', self.server._caps)
        self.server.login('testuser', 'testpw')
        # if AUTHINFO is gone from _caps we also know that getcapabilities()
        # has been called after login as it should
        self.assertNotIn('AUTHINFO', self.server._caps)

    def test_date(self):
        resp, date = self.server.date()
        self.assertEqual(resp, "111 20100914001155")
        self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))

    def test_quit(self):
        self.assertFalse(self.sio.closed)
        resp = self.server.quit()
        self.assertEqual(resp, "205 Bye!")
        self.assertTrue(self.sio.closed)

    def test_help(self):
        resp, help_lines = self.server.help()
        self.assertEqual(resp, "100 Legal commands")
        self.assertEqual(help_lines, [
            ' authinfo user Name|pass Password|generic <prog> <args>',
            ' date',
            ' help',
            'Report problems to <root@example.org>',
        ])

    def test_list(self):
        _, groups = self.server.list()
        self.assertEqual(len(groups), 6)
        self.assertEqual(
            groups[1],
            GroupInfo("comp.lang.python.announce", "0000001153",
                      "0000000993", "m"))
        _, groups = self.server.list("*distutils*")
        self.assertEqual(len(groups), 2)
        self.assertEqual(
            groups[0],
            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
                      "0000000001", "m"))

    def test_stat(self):
        resp, art_num, message_id = self.server.stat(3000234)
        self.assertEqual(resp, "223 3000234 <45223423@example.com>")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        resp, art_num, message_id = self.server.stat("<45223423@example.com>")
        self.assertEqual(resp, "223 0 <45223423@example.com>")
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat("<non.existent.id>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat()
        self.assertEqual(cm.exception.response, "412 No newsgroup selected")

    def test_next(self):
        resp, art_num, message_id = self.server.next()
        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<668929@example.org>")

    def test_last(self):
        resp, art_num, message_id = self.server.last()
        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")

    def test_description(self):
        desc = self.server.description("comp.lang.python")
        self.assertEqual(desc, "The Python computer language.")
        desc = self.server.description("comp.lang.pythonx")
        self.assertEqual(desc, "")

    def test_descriptions(self):
        resp, groups = self.server.descriptions("comp.lang.python")
        self.assertEqual(resp, '215 Descriptions in form "group description".')
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
        })
        _, groups = self.server.descriptions("comp.lang.python*")
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
        })
        _, groups = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(groups, {})

    def test_group(self):
        resp, count, first, last, group = self.server.group("fr.comp.lang.python")
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, "fr.comp.lang.python")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.group("comp.lang.python.devel")
        exc = cm.exception
        self.assertTrue(exc.response.startswith("411 No such group"),
                        exc.response)

    def test_newnews(self):
        # NEWNEWS comp.lang.python [20]100913 082004
        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("comp.lang.python", dt)
        expected = (
            "230 list of newsarticles (NNTP v{0}) "
            "created after Mon Sep 13 08:20:04 2010 follows"
            ).format(self.nntp_version)
        self.assertEqual(resp, expected)
        self.assertEqual(ids, [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
            ])
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("fr.comp.lang.python", dt)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
        self.assertEqual(lines[-2], b"")
        self.assertEqual(lines[-3], b".Here is a dot-starting line.")
        self.assertEqual(lines[-4], b"This is just a test article.")

    def _check_article_head(self, lines):
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        self.assertEqual(len(lines), 9)
        self._check_article_head(lines[:4])
        self._check_article_body(lines[-4:])
        self.assertEqual(lines[4], b"")

    def _check_retrieval(self, command, code, check_lines):
        """Exercise an ARTICLE-like client method in all supported forms.

        command -- bound client method (article, head or body).
        code -- status code expected at the start of the response.
        check_lines -- callable validating the returned list of lines.

        The command is called without argument, with an article number
        and with a message id, then with an unknown id which must raise
        NNTPTemporaryError carrying a 430 response.
        """
        for args, number in self._article_specs:
            resp, info = command(*args)
            self.assertEqual(
                resp, "{} {} <45223423@example.com>".format(code, number))
            art_num, message_id, lines = info
            self.assertEqual(art_num, number)
            self.assertEqual(message_id, "<45223423@example.com>")
            check_lines(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            command("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def _check_retrieval_to_file(self, command, code, has_head, has_body):
        """Exercise an ARTICLE-like client method with a ``file`` argument.

        has_head / has_body -- whether the data written to the file must
        start with the sample headers / end with the sample body.
        """
        f = io.BytesIO()
        resp, info = command(file=f)
        self.assertEqual(
            resp, "{} 3000237 <45223423@example.com>".format(code))
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        # With a file, nothing is returned in the list of lines.
        self.assertEqual(lines, [])
        data = f.getvalue()
        self.assertEqual(data.startswith(self._head_start), has_head,
                         ascii(data))
        self.assertEqual(data.endswith(self._body_end), has_body,
                         ascii(data))

    def test_article(self):
        self._check_retrieval(self.server.article, 220,
                              self._check_article_data)

    def test_article_file(self):
        # With a "file" argument
        self._check_retrieval_to_file(self.server.article, 220, True, True)

    def test_head(self):
        self._check_retrieval(self.server.head, 221,
                              self._check_article_head)

    def test_head_file(self):
        self._check_retrieval_to_file(self.server.head, 221, True, False)

    def test_body(self):
        self._check_retrieval(self.server.body, 222,
                              self._check_article_body)

    def test_body_file(self):
        self._check_retrieval_to_file(self.server.body, 222, False, True)

    def check_over_xover_resp(self, resp, overviews):
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        art_num, over = overviews[0]
        self.assertEqual(art_num, 57)
        self.assertEqual(over, {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.io gmane.comp.python.authors:57"
            })
        _, over = overviews[1]
        self.assertIsNone(over["xref"])
        _, over = overviews[2]
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        resp, overviews = self.server.xover(57, 59)
        self.check_over_xover_resp(resp, overviews)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        resp, overviews = self.server.over((57, 59))
        self.check_over_xover_resp(resp, overviews)

    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        # Check the raw body as received by the server
        lines = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(lines), 10)
        self.assertEqual(lines[-1], b'.\r\n')
        self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(lines[-3], b'\r\n')
        self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        """Post the sample article twice and return the last response.

        The article is sent first with CRLF line endings, then with
        "normal" line endings, which should be converted by NNTP.post and
        NNTP.ihave.  ``file_factory`` wraps the raw bytes into the object
        handed to ``func`` as its last positional argument.
        """
        variants = (
            self.sample_post,
            self.sample_post.replace(b"\r\n", b"\n"),
        )
        for post in variants:
            func_args = args + (file_factory(post),)
            self.handler.posted_body = None
            resp = func(*func_args)
            self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        """Run ``func`` with every supported kind of article data."""
        def terminated_lines(b):
            return iter(b.splitlines(keepends=True))

        def bare_lines(b):
            return iter(b.splitlines(keepends=False))

        factories = (
            bytes,             # a bytes object
            bytearray,         # a bytearray object
            io.BytesIO,        # a file object
            terminated_lines,  # an iterable of terminated lines
            bare_lines,        # an iterable of non-terminated lines
        )
        for factory in factories:
            resp = self._check_post_ihave_sub(func, *args,
                                              file_factory=factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response,
                         "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response,
                         "435 Article not wanted")

    def test_too_long_lines(self):
        dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
        self.assertRaises(nntplib.NNTPDataError,
                          self.server.newnews, "comp.lang.python", dt)
1246
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001247
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Run the shared tests against the v1 mock handler (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # A v1 server reports no capabilities and no implementation name.
        reported = self.server.getcapabilities()
        self.assertEqual(reported, {})
        server = self.server
        self.assertEqual(server.nntp_version, 1)
        self.assertEqual(server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001259
1260
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Run the shared tests against the v2 mock handler (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # Capability list as advertised before any login took place.
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(self.server.getcapabilities(), expected_caps)
        server = self.server
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001282
1283
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests against a handler that serves CAPABILITIES only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        server = self.server
        # Nothing is known about the server until the login went through.
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        self.assertIn('VERSION', server._caps)
1294
1295
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2, but NNTP is told to send MODE READER to a
    server that is not in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        advertised = self.server._caps
        self.assertIn('READER', advertised)
1306
1307
class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helper functions."""

    def test_decode_header(self):
        # (raw header, expected decoded text) pairs
        cases = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in cases:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        mandatory = ["Subject:", "From:", "Date:", "Message-ID:",
                     "References:"]
        parsed_default = ["subject", "from", "date", "message-id",
                          "references", ":bytes", ":lines"]
        cases = [
            # The minimal (default) response
            (mandatory + [":bytes", ":lines"],
             parsed_default),
            # The minimal response using alternative names
            (mandatory + ["Bytes:", "Lines:"],
             parsed_default),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             parsed_default),
            # First example from RFC 3977
            (mandatory + [":bytes", ":lines", "Xref:full",
                          "Distribution:full"],
             parsed_default + ["xref", "distribution"]),
            # Second example from RFC 3977
            (mandatory + ["Bytes:", "Lines:", "Xref:FULL",
                          "Distribution:FULL"],
             parsed_default + ["xref", "distribution"]),
            # A classic response from INN
            (mandatory + ["Bytes:", "Lines:", "Xref:full"],
             parsed_default + ["xref"]),
        ]
        for response_lines, parsed in cases:
            self.assertEqual(nntplib._parse_overview_fmt(response_lines),
                             parsed)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # Fields shared by the three examples, up to the message id.
        prefix = (
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t'
        )

        def parse_single(tail):
            (entry,) = nntplib._parse_overview([prefix + tail], fmt)
            return entry

        # First example from RFC 3977
        art_num, fields = parse_single(
            '<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363')
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        art_num, fields = parse_single(
            '<45454@example.net>\t1234\t'
            '17\t\t')
        self.assertEqual(fields['xref'], None)
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        art_num, fields = parse_single(
            ' \t1234\t'
            '17\tXref: \t')
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        cases = [
            # Output of DATE command
            ("19990623135624", None, (1999, 6, 23, 13, 56, 24)),
            # Variations
            ("19990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("090623", "135624", (2009, 6, 23, 13, 56, 24)),
        ]
        for date_str, time_str, parts in cases:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             datetime.datetime(*parts))

    def test_unparse_datetime(self):
        # Test non-legacy mode
        cases = [
            # 1) with a datetime
            (datetime.datetime(1999, 6, 23, 13, 56, 24), "19990623", "135624"),
            (datetime.datetime(2000, 6, 23, 13, 56, 24), "20000623", "135624"),
            (datetime.datetime(2010, 6, 5, 1, 2, 3), "20100605", "010203"),
            # 2) with a date
            (datetime.date(1999, 6, 23), "19990623", "000000"),
            (datetime.date(2000, 6, 23), "20000623", "000000"),
            (datetime.date(2010, 6, 5), "20100605", "000000"),
        ]
        for dt, date_str, time_str in cases:
            # Both the default and an explicit legacy=False are checked.
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        cases = [
            # 1) with a datetime
            (datetime.datetime(1999, 6, 23, 13, 56, 24), "990623", "135624"),
            (datetime.datetime(2000, 6, 23, 13, 56, 24), "000623", "135624"),
            (datetime.datetime(2010, 6, 5, 1, 2, 3), "100605", "010203"),
            # 2) with a date
            (datetime.date(1999, 6, 23), "990623", "000000"),
            (datetime.date(2000, 6, 23), "000623", "000000"),
            (datetime.date(2010, 6, 5), "100605", "000000"),
        ]
        for dt, date_str, time_str in cases:
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001468
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001469
class PublicAPITests(unittest.TestCase):
    """Checks which names nntplib exposes through ``__all__``."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        expected = {
            'NNTP', 'NNTPError', 'NNTPReplyError',
            'NNTPTemporaryError', 'NNTPPermanentError',
            'NNTPProtocolError', 'NNTPDataError', 'decode_header',
        }
        # NNTP_SSL is only expected when the ssl module could be imported.
        if ssl is not None:
            expected.add('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), expected)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001481
class MockSocketTests(unittest.TestCase):
    """Constructor tests driven through a stand-in ``socket`` module.

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        """Check that a failing constructor raises and closes everything.

        ``nntplib.socket`` is patched so that connecting yields a fake
        socket whose files are served by a fresh ``handler_class()``.
        Constructing ``self.nntp_class`` must raise ``expected_error_type``
        matching ``expected_error_msg``; afterwards the fake socket and
        every file made from it must have been closed.
        """
        state = {'socket_closed': False}
        opened_files = []

        class FakeSocket:
            def close(self):
                state['socket_closed'] = True

            def makefile(self, mode):
                _, file = make_mock_file(handler_class())
                opened_files.append(file)
                return file

        class fake_socket_module:
            # Looked up on the class itself, so this is a plain function.
            def create_connection(address, timeout):
                return FakeSocket()

        with patch('nntplib.socket', fake_socket_module), \
             self.assertRaisesRegex(expected_error_type, expected_error_msg):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(state['socket_closed'])
        for file in opened_files:
            self.assertTrue(file.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        authinfo_response = '503 Mechanism not recognized'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)

        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            't@e.com', 'python')
1559
class bypass_context:
    """Stand-in SSL context: no encryption, the ssl module is never used."""

    def wrap_socket(sock, **_ignored):
        # Called on the class itself, so ``sock`` is the first argument;
        # the socket is handed back unwrapped.
        return sock
1564
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Repeat the mock socket tests through NNTP_SSL with a bypass context."""

    @staticmethod
    def nntp_class(*args, **kwargs):
        return nntplib.NNTP_SSL(*args, ssl_context=bypass_context, **kwargs)
Victor Stinner8c9bba02015-04-03 11:06:40 +02001570
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001571
Martin Panter8f19e8e2016-01-19 01:10:58 +00001572class LocalServerTests(unittest.TestCase):
def setUp(self):
    """Start the server thread and connect an NNTP client to it."""
    listener = socket.socket()
    port = socket_helper.bind_port(listener)
    listener.listen()

    # The listening socket is handed over to run_server in the thread.
    server_thread = threading.Thread(
        target=self.run_server, args=(listener,))
    self.background = server_thread
    server_thread.start()
    self.addCleanup(server_thread.join)

    client = NNTP(socket_helper.HOST, port, usenetrc=False)
    self.nntp = client.__enter__()
    self.addCleanup(self.nntp.__exit__, None, None, None)
1584
1585 def run_server(self, sock):
1586 # Could be generalized to handle more commands in separate methods
1587 with sock:
1588 [client, _] = sock.accept()
1589 with contextlib.ExitStack() as cleanup:
1590 cleanup.enter_context(client)
1591 reader = cleanup.enter_context(client.makefile('rb'))
1592 client.sendall(b'200 Server ready\r\n')
1593 while True:
1594 cmd = reader.readline()
1595 if cmd == b'CAPABILITIES\r\n':
1596 client.sendall(
1597 b'101 Capability list:\r\n'
1598 b'VERSION 2\r\n'
1599 b'STARTTLS\r\n'
1600 b'.\r\n'
1601 )
1602 elif cmd == b'STARTTLS\r\n':
1603 reader.close()
1604 client.sendall(b'382 Begin TLS negotiation now\r\n')
Christian Heimesd0486372016-09-10 23:23:33 +02001605 context = ssl.SSLContext()
1606 context.load_cert_chain(certfile)
1607 client = context.wrap_socket(
1608 client, server_side=True)
Martin Panter8f19e8e2016-01-19 01:10:58 +00001609 cleanup.enter_context(client)
1610 reader = cleanup.enter_context(client.makefile('rb'))
1611 elif cmd == b'QUIT\r\n':
1612 client.sendall(b'205 Bye!\r\n')
1613 break
1614 else:
1615 raise ValueError('Unexpected command {!r}'.format(cmd))
1616
1617 @unittest.skipUnless(ssl, 'requires SSL support')
1618 def test_starttls(self):
1619 file = self.nntp.file
1620 sock = self.nntp.sock
1621 self.nntp.starttls()
1622 # Check that the socket and internal pseudo-file really were
1623 # changed.
1624 self.assertNotEqual(file, self.nntp.file)
1625 self.assertNotEqual(sock, self.nntp.sock)
1626 # Check that the new socket really is an SSL one
1627 self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
1628 # Check that trying starttls when it's already active fails.
1629 self.assertRaises(ValueError, self.nntp.starttls)
1630
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001631
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()