adb: don't close sockets before hitting EOF.

Reimplement commit de4b85ee780b4f4a0223b949b9a36d3ca1762635 using
fdevent. The previous attempt was reverted because we blindly
continued when (revents & POLLIN) == 0, which ignored POLLHUP/POLLERR
and led to spin loops when the opposite end of the file descriptor was
shut down while we had no data left to read.

This patch reimplements the functionality implemented by that commit
using fdevent, which gets us detection of spin loops for free.

Bug: http://b/74616284
Test: ./test_device.py
Change-Id: I1abd671fef4c29e99dad968aa66bb754ca382578
diff --git a/test_device.py b/test_device.py
index 34f8fd9..f95a5b3 100755
--- a/test_device.py
+++ b/test_device.py
@@ -35,6 +35,8 @@
 import time
 import unittest
 
+from datetime import datetime
+
 import adb
 
 def requires_root(func):
@@ -1335,6 +1337,63 @@
             self.device.forward_remove("tcp:{}".format(local_port))
 
 
+class SocketTest(DeviceTest):
+    def test_socket_flush(self):
+        """Test that we handle socket closure properly.
+
+        If we're done writing to a socket, closing before the other end has
+        closed will send a TCP_RST if we have incoming data queued up, which
+        may result in data that we've written being discarded.
+
+        Bug: http://b/74616284
+        """
+        s = socket.create_connection(("localhost", 5037))
+        self.addCleanup(s.close)  # Don't leak the socket if an assert fails.
+
+        def adb_length_prefixed(string):  # 4 hex digits of length + payload.
+            encoded = string.encode("utf8")
+            return b"%04x%s" % (len(encoded), encoded)
+
+        if "ANDROID_SERIAL" in os.environ:
+            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
+        else:
+            transport_string = "host:transport-any"
+
+        s.sendall(adb_length_prefixed(transport_string))
+        response = s.recv(4)
+        self.assertEqual(b"OKAY", response)
+
+        shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
+        s.sendall(adb_length_prefixed(shell_string))
+
+        response = s.recv(4)
+        self.assertEqual(b"OKAY", response)
+
+        # Spawn a thread that dumps garbage into the socket until failure.
+        def spam():
+            buf = b"\0" * 16384
+            try:
+                while True:
+                    s.sendall(buf)
+            except Exception as ex:
+                print(ex)
+
+        thread = threading.Thread(target=spam)
+        thread.start()
+
+        time.sleep(1)  # Outlast the remote "sleep 0.5" before we start reading.
+
+        received = bytearray()  # Mutable buffer: appending doesn't recopy.
+        while True:  # Drain the socket until an empty read signals EOF.
+            read = s.recv(512)
+            if not read:
+                break
+            received += read
+
+        self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
+        thread.join()
+
+
 if sys.platform == "win32":
     # From https://stackoverflow.com/a/38749458
     import os