Initialize the oneway queue for Bs* in constructor.

Also terminate the thread and delete it gracefully.

Bug: 31819198

Test: hidl_test
  * FooCallMeTest passed (repeated 1000 times) under passthrough mode.
  * ForReportResultsTest is not fixed yet and is still failing.

Change-Id: Idce0cff3d226b46de94e577c9e3cbbe00ecb332b
diff --git a/generateCpp.cpp b/generateCpp.cpp
index 8e63349..eb6adbe 100644
--- a/generateCpp.cpp
+++ b/generateCpp.cpp
@@ -1520,7 +1520,7 @@
     out << ifaceName << ".h>\n\n";
 
     if (supportOneway) {
-        out << "#include <hidl/SynchronizedQueue.h>\n";
+        out << "#include <hidl/TaskRunner.h>\n";
     }
 
     enterLeaveNamespace(out, true /* enter */);
@@ -1553,15 +1553,12 @@
     out << "const sp<" << ifaceName << "> mImpl;\n";
 
     if (supportOneway) {
-        out << "SynchronizedQueue<std::function<void(void)>> mOnewayQueue;\n";
-        out << "std::thread *mOnewayThread = nullptr;\n";
+        out << "::android::hardware::TaskRunner mOnewayQueue;\n";
 
         out << "\n";
 
         out << "::android::hardware::Return<void> addOnewayTask("
                "std::function<void(void)>);\n\n";
-
-        out << "static const int kOnewayQueueMaxSize = 3000;\n";
     }
 
     out.unindent();
@@ -1575,7 +1572,6 @@
     return OK;
 }
 
-
 status_t AST::generatePassthroughSource(Formatter &out) const {
     const Interface *iface = mRootScope->getInterface();
 
@@ -1587,24 +1583,21 @@
         << klassName
         << "(const sp<"
         << iface->fullName()
-        << "> impl) : mImpl(impl) {}\n\n";
+        << "> impl) : mImpl(impl) {";
+    if (iface->hasOnewayMethods()) {
+        out << "\n";
+        out.indentBlock([&] {
+            out << "mOnewayQueue.setLimit(3000 /* similar limit to binderized */);\n";
+        });
+    }
+    out << "}\n\n";
 
     if (iface->hasOnewayMethods()) {
         out << "::android::hardware::Return<void> "
             << klassName
             << "::addOnewayTask(std::function<void(void)> fun) {\n";
         out.indent();
-        out << "if (mOnewayThread == nullptr) {\n";
-        out.indent();
-        out << "mOnewayThread = new std::thread([this]() {\n";
-        out.indent();
-        out << "while(true) { (this->mOnewayQueue.wait_pop())(); }";
-        out.unindent();
-        out << "});\n";
-        out.unindent();
-        out << "}\n\n";
-
-        out << "if (mOnewayQueue.size() > kOnewayQueueMaxSize) {\n";
+        out << "if (!mOnewayQueue.push(fun)) {\n";
         out.indent();
         out << "return ::android::hardware::Status::fromExceptionCode(\n";
         out.indent();
@@ -1613,10 +1606,6 @@
         out.unindent();
         out.unindent();
         out.unindent();
-        out << "} else {\n";
-        out.indent();
-        out << "mOnewayQueue.push(fun);\n";
-        out.unindent();
         out << "}\n";
 
         out << "return ::android::hardware::Status();\n";