Send consumeTime from the app to InputDispatcher

When the app completes an input event by calling 'finishInputEvent', it
will now notify InputDispatcher about the time when the event was first
read by the app.

This will help InputManager collect dispatching latency statistics.

Sample dumpsys output for validation:
    android.view.ViewRootImpl$WindowInputEventReceiver
       mInputChannel: da2dbc7 com.google.latencyTester/com.google.latencyTester.activities.MainActivity (client)
       mSeqMap: {}
       mReceiverPtr:
        mInputConsumer:
          mResampleTouch = true
          mChannel = da2dbc7 com.google.latencyTester/com.google.latencyTester.activities.MainActivity (client)
          mMsgDeferred: false
          Batches:
              Batch:
                  Message 2277: MOTION action=MOVE
                      Pointer 0 : x=643.0 y=961.0
          mSeqChains:
              <empty>
          mConsumeTimes:
              seq = 2277 consumeTime = 74385633441431
The above dump was acquired during touch of the latencyTester app.
Also verified that normally, mConsumeTimes is empty.

Bug: 169866723
Test: atest inputflinger_tests
Test: adb shell dumpsys activity all | grep -i consume -C 20
Change-Id: Ib173bb38e5decc2e4a6b0cf9bca9aceb32352ec0
diff --git a/include/input/InputTransport.h b/include/input/InputTransport.h
index 8744ef7..11b714f 100644
--- a/include/input/InputTransport.h
+++ b/include/input/InputTransport.h
@@ -30,6 +30,7 @@
  */
 
 #include <string>
+#include <unordered_map>
 
 #include <android-base/chrono_utils.h>
 
@@ -155,6 +156,7 @@
         struct Finished {
             uint32_t empty1;
             uint32_t handled; // actually a bool, but we must maintain 8-byte alignment
+            nsecs_t consumeTime; // The time when the event was consumed by the receiving end
 
             inline size_t size() const { return sizeof(Finished); }
         } finished;
@@ -362,7 +364,8 @@
 
     /* Receives the finished signal from the consumer in reply to the original dispatch signal.
      * If a signal was received, returns the message sequence number,
-     * and whether the consumer handled the message.
+     * whether the consumer handled the message, and the time the event was first read by the
+     * consumer.
      *
      * The returned sequence number is never 0 unless the operation failed.
      *
@@ -371,7 +374,8 @@
      * Returns DEAD_OBJECT if the channel's peer has been closed.
      * Other errors probably indicate that the channel is broken.
      */
-    status_t receiveFinishedSignal(uint32_t* outSeq, bool* outHandled);
+    status_t receiveFinishedSignal(
+            const std::function<void(uint32_t seq, bool handled, nsecs_t consumeTime)>& callback);
 
 private:
     std::shared_ptr<InputChannel> mChannel;
@@ -577,6 +581,13 @@
     };
     std::vector<SeqChain> mSeqChains;
 
+    // The time at which each event with the sequence number 'seq' was consumed.
+    // This data is provided in 'finishInputEvent' so that the receiving end can measure the latency
+    // This collection is populated when the event is received, and the entries are erased when the
+    // events are finished. It should not grow infinitely because if an event is not ack'd, ANR
+    // will be raised for that connection, and no further events will be posted to that channel.
+    std::unordered_map<uint32_t /*seq*/, nsecs_t /*consumeTime*/> mConsumeTimes;
+
     status_t consumeBatch(InputEventFactoryInterface* factory,
             nsecs_t frameTime, uint32_t* outSeq, InputEvent** outEvent);
     status_t consumeSamples(InputEventFactoryInterface* factory,
@@ -589,6 +600,8 @@
     ssize_t findBatch(int32_t deviceId, int32_t source) const;
     ssize_t findTouchState(int32_t deviceId, int32_t source) const;
 
+    nsecs_t getConsumeTime(uint32_t seq) const;
+    void popConsumeTime(uint32_t seq);
     status_t sendUnchainedFinishedSignal(uint32_t seq, bool handled);
 
     static void rewriteMessage(TouchState& state, InputMessage& msg);
diff --git a/libs/input/InputTransport.cpp b/libs/input/InputTransport.cpp
index acea473..6218fdc 100644
--- a/libs/input/InputTransport.cpp
+++ b/libs/input/InputTransport.cpp
@@ -234,6 +234,7 @@
         }
         case InputMessage::Type::FINISHED: {
             msg->body.finished.handled = body.finished.handled;
+            msg->body.finished.consumeTime = body.finished.consumeTime;
             break;
         }
         case InputMessage::Type::FOCUS: {
@@ -597,7 +598,8 @@
     return mChannel->sendMessage(&msg);
 }
 
-status_t InputPublisher::receiveFinishedSignal(uint32_t* outSeq, bool* outHandled) {
+status_t InputPublisher::receiveFinishedSignal(
+        const std::function<void(uint32_t seq, bool handled, nsecs_t consumeTime)>& callback) {
     if (DEBUG_TRANSPORT_ACTIONS) {
         ALOGD("channel '%s' publisher ~ receiveFinishedSignal", mChannel->getName().c_str());
     }
@@ -605,8 +607,6 @@
     InputMessage msg;
     status_t result = mChannel->receiveMessage(&msg);
     if (result) {
-        *outSeq = 0;
-        *outHandled = false;
         return result;
     }
     if (msg.header.type != InputMessage::Type::FINISHED) {
@@ -614,8 +614,7 @@
                 mChannel->getName().c_str(), msg.header.type);
         return UNKNOWN_ERROR;
     }
-    *outSeq = msg.header.seq;
-    *outHandled = msg.body.finished.handled == 1;
+    callback(msg.header.seq, msg.body.finished.handled == 1, msg.body.finished.consumeTime);
     return OK;
 }
 
@@ -651,6 +650,9 @@
         } else {
             // Receive a fresh message.
             status_t result = mChannel->receiveMessage(&mMsg);
+            if (result == OK) {
+                mConsumeTimes.emplace(mMsg.header.seq, systemTime(SYSTEM_TIME_MONOTONIC));
+            }
             if (result) {
                 // Consume the next batched event unless batches are being held for later.
                 if (consumeBatches || result != WOULD_BLOCK) {
@@ -1147,12 +1149,33 @@
     return sendUnchainedFinishedSignal(seq, handled);
 }
 
+nsecs_t InputConsumer::getConsumeTime(uint32_t seq) const {
+    auto it = mConsumeTimes.find(seq);
+    // Consume time will be missing if either 'finishInputEvent' is called twice, or if it was
+    // called for the wrong (synthetic?) input event. Either way, it is a bug that should be fixed.
+    LOG_ALWAYS_FATAL_IF(it == mConsumeTimes.end(), "Could not find consume time for seq=%" PRIu32,
+                        seq);
+    return it->second;
+}
+
+void InputConsumer::popConsumeTime(uint32_t seq) {
+    mConsumeTimes.erase(seq);
+}
+
 status_t InputConsumer::sendUnchainedFinishedSignal(uint32_t seq, bool handled) {
     InputMessage msg;
     msg.header.type = InputMessage::Type::FINISHED;
     msg.header.seq = seq;
     msg.body.finished.handled = handled ? 1 : 0;
-    return mChannel->sendMessage(&msg);
+    msg.body.finished.consumeTime = getConsumeTime(seq);
+    status_t result = mChannel->sendMessage(&msg);
+    if (result == OK) {
+        // Remove the consume time if the socket write succeeded. We will not need to ack this
+        // message anymore. If the socket write did not succeed, we will try again and will still
+        // need consume time.
+        popConsumeTime(seq);
+    }
+    return result;
 }
 
 bool InputConsumer::hasDeferredEvent() const {
@@ -1304,8 +1327,9 @@
                     break;
                 }
                 case InputMessage::Type::FINISHED: {
-                    out += android::base::StringPrintf("handled=%s",
-                                                       toString(msg.body.finished.handled));
+                    out += android::base::StringPrintf("handled=%s, consumeTime=%" PRId64,
+                                                       toString(msg.body.finished.handled),
+                                                       msg.body.finished.consumeTime);
                     break;
                 }
                 case InputMessage::Type::FOCUS: {
@@ -1335,6 +1359,14 @@
     if (mSeqChains.empty()) {
         out += "    <empty>\n";
     }
+    out += "mConsumeTimes:\n";
+    for (const auto& [seq, consumeTime] : mConsumeTimes) {
+        out += android::base::StringPrintf("    seq = %" PRIu32 " consumeTime = %" PRId64, seq,
+                                           consumeTime);
+    }
+    if (mConsumeTimes.empty()) {
+        out += "    <empty>\n";
+    }
     return out;
 }
 
diff --git a/libs/input/tests/InputPublisherAndConsumer_test.cpp b/libs/input/tests/InputPublisherAndConsumer_test.cpp
index 9da7b69..e7e566d 100644
--- a/libs/input/tests/InputPublisherAndConsumer_test.cpp
+++ b/libs/input/tests/InputPublisherAndConsumer_test.cpp
@@ -82,6 +82,7 @@
     constexpr int32_t repeatCount = 1;
     constexpr nsecs_t downTime = 3;
     constexpr nsecs_t eventTime = 4;
+    const nsecs_t publishTime = systemTime(SYSTEM_TIME_MONOTONIC);
 
     status = mPublisher->publishKeyEvent(seq, eventId, deviceId, source, displayId, hmac, action,
                                          flags, keyCode, scanCode, metaState, repeatCount, downTime,
@@ -122,13 +123,22 @@
 
     uint32_t finishedSeq = 0;
     bool handled = false;
-    status = mPublisher->receiveFinishedSignal(&finishedSeq, &handled);
+    nsecs_t consumeTime;
+    status = mPublisher->receiveFinishedSignal(
+            [&finishedSeq, &handled, &consumeTime](uint32_t inSeq, bool inHandled,
+                                                   nsecs_t inConsumeTime) -> void {
+                finishedSeq = inSeq;
+                handled = inHandled;
+                consumeTime = inConsumeTime;
+            });
     ASSERT_EQ(OK, status)
             << "publisher receiveFinishedSignal should return OK";
     ASSERT_EQ(seq, finishedSeq)
             << "publisher receiveFinishedSignal should have returned the original sequence number";
     ASSERT_TRUE(handled)
             << "publisher receiveFinishedSignal should have set handled to consumer's reply";
+    ASSERT_GE(consumeTime, publishTime)
+            << "finished signal's consume time should be greater than publish time";
 }
 
 void InputPublisherAndConsumerTest::PublishAndConsumeMotionEvent() {
@@ -160,6 +170,7 @@
     constexpr nsecs_t downTime = 3;
     constexpr size_t pointerCount = 3;
     constexpr nsecs_t eventTime = 4;
+    const nsecs_t publishTime = systemTime(SYSTEM_TIME_MONOTONIC);
     PointerProperties pointerProperties[pointerCount];
     PointerCoords pointerCoords[pointerCount];
     for (size_t i = 0; i < pointerCount; i++) {
@@ -262,13 +273,22 @@
 
     uint32_t finishedSeq = 0;
     bool handled = true;
-    status = mPublisher->receiveFinishedSignal(&finishedSeq, &handled);
+    nsecs_t consumeTime;
+    status = mPublisher->receiveFinishedSignal(
+            [&finishedSeq, &handled, &consumeTime](uint32_t inSeq, bool inHandled,
+                                                   nsecs_t inConsumeTime) -> void {
+                finishedSeq = inSeq;
+                handled = inHandled;
+                consumeTime = inConsumeTime;
+            });
     ASSERT_EQ(OK, status)
             << "publisher receiveFinishedSignal should return OK";
     ASSERT_EQ(seq, finishedSeq)
             << "publisher receiveFinishedSignal should have returned the original sequence number";
     ASSERT_FALSE(handled)
             << "publisher receiveFinishedSignal should have set handled to consumer's reply";
+    ASSERT_GE(consumeTime, publishTime)
+            << "finished signal's consume time should be greater than publish time";
 }
 
 void InputPublisherAndConsumerTest::PublishAndConsumeFocusEvent() {
@@ -278,6 +298,7 @@
     int32_t eventId = InputEvent::nextId();
     constexpr bool hasFocus = true;
     constexpr bool inTouchMode = true;
+    const nsecs_t publishTime = systemTime(SYSTEM_TIME_MONOTONIC);
 
     status = mPublisher->publishFocusEvent(seq, eventId, hasFocus, inTouchMode);
     ASSERT_EQ(OK, status) << "publisher publishKeyEvent should return OK";
@@ -302,12 +323,21 @@
 
     uint32_t finishedSeq = 0;
     bool handled = false;
-    status = mPublisher->receiveFinishedSignal(&finishedSeq, &handled);
+    nsecs_t consumeTime;
+    status = mPublisher->receiveFinishedSignal(
+            [&finishedSeq, &handled, &consumeTime](uint32_t inSeq, bool inHandled,
+                                                   nsecs_t inConsumeTime) -> void {
+                finishedSeq = inSeq;
+                handled = inHandled;
+                consumeTime = inConsumeTime;
+            });
     ASSERT_EQ(OK, status) << "publisher receiveFinishedSignal should return OK";
     ASSERT_EQ(seq, finishedSeq)
             << "publisher receiveFinishedSignal should have returned the original sequence number";
     ASSERT_TRUE(handled)
             << "publisher receiveFinishedSignal should have set handled to consumer's reply";
+    ASSERT_GE(consumeTime, publishTime)
+            << "finished signal's consume time should be greater than publish time";
 }
 
 void InputPublisherAndConsumerTest::PublishAndConsumeCaptureEvent() {
@@ -316,6 +346,7 @@
     constexpr uint32_t seq = 42;
     int32_t eventId = InputEvent::nextId();
     constexpr bool captureEnabled = true;
+    const nsecs_t publishTime = systemTime(SYSTEM_TIME_MONOTONIC);
 
     status = mPublisher->publishCaptureEvent(seq, eventId, captureEnabled);
     ASSERT_EQ(OK, status) << "publisher publishKeyEvent should return OK";
@@ -339,12 +370,21 @@
 
     uint32_t finishedSeq = 0;
     bool handled = false;
-    status = mPublisher->receiveFinishedSignal(&finishedSeq, &handled);
+    nsecs_t consumeTime;
+    status = mPublisher->receiveFinishedSignal(
+            [&finishedSeq, &handled, &consumeTime](uint32_t inSeq, bool inHandled,
+                                                   nsecs_t inConsumeTime) -> void {
+                finishedSeq = inSeq;
+                handled = inHandled;
+                consumeTime = inConsumeTime;
+            });
     ASSERT_EQ(OK, status) << "publisher receiveFinishedSignal should return OK";
     ASSERT_EQ(seq, finishedSeq)
             << "publisher receiveFinishedSignal should have returned the original sequence number";
     ASSERT_TRUE(handled)
             << "publisher receiveFinishedSignal should have set handled to consumer's reply";
+    ASSERT_GE(consumeTime, publishTime)
+            << "finished signal's consume time should be greater than publish time";
 }
 
 TEST_F(InputPublisherAndConsumerTest, PublishKeyEvent_EndToEnd) {
diff --git a/libs/input/tests/StructLayout_test.cpp b/libs/input/tests/StructLayout_test.cpp
index 4107d61..a886585 100644
--- a/libs/input/tests/StructLayout_test.cpp
+++ b/libs/input/tests/StructLayout_test.cpp
@@ -83,6 +83,7 @@
   CHECK_OFFSET(InputMessage::Body::Capture, pointerCaptureEnabled, 4);
 
   CHECK_OFFSET(InputMessage::Body::Finished, handled, 4);
+  CHECK_OFFSET(InputMessage::Body::Finished, consumeTime, 8);
 }
 
 void TestHeaderSize() {
@@ -100,7 +101,7 @@
     static_assert(sizeof(InputMessage::Body::Motion) ==
                   offsetof(InputMessage::Body::Motion, pointers) +
                           sizeof(InputMessage::Body::Motion::Pointer) * MAX_POINTERS);
-    static_assert(sizeof(InputMessage::Body::Finished) == 8);
+    static_assert(sizeof(InputMessage::Body::Finished) == 16);
     static_assert(sizeof(InputMessage::Body::Focus) == 8);
     static_assert(sizeof(InputMessage::Body::Capture) == 8);
 }
diff --git a/services/inputflinger/dispatcher/Entry.h b/services/inputflinger/dispatcher/Entry.h
index 499f42e..45a007b 100644
--- a/services/inputflinger/dispatcher/Entry.h
+++ b/services/inputflinger/dispatcher/Entry.h
@@ -272,6 +272,7 @@
     std::string obscuringPackage;
     bool enabled;
     int32_t pid;
+    nsecs_t consumeTime; // time when the event was consumed by InputConsumer
 };
 
 } // namespace android::inputdispatcher
diff --git a/services/inputflinger/dispatcher/InputDispatcher.cpp b/services/inputflinger/dispatcher/InputDispatcher.cpp
index 2909939..e4237b5 100644
--- a/services/inputflinger/dispatcher/InputDispatcher.cpp
+++ b/services/inputflinger/dispatcher/InputDispatcher.cpp
@@ -3113,7 +3113,7 @@
 
 void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
                                                 const sp<Connection>& connection, uint32_t seq,
-                                                bool handled) {
+                                                bool handled, nsecs_t consumeTime) {
 #if DEBUG_DISPATCH_CYCLE
     ALOGD("channel '%s' ~ finishDispatchCycle - seq=%u, handled=%s",
           connection->getInputChannelName().c_str(), seq, toString(handled));
@@ -3125,7 +3125,7 @@
     }
 
     // Notify other system components and prepare to start the next dispatch cycle.
-    onDispatchCycleFinishedLocked(currentTime, connection, seq, handled);
+    onDispatchCycleFinishedLocked(currentTime, connection, seq, handled, consumeTime);
 }
 
 void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
@@ -3196,13 +3196,15 @@
             bool gotOne = false;
             status_t status;
             for (;;) {
-                uint32_t seq;
-                bool handled;
-                status = connection->inputPublisher.receiveFinishedSignal(&seq, &handled);
+                std::function<void(uint32_t seq, bool handled, nsecs_t consumeTime)> callback =
+                        std::bind(&InputDispatcher::finishDispatchCycleLocked, d, currentTime,
+                                  connection, std::placeholders::_1, std::placeholders::_2,
+                                  std::placeholders::_3);
+
+                status = connection->inputPublisher.receiveFinishedSignal(callback);
                 if (status) {
                     break;
                 }
-                d->finishDispatchCycleLocked(currentTime, connection, seq, handled);
                 gotOne = true;
             }
             if (gotOne) {
@@ -5154,13 +5156,14 @@
 
 void InputDispatcher::onDispatchCycleFinishedLocked(nsecs_t currentTime,
                                                     const sp<Connection>& connection, uint32_t seq,
-                                                    bool handled) {
+                                                    bool handled, nsecs_t consumeTime) {
     std::unique_ptr<CommandEntry> commandEntry = std::make_unique<CommandEntry>(
             &InputDispatcher::doDispatchCycleFinishedLockedInterruptible);
     commandEntry->connection = connection;
     commandEntry->eventTime = currentTime;
     commandEntry->seq = seq;
     commandEntry->handled = handled;
+    commandEntry->consumeTime = consumeTime;
     postCommandLocked(std::move(commandEntry));
 }
 
diff --git a/services/inputflinger/dispatcher/InputDispatcher.h b/services/inputflinger/dispatcher/InputDispatcher.h
index c93d74e..83094c2 100644
--- a/services/inputflinger/dispatcher/InputDispatcher.h
+++ b/services/inputflinger/dispatcher/InputDispatcher.h
@@ -528,7 +528,7 @@
     void startDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection)
             REQUIRES(mLock);
     void finishDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection,
-                                   uint32_t seq, bool handled) REQUIRES(mLock);
+                                   uint32_t seq, bool handled, nsecs_t consumeTime) REQUIRES(mLock);
     void abortBrokenDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection,
                                         bool notify) REQUIRES(mLock);
     void drainDispatchQueue(std::deque<DispatchEntry*>& queue);
@@ -578,7 +578,8 @@
 
     // Interesting events that we might like to log or tell the framework about.
     void onDispatchCycleFinishedLocked(nsecs_t currentTime, const sp<Connection>& connection,
-                                       uint32_t seq, bool handled) REQUIRES(mLock);
+                                       uint32_t seq, bool handled, nsecs_t consumeTime)
+            REQUIRES(mLock);
     void onDispatchCycleBrokenLocked(nsecs_t currentTime, const sp<Connection>& connection)
             REQUIRES(mLock);
     void onFocusChangedLocked(const FocusResolver::FocusChanges& changes) REQUIRES(mLock);