blob: bb780bfa004b578e77d3665a0e28ecad4d1214e8 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
Martin Panter8f19e8e2016-01-19 01:10:58 +00008import os.path
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02009import threading
10
Antoine Pitrou69ab9512010-09-29 15:03:40 +000011from test import support
Serhiy Storchaka43767632013-11-03 21:31:38 +020012from nntplib import NNTP, GroupInfo
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013import nntplib
Serhiy Storchaka52027c32015-03-21 09:40:26 +020014from unittest.mock import patch
Serhiy Storchaka43767632013-11-03 21:31:38 +020015try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000016 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020017except ImportError:
18 ssl = None
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020019
Antoine Pitrou69ab9512010-09-29 15:03:40 +000020
# Passed as ``timeout=`` when connecting to the live NNTP servers.
TIMEOUT = 30
# Path to 'keycert3.pem' in the directory containing this module.
certfile = os.path.join(
    os.path.dirname(__file__),
    'keycert3.pem',
)

# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000028
29
class NetworkedNNTPTestsMixin:
    # Tests that talk to a live NNTP server.  The methods read
    # self.server (an open connection), self.GROUP_NAME, self.GROUP_PAT,
    # self.NNTP_HOST and self.NNTP_CLASS, which the concrete test class
    # is expected to provide.
    #
    # Comments are used instead of docstrings on the test methods so that
    # unittest's verbose output keeps showing the method names.

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound is relative to the local clock
        # (plus one year of slack around New Year) rather than a fixed
        # year, so the test does not start failing once that year passes.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skip('temporarily skipped until a permanent solution'
                   ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        # Shared checks for the result of head() / body() / article().
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def blacklist(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not blacklist(line)]
        filtered_lines = [line for line in article.lines
                          if not blacklist(line)]
        self.assertEqual(filtered_lines,
                         filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Tell the class-level teardown there is nothing left to close.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_with_statement(self):
        def is_connected():
            # `server` is looked up when called, i.e. it refers to whichever
            # connection the enclosing ``with`` statement bound most recently.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            self.assertTrue(is_connected())
            self.assertTrue(server.help())
        self.assertFalse(is_connected())

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            server.quit()
        self.assertFalse(is_connected())


# Applied once, right after the class body, so every test_* method defined
# above (including test_with_statement) is wrapped.
NetworkedNNTPTestsMixin.wrap_methods()
274
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000275
# Exception types treated as "the server hung up on us" while connecting.
# ssl.SSLEOFError is only included when the ssl module could be imported.
if ssl is None:
    EOF_ERRORS = (EOFError,)
else:
    EOF_ERRORS = (EOFError, ssl.SSLEOFError)
Victor Stinner5b4feb72017-07-24 17:41:02 +0200279
280
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        # One connection is opened here and shared, via cls.server, by all
        # the tests of the class.
        support.requires("network")
        host = cls.NNTP_HOST
        with support.transient_internet(host):
            try:
                connection = cls.NNTP_CLASS(host, timeout=TIMEOUT,
                                            usenetrc=False)
            except EOF_ERRORS:
                # The server dropped us during connection: skip the whole
                # class instead of reporting an error.
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")
            cls.server = connection

    @classmethod
    def tearDownClass(cls):
        # cls.server is None when a test already closed the connection.
        if cls.server is None:
            return
        cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000304
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    # Re-runs the networked tests through the NNTP_SSL class, if nntplib
    # provides one.

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."

    NNTP_HOST = 'nntp.aioe.org'
    GROUP_NAME = 'comp.lang.python'
    GROUP_PAT = 'comp.lang.*'

    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Disabled as it produces too much data
    test_list = None

    # Disabled as the connection will already be encrypted.
    test_starttls = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000323
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000324
#
# Non-networked tests using a local server (or something mocking it).
#
328
329class _NNTPServerIO(io.RawIOBase):
330 """A raw IO object allowing NNTP commands to be received and processed
331 by a handler. The handler can push responses which can then be read
332 from the IO object."""
333
334 def __init__(self, handler):
335 io.RawIOBase.__init__(self)
336 # The channel from the client
337 self.c2s = io.BytesIO()
338 # The channel to the client
339 self.s2c = io.BytesIO()
340 self.handler = handler
341 self.handler.start(self.c2s.readline, self.push_data)
342
343 def readable(self):
344 return True
345
346 def writable(self):
347 return True
348
349 def push_data(self, data):
350 """Push (buffer) some data to send to the client."""
351 pos = self.s2c.tell()
352 self.s2c.seek(0, 2)
353 self.s2c.write(data)
354 self.s2c.seek(pos)
355
356 def write(self, b):
357 """The client sends us some data"""
358 pos = self.c2s.tell()
359 self.c2s.write(b)
360 self.c2s.seek(pos)
361 self.handler.process_pending()
362 return len(b)
363
364 def readinto(self, buf):
365 """The client wants to read a response"""
366 self.handler.process_pending()
367 b = self.s2c.read(len(buf))
368 n = len(b)
369 buf[:n] = b
370 return n
371
372
def make_mock_file(handler):
    """Return ``(sio, file)`` for *handler*.

    *sio* is the raw _NNTPServerIO driving the handler and *file* is a
    buffered read/write wrapper around it.
    """
    raw = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    return raw, io.BufferedRWPair(raw, raw)
379
380
class MockedNNTPTestsMixin:
    """Set up an nntplib client wired to an in-process mock handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create a handler and a client talking to it.

        Extra arguments are forwarded to nntplib._NNTPBase.  Sets
        self.handler, self.sio and self.server, and returns the server.
        """
        handler = self.handler_class()
        self.handler = handler
        self.sio, mock_file = make_mock_file(handler)
        server = nntplib._NNTPBase(mock_file, 'test.server', *args, **kwargs)
        self.server = server
        return server
398
399
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Variant whose server is created with ``readermode=True``."""

    def setUp(self):
        # The parent setUp() has already built a default server; replace it
        # with one created in reader mode.
        super().setUp()
        self.make_server(readermode=True)
404
405
class NNTPv1Handler:
    """A handler for RFC 977

    The handler reads CRLF-terminated command lines through the
    ``readline`` callable given to start() and answers by calling the
    ``push_data`` callable with bytes.  A command ``FOO`` is dispatched to
    the method ``handle_FOO``; its whitespace-separated arguments are
    passed positionally.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the I/O callables, reset the state and send the welcome."""
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Only meaningful while in_body is true (see process_pending()).
        self.body = None
        self.body_callback = None
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Process every complete line currently available from the client.

        Returns when no more input is available.  Raises ValueError if a
        line is not CRLF-terminated or if a command handler fails.
        """
        while True:
            if self.in_body:
                # A previous command asked for a body: the following lines
                # belong to it, even if they arrived in the same buffer as
                # the command itself.
                if not self._collect_body():
                    return
                continue
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            words = line.split()
            meth = None
            if words:
                meth = getattr(self, "handle_" + words[0].upper(), None)
            if meth is None:
                # Unknown command, or a blank line (no command word at all).
                self.handle_unknown()
                continue
            tokens = words[1:]
            try:
                meth(*tokens)
            except Exception as e:
                raise ValueError("command failed: {!r}".format(line)) from e
            if self.in_body:
                # The handler called expect_body(): remember whom to call
                # back once the whole body has been received.
                self.body_callback = meth, tokens
                self.body = []

    def _collect_body(self):
        """Read pending body lines; return True once the body is complete.

        Lines are accumulated in self.body up to and including the
        terminating ".\\r\\n" line.  The command handler recorded in
        self.body_callback is then called with ``body=`` and the body state
        is reset.  Returns False if input ran out before the terminator.
        """
        while True:
            line = self._readline()
            if not line:
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal

        The text is dedented, its line endings are converted to CRLF (one
        is added at the end) and the result is sent encoded as UTF-8.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # NOTE(review): the three command lines are indented relative to
        # the rest of the reply; the exact indent width could not be
        # recovered from the extracted source -- confirm it against the
        # expectations in test_help.
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # A deliberately oversized response line.
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per article: first without a body (the command line),
        # then again with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST().
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_part(self, code, message_spec, text):
        """Answer ARTICLE/HEAD/BODY: status line, *text*, then the "." line.

        *code* is the three-digit success code of the command.  An
        unrecognised *message_spec* gets a 430 reply and nothing else.
        """
        if message_spec is None:
            status = "{} 3000237 <45223423@example.com>"
        elif message_spec == "<45223423@example.com>":
            status = "{} 0 <45223423@example.com>"
        elif message_spec == "3000234":
            status = "{} 3000234 <45223423@example.com>"
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(status.format(code))
        self.push_lit(text)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_article_part("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_article_part("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_article_part("222", message_spec, self.sample_body)

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            # Any password is accepted.
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
729
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000730
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # The reply is assembled line by line so that the optional
        # "AUTHINFO USER" entry does not have to replicate the indentation
        # of a surrounding multi-line literal for push_lit()'s dedent to
        # work.
        lines = [
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
        ]
        if not self._logged_in:
            # AUTHINFO is only advertised until the client has logged in.
            lines.append("AUTHINFO USER")
        lines += [
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        ]
        self.push_lit("\n".join(lines))

    def handle_MODE(self, _):
        # READER is always advertised by handle_CAPABILITIES() above, so
        # receiving any MODE command is treated as a client error.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER shares its canned answers with XOVER.
        return self.handle_XOVER(message_spec)
756
757
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if self._logged_in:
            super().handle_CAPABILITIES()
            return
        # Not authenticated yet: refuse instead of listing capabilities.
        self.push_lit('480 You must log in.')
766
767
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes true once the client has sent MODE READER.
        self._switched = False

    def handle_CAPABILITIES(self):
        # "MODE-READER" is advertised before the switch, plain "READER"
        # afterwards.
        prefix = '' if self._switched else 'MODE-'
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        self.push_lit(template.format(prefix))

    def handle_MODE(self, what):
        # Only a single switch, to reader mode, is expected.
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
794
795
class NNTPv1v2TestsMixin:
    """Tests run against both the mocked NNTP v1 and v2 servers.

    The concrete test class supplies ``self.server`` (the client under
    test), ``self.handler`` (the mock server handler), ``self.sio`` and a
    ``nntp_version`` class attribute; none of them is defined here.
    """

    # Expected first and last bytes of the canned test article when it is
    # written to a ``file=`` argument.
    _HEAD_PREFIX = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    _BODY_SUFFIX = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def setUp(self):
        # Cooperative hook: defer to the next setUp() in the MRO.
        super().setUp()

    def test_welcome(self):
        self.assertEqual(self.server.welcome, self.handler.welcome)

    def test_authinfo(self):
        if self.nntp_version == 2:
            self.assertIn('AUTHINFO', self.server._caps)
        self.server.login('testuser', 'testpw')
        # if AUTHINFO is gone from _caps we also know that getcapabilities()
        # has been called after login as it should
        self.assertNotIn('AUTHINFO', self.server._caps)

    def test_date(self):
        resp, date = self.server.date()
        self.assertEqual(resp, "111 20100914001155")
        self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))

    def test_quit(self):
        self.assertFalse(self.sio.closed)
        resp = self.server.quit()
        self.assertEqual(resp, "205 Bye!")
        self.assertTrue(self.sio.closed)

    def test_help(self):
        resp, help = self.server.help()
        self.assertEqual(resp, "100 Legal commands")
        # NOTE(review): this listing was recovered from a copy in which runs
        # of whitespace may have been collapsed; confirm the leading spaces
        # against the handler's HELP reply.
        self.assertEqual(help, [
            ' authinfo user Name|pass Password|generic <prog> <args>',
            ' date',
            ' help',
            'Report problems to <root@example.org>',
        ])

    def test_list(self):
        resp, groups = self.server.list()
        self.assertEqual(len(groups), 6)
        g = groups[1]
        self.assertEqual(g,
            GroupInfo("comp.lang.python.announce", "0000001153",
                      "0000000993", "m"))
        resp, groups = self.server.list("*distutils*")
        self.assertEqual(len(groups), 2)
        g = groups[0]
        self.assertEqual(g,
            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
                      "0000000001", "m"))

    def test_stat(self):
        resp, art_num, message_id = self.server.stat(3000234)
        self.assertEqual(resp, "223 3000234 <45223423@example.com>")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        resp, art_num, message_id = self.server.stat("<45223423@example.com>")
        self.assertEqual(resp, "223 0 <45223423@example.com>")
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat("<non.existent.id>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat()
        self.assertEqual(cm.exception.response, "412 No newsgroup selected")

    def test_next(self):
        resp, art_num, message_id = self.server.next()
        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<668929@example.org>")

    def test_last(self):
        resp, art_num, message_id = self.server.last()
        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")

    def test_description(self):
        desc = self.server.description("comp.lang.python")
        self.assertEqual(desc, "The Python computer language.")
        desc = self.server.description("comp.lang.pythonx")
        self.assertEqual(desc, "")

    def test_descriptions(self):
        resp, groups = self.server.descriptions("comp.lang.python")
        self.assertEqual(resp, '215 Descriptions in form "group description".')
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
        })
        resp, groups = self.server.descriptions("comp.lang.python*")
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
        })
        resp, groups = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(groups, {})

    def test_group(self):
        resp, count, first, last, group = self.server.group("fr.comp.lang.python")
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, "fr.comp.lang.python")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.group("comp.lang.python.devel")
        exc = cm.exception
        self.assertTrue(exc.response.startswith("411 No such group"),
                        exc.response)

    def test_newnews(self):
        # NEWNEWS comp.lang.python [20]100913 082004
        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("comp.lang.python", dt)
        expected = (
            "230 list of newsarticles (NNTP v{0}) "
            "created after Mon Sep 13 08:20:04 2010 follows"
        ).format(self.nntp_version)
        self.assertEqual(resp, expected)
        self.assertEqual(ids, [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
        ])
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("fr.comp.lang.python", dt)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        """Check the four body lines of the canned test article."""
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
        self.assertEqual(lines[-2], b"")
        self.assertEqual(lines[-3], b".Here is a dot-starting line.")
        self.assertEqual(lines[-4], b"This is just a test article.")

    def _check_article_head(self, lines):
        """Check the four header lines of the canned test article."""
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        """Check a full article: headers, one blank separator line, body."""
        self.assertEqual(len(lines), 9)
        self._check_article_head(lines[:4])
        self._check_article_body(lines[-4:])
        self.assertEqual(lines[4], b"")

    def _check_fetch(self, fetch, code, check_lines):
        """Exercise an article-retrieval method in every addressing mode.

        *fetch* is the bound client method (article, head or body), *code*
        the numeric status expected in the response and *check_lines* the
        helper validating the returned lines.
        """
        cases = [
            ((), 3000237),                          # current article
            ((3000234,), 3000234),                  # by article number
            (("<45223423@example.com>",), 0),       # by message id
        ]
        for args, expected_num in cases:
            resp, info = fetch(*args)
            self.assertEqual(
                resp, "{} {} <45223423@example.com>".format(code, expected_num))
            art_num, message_id, lines = info
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, "<45223423@example.com>")
            check_lines(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            fetch("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def _check_fetch_to_file(self, fetch, code, *, has_head, has_body):
        """Exercise an article-retrieval method with a ``file`` argument.

        *has_head* / *has_body* say whether the data written to the file is
        expected to start with the headers / end with the body.
        """
        f = io.BytesIO()
        resp, info = fetch(file=f)
        self.assertEqual(resp, "{} 3000237 <45223423@example.com>".format(code))
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        # The lines went to the file, so none are returned.
        self.assertEqual(lines, [])
        data = f.getvalue()
        self.assertEqual(data.startswith(self._HEAD_PREFIX), has_head,
                         ascii(data))
        self.assertEqual(data.endswith(self._BODY_SUFFIX), has_body,
                         ascii(data))

    def test_article(self):
        self._check_fetch(self.server.article, 220, self._check_article_data)

    def test_article_file(self):
        self._check_fetch_to_file(self.server.article, 220,
                                  has_head=True, has_body=True)

    def test_head(self):
        self._check_fetch(self.server.head, 221, self._check_article_head)

    def test_head_file(self):
        self._check_fetch_to_file(self.server.head, 221,
                                  has_head=True, has_body=False)

    def test_body(self):
        self._check_fetch(self.server.body, 222, self._check_article_body)

    def test_body_file(self):
        self._check_fetch_to_file(self.server.body, 222,
                                  has_head=False, has_body=True)

    def check_over_xover_resp(self, resp, overviews):
        """Validate the response shared by the OVER and XOVER tests."""
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        art_num, over = overviews[0]
        self.assertEqual(art_num, 57)
        self.assertEqual(over, {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.org gmane.comp.python.authors:57",
        })
        art_num, over = overviews[1]
        self.assertIsNone(over["xref"])
        art_num, over = overviews[2]
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        resp, overviews = self.server.xover(57, 59)
        self.check_over_xover_resp(resp, overviews)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        resp, overviews = self.server.over((57, 59))
        self.check_over_xover_resp(resp, overviews)

    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        """Check the raw body as received by the server."""
        lines = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(lines), 10)
        self.assertEqual(lines[-1], b'.\r\n')
        self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(lines[-3], b'\r\n')
        # The leading dot must have been doubled on the wire.
        self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        """Post the sample article twice through *func* and check the result.

        The article is sent first with CRLF endings, then with "normal"
        line endings, which NNTP.post and NNTP.ihave should convert.
        *file_factory* wraps the raw bytes into the object handed to *func*.
        Returns the response of the last call.
        """
        variants = (
            self.sample_post,
            self.sample_post.replace(b"\r\n", b"\n"),
        )
        for post in variants:
            self.handler.posted_body = None
            resp = func(*args, file_factory(post))
            self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        """Run *func* with every supported kind of article container."""
        def terminated_lines(b):
            return iter(b.splitlines(keepends=True))

        def bare_lines(b):
            return iter(b.splitlines(keepends=False))

        factories = (
            bytes,              # a bytes object
            bytearray,          # a bytearray object
            io.BytesIO,         # a file object
            terminated_lines,   # an iterable of terminated lines
            bare_lines,         # an iterable of non-terminated lines
        )
        for file_factory in factories:
            resp = self._check_post_ihave_sub(func, *args,
                                              file_factory=file_factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response,
                         "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response,
                         "435 Article not wanted")

    def test_too_long_lines(self):
        dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
        self.assertRaises(nntplib.NNTPDataError,
                          self.server.newnews, "comp.lang.python", dt)


class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # A v1 server advertises nothing, so the client reports an empty
        # capability map, version 1 and no implementation string.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        self.assertIsNone(self.server.nntp_implementation)


class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        })
        # The server lists versions 2 and 3; the client is expected to
        # report 3.
        self.assertEqual(self.server.nntp_version, 3)
        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')


class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        # No capabilities are known before authenticating ...
        self.assertEqual(self.server._caps, {})
        self.server.login('testuser', 'testpw')
        # ... and they must have been fetched once logged in.
        self.assertIn('VERSION', self.server._caps)


class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        self.assertIn('READER', self.server._caps)


class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helpers; no server is involved."""

    def test_decode_header(self):
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        # NOTE(review): the whitespace inside this literal may have been
        # collapsed in the copy this was recovered from; input and expected
        # value are identical either way.
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref"])

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode; it is the default, so the explicit
        # ``False`` must give the same result as omitting the argument.
        def check(dt, date_str, time_str):
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))

        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            check(datetime.datetime(y, M, d, h, m, s), date_str, time_str)
        gives_datetime(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "20100605", "010203")

        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            check(datetime.date(y, M, d), date_str, time_str)
        gives_date(1999, 6, 23, "19990623", "000000")
        gives_date(2000, 6, 23, "20000623", "000000")
        gives_date(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        def check(dt, date_str, time_str):
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))

        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            check(datetime.datetime(y, M, d, h, m, s), date_str, time_str)
        gives_datetime(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "100605", "010203")

        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            check(datetime.date(y, M, d), date_str, time_str)
        gives_date(1999, 6, 23, "990623", "000000")
        gives_date(2000, 6, 23, "000623", "000000")
        gives_date(2010, 6, 5, "100605", "000000")

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))


class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
                      'NNTPTemporaryError', 'NNTPPermanentError',
                      'NNTPProtocolError', 'NNTPDataError', 'decode_header']
        # NNTP_SSL is only expected when the ssl module could be imported.
        if ssl is not None:
            target_api.append('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), set(target_api))

class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        """Check that a failing constructor raises and cleans up after itself.

        The client class is instantiated against a fake ``socket`` module
        whose connection is served by a fresh *handler_class* instance.
        The constructor must raise *expected_error_type* matching
        *expected_error_msg* (used as a regular expression), and the socket
        and every file created from it must have been closed.
        """
        # State shared with the fakes below.
        socket_closed = False
        files = []

        class mock_socket_module:
            # Stands in for the ``socket`` module, so this is looked up on
            # the class itself and takes no ``self``.
            def create_connection(address, timeout):
                return MockSocket()

        class MockSocket:
            def close(self):
                nonlocal socket_closed
                socket_closed = True

            def makefile(self, mode):
                handler = handler_class()
                _, file = make_mock_file(handler)
                files.append(file)
                return file

        with patch('nntplib.socket', mock_socket_module), \
             self.assertRaisesRegex(expected_error_type, expected_error_msg):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for f in files:
            self.assertTrue(f.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        login = 't@e.com'
        password = 'python'
        authinfo_response = '503 Mechanism not recognized'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            login, password)

class bypass_context:
    """Bypass encryption and actual SSL module"""

    # A static method so the class itself (or an instance) can be passed
    # wherever an object with a ``wrap_socket`` method is expected.
    @staticmethod
    def wrap_socket(sock, **args):
        # Hand the socket back untouched; the keyword arguments are ignored.
        return sock

@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Run the mock-socket tests through NNTP_SSL with TLS bypassed."""

    @staticmethod
    def nntp_class(*pos, **kw):
        # Build the SSL client, with bypass_context passed as its
        # ssl_context.
        return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)


class LocalServerTests(unittest.TestCase):
    """Tests run against a minimal NNTP server on a local socket."""

    def setUp(self):
        sock = socket.socket()
        port = support.bind_port(sock)
        sock.listen()
        self.background = threading.Thread(
            target=self.run_server, args=(sock,))
        self.background.start()
        self.addCleanup(self.background.join)

        # Enter the client's context by hand and register the matching
        # exit as a cleanup, so it stays open for the test method.
        self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        # Could be generalized to handle more commands in separate methods
        with sock:
            # Only one client is served; the listening socket is closed as
            # soon as it has connected.
            [client, _] = sock.accept()
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(client)
            reader = cleanup.enter_context(client.makefile('rb'))
            client.sendall(b'200 Server ready\r\n')
            while True:
                cmd = reader.readline()
                if cmd == b'CAPABILITIES\r\n':
                    client.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                elif cmd == b'STARTTLS\r\n':
                    # Drop the plain-text reader before switching the
                    # connection over to TLS.
                    reader.close()
                    client.sendall(b'382 Begin TLS negotiation now\r\n')
                    # Explicit server-side protocol: calling SSLContext()
                    # without an argument is deprecated.
                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                    context.load_cert_chain(certfile)
                    client = context.wrap_socket(
                        client, server_side=True)
                    cleanup.enter_context(client)
                    reader = cleanup.enter_context(client.makefile('rb'))
                elif cmd == b'QUIT\r\n':
                    client.sendall(b'205 Bye!\r\n')
                    break
                else:
                    raise ValueError('Unexpected command {!r}'.format(cmd))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        file = self.nntp.file
        sock = self.nntp.sock
        self.nntp.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(file, self.nntp.file)
        self.assertNotEqual(sock, self.nntp.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, self.nntp.starttls)


# Run the whole module's tests when executed as a script.
if __name__ == "__main__":
    unittest.main()