import io
import socket
import datetime
import textwrap
import unittest
import functools
import contextlib
from test import support
from nntplib import NNTP, GroupInfo, _have_ssl
import nntplib
if _have_ssl:
    import ssl

# Socket timeout, in seconds, passed to every NNTP connection opened by the
# networked tests.
TIMEOUT = 30
15
# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000020
21
class NetworkedNNTPTestsMixin:
    """Tests that talk to a live NNTP server.

    Concrete subclasses provide the class attributes ``NNTP_HOST``,
    ``GROUP_NAME``, ``GROUP_PAT`` and ``NNTP_CLASS`` and expose the shared
    connection as ``server``.  Every ``test_*`` method is wrapped by
    wrap_methods() so that it runs inside support.transient_internet().
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertEqual(str, type(welcome))

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertEqual(str, type(line))

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups. In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound is relative to today (plus one
        # year of slack) rather than a fixed year, so the test does not
        # start failing once a hard-coded cut-off is reached.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if len(lines) == 0:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertEqual(str, type(line[1]))

    def check_article_resp(self, resp, article, art_num=None):
        """Check the ArticleInfo returned by HEAD / BODY / ARTICLE."""
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        self.assertEqual(article.lines, head.lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    if _have_ssl:
        def test_starttls(self):
            file = self.server.file
            sock = self.server.sock
            try:
                self.server.starttls()
            except nntplib.NNTPPermanentError:
                self.skipTest("STARTTLS not supported by server.")
            else:
                # Check that the socket and internal pseudo-file really were
                # changed.
                self.assertNotEqual(file, self.server.file)
                self.assertNotEqual(sock, self.server.sock)
                # Check that the new socket really is an SSL one
                self.assertIsInstance(self.server.sock, ssl.SSLSocket)
                # Check that trying starttls when it's already active fails.
                self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think. Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Tell tearDownClass() that the connection is already closed.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every callable ``test_*`` attribute of *cls* in a
        support.transient_internet() context for ``NNTP_HOST``."""
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_with_statement(self):
        def is_connected():
            # `server` is bound by the enclosing `with` statements below.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (socket.error, EOFError):
                return False
            return True

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            self.assertTrue(is_connected())
            self.assertTrue(server.help())
        self.assertFalse(is_connected())

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            server.quit()
        self.assertFalse(is_connected())


NetworkedNNTPTestsMixin.wrap_methods()
276
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000277
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests over a plain NNTP connection."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        # One connection is opened per class and shared by all tests.
        support.requires("network")
        with support.transient_internet(cls.NNTP_HOST):
            cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)

    @classmethod
    def tearDownClass(cls):
        # test_zzquit() sets `server` to None once it has closed the
        # connection itself.
        if cls.server is not None:
            cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000296
297
if _have_ssl:
    class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
        """Run the networked tests over an NNTP_SSL connection."""

        # Technical limits for this public NNTP server (see http://www.aioe.org):
        # "Only two concurrent connections per IP address are allowed and
        # 400 connections per day are accepted from each IP address."

        NNTP_HOST = 'nntp.aioe.org'
        GROUP_NAME = 'comp.lang.python'
        GROUP_PAT = 'comp.lang.*'

        NNTP_CLASS = nntplib.NNTP_SSL

        # Disabled as it produces too much data
        test_list = None

        # Disabled as the connection will already be encrypted.
        test_starttls = None
316
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000317
#
# Non-networked tests using a local server (or something mocking it).
#
321
class _NNTPServerIO(io.RawIOBase):
    """A raw IO object allowing NNTP commands to be received and processed
    by a handler.  The handler can push responses which can then be read
    from the IO object."""

    def __init__(self, handler):
        super().__init__()
        # The channel from the client
        self.c2s = io.BytesIO()
        # The channel to the client
        self.s2c = io.BytesIO()
        self.handler = handler
        # The handler reads client lines from c2s and answers via push_data.
        self.handler.start(self.c2s.readline, self.push_data)

    def readable(self):
        return True

    def writable(self):
        return True

    @staticmethod
    def _append(channel, data):
        """Append *data* at the end of *channel*, keeping its read offset."""
        pos = channel.tell()
        channel.seek(0, io.SEEK_END)
        channel.write(data)
        channel.seek(pos)

    def push_data(self, data):
        """Push (buffer) some data to send to the client."""
        self._append(self.s2c, data)

    def write(self, b):
        """The client sends us some data"""
        # Append rather than write at the read offset: anything the handler
        # has not consumed yet must not be overwritten.
        self._append(self.c2s, b)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """The client wants to read a response"""
        self.handler.process_pending()
        b = self.s2c.read(len(buf))
        n = len(b)
        buf[:n] = b
        return n
363 return n
364
365
class MockedNNTPTestsMixin:
    """Give each test a fresh NNTP client wired to an in-memory handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create the handler, its IO object and the client in `self.server`.

        Extra arguments are forwarded to nntplib._NNTPBase.  Returns the
        new client.
        """
        self.handler = self.handler_class()
        self.sio = _NNTPServerIO(self.handler)
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        file = io.BufferedRWPair(self.sio, self.sio)
        self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
        return self.server
386
387
class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its transport and send the welcome line.

        *readline* returns the next line of client bytes (empty when none
        is pending); *push_data* queues bytes for the client.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume every pending client line and dispatch it.

        Commands are routed to ``handle_<COMMAND>`` methods.  While a body
        is expected (see expect_body()), raw lines are collected up to the
        terminating ``.`` line and then passed as ``body=`` to the handler
        that requested them.
        """
        if self.in_body:
            while True:
                line = self._readline()
                if not line:
                    # Body not complete yet; wait for more data.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            cmd, *tokens = line.split()
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # Remember who to call back once the body is in.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # Dedent, normalise line endings to CRLF and encode as UTF-8.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # The command lines are indented relative to the status line; the
        # leading whitespace survives dedent() and is checked by test_help.
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per posting: first without a body (answer whether
        # posting may proceed), then again with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-phase protocol as handle_POST.
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_part(self, code, message_spec, payload):
        """Answer ARTICLE / HEAD / BODY.

        *code* is the success status code for the command and *payload*
        the multi-line data sent after the status line.  Unknown message
        specs get a 430 reply and no payload.
        """
        if message_spec is None:
            self.push_lit("{} 3000237 <45223423@example.com>".format(code))
        elif message_spec == "<45223423@example.com>":
            self.push_lit("{} 0 <45223423@example.com>".format(code))
        elif message_spec == "3000234":
            self.push_lit("{} 3000234 <45223423@example.com>".format(code))
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_article_part("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_article_part("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_article_part("222", message_spec, self.sample_body)
689
690
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        self.push_lit("""\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            AUTHINFO USER
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            .""")

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
709
710
711class NNTPv1v2TestsMixin:
712
713 def setUp(self):
714 super().setUp()
715
716 def test_welcome(self):
717 self.assertEqual(self.server.welcome, self.handler.welcome)
718
719 def test_date(self):
720 resp, date = self.server.date()
721 self.assertEqual(resp, "111 20100914001155")
722 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
723
724 def test_quit(self):
725 self.assertFalse(self.sio.closed)
726 resp = self.server.quit()
727 self.assertEqual(resp, "205 Bye!")
728 self.assertTrue(self.sio.closed)
729
730 def test_help(self):
731 resp, help = self.server.help()
732 self.assertEqual(resp, "100 Legal commands")
733 self.assertEqual(help, [
734 ' authinfo user Name|pass Password|generic <prog> <args>',
735 ' date',
736 ' help',
737 'Report problems to <root@example.org>',
738 ])
739
740 def test_list(self):
741 resp, groups = self.server.list()
742 self.assertEqual(len(groups), 6)
743 g = groups[1]
744 self.assertEqual(g,
745 GroupInfo("comp.lang.python.announce", "0000001153",
746 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000747 resp, groups = self.server.list("*distutils*")
748 self.assertEqual(len(groups), 2)
749 g = groups[0]
750 self.assertEqual(g,
751 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
752 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000753
754 def test_stat(self):
755 resp, art_num, message_id = self.server.stat(3000234)
756 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
757 self.assertEqual(art_num, 3000234)
758 self.assertEqual(message_id, "<45223423@example.com>")
759 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
760 self.assertEqual(resp, "223 0 <45223423@example.com>")
761 self.assertEqual(art_num, 0)
762 self.assertEqual(message_id, "<45223423@example.com>")
763 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
764 self.server.stat("<non.existent.id>")
765 self.assertEqual(cm.exception.response, "430 No Such Article Found")
766 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
767 self.server.stat()
768 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
769
770 def test_next(self):
771 resp, art_num, message_id = self.server.next()
772 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
773 self.assertEqual(art_num, 3000237)
774 self.assertEqual(message_id, "<668929@example.org>")
775
776 def test_last(self):
777 resp, art_num, message_id = self.server.last()
778 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
779 self.assertEqual(art_num, 3000234)
780 self.assertEqual(message_id, "<45223423@example.com>")
781
782 def test_description(self):
783 desc = self.server.description("comp.lang.python")
784 self.assertEqual(desc, "The Python computer language.")
785 desc = self.server.description("comp.lang.pythonx")
786 self.assertEqual(desc, "")
787
788 def test_descriptions(self):
789 resp, groups = self.server.descriptions("comp.lang.python")
790 self.assertEqual(resp, '215 Descriptions in form "group description".')
791 self.assertEqual(groups, {
792 "comp.lang.python": "The Python computer language.",
793 })
794 resp, groups = self.server.descriptions("comp.lang.python*")
795 self.assertEqual(groups, {
796 "comp.lang.python": "The Python computer language.",
797 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
798 })
799 resp, groups = self.server.descriptions("comp.lang.pythonx")
800 self.assertEqual(groups, {})
801
802 def test_group(self):
803 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
804 self.assertTrue(resp.startswith("211 "), resp)
805 self.assertEqual(first, 761)
806 self.assertEqual(last, 1265)
807 self.assertEqual(count, 486)
808 self.assertEqual(group, "fr.comp.lang.python")
809 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
810 self.server.group("comp.lang.python.devel")
811 exc = cm.exception
812 self.assertTrue(exc.response.startswith("411 No such group"),
813 exc.response)
814
815 def test_newnews(self):
816 # NEWNEWS comp.lang.python [20]100913 082004
817 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
818 resp, ids = self.server.newnews("comp.lang.python", dt)
819 expected = (
820 "230 list of newsarticles (NNTP v{0}) "
821 "created after Mon Sep 13 08:20:04 2010 follows"
822 ).format(self.nntp_version)
823 self.assertEqual(resp, expected)
824 self.assertEqual(ids, [
825 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
826 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
827 ])
828 # NEWNEWS fr.comp.lang.python [20]100913 082004
829 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
830 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
831 self.assertEqual(resp, "230 An empty list of newsarticles follows")
832 self.assertEqual(ids, [])
833
834 def _check_article_body(self, lines):
835 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000836 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000837 self.assertEqual(lines[-2], b"")
838 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
839 self.assertEqual(lines[-4], b"This is just a test article.")
840
841 def _check_article_head(self, lines):
842 self.assertEqual(len(lines), 4)
843 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
844 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
845
846 def _check_article_data(self, lines):
847 self.assertEqual(len(lines), 9)
848 self._check_article_head(lines[:4])
849 self._check_article_body(lines[-4:])
850 self.assertEqual(lines[4], b"")
851
def test_article(self):
    # Exercise NNTP.article() with no argument, with an article number
    # and with a message id, then with an id the server does not know.
    known_id = "<45223423@example.com>"
    # (arguments for article(), expected status line, expected number)
    scenarios = [
        ((), "220 3000237 <45223423@example.com>", 3000237),          # ARTICLE
        ((3000234,), "220 3000234 <45223423@example.com>", 3000234),  # ARTICLE num
        ((known_id,), "220 0 <45223423@example.com>", 0),             # ARTICLE id
    ]
    for call_args, status, number in scenarios:
        resp, info = self.server.article(*call_args)
        self.assertEqual(resp, status)
        art_num, message_id, lines = info
        self.assertEqual(art_num, number)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_data(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.article("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
878
def test_article_file(self):
    # When a "file" argument is given, the article is written to that
    # file and the returned list of lines is empty.
    expected_prefix = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    expected_suffix = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
    sink = io.BytesIO()
    resp, info = self.server.article(file=sink)
    self.assertEqual(resp, "220 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self.assertEqual(lines, [])
    data = sink.getvalue()
    # The ASCII rendering of the data is what gets reported on failure.
    shown = ascii(data)
    self.assertTrue(data.startswith(expected_prefix), shown)
    self.assertTrue(data.endswith(expected_suffix), shown)
899
def test_head(self):
    # Exercise NNTP.head() with no argument, with an article number
    # and with a message id, then with an id the server does not know.
    known_id = "<45223423@example.com>"
    # (arguments for head(), expected status line, expected number)
    scenarios = [
        ((), "221 3000237 <45223423@example.com>", 3000237),          # HEAD
        ((3000234,), "221 3000234 <45223423@example.com>", 3000234),  # HEAD num
        ((known_id,), "221 0 <45223423@example.com>", 0),             # HEAD id
    ]
    for call_args, status, number in scenarios:
        resp, info = self.server.head(*call_args)
        self.assertEqual(resp, status)
        art_num, message_id, lines = info
        self.assertEqual(art_num, number)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_head(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.head("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
926
def test_body(self):
    # Exercise NNTP.body() with no argument, with an article number
    # and with a message id, then with an id the server does not know.
    known_id = "<45223423@example.com>"
    # (arguments for body(), expected status line, expected number)
    scenarios = [
        ((), "222 3000237 <45223423@example.com>", 3000237),          # BODY
        ((3000234,), "222 3000234 <45223423@example.com>", 3000234),  # BODY num
        ((known_id,), "222 0 <45223423@example.com>", 0),             # BODY id
    ]
    for call_args, status, number in scenarios:
        resp, info = self.server.body(*call_args)
        self.assertEqual(resp, status)
        art_num, message_id, lines = info
        self.assertEqual(art_num, number)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_body(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.body("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
953
def check_over_xover_resp(self, resp, overviews):
    # Shared verification for the OVER and XOVER tests.
    #
    # `resp` is the status line returned by the command and `overviews`
    # the list of (article number, fields dict) pairs that came with it.
    # Three entries are expected: the first is compared in full, the
    # second must have no "xref" value, and the third must carry a
    # subject containing a non-ASCII character.
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)
    art_num, over = overviews[0]
    self.assertEqual(art_num, 57)
    self.assertEqual(over, {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.org gmane.comp.python.authors:57"
        })
    _, over = overviews[1]
    # Identity check: the value must be None itself, not merely
    # something that compares equal to None.
    self.assertIsNone(over["xref"])
    _, over = overviews[2]
    self.assertEqual(over["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
974
def test_xover(self):
    # XOVER over articles 57 to 59, checked by the shared verifier.
    first_article, last_article = 57, 59
    response, overview_list = self.server.xover(first_article, last_article)
    self.check_over_xover_resp(response, overview_list)
978
def test_over(self):
    # In NNTP "v1", this will fallback on XOVER
    article_range = (57, 59)
    response, overview_list = self.server.over(article_range)
    self.check_over_xover_resp(response, overview_list)
983
# Sample article used by the POST/IHAVE tests: four header lines, an empty
# separator line and a four-line body.  Every line is CRLF-terminated.
sample_post = b"".join(
    line + b"\r\n"
    for line in (
        b'From: "Demo User" <nobody@example.net>',
        b'Subject: I am just a test article',
        b'Content-Type: text/plain; charset=UTF-8; format=flowed',
        b'Message-ID: <i.am.an.article.you.will.want@example.com>',
        b'',
        b'This is just a test article.',
        b'.Here is a dot-starting line.',
        b'',
        b'-- Signed by Andr\xc3\xa9.',
    )
)
995
996 def _check_posted_body(self):
997 # Check the raw body as received by the server
998 lines = self.handler.posted_body
999 # One additional line for the "." terminator
1000 self.assertEqual(len(lines), 10)
1001 self.assertEqual(lines[-1], b'.\r\n')
1002 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1003 self.assertEqual(lines[-3], b'\r\n')
1004 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1005 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1006
def _check_post_ihave_sub(self, func, *args, file_factory):
    # Call `func` twice with `args` followed by the sample post wrapped
    # by `file_factory`, and check what the server received each time.
    # Returns the response of the second call.
    variants = (
        # First the prepared post with CRLF endings
        self.sample_post,
        # Then the same post with "normal" line endings - they should be
        # converted by NNTP.post and NNTP.ihave.
        self.sample_post.replace(b"\r\n", b"\n"),
    )
    resp = None
    for post in variants:
        payload = file_factory(post)
        # Reset so that the check cannot pass on data from an earlier call.
        self.handler.posted_body = None
        resp = func(*args, payload)
        self._check_posted_body()
    return resp
1022
def check_post_ihave(self, func, success_resp, *args):
    # Run the post/ihave check once per supported kind of article data
    # and require `success_resp` as the response every time.
    def terminated_lines(b):
        return iter(b.splitlines(keepends=True))

    def bare_lines(b):
        return iter(b.splitlines(keepends=False))

    factories = (
        bytes,             # With a bytes object
        bytearray,         # With a bytearray object
        io.BytesIO,        # With a file object
        terminated_lines,  # With an iterable of terminated lines
        bare_lines,        # With an iterable of non-terminated lines
    )
    for factory in factories:
        resp = self._check_post_ihave_sub(func, *args, file_factory=factory)
        self.assertEqual(resp, success_resp)
1043
def test_post(self):
    # Posting succeeds for every supported kind of article data...
    post = self.server.post
    self.check_post_ihave(post, "240 Article received OK")
    # ...and is refused once the handler no longer allows posting.
    self.handler.allow_posting = False
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.post(self.sample_post)
    refusal = caught.exception.response
    self.assertEqual(refusal, "440 Posting not permitted")
1051
def test_ihave(self):
    # IHAVE succeeds with the sample article's own message id...
    wanted_id = "<i.am.an.article.you.will.want@example.com>"
    self.check_post_ihave(
        self.server.ihave, "235 Article transferred OK", wanted_id)
    # ...and is refused when offered under a different message id.
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.ihave("<another.message.id>", self.sample_post)
    refusal = caught.exception.response
    self.assertEqual(refusal, "435 Article not wanted")
1059
1060
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # With the v1 handler the client reports no capabilities,
        # protocol version 1 and no implementation string.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        # Identity check: the attribute must be None itself.
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001072
1073
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # Capabilities expected from the v2 handler.  Two versions are
        # listed and the client is expected to report version 3.
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        server = self.server
        self.assertEqual(server.getcapabilities(), expected_caps)
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001095
1096
class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helper functions.

    These call the helpers directly with literal inputs; no server
    object is used.
    """

    def test_decode_header(self):
        # decode_header() leaves plain text (including surrounding
        # spaces) untouched and decodes encoded words in the header.
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # _parse_overview_fmt() normalizes the field names to lower case,
        # maps "Bytes:"/"Lines:" to ":bytes"/":lines" and drops the
        # ":full" suffix, as the expected lists below show.
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref"])

    def test_parse_overview(self):
        # _parse_overview() turns tab-separated overview lines into
        # (article number, fields dict) pairs keyed by the given format.
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        # Identity check: the value must be None itself.
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        # gives(a, b, *c): _parse_datetime(a, b) must equal datetime(*c).
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        # Two-digit years: "99" is read as 1999 and "09" as 2009.
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode
        # 1) with a datetime
        def gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            # Omitting the second argument must behave like passing False.
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
        # 2) with a date
        def gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives(1999, 6, 23, "19990623", "000000")
        gives(2000, 6, 23, "20000623", "000000")
        gives(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        def gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
        # 2) with a date
        def gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives(1999, 6, 23, "990623", "000000")
        gives(2000, 6, 23, "000623", "000000")
        gives(2010, 6, 5, "100605", "000000")
1253
1254
def test_main():
    """Run this module's test classes through ``support.run_unittest``.

    The SSL variant of the networked tests is included only when
    ``_have_ssl`` is true.
    """
    suites = [MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests]
    if _have_ssl:
        # Looked up only on this branch, exactly as before.
        suites += [NetworkedNNTP_SSLTests]
    support.run_unittest(*suites)


if __name__ == "__main__":
    test_main()