blob: 23fb64727d7b3ea249ded72f5c14309420a84ee4 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00005import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00006import contextlib
7from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00008from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +00009import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000010if _have_ssl:
11 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000012
# Timeout handed to the NNTP constructor when connecting to the real
# servers used by the networked tests.
TIMEOUT = 30
14
# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000019
20
class NetworkedNNTPTestsMixin:
    """Tests that talk to a live NNTP server.

    Concrete test classes must provide ``self.server`` (a connected NNTP
    client object) and the ``NNTP_HOST``, ``GROUP_NAME`` and ``GROUP_PAT``
    class attributes that the tests below read.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound is relative to the local clock
        # (with a year of slack) so the test does not expire on a fixed date.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        if not lines:
            # Same guard as test_xover: nothing to index into.
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        self.assertEqual(article.lines, head.lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    if _have_ssl:
        def test_starttls(self):
            file = self.server.file
            sock = self.server.sock
            try:
                self.server.starttls()
            except nntplib.NNTPPermanentError:
                self.skipTest("STARTTLS not supported by server.")
            else:
                # Check that the socket and internal pseudo-file really were
                # changed.
                self.assertNotEqual(file, self.server.file)
                self.assertNotEqual(sock, self.server.sock)
                # Check that the new socket really is an SSL one
                self.assertIsInstance(self.server.sock, ssl.SSLSocket)
                # Check that trying starttls when it's already active fails.
                self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))


NetworkedNNTPTestsMixin.wrap_methods()
255
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000256
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests over a plain NNTP connection."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    # Client class instantiated by setUpClass; subclasses may override it.
    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        support.requires("network")
        with support.transient_internet(cls.NNTP_HOST):
            cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)

    @classmethod
    def tearDownClass(cls):
        # test_zzquit() sets the class attribute to None once it has closed
        # the connection itself.
        if cls.server is not None:
            try:
                cls.server.quit()
            finally:
                # Drop the reference even if QUIT fails, as test_zzquit does.
                cls.server = None
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000275
276
if _have_ssl:
    class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
        """Run the networked tests over an NNTP_SSL connection."""

        # Technical limits for this public NNTP server (see http://www.aioe.org):
        # "Only two concurrent connections per IP address are allowed and
        # 400 connections per day are accepted from each IP address."

        NNTP_HOST = 'nntp.aioe.org'
        GROUP_NAME = 'comp.lang.python'
        GROUP_PAT = 'comp.lang.*'

        NNTP_CLASS = nntplib.NNTP_SSL

        # Disabled as it produces too much data
        test_list = None

        # Disabled as the connection will already be encrypted.
        test_starttls = None
295
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000296
#
# Non-networked tests using a local server (or something mocking it).
#
300
class _NNTPServerIO(io.RawIOBase):
    """A raw IO object allowing NNTP commands to be received and processed
    by a handler.  The handler can push responses which can then be read
    from the IO object."""

    def __init__(self, handler):
        io.RawIOBase.__init__(self)
        # The channel from the client
        self.c2s = io.BytesIO()
        # The channel to the client
        self.s2c = io.BytesIO()
        self.handler = handler
        # The handler reads client lines from c2s and answers via push_data.
        self.handler.start(self.c2s.readline, self.push_data)

    def readable(self):
        return True

    def writable(self):
        return True

    def push_data(self, data):
        """Push (buffer) some data to send to the client."""
        # Append at the end, then restore the client's read position.
        pos = self.s2c.tell()
        self.s2c.seek(0, io.SEEK_END)
        self.s2c.write(data)
        self.s2c.seek(pos)

    def write(self, b):
        """The client sends us some data"""
        # Append at the end (like push_data) so that data the handler has
        # not consumed yet is never overwritten, then restore the handler's
        # read position.
        pos = self.c2s.tell()
        self.c2s.seek(0, io.SEEK_END)
        self.c2s.write(b)
        self.c2s.seek(pos)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """The client wants to read a response"""
        self.handler.process_pending()
        b = self.s2c.read(len(buf))
        n = len(b)
        buf[:n] = b
        return n
343
344
class MockedNNTPTestsMixin:
    """Fixture mixin wiring an nntplib client to an in-process handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create ``self.handler``, ``self.sio`` and ``self.server``.

        Extra arguments are forwarded to ``nntplib._NNTPBase``.  Returns the
        newly created server object.
        """
        self.handler = self.handler_class()
        self.sio = _NNTPServerIO(self.handler)
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        file = io.BufferedRWPair(self.sio, self.sio)
        self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
        return self.server
365
366
class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its I/O callables and send the welcome.

        *readline* returns the next line of bytes sent by the client (empty
        when nothing is pending); *push_data* queues bytes for the client.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # State used while collecting a POST / IHAVE body.
        self.body = None
        self.body_callback = None
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume everything the client has sent so far.

        Each command line is dispatched to the matching ``handle_<CMD>``
        method.  While a body is expected, lines are collected instead and
        handed to the command that requested them.
        """
        while True:
            if self.in_body:
                # Body lines must never be parsed as commands, even if they
                # arrived together with the command that requested them.
                if not self._read_body():
                    return
                continue
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            words = line.split()
            if not words:
                # A blank line carries no command name at all.
                self.handle_unknown()
                continue
            cmd, *tokens = words
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
                continue
            try:
                meth(*tokens)
            except Exception as e:
                raise ValueError("command failed: {!r}".format(line)) from e
            if self.in_body:
                # The command asked for a body: remember who gets it.
                self.body_callback = meth, tokens
                self.body = []

    def _read_body(self):
        """Collect pending body lines.

        Return True once the terminating ``.`` line was read and the body
        was delivered to the requesting command, False if more data is
        still needed.
        """
        while True:
            line = self._readline()
            if not line:
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # Dedent, normalise every line ending to CRLF and encode as UTF-8.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # NOTE(review): the leading space of the three command lines is
        # reconstructed to match the list expected by test_help -- confirm.
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        if body is None:
            # First phase: tell the client whether it may send the article.
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            # Second phase: called back with the collected body lines.
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
685
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000686
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # Build the response line by line so that the optional AUTHINFO
        # entry does not depend on matching the indentation of a
        # triple-quoted template.
        caps = [
            "101 Capability list:",
            "VERSION 2 3",
            "IMPLEMENTATION INN 2.5.1",
        ]
        if not self._logged_in:
            # AUTHINFO is only advertised until the client has logged in.
            caps.append("AUTHINFO USER")
        caps += [
            "HDR",
            "LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT",
            "OVER",
            "POST",
            "READER",
            ".",
        ]
        self.push_lit("\n".join(caps))

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
709
710
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if self._logged_in:
            super().handle_CAPABILITIES()
        else:
            self.push_lit('480 You must log in.')
719
720
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000721class NNTPv1v2TestsMixin:
722
723 def setUp(self):
724 super().setUp()
725
726 def test_welcome(self):
727 self.assertEqual(self.server.welcome, self.handler.welcome)
728
Antoine Pitrou54411c12012-02-12 19:14:17 +0100729 def test_authinfo(self):
730 if self.nntp_version == 2:
731 self.assertIn('AUTHINFO', self.server._caps)
732 self.server.login('testuser', 'testpw')
733 # if AUTHINFO is gone from _caps we also know that getcapabilities()
734 # has been called after login as it should
735 self.assertNotIn('AUTHINFO', self.server._caps)
736
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000737 def test_date(self):
738 resp, date = self.server.date()
739 self.assertEqual(resp, "111 20100914001155")
740 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
741
742 def test_quit(self):
743 self.assertFalse(self.sio.closed)
744 resp = self.server.quit()
745 self.assertEqual(resp, "205 Bye!")
746 self.assertTrue(self.sio.closed)
747
748 def test_help(self):
749 resp, help = self.server.help()
750 self.assertEqual(resp, "100 Legal commands")
751 self.assertEqual(help, [
752 ' authinfo user Name|pass Password|generic <prog> <args>',
753 ' date',
754 ' help',
755 'Report problems to <root@example.org>',
756 ])
757
758 def test_list(self):
759 resp, groups = self.server.list()
760 self.assertEqual(len(groups), 6)
761 g = groups[1]
762 self.assertEqual(g,
763 GroupInfo("comp.lang.python.announce", "0000001153",
764 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000765 resp, groups = self.server.list("*distutils*")
766 self.assertEqual(len(groups), 2)
767 g = groups[0]
768 self.assertEqual(g,
769 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
770 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000771
772 def test_stat(self):
773 resp, art_num, message_id = self.server.stat(3000234)
774 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
775 self.assertEqual(art_num, 3000234)
776 self.assertEqual(message_id, "<45223423@example.com>")
777 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
778 self.assertEqual(resp, "223 0 <45223423@example.com>")
779 self.assertEqual(art_num, 0)
780 self.assertEqual(message_id, "<45223423@example.com>")
781 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
782 self.server.stat("<non.existent.id>")
783 self.assertEqual(cm.exception.response, "430 No Such Article Found")
784 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
785 self.server.stat()
786 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
787
788 def test_next(self):
789 resp, art_num, message_id = self.server.next()
790 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
791 self.assertEqual(art_num, 3000237)
792 self.assertEqual(message_id, "<668929@example.org>")
793
794 def test_last(self):
795 resp, art_num, message_id = self.server.last()
796 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
797 self.assertEqual(art_num, 3000234)
798 self.assertEqual(message_id, "<45223423@example.com>")
799
800 def test_description(self):
801 desc = self.server.description("comp.lang.python")
802 self.assertEqual(desc, "The Python computer language.")
803 desc = self.server.description("comp.lang.pythonx")
804 self.assertEqual(desc, "")
805
806 def test_descriptions(self):
807 resp, groups = self.server.descriptions("comp.lang.python")
808 self.assertEqual(resp, '215 Descriptions in form "group description".')
809 self.assertEqual(groups, {
810 "comp.lang.python": "The Python computer language.",
811 })
812 resp, groups = self.server.descriptions("comp.lang.python*")
813 self.assertEqual(groups, {
814 "comp.lang.python": "The Python computer language.",
815 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
816 })
817 resp, groups = self.server.descriptions("comp.lang.pythonx")
818 self.assertEqual(groups, {})
819
820 def test_group(self):
821 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
822 self.assertTrue(resp.startswith("211 "), resp)
823 self.assertEqual(first, 761)
824 self.assertEqual(last, 1265)
825 self.assertEqual(count, 486)
826 self.assertEqual(group, "fr.comp.lang.python")
827 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
828 self.server.group("comp.lang.python.devel")
829 exc = cm.exception
830 self.assertTrue(exc.response.startswith("411 No such group"),
831 exc.response)
832
833 def test_newnews(self):
834 # NEWNEWS comp.lang.python [20]100913 082004
835 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
836 resp, ids = self.server.newnews("comp.lang.python", dt)
837 expected = (
838 "230 list of newsarticles (NNTP v{0}) "
839 "created after Mon Sep 13 08:20:04 2010 follows"
840 ).format(self.nntp_version)
841 self.assertEqual(resp, expected)
842 self.assertEqual(ids, [
843 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
844 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
845 ])
846 # NEWNEWS fr.comp.lang.python [20]100913 082004
847 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
848 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
849 self.assertEqual(resp, "230 An empty list of newsarticles follows")
850 self.assertEqual(ids, [])
851
def _check_article_body(self, lines):
    """Assert that *lines* is the expected four-line article body.

    *lines* is a sequence of bytes objects without line terminators.
    """
    self.assertEqual(len(lines), 4)
    opening, dotted, blank, signature = lines
    # The signature contains a non-ASCII character, so compare it decoded.
    self.assertEqual(signature.decode('utf8'), "-- Signed by André.")
    self.assertEqual(blank, b"")
    self.assertEqual(dotted, b".Here is a dot-starting line.")
    self.assertEqual(opening, b"This is just a test article.")
858
def _check_article_head(self, lines):
    """Assert that *lines* is the expected four-line article header.

    Only the first and the last header lines are compared.
    """
    self.assertEqual(len(lines), 4)
    first_header, last_header = lines[0], lines[3]
    self.assertEqual(first_header, b'From: "Demo User" <nobody@example.net>')
    self.assertEqual(
        last_header,
        b"Message-ID: <i.am.an.article.you.will.want@example.com>")
863
def _check_article_data(self, lines):
    """Assert that *lines* is a complete article.

    A complete article is nine lines: four header lines, one empty
    separator line and four body lines.
    """
    self.assertEqual(len(lines), 9)
    head, separator, body = lines[:4], lines[4], lines[-4:]
    self._check_article_head(head)
    self._check_article_body(body)
    self.assertEqual(separator, b"")
869
def test_article(self):
    # Each case is (arguments for article(), expected response line,
    # expected article number); the message id is the same every time.
    cases = (
        # ARTICLE
        ((), "220 3000237 <45223423@example.com>", 3000237),
        # ARTICLE num
        ((3000234,), "220 3000234 <45223423@example.com>", 3000234),
        # ARTICLE id
        (("<45223423@example.com>",), "220 0 <45223423@example.com>", 0),
    )
    for call_args, expected_resp, expected_num in cases:
        resp, info = self.server.article(*call_args)
        self.assertEqual(resp, expected_resp)
        art_num, message_id, lines = info
        self.assertEqual(art_num, expected_num)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_data(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.article("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
896
def test_article_file(self):
    # With a "file" argument the article is written to the file object
    # and the returned list of lines is empty.
    sink = io.BytesIO()
    resp, info = self.server.article(file=sink)
    self.assertEqual(resp, "220 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self.assertEqual(lines, [])
    written = sink.getvalue()
    # Only the first two header lines and the whole body are checked.
    expected_start = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    expected_end = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
    self.assertTrue(written.startswith(expected_start), ascii(written))
    self.assertTrue(written.endswith(expected_end), ascii(written))
917
def test_head(self):
    # Each case is (arguments for head(), expected response line,
    # expected article number); the message id is the same every time.
    cases = (
        # HEAD
        ((), "221 3000237 <45223423@example.com>", 3000237),
        # HEAD num
        ((3000234,), "221 3000234 <45223423@example.com>", 3000234),
        # HEAD id
        (("<45223423@example.com>",), "221 0 <45223423@example.com>", 0),
    )
    for call_args, expected_resp, expected_num in cases:
        resp, info = self.server.head(*call_args)
        self.assertEqual(resp, expected_resp)
        art_num, message_id, lines = info
        self.assertEqual(art_num, expected_num)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_head(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.head("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
944
def test_body(self):
    # Each case is (arguments for body(), expected response line,
    # expected article number); the message id is the same every time.
    cases = (
        # BODY
        ((), "222 3000237 <45223423@example.com>", 3000237),
        # BODY num
        ((3000234,), "222 3000234 <45223423@example.com>", 3000234),
        # BODY id
        (("<45223423@example.com>",), "222 0 <45223423@example.com>", 0),
    )
    for call_args, expected_resp, expected_num in cases:
        resp, info = self.server.body(*call_args)
        self.assertEqual(resp, expected_resp)
        art_num, message_id, lines = info
        self.assertEqual(art_num, expected_num)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_body(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.body("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
971
def check_over_xover_resp(self, resp, overviews):
    """Check the result of an OVER/XOVER request for three articles.

    *resp* is the response line and must start with "224 ".  *overviews*
    must hold exactly three (article number, fields dict) pairs: the
    first is compared in full, the second must have no "xref" value
    (None), and the third must carry a non-ASCII subject.
    """
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)
    art_num, over = overviews[0]
    self.assertEqual(art_num, 57)
    self.assertEqual(over, {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.org gmane.comp.python.authors:57"
    })
    # The article number of the remaining entries is not checked; the
    # unpacking still verifies that each entry is a pair.
    _, over = overviews[1]
    # Identity check: a value that merely compares equal to None must not
    # be accepted.
    self.assertIsNone(over["xref"])
    _, over = overviews[2]
    self.assertEqual(over["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
992
def test_xover(self):
    # XOVER takes the first and last article numbers as two arguments.
    response, entries = self.server.xover(57, 59)
    self.check_over_xover_resp(response, entries)
996
def test_over(self):
    # OVER takes the article range as a single (first, last) tuple.
    # In NNTP "v1", this will fallback on XOVER
    response, entries = self.server.over((57, 59))
    self.check_over_xover_resp(response, entries)
1001
# Raw article used by the POST and IHAVE tests: four header lines, an empty
# separator line and a four-line body, every line terminated by CRLF.
# The body contains a line starting with "." and a UTF-8 encoded non-ASCII
# character in the signature.
sample_post = (
    b'From: "Demo User" <nobody@example.net>\r\n'
    b'Subject: I am just a test article\r\n'
    b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
    b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
    b'\r\n'
    b'This is just a test article.\r\n'
    b'.Here is a dot-starting line.\r\n'
    b'\r\n'
    b'-- Signed by Andr\xc3\xa9.\r\n'
)
1013
def _check_posted_body(self):
    """Assert that the handler recorded the sample post as expected.

    Reads the raw lines from ``self.handler.posted_body``; they keep
    their CRLF terminators and the dot-starting line has its leading dot
    doubled.
    """
    received = self.handler.posted_body
    # Nine article lines plus one additional line for the "." terminator.
    self.assertEqual(len(received), 10)
    dotted, blank, signature, terminator = received[-4:]
    self.assertEqual(terminator, b'.\r\n')
    self.assertEqual(signature, b'-- Signed by Andr\xc3\xa9.\r\n')
    self.assertEqual(blank, b'\r\n')
    self.assertEqual(dotted, b'..Here is a dot-starting line.\r\n')
    self.assertEqual(received[0], b'From: "Demo User" <nobody@example.net>\r\n')
1024
def _check_post_ihave_sub(self, func, *args, file_factory):
    """Send the sample post twice through *func* and return the last response.

    *func* is called as ``func(*args, data)`` where *data* is
    ``file_factory(post)``.  The post is sent first with its CRLF line
    endings, then with "normal" LF endings, which should be converted by
    NNTP.post and NNTP.ihave.  After each call the body recorded by the
    handler is checked.
    """
    payloads = (
        self.sample_post,
        self.sample_post.replace(b"\r\n", b"\n"),
    )
    for payload in payloads:
        data = file_factory(payload)
        # Reset so that a stale body from a previous call cannot pass.
        self.handler.posted_body = None
        resp = func(*args, data)
        self._check_posted_body()
    return resp
1040
def check_post_ihave(self, func, success_resp, *args):
    """Check that *func* accepts the sample post in every input form.

    *func* is NNTP.post or NNTP.ihave, *args* are its leading arguments,
    and *success_resp* is the response expected for each form.
    """
    factories = (
        # With a bytes object
        bytes,
        # With a bytearray object
        bytearray,
        # With a file object
        io.BytesIO,
        # With an iterable of terminated lines
        lambda b: iter(b.splitlines(True)),
        # With an iterable of non-terminated lines
        lambda b: iter(b.splitlines(False)),
    )
    for make_payload in factories:
        resp = self._check_post_ihave_sub(func, *args,
                                          file_factory=make_payload)
        self.assertEqual(resp, success_resp)
1061
def test_post(self):
    # Posting succeeds for every supported form of the article data.
    self.check_post_ihave(self.server.post, "240 Article received OK")
    # Once the handler refuses posting, POST must raise
    # NNTPTemporaryError with the 440 response.
    self.handler.allow_posting = False
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.post(self.sample_post)
    refusal = caught.exception.response
    self.assertEqual(refusal, "440 Posting not permitted")
1069
def test_ihave(self):
    # Offering the article under this message id is expected to succeed
    # for every supported form of the article data.
    wanted_id = "<i.am.an.article.you.will.want@example.com>"
    self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                          wanted_id)
    # Any other message id must raise NNTPTemporaryError with the 435
    # response.
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.ihave("<another.message.id>", self.sample_post)
    refusal = caught.exception.response
    self.assertEqual(refusal, "435 Article not wanted")
1077
1078
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # Without capabilities the client reports an empty mapping,
        # protocol version 1 and no implementation string.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        # Identity check: "no implementation" must be None itself, not a
        # value that merely compares equal to it.
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001090
1091
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # Capability names map to their (possibly empty) argument lists.
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        server = self.server
        self.assertEqual(server.getcapabilities(), expected_caps)
        # VERSION lists both 2 and 3; the client is expected to report 3.
        self.assertEqual(server.nntp_version, 3)
        # The IMPLEMENTATION arguments are expected joined by a space.
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001113
1114
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a server that only reports its capabilities after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        server = self.server
        # Before authentication the cached capabilities are empty.
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        # After login the cache must have been refreshed.
        self.assertIn('VERSION', server._caps)
1125
1126
class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helper functions.

    No server is involved: every test feeds literal input to a helper
    and compares the result with a literal expected value.
    """

    def test_decode_header(self):
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref"])

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        # The trailing comma asserts that exactly one entry was parsed.
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        # Identity check, so that an empty string cannot pass for None.
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode; it must be the default, so each value is
        # checked both without the flag and with an explicit False.
        # 1) with a datetime
        def check_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        check_datetime(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        check_datetime(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        check_datetime(2010, 6, 5, 1, 2, 3, "20100605", "010203")
        # 2) with a date
        def check_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        check_date(1999, 6, 23, "19990623", "000000")
        check_date(2000, 6, 23, "20000623", "000000")
        check_date(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        def check_datetime(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        check_datetime(1999, 6, 23, 13, 56, 24, "990623", "135624")
        check_datetime(2000, 6, 23, 13, 56, 24, "000623", "135624")
        check_datetime(2010, 6, 5, 1, 2, 3, "100605", "010203")
        # 2) with a date
        def check_date(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        check_date(1999, 6, 23, "990623", "000000")
        check_date(2000, 6, 23, "000623", "000000")
        check_date(2010, 6, 5, "100605", "000000")
1283
1284
def test_main():
    # Entry point used when the file is run as a script: runs every test
    # class, adding the SSL networked tests only when nntplib has SSL
    # support.
    ssl_tests = [NetworkedNNTP_SSLTests] if _have_ssl else []
    support.run_unittest(
        MiscTests,
        NNTPv1Tests,
        NNTPv2Tests,
        CapsAfterLoginNNTPv2Tests,
        NetworkedNNTPTests,
        *ssl_tests
    )
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001291
1292
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()