bpo-36802: Drop awrite()/aclose(), support await write() and await close() instead (#13099)

diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 79adf02..d9a9f5e 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -352,6 +352,8 @@
         assert reader is None or isinstance(reader, StreamReader)
         self._reader = reader
         self._loop = loop
+        self._complete_fut = self._loop.create_future()
+        self._complete_fut.set_result(None)
 
     def __repr__(self):
         info = [self.__class__.__name__, f'transport={self._transport!r}']
@@ -365,9 +367,33 @@
 
     def write(self, data):
         self._transport.write(data)
+        return self._fast_drain()
 
     def writelines(self, data):
         self._transport.writelines(data)
+        return self._fast_drain()
+
+    def _fast_drain(self):
+        # This helper takes the fast path and returns the already completed
+        # future object when writing is not paused, i.e. when there is no need
+        # to actually wait for writing to resume.
+        if self._reader is not None:
+            # this branch will be simplified after merging reader with writer
+            exc = self._reader.exception()
+            if exc is not None:
+                fut = self._loop.create_future()
+                fut.set_exception(exc)
+                return fut
+        if not self._transport.is_closing():
+            if self._protocol._connection_lost:
+                fut = self._loop.create_future()
+                fut.set_exception(ConnectionResetError('Connection lost'))
+                return fut
+            if not self._protocol._paused:
+                # Fast path: the stream is not paused, so there is
+                # no need to wait for the resume signal.
+                return self._complete_fut
+        return self._loop.create_task(self.drain())
 
     def write_eof(self):
         return self._transport.write_eof()
@@ -377,6 +403,7 @@
 
     def close(self):
         self._transport.close()
+        return self._protocol._get_close_waiter(self)
 
     def is_closing(self):
         return self._transport.is_closing()
@@ -408,14 +435,6 @@
             raise ConnectionResetError('Connection lost')
         await self._protocol._drain_helper()
 
-    async def aclose(self):
-        self.close()
-        await self.wait_closed()
-
-    async def awrite(self, data):
-        self.write(data)
-        await self.drain()
-
 
 class StreamReader: