blob: 19fb10a9c71678778382da2a42cd8b4ec8b57a32 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
8from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00009from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000010import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000011if _have_ssl:
12 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013
14TIMEOUT = 30
15
16# TODO:
17# - test the `file` arg to more commands
18# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000019# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000020
21
22class NetworkedNNTPTestsMixin:
23
24 def test_welcome(self):
25 welcome = self.server.getwelcome()
26 self.assertEqual(str, type(welcome))
27
28 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000029 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000030 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000031 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000032 self.assertEqual(str, type(line))
33
34 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000035 resp, groups = self.server.list()
36 if len(groups) > 0:
37 self.assertEqual(GroupInfo, type(groups[0]))
38 self.assertEqual(str, type(groups[0].group))
39
40 def test_list_active(self):
41 resp, groups = self.server.list(self.GROUP_PAT)
42 if len(groups) > 0:
43 self.assertEqual(GroupInfo, type(groups[0]))
44 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000045
46 def test_unknown_command(self):
47 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
48 self.server._shortcmd("XYZZY")
49 resp = cm.exception.response
50 self.assertTrue(resp.startswith("500 "), resp)
51
52 def test_newgroups(self):
53 # gmane gets a constant influx of new groups. In order not to stress
54 # the server too much, we choose a recent date in the past.
55 dt = datetime.date.today() - datetime.timedelta(days=7)
56 resp, groups = self.server.newgroups(dt)
57 if len(groups) > 0:
58 self.assertIsInstance(groups[0], GroupInfo)
59 self.assertIsInstance(groups[0].group, str)
60
61 def test_description(self):
62 def _check_desc(desc):
63 # Sanity checks
64 self.assertIsInstance(desc, str)
65 self.assertNotIn(self.GROUP_NAME, desc)
66 desc = self.server.description(self.GROUP_NAME)
67 _check_desc(desc)
68 # Another sanity check
69 self.assertIn("Python", desc)
70 # With a pattern
71 desc = self.server.description(self.GROUP_PAT)
72 _check_desc(desc)
73 # Shouldn't exist
74 desc = self.server.description("zk.brrtt.baz")
75 self.assertEqual(desc, '')
76
77 def test_descriptions(self):
78 resp, descs = self.server.descriptions(self.GROUP_PAT)
79 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
80 self.assertTrue(
81 resp.startswith("215 ") or resp.startswith("282 "), resp)
82 self.assertIsInstance(descs, dict)
83 desc = descs[self.GROUP_NAME]
84 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
85
86 def test_group(self):
87 result = self.server.group(self.GROUP_NAME)
88 self.assertEqual(5, len(result))
89 resp, count, first, last, group = result
90 self.assertEqual(group, self.GROUP_NAME)
91 self.assertIsInstance(count, int)
92 self.assertIsInstance(first, int)
93 self.assertIsInstance(last, int)
94 self.assertLessEqual(first, last)
95 self.assertTrue(resp.startswith("211 "), resp)
96
97 def test_date(self):
98 resp, date = self.server.date()
99 self.assertIsInstance(date, datetime.datetime)
100 # Sanity check
101 self.assertGreaterEqual(date.year, 1995)
102 self.assertLessEqual(date.year, 2030)
103
104 def _check_art_dict(self, art_dict):
105 # Some sanity checks for a field dictionary returned by OVER / XOVER
106 self.assertIsInstance(art_dict, dict)
107 # NNTP has 7 mandatory fields
108 self.assertGreaterEqual(art_dict.keys(),
109 {"subject", "from", "date", "message-id",
110 "references", ":bytes", ":lines"}
111 )
112 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000113 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000114
115 def test_xover(self):
116 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000117 resp, lines = self.server.xover(last - 5, last)
118 if len(lines) == 0:
119 self.skipTest("no articles retrieved")
120 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000121 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000122 self.assertGreaterEqual(art_num, last - 5)
123 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000124 self._check_art_dict(art_dict)
125
126 def test_over(self):
127 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
128 start = last - 10
129 # The "start-" article range form
130 resp, lines = self.server.over((start, None))
131 art_num, art_dict = lines[0]
132 self._check_art_dict(art_dict)
133 # The "start-end" article range form
134 resp, lines = self.server.over((start, last))
135 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000136 # The 'last' article is not necessarily part of the output (cancelled?)
137 self.assertGreaterEqual(art_num, start)
138 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000139 self._check_art_dict(art_dict)
140 # XXX The "message_id" form is unsupported by gmane
141 # 503 Overview by message-ID unsupported
142
143 def test_xhdr(self):
144 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
145 resp, lines = self.server.xhdr('subject', last)
146 for line in lines:
147 self.assertEqual(str, type(line[1]))
148
149 def check_article_resp(self, resp, article, art_num=None):
150 self.assertIsInstance(article, nntplib.ArticleInfo)
151 if art_num is not None:
152 self.assertEqual(article.number, art_num)
153 for line in article.lines:
154 self.assertIsInstance(line, bytes)
155 # XXX this could exceptionally happen...
156 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
157
158 def test_article_head_body(self):
159 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000160 # Try to find an available article
161 for art_num in (last, first, last - 1):
162 try:
163 resp, head = self.server.head(art_num)
164 except nntplib.NNTPTemporaryError as e:
165 if not e.response.startswith("423 "):
166 raise
167 # "423 No such article" => choose another one
168 continue
169 break
170 else:
171 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000172 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000173 self.check_article_resp(resp, head, art_num)
174 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000175 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000176 self.check_article_resp(resp, body, art_num)
177 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000178 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000179 self.check_article_resp(resp, article, art_num)
Nick Coghlan14d99a12012-06-17 21:27:18 +1000180 # Tolerate running the tests from behind a NNTP virus checker
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200181 blacklist = lambda line: line.startswith(b'X-Antivirus')
182 filtered_head_lines = [line for line in head.lines
183 if not blacklist(line)]
Nick Coghlan14d99a12012-06-17 21:27:18 +1000184 filtered_lines = [line for line in article.lines
Antoine Pitrou1f5d2a02012-06-24 16:28:18 +0200185 if not blacklist(line)]
186 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000187
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000188 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000189 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000190 # couple of well-known capabilities. Just sanity check that we
191 # got them.
192 def _check_caps(caps):
193 caps_list = caps['LIST']
194 self.assertIsInstance(caps_list, (list, tuple))
195 self.assertIn('OVERVIEW.FMT', caps_list)
196 self.assertGreaterEqual(self.server.nntp_version, 2)
197 _check_caps(self.server.getcapabilities())
198 # This re-emits the command
199 resp, caps = self.server.capabilities()
200 _check_caps(caps)
201
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000202 if _have_ssl:
203 def test_starttls(self):
204 file = self.server.file
205 sock = self.server.sock
206 try:
207 self.server.starttls()
208 except nntplib.NNTPPermanentError:
209 self.skipTest("STARTTLS not supported by server.")
210 else:
211 # Check that the socket and internal pseudo-file really were
212 # changed.
213 self.assertNotEqual(file, self.server.file)
214 self.assertNotEqual(sock, self.server.sock)
215 # Check that the new socket really is an SSL one
216 self.assertIsInstance(self.server.sock, ssl.SSLSocket)
217 # Check that trying starttls when it's already active fails.
218 self.assertRaises(ValueError, self.server.starttls)
219
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000220 def test_zlogin(self):
221 # This test must be the penultimate because further commands will be
222 # refused.
223 baduser = "notarealuser"
224 badpw = "notarealpassword"
225 # Check that bogus credentials cause failure
226 self.assertRaises(nntplib.NNTPError, self.server.login,
227 user=baduser, password=badpw, usenetrc=False)
228 # FIXME: We should check that correct credentials succeed, but that
229 # would require valid details for some server somewhere to be in the
230 # test suite, I think. Gmane is anonymous, at least as used for the
231 # other tests.
232
233 def test_zzquit(self):
234 # This test must be called last, hence the name
235 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000236 try:
237 self.server.quit()
238 finally:
239 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000240
Antoine Pitroude609182010-11-18 17:29:23 +0000241 @classmethod
242 def wrap_methods(cls):
243 # Wrap all methods in a transient_internet() exception catcher
244 # XXX put a generic version in test.support?
245 def wrap_meth(meth):
246 @functools.wraps(meth)
247 def wrapped(self):
248 with support.transient_internet(self.NNTP_HOST):
249 meth(self)
250 return wrapped
251 for name in dir(cls):
252 if not name.startswith('test_'):
253 continue
254 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200255 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000256 continue
257 # Need to use a closure so that meth remains bound to its current
258 # value
259 setattr(cls, name, wrap_meth(meth))
260
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000261 def test_with_statement(self):
262 def is_connected():
263 if not hasattr(server, 'file'):
264 return False
265 try:
266 server.help()
267 except (socket.error, EOFError):
268 return False
269 return True
270
271 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
272 self.assertTrue(is_connected())
273 self.assertTrue(server.help())
274 self.assertFalse(is_connected())
275
276 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
277 server.quit()
278 self.assertFalse(is_connected())
279
280
Antoine Pitroude609182010-11-18 17:29:23 +0000281NetworkedNNTPTestsMixin.wrap_methods()
282
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000283
284class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
285 # This server supports STARTTLS (gmane doesn't)
286 NNTP_HOST = 'news.trigofacile.com'
287 GROUP_NAME = 'fr.comp.lang.python'
288 GROUP_PAT = 'fr.comp.lang.*'
289
Antoine Pitroude609182010-11-18 17:29:23 +0000290 NNTP_CLASS = NNTP
291
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000292 @classmethod
293 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000294 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000295 with support.transient_internet(cls.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000296 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000297
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000298 @classmethod
299 def tearDownClass(cls):
300 if cls.server is not None:
301 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000302
303
304if _have_ssl:
Antoine Pitroude609182010-11-18 17:29:23 +0000305 class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000306
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000307 # Technical limits for this public NNTP server (see http://www.aioe.org):
308 # "Only two concurrent connections per IP address are allowed and
309 # 400 connections per day are accepted from each IP address."
310
311 NNTP_HOST = 'nntp.aioe.org'
312 GROUP_NAME = 'comp.lang.python'
313 GROUP_PAT = 'comp.lang.*'
314
Antoine Pitroude609182010-11-18 17:29:23 +0000315 NNTP_CLASS = nntplib.NNTP_SSL
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000316
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000317 # Disabled as it produces too much data
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000318 test_list = None
319
320 # Disabled as the connection will already be encrypted.
321 test_starttls = None
322
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000323
324#
325# Non-networked tests using a local server (or something mocking it).
326#
327
328class _NNTPServerIO(io.RawIOBase):
329 """A raw IO object allowing NNTP commands to be received and processed
330 by a handler. The handler can push responses which can then be read
331 from the IO object."""
332
333 def __init__(self, handler):
334 io.RawIOBase.__init__(self)
335 # The channel from the client
336 self.c2s = io.BytesIO()
337 # The channel to the client
338 self.s2c = io.BytesIO()
339 self.handler = handler
340 self.handler.start(self.c2s.readline, self.push_data)
341
342 def readable(self):
343 return True
344
345 def writable(self):
346 return True
347
348 def push_data(self, data):
349 """Push (buffer) some data to send to the client."""
350 pos = self.s2c.tell()
351 self.s2c.seek(0, 2)
352 self.s2c.write(data)
353 self.s2c.seek(pos)
354
355 def write(self, b):
356 """The client sends us some data"""
357 pos = self.c2s.tell()
358 self.c2s.write(b)
359 self.c2s.seek(pos)
360 self.handler.process_pending()
361 return len(b)
362
363 def readinto(self, buf):
364 """The client wants to read a response"""
365 self.handler.process_pending()
366 b = self.s2c.read(len(buf))
367 n = len(b)
368 buf[:n] = b
369 return n
370
371
372class MockedNNTPTestsMixin:
373 # Override in derived classes
374 handler_class = None
375
376 def setUp(self):
377 super().setUp()
378 self.make_server()
379
380 def tearDown(self):
381 super().tearDown()
382 del self.server
383
384 def make_server(self, *args, **kwargs):
385 self.handler = self.handler_class()
386 self.sio = _NNTPServerIO(self.handler)
387 # Using BufferedRWPair instead of BufferedRandom ensures the file
388 # isn't seekable.
389 file = io.BufferedRWPair(self.sio, self.sio)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000390 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000391 return self.server
392
393
Antoine Pitrou71135622012-02-14 23:29:34 +0100394class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
395 def setUp(self):
396 super().setUp()
397 self.make_server(readermode=True)
398
399
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000400class NNTPv1Handler:
401 """A handler for RFC 977"""
402
403 welcome = "200 NNTP mock server"
404
405 def start(self, readline, push_data):
406 self.in_body = False
407 self.allow_posting = True
408 self._readline = readline
409 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100410 self._logged_in = False
411 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000412 # Our welcome
413 self.handle_welcome()
414
415 def _decode(self, data):
416 return str(data, "utf-8", "surrogateescape")
417
418 def process_pending(self):
419 if self.in_body:
420 while True:
421 line = self._readline()
422 if not line:
423 return
424 self.body.append(line)
425 if line == b".\r\n":
426 break
427 try:
428 meth, tokens = self.body_callback
429 meth(*tokens, body=self.body)
430 finally:
431 self.body_callback = None
432 self.body = None
433 self.in_body = False
434 while True:
435 line = self._decode(self._readline())
436 if not line:
437 return
438 if not line.endswith("\r\n"):
439 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
440 line = line[:-2]
441 cmd, *tokens = line.split()
442 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
443 meth = getattr(self, "handle_" + cmd.upper(), None)
444 if meth is None:
445 self.handle_unknown()
446 else:
447 try:
448 meth(*tokens)
449 except Exception as e:
450 raise ValueError("command failed: {!r}".format(line)) from e
451 else:
452 if self.in_body:
453 self.body_callback = meth, tokens
454 self.body = []
455
456 def expect_body(self):
457 """Flag that the client is expected to post a request body"""
458 self.in_body = True
459
460 def push_data(self, data):
461 """Push some binary data"""
462 self._push_data(data)
463
464 def push_lit(self, lit):
465 """Push a string literal"""
466 lit = textwrap.dedent(lit)
467 lit = "\r\n".join(lit.splitlines()) + "\r\n"
468 lit = lit.encode('utf-8')
469 self.push_data(lit)
470
471 def handle_unknown(self):
472 self.push_lit("500 What?")
473
474 def handle_welcome(self):
475 self.push_lit(self.welcome)
476
477 def handle_QUIT(self):
478 self.push_lit("205 Bye!")
479
480 def handle_DATE(self):
481 self.push_lit("111 20100914001155")
482
483 def handle_GROUP(self, group):
484 if group == "fr.comp.lang.python":
485 self.push_lit("211 486 761 1265 fr.comp.lang.python")
486 else:
487 self.push_lit("411 No such group {}".format(group))
488
489 def handle_HELP(self):
490 self.push_lit("""\
491 100 Legal commands
492 authinfo user Name|pass Password|generic <prog> <args>
493 date
494 help
495 Report problems to <root@example.org>
496 .""")
497
498 def handle_STAT(self, message_spec=None):
499 if message_spec is None:
500 self.push_lit("412 No newsgroup selected")
501 elif message_spec == "3000234":
502 self.push_lit("223 3000234 <45223423@example.com>")
503 elif message_spec == "<45223423@example.com>":
504 self.push_lit("223 0 <45223423@example.com>")
505 else:
506 self.push_lit("430 No Such Article Found")
507
508 def handle_NEXT(self):
509 self.push_lit("223 3000237 <668929@example.org> retrieved")
510
511 def handle_LAST(self):
512 self.push_lit("223 3000234 <45223423@example.com> retrieved")
513
514 def handle_LIST(self, action=None, param=None):
515 if action is None:
516 self.push_lit("""\
517 215 Newsgroups in form "group high low flags".
518 comp.lang.python 0000052340 0000002828 y
519 comp.lang.python.announce 0000001153 0000000993 m
520 free.it.comp.lang.python 0000000002 0000000002 y
521 fr.comp.lang.python 0000001254 0000000760 y
522 free.it.comp.lang.python.learner 0000000000 0000000001 y
523 tw.bbs.comp.lang.python 0000000304 0000000304 y
524 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000525 elif action == "ACTIVE":
526 if param == "*distutils*":
527 self.push_lit("""\
528 215 Newsgroups in form "group high low flags"
529 gmane.comp.python.distutils.devel 0000014104 0000000001 m
530 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
531 .""")
532 else:
533 self.push_lit("""\
534 215 Newsgroups in form "group high low flags"
535 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000536 elif action == "OVERVIEW.FMT":
537 self.push_lit("""\
538 215 Order of fields in overview database.
539 Subject:
540 From:
541 Date:
542 Message-ID:
543 References:
544 Bytes:
545 Lines:
546 Xref:full
547 .""")
548 elif action == "NEWSGROUPS":
549 assert param is not None
550 if param == "comp.lang.python":
551 self.push_lit("""\
552 215 Descriptions in form "group description".
553 comp.lang.python\tThe Python computer language.
554 .""")
555 elif param == "comp.lang.python*":
556 self.push_lit("""\
557 215 Descriptions in form "group description".
558 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
559 comp.lang.python\tThe Python computer language.
560 .""")
561 else:
562 self.push_lit("""\
563 215 Descriptions in form "group description".
564 .""")
565 else:
566 self.push_lit('501 Unknown LIST keyword')
567
568 def handle_NEWNEWS(self, group, date_str, time_str):
569 # We hard code different return messages depending on passed
570 # argument and date syntax.
571 if (group == "comp.lang.python" and date_str == "20100913"
572 and time_str == "082004"):
573 # Date was passed in RFC 3977 format (NNTP "v2")
574 self.push_lit("""\
575 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
576 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
577 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
578 .""")
579 elif (group == "comp.lang.python" and date_str == "100913"
580 and time_str == "082004"):
581 # Date was passed in RFC 977 format (NNTP "v1")
582 self.push_lit("""\
583 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
584 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
585 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
586 .""")
587 else:
588 self.push_lit("""\
589 230 An empty list of newsarticles follows
590 .""")
591 # (Note for experiments: many servers disable NEWNEWS.
592 # As of this writing, sicinfo3.epfl.ch doesn't.)
593
594 def handle_XOVER(self, message_spec):
595 if message_spec == "57-59":
596 self.push_lit(
597 "224 Overview information for 57-58 follows\n"
598 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
599 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
600 "\tSat, 19 Jun 2010 18:04:08 -0400"
601 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
602 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
603 "\tXref: news.gmane.org gmane.comp.python.authors:57"
604 "\n"
605 "58\tLooking for a few good bloggers"
606 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
607 "\tThu, 22 Jul 2010 09:14:14 -0400"
608 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
609 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000610 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000611 "\n"
612 # An UTF-8 overview line from fr.comp.lang.python
613 "59\tRe: Message d'erreur incompréhensible (par moi)"
614 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
615 "\tWed, 15 Sep 2010 18:09:15 +0200"
616 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
617 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
618 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
619 "\n"
620 ".\n")
621 else:
622 self.push_lit("""\
623 224 No articles
624 .""")
625
626 def handle_POST(self, *, body=None):
627 if body is None:
628 if self.allow_posting:
629 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
630 self.expect_body()
631 else:
632 self.push_lit("440 Posting not permitted")
633 else:
634 assert self.allow_posting
635 self.push_lit("240 Article received OK")
636 self.posted_body = body
637
638 def handle_IHAVE(self, message_id, *, body=None):
639 if body is None:
640 if (self.allow_posting and
641 message_id == "<i.am.an.article.you.will.want@example.com>"):
642 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
643 self.expect_body()
644 else:
645 self.push_lit("435 Article not wanted")
646 else:
647 assert self.allow_posting
648 self.push_lit("235 Article transferred OK")
649 self.posted_body = body
650
651 sample_head = """\
652 From: "Demo User" <nobody@example.net>
653 Subject: I am just a test article
654 Content-Type: text/plain; charset=UTF-8; format=flowed
655 Message-ID: <i.am.an.article.you.will.want@example.com>"""
656
657 sample_body = """\
658 This is just a test article.
659 ..Here is a dot-starting line.
660
661 -- Signed by Andr\xe9."""
662
663 sample_article = sample_head + "\n\n" + sample_body
664
665 def handle_ARTICLE(self, message_spec=None):
666 if message_spec is None:
667 self.push_lit("220 3000237 <45223423@example.com>")
668 elif message_spec == "<45223423@example.com>":
669 self.push_lit("220 0 <45223423@example.com>")
670 elif message_spec == "3000234":
671 self.push_lit("220 3000234 <45223423@example.com>")
672 else:
673 self.push_lit("430 No Such Article Found")
674 return
675 self.push_lit(self.sample_article)
676 self.push_lit(".")
677
678 def handle_HEAD(self, message_spec=None):
679 if message_spec is None:
680 self.push_lit("221 3000237 <45223423@example.com>")
681 elif message_spec == "<45223423@example.com>":
682 self.push_lit("221 0 <45223423@example.com>")
683 elif message_spec == "3000234":
684 self.push_lit("221 3000234 <45223423@example.com>")
685 else:
686 self.push_lit("430 No Such Article Found")
687 return
688 self.push_lit(self.sample_head)
689 self.push_lit(".")
690
691 def handle_BODY(self, message_spec=None):
692 if message_spec is None:
693 self.push_lit("222 3000237 <45223423@example.com>")
694 elif message_spec == "<45223423@example.com>":
695 self.push_lit("222 0 <45223423@example.com>")
696 elif message_spec == "3000234":
697 self.push_lit("222 3000234 <45223423@example.com>")
698 else:
699 self.push_lit("430 No Such Article Found")
700 return
701 self.push_lit(self.sample_body)
702 self.push_lit(".")
703
Antoine Pitrou54411c12012-02-12 19:14:17 +0100704 def handle_AUTHINFO(self, cred_type, data):
705 if self._logged_in:
706 self.push_lit('502 Already Logged In')
707 elif cred_type == 'user':
708 if self._user_sent:
709 self.push_lit('482 User Credential Already Sent')
710 else:
711 self.push_lit('381 Password Required')
712 self._user_sent = True
713 elif cred_type == 'pass':
714 self.push_lit('281 Login Successful')
715 self._logged_in = True
716 else:
717 raise Exception('Unknown cred type {}'.format(cred_type))
718
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000719
720class NNTPv2Handler(NNTPv1Handler):
721 """A handler for RFC 3977 (NNTP "v2")"""
722
723 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100724 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000725 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000726 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100727 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000728 HDR
729 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
730 OVER
731 POST
732 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100733 ."""
734
735 if not self._logged_in:
736 self.push_lit(fmt.format('\n AUTHINFO USER'))
737 else:
738 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000739
Antoine Pitrou71135622012-02-14 23:29:34 +0100740 def handle_MODE(self, _):
741 raise Exception('MODE READER sent despite READER has been advertised')
742
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000743 def handle_OVER(self, message_spec=None):
744 return self.handle_XOVER(message_spec)
745
746
Antoine Pitrou54411c12012-02-12 19:14:17 +0100747class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
748 """A handler that allows CAPABILITIES only after login"""
749
750 def handle_CAPABILITIES(self):
751 if not self._logged_in:
752 self.push_lit('480 You must log in.')
753 else:
754 super().handle_CAPABILITIES()
755
756
Antoine Pitrou71135622012-02-14 23:29:34 +0100757class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
758 """A server that starts in transit mode"""
759
760 def __init__(self):
761 self._switched = False
762
763 def handle_CAPABILITIES(self):
764 fmt = """\
765 101 Capability list:
766 VERSION 2 3
767 IMPLEMENTATION INN 2.5.1
768 HDR
769 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
770 OVER
771 POST
772 {}READER
773 ."""
774 if self._switched:
775 self.push_lit(fmt.format(''))
776 else:
777 self.push_lit(fmt.format('MODE-'))
778
779 def handle_MODE(self, what):
780 assert not self._switched and what == 'reader'
781 self._switched = True
782 self.push_lit('200 Posting allowed')
783
784
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000785class NNTPv1v2TestsMixin:
786
787 def setUp(self):
788 super().setUp()
789
790 def test_welcome(self):
791 self.assertEqual(self.server.welcome, self.handler.welcome)
792
Antoine Pitrou54411c12012-02-12 19:14:17 +0100793 def test_authinfo(self):
794 if self.nntp_version == 2:
795 self.assertIn('AUTHINFO', self.server._caps)
796 self.server.login('testuser', 'testpw')
797 # if AUTHINFO is gone from _caps we also know that getcapabilities()
798 # has been called after login as it should
799 self.assertNotIn('AUTHINFO', self.server._caps)
800
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000801 def test_date(self):
802 resp, date = self.server.date()
803 self.assertEqual(resp, "111 20100914001155")
804 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
805
806 def test_quit(self):
807 self.assertFalse(self.sio.closed)
808 resp = self.server.quit()
809 self.assertEqual(resp, "205 Bye!")
810 self.assertTrue(self.sio.closed)
811
812 def test_help(self):
813 resp, help = self.server.help()
814 self.assertEqual(resp, "100 Legal commands")
815 self.assertEqual(help, [
816 ' authinfo user Name|pass Password|generic <prog> <args>',
817 ' date',
818 ' help',
819 'Report problems to <root@example.org>',
820 ])
821
822 def test_list(self):
823 resp, groups = self.server.list()
824 self.assertEqual(len(groups), 6)
825 g = groups[1]
826 self.assertEqual(g,
827 GroupInfo("comp.lang.python.announce", "0000001153",
828 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000829 resp, groups = self.server.list("*distutils*")
830 self.assertEqual(len(groups), 2)
831 g = groups[0]
832 self.assertEqual(g,
833 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
834 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000835
836 def test_stat(self):
837 resp, art_num, message_id = self.server.stat(3000234)
838 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
839 self.assertEqual(art_num, 3000234)
840 self.assertEqual(message_id, "<45223423@example.com>")
841 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
842 self.assertEqual(resp, "223 0 <45223423@example.com>")
843 self.assertEqual(art_num, 0)
844 self.assertEqual(message_id, "<45223423@example.com>")
845 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
846 self.server.stat("<non.existent.id>")
847 self.assertEqual(cm.exception.response, "430 No Such Article Found")
848 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
849 self.server.stat()
850 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
851
852 def test_next(self):
853 resp, art_num, message_id = self.server.next()
854 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
855 self.assertEqual(art_num, 3000237)
856 self.assertEqual(message_id, "<668929@example.org>")
857
858 def test_last(self):
859 resp, art_num, message_id = self.server.last()
860 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
861 self.assertEqual(art_num, 3000234)
862 self.assertEqual(message_id, "<45223423@example.com>")
863
864 def test_description(self):
865 desc = self.server.description("comp.lang.python")
866 self.assertEqual(desc, "The Python computer language.")
867 desc = self.server.description("comp.lang.pythonx")
868 self.assertEqual(desc, "")
869
870 def test_descriptions(self):
871 resp, groups = self.server.descriptions("comp.lang.python")
872 self.assertEqual(resp, '215 Descriptions in form "group description".')
873 self.assertEqual(groups, {
874 "comp.lang.python": "The Python computer language.",
875 })
876 resp, groups = self.server.descriptions("comp.lang.python*")
877 self.assertEqual(groups, {
878 "comp.lang.python": "The Python computer language.",
879 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
880 })
881 resp, groups = self.server.descriptions("comp.lang.pythonx")
882 self.assertEqual(groups, {})
883
884 def test_group(self):
885 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
886 self.assertTrue(resp.startswith("211 "), resp)
887 self.assertEqual(first, 761)
888 self.assertEqual(last, 1265)
889 self.assertEqual(count, 486)
890 self.assertEqual(group, "fr.comp.lang.python")
891 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
892 self.server.group("comp.lang.python.devel")
893 exc = cm.exception
894 self.assertTrue(exc.response.startswith("411 No such group"),
895 exc.response)
896
897 def test_newnews(self):
898 # NEWNEWS comp.lang.python [20]100913 082004
899 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
900 resp, ids = self.server.newnews("comp.lang.python", dt)
901 expected = (
902 "230 list of newsarticles (NNTP v{0}) "
903 "created after Mon Sep 13 08:20:04 2010 follows"
904 ).format(self.nntp_version)
905 self.assertEqual(resp, expected)
906 self.assertEqual(ids, [
907 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
908 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
909 ])
910 # NEWNEWS fr.comp.lang.python [20]100913 082004
911 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
912 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
913 self.assertEqual(resp, "230 An empty list of newsarticles follows")
914 self.assertEqual(ids, [])
915
916 def _check_article_body(self, lines):
917 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000918 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000919 self.assertEqual(lines[-2], b"")
920 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
921 self.assertEqual(lines[-4], b"This is just a test article.")
922
923 def _check_article_head(self, lines):
924 self.assertEqual(len(lines), 4)
925 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
926 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
927
928 def _check_article_data(self, lines):
929 self.assertEqual(len(lines), 9)
930 self._check_article_head(lines[:4])
931 self._check_article_body(lines[-4:])
932 self.assertEqual(lines[4], b"")
933
934 def test_article(self):
935 # ARTICLE
936 resp, info = self.server.article()
937 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
938 art_num, message_id, lines = info
939 self.assertEqual(art_num, 3000237)
940 self.assertEqual(message_id, "<45223423@example.com>")
941 self._check_article_data(lines)
942 # ARTICLE num
943 resp, info = self.server.article(3000234)
944 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
945 art_num, message_id, lines = info
946 self.assertEqual(art_num, 3000234)
947 self.assertEqual(message_id, "<45223423@example.com>")
948 self._check_article_data(lines)
949 # ARTICLE id
950 resp, info = self.server.article("<45223423@example.com>")
951 self.assertEqual(resp, "220 0 <45223423@example.com>")
952 art_num, message_id, lines = info
953 self.assertEqual(art_num, 0)
954 self.assertEqual(message_id, "<45223423@example.com>")
955 self._check_article_data(lines)
956 # Non-existent id
957 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
958 self.server.article("<non-existent@example.com>")
959 self.assertEqual(cm.exception.response, "430 No Such Article Found")
960
961 def test_article_file(self):
962 # With a "file" argument
963 f = io.BytesIO()
964 resp, info = self.server.article(file=f)
965 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
966 art_num, message_id, lines = info
967 self.assertEqual(art_num, 3000237)
968 self.assertEqual(message_id, "<45223423@example.com>")
969 self.assertEqual(lines, [])
970 data = f.getvalue()
971 self.assertTrue(data.startswith(
972 b'From: "Demo User" <nobody@example.net>\r\n'
973 b'Subject: I am just a test article\r\n'
974 ), ascii(data))
975 self.assertTrue(data.endswith(
976 b'This is just a test article.\r\n'
977 b'.Here is a dot-starting line.\r\n'
978 b'\r\n'
979 b'-- Signed by Andr\xc3\xa9.\r\n'
980 ), ascii(data))
981
982 def test_head(self):
983 # HEAD
984 resp, info = self.server.head()
985 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
986 art_num, message_id, lines = info
987 self.assertEqual(art_num, 3000237)
988 self.assertEqual(message_id, "<45223423@example.com>")
989 self._check_article_head(lines)
990 # HEAD num
991 resp, info = self.server.head(3000234)
992 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
993 art_num, message_id, lines = info
994 self.assertEqual(art_num, 3000234)
995 self.assertEqual(message_id, "<45223423@example.com>")
996 self._check_article_head(lines)
997 # HEAD id
998 resp, info = self.server.head("<45223423@example.com>")
999 self.assertEqual(resp, "221 0 <45223423@example.com>")
1000 art_num, message_id, lines = info
1001 self.assertEqual(art_num, 0)
1002 self.assertEqual(message_id, "<45223423@example.com>")
1003 self._check_article_head(lines)
1004 # Non-existent id
1005 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1006 self.server.head("<non-existent@example.com>")
1007 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1008
Antoine Pitrou2640b522012-02-15 18:53:18 +01001009 def test_head_file(self):
1010 f = io.BytesIO()
1011 resp, info = self.server.head(file=f)
1012 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1013 art_num, message_id, lines = info
1014 self.assertEqual(art_num, 3000237)
1015 self.assertEqual(message_id, "<45223423@example.com>")
1016 self.assertEqual(lines, [])
1017 data = f.getvalue()
1018 self.assertTrue(data.startswith(
1019 b'From: "Demo User" <nobody@example.net>\r\n'
1020 b'Subject: I am just a test article\r\n'
1021 ), ascii(data))
1022 self.assertFalse(data.endswith(
1023 b'This is just a test article.\r\n'
1024 b'.Here is a dot-starting line.\r\n'
1025 b'\r\n'
1026 b'-- Signed by Andr\xc3\xa9.\r\n'
1027 ), ascii(data))
1028
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001029 def test_body(self):
1030 # BODY
1031 resp, info = self.server.body()
1032 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1033 art_num, message_id, lines = info
1034 self.assertEqual(art_num, 3000237)
1035 self.assertEqual(message_id, "<45223423@example.com>")
1036 self._check_article_body(lines)
1037 # BODY num
1038 resp, info = self.server.body(3000234)
1039 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1040 art_num, message_id, lines = info
1041 self.assertEqual(art_num, 3000234)
1042 self.assertEqual(message_id, "<45223423@example.com>")
1043 self._check_article_body(lines)
1044 # BODY id
1045 resp, info = self.server.body("<45223423@example.com>")
1046 self.assertEqual(resp, "222 0 <45223423@example.com>")
1047 art_num, message_id, lines = info
1048 self.assertEqual(art_num, 0)
1049 self.assertEqual(message_id, "<45223423@example.com>")
1050 self._check_article_body(lines)
1051 # Non-existent id
1052 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1053 self.server.body("<non-existent@example.com>")
1054 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1055
Antoine Pitrou2640b522012-02-15 18:53:18 +01001056 def test_body_file(self):
1057 f = io.BytesIO()
1058 resp, info = self.server.body(file=f)
1059 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1060 art_num, message_id, lines = info
1061 self.assertEqual(art_num, 3000237)
1062 self.assertEqual(message_id, "<45223423@example.com>")
1063 self.assertEqual(lines, [])
1064 data = f.getvalue()
1065 self.assertFalse(data.startswith(
1066 b'From: "Demo User" <nobody@example.net>\r\n'
1067 b'Subject: I am just a test article\r\n'
1068 ), ascii(data))
1069 self.assertTrue(data.endswith(
1070 b'This is just a test article.\r\n'
1071 b'.Here is a dot-starting line.\r\n'
1072 b'\r\n'
1073 b'-- Signed by Andr\xc3\xa9.\r\n'
1074 ), ascii(data))
1075
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001076 def check_over_xover_resp(self, resp, overviews):
1077 self.assertTrue(resp.startswith("224 "), resp)
1078 self.assertEqual(len(overviews), 3)
1079 art_num, over = overviews[0]
1080 self.assertEqual(art_num, 57)
1081 self.assertEqual(over, {
1082 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1083 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1084 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1085 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1086 "references": "<hvalf7$ort$1@dough.gmane.org>",
1087 ":bytes": "7103",
1088 ":lines": "16",
1089 "xref": "news.gmane.org gmane.comp.python.authors:57"
1090 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001091 art_num, over = overviews[1]
1092 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001093 art_num, over = overviews[2]
1094 self.assertEqual(over["subject"],
1095 "Re: Message d'erreur incompréhensible (par moi)")
1096
1097 def test_xover(self):
1098 resp, overviews = self.server.xover(57, 59)
1099 self.check_over_xover_resp(resp, overviews)
1100
1101 def test_over(self):
1102 # In NNTP "v1", this will fallback on XOVER
1103 resp, overviews = self.server.over((57, 59))
1104 self.check_over_xover_resp(resp, overviews)
1105
1106 sample_post = (
1107 b'From: "Demo User" <nobody@example.net>\r\n'
1108 b'Subject: I am just a test article\r\n'
1109 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1110 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1111 b'\r\n'
1112 b'This is just a test article.\r\n'
1113 b'.Here is a dot-starting line.\r\n'
1114 b'\r\n'
1115 b'-- Signed by Andr\xc3\xa9.\r\n'
1116 )
1117
1118 def _check_posted_body(self):
1119 # Check the raw body as received by the server
1120 lines = self.handler.posted_body
1121 # One additional line for the "." terminator
1122 self.assertEqual(len(lines), 10)
1123 self.assertEqual(lines[-1], b'.\r\n')
1124 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1125 self.assertEqual(lines[-3], b'\r\n')
1126 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1127 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1128
1129 def _check_post_ihave_sub(self, func, *args, file_factory):
1130 # First the prepared post with CRLF endings
1131 post = self.sample_post
1132 func_args = args + (file_factory(post),)
1133 self.handler.posted_body = None
1134 resp = func(*func_args)
1135 self._check_posted_body()
1136 # Then the same post with "normal" line endings - they should be
1137 # converted by NNTP.post and NNTP.ihave.
1138 post = self.sample_post.replace(b"\r\n", b"\n")
1139 func_args = args + (file_factory(post),)
1140 self.handler.posted_body = None
1141 resp = func(*func_args)
1142 self._check_posted_body()
1143 return resp
1144
1145 def check_post_ihave(self, func, success_resp, *args):
1146 # With a bytes object
1147 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1148 self.assertEqual(resp, success_resp)
1149 # With a bytearray object
1150 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1151 self.assertEqual(resp, success_resp)
1152 # With a file object
1153 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1154 self.assertEqual(resp, success_resp)
1155 # With an iterable of terminated lines
1156 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001157 return iter(b.splitlines(keepends=True))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001158 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1159 self.assertEqual(resp, success_resp)
1160 # With an iterable of non-terminated lines
1161 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001162 return iter(b.splitlines(keepends=False))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001163 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1164 self.assertEqual(resp, success_resp)
1165
1166 def test_post(self):
1167 self.check_post_ihave(self.server.post, "240 Article received OK")
1168 self.handler.allow_posting = False
1169 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1170 self.server.post(self.sample_post)
1171 self.assertEqual(cm.exception.response,
1172 "440 Posting not permitted")
1173
1174 def test_ihave(self):
1175 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1176 "<i.am.an.article.you.will.want@example.com>")
1177 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1178 self.server.ihave("<another.message.id>", self.sample_post)
1179 self.assertEqual(cm.exception.response,
1180 "435 Article not wanted")
1181
1182
1183class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1184 """Tests an NNTP v1 server (no capabilities)."""
1185
1186 nntp_version = 1
1187 handler_class = NNTPv1Handler
1188
1189 def test_caps(self):
1190 caps = self.server.getcapabilities()
1191 self.assertEqual(caps, {})
1192 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001193 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001194
1195
1196class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1197 """Tests an NNTP v2 server (with capabilities)."""
1198
1199 nntp_version = 2
1200 handler_class = NNTPv2Handler
1201
1202 def test_caps(self):
1203 caps = self.server.getcapabilities()
1204 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001205 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001206 'IMPLEMENTATION': ['INN', '2.5.1'],
1207 'AUTHINFO': ['USER'],
1208 'HDR': [],
1209 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1210 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1211 'OVER': [],
1212 'POST': [],
1213 'READER': [],
1214 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001215 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001216 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001217
1218
Antoine Pitrou54411c12012-02-12 19:14:17 +01001219class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1220 """Tests a probably NNTP v2 server with capabilities only after login."""
1221
1222 nntp_version = 2
1223 handler_class = CapsAfterLoginNNTPv2Handler
1224
1225 def test_caps_only_after_login(self):
1226 self.assertEqual(self.server._caps, {})
1227 self.server.login('testuser', 'testpw')
1228 self.assertIn('VERSION', self.server._caps)
1229
1230
Antoine Pitrou71135622012-02-14 23:29:34 +01001231class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1232 unittest.TestCase):
1233 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1234 that isn't in READER mode by default."""
1235
1236 nntp_version = 2
1237 handler_class = ModeSwitchingNNTPv2Handler
1238
1239 def test_we_are_in_reader_mode_after_connect(self):
1240 self.assertIn('READER', self.server._caps)
1241
1242
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001243class MiscTests(unittest.TestCase):
1244
1245 def test_decode_header(self):
1246 def gives(a, b):
1247 self.assertEqual(nntplib.decode_header(a), b)
1248 gives("" , "")
1249 gives("a plain header", "a plain header")
1250 gives(" with extra spaces ", " with extra spaces ")
1251 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "DĂ©buter en Python")
1252 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1253 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1254 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1255 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1256 "Re: problème de matrice")
1257 # A natively utf-8 header (found in the real world!)
1258 gives("Re: Message d'erreur incompréhensible (par moi)",
1259 "Re: Message d'erreur incompréhensible (par moi)")
1260
1261 def test_parse_overview_fmt(self):
1262 # The minimal (default) response
1263 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1264 "References:", ":bytes", ":lines"]
1265 self.assertEqual(nntplib._parse_overview_fmt(lines),
1266 ["subject", "from", "date", "message-id", "references",
1267 ":bytes", ":lines"])
1268 # The minimal response using alternative names
1269 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1270 "References:", "Bytes:", "Lines:"]
1271 self.assertEqual(nntplib._parse_overview_fmt(lines),
1272 ["subject", "from", "date", "message-id", "references",
1273 ":bytes", ":lines"])
1274 # Variations in casing
1275 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1276 "References:", "BYTES:", "Lines:"]
1277 self.assertEqual(nntplib._parse_overview_fmt(lines),
1278 ["subject", "from", "date", "message-id", "references",
1279 ":bytes", ":lines"])
1280 # First example from RFC 3977
1281 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1282 "References:", ":bytes", ":lines", "Xref:full",
1283 "Distribution:full"]
1284 self.assertEqual(nntplib._parse_overview_fmt(lines),
1285 ["subject", "from", "date", "message-id", "references",
1286 ":bytes", ":lines", "xref", "distribution"])
1287 # Second example from RFC 3977
1288 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1289 "References:", "Bytes:", "Lines:", "Xref:FULL",
1290 "Distribution:FULL"]
1291 self.assertEqual(nntplib._parse_overview_fmt(lines),
1292 ["subject", "from", "date", "message-id", "references",
1293 ":bytes", ":lines", "xref", "distribution"])
1294 # A classic response from INN
1295 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1296 "References:", "Bytes:", "Lines:", "Xref:full"]
1297 self.assertEqual(nntplib._parse_overview_fmt(lines),
1298 ["subject", "from", "date", "message-id", "references",
1299 ":bytes", ":lines", "xref"])
1300
1301 def test_parse_overview(self):
1302 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1303 # First example from RFC 3977
1304 lines = [
1305 '3000234\tI am just a test article\t"Demo User" '
1306 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1307 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1308 '17\tXref: news.example.com misc.test:3000363',
1309 ]
1310 overview = nntplib._parse_overview(lines, fmt)
1311 (art_num, fields), = overview
1312 self.assertEqual(art_num, 3000234)
1313 self.assertEqual(fields, {
1314 'subject': 'I am just a test article',
1315 'from': '"Demo User" <nobody@example.com>',
1316 'date': '6 Oct 1998 04:38:40 -0500',
1317 'message-id': '<45223423@example.com>',
1318 'references': '<45454@example.net>',
1319 ':bytes': '1234',
1320 ':lines': '17',
1321 'xref': 'news.example.com misc.test:3000363',
1322 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001323 # Second example; here the "Xref" field is totally absent (including
1324 # the header name) and comes out as None
1325 lines = [
1326 '3000234\tI am just a test article\t"Demo User" '
1327 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1328 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1329 '17\t\t',
1330 ]
1331 overview = nntplib._parse_overview(lines, fmt)
1332 (art_num, fields), = overview
1333 self.assertEqual(fields['xref'], None)
1334 # Third example; the "Xref" is an empty string, while "references"
1335 # is a single space.
1336 lines = [
1337 '3000234\tI am just a test article\t"Demo User" '
1338 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1339 '<45223423@example.com>\t \t1234\t'
1340 '17\tXref: \t',
1341 ]
1342 overview = nntplib._parse_overview(lines, fmt)
1343 (art_num, fields), = overview
1344 self.assertEqual(fields['references'], ' ')
1345 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001346
1347 def test_parse_datetime(self):
1348 def gives(a, b, *c):
1349 self.assertEqual(nntplib._parse_datetime(a, b),
1350 datetime.datetime(*c))
1351 # Output of DATE command
1352 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1353 # Variations
1354 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1355 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1356 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1357
1358 def test_unparse_datetime(self):
1359 # Test non-legacy mode
1360 # 1) with a datetime
1361 def gives(y, M, d, h, m, s, date_str, time_str):
1362 dt = datetime.datetime(y, M, d, h, m, s)
1363 self.assertEqual(nntplib._unparse_datetime(dt),
1364 (date_str, time_str))
1365 self.assertEqual(nntplib._unparse_datetime(dt, False),
1366 (date_str, time_str))
1367 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1368 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1369 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1370 # 2) with a date
1371 def gives(y, M, d, date_str, time_str):
1372 dt = datetime.date(y, M, d)
1373 self.assertEqual(nntplib._unparse_datetime(dt),
1374 (date_str, time_str))
1375 self.assertEqual(nntplib._unparse_datetime(dt, False),
1376 (date_str, time_str))
1377 gives(1999, 6, 23, "19990623", "000000")
1378 gives(2000, 6, 23, "20000623", "000000")
1379 gives(2010, 6, 5, "20100605", "000000")
1380
1381 def test_unparse_datetime_legacy(self):
1382 # Test legacy mode (RFC 977)
1383 # 1) with a datetime
1384 def gives(y, M, d, h, m, s, date_str, time_str):
1385 dt = datetime.datetime(y, M, d, h, m, s)
1386 self.assertEqual(nntplib._unparse_datetime(dt, True),
1387 (date_str, time_str))
1388 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1389 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1390 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1391 # 2) with a date
1392 def gives(y, M, d, date_str, time_str):
1393 dt = datetime.date(y, M, d)
1394 self.assertEqual(nntplib._unparse_datetime(dt, True),
1395 (date_str, time_str))
1396 gives(1999, 6, 23, "990623", "000000")
1397 gives(2000, 6, 23, "000623", "000000")
1398 gives(2010, 6, 5, "100605", "000000")
1399
1400
1401def test_main():
Antoine Pitrou54411c12012-02-12 19:14:17 +01001402 tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
Antoine Pitrou71135622012-02-14 23:29:34 +01001403 SendReaderNNTPv2Tests, NetworkedNNTPTests]
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00001404 if _have_ssl:
1405 tests.append(NetworkedNNTP_SSLTests)
1406 support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001407
1408
1409if __name__ == "__main__":
1410 test_main()