blob: 13bb5059d59fb04ae003127034798e12cfbdb8cf [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
2import datetime
3import textwrap
4import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00005import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00006import contextlib
Antoine Pitroude609182010-11-18 17:29:23 +00007import collections
Antoine Pitrou69ab9512010-09-29 15:03:40 +00008from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00009from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000010import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000011if _have_ssl:
12 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013
14TIMEOUT = 30
15
16# TODO:
17# - test the `file` arg to more commands
18# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000019# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000020
21
22class NetworkedNNTPTestsMixin:
23
24 def test_welcome(self):
25 welcome = self.server.getwelcome()
26 self.assertEqual(str, type(welcome))
27
28 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000029 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000030 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000031 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000032 self.assertEqual(str, type(line))
33
34 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000035 resp, groups = self.server.list()
36 if len(groups) > 0:
37 self.assertEqual(GroupInfo, type(groups[0]))
38 self.assertEqual(str, type(groups[0].group))
39
40 def test_list_active(self):
41 resp, groups = self.server.list(self.GROUP_PAT)
42 if len(groups) > 0:
43 self.assertEqual(GroupInfo, type(groups[0]))
44 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000045
46 def test_unknown_command(self):
47 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
48 self.server._shortcmd("XYZZY")
49 resp = cm.exception.response
50 self.assertTrue(resp.startswith("500 "), resp)
51
52 def test_newgroups(self):
53 # gmane gets a constant influx of new groups. In order not to stress
54 # the server too much, we choose a recent date in the past.
55 dt = datetime.date.today() - datetime.timedelta(days=7)
56 resp, groups = self.server.newgroups(dt)
57 if len(groups) > 0:
58 self.assertIsInstance(groups[0], GroupInfo)
59 self.assertIsInstance(groups[0].group, str)
60
61 def test_description(self):
62 def _check_desc(desc):
63 # Sanity checks
64 self.assertIsInstance(desc, str)
65 self.assertNotIn(self.GROUP_NAME, desc)
66 desc = self.server.description(self.GROUP_NAME)
67 _check_desc(desc)
68 # Another sanity check
69 self.assertIn("Python", desc)
70 # With a pattern
71 desc = self.server.description(self.GROUP_PAT)
72 _check_desc(desc)
73 # Shouldn't exist
74 desc = self.server.description("zk.brrtt.baz")
75 self.assertEqual(desc, '')
76
77 def test_descriptions(self):
78 resp, descs = self.server.descriptions(self.GROUP_PAT)
79 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
80 self.assertTrue(
81 resp.startswith("215 ") or resp.startswith("282 "), resp)
82 self.assertIsInstance(descs, dict)
83 desc = descs[self.GROUP_NAME]
84 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
85
86 def test_group(self):
87 result = self.server.group(self.GROUP_NAME)
88 self.assertEqual(5, len(result))
89 resp, count, first, last, group = result
90 self.assertEqual(group, self.GROUP_NAME)
91 self.assertIsInstance(count, int)
92 self.assertIsInstance(first, int)
93 self.assertIsInstance(last, int)
94 self.assertLessEqual(first, last)
95 self.assertTrue(resp.startswith("211 "), resp)
96
97 def test_date(self):
98 resp, date = self.server.date()
99 self.assertIsInstance(date, datetime.datetime)
100 # Sanity check
101 self.assertGreaterEqual(date.year, 1995)
102 self.assertLessEqual(date.year, 2030)
103
104 def _check_art_dict(self, art_dict):
105 # Some sanity checks for a field dictionary returned by OVER / XOVER
106 self.assertIsInstance(art_dict, dict)
107 # NNTP has 7 mandatory fields
108 self.assertGreaterEqual(art_dict.keys(),
109 {"subject", "from", "date", "message-id",
110 "references", ":bytes", ":lines"}
111 )
112 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000113 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000114
115 def test_xover(self):
116 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000117 resp, lines = self.server.xover(last - 5, last)
118 if len(lines) == 0:
119 self.skipTest("no articles retrieved")
120 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000121 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000122 self.assertGreaterEqual(art_num, last - 5)
123 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000124 self._check_art_dict(art_dict)
125
126 def test_over(self):
127 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
128 start = last - 10
129 # The "start-" article range form
130 resp, lines = self.server.over((start, None))
131 art_num, art_dict = lines[0]
132 self._check_art_dict(art_dict)
133 # The "start-end" article range form
134 resp, lines = self.server.over((start, last))
135 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000136 # The 'last' article is not necessarily part of the output (cancelled?)
137 self.assertGreaterEqual(art_num, start)
138 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000139 self._check_art_dict(art_dict)
140 # XXX The "message_id" form is unsupported by gmane
141 # 503 Overview by message-ID unsupported
142
143 def test_xhdr(self):
144 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
145 resp, lines = self.server.xhdr('subject', last)
146 for line in lines:
147 self.assertEqual(str, type(line[1]))
148
149 def check_article_resp(self, resp, article, art_num=None):
150 self.assertIsInstance(article, nntplib.ArticleInfo)
151 if art_num is not None:
152 self.assertEqual(article.number, art_num)
153 for line in article.lines:
154 self.assertIsInstance(line, bytes)
155 # XXX this could exceptionally happen...
156 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
157
158 def test_article_head_body(self):
159 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000160 # Try to find an available article
161 for art_num in (last, first, last - 1):
162 try:
163 resp, head = self.server.head(art_num)
164 except nntplib.NNTPTemporaryError as e:
165 if not e.response.startswith("423 "):
166 raise
167 # "423 No such article" => choose another one
168 continue
169 break
170 else:
171 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000172 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000173 self.check_article_resp(resp, head, art_num)
174 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000175 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000176 self.check_article_resp(resp, body, art_num)
177 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000178 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000179 self.check_article_resp(resp, article, art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000180 self.assertEqual(article.lines, head.lines + [b''] + body.lines)
181
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000182 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000183 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000184 # couple of well-known capabilities. Just sanity check that we
185 # got them.
186 def _check_caps(caps):
187 caps_list = caps['LIST']
188 self.assertIsInstance(caps_list, (list, tuple))
189 self.assertIn('OVERVIEW.FMT', caps_list)
190 self.assertGreaterEqual(self.server.nntp_version, 2)
191 _check_caps(self.server.getcapabilities())
192 # This re-emits the command
193 resp, caps = self.server.capabilities()
194 _check_caps(caps)
195
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000196 if _have_ssl:
197 def test_starttls(self):
198 file = self.server.file
199 sock = self.server.sock
200 try:
201 self.server.starttls()
202 except nntplib.NNTPPermanentError:
203 self.skipTest("STARTTLS not supported by server.")
204 else:
205 # Check that the socket and internal pseudo-file really were
206 # changed.
207 self.assertNotEqual(file, self.server.file)
208 self.assertNotEqual(sock, self.server.sock)
209 # Check that the new socket really is an SSL one
210 self.assertIsInstance(self.server.sock, ssl.SSLSocket)
211 # Check that trying starttls when it's already active fails.
212 self.assertRaises(ValueError, self.server.starttls)
213
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000214 def test_zlogin(self):
215 # This test must be the penultimate because further commands will be
216 # refused.
217 baduser = "notarealuser"
218 badpw = "notarealpassword"
219 # Check that bogus credentials cause failure
220 self.assertRaises(nntplib.NNTPError, self.server.login,
221 user=baduser, password=badpw, usenetrc=False)
222 # FIXME: We should check that correct credentials succeed, but that
223 # would require valid details for some server somewhere to be in the
224 # test suite, I think. Gmane is anonymous, at least as used for the
225 # other tests.
226
227 def test_zzquit(self):
228 # This test must be called last, hence the name
229 cls = type(self)
230 self.server.quit()
231 cls.server = None
232
Antoine Pitroude609182010-11-18 17:29:23 +0000233 @classmethod
234 def wrap_methods(cls):
235 # Wrap all methods in a transient_internet() exception catcher
236 # XXX put a generic version in test.support?
237 def wrap_meth(meth):
238 @functools.wraps(meth)
239 def wrapped(self):
240 with support.transient_internet(self.NNTP_HOST):
241 meth(self)
242 return wrapped
243 for name in dir(cls):
244 if not name.startswith('test_'):
245 continue
246 meth = getattr(cls, name)
247 if not isinstance(meth, collections.Callable):
248 continue
249 # Need to use a closure so that meth remains bound to its current
250 # value
251 setattr(cls, name, wrap_meth(meth))
252
253NetworkedNNTPTestsMixin.wrap_methods()
254
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000255
256class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
257 # This server supports STARTTLS (gmane doesn't)
258 NNTP_HOST = 'news.trigofacile.com'
259 GROUP_NAME = 'fr.comp.lang.python'
260 GROUP_PAT = 'fr.comp.lang.*'
261
Antoine Pitroude609182010-11-18 17:29:23 +0000262 NNTP_CLASS = NNTP
263
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000264 @classmethod
265 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000266 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000267 with support.transient_internet(cls.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000268 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000269
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000270 @classmethod
271 def tearDownClass(cls):
272 if cls.server is not None:
273 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000274
275
276if _have_ssl:
Antoine Pitroude609182010-11-18 17:29:23 +0000277 class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000278
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000279 # Technical limits for this public NNTP server (see http://www.aioe.org):
280 # "Only two concurrent connections per IP address are allowed and
281 # 400 connections per day are accepted from each IP address."
282
283 NNTP_HOST = 'nntp.aioe.org'
284 GROUP_NAME = 'comp.lang.python'
285 GROUP_PAT = 'comp.lang.*'
286
Antoine Pitroude609182010-11-18 17:29:23 +0000287 NNTP_CLASS = nntplib.NNTP_SSL
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000288
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000289 # Disabled as it produces too much data
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000290 test_list = None
291
292 # Disabled as the connection will already be encrypted.
293 test_starttls = None
294
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000295
296#
297# Non-networked tests using a local server (or something mocking it).
298#
299
300class _NNTPServerIO(io.RawIOBase):
301 """A raw IO object allowing NNTP commands to be received and processed
302 by a handler. The handler can push responses which can then be read
303 from the IO object."""
304
305 def __init__(self, handler):
306 io.RawIOBase.__init__(self)
307 # The channel from the client
308 self.c2s = io.BytesIO()
309 # The channel to the client
310 self.s2c = io.BytesIO()
311 self.handler = handler
312 self.handler.start(self.c2s.readline, self.push_data)
313
314 def readable(self):
315 return True
316
317 def writable(self):
318 return True
319
320 def push_data(self, data):
321 """Push (buffer) some data to send to the client."""
322 pos = self.s2c.tell()
323 self.s2c.seek(0, 2)
324 self.s2c.write(data)
325 self.s2c.seek(pos)
326
327 def write(self, b):
328 """The client sends us some data"""
329 pos = self.c2s.tell()
330 self.c2s.write(b)
331 self.c2s.seek(pos)
332 self.handler.process_pending()
333 return len(b)
334
335 def readinto(self, buf):
336 """The client wants to read a response"""
337 self.handler.process_pending()
338 b = self.s2c.read(len(buf))
339 n = len(b)
340 buf[:n] = b
341 return n
342
343
344class MockedNNTPTestsMixin:
345 # Override in derived classes
346 handler_class = None
347
348 def setUp(self):
349 super().setUp()
350 self.make_server()
351
352 def tearDown(self):
353 super().tearDown()
354 del self.server
355
356 def make_server(self, *args, **kwargs):
357 self.handler = self.handler_class()
358 self.sio = _NNTPServerIO(self.handler)
359 # Using BufferedRWPair instead of BufferedRandom ensures the file
360 # isn't seekable.
361 file = io.BufferedRWPair(self.sio, self.sio)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000362 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000363 return self.server
364
365
366class NNTPv1Handler:
367 """A handler for RFC 977"""
368
369 welcome = "200 NNTP mock server"
370
371 def start(self, readline, push_data):
372 self.in_body = False
373 self.allow_posting = True
374 self._readline = readline
375 self._push_data = push_data
376 # Our welcome
377 self.handle_welcome()
378
379 def _decode(self, data):
380 return str(data, "utf-8", "surrogateescape")
381
382 def process_pending(self):
383 if self.in_body:
384 while True:
385 line = self._readline()
386 if not line:
387 return
388 self.body.append(line)
389 if line == b".\r\n":
390 break
391 try:
392 meth, tokens = self.body_callback
393 meth(*tokens, body=self.body)
394 finally:
395 self.body_callback = None
396 self.body = None
397 self.in_body = False
398 while True:
399 line = self._decode(self._readline())
400 if not line:
401 return
402 if not line.endswith("\r\n"):
403 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
404 line = line[:-2]
405 cmd, *tokens = line.split()
406 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
407 meth = getattr(self, "handle_" + cmd.upper(), None)
408 if meth is None:
409 self.handle_unknown()
410 else:
411 try:
412 meth(*tokens)
413 except Exception as e:
414 raise ValueError("command failed: {!r}".format(line)) from e
415 else:
416 if self.in_body:
417 self.body_callback = meth, tokens
418 self.body = []
419
420 def expect_body(self):
421 """Flag that the client is expected to post a request body"""
422 self.in_body = True
423
424 def push_data(self, data):
425 """Push some binary data"""
426 self._push_data(data)
427
428 def push_lit(self, lit):
429 """Push a string literal"""
430 lit = textwrap.dedent(lit)
431 lit = "\r\n".join(lit.splitlines()) + "\r\n"
432 lit = lit.encode('utf-8')
433 self.push_data(lit)
434
435 def handle_unknown(self):
436 self.push_lit("500 What?")
437
438 def handle_welcome(self):
439 self.push_lit(self.welcome)
440
441 def handle_QUIT(self):
442 self.push_lit("205 Bye!")
443
444 def handle_DATE(self):
445 self.push_lit("111 20100914001155")
446
447 def handle_GROUP(self, group):
448 if group == "fr.comp.lang.python":
449 self.push_lit("211 486 761 1265 fr.comp.lang.python")
450 else:
451 self.push_lit("411 No such group {}".format(group))
452
453 def handle_HELP(self):
454 self.push_lit("""\
455 100 Legal commands
456 authinfo user Name|pass Password|generic <prog> <args>
457 date
458 help
459 Report problems to <root@example.org>
460 .""")
461
462 def handle_STAT(self, message_spec=None):
463 if message_spec is None:
464 self.push_lit("412 No newsgroup selected")
465 elif message_spec == "3000234":
466 self.push_lit("223 3000234 <45223423@example.com>")
467 elif message_spec == "<45223423@example.com>":
468 self.push_lit("223 0 <45223423@example.com>")
469 else:
470 self.push_lit("430 No Such Article Found")
471
472 def handle_NEXT(self):
473 self.push_lit("223 3000237 <668929@example.org> retrieved")
474
475 def handle_LAST(self):
476 self.push_lit("223 3000234 <45223423@example.com> retrieved")
477
478 def handle_LIST(self, action=None, param=None):
479 if action is None:
480 self.push_lit("""\
481 215 Newsgroups in form "group high low flags".
482 comp.lang.python 0000052340 0000002828 y
483 comp.lang.python.announce 0000001153 0000000993 m
484 free.it.comp.lang.python 0000000002 0000000002 y
485 fr.comp.lang.python 0000001254 0000000760 y
486 free.it.comp.lang.python.learner 0000000000 0000000001 y
487 tw.bbs.comp.lang.python 0000000304 0000000304 y
488 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000489 elif action == "ACTIVE":
490 if param == "*distutils*":
491 self.push_lit("""\
492 215 Newsgroups in form "group high low flags"
493 gmane.comp.python.distutils.devel 0000014104 0000000001 m
494 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
495 .""")
496 else:
497 self.push_lit("""\
498 215 Newsgroups in form "group high low flags"
499 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000500 elif action == "OVERVIEW.FMT":
501 self.push_lit("""\
502 215 Order of fields in overview database.
503 Subject:
504 From:
505 Date:
506 Message-ID:
507 References:
508 Bytes:
509 Lines:
510 Xref:full
511 .""")
512 elif action == "NEWSGROUPS":
513 assert param is not None
514 if param == "comp.lang.python":
515 self.push_lit("""\
516 215 Descriptions in form "group description".
517 comp.lang.python\tThe Python computer language.
518 .""")
519 elif param == "comp.lang.python*":
520 self.push_lit("""\
521 215 Descriptions in form "group description".
522 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
523 comp.lang.python\tThe Python computer language.
524 .""")
525 else:
526 self.push_lit("""\
527 215 Descriptions in form "group description".
528 .""")
529 else:
530 self.push_lit('501 Unknown LIST keyword')
531
532 def handle_NEWNEWS(self, group, date_str, time_str):
533 # We hard code different return messages depending on passed
534 # argument and date syntax.
535 if (group == "comp.lang.python" and date_str == "20100913"
536 and time_str == "082004"):
537 # Date was passed in RFC 3977 format (NNTP "v2")
538 self.push_lit("""\
539 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
540 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
541 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
542 .""")
543 elif (group == "comp.lang.python" and date_str == "100913"
544 and time_str == "082004"):
545 # Date was passed in RFC 977 format (NNTP "v1")
546 self.push_lit("""\
547 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
548 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
549 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
550 .""")
551 else:
552 self.push_lit("""\
553 230 An empty list of newsarticles follows
554 .""")
555 # (Note for experiments: many servers disable NEWNEWS.
556 # As of this writing, sicinfo3.epfl.ch doesn't.)
557
558 def handle_XOVER(self, message_spec):
559 if message_spec == "57-59":
560 self.push_lit(
561 "224 Overview information for 57-58 follows\n"
562 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
563 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
564 "\tSat, 19 Jun 2010 18:04:08 -0400"
565 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
566 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
567 "\tXref: news.gmane.org gmane.comp.python.authors:57"
568 "\n"
569 "58\tLooking for a few good bloggers"
570 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
571 "\tThu, 22 Jul 2010 09:14:14 -0400"
572 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
573 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000574 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000575 "\n"
576 # An UTF-8 overview line from fr.comp.lang.python
577 "59\tRe: Message d'erreur incompréhensible (par moi)"
578 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
579 "\tWed, 15 Sep 2010 18:09:15 +0200"
580 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
581 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
582 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
583 "\n"
584 ".\n")
585 else:
586 self.push_lit("""\
587 224 No articles
588 .""")
589
590 def handle_POST(self, *, body=None):
591 if body is None:
592 if self.allow_posting:
593 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
594 self.expect_body()
595 else:
596 self.push_lit("440 Posting not permitted")
597 else:
598 assert self.allow_posting
599 self.push_lit("240 Article received OK")
600 self.posted_body = body
601
602 def handle_IHAVE(self, message_id, *, body=None):
603 if body is None:
604 if (self.allow_posting and
605 message_id == "<i.am.an.article.you.will.want@example.com>"):
606 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
607 self.expect_body()
608 else:
609 self.push_lit("435 Article not wanted")
610 else:
611 assert self.allow_posting
612 self.push_lit("235 Article transferred OK")
613 self.posted_body = body
614
615 sample_head = """\
616 From: "Demo User" <nobody@example.net>
617 Subject: I am just a test article
618 Content-Type: text/plain; charset=UTF-8; format=flowed
619 Message-ID: <i.am.an.article.you.will.want@example.com>"""
620
621 sample_body = """\
622 This is just a test article.
623 ..Here is a dot-starting line.
624
625 -- Signed by Andr\xe9."""
626
627 sample_article = sample_head + "\n\n" + sample_body
628
629 def handle_ARTICLE(self, message_spec=None):
630 if message_spec is None:
631 self.push_lit("220 3000237 <45223423@example.com>")
632 elif message_spec == "<45223423@example.com>":
633 self.push_lit("220 0 <45223423@example.com>")
634 elif message_spec == "3000234":
635 self.push_lit("220 3000234 <45223423@example.com>")
636 else:
637 self.push_lit("430 No Such Article Found")
638 return
639 self.push_lit(self.sample_article)
640 self.push_lit(".")
641
642 def handle_HEAD(self, message_spec=None):
643 if message_spec is None:
644 self.push_lit("221 3000237 <45223423@example.com>")
645 elif message_spec == "<45223423@example.com>":
646 self.push_lit("221 0 <45223423@example.com>")
647 elif message_spec == "3000234":
648 self.push_lit("221 3000234 <45223423@example.com>")
649 else:
650 self.push_lit("430 No Such Article Found")
651 return
652 self.push_lit(self.sample_head)
653 self.push_lit(".")
654
655 def handle_BODY(self, message_spec=None):
656 if message_spec is None:
657 self.push_lit("222 3000237 <45223423@example.com>")
658 elif message_spec == "<45223423@example.com>":
659 self.push_lit("222 0 <45223423@example.com>")
660 elif message_spec == "3000234":
661 self.push_lit("222 3000234 <45223423@example.com>")
662 else:
663 self.push_lit("430 No Such Article Found")
664 return
665 self.push_lit(self.sample_body)
666 self.push_lit(".")
667
668
669class NNTPv2Handler(NNTPv1Handler):
670 """A handler for RFC 3977 (NNTP "v2")"""
671
672 def handle_CAPABILITIES(self):
673 self.push_lit("""\
674 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000675 VERSION 2 3
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000676 IMPLEMENTATION INN 2.5.1
677 AUTHINFO USER
678 HDR
679 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
680 OVER
681 POST
682 READER
683 .""")
684
685 def handle_OVER(self, message_spec=None):
686 return self.handle_XOVER(message_spec)
687
688
689class NNTPv1v2TestsMixin:
690
691 def setUp(self):
692 super().setUp()
693
694 def test_welcome(self):
695 self.assertEqual(self.server.welcome, self.handler.welcome)
696
697 def test_date(self):
698 resp, date = self.server.date()
699 self.assertEqual(resp, "111 20100914001155")
700 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
701
702 def test_quit(self):
703 self.assertFalse(self.sio.closed)
704 resp = self.server.quit()
705 self.assertEqual(resp, "205 Bye!")
706 self.assertTrue(self.sio.closed)
707
708 def test_help(self):
709 resp, help = self.server.help()
710 self.assertEqual(resp, "100 Legal commands")
711 self.assertEqual(help, [
712 ' authinfo user Name|pass Password|generic <prog> <args>',
713 ' date',
714 ' help',
715 'Report problems to <root@example.org>',
716 ])
717
718 def test_list(self):
719 resp, groups = self.server.list()
720 self.assertEqual(len(groups), 6)
721 g = groups[1]
722 self.assertEqual(g,
723 GroupInfo("comp.lang.python.announce", "0000001153",
724 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000725 resp, groups = self.server.list("*distutils*")
726 self.assertEqual(len(groups), 2)
727 g = groups[0]
728 self.assertEqual(g,
729 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
730 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000731
732 def test_stat(self):
733 resp, art_num, message_id = self.server.stat(3000234)
734 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
735 self.assertEqual(art_num, 3000234)
736 self.assertEqual(message_id, "<45223423@example.com>")
737 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
738 self.assertEqual(resp, "223 0 <45223423@example.com>")
739 self.assertEqual(art_num, 0)
740 self.assertEqual(message_id, "<45223423@example.com>")
741 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
742 self.server.stat("<non.existent.id>")
743 self.assertEqual(cm.exception.response, "430 No Such Article Found")
744 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
745 self.server.stat()
746 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
747
748 def test_next(self):
749 resp, art_num, message_id = self.server.next()
750 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
751 self.assertEqual(art_num, 3000237)
752 self.assertEqual(message_id, "<668929@example.org>")
753
754 def test_last(self):
755 resp, art_num, message_id = self.server.last()
756 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
757 self.assertEqual(art_num, 3000234)
758 self.assertEqual(message_id, "<45223423@example.com>")
759
760 def test_description(self):
761 desc = self.server.description("comp.lang.python")
762 self.assertEqual(desc, "The Python computer language.")
763 desc = self.server.description("comp.lang.pythonx")
764 self.assertEqual(desc, "")
765
766 def test_descriptions(self):
767 resp, groups = self.server.descriptions("comp.lang.python")
768 self.assertEqual(resp, '215 Descriptions in form "group description".')
769 self.assertEqual(groups, {
770 "comp.lang.python": "The Python computer language.",
771 })
772 resp, groups = self.server.descriptions("comp.lang.python*")
773 self.assertEqual(groups, {
774 "comp.lang.python": "The Python computer language.",
775 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
776 })
777 resp, groups = self.server.descriptions("comp.lang.pythonx")
778 self.assertEqual(groups, {})
779
780 def test_group(self):
781 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
782 self.assertTrue(resp.startswith("211 "), resp)
783 self.assertEqual(first, 761)
784 self.assertEqual(last, 1265)
785 self.assertEqual(count, 486)
786 self.assertEqual(group, "fr.comp.lang.python")
787 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
788 self.server.group("comp.lang.python.devel")
789 exc = cm.exception
790 self.assertTrue(exc.response.startswith("411 No such group"),
791 exc.response)
792
793 def test_newnews(self):
794 # NEWNEWS comp.lang.python [20]100913 082004
795 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
796 resp, ids = self.server.newnews("comp.lang.python", dt)
797 expected = (
798 "230 list of newsarticles (NNTP v{0}) "
799 "created after Mon Sep 13 08:20:04 2010 follows"
800 ).format(self.nntp_version)
801 self.assertEqual(resp, expected)
802 self.assertEqual(ids, [
803 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
804 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
805 ])
806 # NEWNEWS fr.comp.lang.python [20]100913 082004
807 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
808 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
809 self.assertEqual(resp, "230 An empty list of newsarticles follows")
810 self.assertEqual(ids, [])
811
812 def _check_article_body(self, lines):
813 self.assertEqual(len(lines), 4)
814 self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
815 self.assertEqual(lines[-2], b"")
816 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
817 self.assertEqual(lines[-4], b"This is just a test article.")
818
819 def _check_article_head(self, lines):
820 self.assertEqual(len(lines), 4)
821 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
822 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
823
824 def _check_article_data(self, lines):
825 self.assertEqual(len(lines), 9)
826 self._check_article_head(lines[:4])
827 self._check_article_body(lines[-4:])
828 self.assertEqual(lines[4], b"")
829
830 def test_article(self):
831 # ARTICLE
832 resp, info = self.server.article()
833 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
834 art_num, message_id, lines = info
835 self.assertEqual(art_num, 3000237)
836 self.assertEqual(message_id, "<45223423@example.com>")
837 self._check_article_data(lines)
838 # ARTICLE num
839 resp, info = self.server.article(3000234)
840 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
841 art_num, message_id, lines = info
842 self.assertEqual(art_num, 3000234)
843 self.assertEqual(message_id, "<45223423@example.com>")
844 self._check_article_data(lines)
845 # ARTICLE id
846 resp, info = self.server.article("<45223423@example.com>")
847 self.assertEqual(resp, "220 0 <45223423@example.com>")
848 art_num, message_id, lines = info
849 self.assertEqual(art_num, 0)
850 self.assertEqual(message_id, "<45223423@example.com>")
851 self._check_article_data(lines)
852 # Non-existent id
853 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
854 self.server.article("<non-existent@example.com>")
855 self.assertEqual(cm.exception.response, "430 No Such Article Found")
856
857 def test_article_file(self):
858 # With a "file" argument
859 f = io.BytesIO()
860 resp, info = self.server.article(file=f)
861 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
862 art_num, message_id, lines = info
863 self.assertEqual(art_num, 3000237)
864 self.assertEqual(message_id, "<45223423@example.com>")
865 self.assertEqual(lines, [])
866 data = f.getvalue()
867 self.assertTrue(data.startswith(
868 b'From: "Demo User" <nobody@example.net>\r\n'
869 b'Subject: I am just a test article\r\n'
870 ), ascii(data))
871 self.assertTrue(data.endswith(
872 b'This is just a test article.\r\n'
873 b'.Here is a dot-starting line.\r\n'
874 b'\r\n'
875 b'-- Signed by Andr\xc3\xa9.\r\n'
876 ), ascii(data))
877
878 def test_head(self):
879 # HEAD
880 resp, info = self.server.head()
881 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
882 art_num, message_id, lines = info
883 self.assertEqual(art_num, 3000237)
884 self.assertEqual(message_id, "<45223423@example.com>")
885 self._check_article_head(lines)
886 # HEAD num
887 resp, info = self.server.head(3000234)
888 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
889 art_num, message_id, lines = info
890 self.assertEqual(art_num, 3000234)
891 self.assertEqual(message_id, "<45223423@example.com>")
892 self._check_article_head(lines)
893 # HEAD id
894 resp, info = self.server.head("<45223423@example.com>")
895 self.assertEqual(resp, "221 0 <45223423@example.com>")
896 art_num, message_id, lines = info
897 self.assertEqual(art_num, 0)
898 self.assertEqual(message_id, "<45223423@example.com>")
899 self._check_article_head(lines)
900 # Non-existent id
901 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
902 self.server.head("<non-existent@example.com>")
903 self.assertEqual(cm.exception.response, "430 No Such Article Found")
904
905 def test_body(self):
906 # BODY
907 resp, info = self.server.body()
908 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
909 art_num, message_id, lines = info
910 self.assertEqual(art_num, 3000237)
911 self.assertEqual(message_id, "<45223423@example.com>")
912 self._check_article_body(lines)
913 # BODY num
914 resp, info = self.server.body(3000234)
915 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
916 art_num, message_id, lines = info
917 self.assertEqual(art_num, 3000234)
918 self.assertEqual(message_id, "<45223423@example.com>")
919 self._check_article_body(lines)
920 # BODY id
921 resp, info = self.server.body("<45223423@example.com>")
922 self.assertEqual(resp, "222 0 <45223423@example.com>")
923 art_num, message_id, lines = info
924 self.assertEqual(art_num, 0)
925 self.assertEqual(message_id, "<45223423@example.com>")
926 self._check_article_body(lines)
927 # Non-existent id
928 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
929 self.server.body("<non-existent@example.com>")
930 self.assertEqual(cm.exception.response, "430 No Such Article Found")
931
932 def check_over_xover_resp(self, resp, overviews):
933 self.assertTrue(resp.startswith("224 "), resp)
934 self.assertEqual(len(overviews), 3)
935 art_num, over = overviews[0]
936 self.assertEqual(art_num, 57)
937 self.assertEqual(over, {
938 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
939 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
940 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
941 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
942 "references": "<hvalf7$ort$1@dough.gmane.org>",
943 ":bytes": "7103",
944 ":lines": "16",
945 "xref": "news.gmane.org gmane.comp.python.authors:57"
946 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000947 art_num, over = overviews[1]
948 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000949 art_num, over = overviews[2]
950 self.assertEqual(over["subject"],
951 "Re: Message d'erreur incompréhensible (par moi)")
952
953 def test_xover(self):
954 resp, overviews = self.server.xover(57, 59)
955 self.check_over_xover_resp(resp, overviews)
956
957 def test_over(self):
958 # In NNTP "v1", this will fallback on XOVER
959 resp, overviews = self.server.over((57, 59))
960 self.check_over_xover_resp(resp, overviews)
961
962 sample_post = (
963 b'From: "Demo User" <nobody@example.net>\r\n'
964 b'Subject: I am just a test article\r\n'
965 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
966 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
967 b'\r\n'
968 b'This is just a test article.\r\n'
969 b'.Here is a dot-starting line.\r\n'
970 b'\r\n'
971 b'-- Signed by Andr\xc3\xa9.\r\n'
972 )
973
974 def _check_posted_body(self):
975 # Check the raw body as received by the server
976 lines = self.handler.posted_body
977 # One additional line for the "." terminator
978 self.assertEqual(len(lines), 10)
979 self.assertEqual(lines[-1], b'.\r\n')
980 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
981 self.assertEqual(lines[-3], b'\r\n')
982 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
983 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
984
985 def _check_post_ihave_sub(self, func, *args, file_factory):
986 # First the prepared post with CRLF endings
987 post = self.sample_post
988 func_args = args + (file_factory(post),)
989 self.handler.posted_body = None
990 resp = func(*func_args)
991 self._check_posted_body()
992 # Then the same post with "normal" line endings - they should be
993 # converted by NNTP.post and NNTP.ihave.
994 post = self.sample_post.replace(b"\r\n", b"\n")
995 func_args = args + (file_factory(post),)
996 self.handler.posted_body = None
997 resp = func(*func_args)
998 self._check_posted_body()
999 return resp
1000
1001 def check_post_ihave(self, func, success_resp, *args):
1002 # With a bytes object
1003 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1004 self.assertEqual(resp, success_resp)
1005 # With a bytearray object
1006 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1007 self.assertEqual(resp, success_resp)
1008 # With a file object
1009 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1010 self.assertEqual(resp, success_resp)
1011 # With an iterable of terminated lines
1012 def iterlines(b):
1013 return iter(b.splitlines(True))
1014 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1015 self.assertEqual(resp, success_resp)
1016 # With an iterable of non-terminated lines
1017 def iterlines(b):
1018 return iter(b.splitlines(False))
1019 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1020 self.assertEqual(resp, success_resp)
1021
1022 def test_post(self):
1023 self.check_post_ihave(self.server.post, "240 Article received OK")
1024 self.handler.allow_posting = False
1025 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1026 self.server.post(self.sample_post)
1027 self.assertEqual(cm.exception.response,
1028 "440 Posting not permitted")
1029
1030 def test_ihave(self):
1031 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1032 "<i.am.an.article.you.will.want@example.com>")
1033 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1034 self.server.ihave("<another.message.id>", self.sample_post)
1035 self.assertEqual(cm.exception.response,
1036 "435 Article not wanted")
1037
1038
1039class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1040 """Tests an NNTP v1 server (no capabilities)."""
1041
1042 nntp_version = 1
1043 handler_class = NNTPv1Handler
1044
1045 def test_caps(self):
1046 caps = self.server.getcapabilities()
1047 self.assertEqual(caps, {})
1048 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001049 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001050
1051
1052class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1053 """Tests an NNTP v2 server (with capabilities)."""
1054
1055 nntp_version = 2
1056 handler_class = NNTPv2Handler
1057
1058 def test_caps(self):
1059 caps = self.server.getcapabilities()
1060 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001061 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001062 'IMPLEMENTATION': ['INN', '2.5.1'],
1063 'AUTHINFO': ['USER'],
1064 'HDR': [],
1065 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1066 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1067 'OVER': [],
1068 'POST': [],
1069 'READER': [],
1070 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001071 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001072 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001073
1074
1075class MiscTests(unittest.TestCase):
1076
1077 def test_decode_header(self):
1078 def gives(a, b):
1079 self.assertEqual(nntplib.decode_header(a), b)
1080 gives("" , "")
1081 gives("a plain header", "a plain header")
1082 gives(" with extra spaces ", " with extra spaces ")
1083 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1084 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1085 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1086 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1087 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1088 "Re: problème de matrice")
1089 # A natively utf-8 header (found in the real world!)
1090 gives("Re: Message d'erreur incompréhensible (par moi)",
1091 "Re: Message d'erreur incompréhensible (par moi)")
1092
1093 def test_parse_overview_fmt(self):
1094 # The minimal (default) response
1095 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1096 "References:", ":bytes", ":lines"]
1097 self.assertEqual(nntplib._parse_overview_fmt(lines),
1098 ["subject", "from", "date", "message-id", "references",
1099 ":bytes", ":lines"])
1100 # The minimal response using alternative names
1101 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1102 "References:", "Bytes:", "Lines:"]
1103 self.assertEqual(nntplib._parse_overview_fmt(lines),
1104 ["subject", "from", "date", "message-id", "references",
1105 ":bytes", ":lines"])
1106 # Variations in casing
1107 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1108 "References:", "BYTES:", "Lines:"]
1109 self.assertEqual(nntplib._parse_overview_fmt(lines),
1110 ["subject", "from", "date", "message-id", "references",
1111 ":bytes", ":lines"])
1112 # First example from RFC 3977
1113 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1114 "References:", ":bytes", ":lines", "Xref:full",
1115 "Distribution:full"]
1116 self.assertEqual(nntplib._parse_overview_fmt(lines),
1117 ["subject", "from", "date", "message-id", "references",
1118 ":bytes", ":lines", "xref", "distribution"])
1119 # Second example from RFC 3977
1120 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1121 "References:", "Bytes:", "Lines:", "Xref:FULL",
1122 "Distribution:FULL"]
1123 self.assertEqual(nntplib._parse_overview_fmt(lines),
1124 ["subject", "from", "date", "message-id", "references",
1125 ":bytes", ":lines", "xref", "distribution"])
1126 # A classic response from INN
1127 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1128 "References:", "Bytes:", "Lines:", "Xref:full"]
1129 self.assertEqual(nntplib._parse_overview_fmt(lines),
1130 ["subject", "from", "date", "message-id", "references",
1131 ":bytes", ":lines", "xref"])
1132
1133 def test_parse_overview(self):
1134 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1135 # First example from RFC 3977
1136 lines = [
1137 '3000234\tI am just a test article\t"Demo User" '
1138 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1139 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1140 '17\tXref: news.example.com misc.test:3000363',
1141 ]
1142 overview = nntplib._parse_overview(lines, fmt)
1143 (art_num, fields), = overview
1144 self.assertEqual(art_num, 3000234)
1145 self.assertEqual(fields, {
1146 'subject': 'I am just a test article',
1147 'from': '"Demo User" <nobody@example.com>',
1148 'date': '6 Oct 1998 04:38:40 -0500',
1149 'message-id': '<45223423@example.com>',
1150 'references': '<45454@example.net>',
1151 ':bytes': '1234',
1152 ':lines': '17',
1153 'xref': 'news.example.com misc.test:3000363',
1154 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001155 # Second example; here the "Xref" field is totally absent (including
1156 # the header name) and comes out as None
1157 lines = [
1158 '3000234\tI am just a test article\t"Demo User" '
1159 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1160 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1161 '17\t\t',
1162 ]
1163 overview = nntplib._parse_overview(lines, fmt)
1164 (art_num, fields), = overview
1165 self.assertEqual(fields['xref'], None)
1166 # Third example; the "Xref" is an empty string, while "references"
1167 # is a single space.
1168 lines = [
1169 '3000234\tI am just a test article\t"Demo User" '
1170 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1171 '<45223423@example.com>\t \t1234\t'
1172 '17\tXref: \t',
1173 ]
1174 overview = nntplib._parse_overview(lines, fmt)
1175 (art_num, fields), = overview
1176 self.assertEqual(fields['references'], ' ')
1177 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001178
1179 def test_parse_datetime(self):
1180 def gives(a, b, *c):
1181 self.assertEqual(nntplib._parse_datetime(a, b),
1182 datetime.datetime(*c))
1183 # Output of DATE command
1184 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1185 # Variations
1186 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1187 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1188 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1189
1190 def test_unparse_datetime(self):
1191 # Test non-legacy mode
1192 # 1) with a datetime
1193 def gives(y, M, d, h, m, s, date_str, time_str):
1194 dt = datetime.datetime(y, M, d, h, m, s)
1195 self.assertEqual(nntplib._unparse_datetime(dt),
1196 (date_str, time_str))
1197 self.assertEqual(nntplib._unparse_datetime(dt, False),
1198 (date_str, time_str))
1199 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1200 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1201 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1202 # 2) with a date
1203 def gives(y, M, d, date_str, time_str):
1204 dt = datetime.date(y, M, d)
1205 self.assertEqual(nntplib._unparse_datetime(dt),
1206 (date_str, time_str))
1207 self.assertEqual(nntplib._unparse_datetime(dt, False),
1208 (date_str, time_str))
1209 gives(1999, 6, 23, "19990623", "000000")
1210 gives(2000, 6, 23, "20000623", "000000")
1211 gives(2010, 6, 5, "20100605", "000000")
1212
1213 def test_unparse_datetime_legacy(self):
1214 # Test legacy mode (RFC 977)
1215 # 1) with a datetime
1216 def gives(y, M, d, h, m, s, date_str, time_str):
1217 dt = datetime.datetime(y, M, d, h, m, s)
1218 self.assertEqual(nntplib._unparse_datetime(dt, True),
1219 (date_str, time_str))
1220 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1221 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1222 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1223 # 2) with a date
1224 def gives(y, M, d, date_str, time_str):
1225 dt = datetime.date(y, M, d)
1226 self.assertEqual(nntplib._unparse_datetime(dt, True),
1227 (date_str, time_str))
1228 gives(1999, 6, 23, "990623", "000000")
1229 gives(2000, 6, 23, "000623", "000000")
1230 gives(2010, 6, 5, "100605", "000000")
1231
1232
1233def test_main():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00001234 tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests]
1235 if _have_ssl:
1236 tests.append(NetworkedNNTP_SSLTests)
1237 support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001238
1239
1240if __name__ == "__main__":
1241 test_main()