# Source blob: 618b403bfb5bd33546613332858af39e3824cd95
import io
import socket
import datetime
import textwrap
import unittest
import functools
import contextlib
import os.path
import re
import threading

from test import support
from nntplib import NNTP, GroupInfo
import nntplib
from unittest.mock import patch
try:
    import ssl
except ImportError:
    ssl = None


# Timeout passed to every NNTP connection opened by the networked tests.
TIMEOUT = 30
# Path of 'keycert3.pem' inside the directory holding this module.
certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')

# Expose a single ``SSLError`` name that the tests can always catch, whether
# or not the ssl module could be imported.
if ssl is not None:
    SSLError = ssl.SSLError
else:
    class SSLError(Exception):
        """Non-existent exception class when we lack SSL support."""
        reason = "This will never be raised."

# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`


class NetworkedNNTPTestsMixin:
    """Tests that talk to a live NNTP server.

    The concrete test case must provide ``NNTP_HOST``, ``NNTP_CLASS``,
    ``GROUP_NAME`` and ``GROUP_PAT`` and store a connected client in
    ``server``.  After the class body, ``wrap_methods()`` wraps every
    ``test_*`` method in ``support.transient_internet()``.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock (plus one
        # year of slack) instead of a fixed year that would eventually expire.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
                           ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skipIf(True, "FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def is_injected(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not is_injected(line)]
        filtered_lines = [line for line in article.lines
                          if not is_injected(line)]
        self.assertEqual(filtered_lines,
                         filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Tell tearDownClass that the connection is already closed.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every ``test_*`` callable of *cls* in transient_internet()."""
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_with_statement(self):
        def is_connected():
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        try:
            with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT,
                                 usenetrc=False) as server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT,
                                 usenetrc=False) as server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            # ``reason`` may be missing or None; fall back to '' so that
            # re.search() cannot raise TypeError and hide the real error.
            reason = getattr(ssl_err, 'reason', None) or ''
            if re.search(r'(?i)KEY.TOO.SMALL', reason):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise


NetworkedNNTPTestsMixin.wrap_methods()


# Exception types that setUpClass treats as an EOF while connecting; the SSL
# variant is only added when the ssl module is available.
EOF_ERRORS = (EOFError,)
if ssl is not None:
    EOF_ERRORS += (ssl.SSLEOFError,)


class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests over a plain NNTP connection."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        """Open the shared connection, skipping on known transient errors."""
        support.requires("network")
        with support.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT,
                                            usenetrc=False)
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                # ``reason`` may be missing or None; fall back to '' so that
                # re.search() cannot raise TypeError and hide the real error.
                reason = getattr(ssl_err, 'reason', None) or ''
                if re.search(r'(?i)KEY.TOO.SMALL', reason):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                raise
            except EOF_ERRORS:
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")

    @classmethod
    def tearDownClass(cls):
        # test_zzquit() sets ``server`` to None once it has closed the
        # connection itself.
        if cls.server is not None:
            cls.server.quit()


@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    """Run the networked tests over an NNTP_SSL connection."""

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."

    NNTP_HOST = 'nntp.aioe.org'
    GROUP_NAME = 'comp.lang.python'
    GROUP_PAT = 'comp.lang.*'

    # None when nntplib was built without SSL support.
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Disabled as it produces too much data
    test_list = None

    # Disabled as the connection will already be encrypted.
    test_starttls = None


#
# Non-networked tests using a local server (or something mocking it).
#

351class _NNTPServerIO(io.RawIOBase):
352 """A raw IO object allowing NNTP commands to be received and processed
353 by a handler. The handler can push responses which can then be read
354 from the IO object."""
355
356 def __init__(self, handler):
357 io.RawIOBase.__init__(self)
358 # The channel from the client
359 self.c2s = io.BytesIO()
360 # The channel to the client
361 self.s2c = io.BytesIO()
362 self.handler = handler
363 self.handler.start(self.c2s.readline, self.push_data)
364
365 def readable(self):
366 return True
367
368 def writable(self):
369 return True
370
371 def push_data(self, data):
372 """Push (buffer) some data to send to the client."""
373 pos = self.s2c.tell()
374 self.s2c.seek(0, 2)
375 self.s2c.write(data)
376 self.s2c.seek(pos)
377
378 def write(self, b):
379 """The client sends us some data"""
380 pos = self.c2s.tell()
381 self.c2s.write(b)
382 self.c2s.seek(pos)
383 self.handler.process_pending()
384 return len(b)
385
386 def readinto(self, buf):
387 """The client wants to read a response"""
388 self.handler.process_pending()
389 b = self.s2c.read(len(buf))
390 n = len(b)
391 buf[:n] = b
392 return n
393
394
def make_mock_file(handler):
    """Return ``(sio, file)``: the raw mock server IO driven by *handler*
    and a buffered file object wrapping it."""
    sio = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    file = io.BufferedRWPair(sio, sio)
    return (sio, file)


class MockedNNTPTestsMixin:
    """Give each test an NNTP client connected to an in-memory handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create a new handler and client; extra arguments are forwarded to
        ``nntplib._NNTPBase``.  Sets ``handler``, ``sio`` and ``server`` and
        returns the server."""
        self.handler = self.handler_class()
        self.sio, file = make_mock_file(self.handler)
        self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
        return self.server


class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Variant whose server is created with ``readermode=True``."""

    def setUp(self):
        super().setUp()
        # Replace the server built by the base setUp() with one created in
        # reader mode.
        self.make_server(readermode=True)


class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its I/O callbacks and send the welcome.

        *readline* returns the next line sent by the client (empty when no
        more input is pending); *push_data* queues bytes for the client.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume all pending client input and dispatch it.

        A command ``CMD`` is routed to ``handle_CMD(*tokens)``.  When a
        handler calls expect_body(), the following lines up to the lone
        "." line are collected and the same handler is called again with
        ``body=<list of lines>``.
        """
        if self.in_body:
            while True:
                line = self._readline()
                if not line:
                    # Body not complete yet; wait for more input.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            cmd, *tokens = line.split()
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # Remember who to call once the body has arrived.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # Dedent, normalise line endings to CRLF and encode as UTF-8.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # The command entries keep their extra indent relative to the status
        # line after dedenting.
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # Deliberately over-long response line.
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    # The samples are indented like the surrounding code; push_lit() dedents
    # them before they are sent.
    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))


class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # Dedent the template before formatting so the optional AUTHINFO
        # line does not have to match the indentation used in this source.
        fmt = textwrap.dedent("""\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            .""")

        # AUTHINFO is only advertised until the client has logged in.
        auth_line = '' if self._logged_in else '\nAUTHINFO USER'
        self.push_lit(fmt.format(auth_line))

    def handle_MODE(self, _):
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        return self.handle_XOVER(message_spec)


class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if not self._logged_in:
            self.push_lit('480 You must log in.')
        else:
            super().handle_CAPABILITIES()


class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        super().__init__()
        # Becomes True once the client has sent MODE READER.
        self._switched = False

    def handle_CAPABILITIES(self):
        # Advertise MODE-READER until the switch happened, READER afterwards.
        fmt = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        if self._switched:
            self.push_lit(fmt.format(''))
        else:
            self.push_lit(fmt.format('MODE-'))

    def handle_MODE(self, what):
        # The client is expected to switch exactly once, to reader mode.
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')


class NNTPv1v2TestsMixin:
    """Tests shared by the mocked NNTP v1 and v2 server test cases.

    The concrete test case must provide ``self.server`` (the NNTP client
    under test), ``self.handler`` (the mock server handler), ``self.sio``
    (the underlying pseudo-file) and the ``nntp_version`` class attribute.
    """

    def setUp(self):
        super().setUp()

    def test_welcome(self):
        self.assertEqual(self.server.welcome, self.handler.welcome)

    def test_authinfo(self):
        if self.nntp_version == 2:
            self.assertIn('AUTHINFO', self.server._caps)
        self.server.login('testuser', 'testpw')
        # if AUTHINFO is gone from _caps we also know that getcapabilities()
        # has been called after login as it should
        self.assertNotIn('AUTHINFO', self.server._caps)

    def test_date(self):
        resp, date = self.server.date()
        self.assertEqual(resp, "111 20100914001155")
        self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))

    def test_quit(self):
        self.assertFalse(self.sio.closed)
        resp = self.server.quit()
        self.assertEqual(resp, "205 Bye!")
        self.assertTrue(self.sio.closed)

    def test_help(self):
        resp, help = self.server.help()
        self.assertEqual(resp, "100 Legal commands")
        self.assertEqual(help, [
            ' authinfo user Name|pass Password|generic <prog> <args>',
            ' date',
            ' help',
            'Report problems to <root@example.org>',
        ])

    def test_list(self):
        resp, groups = self.server.list()
        self.assertEqual(len(groups), 6)
        g = groups[1]
        self.assertEqual(g,
            GroupInfo("comp.lang.python.announce", "0000001153",
                      "0000000993", "m"))
        resp, groups = self.server.list("*distutils*")
        self.assertEqual(len(groups), 2)
        g = groups[0]
        self.assertEqual(g,
            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
                      "0000000001", "m"))

    def test_stat(self):
        resp, art_num, message_id = self.server.stat(3000234)
        self.assertEqual(resp, "223 3000234 <45223423@example.com>")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        resp, art_num, message_id = self.server.stat("<45223423@example.com>")
        self.assertEqual(resp, "223 0 <45223423@example.com>")
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat("<non.existent.id>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat()
        self.assertEqual(cm.exception.response, "412 No newsgroup selected")

    def test_next(self):
        resp, art_num, message_id = self.server.next()
        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<668929@example.org>")

    def test_last(self):
        resp, art_num, message_id = self.server.last()
        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")

    def test_description(self):
        desc = self.server.description("comp.lang.python")
        self.assertEqual(desc, "The Python computer language.")
        desc = self.server.description("comp.lang.pythonx")
        self.assertEqual(desc, "")

    def test_descriptions(self):
        resp, groups = self.server.descriptions("comp.lang.python")
        self.assertEqual(resp, '215 Descriptions in form "group description".')
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
            })
        resp, groups = self.server.descriptions("comp.lang.python*")
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
            })
        resp, groups = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(groups, {})

    def test_group(self):
        resp, count, first, last, group = self.server.group("fr.comp.lang.python")
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, "fr.comp.lang.python")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.group("comp.lang.python.devel")
        exc = cm.exception
        self.assertTrue(exc.response.startswith("411 No such group"),
                        exc.response)

    def test_newnews(self):
        # NEWNEWS comp.lang.python [20]100913 082004
        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("comp.lang.python", dt)
        expected = (
            "230 list of newsarticles (NNTP v{0}) "
            "created after Mon Sep 13 08:20:04 2010 follows"
            ).format(self.nntp_version)
        self.assertEqual(resp, expected)
        self.assertEqual(ids, [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
            ])
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("fr.comp.lang.python", dt)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        """Check *lines* (bytes, no line endings) against the sample body."""
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
        self.assertEqual(lines[-2], b"")
        self.assertEqual(lines[-3], b".Here is a dot-starting line.")
        self.assertEqual(lines[-4], b"This is just a test article.")

    def _check_article_head(self, lines):
        """Check the first and last of the four sample header *lines*."""
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        """Check a full article: four headers, a blank line, four body lines."""
        self.assertEqual(len(lines), 9)
        self._check_article_head(lines[:4])
        self._check_article_body(lines[-4:])
        self.assertEqual(lines[4], b"")

    def test_article(self):
        # ARTICLE
        resp, info = self.server.article()
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_data(lines)
        # ARTICLE num
        resp, info = self.server.article(3000234)
        self.assertEqual(resp, "220 3000234 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_data(lines)
        # ARTICLE id
        resp, info = self.server.article("<45223423@example.com>")
        self.assertEqual(resp, "220 0 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_data(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.article("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_article_file(self):
        # With a "file" argument
        f = io.BytesIO()
        resp, info = self.server.article(file=f)
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = f.getvalue()
        self.assertTrue(data.startswith(
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            ), ascii(data))
        self.assertTrue(data.endswith(
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            ), ascii(data))

    def test_head(self):
        # HEAD
        resp, info = self.server.head()
        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_head(lines)
        # HEAD num
        resp, info = self.server.head(3000234)
        self.assertEqual(resp, "221 3000234 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_head(lines)
        # HEAD id
        resp, info = self.server.head("<45223423@example.com>")
        self.assertEqual(resp, "221 0 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_head(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.head("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_head_file(self):
        f = io.BytesIO()
        resp, info = self.server.head(file=f)
        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = f.getvalue()
        self.assertTrue(data.startswith(
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            ), ascii(data))
        self.assertFalse(data.endswith(
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            ), ascii(data))

    def test_body(self):
        # BODY
        resp, info = self.server.body()
        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_body(lines)
        # BODY num
        resp, info = self.server.body(3000234)
        self.assertEqual(resp, "222 3000234 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_body(lines)
        # BODY id
        resp, info = self.server.body("<45223423@example.com>")
        self.assertEqual(resp, "222 0 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_body(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.body("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_body_file(self):
        f = io.BytesIO()
        resp, info = self.server.body(file=f)
        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = f.getvalue()
        self.assertFalse(data.startswith(
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            ), ascii(data))
        self.assertTrue(data.endswith(
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            ), ascii(data))

    def check_over_xover_resp(self, resp, overviews):
        """Check an OVER/XOVER result for the three sample articles."""
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        art_num, over = overviews[0]
        self.assertEqual(art_num, 57)
        self.assertEqual(over, {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.org gmane.comp.python.authors:57"
            })
        art_num, over = overviews[1]
        # Identity check: the field must be None itself, not merely
        # something that compares equal to None.
        self.assertIsNone(over["xref"])
        art_num, over = overviews[2]
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        resp, overviews = self.server.xover(57, 59)
        self.check_over_xover_resp(resp, overviews)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        resp, overviews = self.server.over((57, 59))
        self.check_over_xover_resp(resp, overviews)

    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        # Check the raw body as received by the server
        lines = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(lines), 10)
        self.assertEqual(lines[-1], b'.\r\n')
        self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(lines[-3], b'\r\n')
        self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        """Send the sample post through *func* twice and check what arrived.

        The first pass uses the prepared post with CRLF endings.  The second
        uses "normal" line endings, which NNTP.post and NNTP.ihave should
        convert.  Returns the response of the second call.
        """
        posts = (
            self.sample_post,
            self.sample_post.replace(b"\r\n", b"\n"),
        )
        for post in posts:
            func_args = args + (file_factory(post),)
            # Reset so that a stale body from a previous call cannot pass.
            self.handler.posted_body = None
            resp = func(*func_args)
            self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        """Exercise *func* with every supported kind of article payload."""
        def terminated_lines(b):
            return iter(b.splitlines(keepends=True))

        def bare_lines(b):
            return iter(b.splitlines(keepends=False))

        factories = (
            bytes,              # a bytes object
            bytearray,          # a bytearray object
            io.BytesIO,         # a file object
            terminated_lines,   # an iterable of terminated lines
            bare_lines,         # an iterable of non-terminated lines
        )
        for factory in factories:
            resp = self._check_post_ihave_sub(func, *args,
                                              file_factory=factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response,
                         "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response,
                         "435 Article not wanted")

    def test_too_long_lines(self):
        dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
        self.assertRaises(nntplib.NNTPDataError,
                          self.server.newnews, "comp.lang.python", dt)
1219
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001220
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # Against a v1 server the client reports an empty capability set,
        # protocol version 1 and no implementation string.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        # Identity check rather than equality: the attribute must be None.
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001232
1233
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # Capability names map to the list of arguments that follow them.
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        server = self.server
        self.assertEqual(server.getcapabilities(), expected_caps)
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001255
1256
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        server = self.server
        # No capabilities are known before logging in ...
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        # ... and they are populated afterwards.
        self.assertIn('VERSION', server._caps)
1267
1268
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2, but we tell NNTP to send MODE READER to a
    server that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        capabilities = self.server._caps
        self.assertIn('READER', capabilities)
1279
1280
class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helper functions."""

    def test_decode_header(self):
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("" , "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        # =E9 is "é" in ISO-8859-15.
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref"])

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode
        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives_datetime(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "20100605", "010203")
        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives_date(1999, 6, 23, "19990623", "000000")
        gives_date(2000, 6, 23, "20000623", "000000")
        gives_date(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives_datetime(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "100605", "010203")
        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives_date(1999, 6, 23, "990623", "000000")
        gives_date(2000, 6, 23, "000623", "000000")
        gives_date(2010, 6, 5, "100605", "000000")

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001441
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001442
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        expected_names = {
            'NNTP', 'NNTPError', 'NNTPReplyError',
            'NNTPTemporaryError', 'NNTPPermanentError',
            'NNTPProtocolError', 'NNTPDataError', 'decode_header',
        }
        # NNTP_SSL is only expected when the ssl module is available.
        if ssl is not None:
            expected_names.add('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), expected_names)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001454
class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        """Check that a failing constructor raises and releases everything.

        The client is built on a mock socket served by *handler_class*.
        Construction must raise *expected_error_type* matching
        *expected_error_msg*, and afterwards the socket and every file
        made from it must be closed.
        """
        closed = False
        created_files = []

        class MockSocket:
            def close(self):
                nonlocal closed
                closed = True

            def makefile(self, mode):
                # Each file gets its own fresh handler instance.
                _, mock_file = make_mock_file(handler_class())
                created_files.append(mock_file)
                return mock_file

        class mock_socket_module:
            # Looked up on the class itself, hence no "self" parameter.
            def create_connection(address, timeout):
                return MockSocket()

        with patch('nntplib.socket', mock_socket_module), \
             self.assertRaisesRegex(expected_error_type, expected_error_msg):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(closed)
        for mock_file in created_files:
            self.assertTrue(mock_file.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        authinfo_response = '503 Mechanism not recognized'
        login = 't@e.com'
        password = 'python'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            login, password)
1532
class bypass_context:
    """Bypass encryption and actual SSL module"""

    # Called on the class itself, so the first parameter is the socket.
    def wrap_socket(sock, **kwargs):
        # Hand the socket back untouched; keyword options are ignored.
        return sock
1537
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    # Re-runs the inherited tests through NNTP_SSL, with the TLS layer
    # replaced by the pass-through bypass_context.
    @staticmethod
    def nntp_class(*args, **kwargs):
        return nntplib.NNTP_SSL(*args, ssl_context=bypass_context, **kwargs)
Victor Stinner8c9bba02015-04-03 11:06:40 +02001543
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001544
class LocalServerTests(unittest.TestCase):
    # Runs a minimal NNTP server in a background thread on a local socket
    # and connects a real NNTP client to it.

    def setUp(self):
        listener = socket.socket()
        port = support.bind_port(listener)
        listener.listen()
        server_thread = threading.Thread(
            target=self.run_server, args=(listener,))
        self.background = server_thread
        server_thread.start()
        self.addCleanup(server_thread.join)

        # Enter the client's context manually; the matching __exit__ is
        # registered as a cleanup.
        self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        # Could be generalized to handle more commands in separate methods
        with sock:
            conn, _ = sock.accept()
        with contextlib.ExitStack() as stack:
            stack.enter_context(conn)
            rfile = stack.enter_context(conn.makefile('rb'))
            conn.sendall(b'200 Server ready\r\n')
            while True:
                line = rfile.readline()
                if line == b'CAPABILITIES\r\n':
                    conn.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                    continue
                if line == b'STARTTLS\r\n':
                    # Close the plain-text reader before the socket is
                    # wrapped, then read from the TLS socket instead.
                    rfile.close()
                    conn.sendall(b'382 Begin TLS negotiation now\r\n')
                    tls_context = ssl.SSLContext()
                    tls_context.load_cert_chain(certfile)
                    conn = tls_context.wrap_socket(
                        conn, server_side=True)
                    stack.enter_context(conn)
                    rfile = stack.enter_context(conn.makefile('rb'))
                    continue
                if line == b'QUIT\r\n':
                    conn.sendall(b'205 Bye!\r\n')
                    break
                raise ValueError('Unexpected command {!r}'.format(line))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        client = self.nntp
        old_file = client.file
        old_sock = client.sock
        client.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(old_file, client.file)
        self.assertNotEqual(old_sock, client.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(client.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, client.starttls)
1603
Serhiy Storchaka52027c32015-03-21 09:40:26 +02001604
if __name__ == "__main__":
    # Run the whole suite when this file is executed as a script.
    unittest.main()