blob: cc490f1e97ed8aabb3b659c22b1f6b6541a17712 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00005import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00006import contextlib
7from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00008from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +00009import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000010if _have_ssl:
11 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000012
13TIMEOUT = 30
14
15# TODO:
16# - test the `file` arg to more commands
17# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000018# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000019
20
21class NetworkedNNTPTestsMixin:
22
23 def test_welcome(self):
24 welcome = self.server.getwelcome()
25 self.assertEqual(str, type(welcome))
26
27 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000028 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000029 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000030 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000031 self.assertEqual(str, type(line))
32
33 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000034 resp, groups = self.server.list()
35 if len(groups) > 0:
36 self.assertEqual(GroupInfo, type(groups[0]))
37 self.assertEqual(str, type(groups[0].group))
38
39 def test_list_active(self):
40 resp, groups = self.server.list(self.GROUP_PAT)
41 if len(groups) > 0:
42 self.assertEqual(GroupInfo, type(groups[0]))
43 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000044
45 def test_unknown_command(self):
46 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
47 self.server._shortcmd("XYZZY")
48 resp = cm.exception.response
49 self.assertTrue(resp.startswith("500 "), resp)
50
51 def test_newgroups(self):
52 # gmane gets a constant influx of new groups. In order not to stress
53 # the server too much, we choose a recent date in the past.
54 dt = datetime.date.today() - datetime.timedelta(days=7)
55 resp, groups = self.server.newgroups(dt)
56 if len(groups) > 0:
57 self.assertIsInstance(groups[0], GroupInfo)
58 self.assertIsInstance(groups[0].group, str)
59
60 def test_description(self):
61 def _check_desc(desc):
62 # Sanity checks
63 self.assertIsInstance(desc, str)
64 self.assertNotIn(self.GROUP_NAME, desc)
65 desc = self.server.description(self.GROUP_NAME)
66 _check_desc(desc)
67 # Another sanity check
68 self.assertIn("Python", desc)
69 # With a pattern
70 desc = self.server.description(self.GROUP_PAT)
71 _check_desc(desc)
72 # Shouldn't exist
73 desc = self.server.description("zk.brrtt.baz")
74 self.assertEqual(desc, '')
75
76 def test_descriptions(self):
77 resp, descs = self.server.descriptions(self.GROUP_PAT)
78 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
79 self.assertTrue(
80 resp.startswith("215 ") or resp.startswith("282 "), resp)
81 self.assertIsInstance(descs, dict)
82 desc = descs[self.GROUP_NAME]
83 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
84
85 def test_group(self):
86 result = self.server.group(self.GROUP_NAME)
87 self.assertEqual(5, len(result))
88 resp, count, first, last, group = result
89 self.assertEqual(group, self.GROUP_NAME)
90 self.assertIsInstance(count, int)
91 self.assertIsInstance(first, int)
92 self.assertIsInstance(last, int)
93 self.assertLessEqual(first, last)
94 self.assertTrue(resp.startswith("211 "), resp)
95
96 def test_date(self):
97 resp, date = self.server.date()
98 self.assertIsInstance(date, datetime.datetime)
99 # Sanity check
100 self.assertGreaterEqual(date.year, 1995)
101 self.assertLessEqual(date.year, 2030)
102
103 def _check_art_dict(self, art_dict):
104 # Some sanity checks for a field dictionary returned by OVER / XOVER
105 self.assertIsInstance(art_dict, dict)
106 # NNTP has 7 mandatory fields
107 self.assertGreaterEqual(art_dict.keys(),
108 {"subject", "from", "date", "message-id",
109 "references", ":bytes", ":lines"}
110 )
111 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000112 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000113
114 def test_xover(self):
115 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000116 resp, lines = self.server.xover(last - 5, last)
117 if len(lines) == 0:
118 self.skipTest("no articles retrieved")
119 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000120 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000121 self.assertGreaterEqual(art_num, last - 5)
122 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000123 self._check_art_dict(art_dict)
124
125 def test_over(self):
126 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
127 start = last - 10
128 # The "start-" article range form
129 resp, lines = self.server.over((start, None))
130 art_num, art_dict = lines[0]
131 self._check_art_dict(art_dict)
132 # The "start-end" article range form
133 resp, lines = self.server.over((start, last))
134 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000135 # The 'last' article is not necessarily part of the output (cancelled?)
136 self.assertGreaterEqual(art_num, start)
137 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000138 self._check_art_dict(art_dict)
139 # XXX The "message_id" form is unsupported by gmane
140 # 503 Overview by message-ID unsupported
141
142 def test_xhdr(self):
143 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
144 resp, lines = self.server.xhdr('subject', last)
145 for line in lines:
146 self.assertEqual(str, type(line[1]))
147
148 def check_article_resp(self, resp, article, art_num=None):
149 self.assertIsInstance(article, nntplib.ArticleInfo)
150 if art_num is not None:
151 self.assertEqual(article.number, art_num)
152 for line in article.lines:
153 self.assertIsInstance(line, bytes)
154 # XXX this could exceptionally happen...
155 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
156
157 def test_article_head_body(self):
158 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000159 # Try to find an available article
160 for art_num in (last, first, last - 1):
161 try:
162 resp, head = self.server.head(art_num)
163 except nntplib.NNTPTemporaryError as e:
164 if not e.response.startswith("423 "):
165 raise
166 # "423 No such article" => choose another one
167 continue
168 break
169 else:
170 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000171 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000172 self.check_article_resp(resp, head, art_num)
173 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000174 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000175 self.check_article_resp(resp, body, art_num)
176 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000177 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000178 self.check_article_resp(resp, article, art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000179 self.assertEqual(article.lines, head.lines + [b''] + body.lines)
180
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000181 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000182 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000183 # couple of well-known capabilities. Just sanity check that we
184 # got them.
185 def _check_caps(caps):
186 caps_list = caps['LIST']
187 self.assertIsInstance(caps_list, (list, tuple))
188 self.assertIn('OVERVIEW.FMT', caps_list)
189 self.assertGreaterEqual(self.server.nntp_version, 2)
190 _check_caps(self.server.getcapabilities())
191 # This re-emits the command
192 resp, caps = self.server.capabilities()
193 _check_caps(caps)
194
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000195 if _have_ssl:
196 def test_starttls(self):
197 file = self.server.file
198 sock = self.server.sock
199 try:
200 self.server.starttls()
201 except nntplib.NNTPPermanentError:
202 self.skipTest("STARTTLS not supported by server.")
203 else:
204 # Check that the socket and internal pseudo-file really were
205 # changed.
206 self.assertNotEqual(file, self.server.file)
207 self.assertNotEqual(sock, self.server.sock)
208 # Check that the new socket really is an SSL one
209 self.assertIsInstance(self.server.sock, ssl.SSLSocket)
210 # Check that trying starttls when it's already active fails.
211 self.assertRaises(ValueError, self.server.starttls)
212
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000213 def test_zlogin(self):
214 # This test must be the penultimate because further commands will be
215 # refused.
216 baduser = "notarealuser"
217 badpw = "notarealpassword"
218 # Check that bogus credentials cause failure
219 self.assertRaises(nntplib.NNTPError, self.server.login,
220 user=baduser, password=badpw, usenetrc=False)
221 # FIXME: We should check that correct credentials succeed, but that
222 # would require valid details for some server somewhere to be in the
223 # test suite, I think. Gmane is anonymous, at least as used for the
224 # other tests.
225
226 def test_zzquit(self):
227 # This test must be called last, hence the name
228 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000229 try:
230 self.server.quit()
231 finally:
232 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000233
Antoine Pitroude609182010-11-18 17:29:23 +0000234 @classmethod
235 def wrap_methods(cls):
236 # Wrap all methods in a transient_internet() exception catcher
237 # XXX put a generic version in test.support?
238 def wrap_meth(meth):
239 @functools.wraps(meth)
240 def wrapped(self):
241 with support.transient_internet(self.NNTP_HOST):
242 meth(self)
243 return wrapped
244 for name in dir(cls):
245 if not name.startswith('test_'):
246 continue
247 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200248 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000249 continue
250 # Need to use a closure so that meth remains bound to its current
251 # value
252 setattr(cls, name, wrap_meth(meth))
253
254NetworkedNNTPTestsMixin.wrap_methods()
255
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000256
257class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
258 # This server supports STARTTLS (gmane doesn't)
259 NNTP_HOST = 'news.trigofacile.com'
260 GROUP_NAME = 'fr.comp.lang.python'
261 GROUP_PAT = 'fr.comp.lang.*'
262
Antoine Pitroude609182010-11-18 17:29:23 +0000263 NNTP_CLASS = NNTP
264
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000265 @classmethod
266 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000267 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000268 with support.transient_internet(cls.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000269 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000270
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000271 @classmethod
272 def tearDownClass(cls):
273 if cls.server is not None:
274 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000275
276
277if _have_ssl:
Antoine Pitroude609182010-11-18 17:29:23 +0000278 class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000279
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000280 # Technical limits for this public NNTP server (see http://www.aioe.org):
281 # "Only two concurrent connections per IP address are allowed and
282 # 400 connections per day are accepted from each IP address."
283
284 NNTP_HOST = 'nntp.aioe.org'
285 GROUP_NAME = 'comp.lang.python'
286 GROUP_PAT = 'comp.lang.*'
287
Antoine Pitroude609182010-11-18 17:29:23 +0000288 NNTP_CLASS = nntplib.NNTP_SSL
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000289
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000290 # Disabled as it produces too much data
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000291 test_list = None
292
293 # Disabled as the connection will already be encrypted.
294 test_starttls = None
295
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000296
297#
298# Non-networked tests using a local server (or something mocking it).
299#
300
301class _NNTPServerIO(io.RawIOBase):
302 """A raw IO object allowing NNTP commands to be received and processed
303 by a handler. The handler can push responses which can then be read
304 from the IO object."""
305
306 def __init__(self, handler):
307 io.RawIOBase.__init__(self)
308 # The channel from the client
309 self.c2s = io.BytesIO()
310 # The channel to the client
311 self.s2c = io.BytesIO()
312 self.handler = handler
313 self.handler.start(self.c2s.readline, self.push_data)
314
315 def readable(self):
316 return True
317
318 def writable(self):
319 return True
320
321 def push_data(self, data):
322 """Push (buffer) some data to send to the client."""
323 pos = self.s2c.tell()
324 self.s2c.seek(0, 2)
325 self.s2c.write(data)
326 self.s2c.seek(pos)
327
328 def write(self, b):
329 """The client sends us some data"""
330 pos = self.c2s.tell()
331 self.c2s.write(b)
332 self.c2s.seek(pos)
333 self.handler.process_pending()
334 return len(b)
335
336 def readinto(self, buf):
337 """The client wants to read a response"""
338 self.handler.process_pending()
339 b = self.s2c.read(len(buf))
340 n = len(b)
341 buf[:n] = b
342 return n
343
344
345class MockedNNTPTestsMixin:
346 # Override in derived classes
347 handler_class = None
348
349 def setUp(self):
350 super().setUp()
351 self.make_server()
352
353 def tearDown(self):
354 super().tearDown()
355 del self.server
356
357 def make_server(self, *args, **kwargs):
358 self.handler = self.handler_class()
359 self.sio = _NNTPServerIO(self.handler)
360 # Using BufferedRWPair instead of BufferedRandom ensures the file
361 # isn't seekable.
362 file = io.BufferedRWPair(self.sio, self.sio)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000363 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000364 return self.server
365
366
Antoine Pitrou71135622012-02-14 23:29:34 +0100367class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
368 def setUp(self):
369 super().setUp()
370 self.make_server(readermode=True)
371
372
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000373class NNTPv1Handler:
374 """A handler for RFC 977"""
375
376 welcome = "200 NNTP mock server"
377
378 def start(self, readline, push_data):
379 self.in_body = False
380 self.allow_posting = True
381 self._readline = readline
382 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100383 self._logged_in = False
384 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000385 # Our welcome
386 self.handle_welcome()
387
388 def _decode(self, data):
389 return str(data, "utf-8", "surrogateescape")
390
391 def process_pending(self):
392 if self.in_body:
393 while True:
394 line = self._readline()
395 if not line:
396 return
397 self.body.append(line)
398 if line == b".\r\n":
399 break
400 try:
401 meth, tokens = self.body_callback
402 meth(*tokens, body=self.body)
403 finally:
404 self.body_callback = None
405 self.body = None
406 self.in_body = False
407 while True:
408 line = self._decode(self._readline())
409 if not line:
410 return
411 if not line.endswith("\r\n"):
412 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
413 line = line[:-2]
414 cmd, *tokens = line.split()
415 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
416 meth = getattr(self, "handle_" + cmd.upper(), None)
417 if meth is None:
418 self.handle_unknown()
419 else:
420 try:
421 meth(*tokens)
422 except Exception as e:
423 raise ValueError("command failed: {!r}".format(line)) from e
424 else:
425 if self.in_body:
426 self.body_callback = meth, tokens
427 self.body = []
428
429 def expect_body(self):
430 """Flag that the client is expected to post a request body"""
431 self.in_body = True
432
433 def push_data(self, data):
434 """Push some binary data"""
435 self._push_data(data)
436
437 def push_lit(self, lit):
438 """Push a string literal"""
439 lit = textwrap.dedent(lit)
440 lit = "\r\n".join(lit.splitlines()) + "\r\n"
441 lit = lit.encode('utf-8')
442 self.push_data(lit)
443
444 def handle_unknown(self):
445 self.push_lit("500 What?")
446
447 def handle_welcome(self):
448 self.push_lit(self.welcome)
449
450 def handle_QUIT(self):
451 self.push_lit("205 Bye!")
452
453 def handle_DATE(self):
454 self.push_lit("111 20100914001155")
455
456 def handle_GROUP(self, group):
457 if group == "fr.comp.lang.python":
458 self.push_lit("211 486 761 1265 fr.comp.lang.python")
459 else:
460 self.push_lit("411 No such group {}".format(group))
461
462 def handle_HELP(self):
463 self.push_lit("""\
464 100 Legal commands
465 authinfo user Name|pass Password|generic <prog> <args>
466 date
467 help
468 Report problems to <root@example.org>
469 .""")
470
471 def handle_STAT(self, message_spec=None):
472 if message_spec is None:
473 self.push_lit("412 No newsgroup selected")
474 elif message_spec == "3000234":
475 self.push_lit("223 3000234 <45223423@example.com>")
476 elif message_spec == "<45223423@example.com>":
477 self.push_lit("223 0 <45223423@example.com>")
478 else:
479 self.push_lit("430 No Such Article Found")
480
481 def handle_NEXT(self):
482 self.push_lit("223 3000237 <668929@example.org> retrieved")
483
484 def handle_LAST(self):
485 self.push_lit("223 3000234 <45223423@example.com> retrieved")
486
487 def handle_LIST(self, action=None, param=None):
488 if action is None:
489 self.push_lit("""\
490 215 Newsgroups in form "group high low flags".
491 comp.lang.python 0000052340 0000002828 y
492 comp.lang.python.announce 0000001153 0000000993 m
493 free.it.comp.lang.python 0000000002 0000000002 y
494 fr.comp.lang.python 0000001254 0000000760 y
495 free.it.comp.lang.python.learner 0000000000 0000000001 y
496 tw.bbs.comp.lang.python 0000000304 0000000304 y
497 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000498 elif action == "ACTIVE":
499 if param == "*distutils*":
500 self.push_lit("""\
501 215 Newsgroups in form "group high low flags"
502 gmane.comp.python.distutils.devel 0000014104 0000000001 m
503 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
504 .""")
505 else:
506 self.push_lit("""\
507 215 Newsgroups in form "group high low flags"
508 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000509 elif action == "OVERVIEW.FMT":
510 self.push_lit("""\
511 215 Order of fields in overview database.
512 Subject:
513 From:
514 Date:
515 Message-ID:
516 References:
517 Bytes:
518 Lines:
519 Xref:full
520 .""")
521 elif action == "NEWSGROUPS":
522 assert param is not None
523 if param == "comp.lang.python":
524 self.push_lit("""\
525 215 Descriptions in form "group description".
526 comp.lang.python\tThe Python computer language.
527 .""")
528 elif param == "comp.lang.python*":
529 self.push_lit("""\
530 215 Descriptions in form "group description".
531 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
532 comp.lang.python\tThe Python computer language.
533 .""")
534 else:
535 self.push_lit("""\
536 215 Descriptions in form "group description".
537 .""")
538 else:
539 self.push_lit('501 Unknown LIST keyword')
540
541 def handle_NEWNEWS(self, group, date_str, time_str):
542 # We hard code different return messages depending on passed
543 # argument and date syntax.
544 if (group == "comp.lang.python" and date_str == "20100913"
545 and time_str == "082004"):
546 # Date was passed in RFC 3977 format (NNTP "v2")
547 self.push_lit("""\
548 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
549 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
550 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
551 .""")
552 elif (group == "comp.lang.python" and date_str == "100913"
553 and time_str == "082004"):
554 # Date was passed in RFC 977 format (NNTP "v1")
555 self.push_lit("""\
556 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
557 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
558 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
559 .""")
560 else:
561 self.push_lit("""\
562 230 An empty list of newsarticles follows
563 .""")
564 # (Note for experiments: many servers disable NEWNEWS.
565 # As of this writing, sicinfo3.epfl.ch doesn't.)
566
567 def handle_XOVER(self, message_spec):
568 if message_spec == "57-59":
569 self.push_lit(
570 "224 Overview information for 57-58 follows\n"
571 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
572 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
573 "\tSat, 19 Jun 2010 18:04:08 -0400"
574 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
575 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
576 "\tXref: news.gmane.org gmane.comp.python.authors:57"
577 "\n"
578 "58\tLooking for a few good bloggers"
579 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
580 "\tThu, 22 Jul 2010 09:14:14 -0400"
581 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
582 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000583 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000584 "\n"
585 # An UTF-8 overview line from fr.comp.lang.python
586 "59\tRe: Message d'erreur incompréhensible (par moi)"
587 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
588 "\tWed, 15 Sep 2010 18:09:15 +0200"
589 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
590 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
591 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
592 "\n"
593 ".\n")
594 else:
595 self.push_lit("""\
596 224 No articles
597 .""")
598
599 def handle_POST(self, *, body=None):
600 if body is None:
601 if self.allow_posting:
602 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
603 self.expect_body()
604 else:
605 self.push_lit("440 Posting not permitted")
606 else:
607 assert self.allow_posting
608 self.push_lit("240 Article received OK")
609 self.posted_body = body
610
611 def handle_IHAVE(self, message_id, *, body=None):
612 if body is None:
613 if (self.allow_posting and
614 message_id == "<i.am.an.article.you.will.want@example.com>"):
615 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
616 self.expect_body()
617 else:
618 self.push_lit("435 Article not wanted")
619 else:
620 assert self.allow_posting
621 self.push_lit("235 Article transferred OK")
622 self.posted_body = body
623
624 sample_head = """\
625 From: "Demo User" <nobody@example.net>
626 Subject: I am just a test article
627 Content-Type: text/plain; charset=UTF-8; format=flowed
628 Message-ID: <i.am.an.article.you.will.want@example.com>"""
629
630 sample_body = """\
631 This is just a test article.
632 ..Here is a dot-starting line.
633
634 -- Signed by Andr\xe9."""
635
636 sample_article = sample_head + "\n\n" + sample_body
637
638 def handle_ARTICLE(self, message_spec=None):
639 if message_spec is None:
640 self.push_lit("220 3000237 <45223423@example.com>")
641 elif message_spec == "<45223423@example.com>":
642 self.push_lit("220 0 <45223423@example.com>")
643 elif message_spec == "3000234":
644 self.push_lit("220 3000234 <45223423@example.com>")
645 else:
646 self.push_lit("430 No Such Article Found")
647 return
648 self.push_lit(self.sample_article)
649 self.push_lit(".")
650
651 def handle_HEAD(self, message_spec=None):
652 if message_spec is None:
653 self.push_lit("221 3000237 <45223423@example.com>")
654 elif message_spec == "<45223423@example.com>":
655 self.push_lit("221 0 <45223423@example.com>")
656 elif message_spec == "3000234":
657 self.push_lit("221 3000234 <45223423@example.com>")
658 else:
659 self.push_lit("430 No Such Article Found")
660 return
661 self.push_lit(self.sample_head)
662 self.push_lit(".")
663
664 def handle_BODY(self, message_spec=None):
665 if message_spec is None:
666 self.push_lit("222 3000237 <45223423@example.com>")
667 elif message_spec == "<45223423@example.com>":
668 self.push_lit("222 0 <45223423@example.com>")
669 elif message_spec == "3000234":
670 self.push_lit("222 3000234 <45223423@example.com>")
671 else:
672 self.push_lit("430 No Such Article Found")
673 return
674 self.push_lit(self.sample_body)
675 self.push_lit(".")
676
Antoine Pitrou54411c12012-02-12 19:14:17 +0100677 def handle_AUTHINFO(self, cred_type, data):
678 if self._logged_in:
679 self.push_lit('502 Already Logged In')
680 elif cred_type == 'user':
681 if self._user_sent:
682 self.push_lit('482 User Credential Already Sent')
683 else:
684 self.push_lit('381 Password Required')
685 self._user_sent = True
686 elif cred_type == 'pass':
687 self.push_lit('281 Login Successful')
688 self._logged_in = True
689 else:
690 raise Exception('Unknown cred type {}'.format(cred_type))
691
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000692
693class NNTPv2Handler(NNTPv1Handler):
694 """A handler for RFC 3977 (NNTP "v2")"""
695
696 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100697 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000698 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000699 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100700 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000701 HDR
702 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
703 OVER
704 POST
705 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100706 ."""
707
708 if not self._logged_in:
709 self.push_lit(fmt.format('\n AUTHINFO USER'))
710 else:
711 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000712
Antoine Pitrou71135622012-02-14 23:29:34 +0100713 def handle_MODE(self, _):
714 raise Exception('MODE READER sent despite READER has been advertised')
715
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000716 def handle_OVER(self, message_spec=None):
717 return self.handle_XOVER(message_spec)
718
719
Antoine Pitrou54411c12012-02-12 19:14:17 +0100720class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
721 """A handler that allows CAPABILITIES only after login"""
722
723 def handle_CAPABILITIES(self):
724 if not self._logged_in:
725 self.push_lit('480 You must log in.')
726 else:
727 super().handle_CAPABILITIES()
728
729
Antoine Pitrou71135622012-02-14 23:29:34 +0100730class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
731 """A server that starts in transit mode"""
732
733 def __init__(self):
734 self._switched = False
735
736 def handle_CAPABILITIES(self):
737 fmt = """\
738 101 Capability list:
739 VERSION 2 3
740 IMPLEMENTATION INN 2.5.1
741 HDR
742 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
743 OVER
744 POST
745 {}READER
746 ."""
747 if self._switched:
748 self.push_lit(fmt.format(''))
749 else:
750 self.push_lit(fmt.format('MODE-'))
751
752 def handle_MODE(self, what):
753 assert not self._switched and what == 'reader'
754 self._switched = True
755 self.push_lit('200 Posting allowed')
756
757
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000758class NNTPv1v2TestsMixin:
759
760 def setUp(self):
761 super().setUp()
762
763 def test_welcome(self):
764 self.assertEqual(self.server.welcome, self.handler.welcome)
765
Antoine Pitrou54411c12012-02-12 19:14:17 +0100766 def test_authinfo(self):
767 if self.nntp_version == 2:
768 self.assertIn('AUTHINFO', self.server._caps)
769 self.server.login('testuser', 'testpw')
770 # if AUTHINFO is gone from _caps we also know that getcapabilities()
771 # has been called after login as it should
772 self.assertNotIn('AUTHINFO', self.server._caps)
773
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000774 def test_date(self):
775 resp, date = self.server.date()
776 self.assertEqual(resp, "111 20100914001155")
777 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
778
779 def test_quit(self):
780 self.assertFalse(self.sio.closed)
781 resp = self.server.quit()
782 self.assertEqual(resp, "205 Bye!")
783 self.assertTrue(self.sio.closed)
784
785 def test_help(self):
786 resp, help = self.server.help()
787 self.assertEqual(resp, "100 Legal commands")
788 self.assertEqual(help, [
789 ' authinfo user Name|pass Password|generic <prog> <args>',
790 ' date',
791 ' help',
792 'Report problems to <root@example.org>',
793 ])
794
795 def test_list(self):
796 resp, groups = self.server.list()
797 self.assertEqual(len(groups), 6)
798 g = groups[1]
799 self.assertEqual(g,
800 GroupInfo("comp.lang.python.announce", "0000001153",
801 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000802 resp, groups = self.server.list("*distutils*")
803 self.assertEqual(len(groups), 2)
804 g = groups[0]
805 self.assertEqual(g,
806 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
807 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000808
809 def test_stat(self):
810 resp, art_num, message_id = self.server.stat(3000234)
811 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
812 self.assertEqual(art_num, 3000234)
813 self.assertEqual(message_id, "<45223423@example.com>")
814 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
815 self.assertEqual(resp, "223 0 <45223423@example.com>")
816 self.assertEqual(art_num, 0)
817 self.assertEqual(message_id, "<45223423@example.com>")
818 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
819 self.server.stat("<non.existent.id>")
820 self.assertEqual(cm.exception.response, "430 No Such Article Found")
821 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
822 self.server.stat()
823 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
824
825 def test_next(self):
826 resp, art_num, message_id = self.server.next()
827 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
828 self.assertEqual(art_num, 3000237)
829 self.assertEqual(message_id, "<668929@example.org>")
830
831 def test_last(self):
832 resp, art_num, message_id = self.server.last()
833 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
834 self.assertEqual(art_num, 3000234)
835 self.assertEqual(message_id, "<45223423@example.com>")
836
837 def test_description(self):
838 desc = self.server.description("comp.lang.python")
839 self.assertEqual(desc, "The Python computer language.")
840 desc = self.server.description("comp.lang.pythonx")
841 self.assertEqual(desc, "")
842
843 def test_descriptions(self):
844 resp, groups = self.server.descriptions("comp.lang.python")
845 self.assertEqual(resp, '215 Descriptions in form "group description".')
846 self.assertEqual(groups, {
847 "comp.lang.python": "The Python computer language.",
848 })
849 resp, groups = self.server.descriptions("comp.lang.python*")
850 self.assertEqual(groups, {
851 "comp.lang.python": "The Python computer language.",
852 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
853 })
854 resp, groups = self.server.descriptions("comp.lang.pythonx")
855 self.assertEqual(groups, {})
856
857 def test_group(self):
858 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
859 self.assertTrue(resp.startswith("211 "), resp)
860 self.assertEqual(first, 761)
861 self.assertEqual(last, 1265)
862 self.assertEqual(count, 486)
863 self.assertEqual(group, "fr.comp.lang.python")
864 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
865 self.server.group("comp.lang.python.devel")
866 exc = cm.exception
867 self.assertTrue(exc.response.startswith("411 No such group"),
868 exc.response)
869
870 def test_newnews(self):
871 # NEWNEWS comp.lang.python [20]100913 082004
872 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
873 resp, ids = self.server.newnews("comp.lang.python", dt)
874 expected = (
875 "230 list of newsarticles (NNTP v{0}) "
876 "created after Mon Sep 13 08:20:04 2010 follows"
877 ).format(self.nntp_version)
878 self.assertEqual(resp, expected)
879 self.assertEqual(ids, [
880 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
881 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
882 ])
883 # NEWNEWS fr.comp.lang.python [20]100913 082004
884 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
885 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
886 self.assertEqual(resp, "230 An empty list of newsarticles follows")
887 self.assertEqual(ids, [])
888
889 def _check_article_body(self, lines):
890 self.assertEqual(len(lines), 4)
891 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
892 self.assertEqual(lines[-2], b"")
893 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
894 self.assertEqual(lines[-4], b"This is just a test article.")
895
896 def _check_article_head(self, lines):
897 self.assertEqual(len(lines), 4)
898 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
899 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
900
901 def _check_article_data(self, lines):
902 self.assertEqual(len(lines), 9)
903 self._check_article_head(lines[:4])
904 self._check_article_body(lines[-4:])
905 self.assertEqual(lines[4], b"")
906
907 def test_article(self):
908 # ARTICLE
909 resp, info = self.server.article()
910 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
911 art_num, message_id, lines = info
912 self.assertEqual(art_num, 3000237)
913 self.assertEqual(message_id, "<45223423@example.com>")
914 self._check_article_data(lines)
915 # ARTICLE num
916 resp, info = self.server.article(3000234)
917 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
918 art_num, message_id, lines = info
919 self.assertEqual(art_num, 3000234)
920 self.assertEqual(message_id, "<45223423@example.com>")
921 self._check_article_data(lines)
922 # ARTICLE id
923 resp, info = self.server.article("<45223423@example.com>")
924 self.assertEqual(resp, "220 0 <45223423@example.com>")
925 art_num, message_id, lines = info
926 self.assertEqual(art_num, 0)
927 self.assertEqual(message_id, "<45223423@example.com>")
928 self._check_article_data(lines)
929 # Non-existent id
930 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
931 self.server.article("<non-existent@example.com>")
932 self.assertEqual(cm.exception.response, "430 No Such Article Found")
933
934 def test_article_file(self):
935 # With a "file" argument
936 f = io.BytesIO()
937 resp, info = self.server.article(file=f)
938 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
939 art_num, message_id, lines = info
940 self.assertEqual(art_num, 3000237)
941 self.assertEqual(message_id, "<45223423@example.com>")
942 self.assertEqual(lines, [])
943 data = f.getvalue()
944 self.assertTrue(data.startswith(
945 b'From: "Demo User" <nobody@example.net>\r\n'
946 b'Subject: I am just a test article\r\n'
947 ), ascii(data))
948 self.assertTrue(data.endswith(
949 b'This is just a test article.\r\n'
950 b'.Here is a dot-starting line.\r\n'
951 b'\r\n'
952 b'-- Signed by Andr\xc3\xa9.\r\n'
953 ), ascii(data))
954
955 def test_head(self):
956 # HEAD
957 resp, info = self.server.head()
958 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
959 art_num, message_id, lines = info
960 self.assertEqual(art_num, 3000237)
961 self.assertEqual(message_id, "<45223423@example.com>")
962 self._check_article_head(lines)
963 # HEAD num
964 resp, info = self.server.head(3000234)
965 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
966 art_num, message_id, lines = info
967 self.assertEqual(art_num, 3000234)
968 self.assertEqual(message_id, "<45223423@example.com>")
969 self._check_article_head(lines)
970 # HEAD id
971 resp, info = self.server.head("<45223423@example.com>")
972 self.assertEqual(resp, "221 0 <45223423@example.com>")
973 art_num, message_id, lines = info
974 self.assertEqual(art_num, 0)
975 self.assertEqual(message_id, "<45223423@example.com>")
976 self._check_article_head(lines)
977 # Non-existent id
978 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
979 self.server.head("<non-existent@example.com>")
980 self.assertEqual(cm.exception.response, "430 No Such Article Found")
981
Antoine Pitrou2640b522012-02-15 18:53:18 +0100982 def test_head_file(self):
983 f = io.BytesIO()
984 resp, info = self.server.head(file=f)
985 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
986 art_num, message_id, lines = info
987 self.assertEqual(art_num, 3000237)
988 self.assertEqual(message_id, "<45223423@example.com>")
989 self.assertEqual(lines, [])
990 data = f.getvalue()
991 self.assertTrue(data.startswith(
992 b'From: "Demo User" <nobody@example.net>\r\n'
993 b'Subject: I am just a test article\r\n'
994 ), ascii(data))
995 self.assertFalse(data.endswith(
996 b'This is just a test article.\r\n'
997 b'.Here is a dot-starting line.\r\n'
998 b'\r\n'
999 b'-- Signed by Andr\xc3\xa9.\r\n'
1000 ), ascii(data))
1001
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001002 def test_body(self):
1003 # BODY
1004 resp, info = self.server.body()
1005 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1006 art_num, message_id, lines = info
1007 self.assertEqual(art_num, 3000237)
1008 self.assertEqual(message_id, "<45223423@example.com>")
1009 self._check_article_body(lines)
1010 # BODY num
1011 resp, info = self.server.body(3000234)
1012 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1013 art_num, message_id, lines = info
1014 self.assertEqual(art_num, 3000234)
1015 self.assertEqual(message_id, "<45223423@example.com>")
1016 self._check_article_body(lines)
1017 # BODY id
1018 resp, info = self.server.body("<45223423@example.com>")
1019 self.assertEqual(resp, "222 0 <45223423@example.com>")
1020 art_num, message_id, lines = info
1021 self.assertEqual(art_num, 0)
1022 self.assertEqual(message_id, "<45223423@example.com>")
1023 self._check_article_body(lines)
1024 # Non-existent id
1025 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1026 self.server.body("<non-existent@example.com>")
1027 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1028
Antoine Pitrou2640b522012-02-15 18:53:18 +01001029 def test_body_file(self):
1030 f = io.BytesIO()
1031 resp, info = self.server.body(file=f)
1032 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1033 art_num, message_id, lines = info
1034 self.assertEqual(art_num, 3000237)
1035 self.assertEqual(message_id, "<45223423@example.com>")
1036 self.assertEqual(lines, [])
1037 data = f.getvalue()
1038 self.assertFalse(data.startswith(
1039 b'From: "Demo User" <nobody@example.net>\r\n'
1040 b'Subject: I am just a test article\r\n'
1041 ), ascii(data))
1042 self.assertTrue(data.endswith(
1043 b'This is just a test article.\r\n'
1044 b'.Here is a dot-starting line.\r\n'
1045 b'\r\n'
1046 b'-- Signed by Andr\xc3\xa9.\r\n'
1047 ), ascii(data))
1048
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001049 def check_over_xover_resp(self, resp, overviews):
1050 self.assertTrue(resp.startswith("224 "), resp)
1051 self.assertEqual(len(overviews), 3)
1052 art_num, over = overviews[0]
1053 self.assertEqual(art_num, 57)
1054 self.assertEqual(over, {
1055 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1056 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1057 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1058 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1059 "references": "<hvalf7$ort$1@dough.gmane.org>",
1060 ":bytes": "7103",
1061 ":lines": "16",
1062 "xref": "news.gmane.org gmane.comp.python.authors:57"
1063 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001064 art_num, over = overviews[1]
1065 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001066 art_num, over = overviews[2]
1067 self.assertEqual(over["subject"],
1068 "Re: Message d'erreur incompréhensible (par moi)")
1069
1070 def test_xover(self):
1071 resp, overviews = self.server.xover(57, 59)
1072 self.check_over_xover_resp(resp, overviews)
1073
1074 def test_over(self):
1075 # In NNTP "v1", this will fallback on XOVER
1076 resp, overviews = self.server.over((57, 59))
1077 self.check_over_xover_resp(resp, overviews)
1078
1079 sample_post = (
1080 b'From: "Demo User" <nobody@example.net>\r\n'
1081 b'Subject: I am just a test article\r\n'
1082 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1083 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1084 b'\r\n'
1085 b'This is just a test article.\r\n'
1086 b'.Here is a dot-starting line.\r\n'
1087 b'\r\n'
1088 b'-- Signed by Andr\xc3\xa9.\r\n'
1089 )
1090
1091 def _check_posted_body(self):
1092 # Check the raw body as received by the server
1093 lines = self.handler.posted_body
1094 # One additional line for the "." terminator
1095 self.assertEqual(len(lines), 10)
1096 self.assertEqual(lines[-1], b'.\r\n')
1097 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1098 self.assertEqual(lines[-3], b'\r\n')
1099 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1100 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1101
1102 def _check_post_ihave_sub(self, func, *args, file_factory):
1103 # First the prepared post with CRLF endings
1104 post = self.sample_post
1105 func_args = args + (file_factory(post),)
1106 self.handler.posted_body = None
1107 resp = func(*func_args)
1108 self._check_posted_body()
1109 # Then the same post with "normal" line endings - they should be
1110 # converted by NNTP.post and NNTP.ihave.
1111 post = self.sample_post.replace(b"\r\n", b"\n")
1112 func_args = args + (file_factory(post),)
1113 self.handler.posted_body = None
1114 resp = func(*func_args)
1115 self._check_posted_body()
1116 return resp
1117
1118 def check_post_ihave(self, func, success_resp, *args):
1119 # With a bytes object
1120 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1121 self.assertEqual(resp, success_resp)
1122 # With a bytearray object
1123 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1124 self.assertEqual(resp, success_resp)
1125 # With a file object
1126 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1127 self.assertEqual(resp, success_resp)
1128 # With an iterable of terminated lines
1129 def iterlines(b):
1130 return iter(b.splitlines(True))
1131 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1132 self.assertEqual(resp, success_resp)
1133 # With an iterable of non-terminated lines
1134 def iterlines(b):
1135 return iter(b.splitlines(False))
1136 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1137 self.assertEqual(resp, success_resp)
1138
1139 def test_post(self):
1140 self.check_post_ihave(self.server.post, "240 Article received OK")
1141 self.handler.allow_posting = False
1142 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1143 self.server.post(self.sample_post)
1144 self.assertEqual(cm.exception.response,
1145 "440 Posting not permitted")
1146
1147 def test_ihave(self):
1148 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1149 "<i.am.an.article.you.will.want@example.com>")
1150 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1151 self.server.ihave("<another.message.id>", self.sample_post)
1152 self.assertEqual(cm.exception.response,
1153 "435 Article not wanted")
1154
1155
1156class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1157 """Tests an NNTP v1 server (no capabilities)."""
1158
1159 nntp_version = 1
1160 handler_class = NNTPv1Handler
1161
1162 def test_caps(self):
1163 caps = self.server.getcapabilities()
1164 self.assertEqual(caps, {})
1165 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001166 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001167
1168
1169class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1170 """Tests an NNTP v2 server (with capabilities)."""
1171
1172 nntp_version = 2
1173 handler_class = NNTPv2Handler
1174
1175 def test_caps(self):
1176 caps = self.server.getcapabilities()
1177 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001178 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001179 'IMPLEMENTATION': ['INN', '2.5.1'],
1180 'AUTHINFO': ['USER'],
1181 'HDR': [],
1182 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1183 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1184 'OVER': [],
1185 'POST': [],
1186 'READER': [],
1187 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001188 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001189 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001190
1191
Antoine Pitrou54411c12012-02-12 19:14:17 +01001192class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1193 """Tests a probably NNTP v2 server with capabilities only after login."""
1194
1195 nntp_version = 2
1196 handler_class = CapsAfterLoginNNTPv2Handler
1197
1198 def test_caps_only_after_login(self):
1199 self.assertEqual(self.server._caps, {})
1200 self.server.login('testuser', 'testpw')
1201 self.assertIn('VERSION', self.server._caps)
1202
1203
Antoine Pitrou71135622012-02-14 23:29:34 +01001204class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1205 unittest.TestCase):
1206 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1207 that isn't in READER mode by default."""
1208
1209 nntp_version = 2
1210 handler_class = ModeSwitchingNNTPv2Handler
1211
1212 def test_we_are_in_reader_mode_after_connect(self):
1213 self.assertIn('READER', self.server._caps)
1214
1215
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001216class MiscTests(unittest.TestCase):
1217
1218 def test_decode_header(self):
1219 def gives(a, b):
1220 self.assertEqual(nntplib.decode_header(a), b)
1221 gives("" , "")
1222 gives("a plain header", "a plain header")
1223 gives(" with extra spaces ", " with extra spaces ")
1224 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1225 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1226 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1227 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1228 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1229 "Re: problème de matrice")
1230 # A natively utf-8 header (found in the real world!)
1231 gives("Re: Message d'erreur incompréhensible (par moi)",
1232 "Re: Message d'erreur incompréhensible (par moi)")
1233
1234 def test_parse_overview_fmt(self):
1235 # The minimal (default) response
1236 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1237 "References:", ":bytes", ":lines"]
1238 self.assertEqual(nntplib._parse_overview_fmt(lines),
1239 ["subject", "from", "date", "message-id", "references",
1240 ":bytes", ":lines"])
1241 # The minimal response using alternative names
1242 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1243 "References:", "Bytes:", "Lines:"]
1244 self.assertEqual(nntplib._parse_overview_fmt(lines),
1245 ["subject", "from", "date", "message-id", "references",
1246 ":bytes", ":lines"])
1247 # Variations in casing
1248 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1249 "References:", "BYTES:", "Lines:"]
1250 self.assertEqual(nntplib._parse_overview_fmt(lines),
1251 ["subject", "from", "date", "message-id", "references",
1252 ":bytes", ":lines"])
1253 # First example from RFC 3977
1254 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1255 "References:", ":bytes", ":lines", "Xref:full",
1256 "Distribution:full"]
1257 self.assertEqual(nntplib._parse_overview_fmt(lines),
1258 ["subject", "from", "date", "message-id", "references",
1259 ":bytes", ":lines", "xref", "distribution"])
1260 # Second example from RFC 3977
1261 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1262 "References:", "Bytes:", "Lines:", "Xref:FULL",
1263 "Distribution:FULL"]
1264 self.assertEqual(nntplib._parse_overview_fmt(lines),
1265 ["subject", "from", "date", "message-id", "references",
1266 ":bytes", ":lines", "xref", "distribution"])
1267 # A classic response from INN
1268 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1269 "References:", "Bytes:", "Lines:", "Xref:full"]
1270 self.assertEqual(nntplib._parse_overview_fmt(lines),
1271 ["subject", "from", "date", "message-id", "references",
1272 ":bytes", ":lines", "xref"])
1273
1274 def test_parse_overview(self):
1275 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1276 # First example from RFC 3977
1277 lines = [
1278 '3000234\tI am just a test article\t"Demo User" '
1279 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1280 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1281 '17\tXref: news.example.com misc.test:3000363',
1282 ]
1283 overview = nntplib._parse_overview(lines, fmt)
1284 (art_num, fields), = overview
1285 self.assertEqual(art_num, 3000234)
1286 self.assertEqual(fields, {
1287 'subject': 'I am just a test article',
1288 'from': '"Demo User" <nobody@example.com>',
1289 'date': '6 Oct 1998 04:38:40 -0500',
1290 'message-id': '<45223423@example.com>',
1291 'references': '<45454@example.net>',
1292 ':bytes': '1234',
1293 ':lines': '17',
1294 'xref': 'news.example.com misc.test:3000363',
1295 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001296 # Second example; here the "Xref" field is totally absent (including
1297 # the header name) and comes out as None
1298 lines = [
1299 '3000234\tI am just a test article\t"Demo User" '
1300 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1301 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1302 '17\t\t',
1303 ]
1304 overview = nntplib._parse_overview(lines, fmt)
1305 (art_num, fields), = overview
1306 self.assertEqual(fields['xref'], None)
1307 # Third example; the "Xref" is an empty string, while "references"
1308 # is a single space.
1309 lines = [
1310 '3000234\tI am just a test article\t"Demo User" '
1311 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1312 '<45223423@example.com>\t \t1234\t'
1313 '17\tXref: \t',
1314 ]
1315 overview = nntplib._parse_overview(lines, fmt)
1316 (art_num, fields), = overview
1317 self.assertEqual(fields['references'], ' ')
1318 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001319
1320 def test_parse_datetime(self):
1321 def gives(a, b, *c):
1322 self.assertEqual(nntplib._parse_datetime(a, b),
1323 datetime.datetime(*c))
1324 # Output of DATE command
1325 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1326 # Variations
1327 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1328 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1329 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1330
1331 def test_unparse_datetime(self):
1332 # Test non-legacy mode
1333 # 1) with a datetime
1334 def gives(y, M, d, h, m, s, date_str, time_str):
1335 dt = datetime.datetime(y, M, d, h, m, s)
1336 self.assertEqual(nntplib._unparse_datetime(dt),
1337 (date_str, time_str))
1338 self.assertEqual(nntplib._unparse_datetime(dt, False),
1339 (date_str, time_str))
1340 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1341 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1342 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1343 # 2) with a date
1344 def gives(y, M, d, date_str, time_str):
1345 dt = datetime.date(y, M, d)
1346 self.assertEqual(nntplib._unparse_datetime(dt),
1347 (date_str, time_str))
1348 self.assertEqual(nntplib._unparse_datetime(dt, False),
1349 (date_str, time_str))
1350 gives(1999, 6, 23, "19990623", "000000")
1351 gives(2000, 6, 23, "20000623", "000000")
1352 gives(2010, 6, 5, "20100605", "000000")
1353
1354 def test_unparse_datetime_legacy(self):
1355 # Test legacy mode (RFC 977)
1356 # 1) with a datetime
1357 def gives(y, M, d, h, m, s, date_str, time_str):
1358 dt = datetime.datetime(y, M, d, h, m, s)
1359 self.assertEqual(nntplib._unparse_datetime(dt, True),
1360 (date_str, time_str))
1361 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1362 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1363 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1364 # 2) with a date
1365 def gives(y, M, d, date_str, time_str):
1366 dt = datetime.date(y, M, d)
1367 self.assertEqual(nntplib._unparse_datetime(dt, True),
1368 (date_str, time_str))
1369 gives(1999, 6, 23, "990623", "000000")
1370 gives(2000, 6, 23, "000623", "000000")
1371 gives(2010, 6, 5, "100605", "000000")
1372
1373
1374def test_main():
Antoine Pitrou54411c12012-02-12 19:14:17 +01001375 tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
Antoine Pitrou71135622012-02-14 23:29:34 +01001376 SendReaderNNTPv2Tests, NetworkedNNTPTests]
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00001377 if _have_ssl:
1378 tests.append(NetworkedNNTP_SSLTests)
1379 support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001380
1381
1382if __name__ == "__main__":
1383 test_main()