Issue #27243: Fix __aiter__ protocol
diff --git a/Lib/_collections_abc.py b/Lib/_collections_abc.py
index f89bb6f..fc9c9f1 100644
--- a/Lib/_collections_abc.py
+++ b/Lib/_collections_abc.py
@@ -156,7 +156,7 @@
     __slots__ = ()
 
     @abstractmethod
-    async def __aiter__(self):
+    def __aiter__(self):
         return AsyncIterator()
 
     @classmethod
@@ -176,7 +176,7 @@
         """Return the next item or raise StopAsyncIteration when exhausted."""
         raise StopAsyncIteration
 
-    async def __aiter__(self):
+    def __aiter__(self):
         return self
 
     @classmethod
diff --git a/Lib/asyncio/compat.py b/Lib/asyncio/compat.py
index 660b7e7..4790bb4 100644
--- a/Lib/asyncio/compat.py
+++ b/Lib/asyncio/compat.py
@@ -4,6 +4,7 @@
 
 PY34 = sys.version_info >= (3, 4)
 PY35 = sys.version_info >= (3, 5)
+PY352 = sys.version_info >= (3, 5, 2)
 
 
 def flatten_list_bytes(list_of_data):
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 6f465af..c88a87c 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -689,3 +689,9 @@
             if val == b'':
                 raise StopAsyncIteration
             return val
+
+    if compat.PY352:
+        # From Python 3.5.2 on, __aiter__ should hand back the asynchronous
+        # iterator itself, so define it as a plain method returning self.
+        def __aiter__(self):
+            return self
diff --git a/Lib/test/test_coroutines.py b/Lib/test/test_coroutines.py
index 187348d..4f725ae 100644
--- a/Lib/test/test_coroutines.py
+++ b/Lib/test/test_coroutines.py
@@ -1255,8 +1255,9 @@
 
         buffer = []
         async def test1():
-            async for i1, i2 in AsyncIter():
-                buffer.append(i1 + i2)
+            with self.assertWarnsRegex(PendingDeprecationWarning, "legacy"):
+                async for i1, i2 in AsyncIter():
+                    buffer.append(i1 + i2)
 
         yielded, _ = run_async(test1())
         # Make sure that __aiter__ was called only once
@@ -1268,12 +1269,13 @@
         buffer = []
         async def test2():
             nonlocal buffer
-            async for i in AsyncIter():
-                buffer.append(i[0])
-                if i[0] == 20:
-                    break
-            else:
-                buffer.append('what?')
+            with self.assertWarnsRegex(PendingDeprecationWarning, "legacy"):
+                async for i in AsyncIter():
+                    buffer.append(i[0])
+                    if i[0] == 20:
+                        break
+                else:
+                    buffer.append('what?')
             buffer.append('end')
 
         yielded, _ = run_async(test2())
@@ -1286,12 +1288,13 @@
         buffer = []
         async def test3():
             nonlocal buffer
-            async for i in AsyncIter():
-                if i[0] > 20:
-                    continue
-                buffer.append(i[0])
-            else:
-                buffer.append('what?')
+            with self.assertWarnsRegex(PendingDeprecationWarning, "legacy"):
+                async for i in AsyncIter():
+                    if i[0] > 20:
+                        continue
+                    buffer.append(i[0])
+                else:
+                    buffer.append('what?')
             buffer.append('end')
 
         yielded, _ = run_async(test3())
@@ -1338,7 +1341,7 @@
 
     def test_for_4(self):
         class I:
-            async def __aiter__(self):
+            def __aiter__(self):
                 return self
 
             def __anext__(self):
@@ -1368,8 +1371,9 @@
                 return 123
 
         async def foo():
-            async for i in I():
-                print('never going to happen')
+            with self.assertWarnsRegex(PendingDeprecationWarning, "legacy"):
+                async for i in I():
+                    print('never going to happen')
 
         with self.assertRaisesRegex(
                 TypeError,
@@ -1393,7 +1397,7 @@
             def __init__(self):
                 self.i = 0
 
-            async def __aiter__(self):
+            def __aiter__(self):
                 return self
 
             async def __anext__(self):
@@ -1417,7 +1421,11 @@
                     I += 1
             I += 1000
 
-        run_async(main())
+        with warnings.catch_warnings():
+            warnings.simplefilter("error")
+            # Test that an __aiter__ returning an asynchronous iterator
+            # directly does not emit any warnings.
+            run_async(main())
         self.assertEqual(I, 111011)
 
         self.assertEqual(sys.getrefcount(manager), mrefs_before)
@@ -1472,13 +1480,63 @@
                 1/0
         async def foo():
             nonlocal CNT
-            async for i in AI():
-                CNT += 1
+            with self.assertWarnsRegex(PendingDeprecationWarning, "legacy"):
+                async for i in AI():
+                    CNT += 1
             CNT += 10
         with self.assertRaises(ZeroDivisionError):
             run_async(foo())
         self.assertEqual(CNT, 0)
 
+    def test_for_8(self):
+        CNT = 0
+        class AI:
+            def __aiter__(self):
+                1/0
+        async def foo():
+            nonlocal CNT
+            async for i in AI():
+                CNT += 1
+            CNT += 10
+        with self.assertRaises(ZeroDivisionError):
+            with warnings.catch_warnings():
+                warnings.simplefilter("error")
+                # Any warning would be raised as an error here, so this
+                # checks that __aiter__'s ZeroDivisionError propagates as is.
+                run_async(foo())
+        self.assertEqual(CNT, 0)
+
+    def test_for_9(self):
+        # With warnings turned into errors, a legacy 'async def __aiter__'
+        # must surface PendingDeprecationWarning as the exception
+        # (__aiter__ should not get a chance to raise ZeroDivisionError).
+        class AI:
+            async def __aiter__(self):
+                1/0
+        async def foo():
+            async for i in AI():
+                pass
+
+        with self.assertRaises(PendingDeprecationWarning):
+            with warnings.catch_warnings():
+                warnings.simplefilter("error")
+                run_async(foo())
+
+    def test_for_10(self):
+        # With warnings turned into errors, a legacy 'async def __aiter__'
+        # whose body does not raise must still fail with the warning.
+        class AI:
+            async def __aiter__(self):
+                pass
+        async def foo():
+            async for i in AI():
+                pass
+
+        with self.assertRaises(PendingDeprecationWarning):
+            with warnings.catch_warnings():
+                warnings.simplefilter("error")
+                run_async(foo())
+
     def test_copy(self):
         async def func(): pass
         coro = func()
diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py
index d68cc7d..154e3b6 100644
--- a/Lib/test/test_grammar.py
+++ b/Lib/test/test_grammar.py
@@ -1076,7 +1076,7 @@
         class Done(Exception): pass
 
         class AIter:
-            async def __aiter__(self):
+            def __aiter__(self):
                 return self
             async def __anext__(self):
                 raise StopAsyncIteration