# Source blob: 7335b23b2d440eda0f29e10b63861a867b6c181c
import io
import datetime
import textwrap
import unittest
import functools
import contextlib
from test import support
from nntplib import NNTP, GroupInfo, _have_ssl
import nntplib
if _have_ssl:
    import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000012
# Value passed as the ``timeout`` argument when the networked tests open
# their NNTP connection.
TIMEOUT = 30

# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000019
20
class NetworkedNNTPTestsMixin:
    """Tests that talk to a live NNTP server.

    The test methods read ``self.server`` (a connected client object),
    ``self.NNTP_HOST``, ``self.GROUP_NAME`` and ``self.GROUP_PAT``; the
    classes this mixin is combined with must provide them.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertEqual(str, type(welcome))

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertEqual(str, type(line))

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, 2030)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        if not lines:
            # Same guard as test_xover: an empty overview would otherwise
            # surface as an IndexError rather than a skipped test.
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertEqual(str, type(line[1]))

    def check_article_resp(self, resp, article, art_num=None):
        # Shared checks for the result of HEAD / BODY / ARTICLE.
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def blacklist(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not blacklist(line)]
        filtered_lines = [line for line in article.lines
                          if not blacklist(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    if _have_ssl:
        def test_starttls(self):
            file = self.server.file
            sock = self.server.sock
            try:
                self.server.starttls()
            except nntplib.NNTPPermanentError:
                self.skipTest("STARTTLS not supported by server.")
            else:
                # Check that the socket and internal pseudo-file really were
                # changed.
                self.assertNotEqual(file, self.server.file)
                self.assertNotEqual(sock, self.server.sock)
                # Check that the new socket really is an SSL one
                self.assertIsInstance(self.server.sock, ssl.SSLSocket)
                # Check that trying starttls when it's already active fails.
                self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Clear the class-level reference whether or not QUIT succeeded.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))


NetworkedNNTPTestsMixin.wrap_methods()
261
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000262
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests over a connection opened with NNTP_CLASS."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    # Client class instantiated in setUpClass.
    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        # One connection is opened per test class and stored on the class.
        support.requires("network")
        with support.transient_internet(cls.NNTP_HOST):
            cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)

    @classmethod
    def tearDownClass(cls):
        # test_zzquit sets cls.server to None once it has sent QUIT itself.
        if cls.server is not None:
            cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000281
282
if _have_ssl:
    class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
        """Networked tests run through nntplib.NNTP_SSL."""

        # Technical limits for this public NNTP server (see http://www.aioe.org):
        # "Only two concurrent connections per IP address are allowed and
        # 400 connections per day are accepted from each IP address."

        NNTP_HOST = 'nntp.aioe.org'
        GROUP_NAME = 'comp.lang.python'
        GROUP_PAT = 'comp.lang.*'

        NNTP_CLASS = nntplib.NNTP_SSL

        # Disabled as it produces too much data
        test_list = None

        # Disabled as the connection will already be encrypted.
        test_starttls = None
301
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000302
#
# Non-networked tests using a local server (or something mocking it).
#
306
307class _NNTPServerIO(io.RawIOBase):
308 """A raw IO object allowing NNTP commands to be received and processed
309 by a handler. The handler can push responses which can then be read
310 from the IO object."""
311
312 def __init__(self, handler):
313 io.RawIOBase.__init__(self)
314 # The channel from the client
315 self.c2s = io.BytesIO()
316 # The channel to the client
317 self.s2c = io.BytesIO()
318 self.handler = handler
319 self.handler.start(self.c2s.readline, self.push_data)
320
321 def readable(self):
322 return True
323
324 def writable(self):
325 return True
326
327 def push_data(self, data):
328 """Push (buffer) some data to send to the client."""
329 pos = self.s2c.tell()
330 self.s2c.seek(0, 2)
331 self.s2c.write(data)
332 self.s2c.seek(pos)
333
334 def write(self, b):
335 """The client sends us some data"""
336 pos = self.c2s.tell()
337 self.c2s.write(b)
338 self.c2s.seek(pos)
339 self.handler.process_pending()
340 return len(b)
341
342 def readinto(self, buf):
343 """The client wants to read a response"""
344 self.handler.process_pending()
345 b = self.s2c.read(len(buf))
346 n = len(b)
347 buf[:n] = b
348 return n
349
350
class MockedNNTPTestsMixin:
    """Give each test a client wired to an in-memory mock server."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Build a handler, its IO object and a client; return the client.

        Extra positional and keyword arguments are forwarded to
        ``nntplib._NNTPBase``.  The handler, IO object and client are also
        stored as ``self.handler``, ``self.sio`` and ``self.server``.
        """
        self.handler = self.handler_class()
        self.sio = _NNTPServerIO(self.handler)
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        file = io.BufferedRWPair(self.sio, self.sio)
        self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
        return self.server
371
372
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Like MockedNNTPTestsMixin, but the client is built with readermode=True."""

    def setUp(self):
        super().setUp()
        # Replaces the server created by the parent setUp.
        self.make_server(readermode=True)
377
378
class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its IO callbacks and send the welcome line.

        *readline* returns the next line of client data (empty when nothing
        is pending); *push_data* queues bytes for the client.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Body-collection state; process_pending() resets both to None after
        # a body has been handed over, so start from the same values.
        self.body = None
        self.body_callback = None
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume every pending client line.

        When a body is expected, lines are collected up to the terminating
        ``.\\r\\n`` and passed as ``body=`` to the command that asked for
        them.  Every other line is dispatched to ``handle_<COMMAND>``.
        Raises ValueError for a line without CRLF or a failing command.
        """
        if self.in_body:
            while True:
                line = self._readline()
                if not line:
                    # Body not complete yet; wait for more client data.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            cmd, *tokens = line.split()
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # The command asked for a body: remember whom to
                        # call back once it has been received.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # Dedent, normalise line endings to CRLF and encode as UTF-8.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        # As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per post: first without a body (the command line),
        # then again with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST.
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")

    def handle_AUTHINFO(self, cred_type, data):
        # Any password is accepted; *data* is not inspected.
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
697
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000698
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # The response is assembled from a list of lines so that the
        # optional AUTHINFO entry does not depend on matching the
        # indentation of a multi-line literal (push_lit dedents its input).
        lines = [
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
        ]
        if not self._logged_in:
            # AUTHINFO is only advertised until the client has logged in.
            lines.append("AUTHINFO USER")
        lines += [
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        ]
        self.push_lit("\n".join(lines))

    def handle_MODE(self, _):
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is served by the same canned data as XOVER.
        return self.handle_XOVER(message_spec)
724
725
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if not self._logged_in:
            self.push_lit('480 You must log in.')
        else:
            super().handle_CAPABILITIES()
734
735
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes True once the client has sent MODE READER.
        self._switched = False

    def handle_CAPABILITIES(self):
        # Advertises MODE-READER before the switch and READER after it.
        fmt = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        if self._switched:
            self.push_lit(fmt.format(''))
        else:
            self.push_lit(fmt.format('MODE-'))

    def handle_MODE(self, what):
        # Tokens reach the handler as sent by the client, hence the
        # lower-case comparison value.
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
762
763
class NNTPv1v2TestsMixin:
    # The test methods below read self.server, self.handler, self.sio and
    # self.nntp_version; the classes this is mixed into must supply them.

    def setUp(self):
        super().setUp()
768
769 def test_welcome(self):
770 self.assertEqual(self.server.welcome, self.handler.welcome)
771
Antoine Pitrou54411c12012-02-12 19:14:17 +0100772 def test_authinfo(self):
773 if self.nntp_version == 2:
774 self.assertIn('AUTHINFO', self.server._caps)
775 self.server.login('testuser', 'testpw')
776 # if AUTHINFO is gone from _caps we also know that getcapabilities()
777 # has been called after login as it should
778 self.assertNotIn('AUTHINFO', self.server._caps)
779
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000780 def test_date(self):
781 resp, date = self.server.date()
782 self.assertEqual(resp, "111 20100914001155")
783 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
784
785 def test_quit(self):
786 self.assertFalse(self.sio.closed)
787 resp = self.server.quit()
788 self.assertEqual(resp, "205 Bye!")
789 self.assertTrue(self.sio.closed)
790
791 def test_help(self):
792 resp, help = self.server.help()
793 self.assertEqual(resp, "100 Legal commands")
794 self.assertEqual(help, [
795 ' authinfo user Name|pass Password|generic <prog> <args>',
796 ' date',
797 ' help',
798 'Report problems to <root@example.org>',
799 ])
800
801 def test_list(self):
802 resp, groups = self.server.list()
803 self.assertEqual(len(groups), 6)
804 g = groups[1]
805 self.assertEqual(g,
806 GroupInfo("comp.lang.python.announce", "0000001153",
807 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000808 resp, groups = self.server.list("*distutils*")
809 self.assertEqual(len(groups), 2)
810 g = groups[0]
811 self.assertEqual(g,
812 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
813 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000814
815 def test_stat(self):
816 resp, art_num, message_id = self.server.stat(3000234)
817 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
818 self.assertEqual(art_num, 3000234)
819 self.assertEqual(message_id, "<45223423@example.com>")
820 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
821 self.assertEqual(resp, "223 0 <45223423@example.com>")
822 self.assertEqual(art_num, 0)
823 self.assertEqual(message_id, "<45223423@example.com>")
824 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
825 self.server.stat("<non.existent.id>")
826 self.assertEqual(cm.exception.response, "430 No Such Article Found")
827 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
828 self.server.stat()
829 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
830
831 def test_next(self):
832 resp, art_num, message_id = self.server.next()
833 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
834 self.assertEqual(art_num, 3000237)
835 self.assertEqual(message_id, "<668929@example.org>")
836
837 def test_last(self):
838 resp, art_num, message_id = self.server.last()
839 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
840 self.assertEqual(art_num, 3000234)
841 self.assertEqual(message_id, "<45223423@example.com>")
842
843 def test_description(self):
844 desc = self.server.description("comp.lang.python")
845 self.assertEqual(desc, "The Python computer language.")
846 desc = self.server.description("comp.lang.pythonx")
847 self.assertEqual(desc, "")
848
849 def test_descriptions(self):
850 resp, groups = self.server.descriptions("comp.lang.python")
851 self.assertEqual(resp, '215 Descriptions in form "group description".')
852 self.assertEqual(groups, {
853 "comp.lang.python": "The Python computer language.",
854 })
855 resp, groups = self.server.descriptions("comp.lang.python*")
856 self.assertEqual(groups, {
857 "comp.lang.python": "The Python computer language.",
858 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
859 })
860 resp, groups = self.server.descriptions("comp.lang.pythonx")
861 self.assertEqual(groups, {})
862
863 def test_group(self):
864 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
865 self.assertTrue(resp.startswith("211 "), resp)
866 self.assertEqual(first, 761)
867 self.assertEqual(last, 1265)
868 self.assertEqual(count, 486)
869 self.assertEqual(group, "fr.comp.lang.python")
870 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
871 self.server.group("comp.lang.python.devel")
872 exc = cm.exception
873 self.assertTrue(exc.response.startswith("411 No such group"),
874 exc.response)
875
876 def test_newnews(self):
877 # NEWNEWS comp.lang.python [20]100913 082004
878 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
879 resp, ids = self.server.newnews("comp.lang.python", dt)
880 expected = (
881 "230 list of newsarticles (NNTP v{0}) "
882 "created after Mon Sep 13 08:20:04 2010 follows"
883 ).format(self.nntp_version)
884 self.assertEqual(resp, expected)
885 self.assertEqual(ids, [
886 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
887 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
888 ])
889 # NEWNEWS fr.comp.lang.python [20]100913 082004
890 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
891 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
892 self.assertEqual(resp, "230 An empty list of newsarticles follows")
893 self.assertEqual(ids, [])
894
895 def _check_article_body(self, lines):
896 self.assertEqual(len(lines), 4)
897 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
898 self.assertEqual(lines[-2], b"")
899 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
900 self.assertEqual(lines[-4], b"This is just a test article.")
901
902 def _check_article_head(self, lines):
903 self.assertEqual(len(lines), 4)
904 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
905 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
906
907 def _check_article_data(self, lines):
908 self.assertEqual(len(lines), 9)
909 self._check_article_head(lines[:4])
910 self._check_article_body(lines[-4:])
911 self.assertEqual(lines[4], b"")
912
913 def test_article(self):
914 # ARTICLE
915 resp, info = self.server.article()
916 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
917 art_num, message_id, lines = info
918 self.assertEqual(art_num, 3000237)
919 self.assertEqual(message_id, "<45223423@example.com>")
920 self._check_article_data(lines)
921 # ARTICLE num
922 resp, info = self.server.article(3000234)
923 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
924 art_num, message_id, lines = info
925 self.assertEqual(art_num, 3000234)
926 self.assertEqual(message_id, "<45223423@example.com>")
927 self._check_article_data(lines)
928 # ARTICLE id
929 resp, info = self.server.article("<45223423@example.com>")
930 self.assertEqual(resp, "220 0 <45223423@example.com>")
931 art_num, message_id, lines = info
932 self.assertEqual(art_num, 0)
933 self.assertEqual(message_id, "<45223423@example.com>")
934 self._check_article_data(lines)
935 # Non-existent id
936 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
937 self.server.article("<non-existent@example.com>")
938 self.assertEqual(cm.exception.response, "430 No Such Article Found")
939
940 def test_article_file(self):
941 # With a "file" argument
942 f = io.BytesIO()
943 resp, info = self.server.article(file=f)
944 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
945 art_num, message_id, lines = info
946 self.assertEqual(art_num, 3000237)
947 self.assertEqual(message_id, "<45223423@example.com>")
948 self.assertEqual(lines, [])
949 data = f.getvalue()
950 self.assertTrue(data.startswith(
951 b'From: "Demo User" <nobody@example.net>\r\n'
952 b'Subject: I am just a test article\r\n'
953 ), ascii(data))
954 self.assertTrue(data.endswith(
955 b'This is just a test article.\r\n'
956 b'.Here is a dot-starting line.\r\n'
957 b'\r\n'
958 b'-- Signed by Andr\xc3\xa9.\r\n'
959 ), ascii(data))
960
961 def test_head(self):
962 # HEAD
963 resp, info = self.server.head()
964 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
965 art_num, message_id, lines = info
966 self.assertEqual(art_num, 3000237)
967 self.assertEqual(message_id, "<45223423@example.com>")
968 self._check_article_head(lines)
969 # HEAD num
970 resp, info = self.server.head(3000234)
971 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
972 art_num, message_id, lines = info
973 self.assertEqual(art_num, 3000234)
974 self.assertEqual(message_id, "<45223423@example.com>")
975 self._check_article_head(lines)
976 # HEAD id
977 resp, info = self.server.head("<45223423@example.com>")
978 self.assertEqual(resp, "221 0 <45223423@example.com>")
979 art_num, message_id, lines = info
980 self.assertEqual(art_num, 0)
981 self.assertEqual(message_id, "<45223423@example.com>")
982 self._check_article_head(lines)
983 # Non-existent id
984 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
985 self.server.head("<non-existent@example.com>")
986 self.assertEqual(cm.exception.response, "430 No Such Article Found")
987
Antoine Pitrou2640b522012-02-15 18:53:18 +0100988 def test_head_file(self):
989 f = io.BytesIO()
990 resp, info = self.server.head(file=f)
991 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
992 art_num, message_id, lines = info
993 self.assertEqual(art_num, 3000237)
994 self.assertEqual(message_id, "<45223423@example.com>")
995 self.assertEqual(lines, [])
996 data = f.getvalue()
997 self.assertTrue(data.startswith(
998 b'From: "Demo User" <nobody@example.net>\r\n'
999 b'Subject: I am just a test article\r\n'
1000 ), ascii(data))
1001 self.assertFalse(data.endswith(
1002 b'This is just a test article.\r\n'
1003 b'.Here is a dot-starting line.\r\n'
1004 b'\r\n'
1005 b'-- Signed by Andr\xc3\xa9.\r\n'
1006 ), ascii(data))
1007
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001008 def test_body(self):
1009 # BODY
1010 resp, info = self.server.body()
1011 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1012 art_num, message_id, lines = info
1013 self.assertEqual(art_num, 3000237)
1014 self.assertEqual(message_id, "<45223423@example.com>")
1015 self._check_article_body(lines)
1016 # BODY num
1017 resp, info = self.server.body(3000234)
1018 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1019 art_num, message_id, lines = info
1020 self.assertEqual(art_num, 3000234)
1021 self.assertEqual(message_id, "<45223423@example.com>")
1022 self._check_article_body(lines)
1023 # BODY id
1024 resp, info = self.server.body("<45223423@example.com>")
1025 self.assertEqual(resp, "222 0 <45223423@example.com>")
1026 art_num, message_id, lines = info
1027 self.assertEqual(art_num, 0)
1028 self.assertEqual(message_id, "<45223423@example.com>")
1029 self._check_article_body(lines)
1030 # Non-existent id
1031 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1032 self.server.body("<non-existent@example.com>")
1033 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1034
Antoine Pitrou2640b522012-02-15 18:53:18 +01001035 def test_body_file(self):
1036 f = io.BytesIO()
1037 resp, info = self.server.body(file=f)
1038 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1039 art_num, message_id, lines = info
1040 self.assertEqual(art_num, 3000237)
1041 self.assertEqual(message_id, "<45223423@example.com>")
1042 self.assertEqual(lines, [])
1043 data = f.getvalue()
1044 self.assertFalse(data.startswith(
1045 b'From: "Demo User" <nobody@example.net>\r\n'
1046 b'Subject: I am just a test article\r\n'
1047 ), ascii(data))
1048 self.assertTrue(data.endswith(
1049 b'This is just a test article.\r\n'
1050 b'.Here is a dot-starting line.\r\n'
1051 b'\r\n'
1052 b'-- Signed by Andr\xc3\xa9.\r\n'
1053 ), ascii(data))
1054
def check_over_xover_resp(self, resp, overviews):
    """Shared assertions on a (response, overviews) result.

    *resp* must start with "224 " and *overviews* must hold three
    (article number, fields dict) pairs; the first is compared in full,
    the other two on a single field each.
    """
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)

    # First entry: article number and every field.
    number, fields = overviews[0]
    self.assertEqual(number, 57)
    wanted_fields = {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.org gmane.comp.python.authors:57"
    }
    self.assertEqual(fields, wanted_fields)

    # Second entry: "xref" is expected to be None.
    _, fields = overviews[1]
    self.assertEqual(fields["xref"], None)

    # Third entry: subject containing a non-ASCII character.
    _, fields = overviews[2]
    self.assertEqual(fields["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
1075
1076 def test_xover(self):
1077 resp, overviews = self.server.xover(57, 59)
1078 self.check_over_xover_resp(resp, overviews)
1079
1080 def test_over(self):
1081 # In NNTP "v1", this will fallback on XOVER
1082 resp, overviews = self.server.over((57, 59))
1083 self.check_over_xover_resp(resp, overviews)
1084
# Article used by the posting tests, as bytes with CRLF line endings:
# four header lines, an empty separator line, then a four-line body
# containing a line that starts with a dot and a UTF-8 encoded signature.
sample_post = b"".join([
    # Headers
    b'From: "Demo User" <nobody@example.net>\r\n',
    b'Subject: I am just a test article\r\n',
    b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n',
    b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n',
    # Separator
    b'\r\n',
    # Body
    b'This is just a test article.\r\n',
    b'.Here is a dot-starting line.\r\n',
    b'\r\n',
    b'-- Signed by Andr\xc3\xa9.\r\n',
])
1096
1097 def _check_posted_body(self):
1098 # Check the raw body as received by the server
1099 lines = self.handler.posted_body
1100 # One additional line for the "." terminator
1101 self.assertEqual(len(lines), 10)
1102 self.assertEqual(lines[-1], b'.\r\n')
1103 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1104 self.assertEqual(lines[-3], b'\r\n')
1105 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1106 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1107
1108 def _check_post_ihave_sub(self, func, *args, file_factory):
1109 # First the prepared post with CRLF endings
1110 post = self.sample_post
1111 func_args = args + (file_factory(post),)
1112 self.handler.posted_body = None
1113 resp = func(*func_args)
1114 self._check_posted_body()
1115 # Then the same post with "normal" line endings - they should be
1116 # converted by NNTP.post and NNTP.ihave.
1117 post = self.sample_post.replace(b"\r\n", b"\n")
1118 func_args = args + (file_factory(post),)
1119 self.handler.posted_body = None
1120 resp = func(*func_args)
1121 self._check_posted_body()
1122 return resp
1123
def check_post_ihave(self, func, success_resp, *args):
    """Run the post/ihave check with every supported kind of payload and
    assert that each run returns *success_resp*.

    *func* and *args* are forwarded unchanged to _check_post_ihave_sub.
    """
    def terminated_lines(data):
        # Iterable of lines that keep their line endings.
        return iter(data.splitlines(True))

    def bare_lines(data):
        # Iterable of lines with the line endings stripped.
        return iter(data.splitlines(False))

    payload_factories = (
        bytes,             # With a bytes object
        bytearray,         # With a bytearray object
        io.BytesIO,        # With a file object
        terminated_lines,  # With an iterable of terminated lines
        bare_lines,        # With an iterable of non-terminated lines
    )
    for make_payload in payload_factories:
        resp = self._check_post_ihave_sub(func, *args, file_factory=make_payload)
        self.assertEqual(resp, success_resp)
1144
1145 def test_post(self):
1146 self.check_post_ihave(self.server.post, "240 Article received OK")
1147 self.handler.allow_posting = False
1148 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1149 self.server.post(self.sample_post)
1150 self.assertEqual(cm.exception.response,
1151 "440 Posting not permitted")
1152
1153 def test_ihave(self):
1154 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1155 "<i.am.an.article.you.will.want@example.com>")
1156 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1157 self.server.ihave("<another.message.id>", self.sample_post)
1158 self.assertEqual(cm.exception.response,
1159 "435 Article not wanted")
1160
1161
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Runs the shared v1/v2 tests with NNTPv1Handler, i.e. against an
    NNTP v1 server (no capabilities)."""

    handler_class = NNTPv1Handler
    nntp_version = 1

    def test_caps(self):
        # No capabilities are expected: an empty mapping, version 1 and
        # no implementation string.
        server = self.server
        self.assertEqual(server.getcapabilities(), {})
        self.assertEqual(server.nntp_version, 1)
        self.assertEqual(server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001173
1174
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Runs the shared v1/v2 tests with NNTPv2Handler, i.e. against an
    NNTP v2 server (with capabilities)."""

    handler_class = NNTPv2Handler
    nntp_version = 2

    def test_caps(self):
        server = self.server
        # Capability name -> list of arguments expected from the server.
        advertised = dict(
            VERSION=['2', '3'],
            IMPLEMENTATION=['INN', '2.5.1'],
            AUTHINFO=['USER'],
            HDR=[],
            LIST=['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                  'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            OVER=[],
            POST=[],
            READER=[],
        )
        self.assertEqual(server.getcapabilities(), advertised)
        # NOTE(review): 3 is presumably the highest entry of VERSION and
        # the implementation string the joined IMPLEMENTATION arguments --
        # confirm against nntplib.
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001196
1197
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a (probably NNTP v2) server that only reports its
    capabilities after login."""

    handler_class = CapsAfterLoginNNTPv2Handler
    nntp_version = 2

    def test_caps_only_after_login(self):
        server = self.server
        # The cached capabilities are empty before logging in...
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        # ...and contain a VERSION entry afterwards.
        self.assertIn('VERSION', server._caps)
1208
1209
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2, but the NNTP client is told to send
    MODE READER to a server that isn't in READER mode by default."""

    handler_class = ModeSwitchingNNTPv2Handler
    nntp_version = 2

    def test_we_are_in_reader_mode_after_connect(self):
        # READER must be among the cached capabilities once connected.
        cached_caps = self.server._caps
        self.assertIn('READER', cached_caps)
1220
1221
class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helper functions."""

    def test_decode_header(self):
        # (raw header, expected decoded text)
        samples = [
            ("" , ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in samples:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        # Field names expected for the seven standard overview fields.
        standard = ["subject", "from", "date", "message-id", "references",
                    ":bytes", ":lines"]
        # (lines given to the parser, expected field names)
        samples = [
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             standard),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             standard),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             standard),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             standard + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             standard + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             standard + ["xref"]),
        ]
        for fmt_lines, names in samples:
            self.assertEqual(nntplib._parse_overview_fmt(fmt_lines), names)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]

        def parse_single(line):
            # Parse one overview line; exactly one (art_num, fields)
            # entry must come back.
            (entry,) = nntplib._parse_overview([line], fmt)
            return entry

        # First example from RFC 3977
        art_num, fields = parse_single(
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363'
        )
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        art_num, fields = parse_single(
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t'
        )
        self.assertEqual(fields['xref'], None)
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        art_num, fields = parse_single(
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t'
        )
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        # (date string, time string or None, expected datetime components)
        samples = [
            # Output of DATE command
            ("19990623135624", None, (1999, 6, 23, 13, 56, 24)),
            # Variations
            ("19990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("090623", "135624", (2009, 6, 23, 13, 56, 24)),
        ]
        for date_str, time_str, parts in samples:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             datetime.datetime(*parts))

    def test_unparse_datetime(self):
        # Test non-legacy mode: the default call and an explicit False
        # second argument must give the same (date, time) strings.
        # 1) with a datetime
        datetime_samples = [
            ((1999, 6, 23, 13, 56, 24), ("19990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("20000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("20100605", "010203")),
        ]
        for parts, wanted in datetime_samples:
            moment = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(moment), wanted)
            self.assertEqual(nntplib._unparse_datetime(moment, False), wanted)
        # 2) with a date
        date_samples = [
            ((1999, 6, 23), ("19990623", "000000")),
            ((2000, 6, 23), ("20000623", "000000")),
            ((2010, 6, 5), ("20100605", "000000")),
        ]
        for parts, wanted in date_samples:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day), wanted)
            self.assertEqual(nntplib._unparse_datetime(day, False), wanted)

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977): two-digit years are expected.
        # 1) with a datetime
        datetime_samples = [
            ((1999, 6, 23, 13, 56, 24), ("990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("100605", "010203")),
        ]
        for parts, wanted in datetime_samples:
            moment = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(moment, True), wanted)
        # 2) with a date
        date_samples = [
            ((1999, 6, 23), ("990623", "000000")),
            ((2000, 6, 23), ("000623", "000000")),
            ((2010, 6, 5), ("100605", "000000")),
        ]
        for parts, wanted in date_samples:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day, True), wanted)
1378
1379
def test_main():
    # Test classes that are always run.
    suites = [
        MiscTests,
        NNTPv1Tests,
        NNTPv2Tests,
        CapsAfterLoginNNTPv2Tests,
        SendReaderNNTPv2Tests,
        NetworkedNNTPTests,
    ]
    # The SSL tests are only added when nntplib reports SSL support.
    if _have_ssl:
        suites += [NetworkedNNTP_SSLTests]
    support.run_unittest(*suites)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001386
1387
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()