factor Engine out of ok core

This makes Engines (task execution strategies: serial, thread, fork)
pluggable just like most of the rest of ok.  It removes the thread and
process limits, as I find myself rarely caring about what they are
exactly.  Instead of limiting to num-cores, we just allow any number of
concurrent threads, and any number of concurrent child processes subject
to OS limitations.

Change-Id: Icef49d86818fe9a4b7380efb60e73e40bc2e6b73
Reviewed-on: https://skia-review.googlesource.com/27140
Reviewed-by: Mike Klein <mtklein@chromium.org>
Commit-Queue: Mike Klein <mtklein@chromium.org>
diff --git a/BUILD.gn b/BUILD.gn
index 6f39b5d..5b7b787 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -891,14 +891,14 @@
     if (defined(invoker.is_shared_library) && invoker.is_shared_library) {
       shared_library("lib" + target_name) {
         forward_variables_from(invoker, "*", [ "is_shared_library" ])
-        configs += [ ":skia_private", ]
+        configs += [ ":skia_private" ]
         testonly = true
       }
     } else {
       _executable = target_name
       executable(_executable) {
         forward_variables_from(invoker, "*", [ "is_shared_library" ])
-        configs += [ ":skia_private", ]
+        configs += [ ":skia_private" ]
         testonly = true
       }
     }
@@ -1268,6 +1268,7 @@
     sources = [
       "tools/ok.cpp",
       "tools/ok_dsts.cpp",
+      "tools/ok_engines.cpp",
       "tools/ok_srcs.cpp",
       "tools/ok_test.cpp",
       "tools/ok_vias.cpp",
diff --git a/tools/ok.cpp b/tools/ok.cpp
index edda9be..edb60f1 100644
--- a/tools/ok.cpp
+++ b/tools/ok.cpp
@@ -13,11 +13,9 @@
 #include "SkImage.h"
 #include "ok.h"
 #include <chrono>
-#include <future>
 #include <list>
 #include <stdio.h>
 #include <stdlib.h>
-#include <thread>
 #include <vector>
 
 #if !defined(__has_include)
@@ -97,80 +95,11 @@
     }
 #endif
 
-struct Engine {
-    virtual ~Engine() {}
-    virtual bool spawn(std::function<Status(void)>) = 0;
-    virtual Status wait_one() = 0;
+struct EngineType {
+    const char *name, *help;
+    std::unique_ptr<Engine> (*factory)(Options);
 };
-
-struct SerialEngine : Engine {
-    Status last = Status::None;
-
-    bool spawn(std::function<Status(void)> fn) override {
-        last = fn();
-        return true;
-    }
-
-    Status wait_one() override {
-        Status s = last;
-        last = Status::None;
-        return s;
-    }
-};
-
-struct ThreadEngine : Engine {
-    std::list<std::future<Status>> live;
-    const std::chrono::steady_clock::time_point the_past = std::chrono::steady_clock::now();
-
-    bool spawn(std::function<Status(void)> fn) override {
-        live.push_back(std::async(std::launch::async, fn));
-        return true;
-    }
-
-    Status wait_one() override {
-        if (live.empty()) {
-            return Status::None;
-        }
-
-        for (;;) {
-            for (auto it = live.begin(); it != live.end(); it++) {
-                if (it->wait_until(the_past) == std::future_status::ready) {
-                    Status s = it->get();
-                    live.erase(it);
-                    return s;
-                }
-            }
-        }
-    }
-};
-
-#if defined(_MSC_VER)
-    using ForkEngine = ThreadEngine;
-#else
-    #include <sys/wait.h>
-    #include <unistd.h>
-
-    struct ForkEngine : Engine {
-        bool spawn(std::function<Status(void)> fn) override {
-            switch (fork()) {
-                case  0: _exit((int)fn());
-                case -1: return false;
-                default: return true;
-            }
-        }
-
-        Status wait_one() override {
-            do {
-                int status;
-                if (wait(&status) > 0) {
-                    return WIFEXITED(status) ? (Status)WEXITSTATUS(status)
-                                             : Status::Crashed;
-                }
-            } while (errno == EINTR);
-            return Status::None;
-        }
-    };
-#endif
+static std::vector<EngineType> engine_types;
 
 struct StreamType {
     const char *name, *help;
@@ -206,7 +135,7 @@
     SkGraphics::Init();
     setup_crash_handler();
 
-    int                                       jobs{1};
+    std::unique_ptr<Engine>                   engine;
     std::unique_ptr<Stream>                   stream;
     std::function<std::unique_ptr<Dst>(void)> dst_factory = []{
         // A default Dst that's enough for unit tests and not much else.
@@ -218,28 +147,36 @@
     };
 
     auto help = [&] {
-        std::string stream_help = help_for(stream_types),
+        std::string engine_help = help_for(engine_types),
+                    stream_help = help_for(stream_types),
                        dst_help = help_for(   dst_types),
                        via_help = help_for(   via_types);
 
-        printf("%s [-j N] src[:k=v,...] dst[:k=v,...] [via[:k=v,...] ...]            \n"
-                "  -j: Run at most N processes at any time.                          \n"
-                "      If <0, use -N threads instead.                                \n"
-                "      If 0, use one thread in one process.                          \n"
-                "      If 1 (default) or -1, auto-detect N.                          \n"
+        printf("%s [engine] src[:k=v,...] dst[:k=v,...] [via[:k=v,...] ...]          \n"
+                " engine: how to execute tasks%s                                     \n"
                 " src: content to draw%s                                             \n"
                 " dst: how to draw that content%s                                    \n"
                 " via: wrappers around dst%s                                         \n"
                 " Most srcs, dsts and vias have options, e.g. skp:dir=skps sw:ct=565 \n",
-                argv[0], stream_help.c_str(), dst_help.c_str(), via_help.c_str());
+                argv[0],
+                engine_help.c_str(), stream_help.c_str(), dst_help.c_str(), via_help.c_str());
         return 1;
     };
 
     for (int i = 1; i < argc; i++) {
-        if (0 == strcmp("-j",     argv[i])) { jobs = atoi(argv[++i]); }
         if (0 == strcmp("-h",     argv[i])) { return help(); }
         if (0 == strcmp("--help", argv[i])) { return help(); }
 
+        for (auto e : engine_types) {
+            size_t len = strlen(e.name);
+            if (0 == strncmp(e.name, argv[i], len)) {
+                switch (argv[i][len]) {
+                    case  ':': len++;
+                    case '\0': engine = e.factory(Options{argv[i]+len});
+                }
+            }
+        }
+
         for (auto s : stream_types) {
             size_t len = strlen(s.name);
             if (0 == strncmp(s.name, argv[i], len)) {
@@ -275,12 +212,12 @@
     }
     if (!stream) { return help(); }
 
-    std::unique_ptr<Engine> engine;
-    if (jobs == 0) { engine.reset(new SerialEngine);                  }
-    if (jobs  > 0) { engine.reset(new   ForkEngine); defer_logging(); }
-    if (jobs  < 0) { engine.reset(new ThreadEngine); jobs = -jobs;    }
+    if (!engine) { engine = engine_types.back().factory(Options{}); }
 
-    if (jobs == 1) { jobs = std::thread::hardware_concurrency(); }
+    // If we know engine->spawn() will never crash, we can defer logging until we exit.
+    if (engine->crashproof()) {
+        defer_logging();
+    }
 
     int ok = 0, failed = 0, crashed = 0, skipped = 0;
 
@@ -306,13 +243,35 @@
         fflush(stdout);
     };
 
+    std::list<std::future<Status>> live;
+    const auto the_past = std::chrono::steady_clock::now();
+
+    auto wait_one = [&] {
+        if (live.empty()) {
+            return Status::None;
+        }
+
+        for (;;) {
+            for (auto it = live.begin(); it != live.end(); it++) {
+                if (it->wait_until(the_past) != std::future_status::timeout) {
+                    Status s = it->get();
+                    live.erase(it);
+                    return s;
+                }
+            }
+        }
+    };
+
     auto spawn = [&](std::function<Status(void)> fn) {
-        if (--jobs < 0) {
-            update_stats(engine->wait_one());
+        std::future<Status> status;
+        for (;;) {
+            status = engine->spawn(fn);
+            if (status.valid()) {
+                break;
+            }
+            update_stats(wait_one());
         }
-        while (!engine->spawn(fn)) {
-            update_stats(engine->wait_one());
-        }
+        live.push_back(std::move(status));
     };
 
     for (std::unique_ptr<Src> owned = stream->next(); owned; owned = stream->next()) {
@@ -328,7 +287,7 @@
     }
 
     for (Status s = Status::OK; s != Status::None; ) {
-        s = engine->wait_one();
+        s = wait_one();
         update_stats(s);
     }
     printf("\n");
@@ -337,6 +296,10 @@
 
 
 Register::Register(const char* name, const char* help,
+                   std::unique_ptr<Engine> (*factory)(Options)) {
+    engine_types.push_back(EngineType{name, help, factory});
+}
+Register::Register(const char* name, const char* help,
                    std::unique_ptr<Stream> (*factory)(Options)) {
     stream_types.push_back(StreamType{name, help, factory});
 }
diff --git a/tools/ok.h b/tools/ok.h
index f55842b..502df23 100644
--- a/tools/ok.h
+++ b/tools/ok.h
@@ -10,6 +10,7 @@
 
 #include "SkCanvas.h"
 #include <functional>
+#include <future>
 #include <map>
 #include <memory>
 #include <string>
@@ -24,6 +25,12 @@
 
 enum class Status { OK, Failed, Crashed, Skipped, None };
 
+struct Engine {
+    virtual ~Engine() {}
+    virtual bool                crashproof()                       = 0;
+    virtual std::future<Status> spawn(std::function<Status(void)>) = 0;
+};
+
 struct Src {
     virtual ~Src() {}
     virtual std::string name()     = 0;
@@ -52,6 +59,7 @@
 
 // Create globals to register your new type of Stream or Dst.
 struct Register {
+    Register(const char* name, const char* help, std::unique_ptr<Engine> (*factory)(Options));
     Register(const char* name, const char* help, std::unique_ptr<Stream> (*factory)(Options));
     Register(const char* name, const char* help, std::unique_ptr<Dst>    (*factory)(Options));
     Register(const char* name, const char* help,
diff --git a/tools/ok_engines.cpp b/tools/ok_engines.cpp
new file mode 100644
index 0000000..e2218bf
--- /dev/null
+++ b/tools/ok_engines.cpp
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
+#include "ok.h"
+
+struct SerialEngine : Engine {
+    static std::unique_ptr<Engine> Factory(Options) {
+        SerialEngine engine;
+        return move_unique(engine);
+    }
+
+    bool crashproof() override { return false; }
+
+    std::future<Status> spawn(std::function<Status(void)> fn) override {
+        return std::async(std::launch::deferred, fn);
+    }
+};
+static Register serial("serial",
+                       "Run tasks serially on the main thread of a single process.",
+                       SerialEngine::Factory);
+
+struct ThreadEngine : Engine {
+    static std::unique_ptr<Engine> Factory(Options) {
+        ThreadEngine engine;
+        return move_unique(engine);
+    }
+
+    bool crashproof() override { return false; }
+
+    std::future<Status> spawn(std::function<Status(void)> fn) override {
+        return std::async(std::launch::async, fn);
+    }
+};
+static Register thread("thread",
+                       "Run each task on its own thread of a single process.",
+                       ThreadEngine::Factory);
+
+#if !defined(_MSC_VER)
+    #include <sys/wait.h>
+    #include <unistd.h>
+
+    struct ForkEngine : Engine {
+        static std::unique_ptr<Engine> Factory(Options) {
+            ForkEngine engine;
+            return move_unique(engine);
+        }
+
+        bool crashproof() override { return true; }
+
+        std::future<Status> spawn(std::function<Status(void)> fn) override {
+            switch (fork()) {
+                case  0:
+                    // We are the spawned child process.
+                    // Run fn() and exit() with its Status as our return code.
+                    _exit((int)fn());
+
+                case -1:
+                    // The OS won't let us fork() another process right now.
+                    // We'll need to wait for at least one live task to finish and try again.
+                    return std::future<Status>();
+
+                default:
+                    // We succesfully spawned a child process!
+                    // This will wait for any spawned process to finish and return its Status.
+                    return std::async(std::launch::deferred, [] {
+                        do {
+                            int status;
+                            if (wait(&status) > 0) {
+                                return WIFEXITED(status) ? (Status)WEXITSTATUS(status)
+                                                         : Status::Crashed;
+                            }
+                        } while (errno == EINTR);
+                        return Status::None;
+                    });
+            }
+        }
+    };
+    static Register _fork("fork",
+                          "Run each task in an independent process with fork().",
+                          ForkEngine::Factory);
+#endif