Make NDC (NativeDaemonConnector) not block forever and handle daemon restart.

It is still possible for netd to get wedged, but the system will no longer die
because of it. This paves the way for a forking netd, in which only individual
commands would wedge, improving stability.

bug:5864209
bug:6019952
Change-Id: I43e4c5072863b8b812d4fe24d30d92eb1a11651a
diff --git a/services/java/com/android/server/NativeDaemonConnector.java b/services/java/com/android/server/NativeDaemonConnector.java
index 308661f..6593a0f 100644
--- a/services/java/com/android/server/NativeDaemonConnector.java
+++ b/services/java/com/android/server/NativeDaemonConnector.java
@@ -34,8 +34,8 @@
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.ArrayList;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.LinkedList;
 
 /**
  * Generic connector class for interfacing with a native daemon which uses the
@@ -50,11 +50,15 @@
     private OutputStream mOutputStream;
     private LocalLog mLocalLog;
 
-    private final BlockingQueue<NativeDaemonEvent> mResponseQueue;
+    private final ResponseQueue mResponseQueue;
 
     private INativeDaemonConnectorCallbacks mCallbacks;
     private Handler mCallbackHandler;
 
+    private AtomicInteger mSequenceNumber;
+
+    private static final int DEFAULT_TIMEOUT = 1 * 60 * 1000; /* 1 minute */
+
     /** Lock held whenever communicating with native daemon. */
     private final Object mDaemonLock = new Object();
 
@@ -64,7 +68,8 @@
             int responseQueueSize, String logTag, int maxLogSize) {
         mCallbacks = callbacks;
         mSocket = socket;
-        mResponseQueue = new LinkedBlockingQueue<NativeDaemonEvent>(responseQueueSize);
+        mResponseQueue = new ResponseQueue(responseQueueSize);
+        mSequenceNumber = new AtomicInteger(0);
         TAG = logTag != null ? logTag : "NativeDaemonConnector";
         mLocalLog = new LocalLog(maxLogSize);
     }
@@ -79,7 +84,7 @@
             try {
                 listenToSocket();
             } catch (Exception e) {
-                Slog.e(TAG, "Error in NativeDaemonConnector", e);
+                loge("Error in NativeDaemonConnector: " + e);
                 SystemClock.sleep(5000);
             }
         }
@@ -90,12 +95,10 @@
         String event = (String) msg.obj;
         try {
             if (!mCallbacks.onEvent(msg.what, event, event.split(" "))) {
-                Slog.w(TAG, String.format(
-                        "Unhandled event '%s'", event));
+                log(String.format("Unhandled event '%s'", event));
             }
         } catch (Exception e) {
-            Slog.e(TAG, String.format(
-                    "Error handling '%s'", event), e);
+            loge("Error handling '" + event + "': " + e);
         }
         return true;
     }
@@ -111,7 +114,9 @@
             socket.connect(address);
 
             InputStream inputStream = socket.getInputStream();
-            mOutputStream = socket.getOutputStream();
+            synchronized (mDaemonLock) {
+                mOutputStream = socket.getOutputStream();
+            }
 
             mCallbacks.onDaemonConnected();
 
@@ -120,7 +125,10 @@
 
             while (true) {
                 int count = inputStream.read(buffer, start, BUFFER_SIZE - start);
-                if (count < 0) break;
+                if (count < 0) {
+                    loge("got " + count + " reading with start = " + start);
+                    break;
+                }
 
                 // Add our starting point to the count and reset the start.
                 count += start;
@@ -140,14 +148,10 @@
                                 mCallbackHandler.sendMessage(mCallbackHandler.obtainMessage(
                                         event.getCode(), event.getRawEvent()));
                             } else {
-                                try {
-                                    mResponseQueue.put(event);
-                                } catch (InterruptedException ex) {
-                                    Slog.e(TAG, "Failed to put response onto queue: " + ex);
-                                }
+                                mResponseQueue.add(event.getCmdNumber(), event);
                             }
                         } catch (IllegalArgumentException e) {
-                            Slog.w(TAG, "Problem parsing message: " + rawEvent, e);
+                            log("Problem parsing message: " + rawEvent + " - " + e);
                         }
 
                         start = i + 1;
@@ -169,15 +173,16 @@
                 }
             }
         } catch (IOException ex) {
-            Slog.e(TAG, "Communications error", ex);
+            loge("Communications error: " + ex);
             throw ex;
         } finally {
             synchronized (mDaemonLock) {
                 if (mOutputStream != null) {
                     try {
+                        loge("closing stream for " + mSocket);
                         mOutputStream.close();
                     } catch (IOException e) {
-                        Slog.w(TAG, "Failed closing output stream", e);
+                        loge("Failed closing output stream: " + e);
                     }
                     mOutputStream = null;
                 }
@@ -188,17 +193,17 @@
                     socket.close();
                 }
             } catch (IOException ex) {
-                Slog.w(TAG, "Failed closing socket", ex);
+                loge("Failed closing socket: " + ex);
             }
         }
     }
 
     /**
-     * Send command to daemon, escaping arguments as needed.
+     * Make command for daemon, escaping arguments as needed.
      *
-     * @return the final command issued.
+     * @return the final command.
      */
-    private String sendCommandLocked(String cmd, Object... args)
+    private StringBuilder makeCommand(String cmd, Object... args)
             throws NativeDaemonConnectorException {
         // TODO: eventually enforce that cmd doesn't contain arguments
         if (cmd.indexOf('\0') >= 0) {
@@ -216,22 +221,33 @@
             appendEscaped(builder, argString);
         }
 
-        final String unterminated = builder.toString();
-        log("SND -> {" + unterminated + "}");
+        return builder;
+    }
+
+    private int sendCommand(StringBuilder builder)
+            throws NativeDaemonConnectorException {
+
+        int sequenceNumber = mSequenceNumber.incrementAndGet();
+
+        builder.insert(0, Integer.toString(sequenceNumber) + " ");
+
+        log("SND -> {" + builder.toString() + "}");
 
         builder.append('\0');
 
-        if (mOutputStream == null) {
-            throw new NativeDaemonConnectorException("missing output stream");
-        } else {
-            try {
-                mOutputStream.write(builder.toString().getBytes(Charsets.UTF_8));
-            } catch (IOException e) {
-                throw new NativeDaemonConnectorException("problem sending command", e);
+        synchronized (mDaemonLock) {
+            if (mOutputStream == null) {
+                throw new NativeDaemonConnectorException("missing output stream");
+            } else {
+                try {
+                    mOutputStream.write(builder.toString().getBytes(Charsets.UTF_8));
+                } catch (IOException e) {
+                    throw new NativeDaemonConnectorException("problem sending command", e);
+                }
             }
         }
 
-        return unterminated;
+        return sequenceNumber;
     }
 
     /**
@@ -292,39 +308,42 @@
      */
     public NativeDaemonEvent[] executeForList(String cmd, Object... args)
             throws NativeDaemonConnectorException {
-        synchronized (mDaemonLock) {
-            return executeLocked(cmd, args);
-        }
+            return execute(DEFAULT_TIMEOUT, cmd, args);
     }
 
-    private NativeDaemonEvent[] executeLocked(String cmd, Object... args)
+    /**
+     * Issue the given command to the native daemon and return any
+     * {@link NativeDaemonEvent#isClassContinue()} responses, including the
+     * final terminal response.  Note that the timeout does not count time in
+     * deep sleep.
+     *
+     * @throws NativeDaemonConnectorException when problem communicating with
+     *             native daemon, or if the response matches
+     *             {@link NativeDaemonEvent#isClassClientError()} or
+     *             {@link NativeDaemonEvent#isClassServerError()}.
+     */
+    public NativeDaemonEvent[] execute(int timeout, String cmd, Object... args)
             throws NativeDaemonConnectorException {
         final ArrayList<NativeDaemonEvent> events = Lists.newArrayList();
-
-        while (mResponseQueue.size() > 0) {
-            try {
-                log("ignoring {" + mResponseQueue.take() + "}");
-            } catch (Exception e) {}
-        }
-
-        final String sentCommand = sendCommandLocked(cmd, args);
+        final StringBuilder sentCommand = makeCommand(cmd, args);
+        final int cmdNumber = sendCommand(sentCommand);
 
         NativeDaemonEvent event = null;
+        cmd = sentCommand.toString();
         do {
-            try {
-                event = mResponseQueue.take();
-            } catch (InterruptedException e) {
-                Slog.w(TAG, "interrupted waiting for event line");
-                continue;
+            event = mResponseQueue.remove(cmdNumber, timeout, cmd);
+            if (event == null) {
+                loge("timed-out waiting for response to " + cmdNumber + " " + cmd);
+                throw new NativeDaemonFailureException(cmd, event);
             }
             events.add(event);
         } while (event.isClassContinue());
 
         if (event.isClassClientError()) {
-            throw new NativeDaemonArgumentException(sentCommand, event);
+            throw new NativeDaemonArgumentException(cmd, event);
         }
         if (event.isClassServerError()) {
-            throw new NativeDaemonFailureException(sentCommand, event);
+            throw new NativeDaemonFailureException(cmd, event);
         }
 
         return events.toArray(new NativeDaemonEvent[events.size()]);
@@ -448,10 +467,120 @@
 
     public void dump(FileDescriptor fd, PrintWriter pw, String[] args) {
         mLocalLog.dump(fd, pw, args);
+        pw.println();
+        mResponseQueue.dump(fd, pw, args);
     }
 
     private void log(String logstring) {
         if (LOGD) Slog.d(TAG, logstring);
         mLocalLog.log(logstring);
     }
+
+    private void loge(String logstring) {
+        Slog.e(TAG, logstring);
+        mLocalLog.log(logstring);
+    }
+
+    private static class ResponseQueue {
+
+        private static class Response {
+            public int cmdNum;
+            public LinkedList<NativeDaemonEvent> responses = new LinkedList<NativeDaemonEvent>();
+            public String request;
+            public Response(int c, String r) {cmdNum = c; request = r;}
+        }
+
+        private final LinkedList<Response> mResponses;
+        private int mMaxCount;
+
+        ResponseQueue(int maxCount) {
+            mResponses = new LinkedList<Response>();
+            mMaxCount = maxCount;
+        }
+
+        public void add(int cmdNum, NativeDaemonEvent response) {
+            Response found = null;
+            synchronized (mResponses) {
+                for (Response r : mResponses) {
+                    if (r.cmdNum == cmdNum) {
+                        found = r;
+                        break;
+                    }
+                }
+                if (found == null) {
+                    // didn't find it - make sure our queue isn't too big before adding
+                    // another..
+                    while (mResponses.size() >= mMaxCount) {
+                        Slog.e("NativeDaemonConnector.ResponseQueue",
+                                "more buffered than allowed: " + mResponses.size() +
+                                " >= " + mMaxCount);
+                        // let any waiter timeout waiting for this
+                        Response r = mResponses.remove();
+                        Slog.e("NativeDaemonConnector.ResponseQueue",
+                                "Removing request: " + r.request + " (" + r.cmdNum + ")");
+                    }
+                    found = new Response(cmdNum, null);
+                    mResponses.add(found);
+                }
+                found.responses.add(response);
+            }
+            synchronized (found) {
+                found.notify();
+            }
+        }
+
+        // note that the timeout does not count time in deep sleep.  If you don't want
+        // the device to sleep, hold a wakelock
+        public NativeDaemonEvent remove(int cmdNum, int timeoutMs, String origCmd) {
+            long endTime = SystemClock.uptimeMillis() + timeoutMs;
+            long nowTime;
+            Response found = null;
+            while (true) {
+                synchronized (mResponses) {
+                    for (Response response : mResponses) {
+                        if (response.cmdNum == cmdNum) {
+                            found = response;
+                            // how many response fragments are left
+                            switch (response.responses.size()) {
+                            case 0:  // haven't got any - must wait
+                                break;
+                            case 1:  // last one - remove this from the master list
+                                mResponses.remove(response); // fall through
+                            default: // take one and move on
+                                response.request = origCmd;
+                                return response.responses.remove();
+                            }
+                        }
+                    }
+                    nowTime = SystemClock.uptimeMillis();
+                    if (endTime <= nowTime) {
+                        Slog.e("NativeDaemonConnector.ResponseQueue",
+                                "Timeout waiting for response");
+                        return null;
+                    }
+                    /* pre-allocate so we have something unique to wait on */
+                    if (found == null) {
+                        found = new Response(cmdNum, origCmd);
+                        mResponses.add(found);
+                    }
+                }
+                try {
+                    synchronized (found) {
+                        found.wait(endTime - nowTime);
+                    }
+                } catch (InterruptedException e) {
+                    // loop around to check if we're done or if it's time to stop waiting
+                }
+            }
+        }
+
+        public void dump(FileDescriptor fd, PrintWriter pw, String[] args) {
+            pw.println("Pending requests:");
+            synchronized (mResponses) {
+                for (Response response : mResponses) {
+                    pw.println("  Cmd " + response.cmdNum + " - " + response.request);
+                }
+            }
+        }
+    }
 }