blob: e1155236787ee713176a8ff303537cb871f6644e [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
8from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00009from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000010import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000011if _have_ssl:
12 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013
# Timeout, in seconds, passed to every NNTP connection opened by the
# networked tests.
TIMEOUT = 30

# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000020
21
class NetworkedNNTPTestsMixin:
    """Checks that are run against a live NNTP server.

    The mixin reads ``self.server`` (an open connection) and the class
    attributes ``NNTP_HOST``, ``NNTP_CLASS``, ``GROUP_NAME`` and
    ``GROUP_PAT``; the concrete test class is expected to provide them.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock (plus one
        # year of slack) instead of a fixed year, so the test does not start
        # failing once a hard-coded year has passed.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        if not lines:
            # Same guard as test_xover: an empty overview is not a failure.
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        # Tolerate running the tests from behind a NNTP virus checker
        filtered_lines = [line for line in article.lines
                          if not line.startswith(b'X-Antivirus')]
        self.assertEqual(filtered_lines, head.lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities.  Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    if _have_ssl:
        def test_starttls(self):
            file = self.server.file
            sock = self.server.sock
            try:
                self.server.starttls()
            except nntplib.NNTPPermanentError:
                self.skipTest("STARTTLS not supported by server.")
            else:
                # Check that the socket and internal pseudo-file really were
                # changed.
                self.assertNotEqual(file, self.server.file)
                self.assertNotEqual(sock, self.server.sock)
                # Check that the new socket really is an SSL one
                self.assertIsInstance(self.server.sock, ssl.SSLSocket)
                # Check that trying starttls when it's already active fails.
                self.assertRaises(ValueError, self.server.starttls)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think.  Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Tell tearDownClass that the connection is already closed.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_with_statement(self):
        def is_connected():
            # `server` is resolved at call time from the enclosing scope, so
            # it refers to whichever connection the `with` blocks below bound.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (socket.error, EOFError):
                return False
            return True

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            self.assertTrue(is_connected())
            self.assertTrue(server.help())
        self.assertFalse(is_connected())

        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
            server.quit()
        self.assertFalse(is_connected())


NetworkedNNTPTestsMixin.wrap_methods()
279
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000280
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked checks over a plain NNTP connection."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        """Open the connection shared by every test of the class."""
        support.requires("network")
        host = cls.NNTP_HOST
        with support.transient_internet(host):
            connection = cls.NNTP_CLASS(host, timeout=TIMEOUT, usenetrc=False)
            cls.server = connection

    @classmethod
    def tearDownClass(cls):
        """Close the shared connection unless a test already did."""
        if cls.server is None:
            return
        cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000299
300
if _have_ssl:
    class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
        """Run the networked checks over an NNTP_SSL connection."""

        # Technical limits for this public NNTP server (see http://www.aioe.org):
        # "Only two concurrent connections per IP address are allowed and
        # 400 connections per day are accepted from each IP address."

        NNTP_HOST = 'nntp.aioe.org'
        GROUP_NAME = 'comp.lang.python'
        GROUP_PAT = 'comp.lang.*'

        NNTP_CLASS = nntplib.NNTP_SSL

        # Disabled as it produces too much data
        test_list = None

        # Disabled as the connection will already be encrypted.
        test_starttls = None
319
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000320
#
# Non-networked tests using a local server (or something mocking it).
#
324
325class _NNTPServerIO(io.RawIOBase):
326 """A raw IO object allowing NNTP commands to be received and processed
327 by a handler. The handler can push responses which can then be read
328 from the IO object."""
329
330 def __init__(self, handler):
331 io.RawIOBase.__init__(self)
332 # The channel from the client
333 self.c2s = io.BytesIO()
334 # The channel to the client
335 self.s2c = io.BytesIO()
336 self.handler = handler
337 self.handler.start(self.c2s.readline, self.push_data)
338
339 def readable(self):
340 return True
341
342 def writable(self):
343 return True
344
345 def push_data(self, data):
346 """Push (buffer) some data to send to the client."""
347 pos = self.s2c.tell()
348 self.s2c.seek(0, 2)
349 self.s2c.write(data)
350 self.s2c.seek(pos)
351
352 def write(self, b):
353 """The client sends us some data"""
354 pos = self.c2s.tell()
355 self.c2s.write(b)
356 self.c2s.seek(pos)
357 self.handler.process_pending()
358 return len(b)
359
360 def readinto(self, buf):
361 """The client wants to read a response"""
362 self.handler.process_pending()
363 b = self.s2c.read(len(buf))
364 n = len(b)
365 buf[:n] = b
366 return n
367
368
class MockedNNTPTestsMixin:
    """Fixture that connects an nntplib client to an in-memory handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Create handler, raw IO and client; extra arguments go to the client.

        Stores them as ``self.handler``, ``self.sio`` and ``self.server`` and
        returns the client.
        """
        handler = self.handler_class()
        self.handler = handler
        raw = _NNTPServerIO(handler)
        self.sio = raw
        # Using BufferedRWPair instead of BufferedRandom ensures the file
        # isn't seekable.
        duplex = io.BufferedRWPair(raw, raw)
        self.server = nntplib._NNTPBase(duplex, 'test.server', *args, **kwargs)
        return self.server
389
390
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Variant of the mocked fixture whose client is built with readermode=True."""

    def setUp(self):
        super().setUp()
        # The parent setUp already built a default client; build it again,
        # this time with readermode=True.
        self.make_server(readermode=True)
395
396
class NNTPv1Handler:
    """A handler for RFC 977

    The handler reads command lines through the ``readline`` callable given
    to start() and answers through ``push_data``.  A command ``FOO`` is
    dispatched to the method ``handle_FOO``; unknown commands get a 500
    reply.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the I/O callables, reset the state and push the welcome."""
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume everything the client has sent so far.

        Raises ValueError when a command line is not terminated by CRLF or
        when a command handler raises.
        """
        if self.in_body:
            # A previous command asked for a body: collect it up to the
            # terminating "." line, or give up for now if it is incomplete.
            while True:
                line = self._readline()
                if not line:
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            words = line.split()
            if not words:
                # A blank command line has no command word to dispatch on;
                # answer like any other unrecognised command.
                self.handle_unknown()
                continue
            cmd, *tokens = words
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # The handler called expect_body(): remember how to
                        # call it back once the body has been received.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # The extra leading space on the three command lines is part of the
        # reply (it survives the dedent done by push_lit).
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # An UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_sample(self, code, message_spec, payload):
        """Answer ARTICLE/HEAD/BODY: status line with *code*, *payload*, "."."""
        if message_spec is None:
            status = "{} 3000237 <45223423@example.com>"
        elif message_spec == "<45223423@example.com>":
            status = "{} 0 <45223423@example.com>"
        elif message_spec == "3000234":
            status = "{} 3000234 <45223423@example.com>"
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(status.format(code))
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_sample("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_sample("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_sample("222", message_spec, self.sample_body)

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
715
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000716
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        """Push the capability list; AUTHINFO is listed only before login."""
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""

        if self._logged_in:
            extra = ''
        else:
            # The inserted line carries the same indentation as the template
            # lines so that the dedent done by push_lit strips it as well.
            extra = '\n            AUTHINFO USER'
        self.push_lit(template.format(extra))

    def handle_MODE(self, _):
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
742
743
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if self._logged_in:
            super().handle_CAPABILITIES()
        else:
            self.push_lit('480 You must log in.')
751 super().handle_CAPABILITIES()
752
753
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes True once the client has sent MODE READER.
        self._switched = False

    def handle_CAPABILITIES(self):
        """Advertise MODE-READER before the switch and READER after it."""
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        prefix = '' if self._switched else 'MODE-'
        self.push_lit(template.format(prefix))

    def handle_MODE(self, what):
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
780
781
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000782class NNTPv1v2TestsMixin:
783
784 def setUp(self):
785 super().setUp()
786
787 def test_welcome(self):
788 self.assertEqual(self.server.welcome, self.handler.welcome)
789
Antoine Pitrou54411c12012-02-12 19:14:17 +0100790 def test_authinfo(self):
791 if self.nntp_version == 2:
792 self.assertIn('AUTHINFO', self.server._caps)
793 self.server.login('testuser', 'testpw')
794 # if AUTHINFO is gone from _caps we also know that getcapabilities()
795 # has been called after login as it should
796 self.assertNotIn('AUTHINFO', self.server._caps)
797
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000798 def test_date(self):
799 resp, date = self.server.date()
800 self.assertEqual(resp, "111 20100914001155")
801 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
802
803 def test_quit(self):
804 self.assertFalse(self.sio.closed)
805 resp = self.server.quit()
806 self.assertEqual(resp, "205 Bye!")
807 self.assertTrue(self.sio.closed)
808
809 def test_help(self):
810 resp, help = self.server.help()
811 self.assertEqual(resp, "100 Legal commands")
812 self.assertEqual(help, [
813 ' authinfo user Name|pass Password|generic <prog> <args>',
814 ' date',
815 ' help',
816 'Report problems to <root@example.org>',
817 ])
818
819 def test_list(self):
820 resp, groups = self.server.list()
821 self.assertEqual(len(groups), 6)
822 g = groups[1]
823 self.assertEqual(g,
824 GroupInfo("comp.lang.python.announce", "0000001153",
825 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000826 resp, groups = self.server.list("*distutils*")
827 self.assertEqual(len(groups), 2)
828 g = groups[0]
829 self.assertEqual(g,
830 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
831 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000832
833 def test_stat(self):
834 resp, art_num, message_id = self.server.stat(3000234)
835 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
836 self.assertEqual(art_num, 3000234)
837 self.assertEqual(message_id, "<45223423@example.com>")
838 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
839 self.assertEqual(resp, "223 0 <45223423@example.com>")
840 self.assertEqual(art_num, 0)
841 self.assertEqual(message_id, "<45223423@example.com>")
842 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
843 self.server.stat("<non.existent.id>")
844 self.assertEqual(cm.exception.response, "430 No Such Article Found")
845 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
846 self.server.stat()
847 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
848
849 def test_next(self):
850 resp, art_num, message_id = self.server.next()
851 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
852 self.assertEqual(art_num, 3000237)
853 self.assertEqual(message_id, "<668929@example.org>")
854
855 def test_last(self):
856 resp, art_num, message_id = self.server.last()
857 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
858 self.assertEqual(art_num, 3000234)
859 self.assertEqual(message_id, "<45223423@example.com>")
860
861 def test_description(self):
862 desc = self.server.description("comp.lang.python")
863 self.assertEqual(desc, "The Python computer language.")
864 desc = self.server.description("comp.lang.pythonx")
865 self.assertEqual(desc, "")
866
def test_descriptions(self):
    """Check descriptions() for an exact name, a wildcard and no match."""
    # Exact group name.
    resp, groups = self.server.descriptions("comp.lang.python")
    self.assertEqual(resp, '215 Descriptions in form "group description".')
    self.assertEqual(groups, {
        "comp.lang.python": "The Python computer language.",
    })
    # Wildcard pattern.
    resp, groups = self.server.descriptions("comp.lang.python*")
    self.assertEqual(groups, {
        "comp.lang.python": "The Python computer language.",
        "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
    })
    # No matching group: an empty mapping is expected.
    resp, groups = self.server.descriptions("comp.lang.pythonx")
    self.assertEqual(groups, {})
880
def test_group(self):
    """Check GROUP on an existing group and the error for a missing one."""
    resp, count, first, last, group = self.server.group("fr.comp.lang.python")
    self.assertTrue(resp.startswith("211 "), resp)
    self.assertEqual(first, 761)
    self.assertEqual(last, 1265)
    self.assertEqual(count, 486)
    self.assertEqual(group, "fr.comp.lang.python")
    # Selecting a group the server does not know about.
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.group("comp.lang.python.devel")
    exc = cm.exception
    self.assertTrue(exc.response.startswith("411 No such group"),
                    exc.response)
893
def test_newnews(self):
    """Check NEWNEWS for a group with new articles and for one without."""
    # The same cut-off date is used for both queries.
    dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
    # NEWNEWS comp.lang.python [20]100913 082004
    resp, ids = self.server.newnews("comp.lang.python", dt)
    expected = (
        "230 list of newsarticles (NNTP v{0}) "
        "created after Mon Sep 13 08:20:04 2010 follows"
    ).format(self.nntp_version)
    self.assertEqual(resp, expected)
    self.assertEqual(ids, [
        "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
        "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
    ])
    # NEWNEWS fr.comp.lang.python [20]100913 082004
    resp, ids = self.server.newnews("fr.comp.lang.python", dt)
    self.assertEqual(resp, "230 An empty list of newsarticles follows")
    self.assertEqual(ids, [])
912
def _check_article_body(self, lines):
    """Assert that *lines* is the four-line body of the sample article.

    *lines* is a sequence of bytes objects without line terminators.
    """
    self.assertEqual(len(lines), 4)
    # The signature line holds a non-ASCII character, so compare it decoded.
    self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
    self.assertEqual(lines[-2], b"")
    self.assertEqual(lines[-3], b".Here is a dot-starting line.")
    self.assertEqual(lines[-4], b"This is just a test article.")
919
def _check_article_head(self, lines):
    """Assert that *lines* is the four-line header of the sample article.

    Only the first (From) and last (Message-ID) header lines are compared.
    """
    self.assertEqual(len(lines), 4)
    self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
    self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
924
def _check_article_data(self, lines):
    """Assert that *lines* is the whole sample article.

    The nine lines are expected to be four header lines, one empty
    separator line, then four body lines.
    """
    self.assertEqual(len(lines), 9)
    self._check_article_head(lines[:4])
    self._check_article_body(lines[-4:])
    # Header and body are separated by a single empty line.
    self.assertEqual(lines[4], b"")
930
def test_article(self):
    """Check ARTICLE with no argument, a number, an id and an unknown id."""
    # ARTICLE
    resp, info = self.server.article()
    self.assertEqual(resp, "220 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self._check_article_data(lines)
    # ARTICLE num
    resp, info = self.server.article(3000234)
    self.assertEqual(resp, "220 3000234 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000234)
    self.assertEqual(message_id, "<45223423@example.com>")
    self._check_article_data(lines)
    # ARTICLE id
    resp, info = self.server.article("<45223423@example.com>")
    self.assertEqual(resp, "220 0 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 0)
    self.assertEqual(message_id, "<45223423@example.com>")
    self._check_article_data(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.article("<non-existent@example.com>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
957
def test_article_file(self):
    """Check ARTICLE when the article is written to a `file` argument."""
    # With a "file" argument
    f = io.BytesIO()
    resp, info = self.server.article(file=f)
    self.assertEqual(resp, "220 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    # The returned line list is expected to be empty; the data is in f.
    self.assertEqual(lines, [])
    data = f.getvalue()
    self.assertTrue(data.startswith(
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    ), ascii(data))
    self.assertTrue(data.endswith(
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    ), ascii(data))
978
def test_head(self):
    """Check HEAD with no argument, a number, an id and an unknown id."""
    # HEAD
    resp, info = self.server.head()
    self.assertEqual(resp, "221 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self._check_article_head(lines)
    # HEAD num
    resp, info = self.server.head(3000234)
    self.assertEqual(resp, "221 3000234 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000234)
    self.assertEqual(message_id, "<45223423@example.com>")
    self._check_article_head(lines)
    # HEAD id
    resp, info = self.server.head("<45223423@example.com>")
    self.assertEqual(resp, "221 0 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 0)
    self.assertEqual(message_id, "<45223423@example.com>")
    self._check_article_head(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.head("<non-existent@example.com>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
1005
def test_head_file(self):
    """Check HEAD when the headers are written to a `file` argument."""
    f = io.BytesIO()
    resp, info = self.server.head(file=f)
    self.assertEqual(resp, "221 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    # The returned line list is expected to be empty; the data is in f.
    self.assertEqual(lines, [])
    data = f.getvalue()
    self.assertTrue(data.startswith(
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    ), ascii(data))
    # Only the headers were requested, so the body must not be present.
    self.assertFalse(data.endswith(
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    ), ascii(data))
1025
def test_body(self):
    """Check BODY with no argument, a number, an id and an unknown id."""
    # BODY
    resp, info = self.server.body()
    self.assertEqual(resp, "222 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self._check_article_body(lines)
    # BODY num
    resp, info = self.server.body(3000234)
    self.assertEqual(resp, "222 3000234 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000234)
    self.assertEqual(message_id, "<45223423@example.com>")
    self._check_article_body(lines)
    # BODY id
    resp, info = self.server.body("<45223423@example.com>")
    self.assertEqual(resp, "222 0 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 0)
    self.assertEqual(message_id, "<45223423@example.com>")
    self._check_article_body(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.body("<non-existent@example.com>")
    self.assertEqual(cm.exception.response, "430 No Such Article Found")
1052
def test_body_file(self):
    """Check BODY when the body is written to a `file` argument."""
    f = io.BytesIO()
    resp, info = self.server.body(file=f)
    self.assertEqual(resp, "222 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    # The returned line list is expected to be empty; the data is in f.
    self.assertEqual(lines, [])
    data = f.getvalue()
    # Only the body was requested, so the headers must not be present.
    self.assertFalse(data.startswith(
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    ), ascii(data))
    self.assertTrue(data.endswith(
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    ), ascii(data))
1072
def check_over_xover_resp(self, resp, overviews):
    """Validate the result shared by the OVER and XOVER tests.

    *resp* is the response line and *overviews* a sequence of
    ``(article number, fields dict)`` pairs; three entries are expected.
    """
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)
    # First entry: every field is compared.
    art_num, over = overviews[0]
    self.assertEqual(art_num, 57)
    self.assertEqual(over, {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.org gmane.comp.python.authors:57"
    })
    # Second entry: the xref field must come out as None.
    _, over = overviews[1]
    self.assertIsNone(over["xref"])
    # Third entry: a subject holding a non-ASCII character.
    _, over = overviews[2]
    self.assertEqual(over["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
1093
def test_xover(self):
    """Check XOVER over the article range 57-59."""
    resp, overviews = self.server.xover(57, 59)
    self.check_over_xover_resp(resp, overviews)
1097
def test_over(self):
    """Check OVER over the article range 57-59, passed as a tuple."""
    # In NNTP "v1", this will fallback on XOVER
    resp, overviews = self.server.over((57, 59))
    self.check_over_xover_resp(resp, overviews)
1102
# Raw article (CRLF line endings) with four header lines, an empty
# separator line and a four-line body.  One body line starts with a dot
# and the signature holds UTF-8 encoded non-ASCII text.
sample_post = (
    b'From: "Demo User" <nobody@example.net>\r\n'
    b'Subject: I am just a test article\r\n'
    b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
    b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
    b'\r\n'
    b'This is just a test article.\r\n'
    b'.Here is a dot-starting line.\r\n'
    b'\r\n'
    b'-- Signed by Andr\xc3\xa9.\r\n'
)
1114
def _check_posted_body(self):
    """Assert that the handler received the sample post in wire format.

    Reads ``self.handler.posted_body``, a sequence of CRLF-terminated
    bytes lines.
    """
    # Check the raw body as received by the server
    lines = self.handler.posted_body
    # One additional line for the "." terminator
    self.assertEqual(len(lines), 10)
    self.assertEqual(lines[-1], b'.\r\n')
    self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
    self.assertEqual(lines[-3], b'\r\n')
    # The dot-starting line must arrive with its leading dot doubled.
    self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
    self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1125
def _check_post_ihave_sub(self, func, *args, file_factory):
    """Post the sample article twice through *func* and verify each post.

    *func* is called as ``func(*args, payload)``, where ``payload`` is
    ``file_factory(post)``.  Returns the response of the second call.
    """
    variants = (
        # First the prepared post with CRLF endings
        self.sample_post,
        # Then the same post with "normal" line endings - they should be
        # converted by NNTP.post and NNTP.ihave.
        self.sample_post.replace(b"\r\n", b"\n"),
    )
    for post in variants:
        func_args = args + (file_factory(post),)
        # Reset so that the check below cannot see a previous post.
        self.handler.posted_body = None
        resp = func(*func_args)
        self._check_posted_body()
    return resp
1141
def check_post_ihave(self, func, success_resp, *args):
    """Run the post/ihave checks for every supported payload type.

    *func* and *args* are forwarded to ``_check_post_ihave_sub`` and each
    run must return *success_resp*.
    """
    def terminated_lines(b):
        return iter(b.splitlines(keepends=True))

    def bare_lines(b):
        return iter(b.splitlines(keepends=False))

    factories = (
        bytes,             # With a bytes object
        bytearray,         # With a bytearray object
        io.BytesIO,        # With a file object
        terminated_lines,  # With an iterable of terminated lines
        bare_lines,        # With an iterable of non-terminated lines
    )
    for factory in factories:
        resp = self._check_post_ihave_sub(func, *args, file_factory=factory)
        self.assertEqual(resp, success_resp)
1162
def test_post(self):
    """Check POST with every payload type, then with posting disallowed."""
    self.check_post_ihave(self.server.post, "240 Article received OK")
    # Make the handler refuse posting and check the error that results.
    self.handler.allow_posting = False
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.post(self.sample_post)
    self.assertEqual(cm.exception.response,
                     "440 Posting not permitted")
1170
def test_ihave(self):
    """Check IHAVE with every payload type, then with a refused id."""
    self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                          "<i.am.an.article.you.will.want@example.com>")
    # A different message id is expected to be turned down.
    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.ihave("<another.message.id>", self.sample_post)
    self.assertEqual(cm.exception.response,
                     "435 Article not wanted")
1178
1179
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        """A v1 server reports no capabilities and no implementation."""
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        self.assertIsNone(self.server.nntp_implementation)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001191
1192
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        """Check the parsed capabilities and the attributes derived from them."""
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        })
        # With VERSION listing '2' and '3', version 3 is expected.
        self.assertEqual(self.server.nntp_version, 3)
        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001214
1215
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        """The cached capabilities are empty until login() has been called."""
        self.assertEqual(self.server._caps, {})
        self.server.login('testuser', 'testpw')
        self.assertIn('VERSION', self.server._caps)
1226
1227
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        """READER must be among the cached capabilities once connected."""
        self.assertIn('READER', self.server._caps)
1238
1239
class MiscTests(unittest.TestCase):
    """Tests for nntplib helper functions that need no server connection."""

    def test_decode_header(self):
        """Check decode_header() on plain, encoded-word and raw UTF-8 input."""
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        """Check _parse_overview_fmt() on several LIST OVERVIEW.FMT replies."""
        # Every variant below must map its first seven entries onto these.
        default_fields = ["subject", "from", "date", "message-id",
                          "references", ":bytes", ":lines"]
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines), default_fields)
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines), default_fields)
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines), default_fields)
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
                         default_fields + ["xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
                         default_fields + ["xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
                         default_fields + ["xref"])

    def test_parse_overview(self):
        """Check _parse_overview() with a present, absent and empty Xref."""
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        """Check _parse_datetime() on the supported date/time spellings."""
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        """Check _unparse_datetime() in non-legacy mode."""
        # Test non-legacy mode
        # 1) with a datetime
        def datetime_gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            # The default and an explicit legacy=False must agree.
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        datetime_gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        datetime_gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        datetime_gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
        # 2) with a date
        def date_gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        date_gives(1999, 6, 23, "19990623", "000000")
        date_gives(2000, 6, 23, "20000623", "000000")
        date_gives(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        """Check _unparse_datetime() in legacy mode (two-digit years)."""
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        def datetime_gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        datetime_gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
        datetime_gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
        datetime_gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
        # 2) with a date
        def date_gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        date_gives(1999, 6, 23, "990623", "000000")
        date_gives(2000, 6, 23, "000623", "000000")
        date_gives(2010, 6, 5, "100605", "000000")
1396
1397
def test_main():
    """Run every test class of this module through support.run_unittest()."""
    tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
             SendReaderNNTPv2Tests, NetworkedNNTPTests]
    # The SSL variant of the networked tests only exists with SSL support.
    if _have_ssl:
        tests.append(NetworkedNNTP_SSLTests)
    support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001404
1405
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()