#13700: Make imaplib's authenticate() work with a custom authobject.

This fixes a bytes/string confusion in the API which prevented
custom authobjects from working at all.

Original patch by Erno Tukia.
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
index 43ee7df..6b29943 100644
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -78,14 +78,25 @@
 class SimpleIMAPHandler(socketserver.StreamRequestHandler):
 
     timeout = 1
+    continuation = None
+    capabilities = ''
 
     def _send(self, message):
         if verbose: print("SENT: %r" % message.strip())
         self.wfile.write(message)
 
+    def _send_line(self, message):
+        self._send(message + b'\r\n')
+
+    def _send_textline(self, message):
+        self._send_line(message.encode('ASCII'))
+
+    def _send_tagged(self, tag, code, message):
+        self._send_textline(' '.join((tag, code, message)))
+
     def handle(self):
         # Send a welcome message.
-        self._send(b'* OK IMAP4rev1\r\n')
+        self._send_textline('* OK IMAP4rev1')
         while 1:
             # Gather up input until we receive a line terminator or we timeout.
             # Accumulate read(1) because it's simpler to handle the differences
@@ -105,19 +116,33 @@
                     break
 
             if verbose: print('GOT: %r' % line.strip())
-            splitline = line.split()
-            tag = splitline[0].decode('ASCII')
-            cmd = splitline[1].decode('ASCII')
+            if self.continuation:
+                try:
+                    self.continuation.send(line)
+                except StopIteration:
+                    self.continuation = None
+                continue
+            splitline = line.decode('ASCII').split()
+            tag = splitline[0]
+            cmd = splitline[1]
             args = splitline[2:]
 
             if hasattr(self, 'cmd_'+cmd):
-                getattr(self, 'cmd_'+cmd)(tag, args)
+                continuation = getattr(self, 'cmd_'+cmd)(tag, args)
+                if continuation:
+                    self.continuation = continuation
+                    next(continuation)
             else:
-                self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII'))
+                self._send_tagged(tag, 'BAD', cmd + ' unknown')
 
     def cmd_CAPABILITY(self, tag, args):
-        self._send(b'* CAPABILITY IMAP4rev1\r\n')
-        self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+        caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1'
+        self._send_textline('* CAPABILITY ' + caps)
+        self._send_tagged(tag, 'OK', 'CAPABILITY completed')
+
+    def cmd_LOGOUT(self, tag, args):  # untagged BYE, then tagged OK
+        self._send_textline('* BYE IMAP4rev1 Server logging out')
+        self._send_tagged(tag, 'OK', 'LOGOUT completed')
 
 
 class BaseThreadedNetworkedTests(unittest.TestCase):
@@ -167,6 +192,16 @@
         finally:
             self.reap_server(server, thread)
 
+    @contextmanager
+    def reaped_pair(self, hdlr):
+        # reaped_server reaps the server even if connect or logout raises.
+        with self.reaped_server(hdlr) as server:
+            client = self.imap_class(*server.server_address)
+            try:
+                yield server, client
+            finally:
+                client.logout()
+
     @reap_threads
     def test_connect(self):
         with self.reaped_server(SimpleIMAPHandler) as server:
@@ -192,12 +227,86 @@
 
             def cmd_CAPABILITY(self, tag, args):
                 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
-                self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+                self._send_tagged(tag, 'OK', 'CAPABILITY completed')
 
         with self.reaped_server(BadNewlineHandler) as server:
             self.assertRaises(imaplib.IMAP4.abort,
                               self.imap_class, *server.server_address)
 
+    @reap_threads
+    def test_bad_auth_name(self):
+
+        class MyServer(SimpleIMAPHandler):
+
+            def cmd_AUTHENTICATE(self, tag, args):
+                self._send_tagged(tag, 'NO', 'unrecognized authentication '
+                        'type {}'.format(args[0]))
+
+        with self.reaped_pair(MyServer) as (server, client):
+            with self.assertRaises(imaplib.IMAP4.error):
+                client.authenticate('METHOD', lambda: 1)
+
+    @reap_threads
+    def test_invalid_authentication(self):
+
+        class MyServer(SimpleIMAPHandler):
+
+            def cmd_AUTHENTICATE(self, tag, args):
+                self._send_textline('+')
+                self.response = yield  # client's reply; not inspected here
+                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
+
+        with self.reaped_pair(MyServer) as (server, client):
+            with self.assertRaises(imaplib.IMAP4.error):
+                client.authenticate('MYAUTH', lambda x: b'fake')
+
+    @reap_threads
+    def test_valid_authentication(self):
+
+        class MyServer(SimpleIMAPHandler):
+
+            def cmd_AUTHENTICATE(self, tag, args):
+                self._send_textline('+')
+                self.server.response = yield
+                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
+
+        with self.reaped_pair(MyServer) as (server, client):
+            code, data = client.authenticate('MYAUTH', lambda x: b'fake')
+            self.assertEqual(code, 'OK')
+            self.assertEqual(server.response,
+                             b'ZmFrZQ==\r\n') #b64 encoded 'fake'
+
+        with self.reaped_pair(MyServer) as (server, client):
+            code, data = client.authenticate('MYAUTH', lambda x: 'fake')
+            self.assertEqual(code, 'OK')
+            self.assertEqual(server.response,
+                             b'ZmFrZQ==\r\n') #b64 encoded 'fake'
+
+    @reap_threads
+    def test_login_cram_md5(self):
+
+        class AuthHandler(SimpleIMAPHandler):
+
+            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
+
+            def cmd_AUTHENTICATE(self, tag, args):
+                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
+                                       'VzdG9uLm1jaS5uZXQ=')
+                r = yield  # raw response line sent back by the client
+                if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n':
+                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
+                else:
+                    self._send_tagged(tag, 'NO', 'No access')
+
+        with self.reaped_pair(AuthHandler) as (server, client):  # str password
+            self.assertIn('AUTH=CRAM-MD5', client.capabilities)
+            ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
+            self.assertEqual(ret, "OK")
+
+        with self.reaped_pair(AuthHandler) as (server, client):  # bytes password
+            self.assertIn('AUTH=CRAM-MD5', client.capabilities)
+            ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
+            self.assertEqual(ret, "OK")
 
 
 class ThreadedNetworkedTests(BaseThreadedNetworkedTests):