blob: 179d31ec609912b11a94e6c7073848eb198ee41a [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00005import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00006import contextlib
7from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00008from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +00009import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000010if _have_ssl:
11 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000012
13TIMEOUT = 30
14
15# TODO:
16# - test the `file` arg to more commands
17# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000018# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000019
20
21class NetworkedNNTPTestsMixin:
22
23 def test_welcome(self):
24 welcome = self.server.getwelcome()
25 self.assertEqual(str, type(welcome))
26
27 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000028 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000029 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000030 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000031 self.assertEqual(str, type(line))
32
33 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000034 resp, groups = self.server.list()
35 if len(groups) > 0:
36 self.assertEqual(GroupInfo, type(groups[0]))
37 self.assertEqual(str, type(groups[0].group))
38
39 def test_list_active(self):
40 resp, groups = self.server.list(self.GROUP_PAT)
41 if len(groups) > 0:
42 self.assertEqual(GroupInfo, type(groups[0]))
43 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000044
45 def test_unknown_command(self):
46 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
47 self.server._shortcmd("XYZZY")
48 resp = cm.exception.response
49 self.assertTrue(resp.startswith("500 "), resp)
50
51 def test_newgroups(self):
52 # gmane gets a constant influx of new groups. In order not to stress
53 # the server too much, we choose a recent date in the past.
54 dt = datetime.date.today() - datetime.timedelta(days=7)
55 resp, groups = self.server.newgroups(dt)
56 if len(groups) > 0:
57 self.assertIsInstance(groups[0], GroupInfo)
58 self.assertIsInstance(groups[0].group, str)
59
60 def test_description(self):
61 def _check_desc(desc):
62 # Sanity checks
63 self.assertIsInstance(desc, str)
64 self.assertNotIn(self.GROUP_NAME, desc)
65 desc = self.server.description(self.GROUP_NAME)
66 _check_desc(desc)
67 # Another sanity check
68 self.assertIn("Python", desc)
69 # With a pattern
70 desc = self.server.description(self.GROUP_PAT)
71 _check_desc(desc)
72 # Shouldn't exist
73 desc = self.server.description("zk.brrtt.baz")
74 self.assertEqual(desc, '')
75
76 def test_descriptions(self):
77 resp, descs = self.server.descriptions(self.GROUP_PAT)
78 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
79 self.assertTrue(
80 resp.startswith("215 ") or resp.startswith("282 "), resp)
81 self.assertIsInstance(descs, dict)
82 desc = descs[self.GROUP_NAME]
83 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
84
85 def test_group(self):
86 result = self.server.group(self.GROUP_NAME)
87 self.assertEqual(5, len(result))
88 resp, count, first, last, group = result
89 self.assertEqual(group, self.GROUP_NAME)
90 self.assertIsInstance(count, int)
91 self.assertIsInstance(first, int)
92 self.assertIsInstance(last, int)
93 self.assertLessEqual(first, last)
94 self.assertTrue(resp.startswith("211 "), resp)
95
96 def test_date(self):
97 resp, date = self.server.date()
98 self.assertIsInstance(date, datetime.datetime)
99 # Sanity check
100 self.assertGreaterEqual(date.year, 1995)
101 self.assertLessEqual(date.year, 2030)
102
103 def _check_art_dict(self, art_dict):
104 # Some sanity checks for a field dictionary returned by OVER / XOVER
105 self.assertIsInstance(art_dict, dict)
106 # NNTP has 7 mandatory fields
107 self.assertGreaterEqual(art_dict.keys(),
108 {"subject", "from", "date", "message-id",
109 "references", ":bytes", ":lines"}
110 )
111 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000112 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000113
114 def test_xover(self):
115 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000116 resp, lines = self.server.xover(last - 5, last)
117 if len(lines) == 0:
118 self.skipTest("no articles retrieved")
119 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000120 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000121 self.assertGreaterEqual(art_num, last - 5)
122 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000123 self._check_art_dict(art_dict)
124
125 def test_over(self):
126 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
127 start = last - 10
128 # The "start-" article range form
129 resp, lines = self.server.over((start, None))
130 art_num, art_dict = lines[0]
131 self._check_art_dict(art_dict)
132 # The "start-end" article range form
133 resp, lines = self.server.over((start, last))
134 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000135 # The 'last' article is not necessarily part of the output (cancelled?)
136 self.assertGreaterEqual(art_num, start)
137 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000138 self._check_art_dict(art_dict)
139 # XXX The "message_id" form is unsupported by gmane
140 # 503 Overview by message-ID unsupported
141
142 def test_xhdr(self):
143 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
144 resp, lines = self.server.xhdr('subject', last)
145 for line in lines:
146 self.assertEqual(str, type(line[1]))
147
148 def check_article_resp(self, resp, article, art_num=None):
149 self.assertIsInstance(article, nntplib.ArticleInfo)
150 if art_num is not None:
151 self.assertEqual(article.number, art_num)
152 for line in article.lines:
153 self.assertIsInstance(line, bytes)
154 # XXX this could exceptionally happen...
155 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
156
157 def test_article_head_body(self):
158 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000159 # Try to find an available article
160 for art_num in (last, first, last - 1):
161 try:
162 resp, head = self.server.head(art_num)
163 except nntplib.NNTPTemporaryError as e:
164 if not e.response.startswith("423 "):
165 raise
166 # "423 No such article" => choose another one
167 continue
168 break
169 else:
170 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000171 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000172 self.check_article_resp(resp, head, art_num)
173 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000174 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000175 self.check_article_resp(resp, body, art_num)
176 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000177 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000178 self.check_article_resp(resp, article, art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000179 self.assertEqual(article.lines, head.lines + [b''] + body.lines)
180
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000181 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000182 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000183 # couple of well-known capabilities. Just sanity check that we
184 # got them.
185 def _check_caps(caps):
186 caps_list = caps['LIST']
187 self.assertIsInstance(caps_list, (list, tuple))
188 self.assertIn('OVERVIEW.FMT', caps_list)
189 self.assertGreaterEqual(self.server.nntp_version, 2)
190 _check_caps(self.server.getcapabilities())
191 # This re-emits the command
192 resp, caps = self.server.capabilities()
193 _check_caps(caps)
194
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000195 if _have_ssl:
196 def test_starttls(self):
197 file = self.server.file
198 sock = self.server.sock
199 try:
200 self.server.starttls()
201 except nntplib.NNTPPermanentError:
202 self.skipTest("STARTTLS not supported by server.")
203 else:
204 # Check that the socket and internal pseudo-file really were
205 # changed.
206 self.assertNotEqual(file, self.server.file)
207 self.assertNotEqual(sock, self.server.sock)
208 # Check that the new socket really is an SSL one
209 self.assertIsInstance(self.server.sock, ssl.SSLSocket)
210 # Check that trying starttls when it's already active fails.
211 self.assertRaises(ValueError, self.server.starttls)
212
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000213 def test_zlogin(self):
214 # This test must be the penultimate because further commands will be
215 # refused.
216 baduser = "notarealuser"
217 badpw = "notarealpassword"
218 # Check that bogus credentials cause failure
219 self.assertRaises(nntplib.NNTPError, self.server.login,
220 user=baduser, password=badpw, usenetrc=False)
221 # FIXME: We should check that correct credentials succeed, but that
222 # would require valid details for some server somewhere to be in the
223 # test suite, I think. Gmane is anonymous, at least as used for the
224 # other tests.
225
226 def test_zzquit(self):
227 # This test must be called last, hence the name
228 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000229 try:
230 self.server.quit()
231 finally:
232 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000233
Antoine Pitroude609182010-11-18 17:29:23 +0000234 @classmethod
235 def wrap_methods(cls):
236 # Wrap all methods in a transient_internet() exception catcher
237 # XXX put a generic version in test.support?
238 def wrap_meth(meth):
239 @functools.wraps(meth)
240 def wrapped(self):
241 with support.transient_internet(self.NNTP_HOST):
242 meth(self)
243 return wrapped
244 for name in dir(cls):
245 if not name.startswith('test_'):
246 continue
247 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200248 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000249 continue
250 # Need to use a closure so that meth remains bound to its current
251 # value
252 setattr(cls, name, wrap_meth(meth))
253
254NetworkedNNTPTestsMixin.wrap_methods()
255
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000256
257class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
258 # This server supports STARTTLS (gmane doesn't)
259 NNTP_HOST = 'news.trigofacile.com'
260 GROUP_NAME = 'fr.comp.lang.python'
261 GROUP_PAT = 'fr.comp.lang.*'
262
Antoine Pitroude609182010-11-18 17:29:23 +0000263 NNTP_CLASS = NNTP
264
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000265 @classmethod
266 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000267 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000268 with support.transient_internet(cls.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000269 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000270
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000271 @classmethod
272 def tearDownClass(cls):
273 if cls.server is not None:
274 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000275
276
277if _have_ssl:
Antoine Pitroude609182010-11-18 17:29:23 +0000278 class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000279
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000280 # Technical limits for this public NNTP server (see http://www.aioe.org):
281 # "Only two concurrent connections per IP address are allowed and
282 # 400 connections per day are accepted from each IP address."
283
284 NNTP_HOST = 'nntp.aioe.org'
285 GROUP_NAME = 'comp.lang.python'
286 GROUP_PAT = 'comp.lang.*'
287
Antoine Pitroude609182010-11-18 17:29:23 +0000288 NNTP_CLASS = nntplib.NNTP_SSL
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000289
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000290 # Disabled as it produces too much data
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000291 test_list = None
292
293 # Disabled as the connection will already be encrypted.
294 test_starttls = None
295
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000296
297#
298# Non-networked tests using a local server (or something mocking it).
299#
300
301class _NNTPServerIO(io.RawIOBase):
302 """A raw IO object allowing NNTP commands to be received and processed
303 by a handler. The handler can push responses which can then be read
304 from the IO object."""
305
306 def __init__(self, handler):
307 io.RawIOBase.__init__(self)
308 # The channel from the client
309 self.c2s = io.BytesIO()
310 # The channel to the client
311 self.s2c = io.BytesIO()
312 self.handler = handler
313 self.handler.start(self.c2s.readline, self.push_data)
314
315 def readable(self):
316 return True
317
318 def writable(self):
319 return True
320
321 def push_data(self, data):
322 """Push (buffer) some data to send to the client."""
323 pos = self.s2c.tell()
324 self.s2c.seek(0, 2)
325 self.s2c.write(data)
326 self.s2c.seek(pos)
327
328 def write(self, b):
329 """The client sends us some data"""
330 pos = self.c2s.tell()
331 self.c2s.write(b)
332 self.c2s.seek(pos)
333 self.handler.process_pending()
334 return len(b)
335
336 def readinto(self, buf):
337 """The client wants to read a response"""
338 self.handler.process_pending()
339 b = self.s2c.read(len(buf))
340 n = len(b)
341 buf[:n] = b
342 return n
343
344
345class MockedNNTPTestsMixin:
346 # Override in derived classes
347 handler_class = None
348
349 def setUp(self):
350 super().setUp()
351 self.make_server()
352
353 def tearDown(self):
354 super().tearDown()
355 del self.server
356
357 def make_server(self, *args, **kwargs):
358 self.handler = self.handler_class()
359 self.sio = _NNTPServerIO(self.handler)
360 # Using BufferedRWPair instead of BufferedRandom ensures the file
361 # isn't seekable.
362 file = io.BufferedRWPair(self.sio, self.sio)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000363 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000364 return self.server
365
366
Antoine Pitrou71135622012-02-14 23:29:34 +0100367class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
368 def setUp(self):
369 super().setUp()
370 self.make_server(readermode=True)
371
372
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000373class NNTPv1Handler:
374 """A handler for RFC 977"""
375
376 welcome = "200 NNTP mock server"
377
378 def start(self, readline, push_data):
379 self.in_body = False
380 self.allow_posting = True
381 self._readline = readline
382 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100383 self._logged_in = False
384 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000385 # Our welcome
386 self.handle_welcome()
387
388 def _decode(self, data):
389 return str(data, "utf-8", "surrogateescape")
390
391 def process_pending(self):
392 if self.in_body:
393 while True:
394 line = self._readline()
395 if not line:
396 return
397 self.body.append(line)
398 if line == b".\r\n":
399 break
400 try:
401 meth, tokens = self.body_callback
402 meth(*tokens, body=self.body)
403 finally:
404 self.body_callback = None
405 self.body = None
406 self.in_body = False
407 while True:
408 line = self._decode(self._readline())
409 if not line:
410 return
411 if not line.endswith("\r\n"):
412 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
413 line = line[:-2]
414 cmd, *tokens = line.split()
415 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
416 meth = getattr(self, "handle_" + cmd.upper(), None)
417 if meth is None:
418 self.handle_unknown()
419 else:
420 try:
421 meth(*tokens)
422 except Exception as e:
423 raise ValueError("command failed: {!r}".format(line)) from e
424 else:
425 if self.in_body:
426 self.body_callback = meth, tokens
427 self.body = []
428
429 def expect_body(self):
430 """Flag that the client is expected to post a request body"""
431 self.in_body = True
432
433 def push_data(self, data):
434 """Push some binary data"""
435 self._push_data(data)
436
437 def push_lit(self, lit):
438 """Push a string literal"""
439 lit = textwrap.dedent(lit)
440 lit = "\r\n".join(lit.splitlines()) + "\r\n"
441 lit = lit.encode('utf-8')
442 self.push_data(lit)
443
444 def handle_unknown(self):
445 self.push_lit("500 What?")
446
447 def handle_welcome(self):
448 self.push_lit(self.welcome)
449
450 def handle_QUIT(self):
451 self.push_lit("205 Bye!")
452
453 def handle_DATE(self):
454 self.push_lit("111 20100914001155")
455
456 def handle_GROUP(self, group):
457 if group == "fr.comp.lang.python":
458 self.push_lit("211 486 761 1265 fr.comp.lang.python")
459 else:
460 self.push_lit("411 No such group {}".format(group))
461
462 def handle_HELP(self):
463 self.push_lit("""\
464 100 Legal commands
465 authinfo user Name|pass Password|generic <prog> <args>
466 date
467 help
468 Report problems to <root@example.org>
469 .""")
470
471 def handle_STAT(self, message_spec=None):
472 if message_spec is None:
473 self.push_lit("412 No newsgroup selected")
474 elif message_spec == "3000234":
475 self.push_lit("223 3000234 <45223423@example.com>")
476 elif message_spec == "<45223423@example.com>":
477 self.push_lit("223 0 <45223423@example.com>")
478 else:
479 self.push_lit("430 No Such Article Found")
480
481 def handle_NEXT(self):
482 self.push_lit("223 3000237 <668929@example.org> retrieved")
483
484 def handle_LAST(self):
485 self.push_lit("223 3000234 <45223423@example.com> retrieved")
486
487 def handle_LIST(self, action=None, param=None):
488 if action is None:
489 self.push_lit("""\
490 215 Newsgroups in form "group high low flags".
491 comp.lang.python 0000052340 0000002828 y
492 comp.lang.python.announce 0000001153 0000000993 m
493 free.it.comp.lang.python 0000000002 0000000002 y
494 fr.comp.lang.python 0000001254 0000000760 y
495 free.it.comp.lang.python.learner 0000000000 0000000001 y
496 tw.bbs.comp.lang.python 0000000304 0000000304 y
497 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000498 elif action == "ACTIVE":
499 if param == "*distutils*":
500 self.push_lit("""\
501 215 Newsgroups in form "group high low flags"
502 gmane.comp.python.distutils.devel 0000014104 0000000001 m
503 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
504 .""")
505 else:
506 self.push_lit("""\
507 215 Newsgroups in form "group high low flags"
508 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000509 elif action == "OVERVIEW.FMT":
510 self.push_lit("""\
511 215 Order of fields in overview database.
512 Subject:
513 From:
514 Date:
515 Message-ID:
516 References:
517 Bytes:
518 Lines:
519 Xref:full
520 .""")
521 elif action == "NEWSGROUPS":
522 assert param is not None
523 if param == "comp.lang.python":
524 self.push_lit("""\
525 215 Descriptions in form "group description".
526 comp.lang.python\tThe Python computer language.
527 .""")
528 elif param == "comp.lang.python*":
529 self.push_lit("""\
530 215 Descriptions in form "group description".
531 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
532 comp.lang.python\tThe Python computer language.
533 .""")
534 else:
535 self.push_lit("""\
536 215 Descriptions in form "group description".
537 .""")
538 else:
539 self.push_lit('501 Unknown LIST keyword')
540
541 def handle_NEWNEWS(self, group, date_str, time_str):
542 # We hard code different return messages depending on passed
543 # argument and date syntax.
544 if (group == "comp.lang.python" and date_str == "20100913"
545 and time_str == "082004"):
546 # Date was passed in RFC 3977 format (NNTP "v2")
547 self.push_lit("""\
548 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
549 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
550 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
551 .""")
552 elif (group == "comp.lang.python" and date_str == "100913"
553 and time_str == "082004"):
554 # Date was passed in RFC 977 format (NNTP "v1")
555 self.push_lit("""\
556 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
557 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
558 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
559 .""")
560 else:
561 self.push_lit("""\
562 230 An empty list of newsarticles follows
563 .""")
564 # (Note for experiments: many servers disable NEWNEWS.
565 # As of this writing, sicinfo3.epfl.ch doesn't.)
566
567 def handle_XOVER(self, message_spec):
568 if message_spec == "57-59":
569 self.push_lit(
570 "224 Overview information for 57-58 follows\n"
571 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
572 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
573 "\tSat, 19 Jun 2010 18:04:08 -0400"
574 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
575 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
576 "\tXref: news.gmane.org gmane.comp.python.authors:57"
577 "\n"
578 "58\tLooking for a few good bloggers"
579 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
580 "\tThu, 22 Jul 2010 09:14:14 -0400"
581 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
582 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000583 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000584 "\n"
585 # An UTF-8 overview line from fr.comp.lang.python
586 "59\tRe: Message d'erreur incompréhensible (par moi)"
587 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
588 "\tWed, 15 Sep 2010 18:09:15 +0200"
589 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
590 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
591 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
592 "\n"
593 ".\n")
594 else:
595 self.push_lit("""\
596 224 No articles
597 .""")
598
599 def handle_POST(self, *, body=None):
600 if body is None:
601 if self.allow_posting:
602 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
603 self.expect_body()
604 else:
605 self.push_lit("440 Posting not permitted")
606 else:
607 assert self.allow_posting
608 self.push_lit("240 Article received OK")
609 self.posted_body = body
610
611 def handle_IHAVE(self, message_id, *, body=None):
612 if body is None:
613 if (self.allow_posting and
614 message_id == "<i.am.an.article.you.will.want@example.com>"):
615 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
616 self.expect_body()
617 else:
618 self.push_lit("435 Article not wanted")
619 else:
620 assert self.allow_posting
621 self.push_lit("235 Article transferred OK")
622 self.posted_body = body
623
624 sample_head = """\
625 From: "Demo User" <nobody@example.net>
626 Subject: I am just a test article
627 Content-Type: text/plain; charset=UTF-8; format=flowed
628 Message-ID: <i.am.an.article.you.will.want@example.com>"""
629
630 sample_body = """\
631 This is just a test article.
632 ..Here is a dot-starting line.
633
634 -- Signed by Andr\xe9."""
635
636 sample_article = sample_head + "\n\n" + sample_body
637
638 def handle_ARTICLE(self, message_spec=None):
639 if message_spec is None:
640 self.push_lit("220 3000237 <45223423@example.com>")
641 elif message_spec == "<45223423@example.com>":
642 self.push_lit("220 0 <45223423@example.com>")
643 elif message_spec == "3000234":
644 self.push_lit("220 3000234 <45223423@example.com>")
645 else:
646 self.push_lit("430 No Such Article Found")
647 return
648 self.push_lit(self.sample_article)
649 self.push_lit(".")
650
651 def handle_HEAD(self, message_spec=None):
652 if message_spec is None:
653 self.push_lit("221 3000237 <45223423@example.com>")
654 elif message_spec == "<45223423@example.com>":
655 self.push_lit("221 0 <45223423@example.com>")
656 elif message_spec == "3000234":
657 self.push_lit("221 3000234 <45223423@example.com>")
658 else:
659 self.push_lit("430 No Such Article Found")
660 return
661 self.push_lit(self.sample_head)
662 self.push_lit(".")
663
664 def handle_BODY(self, message_spec=None):
665 if message_spec is None:
666 self.push_lit("222 3000237 <45223423@example.com>")
667 elif message_spec == "<45223423@example.com>":
668 self.push_lit("222 0 <45223423@example.com>")
669 elif message_spec == "3000234":
670 self.push_lit("222 3000234 <45223423@example.com>")
671 else:
672 self.push_lit("430 No Such Article Found")
673 return
674 self.push_lit(self.sample_body)
675 self.push_lit(".")
676
Antoine Pitrou54411c12012-02-12 19:14:17 +0100677 def handle_AUTHINFO(self, cred_type, data):
678 if self._logged_in:
679 self.push_lit('502 Already Logged In')
680 elif cred_type == 'user':
681 if self._user_sent:
682 self.push_lit('482 User Credential Already Sent')
683 else:
684 self.push_lit('381 Password Required')
685 self._user_sent = True
686 elif cred_type == 'pass':
687 self.push_lit('281 Login Successful')
688 self._logged_in = True
689 else:
690 raise Exception('Unknown cred type {}'.format(cred_type))
691
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000692
693class NNTPv2Handler(NNTPv1Handler):
694 """A handler for RFC 3977 (NNTP "v2")"""
695
696 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100697 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000698 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000699 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100700 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000701 HDR
702 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
703 OVER
704 POST
705 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100706 ."""
707
708 if not self._logged_in:
709 self.push_lit(fmt.format('\n AUTHINFO USER'))
710 else:
711 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000712
Antoine Pitrou71135622012-02-14 23:29:34 +0100713 def handle_MODE(self, _):
714 raise Exception('MODE READER sent despite READER has been advertised')
715
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000716 def handle_OVER(self, message_spec=None):
717 return self.handle_XOVER(message_spec)
718
719
Antoine Pitrou54411c12012-02-12 19:14:17 +0100720class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
721 """A handler that allows CAPABILITIES only after login"""
722
723 def handle_CAPABILITIES(self):
724 if not self._logged_in:
725 self.push_lit('480 You must log in.')
726 else:
727 super().handle_CAPABILITIES()
728
729
Antoine Pitrou71135622012-02-14 23:29:34 +0100730class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
731 """A server that starts in transit mode"""
732
733 def __init__(self):
734 self._switched = False
735
736 def handle_CAPABILITIES(self):
737 fmt = """\
738 101 Capability list:
739 VERSION 2 3
740 IMPLEMENTATION INN 2.5.1
741 HDR
742 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
743 OVER
744 POST
745 {}READER
746 ."""
747 if self._switched:
748 self.push_lit(fmt.format(''))
749 else:
750 self.push_lit(fmt.format('MODE-'))
751
752 def handle_MODE(self, what):
753 assert not self._switched and what == 'reader'
754 self._switched = True
755 self.push_lit('200 Posting allowed')
756
757
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000758class NNTPv1v2TestsMixin:
759
760 def setUp(self):
761 super().setUp()
762
763 def test_welcome(self):
764 self.assertEqual(self.server.welcome, self.handler.welcome)
765
Antoine Pitrou54411c12012-02-12 19:14:17 +0100766 def test_authinfo(self):
767 if self.nntp_version == 2:
768 self.assertIn('AUTHINFO', self.server._caps)
769 self.server.login('testuser', 'testpw')
770 # if AUTHINFO is gone from _caps we also know that getcapabilities()
771 # has been called after login as it should
772 self.assertNotIn('AUTHINFO', self.server._caps)
773
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000774 def test_date(self):
775 resp, date = self.server.date()
776 self.assertEqual(resp, "111 20100914001155")
777 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
778
779 def test_quit(self):
780 self.assertFalse(self.sio.closed)
781 resp = self.server.quit()
782 self.assertEqual(resp, "205 Bye!")
783 self.assertTrue(self.sio.closed)
784
785 def test_help(self):
786 resp, help = self.server.help()
787 self.assertEqual(resp, "100 Legal commands")
788 self.assertEqual(help, [
789 ' authinfo user Name|pass Password|generic <prog> <args>',
790 ' date',
791 ' help',
792 'Report problems to <root@example.org>',
793 ])
794
795 def test_list(self):
796 resp, groups = self.server.list()
797 self.assertEqual(len(groups), 6)
798 g = groups[1]
799 self.assertEqual(g,
800 GroupInfo("comp.lang.python.announce", "0000001153",
801 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000802 resp, groups = self.server.list("*distutils*")
803 self.assertEqual(len(groups), 2)
804 g = groups[0]
805 self.assertEqual(g,
806 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
807 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000808
809 def test_stat(self):
810 resp, art_num, message_id = self.server.stat(3000234)
811 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
812 self.assertEqual(art_num, 3000234)
813 self.assertEqual(message_id, "<45223423@example.com>")
814 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
815 self.assertEqual(resp, "223 0 <45223423@example.com>")
816 self.assertEqual(art_num, 0)
817 self.assertEqual(message_id, "<45223423@example.com>")
818 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
819 self.server.stat("<non.existent.id>")
820 self.assertEqual(cm.exception.response, "430 No Such Article Found")
821 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
822 self.server.stat()
823 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
824
825 def test_next(self):
826 resp, art_num, message_id = self.server.next()
827 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
828 self.assertEqual(art_num, 3000237)
829 self.assertEqual(message_id, "<668929@example.org>")
830
831 def test_last(self):
832 resp, art_num, message_id = self.server.last()
833 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
834 self.assertEqual(art_num, 3000234)
835 self.assertEqual(message_id, "<45223423@example.com>")
836
837 def test_description(self):
838 desc = self.server.description("comp.lang.python")
839 self.assertEqual(desc, "The Python computer language.")
840 desc = self.server.description("comp.lang.pythonx")
841 self.assertEqual(desc, "")
842
843 def test_descriptions(self):
844 resp, groups = self.server.descriptions("comp.lang.python")
845 self.assertEqual(resp, '215 Descriptions in form "group description".')
846 self.assertEqual(groups, {
847 "comp.lang.python": "The Python computer language.",
848 })
849 resp, groups = self.server.descriptions("comp.lang.python*")
850 self.assertEqual(groups, {
851 "comp.lang.python": "The Python computer language.",
852 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
853 })
854 resp, groups = self.server.descriptions("comp.lang.pythonx")
855 self.assertEqual(groups, {})
856
857 def test_group(self):
858 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
859 self.assertTrue(resp.startswith("211 "), resp)
860 self.assertEqual(first, 761)
861 self.assertEqual(last, 1265)
862 self.assertEqual(count, 486)
863 self.assertEqual(group, "fr.comp.lang.python")
864 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
865 self.server.group("comp.lang.python.devel")
866 exc = cm.exception
867 self.assertTrue(exc.response.startswith("411 No such group"),
868 exc.response)
869
870 def test_newnews(self):
871 # NEWNEWS comp.lang.python [20]100913 082004
872 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
873 resp, ids = self.server.newnews("comp.lang.python", dt)
874 expected = (
875 "230 list of newsarticles (NNTP v{0}) "
876 "created after Mon Sep 13 08:20:04 2010 follows"
877 ).format(self.nntp_version)
878 self.assertEqual(resp, expected)
879 self.assertEqual(ids, [
880 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
881 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
882 ])
883 # NEWNEWS fr.comp.lang.python [20]100913 082004
884 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
885 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
886 self.assertEqual(resp, "230 An empty list of newsarticles follows")
887 self.assertEqual(ids, [])
888
889 def _check_article_body(self, lines):
890 self.assertEqual(len(lines), 4)
891 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
892 self.assertEqual(lines[-2], b"")
893 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
894 self.assertEqual(lines[-4], b"This is just a test article.")
895
896 def _check_article_head(self, lines):
897 self.assertEqual(len(lines), 4)
898 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
899 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
900
901 def _check_article_data(self, lines):
902 self.assertEqual(len(lines), 9)
903 self._check_article_head(lines[:4])
904 self._check_article_body(lines[-4:])
905 self.assertEqual(lines[4], b"")
906
907 def test_article(self):
908 # ARTICLE
909 resp, info = self.server.article()
910 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
911 art_num, message_id, lines = info
912 self.assertEqual(art_num, 3000237)
913 self.assertEqual(message_id, "<45223423@example.com>")
914 self._check_article_data(lines)
915 # ARTICLE num
916 resp, info = self.server.article(3000234)
917 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
918 art_num, message_id, lines = info
919 self.assertEqual(art_num, 3000234)
920 self.assertEqual(message_id, "<45223423@example.com>")
921 self._check_article_data(lines)
922 # ARTICLE id
923 resp, info = self.server.article("<45223423@example.com>")
924 self.assertEqual(resp, "220 0 <45223423@example.com>")
925 art_num, message_id, lines = info
926 self.assertEqual(art_num, 0)
927 self.assertEqual(message_id, "<45223423@example.com>")
928 self._check_article_data(lines)
929 # Non-existent id
930 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
931 self.server.article("<non-existent@example.com>")
932 self.assertEqual(cm.exception.response, "430 No Such Article Found")
933
934 def test_article_file(self):
935 # With a "file" argument
936 f = io.BytesIO()
937 resp, info = self.server.article(file=f)
938 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
939 art_num, message_id, lines = info
940 self.assertEqual(art_num, 3000237)
941 self.assertEqual(message_id, "<45223423@example.com>")
942 self.assertEqual(lines, [])
943 data = f.getvalue()
944 self.assertTrue(data.startswith(
945 b'From: "Demo User" <nobody@example.net>\r\n'
946 b'Subject: I am just a test article\r\n'
947 ), ascii(data))
948 self.assertTrue(data.endswith(
949 b'This is just a test article.\r\n'
950 b'.Here is a dot-starting line.\r\n'
951 b'\r\n'
952 b'-- Signed by Andr\xc3\xa9.\r\n'
953 ), ascii(data))
954
955 def test_head(self):
956 # HEAD
957 resp, info = self.server.head()
958 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
959 art_num, message_id, lines = info
960 self.assertEqual(art_num, 3000237)
961 self.assertEqual(message_id, "<45223423@example.com>")
962 self._check_article_head(lines)
963 # HEAD num
964 resp, info = self.server.head(3000234)
965 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
966 art_num, message_id, lines = info
967 self.assertEqual(art_num, 3000234)
968 self.assertEqual(message_id, "<45223423@example.com>")
969 self._check_article_head(lines)
970 # HEAD id
971 resp, info = self.server.head("<45223423@example.com>")
972 self.assertEqual(resp, "221 0 <45223423@example.com>")
973 art_num, message_id, lines = info
974 self.assertEqual(art_num, 0)
975 self.assertEqual(message_id, "<45223423@example.com>")
976 self._check_article_head(lines)
977 # Non-existent id
978 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
979 self.server.head("<non-existent@example.com>")
980 self.assertEqual(cm.exception.response, "430 No Such Article Found")
981
982 def test_body(self):
983 # BODY
984 resp, info = self.server.body()
985 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
986 art_num, message_id, lines = info
987 self.assertEqual(art_num, 3000237)
988 self.assertEqual(message_id, "<45223423@example.com>")
989 self._check_article_body(lines)
990 # BODY num
991 resp, info = self.server.body(3000234)
992 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
993 art_num, message_id, lines = info
994 self.assertEqual(art_num, 3000234)
995 self.assertEqual(message_id, "<45223423@example.com>")
996 self._check_article_body(lines)
997 # BODY id
998 resp, info = self.server.body("<45223423@example.com>")
999 self.assertEqual(resp, "222 0 <45223423@example.com>")
1000 art_num, message_id, lines = info
1001 self.assertEqual(art_num, 0)
1002 self.assertEqual(message_id, "<45223423@example.com>")
1003 self._check_article_body(lines)
1004 # Non-existent id
1005 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1006 self.server.body("<non-existent@example.com>")
1007 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1008
1009 def check_over_xover_resp(self, resp, overviews):
1010 self.assertTrue(resp.startswith("224 "), resp)
1011 self.assertEqual(len(overviews), 3)
1012 art_num, over = overviews[0]
1013 self.assertEqual(art_num, 57)
1014 self.assertEqual(over, {
1015 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1016 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1017 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1018 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1019 "references": "<hvalf7$ort$1@dough.gmane.org>",
1020 ":bytes": "7103",
1021 ":lines": "16",
1022 "xref": "news.gmane.org gmane.comp.python.authors:57"
1023 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001024 art_num, over = overviews[1]
1025 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001026 art_num, over = overviews[2]
1027 self.assertEqual(over["subject"],
1028 "Re: Message d'erreur incompréhensible (par moi)")
1029
1030 def test_xover(self):
1031 resp, overviews = self.server.xover(57, 59)
1032 self.check_over_xover_resp(resp, overviews)
1033
1034 def test_over(self):
1035 # In NNTP "v1", this will fallback on XOVER
1036 resp, overviews = self.server.over((57, 59))
1037 self.check_over_xover_resp(resp, overviews)
1038
1039 sample_post = (
1040 b'From: "Demo User" <nobody@example.net>\r\n'
1041 b'Subject: I am just a test article\r\n'
1042 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1043 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1044 b'\r\n'
1045 b'This is just a test article.\r\n'
1046 b'.Here is a dot-starting line.\r\n'
1047 b'\r\n'
1048 b'-- Signed by Andr\xc3\xa9.\r\n'
1049 )
1050
1051 def _check_posted_body(self):
1052 # Check the raw body as received by the server
1053 lines = self.handler.posted_body
1054 # One additional line for the "." terminator
1055 self.assertEqual(len(lines), 10)
1056 self.assertEqual(lines[-1], b'.\r\n')
1057 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1058 self.assertEqual(lines[-3], b'\r\n')
1059 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1060 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1061
1062 def _check_post_ihave_sub(self, func, *args, file_factory):
1063 # First the prepared post with CRLF endings
1064 post = self.sample_post
1065 func_args = args + (file_factory(post),)
1066 self.handler.posted_body = None
1067 resp = func(*func_args)
1068 self._check_posted_body()
1069 # Then the same post with "normal" line endings - they should be
1070 # converted by NNTP.post and NNTP.ihave.
1071 post = self.sample_post.replace(b"\r\n", b"\n")
1072 func_args = args + (file_factory(post),)
1073 self.handler.posted_body = None
1074 resp = func(*func_args)
1075 self._check_posted_body()
1076 return resp
1077
1078 def check_post_ihave(self, func, success_resp, *args):
1079 # With a bytes object
1080 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1081 self.assertEqual(resp, success_resp)
1082 # With a bytearray object
1083 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1084 self.assertEqual(resp, success_resp)
1085 # With a file object
1086 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1087 self.assertEqual(resp, success_resp)
1088 # With an iterable of terminated lines
1089 def iterlines(b):
1090 return iter(b.splitlines(True))
1091 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1092 self.assertEqual(resp, success_resp)
1093 # With an iterable of non-terminated lines
1094 def iterlines(b):
1095 return iter(b.splitlines(False))
1096 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1097 self.assertEqual(resp, success_resp)
1098
1099 def test_post(self):
1100 self.check_post_ihave(self.server.post, "240 Article received OK")
1101 self.handler.allow_posting = False
1102 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1103 self.server.post(self.sample_post)
1104 self.assertEqual(cm.exception.response,
1105 "440 Posting not permitted")
1106
1107 def test_ihave(self):
1108 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1109 "<i.am.an.article.you.will.want@example.com>")
1110 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1111 self.server.ihave("<another.message.id>", self.sample_post)
1112 self.assertEqual(cm.exception.response,
1113 "435 Article not wanted")
1114
1115
1116class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1117 """Tests an NNTP v1 server (no capabilities)."""
1118
1119 nntp_version = 1
1120 handler_class = NNTPv1Handler
1121
1122 def test_caps(self):
1123 caps = self.server.getcapabilities()
1124 self.assertEqual(caps, {})
1125 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001126 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001127
1128
1129class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1130 """Tests an NNTP v2 server (with capabilities)."""
1131
1132 nntp_version = 2
1133 handler_class = NNTPv2Handler
1134
1135 def test_caps(self):
1136 caps = self.server.getcapabilities()
1137 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001138 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001139 'IMPLEMENTATION': ['INN', '2.5.1'],
1140 'AUTHINFO': ['USER'],
1141 'HDR': [],
1142 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1143 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1144 'OVER': [],
1145 'POST': [],
1146 'READER': [],
1147 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001148 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001149 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001150
1151
Antoine Pitrou54411c12012-02-12 19:14:17 +01001152class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1153 """Tests a probably NNTP v2 server with capabilities only after login."""
1154
1155 nntp_version = 2
1156 handler_class = CapsAfterLoginNNTPv2Handler
1157
1158 def test_caps_only_after_login(self):
1159 self.assertEqual(self.server._caps, {})
1160 self.server.login('testuser', 'testpw')
1161 self.assertIn('VERSION', self.server._caps)
1162
1163
Antoine Pitrou71135622012-02-14 23:29:34 +01001164class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1165 unittest.TestCase):
1166 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1167 that isn't in READER mode by default."""
1168
1169 nntp_version = 2
1170 handler_class = ModeSwitchingNNTPv2Handler
1171
1172 def test_we_are_in_reader_mode_after_connect(self):
1173 self.assertIn('READER', self.server._caps)
1174
1175
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001176class MiscTests(unittest.TestCase):
1177
1178 def test_decode_header(self):
1179 def gives(a, b):
1180 self.assertEqual(nntplib.decode_header(a), b)
1181 gives("" , "")
1182 gives("a plain header", "a plain header")
1183 gives(" with extra spaces ", " with extra spaces ")
1184 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1185 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1186 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1187 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1188 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1189 "Re: problème de matrice")
1190 # A natively utf-8 header (found in the real world!)
1191 gives("Re: Message d'erreur incompréhensible (par moi)",
1192 "Re: Message d'erreur incompréhensible (par moi)")
1193
1194 def test_parse_overview_fmt(self):
1195 # The minimal (default) response
1196 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1197 "References:", ":bytes", ":lines"]
1198 self.assertEqual(nntplib._parse_overview_fmt(lines),
1199 ["subject", "from", "date", "message-id", "references",
1200 ":bytes", ":lines"])
1201 # The minimal response using alternative names
1202 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1203 "References:", "Bytes:", "Lines:"]
1204 self.assertEqual(nntplib._parse_overview_fmt(lines),
1205 ["subject", "from", "date", "message-id", "references",
1206 ":bytes", ":lines"])
1207 # Variations in casing
1208 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1209 "References:", "BYTES:", "Lines:"]
1210 self.assertEqual(nntplib._parse_overview_fmt(lines),
1211 ["subject", "from", "date", "message-id", "references",
1212 ":bytes", ":lines"])
1213 # First example from RFC 3977
1214 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1215 "References:", ":bytes", ":lines", "Xref:full",
1216 "Distribution:full"]
1217 self.assertEqual(nntplib._parse_overview_fmt(lines),
1218 ["subject", "from", "date", "message-id", "references",
1219 ":bytes", ":lines", "xref", "distribution"])
1220 # Second example from RFC 3977
1221 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1222 "References:", "Bytes:", "Lines:", "Xref:FULL",
1223 "Distribution:FULL"]
1224 self.assertEqual(nntplib._parse_overview_fmt(lines),
1225 ["subject", "from", "date", "message-id", "references",
1226 ":bytes", ":lines", "xref", "distribution"])
1227 # A classic response from INN
1228 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1229 "References:", "Bytes:", "Lines:", "Xref:full"]
1230 self.assertEqual(nntplib._parse_overview_fmt(lines),
1231 ["subject", "from", "date", "message-id", "references",
1232 ":bytes", ":lines", "xref"])
1233
1234 def test_parse_overview(self):
1235 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1236 # First example from RFC 3977
1237 lines = [
1238 '3000234\tI am just a test article\t"Demo User" '
1239 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1240 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1241 '17\tXref: news.example.com misc.test:3000363',
1242 ]
1243 overview = nntplib._parse_overview(lines, fmt)
1244 (art_num, fields), = overview
1245 self.assertEqual(art_num, 3000234)
1246 self.assertEqual(fields, {
1247 'subject': 'I am just a test article',
1248 'from': '"Demo User" <nobody@example.com>',
1249 'date': '6 Oct 1998 04:38:40 -0500',
1250 'message-id': '<45223423@example.com>',
1251 'references': '<45454@example.net>',
1252 ':bytes': '1234',
1253 ':lines': '17',
1254 'xref': 'news.example.com misc.test:3000363',
1255 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001256 # Second example; here the "Xref" field is totally absent (including
1257 # the header name) and comes out as None
1258 lines = [
1259 '3000234\tI am just a test article\t"Demo User" '
1260 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1261 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1262 '17\t\t',
1263 ]
1264 overview = nntplib._parse_overview(lines, fmt)
1265 (art_num, fields), = overview
1266 self.assertEqual(fields['xref'], None)
1267 # Third example; the "Xref" is an empty string, while "references"
1268 # is a single space.
1269 lines = [
1270 '3000234\tI am just a test article\t"Demo User" '
1271 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1272 '<45223423@example.com>\t \t1234\t'
1273 '17\tXref: \t',
1274 ]
1275 overview = nntplib._parse_overview(lines, fmt)
1276 (art_num, fields), = overview
1277 self.assertEqual(fields['references'], ' ')
1278 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001279
1280 def test_parse_datetime(self):
1281 def gives(a, b, *c):
1282 self.assertEqual(nntplib._parse_datetime(a, b),
1283 datetime.datetime(*c))
1284 # Output of DATE command
1285 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1286 # Variations
1287 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1288 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1289 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1290
1291 def test_unparse_datetime(self):
1292 # Test non-legacy mode
1293 # 1) with a datetime
1294 def gives(y, M, d, h, m, s, date_str, time_str):
1295 dt = datetime.datetime(y, M, d, h, m, s)
1296 self.assertEqual(nntplib._unparse_datetime(dt),
1297 (date_str, time_str))
1298 self.assertEqual(nntplib._unparse_datetime(dt, False),
1299 (date_str, time_str))
1300 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1301 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1302 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1303 # 2) with a date
1304 def gives(y, M, d, date_str, time_str):
1305 dt = datetime.date(y, M, d)
1306 self.assertEqual(nntplib._unparse_datetime(dt),
1307 (date_str, time_str))
1308 self.assertEqual(nntplib._unparse_datetime(dt, False),
1309 (date_str, time_str))
1310 gives(1999, 6, 23, "19990623", "000000")
1311 gives(2000, 6, 23, "20000623", "000000")
1312 gives(2010, 6, 5, "20100605", "000000")
1313
1314 def test_unparse_datetime_legacy(self):
1315 # Test legacy mode (RFC 977)
1316 # 1) with a datetime
1317 def gives(y, M, d, h, m, s, date_str, time_str):
1318 dt = datetime.datetime(y, M, d, h, m, s)
1319 self.assertEqual(nntplib._unparse_datetime(dt, True),
1320 (date_str, time_str))
1321 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1322 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1323 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1324 # 2) with a date
1325 def gives(y, M, d, date_str, time_str):
1326 dt = datetime.date(y, M, d)
1327 self.assertEqual(nntplib._unparse_datetime(dt, True),
1328 (date_str, time_str))
1329 gives(1999, 6, 23, "990623", "000000")
1330 gives(2000, 6, 23, "000623", "000000")
1331 gives(2010, 6, 5, "100605", "000000")
1332
1333
1334def test_main():
Antoine Pitrou54411c12012-02-12 19:14:17 +01001335 tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
Antoine Pitrou71135622012-02-14 23:29:34 +01001336 SendReaderNNTPv2Tests, NetworkedNNTPTests]
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00001337 if _have_ssl:
1338 tests.append(NetworkedNNTP_SSLTests)
1339 support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001340
1341
1342if __name__ == "__main__":
1343 test_main()