blob: 5ab87c48bb2be768dbdd786298e9e8aef891963d [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
8from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00009from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000010import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000011if _have_ssl:
12 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013
# Value passed as the `timeout` argument to the NNTP connections opened by
# the networked tests.
TIMEOUT = 30
15
# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000020
21
class NetworkedNNTPTestsMixin:
    """Tests exercising an NNTP client against a real server.

    The tests read the following attributes, which are not defined here
    and must be supplied by the concrete test class:

    - ``NNTP_HOST``: host name of the server to contact
    - ``GROUP_NAME``: name of a newsgroup to query
    - ``GROUP_PAT``: group pattern; ``test_descriptions`` expects it to
      match ``GROUP_NAME``
    - ``NNTP_CLASS``: client class instantiated by ``test_with_statement``
    - ``server``: an already connected client instance

    Test methods deliberately carry comments rather than docstrings so that
    verbose unittest output keeps showing the method names.
    """

    def _check_group_infos(self, groups):
        # An empty listing is acceptable; only the first entry is inspected.
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        self._check_group_infos(groups)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        self._check_group_infos(groups)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        self._check_group_infos(groups)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(resp.startswith(("215 ", "282 ")), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound is derived from the local clock
        # (with one year of slack) instead of a hard-coded year, so that the
        # test does not start failing once that year has passed.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        if not lines:
            # Same policy as test_xover: an empty overview is not a failure.
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        # Checks shared by the HEAD / BODY / ARTICLE results; when *art_num*
        # is given, the returned article number must match it.
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def blacklisted(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not blacklisted(line)]
        filtered_lines = [line for line in article.lines
                          if not blacklisted(line)]
        self.assertEqual(filtered_lines,
                         filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    if _have_ssl:
        def test_starttls(self):
            file = self.server.file
            sock = self.server.sock
            try:
                self.server.starttls()
            except nntplib.NNTPPermanentError:
                self.skipTest("STARTTLS not supported by server.")
            else:
                # Check that the socket and internal pseudo-file really were
                # changed.
                self.assertNotEqual(file, self.server.file)
                self.assertNotEqual(sock, self.server.sock)
                # Check that the new socket really is an SSL one
                self.assertIsInstance(self.server.sock, ssl.SSLSocket)
                # Check that trying starttls when it's already active fails.
                self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Cleared on the class so that code checking `server is not None`
            # does not try to quit a second time.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every callable ``test_*`` attribute of *cls* so that it runs
        inside ``support.transient_internet(self.NNTP_HOST)``."""
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    return meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_with_statement(self):
        def is_connected():
            # `server` is resolved at call time, i.e. it refers to whichever
            # connection the enclosing `with` statement bound last.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (socket.error, EOFError):
                return False
            return True

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            self.assertTrue(is_connected())
            self.assertTrue(server.help())
        self.assertFalse(is_connected())

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            server.quit()
        self.assertFalse(is_connected())


NetworkedNNTPTestsMixin.wrap_methods()
282
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000283
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests with the plain ``NNTP`` client class.

    One connection is opened for the whole class in ``setUpClass`` and
    shared by every test through the ``server`` class attribute.
    """

    NNTP_CLASS = NNTP

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    @classmethod
    def setUpClass(cls):
        support.requires("network")
        with support.transient_internet(cls.NNTP_HOST):
            connection = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT,
                                        usenetrc=False)
            cls.server = connection

    @classmethod
    def tearDownClass(cls):
        connection = cls.server
        if connection is None:
            # Nothing left to close.
            return
        connection.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000302
303
if _have_ssl:
    class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
        """Run the networked tests through ``nntplib.NNTP_SSL``.

        Only defined when the ``_have_ssl`` flag imported from nntplib is
        true.
        """

        NNTP_CLASS = nntplib.NNTP_SSL

        # Technical limits for this public NNTP server (see http://www.aioe.org):
        # "Only two concurrent connections per IP address are allowed and
        # 400 connections per day are accepted from each IP address."

        NNTP_HOST = 'nntp.aioe.org'
        GROUP_PAT = 'comp.lang.*'
        GROUP_NAME = 'comp.lang.python'

        # Disabled as the connection will already be encrypted.
        test_starttls = None

        # Disabled as it produces too much data
        test_list = None
322
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000323
#
# Non-networked tests using a local server (or something mocking it).
#
327
328class _NNTPServerIO(io.RawIOBase):
329 """A raw IO object allowing NNTP commands to be received and processed
330 by a handler. The handler can push responses which can then be read
331 from the IO object."""
332
333 def __init__(self, handler):
334 io.RawIOBase.__init__(self)
335 # The channel from the client
336 self.c2s = io.BytesIO()
337 # The channel to the client
338 self.s2c = io.BytesIO()
339 self.handler = handler
340 self.handler.start(self.c2s.readline, self.push_data)
341
342 def readable(self):
343 return True
344
345 def writable(self):
346 return True
347
348 def push_data(self, data):
349 """Push (buffer) some data to send to the client."""
350 pos = self.s2c.tell()
351 self.s2c.seek(0, 2)
352 self.s2c.write(data)
353 self.s2c.seek(pos)
354
355 def write(self, b):
356 """The client sends us some data"""
357 pos = self.c2s.tell()
358 self.c2s.write(b)
359 self.c2s.seek(pos)
360 self.handler.process_pending()
361 return len(b)
362
363 def readinto(self, buf):
364 """The client wants to read a response"""
365 self.handler.process_pending()
366 b = self.s2c.read(len(buf))
367 n = len(b)
368 buf[:n] = b
369 return n
370
371
class MockedNNTPTestsMixin:
    """Fixture mixin connecting an ``nntplib._NNTPBase`` client to an
    in-process handler instead of a network socket."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create a handler and a client talking to it; return the client.

        Extra positional and keyword arguments are forwarded to
        ``nntplib._NNTPBase``.  The handler, the raw IO object and the client
        are stored as ``self.handler``, ``self.sio`` and ``self.server``.
        """
        self.handler = self.handler_class()
        self.sio = _NNTPServerIO(self.handler)
        # A BufferedRWPair (rather than a BufferedRandom) ensures that the
        # file isn't seekable.
        rw_pair = io.BufferedRWPair(self.sio, self.sio)
        server = nntplib._NNTPBase(rw_pair, 'test.server', *args, **kwargs)
        self.server = server
        return server
392
393
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Variant of the mocked fixture whose client is created with
    ``readermode=True``."""

    def setUp(self):
        # The parent class builds a default server first; it is then replaced
        # by one created with readermode enabled.
        super().setUp()
        self.make_server(readermode=True)
398
399
class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its I/O callables and send the welcome.

        *readline* is called to fetch the next line of client data (a falsy
        result means nothing is pending); *push_data* is called with the
        bytes to send back.
        """
        self._readline = readline
        self._push_data = push_data
        self.in_body = False
        self.allow_posting = True
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return data.decode("utf-8", "surrogateescape")

    def _read_body(self):
        """Collect body lines up to and including the ``.`` terminator.

        Return False when the client data runs out before the terminator;
        otherwise invoke the callback of the command that requested the body,
        reset the body state and return True.
        """
        for chunk in iter(self._readline, b""):
            self.body.append(chunk)
            if chunk == b".\r\n":
                break
        else:
            return False
        try:
            callback, args = self.body_callback
            callback(*args, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def _run_command(self, line):
        """Dispatch one command line (without its CRLF) to ``handle_<CMD>``."""
        cmd, *tokens = line.split()
        meth = getattr(self, "handle_" + cmd.upper(), None)
        if meth is None:
            self.handle_unknown()
            return
        try:
            meth(*tokens)
        except Exception as e:
            raise ValueError("command failed: {!r}".format(line)) from e
        if self.in_body:
            # The command asked for a body: remember who to call back once
            # it has been received in full.
            self.body_callback = meth, tokens
            self.body = []

    def process_pending(self):
        """Process all the client data currently available."""
        if self.in_body and not self._read_body():
            return
        for raw in iter(self._readline, b""):
            line = self._decode(raw)
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            self._run_command(line[:-2])

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal: dedent it, use CRLF line endings (adding a
        final one) and encode it as UTF-8."""
        text = textwrap.dedent(lit)
        payload = "\r\n".join(text.splitlines()) + "\r\n"
        self.push_data(payload.encode('utf-8'))

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group != "fr.comp.lang.python":
            self.push_lit("411 No such group {}".format(group))
            return
        self.push_lit("211 486 761 1265 fr.comp.lang.python")

    def handle_HELP(self):
        # The command lines are indented by one space relative to the status
        # line; that space is part of the response (dedent only strips the
        # indentation common to all lines).
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            reply = "412 No newsgroup selected"
        elif message_spec == "3000234":
            reply = "223 3000234 <45223423@example.com>"
        elif message_spec == "<45223423@example.com>":
            reply = "223 0 <45223423@example.com>"
        else:
            reply = "430 No Such Article Found"
        self.push_lit(reply)

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            reply = """\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                ."""
        elif action == "ACTIVE":
            if param == "*distutils*":
                reply = """\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    ."""
            else:
                reply = """\
                    215 Newsgroups in form "group high low flags"
                    ."""
        elif action == "OVERVIEW.FMT":
            reply = """\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                ."""
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                reply = """\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    ."""
            elif param == "comp.lang.python*":
                reply = """\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    ."""
            else:
                reply = """\
                    215 Descriptions in form "group description".
                    ."""
        else:
            reply = '501 Unknown LIST keyword'
        self.push_lit(reply)

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        request = (group, date_str, time_str)
        if request == ("comp.lang.python", "20100913", "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif request == ("comp.lang.python", "100913", "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # A deliberately oversized response line.
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec != "57-59":
            self.push_lit("""\
                224 No articles
                .""")
            return
        status = "224 Overview information for 57-58 follows\n"
        article_57 = (
            "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
            "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
            "\tSat, 19 Jun 2010 18:04:08 -0400"
            "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
            "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
            "\tXref: news.gmane.org gmane.comp.python.authors:57"
            "\n")
        article_58 = (
            "58\tLooking for a few good bloggers"
            "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
            "\tThu, 22 Jul 2010 09:14:14 -0400"
            "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
            "\t\t6683\t16"
            "\t"
            "\n")
        # An UTF-8 overview line from fr.comp.lang.python
        article_59 = (
            "59\tRe: Message d'erreur incompréhensible (par moi)"
            "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
            "\tWed, 15 Sep 2010 18:09:15 +0200"
            "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
            "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
            "\tXref: saria.nerim.net fr.comp.lang.python:1265"
            "\n")
        self.push_lit(status + article_57 + article_58 + article_59 + ".\n")

    def handle_POST(self, *, body=None):
        # Called twice per article: first without *body* when the command
        # arrives, then with the collected body lines.
        if body is not None:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body
        elif self.allow_posting:
            self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
            self.expect_body()
        else:
            self.push_lit("440 Posting not permitted")

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST.
        if body is not None:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body
            return
        wanted = (self.allow_posting and
                  message_id == "<i.am.an.article.you.will.want@example.com>")
        if wanted:
            self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
            self.expect_body()
        else:
            self.push_lit("435 Article not wanted")

    # The three samples share the same indentation so that their
    # concatenation can still be dedented as a whole by push_lit().
    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_part(self, status, payload):
        """Push *status*, *payload* and the terminating dot, or a 430 error
        when *status* is None (unknown message spec)."""
        if status is None:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(status)
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        statuses = {
            None: "220 3000237 <45223423@example.com>",
            "<45223423@example.com>": "220 0 <45223423@example.com>",
            "3000234": "220 3000234 <45223423@example.com>",
        }
        self._push_article_part(statuses.get(message_spec),
                                self.sample_article)

    def handle_HEAD(self, message_spec=None):
        statuses = {
            None: "221 3000237 <45223423@example.com>",
            "<45223423@example.com>": "221 0 <45223423@example.com>",
            "3000234": "221 3000234 <45223423@example.com>",
        }
        self._push_article_part(statuses.get(message_spec),
                                self.sample_head)

    def handle_BODY(self, message_spec=None):
        statuses = {
            None: "222 3000237 <45223423@example.com>",
            "<45223423@example.com>": "222 0 <45223423@example.com>",
            "3000234": "222 3000234 <45223423@example.com>",
        }
        self._push_article_part(statuses.get(message_spec),
                                self.sample_body)

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
            return
        if cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
            return
        if cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
            return
        raise Exception('Unknown cred type {}'.format(cred_type))
723
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000724
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        capabilities = [
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
        ]
        # AUTHINFO is only advertised as long as the client hasn't logged in.
        if not self._logged_in:
            capabilities.append("AUTHINFO USER")
        capabilities.extend([
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        ])
        self.push_lit("\n".join(capabilities))

    def handle_MODE(self, _):
        # READER is already part of the capability list above, so receiving
        # a MODE command is treated as an error.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
750
751
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if self._logged_in:
            super().handle_CAPABILITIES()
        else:
            # Refuse to list capabilities to clients that haven't logged in.
            self.push_lit('480 You must log in.')
760
761
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes true once the client has sent "MODE reader".
        self._switched = False

    def handle_CAPABILITIES(self):
        # MODE-READER is advertised before the switch, READER afterwards.
        reader_capability = "READER" if self._switched else "MODE-READER"
        self.push_lit("\n".join([
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            reader_capability,
            ".",
        ]))

    def handle_MODE(self, what):
        # Only a single switch to reader mode is expected.
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
788
789
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000790class NNTPv1v2TestsMixin:
791
792 def setUp(self):
793 super().setUp()
794
795 def test_welcome(self):
796 self.assertEqual(self.server.welcome, self.handler.welcome)
797
Antoine Pitrou54411c12012-02-12 19:14:17 +0100798 def test_authinfo(self):
799 if self.nntp_version == 2:
800 self.assertIn('AUTHINFO', self.server._caps)
801 self.server.login('testuser', 'testpw')
802 # if AUTHINFO is gone from _caps we also know that getcapabilities()
803 # has been called after login as it should
804 self.assertNotIn('AUTHINFO', self.server._caps)
805
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000806 def test_date(self):
807 resp, date = self.server.date()
808 self.assertEqual(resp, "111 20100914001155")
809 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
810
811 def test_quit(self):
812 self.assertFalse(self.sio.closed)
813 resp = self.server.quit()
814 self.assertEqual(resp, "205 Bye!")
815 self.assertTrue(self.sio.closed)
816
817 def test_help(self):
818 resp, help = self.server.help()
819 self.assertEqual(resp, "100 Legal commands")
820 self.assertEqual(help, [
821 ' authinfo user Name|pass Password|generic <prog> <args>',
822 ' date',
823 ' help',
824 'Report problems to <root@example.org>',
825 ])
826
827 def test_list(self):
828 resp, groups = self.server.list()
829 self.assertEqual(len(groups), 6)
830 g = groups[1]
831 self.assertEqual(g,
832 GroupInfo("comp.lang.python.announce", "0000001153",
833 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000834 resp, groups = self.server.list("*distutils*")
835 self.assertEqual(len(groups), 2)
836 g = groups[0]
837 self.assertEqual(g,
838 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
839 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000840
841 def test_stat(self):
842 resp, art_num, message_id = self.server.stat(3000234)
843 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
844 self.assertEqual(art_num, 3000234)
845 self.assertEqual(message_id, "<45223423@example.com>")
846 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
847 self.assertEqual(resp, "223 0 <45223423@example.com>")
848 self.assertEqual(art_num, 0)
849 self.assertEqual(message_id, "<45223423@example.com>")
850 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
851 self.server.stat("<non.existent.id>")
852 self.assertEqual(cm.exception.response, "430 No Such Article Found")
853 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
854 self.server.stat()
855 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
856
857 def test_next(self):
858 resp, art_num, message_id = self.server.next()
859 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
860 self.assertEqual(art_num, 3000237)
861 self.assertEqual(message_id, "<668929@example.org>")
862
def test_last(self):
    # LAST: check the status line and the values parsed out of it.
    status, number, msg_id = self.server.last()
    self.assertEqual(status, "223 3000234 <45223423@example.com> retrieved")
    self.assertEqual(number, 3000234)
    self.assertEqual(msg_id, "<45223423@example.com>")
868
def test_description(self):
    # (group name, expected description); the second group name yields
    # an empty string.
    cases = (
        ("comp.lang.python", "The Python computer language."),
        ("comp.lang.pythonx", ""),
    )
    for group, expected in cases:
        self.assertEqual(self.server.description(group), expected)
874
def test_descriptions(self):
    python_only = {
        "comp.lang.python": "The Python computer language.",
    }
    python_family = {
        "comp.lang.python": "The Python computer language.",
        "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
    }
    # Exact group name; this is the only call whose status line is checked.
    resp, found = self.server.descriptions("comp.lang.python")
    self.assertEqual(resp, '215 Descriptions in form "group description".')
    self.assertEqual(found, python_only)
    # Pattern with a trailing wildcard
    _, found = self.server.descriptions("comp.lang.python*")
    self.assertEqual(found, python_family)
    # Name that gives an empty mapping
    _, found = self.server.descriptions("comp.lang.pythonx")
    self.assertEqual(found, {})
888
def test_group(self):
    # GROUP on a group the server accepts: status line and parsed fields.
    wanted = "fr.comp.lang.python"
    resp, count, first, last, group = self.server.group(wanted)
    self.assertTrue(resp.startswith("211 "), resp)
    self.assertEqual(first, 761)
    self.assertEqual(last, 1265)
    self.assertEqual(count, 486)
    self.assertEqual(group, "fr.comp.lang.python")
    # GROUP on a group the server rejects raises a temporary error.
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.group("comp.lang.python.devel")
    response = cm.exception.response
    self.assertTrue(response.startswith("411 No such group"), response)
901
def test_newnews(self):
    # Both requests use the same cut-off date.
    since = datetime.datetime(2010, 9, 13, 8, 20, 4)
    # NEWNEWS comp.lang.python [20]100913 082004
    resp, ids = self.server.newnews("comp.lang.python", since)
    # The expected status line embeds the NNTP version under test.
    template = ("230 list of newsarticles (NNTP v{0}) "
                "created after Mon Sep 13 08:20:04 2010 follows")
    self.assertEqual(resp, template.format(self.nntp_version))
    self.assertEqual(ids, [
        "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
        "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
    ])
    # NEWNEWS fr.comp.lang.python [20]100913 082004
    resp, ids = self.server.newnews("fr.comp.lang.python", since)
    self.assertEqual(resp, "230 An empty list of newsarticles follows")
    self.assertEqual(ids, [])
920
921 def _check_article_body(self, lines):
922 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000923 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000924 self.assertEqual(lines[-2], b"")
925 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
926 self.assertEqual(lines[-4], b"This is just a test article.")
927
928 def _check_article_head(self, lines):
929 self.assertEqual(len(lines), 4)
930 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
931 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
932
def _check_article_data(self, lines):
    """Assert that *lines* is a full article: head, blank line, body.

    Nine lines are expected: four checked by _check_article_head, one
    empty separator, and four checked by _check_article_body.
    """
    self.assertEqual(len(lines), 9)
    head, separator, body = lines[:4], lines[4], lines[-4:]
    self._check_article_head(head)
    self._check_article_body(body)
    self.assertEqual(separator, b"")
938
def test_article(self):
    known_id = "<45223423@example.com>"
    # (call arguments, expected status line, expected article number) for
    # ARTICLE, ARTICLE num and ARTICLE id, in that order.
    cases = (
        ((), "220 3000237 <45223423@example.com>", 3000237),
        ((3000234,), "220 3000234 <45223423@example.com>", 3000234),
        ((known_id,), "220 0 <45223423@example.com>", 0),
    )
    for call_args, status, number in cases:
        resp, info = self.server.article(*call_args)
        self.assertEqual(resp, status)
        art_num, message_id, lines = info
        self.assertEqual(art_num, number)
        self.assertEqual(message_id, known_id)
        self._check_article_data(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.article("<non-existent@example.com>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
965
def test_article_file(self):
    # With a "file" argument the returned line list is empty and the
    # article bytes are found in the file object instead.
    head_start = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    body_end = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
    sink = io.BytesIO()
    resp, info = self.server.article(file=sink)
    self.assertEqual(resp, "220 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self.assertEqual(lines, [])
    written = sink.getvalue()
    self.assertTrue(written.startswith(head_start), ascii(written))
    self.assertTrue(written.endswith(body_end), ascii(written))
986
def test_head(self):
    known_id = "<45223423@example.com>"
    # (call arguments, expected status line, expected article number) for
    # HEAD, HEAD num and HEAD id, in that order.
    cases = (
        ((), "221 3000237 <45223423@example.com>", 3000237),
        ((3000234,), "221 3000234 <45223423@example.com>", 3000234),
        ((known_id,), "221 0 <45223423@example.com>", 0),
    )
    for call_args, status, number in cases:
        resp, info = self.server.head(*call_args)
        self.assertEqual(resp, status)
        art_num, message_id, lines = info
        self.assertEqual(art_num, number)
        self.assertEqual(message_id, known_id)
        self._check_article_head(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.head("<non-existent@example.com>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
1013
def test_head_file(self):
    # HEAD with a "file" argument: the file must start with the headers
    # and must not end with the article body.
    head_start = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    body_end = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
    sink = io.BytesIO()
    resp, info = self.server.head(file=sink)
    self.assertEqual(resp, "221 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self.assertEqual(lines, [])
    written = sink.getvalue()
    self.assertTrue(written.startswith(head_start), ascii(written))
    self.assertFalse(written.endswith(body_end), ascii(written))
1033
def test_body(self):
    known_id = "<45223423@example.com>"
    # (call arguments, expected status line, expected article number) for
    # BODY, BODY num and BODY id, in that order.
    cases = (
        ((), "222 3000237 <45223423@example.com>", 3000237),
        ((3000234,), "222 3000234 <45223423@example.com>", 3000234),
        ((known_id,), "222 0 <45223423@example.com>", 0),
    )
    for call_args, status, number in cases:
        resp, info = self.server.body(*call_args)
        self.assertEqual(resp, status)
        art_num, message_id, lines = info
        self.assertEqual(art_num, number)
        self.assertEqual(message_id, known_id)
        self._check_article_body(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.body("<non-existent@example.com>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
1060
def test_body_file(self):
    # BODY with a "file" argument: the file must not start with the
    # headers and must end with the article body.
    head_start = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    body_end = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
    sink = io.BytesIO()
    resp, info = self.server.body(file=sink)
    self.assertEqual(resp, "222 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self.assertEqual(lines, [])
    written = sink.getvalue()
    self.assertFalse(written.startswith(head_start), ascii(written))
    self.assertTrue(written.endswith(body_end), ascii(written))
1080
def check_over_xover_resp(self, resp, overviews):
    """Validate the result of an OVER / XOVER request on the test server.

    resp is the status line and must start with "224 ".  overviews must
    hold exactly three (article number, fields dict) pairs: the first is
    compared in full, the second must have a None "xref" field and the
    third is checked for its "subject" only.
    """
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)
    art_num, over = overviews[0]
    self.assertEqual(art_num, 57)
    self.assertEqual(over, {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.org gmane.comp.python.authors:57"
    })
    # Only the fields are inspected for the remaining entries, so the
    # article numbers are deliberately discarded.
    _, over = overviews[1]
    self.assertIsNone(over["xref"])
    _, over = overviews[2]
    self.assertEqual(over["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
1101
def test_xover(self):
    # xover() takes the range bounds as two separate arguments.
    status, entries = self.server.xover(57, 59)
    self.check_over_xover_resp(status, entries)
1105
def test_over(self):
    # over() takes the range as a single (first, last) tuple.
    # In NNTP "v1", this will fall back on XOVER.
    status, entries = self.server.over((57, 59))
    self.check_over_xover_resp(status, entries)
1110
# Article used by the POST / IHAVE tests: four header lines, an empty
# separator line and four body lines, all CRLF-terminated.  The body has a
# line starting with a dot and a UTF-8 encoded non-ASCII character.
sample_post = b"".join([
    b'From: "Demo User" <nobody@example.net>\r\n',
    b'Subject: I am just a test article\r\n',
    b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n',
    b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n',
    b'\r\n',
    b'This is just a test article.\r\n',
    b'.Here is a dot-starting line.\r\n',
    b'\r\n',
    b'-- Signed by Andr\xc3\xa9.\r\n',
])
1122
1123 def _check_posted_body(self):
1124 # Check the raw body as received by the server
1125 lines = self.handler.posted_body
1126 # One additional line for the "." terminator
1127 self.assertEqual(len(lines), 10)
1128 self.assertEqual(lines[-1], b'.\r\n')
1129 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1130 self.assertEqual(lines[-3], b'\r\n')
1131 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1132 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1133
def _check_post_ihave_sub(self, func, *args, file_factory):
    """Send the sample post twice through *func* and return the last response.

    The post is sent first with its CRLF line endings, then with bare LF
    endings.  Each time file_factory(post) is appended to *args* as the
    final positional argument, and the body recorded by the handler is
    checked after the call.
    """
    # First the prepared post with CRLF endings; then the same post with
    # "normal" line endings - they should be converted by NNTP.post and
    # NNTP.ihave.
    variants = (
        self.sample_post,
        self.sample_post.replace(b"\r\n", b"\n"),
    )
    for post in variants:
        call_args = args + (file_factory(post),)
        # Reset the recorded body so that each send is checked separately.
        self.handler.posted_body = None
        resp = func(*call_args)
        self._check_posted_body()
    return resp
1149
def check_post_ihave(self, func, success_resp, *args):
    """Run *func* on the sample post in every supported input form.

    The forms are: a bytes object, a bytearray object, a file object, an
    iterable of terminated lines and an iterable of non-terminated lines.
    Each run must return success_resp.
    """
    def terminated_lines(b):
        return iter(b.splitlines(keepends=True))

    def bare_lines(b):
        return iter(b.splitlines(keepends=False))

    factories = (bytes, bytearray, io.BytesIO, terminated_lines, bare_lines)
    for make_input in factories:
        resp = self._check_post_ihave_sub(func, *args,
                                          file_factory=make_input)
        self.assertEqual(resp, success_resp)
1170
def test_post(self):
    # Posting is accepted for every input form...
    self.check_post_ihave(self.server.post, "240 Article received OK")
    # ...and raises once allow_posting is switched off on the handler.
    self.handler.allow_posting = False
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.post(self.sample_post)
    response = caught.exception.response
    self.assertEqual(response, "440 Posting not permitted")
1178
def test_ihave(self):
    # IHAVE with this message id is accepted for every input form...
    wanted_id = "<i.am.an.article.you.will.want@example.com>"
    self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                          wanted_id)
    # ...while a different message id raises a temporary error.
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.ihave("<another.message.id>", self.sample_post)
    response = caught.exception.response
    self.assertEqual(response, "435 Article not wanted")
1186
def test_too_long_lines(self):
    # NEWNEWS with this date must raise NNTPDataError.
    since = datetime.datetime(2010, 1, 1, 9, 0, 0)
    with self.assertRaises(nntplib.NNTPDataError):
        self.server.newnews("comp.lang.python", since)
1191
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001192
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # No capabilities are reported: the mapping is empty, the version
        # is 1 and no implementation string is set.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001204
1205
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # Capability name -> list of arguments, as expected from the server.
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(self.server.getcapabilities(), expected_caps)
        server = self.server
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001227
1228
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        server = self.server
        # Before login the cached capabilities are empty.
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        # After login they contain at least VERSION.
        self.assertIn('VERSION', server._caps)
1239
1240
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        # READER must be among the capabilities cached on the connection.
        cached_caps = self.server._caps
        self.assertIn('READER', cached_caps)
1251
1252
class MiscTests(unittest.TestCase):
    """Tests for nntplib helper functions called without a server."""

    def test_decode_header(self):
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        # =E9 in ISO-8859-15 is "é"; the expected text must be real Unicode,
        # not the bytes of its UTF-8 encoding misread in another charset.
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # Every input below starts with the same seven fields, so the
        # parsed result always starts with these names.
        default = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines), default)
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines), default)
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines), default)
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
                         default + ["xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
                         default + ["xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
                         default + ["xref"])

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode, both as the default and when legacy=False
        # is passed explicitly.
        def check(dt, date_str, time_str):
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))

        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            check(datetime.datetime(y, M, d, h, m, s), date_str, time_str)
        gives_datetime(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "20100605", "010203")

        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            check(datetime.date(y, M, d), date_str, time_str)
        gives_date(1999, 6, 23, "19990623", "000000")
        gives_date(2000, 6, 23, "20000623", "000000")
        gives_date(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        def check(dt, date_str, time_str):
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))

        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            check(datetime.datetime(y, M, d, h, m, s), date_str, time_str)
        gives_datetime(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "100605", "010203")

        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            check(datetime.date(y, M, d), date_str, time_str)
        gives_date(1999, 6, 23, "990623", "000000")
        gives_date(2000, 6, 23, "000623", "000000")
        gives_date(2010, 6, 5, "100605", "000000")
1409
1410
def test_main():
    """Run the test classes of this module through support.run_unittest().

    The SSL variant of the networked tests is added only when _have_ssl
    is true.
    """
    suites = [
        MiscTests,
        NNTPv1Tests,
        NNTPv2Tests,
        CapsAfterLoginNNTPv2Tests,
        SendReaderNNTPv2Tests,
        NetworkedNNTPTests,
    ]
    if _have_ssl:
        suites += [NetworkedNNTP_SSLTests]
    support.run_unittest(*suites)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001417
1418
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()