bpo-38242: Revert "bpo-36889: Merge asyncio streams (GH-13251)" (#16482)

See https://bugs.python.org/issue38242 for more details.
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index bddfb01..c9506b1 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -19,16 +19,14 @@
                                protocols.SubprocessProtocol):
     """Like StreamReaderProtocol, but for a subprocess."""
 
-    def __init__(self, limit, loop, *, _asyncio_internal=False):
-        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
+    def __init__(self, limit, loop):
+        super().__init__(loop=loop)
         self._limit = limit
         self.stdin = self.stdout = self.stderr = None
         self._transport = None
         self._process_exited = False
         self._pipe_fds = []
         self._stdin_closed = self._loop.create_future()
-        self._stdout_closed = self._loop.create_future()
-        self._stderr_closed = self._loop.create_future()
 
     def __repr__(self):
         info = [self.__class__.__name__]
@@ -42,35 +40,27 @@
 
     def connection_made(self, transport):
         self._transport = transport
+
         stdout_transport = transport.get_pipe_transport(1)
         if stdout_transport is not None:
-            self.stdout = streams.Stream(mode=streams.StreamMode.READ,
-                                         transport=stdout_transport,
-                                         protocol=self,
-                                         limit=self._limit,
-                                         loop=self._loop,
-                                         _asyncio_internal=True)
-            self.stdout._set_transport(stdout_transport)
+            self.stdout = streams.StreamReader(limit=self._limit,
+                                               loop=self._loop)
+            self.stdout.set_transport(stdout_transport)
             self._pipe_fds.append(1)
 
         stderr_transport = transport.get_pipe_transport(2)
         if stderr_transport is not None:
-            self.stderr = streams.Stream(mode=streams.StreamMode.READ,
-                                         transport=stderr_transport,
-                                         protocol=self,
-                                         limit=self._limit,
-                                         loop=self._loop,
-                                         _asyncio_internal=True)
-            self.stderr._set_transport(stderr_transport)
+            self.stderr = streams.StreamReader(limit=self._limit,
+                                               loop=self._loop)
+            self.stderr.set_transport(stderr_transport)
             self._pipe_fds.append(2)
 
         stdin_transport = transport.get_pipe_transport(0)
         if stdin_transport is not None:
-            self.stdin = streams.Stream(mode=streams.StreamMode.WRITE,
-                                        transport=stdin_transport,
-                                        protocol=self,
-                                        loop=self._loop,
-                                        _asyncio_internal=True)
+            self.stdin = streams.StreamWriter(stdin_transport,
+                                              protocol=self,
+                                              reader=None,
+                                              loop=self._loop)
 
     def pipe_data_received(self, fd, data):
         if fd == 1:
@@ -80,7 +70,7 @@
         else:
             reader = None
         if reader is not None:
-            reader._feed_data(data)
+            reader.feed_data(data)
 
     def pipe_connection_lost(self, fd, exc):
         if fd == 0:
@@ -101,9 +91,9 @@
             reader = None
         if reader is not None:
             if exc is None:
-                reader._feed_eof()
+                reader.feed_eof()
             else:
-                reader._set_exception(exc)
+                reader.set_exception(exc)
 
         if fd in self._pipe_fds:
             self._pipe_fds.remove(fd)
@@ -121,20 +111,10 @@
     def _get_close_waiter(self, stream):
         if stream is self.stdin:
             return self._stdin_closed
-        elif stream is self.stdout:
-            return self._stdout_closed
-        elif stream is self.stderr:
-            return self._stderr_closed
 
 
 class Process:
-    def __init__(self, transport, protocol, loop, *, _asyncio_internal=False):
-        if not _asyncio_internal:
-            warnings.warn(f"{self.__class__} should be instantiated "
-                          "by asyncio internals only, "
-                          "please avoid its creation from user code",
-                          DeprecationWarning)
-
+    def __init__(self, transport, protocol, loop):
         self._transport = transport
         self._protocol = protocol
         self._loop = loop
@@ -232,13 +212,12 @@
         )
 
     protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
-                                                        loop=loop,
-                                                        _asyncio_internal=True)
+                                                        loop=loop)
     transport, protocol = await loop.subprocess_shell(
         protocol_factory,
         cmd, stdin=stdin, stdout=stdout,
         stderr=stderr, **kwds)
-    return Process(transport, protocol, loop, _asyncio_internal=True)
+    return Process(transport, protocol, loop)
 
 
 async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
@@ -253,11 +232,10 @@
                       stacklevel=2
         )
     protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
-                                                        loop=loop,
-                                                        _asyncio_internal=True)
+                                                        loop=loop)
     transport, protocol = await loop.subprocess_exec(
         protocol_factory,
         program, *args,
         stdin=stdin, stdout=stdout,
         stderr=stderr, **kwds)
-    return Process(transport, protocol, loop, _asyncio_internal=True)
+    return Process(transport, protocol, loop)