blob: 86827f37f5be7909ab9978cc5b9d599ed8bc6f0a [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00005import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00006import contextlib
7from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00008from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +00009import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000010if _have_ssl:
11 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000012
# Passed as the ``timeout`` argument when connecting to the real NNTP
# servers used by the networked tests (presumably seconds -- confirm).
TIMEOUT = 30
14
15# TODO:
16# - test the `file` arg to more commands
17# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000018# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000019
20
class NetworkedNNTPTestsMixin:
    """Tests that are run against a live NNTP server.

    Concrete test cases are expected to provide the class attributes
    ``NNTP_HOST``, ``GROUP_NAME`` and ``GROUP_PAT`` as well as a connected
    client object in ``server``.  Every ``test_*`` method is wrapped by
    ``wrap_methods()`` so that it runs inside
    ``support.transient_internet()``.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock instead of a
        # hard-coded year (which would turn into a time bomb); one year of
        # slack absorbs any offset between the local date and the server's.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        if not lines:
            # Same policy as test_xover(): an empty overview is not a failure.
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        # Tolerate running the tests from behind a NNTP virus checker
        filtered_lines = [line for line in article.lines
                          if not line.startswith(b'X-Antivirus')]
        self.assertEqual(filtered_lines, head.lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    if _have_ssl:
        def test_starttls(self):
            file = self.server.file
            sock = self.server.sock
            try:
                self.server.starttls()
            except nntplib.NNTPPermanentError:
                self.skipTest("STARTTLS not supported by server.")
            else:
                # Check that the socket and internal pseudo-file really were
                # changed.
                self.assertNotEqual(file, self.server.file)
                self.assertNotEqual(sock, self.server.sock)
                # Check that the new socket really is an SSL one
                self.assertIsInstance(self.server.sock, ssl.SSLSocket)
                # Check that trying starttls when it's already active fails.
                self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Clearing the class attribute lets tearDownClass() know that the
            # connection has already been closed.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every callable ``test_*`` attribute of *cls* so that it runs
        inside ``support.transient_internet(self.NNTP_HOST)``."""
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))


NetworkedNNTPTestsMixin.wrap_methods()
258
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000259
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests through the ``NNTP`` client class.

    One connection is opened per test class and shared by all the tests.
    """

    # Client class used to open the shared connection.
    NNTP_CLASS = NNTP

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    @classmethod
    def setUpClass(cls):
        """Open the connection shared by every test of the class."""
        support.requires("network")
        host = cls.NNTP_HOST
        with support.transient_internet(host):
            connection = cls.NNTP_CLASS(host, timeout=TIMEOUT, usenetrc=False)
            cls.server = connection

    @classmethod
    def tearDownClass(cls):
        """Close the shared connection unless a test already did so."""
        connection = cls.server
        if connection is None:
            return
        connection.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000278
279
if _have_ssl:
    class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
        """Run the networked tests through ``nntplib.NNTP_SSL``."""

        NNTP_CLASS = nntplib.NNTP_SSL

        # Technical limits for this public NNTP server (see http://www.aioe.org):
        # "Only two concurrent connections per IP address are allowed and
        # 400 connections per day are accepted from each IP address."
        NNTP_HOST = 'nntp.aioe.org'
        GROUP_PAT = 'comp.lang.*'
        GROUP_NAME = 'comp.lang.python'

        # Disabled as the connection will already be encrypted.
        test_starttls = None

        # Disabled as it produces too much data
        test_list = None
298
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000299
300#
301# Non-networked tests using a local server (or something mocking it).
302#
303
class _NNTPServerIO(io.RawIOBase):
    """A raw IO object allowing NNTP commands to be received and processed
    by a handler.  The handler can push responses which can then be read
    from the IO object.

    Two in-memory buffers are used: ``c2s`` holds what the client wrote and
    is read by the handler, ``s2c`` holds what the handler pushed and is read
    by the client.  In both, the stream position is the read cursor and new
    data is always appended at the end.
    """

    def __init__(self, handler):
        io.RawIOBase.__init__(self)
        # The channel from the client
        self.c2s = io.BytesIO()
        # The channel to the client
        self.s2c = io.BytesIO()
        self.handler = handler
        # Give the handler a way to read client lines and to send responses.
        self.handler.start(self.c2s.readline, self.push_data)

    def readable(self):
        return True

    def writable(self):
        return True

    @staticmethod
    def _append(buffer, data):
        """Append *data* at the end of *buffer*, keeping its read cursor."""
        pos = buffer.tell()
        buffer.seek(0, io.SEEK_END)
        buffer.write(data)
        buffer.seek(pos)

    def push_data(self, data):
        """Push (buffer) some data to send to the client."""
        self._append(self.s2c, data)

    def write(self, b):
        """The client sends us some data.

        The data is appended after anything the handler has not consumed
        yet (writing at the read cursor would overwrite pending bytes), then
        the handler is asked to process what is pending.  Returns the number
        of bytes accepted, i.e. ``len(b)``.
        """
        self._append(self.c2s, b)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """The client wants to read a response.

        Copies at most ``len(buf)`` buffered response bytes into *buf* and
        returns how many were copied.
        """
        self.handler.process_pending()
        b = self.s2c.read(len(buf))
        n = len(b)
        buf[:n] = b
        return n
346
347
class MockedNNTPTestsMixin:
    """Mixin giving each test a client wired to an in-memory mock server.

    After ``setUp()`` the test can use ``self.handler`` (the server-side
    handler), ``self.sio`` (the raw IO object) and ``self.server`` (the
    client object under test).
    """

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create a fresh handler and client, store them on ``self`` and
        return the client.  Extra arguments go to ``nntplib._NNTPBase``."""
        self.handler = self.handler_class()
        raw = _NNTPServerIO(self.handler)
        self.sio = raw
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        channel = io.BufferedRWPair(raw, raw)
        client = nntplib._NNTPBase(channel, 'test.server', *args, **kwargs)
        self.server = client
        return client
368
369
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Variant whose client is created with ``readermode=True``."""

    def setUp(self):
        # The inherited setUp() builds a default server first; it is then
        # replaced by one created in reader mode.
        super().setUp()
        options = {"readermode": True}
        self.make_server(**options)
374
375
class NNTPv1Handler:
    """A handler for RFC 977

    The handler is driven by an IO object that calls ``start()`` once and
    ``process_pending()`` whenever data may be available.  Each command
    ``FOO`` is dispatched (case-insensitively) to a ``handle_FOO`` method
    receiving the whitespace-separated arguments; responses are sent with
    ``push_lit()`` / ``push_data()``.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the IO callbacks, reset the session state and greet.

        *readline* returns the next line of bytes sent by the client (empty
        when nothing is pending); *push_data* sends bytes to the client.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Handle everything the client has sent so far.

        While a body is expected (see ``expect_body()``), raw lines are
        accumulated until the terminating ``.`` line and then passed as the
        ``body`` keyword argument to the command that requested them.
        Afterwards, pending lines are processed as commands.

        Raises ValueError if a command line does not end with CRLF or if a
        command handler fails.
        """
        if self.in_body:
            while True:
                line = self._readline()
                if not line:
                    # Body not complete yet; wait for more data.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            words = line.split()
            if not words:
                # A blank line carries no command name at all; answer like
                # for any unrecognized command rather than failing on the
                # unpacking below.
                self.handle_unknown()
                continue
            cmd, *tokens = words
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # Remember who asked for the body so that it can be
                        # called back once the body has been received.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal

        The literal is dedented, its lines are joined with CRLF (with a
        trailing CRLF) and the result is sent UTF-8 encoded.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax: an eight-digit date is the RFC 3977
        # format (NNTP "v2"), a six-digit one the RFC 977 format (NNTP "v1").
        versions = {"20100913": 2, "100913": 1}
        if (group == "comp.lang.python" and time_str == "082004"
                and date_str in versions):
            self.push_lit("""\
                230 list of newsarticles (NNTP v{}) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""".format(versions[date_str]))
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        # As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice for a successful post: first without a body (the
        # command itself), then with the received body.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST().
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_part(self, code, message_spec, content):
        """Answer ARTICLE / HEAD / BODY: status line, *content*, terminator.

        *code* is the success status code of the command.  Unknown message
        specifications get a 430 reply and no content.
        """
        if message_spec is None:
            art_num = "3000237"
        elif message_spec == "<45223423@example.com>":
            art_num = "0"
        elif message_spec == "3000234":
            art_num = "3000234"
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit("{} {} <45223423@example.com>".format(code, art_num))
        self.push_lit(content)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_article_part("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_article_part("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_article_part("222", message_spec, self.sample_body)

    def handle_AUTHINFO(self, cred_type, data):
        # The keyword is matched case-insensitively, like the command name.
        kind = cred_type.lower()
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif kind == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif kind == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
694
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000695
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        """Send the capability list; ``AUTHINFO USER`` is only advertised
        while the client is not logged in."""
        capabilities = [
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
        ]
        if not self._logged_in:
            capabilities.append("AUTHINFO USER")
        capabilities.extend([
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        ])
        self.push_lit("\n".join(capabilities))

    def handle_MODE(self, _):
        # READER is among the advertised capabilities, so the client has no
        # reason to send this command to this handler.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        xover = self.handle_XOVER
        return xover(message_spec)
721
722
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if self._logged_in:
            # Authenticated: answer with the regular capability list.
            super().handle_CAPABILITIES()
            return
        self.push_lit('480 You must log in.')
731
732
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes true once the client has sent MODE READER.
        self._switched = False

    def handle_CAPABILITIES(self):
        """Advertise ``MODE-READER`` before the switch, ``READER`` after."""
        if self._switched:
            reader_capability = "READER"
        else:
            reader_capability = "MODE-READER"
        capabilities = (
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            reader_capability,
            ".",
        )
        self.push_lit("\n".join(capabilities))

    def handle_MODE(self, what):
        # Only a single switch, to reader mode, is expected.
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
759
760
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000761class NNTPv1v2TestsMixin:
762
763 def setUp(self):
764 super().setUp()
765
766 def test_welcome(self):
767 self.assertEqual(self.server.welcome, self.handler.welcome)
768
Antoine Pitrou54411c12012-02-12 19:14:17 +0100769 def test_authinfo(self):
770 if self.nntp_version == 2:
771 self.assertIn('AUTHINFO', self.server._caps)
772 self.server.login('testuser', 'testpw')
773 # if AUTHINFO is gone from _caps we also know that getcapabilities()
774 # has been called after login as it should
775 self.assertNotIn('AUTHINFO', self.server._caps)
776
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000777 def test_date(self):
778 resp, date = self.server.date()
779 self.assertEqual(resp, "111 20100914001155")
780 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
781
782 def test_quit(self):
783 self.assertFalse(self.sio.closed)
784 resp = self.server.quit()
785 self.assertEqual(resp, "205 Bye!")
786 self.assertTrue(self.sio.closed)
787
788 def test_help(self):
789 resp, help = self.server.help()
790 self.assertEqual(resp, "100 Legal commands")
791 self.assertEqual(help, [
792 ' authinfo user Name|pass Password|generic <prog> <args>',
793 ' date',
794 ' help',
795 'Report problems to <root@example.org>',
796 ])
797
798 def test_list(self):
799 resp, groups = self.server.list()
800 self.assertEqual(len(groups), 6)
801 g = groups[1]
802 self.assertEqual(g,
803 GroupInfo("comp.lang.python.announce", "0000001153",
804 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000805 resp, groups = self.server.list("*distutils*")
806 self.assertEqual(len(groups), 2)
807 g = groups[0]
808 self.assertEqual(g,
809 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
810 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000811
812 def test_stat(self):
813 resp, art_num, message_id = self.server.stat(3000234)
814 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
815 self.assertEqual(art_num, 3000234)
816 self.assertEqual(message_id, "<45223423@example.com>")
817 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
818 self.assertEqual(resp, "223 0 <45223423@example.com>")
819 self.assertEqual(art_num, 0)
820 self.assertEqual(message_id, "<45223423@example.com>")
821 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
822 self.server.stat("<non.existent.id>")
823 self.assertEqual(cm.exception.response, "430 No Such Article Found")
824 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
825 self.server.stat()
826 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
827
828 def test_next(self):
829 resp, art_num, message_id = self.server.next()
830 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
831 self.assertEqual(art_num, 3000237)
832 self.assertEqual(message_id, "<668929@example.org>")
833
834 def test_last(self):
835 resp, art_num, message_id = self.server.last()
836 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
837 self.assertEqual(art_num, 3000234)
838 self.assertEqual(message_id, "<45223423@example.com>")
839
840 def test_description(self):
841 desc = self.server.description("comp.lang.python")
842 self.assertEqual(desc, "The Python computer language.")
843 desc = self.server.description("comp.lang.pythonx")
844 self.assertEqual(desc, "")
845
846 def test_descriptions(self):
847 resp, groups = self.server.descriptions("comp.lang.python")
848 self.assertEqual(resp, '215 Descriptions in form "group description".')
849 self.assertEqual(groups, {
850 "comp.lang.python": "The Python computer language.",
851 })
852 resp, groups = self.server.descriptions("comp.lang.python*")
853 self.assertEqual(groups, {
854 "comp.lang.python": "The Python computer language.",
855 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
856 })
857 resp, groups = self.server.descriptions("comp.lang.pythonx")
858 self.assertEqual(groups, {})
859
def test_group(self):
    """Check ``self.server.group()`` on success and on failure.

    Selecting "fr.comp.lang.python" must return a "211" response with the
    expected counters; selecting "comp.lang.python.devel" must raise
    ``nntplib.NNTPTemporaryError`` with a "411 No such group" response.
    """
    resp, count, first, last, group = self.server.group("fr.comp.lang.python")
    self.assertTrue(resp.startswith("211 "), resp)
    self.assertEqual(first, 761)
    self.assertEqual(last, 1265)
    self.assertEqual(count, 486)
    self.assertEqual(group, "fr.comp.lang.python")
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.group("comp.lang.python.devel")
    exc = cm.exception
    self.assertTrue(exc.response.startswith("411 No such group"),
                    exc.response)
872
873 def test_newnews(self):
874 # NEWNEWS comp.lang.python [20]100913 082004
875 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
876 resp, ids = self.server.newnews("comp.lang.python", dt)
877 expected = (
878 "230 list of newsarticles (NNTP v{0}) "
879 "created after Mon Sep 13 08:20:04 2010 follows"
880 ).format(self.nntp_version)
881 self.assertEqual(resp, expected)
882 self.assertEqual(ids, [
883 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
884 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
885 ])
886 # NEWNEWS fr.comp.lang.python [20]100913 082004
887 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
888 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
889 self.assertEqual(resp, "230 An empty list of newsarticles follows")
890 self.assertEqual(ids, [])
891
892 def _check_article_body(self, lines):
893 self.assertEqual(len(lines), 4)
894 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
895 self.assertEqual(lines[-2], b"")
896 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
897 self.assertEqual(lines[-4], b"This is just a test article.")
898
899 def _check_article_head(self, lines):
900 self.assertEqual(len(lines), 4)
901 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
902 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
903
904 def _check_article_data(self, lines):
905 self.assertEqual(len(lines), 9)
906 self._check_article_head(lines[:4])
907 self._check_article_body(lines[-4:])
908 self.assertEqual(lines[4], b"")
909
def test_article(self):
    """Check ``self.server.article()`` with each way of naming an article.

    The call is made with no argument, with an article number and with a
    message id; an unknown message id must raise
    ``nntplib.NNTPTemporaryError``.
    """
    message_id = "<45223423@example.com>"
    # (arguments for article(), article number expected in the reply)
    cases = (
        ((), 3000237),                # ARTICLE
        ((3000234,), 3000234),        # ARTICLE num
        ((message_id,), 0),           # ARTICLE id
    )
    for args, expected_num in cases:
        resp, info = self.server.article(*args)
        self.assertEqual(resp, "220 {0} {1}".format(expected_num, message_id))
        art_num, returned_id, lines = info
        self.assertEqual(art_num, expected_num)
        self.assertEqual(returned_id, message_id)
        self._check_article_data(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.article("<non-existent@example.com>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
936
937 def test_article_file(self):
938 # With a "file" argument
939 f = io.BytesIO()
940 resp, info = self.server.article(file=f)
941 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
942 art_num, message_id, lines = info
943 self.assertEqual(art_num, 3000237)
944 self.assertEqual(message_id, "<45223423@example.com>")
945 self.assertEqual(lines, [])
946 data = f.getvalue()
947 self.assertTrue(data.startswith(
948 b'From: "Demo User" <nobody@example.net>\r\n'
949 b'Subject: I am just a test article\r\n'
950 ), ascii(data))
951 self.assertTrue(data.endswith(
952 b'This is just a test article.\r\n'
953 b'.Here is a dot-starting line.\r\n'
954 b'\r\n'
955 b'-- Signed by Andr\xc3\xa9.\r\n'
956 ), ascii(data))
957
def test_head(self):
    """Check ``self.server.head()`` with each way of naming an article.

    The call is made with no argument, with an article number and with a
    message id; an unknown message id must raise
    ``nntplib.NNTPTemporaryError``.
    """
    message_id = "<45223423@example.com>"
    # (arguments for head(), article number expected in the reply)
    cases = (
        ((), 3000237),                # HEAD
        ((3000234,), 3000234),        # HEAD num
        ((message_id,), 0),           # HEAD id
    )
    for args, expected_num in cases:
        resp, info = self.server.head(*args)
        self.assertEqual(resp, "221 {0} {1}".format(expected_num, message_id))
        art_num, returned_id, lines = info
        self.assertEqual(art_num, expected_num)
        self.assertEqual(returned_id, message_id)
        self._check_article_head(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.head("<non-existent@example.com>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
984
Antoine Pitrou2640b522012-02-15 18:53:18 +0100985 def test_head_file(self):
986 f = io.BytesIO()
987 resp, info = self.server.head(file=f)
988 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
989 art_num, message_id, lines = info
990 self.assertEqual(art_num, 3000237)
991 self.assertEqual(message_id, "<45223423@example.com>")
992 self.assertEqual(lines, [])
993 data = f.getvalue()
994 self.assertTrue(data.startswith(
995 b'From: "Demo User" <nobody@example.net>\r\n'
996 b'Subject: I am just a test article\r\n'
997 ), ascii(data))
998 self.assertFalse(data.endswith(
999 b'This is just a test article.\r\n'
1000 b'.Here is a dot-starting line.\r\n'
1001 b'\r\n'
1002 b'-- Signed by Andr\xc3\xa9.\r\n'
1003 ), ascii(data))
1004
def test_body(self):
    """Check ``self.server.body()`` with each way of naming an article.

    The call is made with no argument, with an article number and with a
    message id; an unknown message id must raise
    ``nntplib.NNTPTemporaryError``.
    """
    message_id = "<45223423@example.com>"
    # (arguments for body(), article number expected in the reply)
    cases = (
        ((), 3000237),                # BODY
        ((3000234,), 3000234),        # BODY num
        ((message_id,), 0),           # BODY id
    )
    for args, expected_num in cases:
        resp, info = self.server.body(*args)
        self.assertEqual(resp, "222 {0} {1}".format(expected_num, message_id))
        art_num, returned_id, lines = info
        self.assertEqual(art_num, expected_num)
        self.assertEqual(returned_id, message_id)
        self._check_article_body(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.body("<non-existent@example.com>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
1031
Antoine Pitrou2640b522012-02-15 18:53:18 +01001032 def test_body_file(self):
1033 f = io.BytesIO()
1034 resp, info = self.server.body(file=f)
1035 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1036 art_num, message_id, lines = info
1037 self.assertEqual(art_num, 3000237)
1038 self.assertEqual(message_id, "<45223423@example.com>")
1039 self.assertEqual(lines, [])
1040 data = f.getvalue()
1041 self.assertFalse(data.startswith(
1042 b'From: "Demo User" <nobody@example.net>\r\n'
1043 b'Subject: I am just a test article\r\n'
1044 ), ascii(data))
1045 self.assertTrue(data.endswith(
1046 b'This is just a test article.\r\n'
1047 b'.Here is a dot-starting line.\r\n'
1048 b'\r\n'
1049 b'-- Signed by Andr\xc3\xa9.\r\n'
1050 ), ascii(data))
1051
def check_over_xover_resp(self, resp, overviews):
    """Assert that *resp* and *overviews* match the expected overview data.

    *resp* must start with "224 ". *overviews* must hold three
    ``(article number, fields dict)`` pairs: the first is compared in
    full, the second must have a ``None`` "xref" field, and the third is
    checked for its non-ASCII "subject".
    """
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)
    art_num, over = overviews[0]
    self.assertEqual(art_num, 57)
    self.assertEqual(over, {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.org gmane.comp.python.authors:57"
        })
    # Only the fields dict matters for the remaining entries.
    _, over = overviews[1]
    self.assertIsNone(over["xref"])
    _, over = overviews[2]
    self.assertEqual(over["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
1072
1073 def test_xover(self):
1074 resp, overviews = self.server.xover(57, 59)
1075 self.check_over_xover_resp(resp, overviews)
1076
1077 def test_over(self):
1078 # In NNTP "v1", this will fallback on XOVER
1079 resp, overviews = self.server.over((57, 59))
1080 self.check_over_xover_resp(resp, overviews)
1081
# Raw article used as the payload of the post/ihave checks: four header
# lines, a blank separator, then a four-line body that contains a
# dot-starting line and a UTF-8 encoded signature.  Lines end in CRLF.
sample_post = (
    b'From: "Demo User" <nobody@example.net>\r\n'
    b'Subject: I am just a test article\r\n'
    b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
    b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
    b'\r\n'
    b'This is just a test article.\r\n'
    b'.Here is a dot-starting line.\r\n'
    b'\r\n'
    b'-- Signed by Andr\xc3\xa9.\r\n'
)
1093
1094 def _check_posted_body(self):
1095 # Check the raw body as received by the server
1096 lines = self.handler.posted_body
1097 # One additional line for the "." terminator
1098 self.assertEqual(len(lines), 10)
1099 self.assertEqual(lines[-1], b'.\r\n')
1100 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1101 self.assertEqual(lines[-3], b'\r\n')
1102 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1103 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1104
1105 def _check_post_ihave_sub(self, func, *args, file_factory):
1106 # First the prepared post with CRLF endings
1107 post = self.sample_post
1108 func_args = args + (file_factory(post),)
1109 self.handler.posted_body = None
1110 resp = func(*func_args)
1111 self._check_posted_body()
1112 # Then the same post with "normal" line endings - they should be
1113 # converted by NNTP.post and NNTP.ihave.
1114 post = self.sample_post.replace(b"\r\n", b"\n")
1115 func_args = args + (file_factory(post),)
1116 self.handler.posted_body = None
1117 resp = func(*func_args)
1118 self._check_posted_body()
1119 return resp
1120
def check_post_ihave(self, func, success_resp, *args):
    """Run ``_check_post_ihave_sub`` once per supported payload type.

    *func* and ``*args`` are forwarded unchanged; each run must return
    *success_resp*.
    """
    def terminated_lines(b):
        # Iterable of lines that keep their line terminator
        return iter(b.splitlines(True))

    def bare_lines(b):
        # Iterable of lines with the terminator stripped
        return iter(b.splitlines(False))

    file_factories = (
        bytes,             # With a bytes object
        bytearray,         # With a bytearray object
        io.BytesIO,        # With a file object
        terminated_lines,  # With an iterable of terminated lines
        bare_lines,        # With an iterable of non-terminated lines
    )
    for file_factory in file_factories:
        resp = self._check_post_ihave_sub(func, *args,
                                          file_factory=file_factory)
        self.assertEqual(resp, success_resp)
1141
def test_post(self):
    """Check ``self.server.post()`` when posting is allowed, then refused.

    After ``self.handler.allow_posting`` is set to false, posting must
    raise ``nntplib.NNTPTemporaryError`` with a "440" response.
    """
    self.check_post_ihave(self.server.post, "240 Article received OK")
    self.handler.allow_posting = False
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.post(self.sample_post)
    self.assertEqual(cm.exception.response,
                     "440 Posting not permitted")
1149
def test_ihave(self):
    """Check ``self.server.ihave()`` for an accepted and a refused id.

    Offering "<another.message.id>" must raise
    ``nntplib.NNTPTemporaryError`` with a "435" response.
    """
    self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                          "<i.am.an.article.you.will.want@example.com>")
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.ihave("<another.message.id>", self.sample_post)
    self.assertEqual(cm.exception.response,
                     "435 Article not wanted")
1157
1158
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        """Check that no capabilities and no implementation are reported."""
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001170
1171
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        """Check the capability mapping and the values derived from it.

        ``VERSION`` lists both '2' and '3'; ``nntp_version`` is expected
        to be 3.
        """
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
            })
        self.assertEqual(self.server.nntp_version, 3)
        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001193
1194
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        """Check that ``_caps`` is empty before login and filled after it."""
        self.assertEqual(self.server._caps, {})
        self.server.login('testuser', 'testpw')
        self.assertIn('VERSION', self.server._caps)
1205
1206
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        """Check that 'READER' is among the capabilities held by the client."""
        self.assertIn('READER', self.server._caps)
1217
1218
class MiscTests(unittest.TestCase):
    """Tests for nntplib helper functions that are called directly."""

    def test_decode_header(self):
        """Check ``nntplib.decode_header`` on plain and encoded headers."""
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        """Check ``nntplib._parse_overview_fmt`` on several field lists."""
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref"])

    def test_parse_overview(self):
        """Check ``nntplib._parse_overview`` with the default fields + xref."""
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        """Check ``nntplib._parse_datetime`` on 14-, 8- and 6-digit dates."""
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        """Check ``nntplib._unparse_datetime`` in non-legacy mode.

        Both the default call and an explicit ``False`` second argument
        are exercised.
        """
        # Test non-legacy mode
        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives_datetime(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "20100605", "010203")
        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives_date(1999, 6, 23, "19990623", "000000")
        gives_date(2000, 6, 23, "20000623", "000000")
        gives_date(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        """Check ``nntplib._unparse_datetime`` with a true second argument.

        In this mode the expected date strings carry a two-digit year.
        """
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        def gives_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives_datetime(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives_datetime(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives_datetime(2010, 6, 5, 1, 2, 3, "100605", "010203")
        # 2) with a date
        def gives_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives_date(1999, 6, 23, "990623", "000000")
        gives_date(2000, 6, 23, "000623", "000000")
        gives_date(2010, 6, 5, "100605", "000000")
1375
1376
def test_main():
    """Run the listed test classes through ``support.run_unittest``.

    ``NetworkedNNTP_SSLTests`` is only added when ``_have_ssl`` is true.
    """
    tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
             SendReaderNNTPv2Tests, NetworkedNNTPTests]
    if _have_ssl:
        tests.append(NetworkedNNTP_SSLTests)
    support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001383
1384
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()