blob: c5146c1e9251620226c3850c3263e194be9760e4 [file] [log] [blame]
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001import io
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +00002import socket
Antoine Pitrou69ab9512010-09-29 15:03:40 +00003import datetime
4import textwrap
5import unittest
Antoine Pitroude609182010-11-18 17:29:23 +00006import functools
Antoine Pitrou69ab9512010-09-29 15:03:40 +00007import contextlib
8from test import support
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00009from nntplib import NNTP, GroupInfo, _have_ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000010import nntplib
Antoine Pitrou1cb121e2010-11-09 18:54:37 +000011if _have_ssl:
12 import ssl
Antoine Pitrou69ab9512010-09-29 15:03:40 +000013
14TIMEOUT = 30
15
16# TODO:
17# - test the `file` arg to more commands
18# - test error conditions
Antoine Pitroua5785b12010-09-29 16:19:50 +000019# - test auth and `usenetrc`
Antoine Pitrou69ab9512010-09-29 15:03:40 +000020
21
22class NetworkedNNTPTestsMixin:
23
24 def test_welcome(self):
25 welcome = self.server.getwelcome()
26 self.assertEqual(str, type(welcome))
27
28 def test_help(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000029 resp, lines = self.server.help()
Antoine Pitrou69ab9512010-09-29 15:03:40 +000030 self.assertTrue(resp.startswith("100 "), resp)
Antoine Pitrou08eeada2010-11-04 21:36:15 +000031 for line in lines:
Antoine Pitrou69ab9512010-09-29 15:03:40 +000032 self.assertEqual(str, type(line))
33
34 def test_list(self):
Antoine Pitrou08eeada2010-11-04 21:36:15 +000035 resp, groups = self.server.list()
36 if len(groups) > 0:
37 self.assertEqual(GroupInfo, type(groups[0]))
38 self.assertEqual(str, type(groups[0].group))
39
40 def test_list_active(self):
41 resp, groups = self.server.list(self.GROUP_PAT)
42 if len(groups) > 0:
43 self.assertEqual(GroupInfo, type(groups[0]))
44 self.assertEqual(str, type(groups[0].group))
Antoine Pitrou69ab9512010-09-29 15:03:40 +000045
46 def test_unknown_command(self):
47 with self.assertRaises(nntplib.NNTPPermanentError) as cm:
48 self.server._shortcmd("XYZZY")
49 resp = cm.exception.response
50 self.assertTrue(resp.startswith("500 "), resp)
51
52 def test_newgroups(self):
53 # gmane gets a constant influx of new groups. In order not to stress
54 # the server too much, we choose a recent date in the past.
55 dt = datetime.date.today() - datetime.timedelta(days=7)
56 resp, groups = self.server.newgroups(dt)
57 if len(groups) > 0:
58 self.assertIsInstance(groups[0], GroupInfo)
59 self.assertIsInstance(groups[0].group, str)
60
61 def test_description(self):
62 def _check_desc(desc):
63 # Sanity checks
64 self.assertIsInstance(desc, str)
65 self.assertNotIn(self.GROUP_NAME, desc)
66 desc = self.server.description(self.GROUP_NAME)
67 _check_desc(desc)
68 # Another sanity check
69 self.assertIn("Python", desc)
70 # With a pattern
71 desc = self.server.description(self.GROUP_PAT)
72 _check_desc(desc)
73 # Shouldn't exist
74 desc = self.server.description("zk.brrtt.baz")
75 self.assertEqual(desc, '')
76
77 def test_descriptions(self):
78 resp, descs = self.server.descriptions(self.GROUP_PAT)
79 # 215 for LIST NEWSGROUPS, 282 for XGTITLE
80 self.assertTrue(
81 resp.startswith("215 ") or resp.startswith("282 "), resp)
82 self.assertIsInstance(descs, dict)
83 desc = descs[self.GROUP_NAME]
84 self.assertEqual(desc, self.server.description(self.GROUP_NAME))
85
86 def test_group(self):
87 result = self.server.group(self.GROUP_NAME)
88 self.assertEqual(5, len(result))
89 resp, count, first, last, group = result
90 self.assertEqual(group, self.GROUP_NAME)
91 self.assertIsInstance(count, int)
92 self.assertIsInstance(first, int)
93 self.assertIsInstance(last, int)
94 self.assertLessEqual(first, last)
95 self.assertTrue(resp.startswith("211 "), resp)
96
97 def test_date(self):
98 resp, date = self.server.date()
99 self.assertIsInstance(date, datetime.datetime)
100 # Sanity check
101 self.assertGreaterEqual(date.year, 1995)
102 self.assertLessEqual(date.year, 2030)
103
104 def _check_art_dict(self, art_dict):
105 # Some sanity checks for a field dictionary returned by OVER / XOVER
106 self.assertIsInstance(art_dict, dict)
107 # NNTP has 7 mandatory fields
108 self.assertGreaterEqual(art_dict.keys(),
109 {"subject", "from", "date", "message-id",
110 "references", ":bytes", ":lines"}
111 )
112 for v in art_dict.values():
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000113 self.assertIsInstance(v, (str, type(None)))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000114
115 def test_xover(self):
116 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000117 resp, lines = self.server.xover(last - 5, last)
118 if len(lines) == 0:
119 self.skipTest("no articles retrieved")
120 # The 'last' article is not necessarily part of the output (cancelled?)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000121 art_num, art_dict = lines[0]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000122 self.assertGreaterEqual(art_num, last - 5)
123 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000124 self._check_art_dict(art_dict)
125
126 def test_over(self):
127 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
128 start = last - 10
129 # The "start-" article range form
130 resp, lines = self.server.over((start, None))
131 art_num, art_dict = lines[0]
132 self._check_art_dict(art_dict)
133 # The "start-end" article range form
134 resp, lines = self.server.over((start, last))
135 art_num, art_dict = lines[-1]
Antoine Pitroud28f7902010-11-18 15:11:43 +0000136 # The 'last' article is not necessarily part of the output (cancelled?)
137 self.assertGreaterEqual(art_num, start)
138 self.assertLessEqual(art_num, last)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000139 self._check_art_dict(art_dict)
140 # XXX The "message_id" form is unsupported by gmane
141 # 503 Overview by message-ID unsupported
142
143 def test_xhdr(self):
144 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
145 resp, lines = self.server.xhdr('subject', last)
146 for line in lines:
147 self.assertEqual(str, type(line[1]))
148
149 def check_article_resp(self, resp, article, art_num=None):
150 self.assertIsInstance(article, nntplib.ArticleInfo)
151 if art_num is not None:
152 self.assertEqual(article.number, art_num)
153 for line in article.lines:
154 self.assertIsInstance(line, bytes)
155 # XXX this could exceptionally happen...
156 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
157
158 def test_article_head_body(self):
159 resp, count, first, last, name = self.server.group(self.GROUP_NAME)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000160 # Try to find an available article
161 for art_num in (last, first, last - 1):
162 try:
163 resp, head = self.server.head(art_num)
164 except nntplib.NNTPTemporaryError as e:
165 if not e.response.startswith("423 "):
166 raise
167 # "423 No such article" => choose another one
168 continue
169 break
170 else:
171 self.skipTest("could not find a suitable article number")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000172 self.assertTrue(resp.startswith("221 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000173 self.check_article_resp(resp, head, art_num)
174 resp, body = self.server.body(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000175 self.assertTrue(resp.startswith("222 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000176 self.check_article_resp(resp, body, art_num)
177 resp, article = self.server.article(art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000178 self.assertTrue(resp.startswith("220 "), resp)
Antoine Pitroud28f7902010-11-18 15:11:43 +0000179 self.check_article_resp(resp, article, art_num)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000180 self.assertEqual(article.lines, head.lines + [b''] + body.lines)
181
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000182 def test_capabilities(self):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000183 # The server under test implements NNTP version 2 and has a
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000184 # couple of well-known capabilities. Just sanity check that we
185 # got them.
186 def _check_caps(caps):
187 caps_list = caps['LIST']
188 self.assertIsInstance(caps_list, (list, tuple))
189 self.assertIn('OVERVIEW.FMT', caps_list)
190 self.assertGreaterEqual(self.server.nntp_version, 2)
191 _check_caps(self.server.getcapabilities())
192 # This re-emits the command
193 resp, caps = self.server.capabilities()
194 _check_caps(caps)
195
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000196 if _have_ssl:
197 def test_starttls(self):
198 file = self.server.file
199 sock = self.server.sock
200 try:
201 self.server.starttls()
202 except nntplib.NNTPPermanentError:
203 self.skipTest("STARTTLS not supported by server.")
204 else:
205 # Check that the socket and internal pseudo-file really were
206 # changed.
207 self.assertNotEqual(file, self.server.file)
208 self.assertNotEqual(sock, self.server.sock)
209 # Check that the new socket really is an SSL one
210 self.assertIsInstance(self.server.sock, ssl.SSLSocket)
211 # Check that trying starttls when it's already active fails.
212 self.assertRaises(ValueError, self.server.starttls)
213
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000214 def test_zlogin(self):
215 # This test must be the penultimate because further commands will be
216 # refused.
217 baduser = "notarealuser"
218 badpw = "notarealpassword"
219 # Check that bogus credentials cause failure
220 self.assertRaises(nntplib.NNTPError, self.server.login,
221 user=baduser, password=badpw, usenetrc=False)
222 # FIXME: We should check that correct credentials succeed, but that
223 # would require valid details for some server somewhere to be in the
224 # test suite, I think. Gmane is anonymous, at least as used for the
225 # other tests.
226
227 def test_zzquit(self):
228 # This test must be called last, hence the name
229 cls = type(self)
Antoine Pitrou3bce11c2010-11-21 17:14:19 +0000230 try:
231 self.server.quit()
232 finally:
233 cls.server = None
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000234
Antoine Pitroude609182010-11-18 17:29:23 +0000235 @classmethod
236 def wrap_methods(cls):
237 # Wrap all methods in a transient_internet() exception catcher
238 # XXX put a generic version in test.support?
239 def wrap_meth(meth):
240 @functools.wraps(meth)
241 def wrapped(self):
242 with support.transient_internet(self.NNTP_HOST):
243 meth(self)
244 return wrapped
245 for name in dir(cls):
246 if not name.startswith('test_'):
247 continue
248 meth = getattr(cls, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200249 if not callable(meth):
Antoine Pitroude609182010-11-18 17:29:23 +0000250 continue
251 # Need to use a closure so that meth remains bound to its current
252 # value
253 setattr(cls, name, wrap_meth(meth))
254
Giampaolo RodolĂ 424298a2011-03-03 18:34:06 +0000255 def test_with_statement(self):
256 def is_connected():
257 if not hasattr(server, 'file'):
258 return False
259 try:
260 server.help()
261 except (socket.error, EOFError):
262 return False
263 return True
264
265 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
266 self.assertTrue(is_connected())
267 self.assertTrue(server.help())
268 self.assertFalse(is_connected())
269
270 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
271 server.quit()
272 self.assertFalse(is_connected())
273
274
Antoine Pitroude609182010-11-18 17:29:23 +0000275NetworkedNNTPTestsMixin.wrap_methods()
276
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000277
278class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
279 # This server supports STARTTLS (gmane doesn't)
280 NNTP_HOST = 'news.trigofacile.com'
281 GROUP_NAME = 'fr.comp.lang.python'
282 GROUP_PAT = 'fr.comp.lang.*'
283
Antoine Pitroude609182010-11-18 17:29:23 +0000284 NNTP_CLASS = NNTP
285
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000286 @classmethod
287 def setUpClass(cls):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000288 support.requires("network")
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000289 with support.transient_internet(cls.NNTP_HOST):
Antoine Pitroude609182010-11-18 17:29:23 +0000290 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000291
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000292 @classmethod
293 def tearDownClass(cls):
294 if cls.server is not None:
295 cls.server.quit()
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000296
297
298if _have_ssl:
Antoine Pitroude609182010-11-18 17:29:23 +0000299 class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000300
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000301 # Technical limits for this public NNTP server (see http://www.aioe.org):
302 # "Only two concurrent connections per IP address are allowed and
303 # 400 connections per day are accepted from each IP address."
304
305 NNTP_HOST = 'nntp.aioe.org'
306 GROUP_NAME = 'comp.lang.python'
307 GROUP_PAT = 'comp.lang.*'
308
Antoine Pitroude609182010-11-18 17:29:23 +0000309 NNTP_CLASS = nntplib.NNTP_SSL
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000310
Antoine Pitrou45ca9872010-11-13 00:28:53 +0000311 # Disabled as it produces too much data
Antoine Pitrou1cb121e2010-11-09 18:54:37 +0000312 test_list = None
313
314 # Disabled as the connection will already be encrypted.
315 test_starttls = None
316
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000317
318#
319# Non-networked tests using a local server (or something mocking it).
320#
321
322class _NNTPServerIO(io.RawIOBase):
323 """A raw IO object allowing NNTP commands to be received and processed
324 by a handler. The handler can push responses which can then be read
325 from the IO object."""
326
327 def __init__(self, handler):
328 io.RawIOBase.__init__(self)
329 # The channel from the client
330 self.c2s = io.BytesIO()
331 # The channel to the client
332 self.s2c = io.BytesIO()
333 self.handler = handler
334 self.handler.start(self.c2s.readline, self.push_data)
335
336 def readable(self):
337 return True
338
339 def writable(self):
340 return True
341
342 def push_data(self, data):
343 """Push (buffer) some data to send to the client."""
344 pos = self.s2c.tell()
345 self.s2c.seek(0, 2)
346 self.s2c.write(data)
347 self.s2c.seek(pos)
348
349 def write(self, b):
350 """The client sends us some data"""
351 pos = self.c2s.tell()
352 self.c2s.write(b)
353 self.c2s.seek(pos)
354 self.handler.process_pending()
355 return len(b)
356
357 def readinto(self, buf):
358 """The client wants to read a response"""
359 self.handler.process_pending()
360 b = self.s2c.read(len(buf))
361 n = len(b)
362 buf[:n] = b
363 return n
364
365
366class MockedNNTPTestsMixin:
367 # Override in derived classes
368 handler_class = None
369
370 def setUp(self):
371 super().setUp()
372 self.make_server()
373
374 def tearDown(self):
375 super().tearDown()
376 del self.server
377
378 def make_server(self, *args, **kwargs):
379 self.handler = self.handler_class()
380 self.sio = _NNTPServerIO(self.handler)
381 # Using BufferedRWPair instead of BufferedRandom ensures the file
382 # isn't seekable.
383 file = io.BufferedRWPair(self.sio, self.sio)
Antoine Pitroua5785b12010-09-29 16:19:50 +0000384 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000385 return self.server
386
387
Antoine Pitrou71135622012-02-14 23:29:34 +0100388class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
389 def setUp(self):
390 super().setUp()
391 self.make_server(readermode=True)
392
393
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000394class NNTPv1Handler:
395 """A handler for RFC 977"""
396
397 welcome = "200 NNTP mock server"
398
399 def start(self, readline, push_data):
400 self.in_body = False
401 self.allow_posting = True
402 self._readline = readline
403 self._push_data = push_data
Antoine Pitrou54411c12012-02-12 19:14:17 +0100404 self._logged_in = False
405 self._user_sent = False
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000406 # Our welcome
407 self.handle_welcome()
408
409 def _decode(self, data):
410 return str(data, "utf-8", "surrogateescape")
411
412 def process_pending(self):
413 if self.in_body:
414 while True:
415 line = self._readline()
416 if not line:
417 return
418 self.body.append(line)
419 if line == b".\r\n":
420 break
421 try:
422 meth, tokens = self.body_callback
423 meth(*tokens, body=self.body)
424 finally:
425 self.body_callback = None
426 self.body = None
427 self.in_body = False
428 while True:
429 line = self._decode(self._readline())
430 if not line:
431 return
432 if not line.endswith("\r\n"):
433 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
434 line = line[:-2]
435 cmd, *tokens = line.split()
436 #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
437 meth = getattr(self, "handle_" + cmd.upper(), None)
438 if meth is None:
439 self.handle_unknown()
440 else:
441 try:
442 meth(*tokens)
443 except Exception as e:
444 raise ValueError("command failed: {!r}".format(line)) from e
445 else:
446 if self.in_body:
447 self.body_callback = meth, tokens
448 self.body = []
449
450 def expect_body(self):
451 """Flag that the client is expected to post a request body"""
452 self.in_body = True
453
454 def push_data(self, data):
455 """Push some binary data"""
456 self._push_data(data)
457
458 def push_lit(self, lit):
459 """Push a string literal"""
460 lit = textwrap.dedent(lit)
461 lit = "\r\n".join(lit.splitlines()) + "\r\n"
462 lit = lit.encode('utf-8')
463 self.push_data(lit)
464
465 def handle_unknown(self):
466 self.push_lit("500 What?")
467
468 def handle_welcome(self):
469 self.push_lit(self.welcome)
470
471 def handle_QUIT(self):
472 self.push_lit("205 Bye!")
473
474 def handle_DATE(self):
475 self.push_lit("111 20100914001155")
476
477 def handle_GROUP(self, group):
478 if group == "fr.comp.lang.python":
479 self.push_lit("211 486 761 1265 fr.comp.lang.python")
480 else:
481 self.push_lit("411 No such group {}".format(group))
482
483 def handle_HELP(self):
484 self.push_lit("""\
485 100 Legal commands
486 authinfo user Name|pass Password|generic <prog> <args>
487 date
488 help
489 Report problems to <root@example.org>
490 .""")
491
492 def handle_STAT(self, message_spec=None):
493 if message_spec is None:
494 self.push_lit("412 No newsgroup selected")
495 elif message_spec == "3000234":
496 self.push_lit("223 3000234 <45223423@example.com>")
497 elif message_spec == "<45223423@example.com>":
498 self.push_lit("223 0 <45223423@example.com>")
499 else:
500 self.push_lit("430 No Such Article Found")
501
502 def handle_NEXT(self):
503 self.push_lit("223 3000237 <668929@example.org> retrieved")
504
505 def handle_LAST(self):
506 self.push_lit("223 3000234 <45223423@example.com> retrieved")
507
508 def handle_LIST(self, action=None, param=None):
509 if action is None:
510 self.push_lit("""\
511 215 Newsgroups in form "group high low flags".
512 comp.lang.python 0000052340 0000002828 y
513 comp.lang.python.announce 0000001153 0000000993 m
514 free.it.comp.lang.python 0000000002 0000000002 y
515 fr.comp.lang.python 0000001254 0000000760 y
516 free.it.comp.lang.python.learner 0000000000 0000000001 y
517 tw.bbs.comp.lang.python 0000000304 0000000304 y
518 .""")
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000519 elif action == "ACTIVE":
520 if param == "*distutils*":
521 self.push_lit("""\
522 215 Newsgroups in form "group high low flags"
523 gmane.comp.python.distutils.devel 0000014104 0000000001 m
524 gmane.comp.python.distutils.cvs 0000000000 0000000001 m
525 .""")
526 else:
527 self.push_lit("""\
528 215 Newsgroups in form "group high low flags"
529 .""")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000530 elif action == "OVERVIEW.FMT":
531 self.push_lit("""\
532 215 Order of fields in overview database.
533 Subject:
534 From:
535 Date:
536 Message-ID:
537 References:
538 Bytes:
539 Lines:
540 Xref:full
541 .""")
542 elif action == "NEWSGROUPS":
543 assert param is not None
544 if param == "comp.lang.python":
545 self.push_lit("""\
546 215 Descriptions in form "group description".
547 comp.lang.python\tThe Python computer language.
548 .""")
549 elif param == "comp.lang.python*":
550 self.push_lit("""\
551 215 Descriptions in form "group description".
552 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
553 comp.lang.python\tThe Python computer language.
554 .""")
555 else:
556 self.push_lit("""\
557 215 Descriptions in form "group description".
558 .""")
559 else:
560 self.push_lit('501 Unknown LIST keyword')
561
562 def handle_NEWNEWS(self, group, date_str, time_str):
563 # We hard code different return messages depending on passed
564 # argument and date syntax.
565 if (group == "comp.lang.python" and date_str == "20100913"
566 and time_str == "082004"):
567 # Date was passed in RFC 3977 format (NNTP "v2")
568 self.push_lit("""\
569 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
570 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
571 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
572 .""")
573 elif (group == "comp.lang.python" and date_str == "100913"
574 and time_str == "082004"):
575 # Date was passed in RFC 977 format (NNTP "v1")
576 self.push_lit("""\
577 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
578 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
579 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
580 .""")
581 else:
582 self.push_lit("""\
583 230 An empty list of newsarticles follows
584 .""")
585 # (Note for experiments: many servers disable NEWNEWS.
586 # As of this writing, sicinfo3.epfl.ch doesn't.)
587
588 def handle_XOVER(self, message_spec):
589 if message_spec == "57-59":
590 self.push_lit(
591 "224 Overview information for 57-58 follows\n"
592 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
593 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
594 "\tSat, 19 Jun 2010 18:04:08 -0400"
595 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
596 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
597 "\tXref: news.gmane.org gmane.comp.python.authors:57"
598 "\n"
599 "58\tLooking for a few good bloggers"
600 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
601 "\tThu, 22 Jul 2010 09:14:14 -0400"
602 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
603 "\t\t6683\t16"
Antoine Pitrou4103bc02010-11-03 18:18:43 +0000604 "\t"
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000605 "\n"
606 # An UTF-8 overview line from fr.comp.lang.python
607 "59\tRe: Message d'erreur incompréhensible (par moi)"
608 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
609 "\tWed, 15 Sep 2010 18:09:15 +0200"
610 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
611 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
612 "\tXref: saria.nerim.net fr.comp.lang.python:1265"
613 "\n"
614 ".\n")
615 else:
616 self.push_lit("""\
617 224 No articles
618 .""")
619
620 def handle_POST(self, *, body=None):
621 if body is None:
622 if self.allow_posting:
623 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
624 self.expect_body()
625 else:
626 self.push_lit("440 Posting not permitted")
627 else:
628 assert self.allow_posting
629 self.push_lit("240 Article received OK")
630 self.posted_body = body
631
632 def handle_IHAVE(self, message_id, *, body=None):
633 if body is None:
634 if (self.allow_posting and
635 message_id == "<i.am.an.article.you.will.want@example.com>"):
636 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
637 self.expect_body()
638 else:
639 self.push_lit("435 Article not wanted")
640 else:
641 assert self.allow_posting
642 self.push_lit("235 Article transferred OK")
643 self.posted_body = body
644
645 sample_head = """\
646 From: "Demo User" <nobody@example.net>
647 Subject: I am just a test article
648 Content-Type: text/plain; charset=UTF-8; format=flowed
649 Message-ID: <i.am.an.article.you.will.want@example.com>"""
650
651 sample_body = """\
652 This is just a test article.
653 ..Here is a dot-starting line.
654
655 -- Signed by Andr\xe9."""
656
657 sample_article = sample_head + "\n\n" + sample_body
658
659 def handle_ARTICLE(self, message_spec=None):
660 if message_spec is None:
661 self.push_lit("220 3000237 <45223423@example.com>")
662 elif message_spec == "<45223423@example.com>":
663 self.push_lit("220 0 <45223423@example.com>")
664 elif message_spec == "3000234":
665 self.push_lit("220 3000234 <45223423@example.com>")
666 else:
667 self.push_lit("430 No Such Article Found")
668 return
669 self.push_lit(self.sample_article)
670 self.push_lit(".")
671
672 def handle_HEAD(self, message_spec=None):
673 if message_spec is None:
674 self.push_lit("221 3000237 <45223423@example.com>")
675 elif message_spec == "<45223423@example.com>":
676 self.push_lit("221 0 <45223423@example.com>")
677 elif message_spec == "3000234":
678 self.push_lit("221 3000234 <45223423@example.com>")
679 else:
680 self.push_lit("430 No Such Article Found")
681 return
682 self.push_lit(self.sample_head)
683 self.push_lit(".")
684
685 def handle_BODY(self, message_spec=None):
686 if message_spec is None:
687 self.push_lit("222 3000237 <45223423@example.com>")
688 elif message_spec == "<45223423@example.com>":
689 self.push_lit("222 0 <45223423@example.com>")
690 elif message_spec == "3000234":
691 self.push_lit("222 3000234 <45223423@example.com>")
692 else:
693 self.push_lit("430 No Such Article Found")
694 return
695 self.push_lit(self.sample_body)
696 self.push_lit(".")
697
Antoine Pitrou54411c12012-02-12 19:14:17 +0100698 def handle_AUTHINFO(self, cred_type, data):
699 if self._logged_in:
700 self.push_lit('502 Already Logged In')
701 elif cred_type == 'user':
702 if self._user_sent:
703 self.push_lit('482 User Credential Already Sent')
704 else:
705 self.push_lit('381 Password Required')
706 self._user_sent = True
707 elif cred_type == 'pass':
708 self.push_lit('281 Login Successful')
709 self._logged_in = True
710 else:
711 raise Exception('Unknown cred type {}'.format(cred_type))
712
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000713
714class NNTPv2Handler(NNTPv1Handler):
715 """A handler for RFC 3977 (NNTP "v2")"""
716
717 def handle_CAPABILITIES(self):
Antoine Pitrou54411c12012-02-12 19:14:17 +0100718 fmt = """\
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000719 101 Capability list:
Antoine Pitrouf80b3f72010-11-02 22:31:52 +0000720 VERSION 2 3
Antoine Pitrou54411c12012-02-12 19:14:17 +0100721 IMPLEMENTATION INN 2.5.1{}
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000722 HDR
723 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
724 OVER
725 POST
726 READER
Antoine Pitrou54411c12012-02-12 19:14:17 +0100727 ."""
728
729 if not self._logged_in:
730 self.push_lit(fmt.format('\n AUTHINFO USER'))
731 else:
732 self.push_lit(fmt.format(''))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000733
Antoine Pitrou71135622012-02-14 23:29:34 +0100734 def handle_MODE(self, _):
735 raise Exception('MODE READER sent despite READER has been advertised')
736
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000737 def handle_OVER(self, message_spec=None):
738 return self.handle_XOVER(message_spec)
739
740
Antoine Pitrou54411c12012-02-12 19:14:17 +0100741class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
742 """A handler that allows CAPABILITIES only after login"""
743
744 def handle_CAPABILITIES(self):
745 if not self._logged_in:
746 self.push_lit('480 You must log in.')
747 else:
748 super().handle_CAPABILITIES()
749
750
Antoine Pitrou71135622012-02-14 23:29:34 +0100751class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
752 """A server that starts in transit mode"""
753
754 def __init__(self):
755 self._switched = False
756
757 def handle_CAPABILITIES(self):
758 fmt = """\
759 101 Capability list:
760 VERSION 2 3
761 IMPLEMENTATION INN 2.5.1
762 HDR
763 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
764 OVER
765 POST
766 {}READER
767 ."""
768 if self._switched:
769 self.push_lit(fmt.format(''))
770 else:
771 self.push_lit(fmt.format('MODE-'))
772
773 def handle_MODE(self, what):
774 assert not self._switched and what == 'reader'
775 self._switched = True
776 self.push_lit('200 Posting allowed')
777
778
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000779class NNTPv1v2TestsMixin:
780
781 def setUp(self):
782 super().setUp()
783
784 def test_welcome(self):
785 self.assertEqual(self.server.welcome, self.handler.welcome)
786
Antoine Pitrou54411c12012-02-12 19:14:17 +0100787 def test_authinfo(self):
788 if self.nntp_version == 2:
789 self.assertIn('AUTHINFO', self.server._caps)
790 self.server.login('testuser', 'testpw')
791 # if AUTHINFO is gone from _caps we also know that getcapabilities()
792 # has been called after login as it should
793 self.assertNotIn('AUTHINFO', self.server._caps)
794
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000795 def test_date(self):
796 resp, date = self.server.date()
797 self.assertEqual(resp, "111 20100914001155")
798 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
799
800 def test_quit(self):
801 self.assertFalse(self.sio.closed)
802 resp = self.server.quit()
803 self.assertEqual(resp, "205 Bye!")
804 self.assertTrue(self.sio.closed)
805
806 def test_help(self):
807 resp, help = self.server.help()
808 self.assertEqual(resp, "100 Legal commands")
809 self.assertEqual(help, [
810 ' authinfo user Name|pass Password|generic <prog> <args>',
811 ' date',
812 ' help',
813 'Report problems to <root@example.org>',
814 ])
815
816 def test_list(self):
817 resp, groups = self.server.list()
818 self.assertEqual(len(groups), 6)
819 g = groups[1]
820 self.assertEqual(g,
821 GroupInfo("comp.lang.python.announce", "0000001153",
822 "0000000993", "m"))
Antoine Pitrou08eeada2010-11-04 21:36:15 +0000823 resp, groups = self.server.list("*distutils*")
824 self.assertEqual(len(groups), 2)
825 g = groups[0]
826 self.assertEqual(g,
827 GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
828 "0000000001", "m"))
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000829
830 def test_stat(self):
831 resp, art_num, message_id = self.server.stat(3000234)
832 self.assertEqual(resp, "223 3000234 <45223423@example.com>")
833 self.assertEqual(art_num, 3000234)
834 self.assertEqual(message_id, "<45223423@example.com>")
835 resp, art_num, message_id = self.server.stat("<45223423@example.com>")
836 self.assertEqual(resp, "223 0 <45223423@example.com>")
837 self.assertEqual(art_num, 0)
838 self.assertEqual(message_id, "<45223423@example.com>")
839 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
840 self.server.stat("<non.existent.id>")
841 self.assertEqual(cm.exception.response, "430 No Such Article Found")
842 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
843 self.server.stat()
844 self.assertEqual(cm.exception.response, "412 No newsgroup selected")
845
846 def test_next(self):
847 resp, art_num, message_id = self.server.next()
848 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
849 self.assertEqual(art_num, 3000237)
850 self.assertEqual(message_id, "<668929@example.org>")
851
852 def test_last(self):
853 resp, art_num, message_id = self.server.last()
854 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
855 self.assertEqual(art_num, 3000234)
856 self.assertEqual(message_id, "<45223423@example.com>")
857
858 def test_description(self):
859 desc = self.server.description("comp.lang.python")
860 self.assertEqual(desc, "The Python computer language.")
861 desc = self.server.description("comp.lang.pythonx")
862 self.assertEqual(desc, "")
863
864 def test_descriptions(self):
865 resp, groups = self.server.descriptions("comp.lang.python")
866 self.assertEqual(resp, '215 Descriptions in form "group description".')
867 self.assertEqual(groups, {
868 "comp.lang.python": "The Python computer language.",
869 })
870 resp, groups = self.server.descriptions("comp.lang.python*")
871 self.assertEqual(groups, {
872 "comp.lang.python": "The Python computer language.",
873 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
874 })
875 resp, groups = self.server.descriptions("comp.lang.pythonx")
876 self.assertEqual(groups, {})
877
878 def test_group(self):
879 resp, count, first, last, group = self.server.group("fr.comp.lang.python")
880 self.assertTrue(resp.startswith("211 "), resp)
881 self.assertEqual(first, 761)
882 self.assertEqual(last, 1265)
883 self.assertEqual(count, 486)
884 self.assertEqual(group, "fr.comp.lang.python")
885 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
886 self.server.group("comp.lang.python.devel")
887 exc = cm.exception
888 self.assertTrue(exc.response.startswith("411 No such group"),
889 exc.response)
890
891 def test_newnews(self):
892 # NEWNEWS comp.lang.python [20]100913 082004
893 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
894 resp, ids = self.server.newnews("comp.lang.python", dt)
895 expected = (
896 "230 list of newsarticles (NNTP v{0}) "
897 "created after Mon Sep 13 08:20:04 2010 follows"
898 ).format(self.nntp_version)
899 self.assertEqual(resp, expected)
900 self.assertEqual(ids, [
901 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
902 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
903 ])
904 # NEWNEWS fr.comp.lang.python [20]100913 082004
905 dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
906 resp, ids = self.server.newnews("fr.comp.lang.python", dt)
907 self.assertEqual(resp, "230 An empty list of newsarticles follows")
908 self.assertEqual(ids, [])
909
910 def _check_article_body(self, lines):
911 self.assertEqual(len(lines), 4)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000912 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
Antoine Pitrou69ab9512010-09-29 15:03:40 +0000913 self.assertEqual(lines[-2], b"")
914 self.assertEqual(lines[-3], b".Here is a dot-starting line.")
915 self.assertEqual(lines[-4], b"This is just a test article.")
916
917 def _check_article_head(self, lines):
918 self.assertEqual(len(lines), 4)
919 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
920 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
921
922 def _check_article_data(self, lines):
923 self.assertEqual(len(lines), 9)
924 self._check_article_head(lines[:4])
925 self._check_article_body(lines[-4:])
926 self.assertEqual(lines[4], b"")
927
928 def test_article(self):
929 # ARTICLE
930 resp, info = self.server.article()
931 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
932 art_num, message_id, lines = info
933 self.assertEqual(art_num, 3000237)
934 self.assertEqual(message_id, "<45223423@example.com>")
935 self._check_article_data(lines)
936 # ARTICLE num
937 resp, info = self.server.article(3000234)
938 self.assertEqual(resp, "220 3000234 <45223423@example.com>")
939 art_num, message_id, lines = info
940 self.assertEqual(art_num, 3000234)
941 self.assertEqual(message_id, "<45223423@example.com>")
942 self._check_article_data(lines)
943 # ARTICLE id
944 resp, info = self.server.article("<45223423@example.com>")
945 self.assertEqual(resp, "220 0 <45223423@example.com>")
946 art_num, message_id, lines = info
947 self.assertEqual(art_num, 0)
948 self.assertEqual(message_id, "<45223423@example.com>")
949 self._check_article_data(lines)
950 # Non-existent id
951 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
952 self.server.article("<non-existent@example.com>")
953 self.assertEqual(cm.exception.response, "430 No Such Article Found")
954
955 def test_article_file(self):
956 # With a "file" argument
957 f = io.BytesIO()
958 resp, info = self.server.article(file=f)
959 self.assertEqual(resp, "220 3000237 <45223423@example.com>")
960 art_num, message_id, lines = info
961 self.assertEqual(art_num, 3000237)
962 self.assertEqual(message_id, "<45223423@example.com>")
963 self.assertEqual(lines, [])
964 data = f.getvalue()
965 self.assertTrue(data.startswith(
966 b'From: "Demo User" <nobody@example.net>\r\n'
967 b'Subject: I am just a test article\r\n'
968 ), ascii(data))
969 self.assertTrue(data.endswith(
970 b'This is just a test article.\r\n'
971 b'.Here is a dot-starting line.\r\n'
972 b'\r\n'
973 b'-- Signed by Andr\xc3\xa9.\r\n'
974 ), ascii(data))
975
976 def test_head(self):
977 # HEAD
978 resp, info = self.server.head()
979 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
980 art_num, message_id, lines = info
981 self.assertEqual(art_num, 3000237)
982 self.assertEqual(message_id, "<45223423@example.com>")
983 self._check_article_head(lines)
984 # HEAD num
985 resp, info = self.server.head(3000234)
986 self.assertEqual(resp, "221 3000234 <45223423@example.com>")
987 art_num, message_id, lines = info
988 self.assertEqual(art_num, 3000234)
989 self.assertEqual(message_id, "<45223423@example.com>")
990 self._check_article_head(lines)
991 # HEAD id
992 resp, info = self.server.head("<45223423@example.com>")
993 self.assertEqual(resp, "221 0 <45223423@example.com>")
994 art_num, message_id, lines = info
995 self.assertEqual(art_num, 0)
996 self.assertEqual(message_id, "<45223423@example.com>")
997 self._check_article_head(lines)
998 # Non-existent id
999 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1000 self.server.head("<non-existent@example.com>")
1001 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1002
Antoine Pitrou2640b522012-02-15 18:53:18 +01001003 def test_head_file(self):
1004 f = io.BytesIO()
1005 resp, info = self.server.head(file=f)
1006 self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1007 art_num, message_id, lines = info
1008 self.assertEqual(art_num, 3000237)
1009 self.assertEqual(message_id, "<45223423@example.com>")
1010 self.assertEqual(lines, [])
1011 data = f.getvalue()
1012 self.assertTrue(data.startswith(
1013 b'From: "Demo User" <nobody@example.net>\r\n'
1014 b'Subject: I am just a test article\r\n'
1015 ), ascii(data))
1016 self.assertFalse(data.endswith(
1017 b'This is just a test article.\r\n'
1018 b'.Here is a dot-starting line.\r\n'
1019 b'\r\n'
1020 b'-- Signed by Andr\xc3\xa9.\r\n'
1021 ), ascii(data))
1022
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001023 def test_body(self):
1024 # BODY
1025 resp, info = self.server.body()
1026 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1027 art_num, message_id, lines = info
1028 self.assertEqual(art_num, 3000237)
1029 self.assertEqual(message_id, "<45223423@example.com>")
1030 self._check_article_body(lines)
1031 # BODY num
1032 resp, info = self.server.body(3000234)
1033 self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1034 art_num, message_id, lines = info
1035 self.assertEqual(art_num, 3000234)
1036 self.assertEqual(message_id, "<45223423@example.com>")
1037 self._check_article_body(lines)
1038 # BODY id
1039 resp, info = self.server.body("<45223423@example.com>")
1040 self.assertEqual(resp, "222 0 <45223423@example.com>")
1041 art_num, message_id, lines = info
1042 self.assertEqual(art_num, 0)
1043 self.assertEqual(message_id, "<45223423@example.com>")
1044 self._check_article_body(lines)
1045 # Non-existent id
1046 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1047 self.server.body("<non-existent@example.com>")
1048 self.assertEqual(cm.exception.response, "430 No Such Article Found")
1049
Antoine Pitrou2640b522012-02-15 18:53:18 +01001050 def test_body_file(self):
1051 f = io.BytesIO()
1052 resp, info = self.server.body(file=f)
1053 self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1054 art_num, message_id, lines = info
1055 self.assertEqual(art_num, 3000237)
1056 self.assertEqual(message_id, "<45223423@example.com>")
1057 self.assertEqual(lines, [])
1058 data = f.getvalue()
1059 self.assertFalse(data.startswith(
1060 b'From: "Demo User" <nobody@example.net>\r\n'
1061 b'Subject: I am just a test article\r\n'
1062 ), ascii(data))
1063 self.assertTrue(data.endswith(
1064 b'This is just a test article.\r\n'
1065 b'.Here is a dot-starting line.\r\n'
1066 b'\r\n'
1067 b'-- Signed by Andr\xc3\xa9.\r\n'
1068 ), ascii(data))
1069
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001070 def check_over_xover_resp(self, resp, overviews):
1071 self.assertTrue(resp.startswith("224 "), resp)
1072 self.assertEqual(len(overviews), 3)
1073 art_num, over = overviews[0]
1074 self.assertEqual(art_num, 57)
1075 self.assertEqual(over, {
1076 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1077 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1078 "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1079 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1080 "references": "<hvalf7$ort$1@dough.gmane.org>",
1081 ":bytes": "7103",
1082 ":lines": "16",
1083 "xref": "news.gmane.org gmane.comp.python.authors:57"
1084 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001085 art_num, over = overviews[1]
1086 self.assertEqual(over["xref"], None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001087 art_num, over = overviews[2]
1088 self.assertEqual(over["subject"],
1089 "Re: Message d'erreur incompréhensible (par moi)")
1090
1091 def test_xover(self):
1092 resp, overviews = self.server.xover(57, 59)
1093 self.check_over_xover_resp(resp, overviews)
1094
1095 def test_over(self):
1096 # In NNTP "v1", this will fallback on XOVER
1097 resp, overviews = self.server.over((57, 59))
1098 self.check_over_xover_resp(resp, overviews)
1099
1100 sample_post = (
1101 b'From: "Demo User" <nobody@example.net>\r\n'
1102 b'Subject: I am just a test article\r\n'
1103 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1104 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1105 b'\r\n'
1106 b'This is just a test article.\r\n'
1107 b'.Here is a dot-starting line.\r\n'
1108 b'\r\n'
1109 b'-- Signed by Andr\xc3\xa9.\r\n'
1110 )
1111
1112 def _check_posted_body(self):
1113 # Check the raw body as received by the server
1114 lines = self.handler.posted_body
1115 # One additional line for the "." terminator
1116 self.assertEqual(len(lines), 10)
1117 self.assertEqual(lines[-1], b'.\r\n')
1118 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1119 self.assertEqual(lines[-3], b'\r\n')
1120 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1121 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1122
1123 def _check_post_ihave_sub(self, func, *args, file_factory):
1124 # First the prepared post with CRLF endings
1125 post = self.sample_post
1126 func_args = args + (file_factory(post),)
1127 self.handler.posted_body = None
1128 resp = func(*func_args)
1129 self._check_posted_body()
1130 # Then the same post with "normal" line endings - they should be
1131 # converted by NNTP.post and NNTP.ihave.
1132 post = self.sample_post.replace(b"\r\n", b"\n")
1133 func_args = args + (file_factory(post),)
1134 self.handler.posted_body = None
1135 resp = func(*func_args)
1136 self._check_posted_body()
1137 return resp
1138
1139 def check_post_ihave(self, func, success_resp, *args):
1140 # With a bytes object
1141 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1142 self.assertEqual(resp, success_resp)
1143 # With a bytearray object
1144 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1145 self.assertEqual(resp, success_resp)
1146 # With a file object
1147 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1148 self.assertEqual(resp, success_resp)
1149 # With an iterable of terminated lines
1150 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001151 return iter(b.splitlines(keepends=True))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001152 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1153 self.assertEqual(resp, success_resp)
1154 # With an iterable of non-terminated lines
1155 def iterlines(b):
Ezio Melottid8b509b2011-09-28 17:37:55 +03001156 return iter(b.splitlines(keepends=False))
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001157 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1158 self.assertEqual(resp, success_resp)
1159
1160 def test_post(self):
1161 self.check_post_ihave(self.server.post, "240 Article received OK")
1162 self.handler.allow_posting = False
1163 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1164 self.server.post(self.sample_post)
1165 self.assertEqual(cm.exception.response,
1166 "440 Posting not permitted")
1167
1168 def test_ihave(self):
1169 self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1170 "<i.am.an.article.you.will.want@example.com>")
1171 with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1172 self.server.ihave("<another.message.id>", self.sample_post)
1173 self.assertEqual(cm.exception.response,
1174 "435 Article not wanted")
1175
1176
1177class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1178 """Tests an NNTP v1 server (no capabilities)."""
1179
1180 nntp_version = 1
1181 handler_class = NNTPv1Handler
1182
1183 def test_caps(self):
1184 caps = self.server.getcapabilities()
1185 self.assertEqual(caps, {})
1186 self.assertEqual(self.server.nntp_version, 1)
Antoine Pitroua0781152010-11-05 19:16:37 +00001187 self.assertEqual(self.server.nntp_implementation, None)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001188
1189
1190class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1191 """Tests an NNTP v2 server (with capabilities)."""
1192
1193 nntp_version = 2
1194 handler_class = NNTPv2Handler
1195
1196 def test_caps(self):
1197 caps = self.server.getcapabilities()
1198 self.assertEqual(caps, {
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001199 'VERSION': ['2', '3'],
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001200 'IMPLEMENTATION': ['INN', '2.5.1'],
1201 'AUTHINFO': ['USER'],
1202 'HDR': [],
1203 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1204 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1205 'OVER': [],
1206 'POST': [],
1207 'READER': [],
1208 })
Antoine Pitrouf80b3f72010-11-02 22:31:52 +00001209 self.assertEqual(self.server.nntp_version, 3)
Antoine Pitroua0781152010-11-05 19:16:37 +00001210 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001211
1212
Antoine Pitrou54411c12012-02-12 19:14:17 +01001213class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1214 """Tests a probably NNTP v2 server with capabilities only after login."""
1215
1216 nntp_version = 2
1217 handler_class = CapsAfterLoginNNTPv2Handler
1218
1219 def test_caps_only_after_login(self):
1220 self.assertEqual(self.server._caps, {})
1221 self.server.login('testuser', 'testpw')
1222 self.assertIn('VERSION', self.server._caps)
1223
1224
Antoine Pitrou71135622012-02-14 23:29:34 +01001225class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1226 unittest.TestCase):
1227 """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1228 that isn't in READER mode by default."""
1229
1230 nntp_version = 2
1231 handler_class = ModeSwitchingNNTPv2Handler
1232
1233 def test_we_are_in_reader_mode_after_connect(self):
1234 self.assertIn('READER', self.server._caps)
1235
1236
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001237class MiscTests(unittest.TestCase):
1238
1239 def test_decode_header(self):
1240 def gives(a, b):
1241 self.assertEqual(nntplib.decode_header(a), b)
1242 gives("" , "")
1243 gives("a plain header", "a plain header")
1244 gives(" with extra spaces ", " with extra spaces ")
1245 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "DĂ©buter en Python")
1246 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1247 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1248 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1249 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1250 "Re: problème de matrice")
1251 # A natively utf-8 header (found in the real world!)
1252 gives("Re: Message d'erreur incompréhensible (par moi)",
1253 "Re: Message d'erreur incompréhensible (par moi)")
1254
1255 def test_parse_overview_fmt(self):
1256 # The minimal (default) response
1257 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1258 "References:", ":bytes", ":lines"]
1259 self.assertEqual(nntplib._parse_overview_fmt(lines),
1260 ["subject", "from", "date", "message-id", "references",
1261 ":bytes", ":lines"])
1262 # The minimal response using alternative names
1263 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1264 "References:", "Bytes:", "Lines:"]
1265 self.assertEqual(nntplib._parse_overview_fmt(lines),
1266 ["subject", "from", "date", "message-id", "references",
1267 ":bytes", ":lines"])
1268 # Variations in casing
1269 lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1270 "References:", "BYTES:", "Lines:"]
1271 self.assertEqual(nntplib._parse_overview_fmt(lines),
1272 ["subject", "from", "date", "message-id", "references",
1273 ":bytes", ":lines"])
1274 # First example from RFC 3977
1275 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1276 "References:", ":bytes", ":lines", "Xref:full",
1277 "Distribution:full"]
1278 self.assertEqual(nntplib._parse_overview_fmt(lines),
1279 ["subject", "from", "date", "message-id", "references",
1280 ":bytes", ":lines", "xref", "distribution"])
1281 # Second example from RFC 3977
1282 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1283 "References:", "Bytes:", "Lines:", "Xref:FULL",
1284 "Distribution:FULL"]
1285 self.assertEqual(nntplib._parse_overview_fmt(lines),
1286 ["subject", "from", "date", "message-id", "references",
1287 ":bytes", ":lines", "xref", "distribution"])
1288 # A classic response from INN
1289 lines = ["Subject:", "From:", "Date:", "Message-ID:",
1290 "References:", "Bytes:", "Lines:", "Xref:full"]
1291 self.assertEqual(nntplib._parse_overview_fmt(lines),
1292 ["subject", "from", "date", "message-id", "references",
1293 ":bytes", ":lines", "xref"])
1294
1295 def test_parse_overview(self):
1296 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1297 # First example from RFC 3977
1298 lines = [
1299 '3000234\tI am just a test article\t"Demo User" '
1300 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1301 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1302 '17\tXref: news.example.com misc.test:3000363',
1303 ]
1304 overview = nntplib._parse_overview(lines, fmt)
1305 (art_num, fields), = overview
1306 self.assertEqual(art_num, 3000234)
1307 self.assertEqual(fields, {
1308 'subject': 'I am just a test article',
1309 'from': '"Demo User" <nobody@example.com>',
1310 'date': '6 Oct 1998 04:38:40 -0500',
1311 'message-id': '<45223423@example.com>',
1312 'references': '<45454@example.net>',
1313 ':bytes': '1234',
1314 ':lines': '17',
1315 'xref': 'news.example.com misc.test:3000363',
1316 })
Antoine Pitrou4103bc02010-11-03 18:18:43 +00001317 # Second example; here the "Xref" field is totally absent (including
1318 # the header name) and comes out as None
1319 lines = [
1320 '3000234\tI am just a test article\t"Demo User" '
1321 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1322 '<45223423@example.com>\t<45454@example.net>\t1234\t'
1323 '17\t\t',
1324 ]
1325 overview = nntplib._parse_overview(lines, fmt)
1326 (art_num, fields), = overview
1327 self.assertEqual(fields['xref'], None)
1328 # Third example; the "Xref" is an empty string, while "references"
1329 # is a single space.
1330 lines = [
1331 '3000234\tI am just a test article\t"Demo User" '
1332 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1333 '<45223423@example.com>\t \t1234\t'
1334 '17\tXref: \t',
1335 ]
1336 overview = nntplib._parse_overview(lines, fmt)
1337 (art_num, fields), = overview
1338 self.assertEqual(fields['references'], ' ')
1339 self.assertEqual(fields['xref'], '')
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001340
1341 def test_parse_datetime(self):
1342 def gives(a, b, *c):
1343 self.assertEqual(nntplib._parse_datetime(a, b),
1344 datetime.datetime(*c))
1345 # Output of DATE command
1346 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1347 # Variations
1348 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1349 gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1350 gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1351
1352 def test_unparse_datetime(self):
1353 # Test non-legacy mode
1354 # 1) with a datetime
1355 def gives(y, M, d, h, m, s, date_str, time_str):
1356 dt = datetime.datetime(y, M, d, h, m, s)
1357 self.assertEqual(nntplib._unparse_datetime(dt),
1358 (date_str, time_str))
1359 self.assertEqual(nntplib._unparse_datetime(dt, False),
1360 (date_str, time_str))
1361 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1362 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1363 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1364 # 2) with a date
1365 def gives(y, M, d, date_str, time_str):
1366 dt = datetime.date(y, M, d)
1367 self.assertEqual(nntplib._unparse_datetime(dt),
1368 (date_str, time_str))
1369 self.assertEqual(nntplib._unparse_datetime(dt, False),
1370 (date_str, time_str))
1371 gives(1999, 6, 23, "19990623", "000000")
1372 gives(2000, 6, 23, "20000623", "000000")
1373 gives(2010, 6, 5, "20100605", "000000")
1374
1375 def test_unparse_datetime_legacy(self):
1376 # Test legacy mode (RFC 977)
1377 # 1) with a datetime
1378 def gives(y, M, d, h, m, s, date_str, time_str):
1379 dt = datetime.datetime(y, M, d, h, m, s)
1380 self.assertEqual(nntplib._unparse_datetime(dt, True),
1381 (date_str, time_str))
1382 gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1383 gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1384 gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1385 # 2) with a date
1386 def gives(y, M, d, date_str, time_str):
1387 dt = datetime.date(y, M, d)
1388 self.assertEqual(nntplib._unparse_datetime(dt, True),
1389 (date_str, time_str))
1390 gives(1999, 6, 23, "990623", "000000")
1391 gives(2000, 6, 23, "000623", "000000")
1392 gives(2010, 6, 5, "100605", "000000")
1393
1394
1395def test_main():
Antoine Pitrou54411c12012-02-12 19:14:17 +01001396 tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
Antoine Pitrou71135622012-02-14 23:29:34 +01001397 SendReaderNNTPv2Tests, NetworkedNNTPTests]
Antoine Pitrou1cb121e2010-11-09 18:54:37 +00001398 if _have_ssl:
1399 tests.append(NetworkedNNTP_SSLTests)
1400 support.run_unittest(*tests)
Antoine Pitrou69ab9512010-09-29 15:03:40 +00001401
1402
1403if __name__ == "__main__":
1404 test_main()