Added the skeleton for the flagging framework.
BUG=None
TEST=None
Change-Id: I72c37ac70ed2adca588ad9866a6bcc26775aed8b
Reviewed-on: https://gerrit-int.chromium.org/39096
Reviewed-by: Luis Lozano <llozano@chromium.org>
Tested-by: Yuheng Long <yuhenglong@google.com>
Commit-Queue: Yuheng Long <yuhenglong@google.com>
diff --git a/bestflags/pipeline_process.py b/bestflags/pipeline_process.py
new file mode 100644
index 0000000..6b878b3
--- /dev/null
+++ b/bestflags/pipeline_process.py
@@ -0,0 +1,51 @@
+"""Pipeline process that encapsulates the actual content.
+
+The actual stages include the Steering algorithm, the builder and the executor.
+"""
+
+__author__ = 'yuhenglong@google.com (Yuheng Long)'
+
+import multiprocessing
+
+
+class PipelineProcess(multiprocessing.Process):
+ """A process that encapsulates the actual content.
+
+ It continuously pull tasks from the queue until a poison pill is received.
+ Once a job is received, it will hand it to the actual stage for processing.
+ """
+
+ # Poison pill means shutdown
+ POISON_PILL = None
+
+ def __init__(self, method, task_queue, result_queue):
+ """Set up input/output queue and the actual method to be called.
+
+ Args:
+ method: The actual pipeline stage to be invoked.
+ task_queue: The input task queue for this pipeline stage.
+ result_queue: The output task queue for this pipeline stage.
+ """
+
+ multiprocessing.Process.__init__(self)
+ self._method = method
+ self._task_queue = task_queue
+ self._result_queue = result_queue
+
+ def run(self):
+ """Busy pulling the next task from the queue for execution.
+
+ Once a job is pulled, this stage invokes the actual stage method and submits
+ the result to the next pipeline stage.
+
+ The process will terminate on receiving the poison pill from previous stage.
+ """
+
+ while True:
+ next_task = self.task_queue.get()
+ if next_task is None:
+ # Poison pill means shutdown
+ self.result_queue.put(None)
+ break
+ self._method(next_task)
+ self.result_queue.put(next_task)