blob: 71a4ec022b149e2a863b0925087309bde7863042 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
8from test import support
Serhiy Storchaka43767632013-11-03 21:31:38 +02009from nntplib import NNTP, GroupInfo
Antoine Pitrou69ab9512010-09-29 15:03:40 +000010import nntplib
Serhiy Storchaka43767632013-11-03 21:31:38 +020011try:
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000012 import ssl
Serhiy Storchaka43767632013-11-03 21:31:38 +020013except ImportError:
14 ssl = None
Antoine Pitrou69ab9512010-09-29 15:03:40 +000015
16TIMEOUT = 30
17
18# TODO:
19# - test the `file` arg to more commands
20# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000021# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000022
23
24class NetworkedNNTPTestsMixin:
25
26 def test_welcome(self):
27 welcome = self.server.getwelcome()
28 self.assertEqual(str, type(welcome))
29
30 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000031 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000032 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000033 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000034 self.assertEqual(str, type(line))
35
36 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000037 resp, groups = self.server.list()
38 if len(groups) > 0:
39 self.assertEqual(GroupInfo, type(groups[0]))
40 self.assertEqual(str, type(groups[0].group))
41
42 def test_list_active(self):
43 resp, groups = self.server.list(self.GROUP_PAT)
44 if len(groups) > 0:
45 self.assertEqual(GroupInfo, type(groups[0]))
46 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000047
48 def test_unknown_command(self):
49 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
50 self.server._shortcmd("XYZZY")
51 resp = cm.exception.response
52 self.assertTrue(resp.startswith("500 "), resp)
53
54 def test_newgroups(self):
55 # gmane gets a constant influx of new groups. In order not to stress
56 # the server too much, we choose a recent date in the past.
57 dt = datetime.date.today() - datetime.timedelta(days=7)
58 resp, groups = self.server.newgroups(dt)
59 if len(groups) > 0:
60 self.assertIsInstance(groups[0], GroupInfo)
61 self.assertIsInstance(groups[0].group, str)
62
63 def test_description(self):
64 def _check_desc(desc):
65 # Sanity checks
66 self.assertIsInstance(desc, str)
67 self.assertNotIn(self.GROUP_NAME, desc)
68 desc = self.server.description(self.GROUP_NAME)
69 _check_desc(desc)
70 # Another sanity check
71 self.assertIn("Python", desc)
72 # With a pattern
73 desc = self.server.description(self.GROUP_PAT)
74 _check_desc(desc)
75 # Shouldn't exist
76 desc = self.server.description("zk.brrtt.baz")
77 self.assertEqual(desc, '')
78
79 def test_descriptions(self):
80 resp, descs = self.server.descriptions(self.GROUP_PAT)
81 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
82 self.assertTrue(
83 resp.startswith("215 ") or resp.startswith("282 "), resp)
84 self.assertIsInstance(descs, dict)
85 desc = descs[self.GROUP_NAME]
86 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
87
88 def test_group(self):
89 result = self.server.group(self.GROUP_NAME)
90 self.assertEqual(5, len(result))
91 resp, count, first, last, group = result
92 self.assertEqual(group, self.GROUP_NAME)
93 self.assertIsInstance(count, int)
94 self.assertIsInstance(first, int)
95 self.assertIsInstance(last, int)
96 self.assertLessEqual(first, last)
97 self.assertTrue(resp.startswith("211 "), resp)
98
99 def test_date(self):
100 resp, date = self.server.date()
101 self.assertIsInstance(date, datetime.datetime)
102 # Sanity check
103 self.assertGreaterEqual(date.year, 1995)
104 self.assertLessEqual(date.year, 2030)
105
106 def _check_art_dict(self, art_dict):
107 # Some sanity checks for a field dictionary returned by OVER / XOVER
108 self.assertIsInstance(art_dict, dict)
109 # NNTP has 7 mandatory fields
110 self.assertGreaterEqual(art_dict.keys(),
111 {"subject", "from", "date", "message-id",
112 "references", ":bytes", ":lines"}
113 )
114 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000115 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000116
117 def test_xover(self):
118 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000119 resp, lines = self.server.xover(last - 5, last)
120 if len(lines) == 0:
121 self.skipTest("no articles retrieved")
122 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000123 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000124 self.assertGreaterEqual(art_num, last - 5)
125 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000126 self._check_art_dict(art_dict)
127
128 def test_over(self):
129 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
130 start = last - 10
131 # The "start-" article range form
132 resp, lines = self.server.over((start, None))
133 art_num, art_dict = lines[0]
134 self._check_art_dict(art_dict)
135 # The "start-end" article range form
136 resp, lines = self.server.over((start, last))
137 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000138 # The 'last' article is not necessarily part of the output (cancelled?)
139 self.assertGreaterEqual(art_num, start)
140 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000141 self._check_art_dict(art_dict)
142 # XXX The "message_id" form is unsupported by gmane
143 # 503 Overview by message-ID unsupported
144
145 def test_xhdr(self):
146 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
147 resp, lines = self.server.xhdr('subject', last)
148 for line in lines:
149 self.assertEqual(str, type(line[1]))
150
151 def check_article_resp(self, resp, article, art_num=None):
152 self.assertIsInstance(article, nntplib.ArticleInfo)
153 if art_num is not None:
154 self.assertEqual(article.number, art_num)
155 for line in article.lines:
156 self.assertIsInstance(line, bytes)
157 # XXX this could exceptionally happen...
158 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
159
160 def test_article_head_body(self):
161 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000162 # Try to find an available article
163 for art_num in (last, first, last - 1):
164 try:
165 resp, head = self.server.head(art_num)
166 except nntplib.NNTPTemporaryError as e:
167 if not e.response.startswith("423 "):
168 raise
169 # "423 No such article" => choose another one
170 continue
171 break
172 else:
173 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000174 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000175 self.check_article_resp(resp, head, art_num)
176 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000177 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000178 self.check_article_resp(resp, body, art_num)
179 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000180 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000181 self.check_article_resp(resp, article, art_num)
Nick Coghlan14d99a12012-06-17 21:27:18 +1000182 # Tolerate running the tests from behind a NNTP virus checker
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200183 blacklist = lambda line: line.startswith(b'X-Antivirus')
184 filtered_head_lines = [line for line in head.lines
185 if not blacklist(line)]
Nick Coghlan14d99a12012-06-17 21:27:18 +1000186 filtered_lines = [line for line in article.lines
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200187 if not blacklist(line)]
188 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000189
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000190 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000191 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000192 # couple of well-known capabilities. Just sanity check that we
193 # got them.
194 def _check_caps(caps):
195 caps_list = caps['LIST']
196 self.assertIsInstance(caps_list, (list, tuple))
197 self.assertIn('OVERVIEW.FMT', caps_list)
198 self.assertGreaterEqual(self.server.nntp_version, 2)
199 _check_caps(self.server.getcapabilities())
200 # This re-emits the command
201 resp, caps = self.server.capabilities()
202 _check_caps(caps)
203
Serhiy Storchaka43767632013-11-03 21:31:38 +0200204 @unittest.skipUnless(ssl, 'requires SSL support')
205 def test_starttls(self):
206 file = self.server.file
207 sock = self.server.sock
208 try:
209 self.server.starttls()
210 except nntplib.NNTPPermanentError:
211 self.skipTest("STARTTLS not supported by server.")
212 else:
213 # Check that the socket and internal pseudo-file really were
214 # changed.
215 self.assertNotEqual(file, self.server.file)
216 self.assertNotEqual(sock, self.server.sock)
217 # Check that the new socket really is an SSL one
218 self.assertIsInstance(self.server.sock, ssl.SSLSocket)
219 # Check that trying starttls when it's already active fails.
220 self.assertRaises(ValueError, self.server.starttls)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000221
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000222 def test_zlogin(self):
223 # This test must be the penultimate because further commands will be
224 # refused.
225 baduser = "notarealuser"
226 badpw = "notarealpassword"
227 # Check that bogus credentials cause failure
228 self.assertRaises(nntplib.NNTPError, self.server.login,
229 user=baduser, password=badpw, usenetrc=False)
230 # FIXME: We should check that correct credentials succeed, but that
231 # would require valid details for some server somewhere to be in the
232 # test suite, I think. Gmane is anonymous, at least as used for the
233 # other tests.
234
235 def test_zzquit(self):
236 # This test must be called last, hence the name
237 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000238 try:
239 self.server.quit()
240 finally:
241 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000242
Antoine Pitroude609182010-11-18 17:29:23 +0000243 @classmethod
244 def wrap_methods(cls):
245 # Wrap all methods in a transient_internet() exception catcher
246 # XXX put a generic version in test.support?
247 def wrap_meth(meth):
248 @functools.wraps(meth)
249 def wrapped(self):
250 with support.transient_internet(self.NNTP_HOST):
251 meth(self)
252 return wrapped
253 for name in dir(cls):
254 if not name.startswith('test_'):
255 continue
256 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200257 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000258 continue
259 # Need to use a closure so that meth remains bound to its current
260 # value
261 setattr(cls, name, wrap_meth(meth))
262
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000263 def test_with_statement(self):
264 def is_connected():
265 if not hasattr(server, 'file'):
266 return False
267 try:
268 server.help()
Andrew Svetlov0832af62012-12-18 23:10:48 +0200269 except (OSError, EOFError):
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000270 return False
271 return True
272
273 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
274 self.assertTrue(is_connected())
275 self.assertTrue(server.help())
276 self.assertFalse(is_connected())
277
278 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
279 server.quit()
280 self.assertFalse(is_connected())
281
282
Antoine Pitroude609182010-11-18 17:29:23 +0000283NetworkedNNTPTestsMixin.wrap_methods()
284
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000285
286class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
287 # This server supports STARTTLS (gmane doesn't)
288 NNTP_HOST = 'news.trigofacile.com'
289 GROUP_NAME = 'fr.comp.lang.python'
290 GROUP_PAT = 'fr.comp.lang.*'
291
Antoine Pitroude609182010-11-18 17:29:23 +0000292 NNTP_CLASS = NNTP
293
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000294 @classmethod
295 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000296 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000297 with support.transient_internet(cls.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000298 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000299
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000300 @classmethod
301 def tearDownClass(cls):
302 if cls.server is not None:
303 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000304
Serhiy Storchaka43767632013-11-03 21:31:38 +0200305@unittest.skipUnless(ssl, 'requires SSL support')
306class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000307
Serhiy Storchaka43767632013-11-03 21:31:38 +0200308 # Technical limits for this public NNTP server (see http://www.aioe.org):
309 # "Only two concurrent connections per IP address are allowed and
310 # 400 connections per day are accepted from each IP address."
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000311
Serhiy Storchaka43767632013-11-03 21:31:38 +0200312 NNTP_HOST = 'nntp.aioe.org'
313 GROUP_NAME = 'comp.lang.python'
314 GROUP_PAT = 'comp.lang.*'
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000315
Serhiy Storchaka43767632013-11-03 21:31:38 +0200316 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000317
Serhiy Storchaka43767632013-11-03 21:31:38 +0200318 # Disabled as it produces too much data
319 test_list = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000320
Serhiy Storchaka43767632013-11-03 21:31:38 +0200321 # Disabled as the connection will already be encrypted.
322 test_starttls = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000323
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000324
325#
326# Non-networked tests using a local server (or something mocking it).
327#
328
329class _NNTPServerIO(io.RawIOBase):
330 """A raw IO object allowing NNTP commands to be received and processed
331 by a handler. The handler can push responses which can then be read
332 from the IO object."""
333
334 def __init__(self, handler):
335 io.RawIOBase.__init__(self)
336 # The channel from the client
337 self.c2s = io.BytesIO()
338 # The channel to the client
339 self.s2c = io.BytesIO()
340 self.handler = handler
341 self.handler.start(self.c2s.readline, self.push_data)
342
343 def readable(self):
344 return True
345
346 def writable(self):
347 return True
348
349 def push_data(self, data):
350 """Push (buffer) some data to send to the client."""
351 pos = self.s2c.tell()
352 self.s2c.seek(0, 2)
353 self.s2c.write(data)
354 self.s2c.seek(pos)
355
356 def write(self, b):
357 """The client sends us some data"""
358 pos = self.c2s.tell()
359 self.c2s.write(b)
360 self.c2s.seek(pos)
361 self.handler.process_pending()
362 return len(b)
363
364 def readinto(self, buf):
365 """The client wants to read a response"""
366 self.handler.process_pending()
367 b = self.s2c.read(len(buf))
368 n = len(b)
369 buf[:n] = b
370 return n
371
372
373class MockedNNTPTestsMixin:
374 # Override in derived classes
375 handler_class = None
376
377 def setUp(self):
378 super().setUp()
379 self.make_server()
380
381 def tearDown(self):
382 super().tearDown()
383 del self.server
384
385 def make_server(self, *args, **kwargs):
386 self.handler = self.handler_class()
387 self.sio = _NNTPServerIO(self.handler)
388 # Using BufferedRWPair instead of BufferedRandom ensures the file
389 # isn't seekable.
390 file = io.BufferedRWPair(self.sio, self.sio)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000391 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000392 return self.server
393
394
Antoine Pitrou71135622012-02-14 23:29:34 +0100395class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
396 def setUp(self):
397 super().setUp()
398 self.make_server(readermode=True)
399
400
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000401class NNTPv1Handler:
402 """A handler for RFC 977"""
403
404 welcome = "200 NNTP mock server"
405
406 def start(self, readline, push_data):
407 self.in_body = False
408 self.allow_posting = True
409 self._readline = readline
410 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100411 self._logged_in = False
412 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000413 # Our welcome
414 self.handle_welcome()
415
416 def _decode(self, data):
417 return str(data, "utf-8", "surrogateescape")
418
419 def process_pending(self):
420 if self.in_body:
421 while True:
422 line = self._readline()
423 if not line:
424 return
425 self.body.append(line)
426 if line == b".\r\n":
427 break
428 try:
429 meth, tokens = self.body_callback
430 meth(*tokens, body=self.body)
431 finally:
432 self.body_callback = None
433 self.body = None
434 self.in_body = False
435 while True:
436 line = self._decode(self._readline())
437 if not line:
438 return
439 if not line.endswith("\r\n"):
440 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
441 line = line[:-2]
442 cmd, *tokens = line.split()
443 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
444 meth = getattr(self, "handle_" + cmd.upper(), None)
445 if meth is None:
446 self.handle_unknown()
447 else:
448 try:
449 meth(*tokens)
450 except Exception as e:
451 raise ValueError("command failed: {!r}".format(line)) from e
452 else:
453 if self.in_body:
454 self.body_callback = meth, tokens
455 self.body = []
456
457 def expect_body(self):
458 """Flag that the client is expected to post a request body"""
459 self.in_body = True
460
461 def push_data(self, data):
462 """Push some binary data"""
463 self._push_data(data)
464
465 def push_lit(self, lit):
466 """Push a string literal"""
467 lit = textwrap.dedent(lit)
468 lit = "\r\n".join(lit.splitlines()) + "\r\n"
469 lit = lit.encode('utf-8')
470 self.push_data(lit)
471
472 def handle_unknown(self):
473 self.push_lit("500 What?")
474
475 def handle_welcome(self):
476 self.push_lit(self.welcome)
477
478 def handle_QUIT(self):
479 self.push_lit("205 Bye!")
480
481 def handle_DATE(self):
482 self.push_lit("111 20100914001155")
483
484 def handle_GROUP(self, group):
485 if group == "fr.comp.lang.python":
486 self.push_lit("211 486 761 1265 fr.comp.lang.python")
487 else:
488 self.push_lit("411 No such group {}".format(group))
489
490 def handle_HELP(self):
491 self.push_lit("""\
492 100 Legal commands
493 authinfo user Name|pass Password|generic <prog> <args>
494 date
495 help
496 Report problems to <root@example.org>
497 .""")
498
499 def handle_STAT(self, message_spec=None):
500 if message_spec is None:
501 self.push_lit("412 No newsgroup selected")
502 elif message_spec == "3000234":
503 self.push_lit("223 3000234 <45223423@example.com>")
504 elif message_spec == "<45223423@example.com>":
505 self.push_lit("223 0 <45223423@example.com>")
506 else:
507 self.push_lit("430 No Such Article Found")
508
509 def handle_NEXT(self):
510 self.push_lit("223 3000237 <668929@example.org> retrieved")
511
512 def handle_LAST(self):
513 self.push_lit("223 3000234 <45223423@example.com> retrieved")
514
515 def handle_LIST(self, action=None, param=None):
516 if action is None:
517 self.push_lit("""\
518 215 Newsgroups in form "group high low flags".
519 comp.lang.python 0000052340 0000002828 y
520 comp.lang.python.announce 0000001153 0000000993 m
521 free.it.comp.lang.python 0000000002 0000000002 y
522 fr.comp.lang.python 0000001254 0000000760 y
523 free.it.comp.lang.python.learner 0000000000 0000000001 y
524 tw.bbs.comp.lang.python 0000000304 0000000304 y
525 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000526 elif action == "ACTIVE":
527 if param == "*distutils*":
528 self.push_lit("""\
529 215 Newsgroups in form "group high low flags"
530 gmane.comp.python.distutils.devel 0000014104 0000000001 m
531 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
532 .""")
533 else:
534 self.push_lit("""\
535 215 Newsgroups in form "group high low flags"
536 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000537 elif action == "OVERVIEW.FMT":
538 self.push_lit("""\
539 215 Order of fields in overview database.
540 Subject:
541 From:
542 Date:
543 Message-ID:
544 References:
545 Bytes:
546 Lines:
547 Xref:full
548 .""")
549 elif action == "NEWSGROUPS":
550 assert param is not None
551 if param == "comp.lang.python":
552 self.push_lit("""\
553 215 Descriptions in form "group description".
554 comp.lang.python\tThe Python computer language.
555 .""")
556 elif param == "comp.lang.python*":
557 self.push_lit("""\
558 215 Descriptions in form "group description".
559 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
560 comp.lang.python\tThe Python computer language.
561 .""")
562 else:
563 self.push_lit("""\
564 215 Descriptions in form "group description".
565 .""")
566 else:
567 self.push_lit('501 Unknown LIST keyword')
568
569 def handle_NEWNEWS(self, group, date_str, time_str):
570 # We hard code different return messages depending on passed
571 # argument and date syntax.
572 if (group == "comp.lang.python" and date_str == "20100913"
573 and time_str == "082004"):
574 # Date was passed in RFC 3977 format (NNTP "v2")
575 self.push_lit("""\
576 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
577 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
578 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
579 .""")
580 elif (group == "comp.lang.python" and date_str == "100913"
581 and time_str == "082004"):
582 # Date was passed in RFC 977 format (NNTP "v1")
583 self.push_lit("""\
584 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
585 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
586 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
587 .""")
Georg Brandl28e78412013-10-27 07:29:47 +0100588 elif (group == 'comp.lang.python' and
589 date_str in ('20100101', '100101') and
590 time_str == '090000'):
591 self.push_lit('too long line' * 3000 +
592 '\n.')
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000593 else:
594 self.push_lit("""\
595 230 An empty list of newsarticles follows
596 .""")
597 # (Note for experiments: many servers disable NEWNEWS.
598 # As of this writing, sicinfo3.epfl.ch doesn't.)
599
600 def handle_XOVER(self, message_spec):
601 if message_spec == "57-59":
602 self.push_lit(
603 "224 Overview information for 57-58 follows\n"
604 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
605 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
606 "\tSat, 19 Jun 2010 18:04:08 -0400"
607 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
608 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
609 "\tXref: news.gmane.org gmane.comp.python.authors:57"
610 "\n"
611 "58\tLooking for a few good bloggers"
612 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
613 "\tThu, 22 Jul 2010 09:14:14 -0400"
614 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
615 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000616 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000617 "\n"
618 # An UTF-8 overview line from fr.comp.lang.python
619 "59\tRe: Message d'erreur incompréhensible (par moi)"
620 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
621 "\tWed, 15 Sep 2010 18:09:15 +0200"
622 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
623 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
624 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
625 "\n"
626 ".\n")
627 else:
628 self.push_lit("""\
629 224 No articles
630 .""")
631
632 def handle_POST(self, *, body=None):
633 if body is None:
634 if self.allow_posting:
635 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
636 self.expect_body()
637 else:
638 self.push_lit("440 Posting not permitted")
639 else:
640 assert self.allow_posting
641 self.push_lit("240 Article received OK")
642 self.posted_body = body
643
644 def handle_IHAVE(self, message_id, *, body=None):
645 if body is None:
646 if (self.allow_posting and
647 message_id == "<i.am.an.article.you.will.want@example.com>"):
648 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
649 self.expect_body()
650 else:
651 self.push_lit("435 Article not wanted")
652 else:
653 assert self.allow_posting
654 self.push_lit("235 Article transferred OK")
655 self.posted_body = body
656
657 sample_head = """\
658 From: "Demo User" <nobody@example.net>
659 Subject: I am just a test article
660 Content-Type: text/plain; charset=UTF-8; format=flowed
661 Message-ID: <i.am.an.article.you.will.want@example.com>"""
662
663 sample_body = """\
664 This is just a test article.
665 ..Here is a dot-starting line.
666
667 -- Signed by Andr\xe9."""
668
669 sample_article = sample_head + "\n\n" + sample_body
670
671 def handle_ARTICLE(self, message_spec=None):
672 if message_spec is None:
673 self.push_lit("220 3000237 <45223423@example.com>")
674 elif message_spec == "<45223423@example.com>":
675 self.push_lit("220 0 <45223423@example.com>")
676 elif message_spec == "3000234":
677 self.push_lit("220 3000234 <45223423@example.com>")
678 else:
679 self.push_lit("430 No Such Article Found")
680 return
681 self.push_lit(self.sample_article)
682 self.push_lit(".")
683
684 def handle_HEAD(self, message_spec=None):
685 if message_spec is None:
686 self.push_lit("221 3000237 <45223423@example.com>")
687 elif message_spec == "<45223423@example.com>":
688 self.push_lit("221 0 <45223423@example.com>")
689 elif message_spec == "3000234":
690 self.push_lit("221 3000234 <45223423@example.com>")
691 else:
692 self.push_lit("430 No Such Article Found")
693 return
694 self.push_lit(self.sample_head)
695 self.push_lit(".")
696
697 def handle_BODY(self, message_spec=None):
698 if message_spec is None:
699 self.push_lit("222 3000237 <45223423@example.com>")
700 elif message_spec == "<45223423@example.com>":
701 self.push_lit("222 0 <45223423@example.com>")
702 elif message_spec == "3000234":
703 self.push_lit("222 3000234 <45223423@example.com>")
704 else:
705 self.push_lit("430 No Such Article Found")
706 return
707 self.push_lit(self.sample_body)
708 self.push_lit(".")
709
Antoine Pitrou54411c12012-02-12 19:14:17 +0100710 def handle_AUTHINFO(self, cred_type, data):
711 if self._logged_in:
712 self.push_lit('502 Already Logged In')
713 elif cred_type == 'user':
714 if self._user_sent:
715 self.push_lit('482 User Credential Already Sent')
716 else:
717 self.push_lit('381 Password Required')
718 self._user_sent = True
719 elif cred_type == 'pass':
720 self.push_lit('281 Login Successful')
721 self._logged_in = True
722 else:
723 raise Exception('Unknown cred type {}'.format(cred_type))
724
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000725
726class NNTPv2Handler(NNTPv1Handler):
727 """A handler for RFC 3977 (NNTP "v2")"""
728
729 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100730 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000731 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000732 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100733 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000734 HDR
735 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
736 OVER
737 POST
738 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100739 ."""
740
741 if not self._logged_in:
742 self.push_lit(fmt.format('\n AUTHINFO USER'))
743 else:
744 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000745
Antoine Pitrou71135622012-02-14 23:29:34 +0100746 def handle_MODE(self, _):
747 raise Exception('MODE READER sent despite READER has been advertised')
748
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000749 def handle_OVER(self, message_spec=None):
750 return self.handle_XOVER(message_spec)
751
752
Antoine Pitrou54411c12012-02-12 19:14:17 +0100753class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
754 """A handler that allows CAPABILITIES only after login"""
755
756 def handle_CAPABILITIES(self):
757 if not self._logged_in:
758 self.push_lit('480 You must log in.')
759 else:
760 super().handle_CAPABILITIES()
761
762
Antoine Pitrou71135622012-02-14 23:29:34 +0100763class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
764 """A server that starts in transit mode"""
765
766 def __init__(self):
767 self._switched = False
768
769 def handle_CAPABILITIES(self):
770 fmt = """\
771 101 Capability list:
772 VERSION 2 3
773 IMPLEMENTATION INN 2.5.1
774 HDR
775 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
776 OVER
777 POST
778 {}READER
779 ."""
780 if self._switched:
781 self.push_lit(fmt.format(''))
782 else:
783 self.push_lit(fmt.format('MODE-'))
784
785 def handle_MODE(self, what):
786 assert not self._switched and what == 'reader'
787 self._switched = True
788 self.push_lit('200 Posting allowed')
789
790
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000791class NNTPv1v2TestsMixin:
792
793 def setUp(self):
794 super().setUp()
795
796 def test_welcome(self):
797 self.assertEqual(self.server.welcome, self.handler.welcome)
798
Antoine Pitrou54411c12012-02-12 19:14:17 +0100799 def test_authinfo(self):
800 if self.nntp_version == 2:
801 self.assertIn('AUTHINFO', self.server._caps)
802 self.server.login('testuser', 'testpw')
803 # if AUTHINFO is gone from _caps we also know that getcapabilities()
804 # has been called after login as it should
805 self.assertNotIn('AUTHINFO', self.server._caps)
806
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000807 def test_date(self):
808 resp, date = self.server.date()
809 self.assertEqual(resp, "111 20100914001155")
810 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
811
812 def test_quit(self):
813 self.assertFalse(self.sio.closed)
814 resp = self.server.quit()
815 self.assertEqual(resp, "205 Bye!")
816 self.assertTrue(self.sio.closed)
817
818 def test_help(self):
819 resp, help = self.server.help()
820 self.assertEqual(resp, "100 Legal commands")
821 self.assertEqual(help, [
822 ' authinfo user Name|pass Password|generic <prog> <args>',
823 ' date',
824 ' help',
825 'Report problems to <root@example.org>',
826 ])
827
828 def test_list(self):
829 resp, groups = self.server.list()
830 self.assertEqual(len(groups), 6)
831 g = groups[1]
832 self.assertEqual(g,
833 GroupInfo("comp.lang.python.announce", "0000001153",
834 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000835 resp, groups = self.server.list("*distutils*")
836 self.assertEqual(len(groups), 2)
837 g = groups[0]
838 self.assertEqual(g,
839 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
840 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000841
842 def test_stat(self):
843 resp, art_num, message_id = self.server.stat(3000234)
844 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
845 self.assertEqual(art_num, 3000234)
846 self.assertEqual(message_id, "<45223423@example.com>")
847 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
848 self.assertEqual(resp, "223 0 <45223423@example.com>")
849 self.assertEqual(art_num, 0)
850 self.assertEqual(message_id, "<45223423@example.com>")
851 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
852 self.server.stat("<non.existent.id>")
853 self.assertEqual(cm.exception.response, "430 No Such Article Found")
854 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
855 self.server.stat()
856 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
857
858 def test_next(self):
859 resp, art_num, message_id = self.server.next()
860 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
861 self.assertEqual(art_num, 3000237)
862 self.assertEqual(message_id, "<668929@example.org>")
863
864 def test_last(self):
865 resp, art_num, message_id = self.server.last()
866 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
867 self.assertEqual(art_num, 3000234)
868 self.assertEqual(message_id, "<45223423@example.com>")
869
870 def test_description(self):
871 desc = self.server.description("comp.lang.python")
872 self.assertEqual(desc, "The Python computer language.")
873 desc = self.server.description("comp.lang.pythonx")
874 self.assertEqual(desc, "")
875
876 def test_descriptions(self):
877 resp, groups = self.server.descriptions("comp.lang.python")
878 self.assertEqual(resp, '215 Descriptions in form "group description".')
879 self.assertEqual(groups, {
880 "comp.lang.python": "The Python computer language.",
881 })
882 resp, groups = self.server.descriptions("comp.lang.python*")
883 self.assertEqual(groups, {
884 "comp.lang.python": "The Python computer language.",
885 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
886 })
887 resp, groups = self.server.descriptions("comp.lang.pythonx")
888 self.assertEqual(groups, {})
889
890 def test_group(self):
891 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
892 self.assertTrue(resp.startswith("211 "), resp)
893 self.assertEqual(first, 761)
894 self.assertEqual(last, 1265)
895 self.assertEqual(count, 486)
896 self.assertEqual(group, "fr.comp.lang.python")
897 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
898 self.server.group("comp.lang.python.devel")
899 exc = cm.exception
900 self.assertTrue(exc.response.startswith("411 No such group"),
901 exc.response)
902
903 def test_newnews(self):
904 # NEWNEWS comp.lang.python [20]100913 082004
905 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
906 resp, ids = self.server.newnews("comp.lang.python", dt)
907 expected = (
908 "230 list of newsarticles (NNTP v{0}) "
909 "created after Mon Sep 13 08:20:04 2010 follows"
910 ).format(self.nntp_version)
911 self.assertEqual(resp, expected)
912 self.assertEqual(ids, [
913 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
914 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
915 ])
916 # NEWNEWS fr.comp.lang.python [20]100913 082004
917 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
918 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
919 self.assertEqual(resp, "230 An empty list of newsarticles follows")
920 self.assertEqual(ids, [])
921
922 def _check_article_body(self, lines):
923 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000924 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000925 self.assertEqual(lines[-2], b"")
926 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
927 self.assertEqual(lines[-4], b"This is just a test article.")
928
929 def _check_article_head(self, lines):
930 self.assertEqual(len(lines), 4)
931 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
932 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
933
934 def _check_article_data(self, lines):
935 self.assertEqual(len(lines), 9)
936 self._check_article_head(lines[:4])
937 self._check_article_body(lines[-4:])
938 self.assertEqual(lines[4], b"")
939
940 def test_article(self):
941 # ARTICLE
942 resp, info = self.server.article()
943 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
944 art_num, message_id, lines = info
945 self.assertEqual(art_num, 3000237)
946 self.assertEqual(message_id, "<45223423@example.com>")
947 self._check_article_data(lines)
948 # ARTICLE num
949 resp, info = self.server.article(3000234)
950 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
951 art_num, message_id, lines = info
952 self.assertEqual(art_num, 3000234)
953 self.assertEqual(message_id, "<45223423@example.com>")
954 self._check_article_data(lines)
955 # ARTICLE id
956 resp, info = self.server.article("<45223423@example.com>")
957 self.assertEqual(resp, "220 0 <45223423@example.com>")
958 art_num, message_id, lines = info
959 self.assertEqual(art_num, 0)
960 self.assertEqual(message_id, "<45223423@example.com>")
961 self._check_article_data(lines)
962 # Non-existent id
963 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
964 self.server.article("<non-existent@example.com>")
965 self.assertEqual(cm.exception.response, "430 No Such Article Found")
966
967 def test_article_file(self):
968 # With a "file" argument
969 f = io.BytesIO()
970 resp, info = self.server.article(file=f)
971 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
972 art_num, message_id, lines = info
973 self.assertEqual(art_num, 3000237)
974 self.assertEqual(message_id, "<45223423@example.com>")
975 self.assertEqual(lines, [])
976 data = f.getvalue()
977 self.assertTrue(data.startswith(
978 b'From: "Demo User" <nobody@example.net>\r\n'
979 b'Subject: I am just a test article\r\n'
980 ), ascii(data))
981 self.assertTrue(data.endswith(
982 b'This is just a test article.\r\n'
983 b'.Here is a dot-starting line.\r\n'
984 b'\r\n'
985 b'-- Signed by Andr\xc3\xa9.\r\n'
986 ), ascii(data))
987
988 def test_head(self):
989 # HEAD
990 resp, info = self.server.head()
991 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
992 art_num, message_id, lines = info
993 self.assertEqual(art_num, 3000237)
994 self.assertEqual(message_id, "<45223423@example.com>")
995 self._check_article_head(lines)
996 # HEAD num
997 resp, info = self.server.head(3000234)
998 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
999 art_num, message_id, lines = info
1000 self.assertEqual(art_num, 3000234)
1001 self.assertEqual(message_id, "<45223423@example.com>")
1002 self._check_article_head(lines)
1003 # HEAD id
1004 resp, info = self.server.head("<45223423@example.com>")
1005 self.assertEqual(resp, "221 0 <45223423@example.com>")
1006 art_num, message_id, lines = info
1007 self.assertEqual(art_num, 0)
1008 self.assertEqual(message_id, "<45223423@example.com>")
1009 self._check_article_head(lines)
1010 # Non-existent id
1011 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1012 self.server.head("<non-existent@example.com>")
1013 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1014
Antoine Pitrou2640b522012-02-15 18:53:18 +01001015 def test_head_file(self):
1016 f = io.BytesIO()
1017 resp, info = self.server.head(file=f)
1018 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1019 art_num, message_id, lines = info
1020 self.assertEqual(art_num, 3000237)
1021 self.assertEqual(message_id, "<45223423@example.com>")
1022 self.assertEqual(lines, [])
1023 data = f.getvalue()
1024 self.assertTrue(data.startswith(
1025 b'From: "Demo User" <nobody@example.net>\r\n'
1026 b'Subject: I am just a test article\r\n'
1027 ), ascii(data))
1028 self.assertFalse(data.endswith(
1029 b'This is just a test article.\r\n'
1030 b'.Here is a dot-starting line.\r\n'
1031 b'\r\n'
1032 b'-- Signed by Andr\xc3\xa9.\r\n'
1033 ), ascii(data))
1034
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001035 def test_body(self):
1036 # BODY
1037 resp, info = self.server.body()
1038 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1039 art_num, message_id, lines = info
1040 self.assertEqual(art_num, 3000237)
1041 self.assertEqual(message_id, "<45223423@example.com>")
1042 self._check_article_body(lines)
1043 # BODY num
1044 resp, info = self.server.body(3000234)
1045 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1046 art_num, message_id, lines = info
1047 self.assertEqual(art_num, 3000234)
1048 self.assertEqual(message_id, "<45223423@example.com>")
1049 self._check_article_body(lines)
1050 # BODY id
1051 resp, info = self.server.body("<45223423@example.com>")
1052 self.assertEqual(resp, "222 0 <45223423@example.com>")
1053 art_num, message_id, lines = info
1054 self.assertEqual(art_num, 0)
1055 self.assertEqual(message_id, "<45223423@example.com>")
1056 self._check_article_body(lines)
1057 # Non-existent id
1058 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1059 self.server.body("<non-existent@example.com>")
1060 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1061
Antoine Pitrou2640b522012-02-15 18:53:18 +01001062 def test_body_file(self):
1063 f = io.BytesIO()
1064 resp, info = self.server.body(file=f)
1065 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1066 art_num, message_id, lines = info
1067 self.assertEqual(art_num, 3000237)
1068 self.assertEqual(message_id, "<45223423@example.com>")
1069 self.assertEqual(lines, [])
1070 data = f.getvalue()
1071 self.assertFalse(data.startswith(
1072 b'From: "Demo User" <nobody@example.net>\r\n'
1073 b'Subject: I am just a test article\r\n'
1074 ), ascii(data))
1075 self.assertTrue(data.endswith(
1076 b'This is just a test article.\r\n'
1077 b'.Here is a dot-starting line.\r\n'
1078 b'\r\n'
1079 b'-- Signed by Andr\xc3\xa9.\r\n'
1080 ), ascii(data))
1081
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001082 def check_over_xover_resp(self, resp, overviews):
1083 self.assertTrue(resp.startswith("224 "), resp)
1084 self.assertEqual(len(overviews), 3)
1085 art_num, over = overviews[0]
1086 self.assertEqual(art_num, 57)
1087 self.assertEqual(over, {
1088 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1089 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1090 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1091 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1092 "references": "<hvalf7$ort$1@dough.gmane.org>",
1093 ":bytes": "7103",
1094 ":lines": "16",
1095 "xref": "news.gmane.org gmane.comp.python.authors:57"
1096 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001097 art_num, over = overviews[1]
1098 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001099 art_num, over = overviews[2]
1100 self.assertEqual(over["subject"],
1101 "Re: Message d'erreur incompréhensible (par moi)")
1102
1103 def test_xover(self):
1104 resp, overviews = self.server.xover(57, 59)
1105 self.check_over_xover_resp(resp, overviews)
1106
1107 def test_over(self):
1108 # In NNTP "v1", this will fallback on XOVER
1109 resp, overviews = self.server.over((57, 59))
1110 self.check_over_xover_resp(resp, overviews)
1111
1112 sample_post = (
1113 b'From: "Demo User" <nobody@example.net>\r\n'
1114 b'Subject: I am just a test article\r\n'
1115 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1116 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1117 b'\r\n'
1118 b'This is just a test article.\r\n'
1119 b'.Here is a dot-starting line.\r\n'
1120 b'\r\n'
1121 b'-- Signed by Andr\xc3\xa9.\r\n'
1122 )
1123
1124 def _check_posted_body(self):
1125 # Check the raw body as received by the server
1126 lines = self.handler.posted_body
1127 # One additional line for the "." terminator
1128 self.assertEqual(len(lines), 10)
1129 self.assertEqual(lines[-1], b'.\r\n')
1130 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1131 self.assertEqual(lines[-3], b'\r\n')
1132 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1133 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1134
1135 def _check_post_ihave_sub(self, func, *args, file_factory):
1136 # First the prepared post with CRLF endings
1137 post = self.sample_post
1138 func_args = args + (file_factory(post),)
1139 self.handler.posted_body = None
1140 resp = func(*func_args)
1141 self._check_posted_body()
1142 # Then the same post with "normal" line endings - they should be
1143 # converted by NNTP.post and NNTP.ihave.
1144 post = self.sample_post.replace(b"\r\n", b"\n")
1145 func_args = args + (file_factory(post),)
1146 self.handler.posted_body = None
1147 resp = func(*func_args)
1148 self._check_posted_body()
1149 return resp
1150
1151 def check_post_ihave(self, func, success_resp, *args):
1152 # With a bytes object
1153 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1154 self.assertEqual(resp, success_resp)
1155 # With a bytearray object
1156 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1157 self.assertEqual(resp, success_resp)
1158 # With a file object
1159 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1160 self.assertEqual(resp, success_resp)
1161 # With an iterable of terminated lines
1162 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001163 return iter(b.splitlines(keepends=True))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001164 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1165 self.assertEqual(resp, success_resp)
1166 # With an iterable of non-terminated lines
1167 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001168 return iter(b.splitlines(keepends=False))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001169 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1170 self.assertEqual(resp, success_resp)
1171
1172 def test_post(self):
1173 self.check_post_ihave(self.server.post, "240 Article received OK")
1174 self.handler.allow_posting = False
1175 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1176 self.server.post(self.sample_post)
1177 self.assertEqual(cm.exception.response,
1178 "440 Posting not permitted")
1179
1180 def test_ihave(self):
1181 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1182 "<i.am.an.article.you.will.want@example.com>")
1183 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1184 self.server.ihave("<another.message.id>", self.sample_post)
1185 self.assertEqual(cm.exception.response,
1186 "435 Article not wanted")
1187
Georg Brandl28e78412013-10-27 07:29:47 +01001188 def test_too_long_lines(self):
1189 dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1190 self.assertRaises(nntplib.NNTPDataError,
1191 self.server.newnews, "comp.lang.python", dt)
1192
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001193
1194class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1195 """Tests an NNTP v1 server (no capabilities)."""
1196
1197 nntp_version = 1
1198 handler_class = NNTPv1Handler
1199
1200 def test_caps(self):
1201 caps = self.server.getcapabilities()
1202 self.assertEqual(caps, {})
1203 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001204 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001205
1206
1207class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1208 """Tests an NNTP v2 server (with capabilities)."""
1209
1210 nntp_version = 2
1211 handler_class = NNTPv2Handler
1212
1213 def test_caps(self):
1214 caps = self.server.getcapabilities()
1215 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001216 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001217 'IMPLEMENTATION': ['INN', '2.5.1'],
1218 'AUTHINFO': ['USER'],
1219 'HDR': [],
1220 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1221 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1222 'OVER': [],
1223 'POST': [],
1224 'READER': [],
1225 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001226 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001227 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001228
1229
Antoine Pitrou54411c12012-02-12 19:14:17 +01001230class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1231 """Tests a probably NNTP v2 server with capabilities only after login."""
1232
1233 nntp_version = 2
1234 handler_class = CapsAfterLoginNNTPv2Handler
1235
1236 def test_caps_only_after_login(self):
1237 self.assertEqual(self.server._caps, {})
1238 self.server.login('testuser', 'testpw')
1239 self.assertIn('VERSION', self.server._caps)
1240
1241
Antoine Pitrou71135622012-02-14 23:29:34 +01001242class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1243 unittest.TestCase):
1244 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1245 that isn't in READER mode by default."""
1246
1247 nntp_version = 2
1248 handler_class = ModeSwitchingNNTPv2Handler
1249
1250 def test_we_are_in_reader_mode_after_connect(self):
1251 self.assertIn('READER', self.server._caps)
1252
1253
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001254class MiscTests(unittest.TestCase):
1255
1256 def test_decode_header(self):
1257 def gives(a, b):
1258 self.assertEqual(nntplib.decode_header(a), b)
1259 gives("" , "")
1260 gives("a plain header", "a plain header")
1261 gives(" with extra spaces ", " with extra spaces ")
1262 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "DĂ©buter en Python")
1263 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1264 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1265 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1266 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1267 "Re: problème de matrice")
1268 # A natively utf-8 header (found in the real world!)
1269 gives("Re: Message d'erreur incompréhensible (par moi)",
1270 "Re: Message d'erreur incompréhensible (par moi)")
1271
1272 def test_parse_overview_fmt(self):
1273 # The minimal (default) response
1274 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1275 "References:", ":bytes", ":lines"]
1276 self.assertEqual(nntplib._parse_overview_fmt(lines),
1277 ["subject", "from", "date", "message-id", "references",
1278 ":bytes", ":lines"])
1279 # The minimal response using alternative names
1280 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1281 "References:", "Bytes:", "Lines:"]
1282 self.assertEqual(nntplib._parse_overview_fmt(lines),
1283 ["subject", "from", "date", "message-id", "references",
1284 ":bytes", ":lines"])
1285 # Variations in casing
1286 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1287 "References:", "BYTES:", "Lines:"]
1288 self.assertEqual(nntplib._parse_overview_fmt(lines),
1289 ["subject", "from", "date", "message-id", "references",
1290 ":bytes", ":lines"])
1291 # First example from RFC 3977
1292 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1293 "References:", ":bytes", ":lines", "Xref:full",
1294 "Distribution:full"]
1295 self.assertEqual(nntplib._parse_overview_fmt(lines),
1296 ["subject", "from", "date", "message-id", "references",
1297 ":bytes", ":lines", "xref", "distribution"])
1298 # Second example from RFC 3977
1299 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1300 "References:", "Bytes:", "Lines:", "Xref:FULL",
1301 "Distribution:FULL"]
1302 self.assertEqual(nntplib._parse_overview_fmt(lines),
1303 ["subject", "from", "date", "message-id", "references",
1304 ":bytes", ":lines", "xref", "distribution"])
1305 # A classic response from INN
1306 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1307 "References:", "Bytes:", "Lines:", "Xref:full"]
1308 self.assertEqual(nntplib._parse_overview_fmt(lines),
1309 ["subject", "from", "date", "message-id", "references",
1310 ":bytes", ":lines", "xref"])
1311
1312 def test_parse_overview(self):
1313 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1314 # First example from RFC 3977
1315 lines = [
1316 '3000234\tI am just a test article\t"Demo User" '
1317 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1318 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1319 '17\tXref: news.example.com misc.test:3000363',
1320 ]
1321 overview = nntplib._parse_overview(lines, fmt)
1322 (art_num, fields), = overview
1323 self.assertEqual(art_num, 3000234)
1324 self.assertEqual(fields, {
1325 'subject': 'I am just a test article',
1326 'from': '"Demo User" <nobody@example.com>',
1327 'date': '6 Oct 1998 04:38:40 -0500',
1328 'message-id': '<45223423@example.com>',
1329 'references': '<45454@example.net>',
1330 ':bytes': '1234',
1331 ':lines': '17',
1332 'xref': 'news.example.com misc.test:3000363',
1333 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001334 # Second example; here the "Xref" field is totally absent (including
1335 # the header name) and comes out as None
1336 lines = [
1337 '3000234\tI am just a test article\t"Demo User" '
1338 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1339 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1340 '17\t\t',
1341 ]
1342 overview = nntplib._parse_overview(lines, fmt)
1343 (art_num, fields), = overview
1344 self.assertEqual(fields['xref'], None)
1345 # Third example; the "Xref" is an empty string, while "references"
1346 # is a single space.
1347 lines = [
1348 '3000234\tI am just a test article\t"Demo User" '
1349 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1350 '<45223423@example.com>\t \t1234\t'
1351 '17\tXref: \t',
1352 ]
1353 overview = nntplib._parse_overview(lines, fmt)
1354 (art_num, fields), = overview
1355 self.assertEqual(fields['references'], ' ')
1356 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001357
1358 def test_parse_datetime(self):
1359 def gives(a, b, *c):
1360 self.assertEqual(nntplib._parse_datetime(a, b),
1361 datetime.datetime(*c))
1362 # Output of DATE command
1363 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1364 # Variations
1365 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1366 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1367 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1368
1369 def test_unparse_datetime(self):
1370 # Test non-legacy mode
1371 # 1) with a datetime
1372 def gives(y, M, d, h, m, s, date_str, time_str):
1373 dt = datetime.datetime(y, M, d, h, m, s)
1374 self.assertEqual(nntplib._unparse_datetime(dt),
1375 (date_str, time_str))
1376 self.assertEqual(nntplib._unparse_datetime(dt, False),
1377 (date_str, time_str))
1378 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1379 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1380 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1381 # 2) with a date
1382 def gives(y, M, d, date_str, time_str):
1383 dt = datetime.date(y, M, d)
1384 self.assertEqual(nntplib._unparse_datetime(dt),
1385 (date_str, time_str))
1386 self.assertEqual(nntplib._unparse_datetime(dt, False),
1387 (date_str, time_str))
1388 gives(1999, 6, 23, "19990623", "000000")
1389 gives(2000, 6, 23, "20000623", "000000")
1390 gives(2010, 6, 5, "20100605", "000000")
1391
1392 def test_unparse_datetime_legacy(self):
1393 # Test legacy mode (RFC 977)
1394 # 1) with a datetime
1395 def gives(y, M, d, h, m, s, date_str, time_str):
1396 dt = datetime.datetime(y, M, d, h, m, s)
1397 self.assertEqual(nntplib._unparse_datetime(dt, True),
1398 (date_str, time_str))
1399 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1400 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1401 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1402 # 2) with a date
1403 def gives(y, M, d, date_str, time_str):
1404 dt = datetime.date(y, M, d)
1405 self.assertEqual(nntplib._unparse_datetime(dt, True),
1406 (date_str, time_str))
1407 gives(1999, 6, 23, "990623", "000000")
1408 gives(2000, 6, 23, "000623", "000000")
1409 gives(2010, 6, 5, "100605", "000000")
1410
Serhiy Storchaka43767632013-11-03 21:31:38 +02001411 @unittest.skipUnless(ssl, 'requires SSL support')
1412 def test_ssl_support(self):
1413 self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001414
1415def test_main():
Antoine Pitrou54411c12012-02-12 19:14:17 +01001416 tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
Serhiy Storchaka43767632013-11-03 21:31:38 +02001417 SendReaderNNTPv2Tests, NetworkedNNTPTests, NetworkedNNTP_SSLTests]
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00001418 support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001419
1420
1421if __name__ == "__main__":
1422 test_main()