Make StatsLog drop less.

+ Create a thread-safe LogEventQueue to buffer log events.

+ The socket listener thread will read from the socket and write to the buffer as quickly as
  possible to minimize data loss in the socket.

+ All pushed data is fetched from the buffer and processed in a dedicated thread. After an
  event is fetched from the queue, we no longer block the socket listener thread.

+ Report event queue stats via statsdstats, including the min and max event history span in the
  queue (to understand how slow statsd can be and how fast the events can arrive)

Bug: 119031518
Test: unit tests added in statsd_test

Change-Id: I6b65ed9a678935b2e24302ba4b36e69c157adde4
diff --git a/cmds/statsd/Android.bp b/cmds/statsd/Android.bp
index 8cd409e..017cb6d 100644
--- a/cmds/statsd/Android.bp
+++ b/cmds/statsd/Android.bp
@@ -81,7 +81,7 @@
         "src/external/StatsPullerManager.cpp",
         "src/external/puller_util.cpp",
         "src/logd/LogEvent.cpp",
-        "src/logd/LogListener.cpp",
+        "src/logd/LogEventQueue.cpp",
         "src/matchers/CombinationLogMatchingTracker.cpp",
         "src/matchers/EventMatcherWizard.cpp",
         "src/matchers/matcher_util.cpp",
@@ -226,6 +226,7 @@
         "tests/indexed_priority_queue_test.cpp",
         "tests/LogEntryMatcher_test.cpp",
         "tests/LogEvent_test.cpp",
+        "tests/log_event/LogEventQueue_test.cpp",
         "tests/MetricsManager_test.cpp",
         "tests/StatsLogProcessor_test.cpp",
         "tests/StatsService_test.cpp",
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 52ecdc8..bfe2da9 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -132,34 +132,36 @@
     }                                                             \
 }
 
-StatsService::StatsService(const sp<Looper>& handlerLooper)
-    : mAnomalyAlarmMonitor(new AlarmMonitor(MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
-       [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
-           if (sc != nullptr) {
-               sc->setAnomalyAlarm(timeMillis);
-               StatsdStats::getInstance().noteRegisteredAnomalyAlarmChanged();
-           }
-       },
-       [](const sp<IStatsCompanionService>& sc) {
-           if (sc != nullptr) {
-               sc->cancelAnomalyAlarm();
-               StatsdStats::getInstance().noteRegisteredAnomalyAlarmChanged();
-           }
-       })),
-   mPeriodicAlarmMonitor(new AlarmMonitor(MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
-      [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
-           if (sc != nullptr) {
-               sc->setAlarmForSubscriberTriggering(timeMillis);
-               StatsdStats::getInstance().noteRegisteredPeriodicAlarmChanged();
-           }
-      },
-      [](const sp<IStatsCompanionService>& sc) {
-           if (sc != nullptr) {
-               sc->cancelAlarmForSubscriberTriggering();
-               StatsdStats::getInstance().noteRegisteredPeriodicAlarmChanged();
-           }
-
-      }))  {
+StatsService::StatsService(const sp<Looper>& handlerLooper, shared_ptr<LogEventQueue> queue)
+    : mAnomalyAlarmMonitor(new AlarmMonitor(
+              MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
+              [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
+                  if (sc != nullptr) {
+                      sc->setAnomalyAlarm(timeMillis);
+                      StatsdStats::getInstance().noteRegisteredAnomalyAlarmChanged();
+                  }
+              },
+              [](const sp<IStatsCompanionService>& sc) {
+                  if (sc != nullptr) {
+                      sc->cancelAnomalyAlarm();
+                      StatsdStats::getInstance().noteRegisteredAnomalyAlarmChanged();
+                  }
+              })),
+      mPeriodicAlarmMonitor(new AlarmMonitor(
+              MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
+              [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
+                  if (sc != nullptr) {
+                      sc->setAlarmForSubscriberTriggering(timeMillis);
+                      StatsdStats::getInstance().noteRegisteredPeriodicAlarmChanged();
+                  }
+              },
+              [](const sp<IStatsCompanionService>& sc) {
+                  if (sc != nullptr) {
+                      sc->cancelAlarmForSubscriberTriggering();
+                      StatsdStats::getInstance().noteRegisteredPeriodicAlarmChanged();
+                  }
+              })),
+      mEventQueue(queue) {
     mUidMap = UidMap::getInstance();
     mPullerManager = new StatsPullerManager();
     StatsPuller::SetUidMap(mUidMap);
@@ -201,11 +203,33 @@
     mConfigManager->AddListener(mProcessor);
 
     init_system_properties();
+
+    if (mEventQueue != nullptr) {
+        std::thread pushedEventThread([this] { readLogs(); });
+        pushedEventThread.detach();
+    }
 }
 
 StatsService::~StatsService() {
 }
 
+/* Runs on a dedicated thread to process pushed events; the loop below never exits. */
+void StatsService::readLogs() {
+    // Read forever..... long live statsd
+    while (1) {
+        // Block until an event is available; this thread then owns the popped event.
+        auto event = mEventQueue->waitPop();
+        // Pass it to the log processor (mProcessor), which handles it for all configs/metrics.
+        // At this point, the LogEventQueue is not blocked, so the socket listener
+        // can keep reading from the socket and writing to the buffer to avoid data drop.
+        mProcessor->OnLogEvent(event.get());
+        // The ShellSubscriber is only used by shell for local debugging.
+        if (mShellSubscriber != nullptr) {
+            mShellSubscriber->onLogEvent(*event);
+        }
+    }  // `event` goes out of scope here, so the LogEvent is freed before the next pop.
+}
+
 void StatsService::init_system_properties() {
     mEngBuild = false;
     const prop_info* buildType = __system_property_find("ro.build.type");
@@ -1009,6 +1033,7 @@
     }
 }
 
+// Test only interface!!!
 void StatsService::OnLogEvent(LogEvent* event) {
     mProcessor->OnLogEvent(event);
     if (mShellSubscriber != nullptr) {
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index d24565a..cbe5b48 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -22,7 +22,7 @@
 #include "anomaly/AlarmMonitor.h"
 #include "config/ConfigManager.h"
 #include "external/StatsPullerManager.h"
-#include "logd/LogListener.h"
+#include "logd/LogEventQueue.h"
 #include "packages/UidMap.h"
 #include "shell/ShellSubscriber.h"
 #include "statscompanion_util.h"
@@ -52,11 +52,10 @@
 using android::hardware::Return;
 
 class StatsService : public BnStatsManager,
-                     public LogListener,
                      public IStats,
                      public IBinder::DeathRecipient {
 public:
-    StatsService(const sp<Looper>& handlerLooper);
+    StatsService(const sp<Looper>& handlerLooper, std::shared_ptr<LogEventQueue> queue);
     virtual ~StatsService();
 
     /** The anomaly alarm registered with AlarmManager won't be updated by less than this. */
@@ -92,7 +91,7 @@
     void Terminate();
 
     /**
-     * Called by LogReader when there's a log event to process.
+     * Test ONLY interface. In real world, StatsService reads from LogEventQueue.
      */
     virtual void OnLogEvent(LogEvent* event);
 
@@ -278,6 +277,9 @@
      */
     void print_cmd_help(int out);
 
+    /* Runs on its dedicated thread to process pushed stats event from socket. */
+    void readLogs();
+
     /**
      * Trigger a broadcast.
      */
@@ -410,6 +412,8 @@
 
     sp<ShellSubscriber> mShellSubscriber;
 
+    std::shared_ptr<LogEventQueue> mEventQueue;
+
     FRIEND_TEST(StatsServiceTest, TestAddConfig_simple);
     FRIEND_TEST(StatsServiceTest, TestAddConfig_empty);
     FRIEND_TEST(StatsServiceTest, TestAddConfig_invalid);
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index 29100aa..74a4c87 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -50,6 +50,7 @@
 const int FIELD_ID_PERIODIC_ALARM_STATS = 12;
 const int FIELD_ID_SYSTEM_SERVER_RESTART = 15;
 const int FIELD_ID_LOGGER_ERROR_STATS = 16;
+const int FIELD_ID_OVERFLOW = 18;
 
 const int FIELD_ID_ATOM_STATS_TAG = 1;
 const int FIELD_ID_ATOM_STATS_COUNT = 2;
@@ -64,6 +65,10 @@
 const int FIELD_ID_LOG_LOSS_STATS_UID = 5;
 const int FIELD_ID_LOG_LOSS_STATS_PID = 6;
 
+const int FIELD_ID_OVERFLOW_COUNT = 1;
+const int FIELD_ID_OVERFLOW_MAX_HISTORY = 2;
+const int FIELD_ID_OVERFLOW_MIN_HISTORY = 3;
+
 const int FIELD_ID_CONFIG_STATS_UID = 1;
 const int FIELD_ID_CONFIG_STATS_ID = 2;
 const int FIELD_ID_CONFIG_STATS_CREATION = 3;
@@ -235,6 +240,22 @@
     noteDataDropped(key, totalBytes, getWallClockSec());
 }
 
+void StatsdStats::noteEventQueueOverflow(int64_t oldestEventTimestampNs) {
+    lock_guard<std::mutex> lock(mLock);
+    // Each call accounts for one event dropped because the queue was full.
+    mOverflowCount++;
+    // History span of the queue at overflow time: now minus the oldest queued event timestamp.
+    int64_t history = getElapsedRealtimeNs() - oldestEventTimestampNs;
+    // Keep the largest span seen so far.
+    if (history > mMaxQueueHistoryNs) {
+        mMaxQueueHistoryNs = history;
+    }
+    // Keep the smallest span seen so far.
+    if (history < mMinQueueHistoryNs) {
+        mMinQueueHistoryNs = history;
+    }
+}
+
 void StatsdStats::noteDataDropped(const ConfigKey& key, const size_t totalBytes, int32_t timeSec) {
     lock_guard<std::mutex> lock(mLock);
     auto it = mConfigStats.find(key);
@@ -534,6 +555,9 @@
     mPeriodicAlarmRegisteredStats = 0;
     mSystemServerRestartSec.clear();
     mLogLossStats.clear();
+    mOverflowCount = 0;
+    mMinQueueHistoryNs = kInt64Max;
+    mMaxQueueHistoryNs = 0;
     for (auto& config : mConfigStats) {
         config.second->broadcast_sent_time_sec.clear();
         config.second->activation_time_sec.clear();
@@ -726,6 +750,9 @@
                 (long long)loss.mWallClockSec, loss.mCount, loss.mLastError, loss.mLastTag,
                 loss.mUid, loss.mPid);
     }
+
+    dprintf(out, "Event queue overflow: %d; MaxHistoryNs: %lld; MinHistoryNs: %lld\n",
+            mOverflowCount, (long long)mMaxQueueHistoryNs, (long long)mMinQueueHistoryNs);
 }
 
 void addConfigStatsToProto(const ConfigStats& configStats, ProtoOutputStream* proto) {
@@ -904,6 +931,16 @@
         proto.end(token);
     }
 
+    if (mOverflowCount > 0) {
+        uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_OVERFLOW);
+        proto.write(FIELD_TYPE_INT32 | FIELD_ID_OVERFLOW_COUNT, (int32_t)mOverflowCount);
+        proto.write(FIELD_TYPE_INT64 | FIELD_ID_OVERFLOW_MAX_HISTORY,
+                    (long long)mMaxQueueHistoryNs);
+        proto.write(FIELD_TYPE_INT64 | FIELD_ID_OVERFLOW_MIN_HISTORY,
+                    (long long)mMinQueueHistoryNs);
+        proto.end(token);
+    }
+
     for (const auto& restart : mSystemServerRestartSec) {
         proto.write(FIELD_TYPE_INT32 | FIELD_ID_SYSTEM_SERVER_RESTART | FIELD_COUNT_REPEATED,
                     restart);
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index 434920e..88ecccc 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -160,6 +160,8 @@
     // Max platform atom tag number.
     static const int32_t kMaxPlatformAtomTag = 100000;
 
+    static const int64_t kInt64Max = 0x7fffffffffffffffLL;
+
     /**
      * Report a new config has been received and report the static stats about the config.
      *
@@ -419,6 +421,10 @@
      */
     void noteBucketUnknownCondition(int64_t metricId);
 
+    /* Reports one event has been dropped due to queue overflow, and the oldest event timestamp in
+     * the queue */
+    void noteEventQueueOverflow(int64_t oldestEventTimestampNs);
+
     /**
      * Reset the historical stats. Including all stats in icebox, and the tracked stats about
      * metrics, matchers, and atoms. The active configs will be kept and StatsdStats will continue
@@ -522,6 +528,17 @@
         int32_t mPid;
     };
 
+    // Max of {(now - oldestEventTimestamp) when overflow happens}.
+    // This number is helpful to understand how SLOW statsd can be.
+    int64_t mMaxQueueHistoryNs = 0;
+
+    // Min of {(now - oldestEventTimestamp) when overflow happens}.
+    // This number is helpful to understand how FAST the events flood into statsd.
+    int64_t mMinQueueHistoryNs = kInt64Max;
+
+    // Total number of events that are lost due to queue overflow.
+    int32_t mOverflowCount = 0;
+
     // Timestamps when we detect log loss, and the number of logs lost.
     std::list<LogLossStats> mLogLossStats;
 
diff --git a/cmds/statsd/src/logd/LogEvent.cpp b/cmds/statsd/src/logd/LogEvent.cpp
index d9f5415..d5e66fd 100644
--- a/cmds/statsd/src/logd/LogEvent.cpp
+++ b/cmds/statsd/src/logd/LogEvent.cpp
@@ -118,6 +118,7 @@
 
 LogEvent::LogEvent(int32_t tagId, int64_t wallClockTimestampNs, int64_t elapsedTimestampNs) {
     mLogdTimestampNs = wallClockTimestampNs;
+    mElapsedTimestampNs = elapsedTimestampNs;
     mTagId = tagId;
     mLogUid = 0;
     mContext = create_android_logger(1937006964); // the event tag shared by all stats logs
@@ -246,7 +247,8 @@
     mValues.push_back(FieldValue(Field(mTagId, getSimpleField(4)), Value(trainInfo.status)));
 }
 
-LogEvent::LogEvent(int32_t tagId, int64_t timestampNs) : LogEvent(tagId, timestampNs, 0) {}
+LogEvent::LogEvent(int32_t tagId, int64_t timestampNs) : LogEvent(tagId, timestampNs, timestampNs) {
+}
 
 LogEvent::LogEvent(int32_t tagId, int64_t timestampNs, int32_t uid) {
     mLogdTimestampNs = timestampNs;
diff --git a/cmds/statsd/src/logd/LogEventQueue.cpp b/cmds/statsd/src/logd/LogEventQueue.cpp
new file mode 100644
index 0000000..146464b
--- /dev/null
+++ b/cmds/statsd/src/logd/LogEventQueue.cpp
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define DEBUG false  // STOPSHIP if true
+#include "Log.h"
+
+#include "LogEventQueue.h"
+
+namespace android {
+namespace os {
+namespace statsd {
+
+using std::unique_lock;
+using std::unique_ptr;
+
+unique_ptr<LogEvent> LogEventQueue::waitPop() {
+    std::unique_lock<std::mutex> lock(mMutex);
+    // Block until the queue is non-empty; wait() re-checks the predicate after each wakeup.
+    if (mQueue.empty()) {
+        mCondition.wait(lock, [this] { return !this->mQueue.empty(); });
+    }
+    // mMutex is held here: take ownership of the oldest event, then remove its empty slot.
+    unique_ptr<LogEvent> item = std::move(mQueue.front());
+    mQueue.pop();
+    // Ownership of the event passes to the caller.
+    return item;
+}
+
+bool LogEventQueue::push(unique_ptr<LogEvent> item, int64_t* oldestTimestampNs) {
+    bool success;  // *oldestTimestampNs is written only when this ends up false.
+    {
+        std::unique_lock<std::mutex> lock(mMutex);
+        if (mQueue.size() < mQueueLimit) {
+            mQueue.push(std::move(item));
+            success = true;
+        } else {
+            // Full queue: front() is safe since it is non-empty (assumes mQueueLimit > 0).
+            *oldestTimestampNs = mQueue.front()->GetElapsedTimestampNs();
+            success = false;  // item was not moved, so the event is freed with it.
+        }
+    }
+    // Notify after releasing mMutex; note this also runs when the push was rejected.
+    mCondition.notify_one();
+    return success;
+}
+
+}  // namespace statsd
+}  // namespace os
+}  // namespace android
diff --git a/cmds/statsd/src/logd/LogEventQueue.h b/cmds/statsd/src/logd/LogEventQueue.h
new file mode 100644
index 0000000..b4fd63f
--- /dev/null
+++ b/cmds/statsd/src/logd/LogEventQueue.h
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "LogEvent.h"
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+namespace android {
+namespace os {
+namespace statsd {
+
+/**
+ * A bounded, thread safe FIFO buffer that passes LogEvents by unique_ptr (no event copies).
+ */
+class LogEventQueue {
+public:
+    explicit LogEventQueue(size_t maxSize) : mQueueLimit(maxSize){};
+
+    /**
+     * Blocks until an event is available, then removes and returns the oldest one.
+     */
+    std::unique_ptr<LogEvent> waitPop();
+
+    /**
+     * Puts a LogEvent ptr to the end of the queue and returns true.
+     * Returns false when the queue is full (the event is not queued) and writes the oldest
+     * queued event's timestamp to oldestTimestampNs, which is left untouched on success.
+     */
+    bool push(std::unique_ptr<LogEvent> event, int64_t* oldestTimestampNs);
+
+private:
+    const size_t mQueueLimit;  // Max number of queued events; push() fails at this size.
+    std::condition_variable mCondition;  // Notified by push(), waited on by waitPop().
+    std::mutex mMutex;  // Held by push() and waitPop() while they touch mQueue.
+    std::queue<std::unique_ptr<LogEvent>> mQueue;
+};
+
+}  // namespace statsd
+}  // namespace os
+}  // namespace android
diff --git a/cmds/statsd/src/logd/LogListener.cpp b/cmds/statsd/src/logd/LogListener.cpp
deleted file mode 100644
index ddb26f9..0000000
--- a/cmds/statsd/src/logd/LogListener.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2017 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "logd/LogListener.h"
-
-namespace android {
-namespace os {
-namespace statsd {
-
-LogListener::LogListener() {
-}
-
-LogListener::~LogListener() {
-}
-
-}  // namespace statsd
-}  // namespace os
-}  // namespace android
diff --git a/cmds/statsd/src/logd/LogListener.h b/cmds/statsd/src/logd/LogListener.h
deleted file mode 100644
index d8b06e9..0000000
--- a/cmds/statsd/src/logd/LogListener.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (C) 2017 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include "logd/LogEvent.h"
-
-#include <utils/RefBase.h>
-
-namespace android {
-namespace os {
-namespace statsd {
-
-/**
- * Callback for LogReader
- */
-class LogListener : public virtual android::RefBase {
-public:
-    LogListener();
-    virtual ~LogListener();
-
-    virtual void OnLogEvent(LogEvent* msg) = 0;
-};
-
-}  // namespace statsd
-}  // namespace os
-}  // namespace android
diff --git a/cmds/statsd/src/main.cpp b/cmds/statsd/src/main.cpp
index eddc86e..68082c2 100644
--- a/cmds/statsd/src/main.cpp
+++ b/cmds/statsd/src/main.cpp
@@ -80,8 +80,11 @@
 
     ::android::hardware::configureRpcThreadpool(1 /*threads*/, false /*willJoin*/);
 
+    std::shared_ptr<LogEventQueue> eventQueue =
+            std::make_shared<LogEventQueue>(2000 /*buffer limit. Buffer is NOT pre-allocated*/);
+
     // Create the service
-    gStatsService = new StatsService(looper);
+    gStatsService = new StatsService(looper, eventQueue);
     if (defaultServiceManager()->addService(String16("stats"), gStatsService, false,
                 IServiceManager::DUMP_FLAG_PRIORITY_NORMAL | IServiceManager::DUMP_FLAG_PROTO)
             != 0) {
@@ -101,13 +104,13 @@
 
     gStatsService->Startup();
 
-    sp<StatsSocketListener> socketListener = new StatsSocketListener(gStatsService);
+    sp<StatsSocketListener> socketListener = new StatsSocketListener(eventQueue);
 
-        ALOGI("using statsd socket");
-        // Backlog and /proc/sys/net/unix/max_dgram_qlen set to large value
-        if (socketListener->startListener(600)) {
-            exit(1);
-        }
+    ALOGI("Statsd starts to listen to socket.");
+    // Backlog and /proc/sys/net/unix/max_dgram_qlen set to large value
+    if (socketListener->startListener(600)) {
+        exit(1);
+    }
 
     // Loop forever -- the reports run on this thread in a handler, and the
     // binder calls remain responsive in their pool of one thread.
diff --git a/cmds/statsd/src/socket/StatsSocketListener.cpp b/cmds/statsd/src/socket/StatsSocketListener.cpp
index aed926d..92200f9 100755
--- a/cmds/statsd/src/socket/StatsSocketListener.cpp
+++ b/cmds/statsd/src/socket/StatsSocketListener.cpp
@@ -41,8 +41,8 @@
 
 static const int kLogMsgHeaderSize = 28;
 
-StatsSocketListener::StatsSocketListener(const sp<LogListener>& listener)
-    : SocketListener(getLogSocket(), false /*start listen*/), mListener(listener) {
+StatsSocketListener::StatsSocketListener(std::shared_ptr<LogEventQueue> queue)
+    : SocketListener(getLogSocket(), false /*start listen*/), mQueue(queue) {
 }
 
 StatsSocketListener::~StatsSocketListener() {
@@ -134,10 +134,11 @@
     msg.entry.uid = cred->uid;
 
     memcpy(msg.buf + kLogMsgHeaderSize, ptr, n + 1);
-    LogEvent event(msg);
 
-    // Call the listener
-    mListener->OnLogEvent(&event);
+    int64_t oldestTimestamp;
+    if (!mQueue->push(std::make_unique<LogEvent>(msg), &oldestTimestamp)) {
+        StatsdStats::getInstance().noteEventQueueOverflow(oldestTimestamp);
+    }
 
     return true;
 }
diff --git a/cmds/statsd/src/socket/StatsSocketListener.h b/cmds/statsd/src/socket/StatsSocketListener.h
index b8185d2..2167a56 100644
--- a/cmds/statsd/src/socket/StatsSocketListener.h
+++ b/cmds/statsd/src/socket/StatsSocketListener.h
@@ -17,7 +17,7 @@
 
 #include <sysutils/SocketListener.h>
 #include <utils/RefBase.h>
-#include "logd/LogListener.h"
+#include "logd/LogEventQueue.h"
 
 // DEFAULT_OVERFLOWUID is defined in linux/highuid.h, which is not part of
 // the uapi headers for userspace to use.  This value is filled in on the
@@ -35,7 +35,7 @@
 
 class StatsSocketListener : public SocketListener, public virtual android::RefBase {
 public:
-    explicit StatsSocketListener(const sp<LogListener>& listener);
+    explicit StatsSocketListener(std::shared_ptr<LogEventQueue> queue);
 
     virtual ~StatsSocketListener();
 
@@ -47,7 +47,7 @@
     /**
      * Who is going to get the events when they're read.
      */
-    sp<LogListener> mListener;
+    std::shared_ptr<LogEventQueue> mQueue;
 };
 }  // namespace statsd
 }  // namespace os
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 967c3525..1dfc433 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -460,6 +460,14 @@
         optional int32 pid = 6;
     }
     repeated LogLossStats detected_log_loss = 16;
+
+    message EventQueueOverflow {
+        optional int32 count = 1;  // Events lost due to queue overflow.
+        optional int64 max_queue_history_ns = 2;  // Max of (now - oldest event) at overflow.
+        optional int64 min_queue_history_ns = 3;  // Min of (now - oldest event) at overflow.
+    }
+
+    optional EventQueueOverflow queue_overflow = 18;
 }
 
 message AlertTriggerDetails {
diff --git a/cmds/statsd/tests/StatsService_test.cpp b/cmds/statsd/tests/StatsService_test.cpp
index 560fb9f..7c00531 100644
--- a/cmds/statsd/tests/StatsService_test.cpp
+++ b/cmds/statsd/tests/StatsService_test.cpp
@@ -33,7 +33,7 @@
 #ifdef __ANDROID__
 
 TEST(StatsServiceTest, TestAddConfig_simple) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     StatsdConfig config;
     config.set_id(12345);
     string serialized = config.SerializeAsString();
@@ -43,7 +43,7 @@
 }
 
 TEST(StatsServiceTest, TestAddConfig_empty) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     string serialized = "";
 
     EXPECT_TRUE(
@@ -51,7 +51,7 @@
 }
 
 TEST(StatsServiceTest, TestAddConfig_invalid) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     string serialized = "Invalid config!";
 
     EXPECT_FALSE(
@@ -69,7 +69,7 @@
 
     int32_t uid;
 
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     service.mEngBuild = true;
 
     // "-1"
diff --git a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp
index 3dff7f5..309d251 100644
--- a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp
+++ b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp
@@ -111,7 +111,7 @@
 }
 
 TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     SendConfig(service, MakeConfig());
     int64_t start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
                                              // initialized with.
@@ -126,7 +126,7 @@
 }
 
 TEST(PartialBucketE2eTest, TestCountMetricNoSplitOnNewApp) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     SendConfig(service, MakeConfig());
     int64_t start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
                                              // initialized with.
@@ -146,7 +146,7 @@
 }
 
 TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     SendConfig(service, MakeConfig());
     int64_t start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
                                              // initialized with.
@@ -171,7 +171,7 @@
 }
 
 TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     SendConfig(service, MakeConfig());
     int64_t start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
                                              // initialized with.
@@ -195,7 +195,7 @@
 }
 
 TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     // Partial buckets don't occur when app is first installed.
     service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1, String16("v1"), String16(""));
     SendConfig(service, MakeValueMetricConfig(0));
@@ -213,7 +213,7 @@
 }
 
 TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     // Partial buckets don't occur when app is first installed.
     service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1, String16("v1"), String16(""));
     SendConfig(service, MakeValueMetricConfig(60 * NS_PER_SEC /* One minute */));
@@ -237,7 +237,7 @@
 }
 
 TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     // Partial buckets don't occur when app is first installed.
     service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1, String16("v1"), String16(""));
     SendConfig(service, MakeGaugeMetricConfig(0));
@@ -255,7 +255,7 @@
 }
 
 TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket) {
-    StatsService service(nullptr);
+    StatsService service(nullptr, nullptr);
     // Partial buckets don't occur when app is first installed.
     service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1, String16("v1"), String16(""));
     SendConfig(service, MakeGaugeMetricConfig(60 * NS_PER_SEC /* One minute */));
diff --git a/cmds/statsd/tests/log_event/LogEventQueue_test.cpp b/cmds/statsd/tests/log_event/LogEventQueue_test.cpp
new file mode 100644
index 0000000..f27d129
--- /dev/null
+++ b/cmds/statsd/tests/log_event/LogEventQueue_test.cpp
@@ -0,0 +1,100 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "logd/LogEventQueue.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <thread>
+
+#include <stdio.h>
+
+namespace android {
+namespace os {
+namespace statsd {
+
+using namespace android;
+using namespace testing;
+
+using std::unique_ptr;
+
+#ifdef __ANDROID__
+TEST(LogEventQueue_test, TestGoodConsumer) {
+    LogEventQueue queue(50);
+    int64_t timeBaseNs = 100;
+    std::thread writer([&queue, timeBaseNs] {
+        for (int i = 0; i < 100; i++) {
+            int64_t oldestEventNs;
+            bool success = queue.push(std::make_unique<LogEvent>(10, timeBaseNs + i * 1000),
+                                      &oldestEventNs);
+            EXPECT_TRUE(success);  // The test expects no push to be rejected.
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+    });
+    // The reader pops all 100 events, blocking in waitPop() whenever the queue is empty.
+    std::thread reader([&queue, timeBaseNs] {
+        for (int i = 0; i < 100; i++) {
+            auto event = queue.waitPop();
+            EXPECT_TRUE(event != nullptr);
+            // Events must come out in push order, with the timestamps the writer assigned.
+            EXPECT_EQ(timeBaseNs + i * 1000, event->GetElapsedTimestampNs());
+        }
+    });
+
+    reader.join();
+    writer.join();
+}
+
+TEST(LogEventQueue_test, TestSlowConsumer) {
+    LogEventQueue queue(50);
+    int64_t timeBaseNs = 100;
+    std::thread writer([&queue, timeBaseNs] {
+        int failure_count = 0;
+        int64_t oldestEventNs;
+        for (int i = 0; i < 100; i++) {
+            bool success = queue.push(std::make_unique<LogEvent>(10, timeBaseNs + i * 1000),
+                                      &oldestEventNs);
+            if (!success) failure_count++;
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+
+        // There is a remote chance that the reader thread does not get to run before the writer
+        // thread ends. That's why the following comparison is not "==".
+        // There will be at least 45 events lost due to overflow.
+        EXPECT_TRUE(failure_count >= 45);
+        // The reader pops at most 5 events, so the oldest queued event is no later than the 6th.
+        EXPECT_TRUE(oldestEventNs <= (100 + 5 * 1000));
+    });
+
+    std::thread reader([&queue, timeBaseNs] {
+        // The consumer quickly processed 5 events, then it got stuck (not reading anymore).
+        for (int i = 0; i < 5; i++) {
+            auto event = queue.waitPop();
+            EXPECT_TRUE(event != nullptr);
+            // The 5 popped events must come out in push order.
+            EXPECT_EQ(timeBaseNs + i * 1000, event->GetElapsedTimestampNs());
+        }
+    });
+
+    reader.join();
+    writer.join();
+}
+
+#else
+GTEST_LOG_(INFO) << "This test does nothing.\n";
+#endif
+
+}  // namespace statsd
+}  // namespace os
+}  // namespace android