Add KafkaReader for processing streaming data with Apache Kafka (#14098)
* Add KafkaReader for processing streaming data with Apache Kafka
Apache Kafka is a widely used distributed streaming platform in the
open source community. The goal of this fix is to create a contrib
Reader op (inheriting from ReaderBase, similar to
TextLineReader/TFRecordReader) so that it is possible to read
Kafka streaming data from TensorFlow in a similar fashion.
This fix uses librdkafka, a C/C++ Apache Kafka client library which
is released under the 2-clause BSD license and is widely used in
a number of Kafka bindings such as Go, Python, C#/.NET, etc.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add KafkaReader Python wrapper.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add BUILD file and op registration for KafkaReader.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add C++ Kernel for KafkaReader
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add librdkafka to third_party packages in Bazel
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add contrib/kafka to the contrib Bazel file.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Update workspace.bzl
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Comment out clean_deps of `tensorflow/core:framework` and `tensorflow/core:lib`
so that it is possible to build with ReaderBase.
See 1419 for details.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add group id flag.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Sync offset
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add test cases and a script to start and stop the Kafka server (with Docker)
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Convert from librdkafka's legacy Consumer to KafkaConsumer
so that the thread join does not hang.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Only output offset as the key.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add a timeout attr for the Kafka consumer to use when polling.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Build Kafka kernels by default to get around the linkage issue.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Convert KafkaReader to KafkaDataset.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
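With the op converted to a tf.data-style KafkaDataset, each subscription is described by a string that appears to encode the topic along with partition and offset bounds (the exact `topic:partition:start:stop` layout and its defaults are assumptions for illustration, not taken from this diff). A minimal standalone parser sketch of that assumed format:

```python
def parse_topic_spec(spec):
    """Split an assumed "topic:partition:start:stop" subscription string.

    Hypothetical defaults when fields are omitted: partition 0,
    start offset 0, and an open-ended stop expressed as -1.
    """
    parts = spec.split(":")
    topic = parts[0]
    partition = int(parts[1]) if len(parts) > 1 else 0
    start = int(parts[2]) if len(parts) > 2 else 0
    stop = int(parts[3]) if len(parts) > 3 else -1
    return topic, partition, start, stop
```

For example, `parse_topic_spec("test:0:0:4")` would describe reading messages 0 through 4 of partition 0 of topic `test`, while a bare `"test"` falls back to the assumed defaults.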
* Fix workspace.bzl for kafka with tf_http_archive
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add public visibility
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Address review feedbacks
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Optionally select Kafka support through ./configure
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
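The configure change below makes Kafka support opt-in via the usual `TF_NEED_*` environment-variable convention, with `kafka` registered as the Bazel config name. A sketch of how a build might select it (a config fragment; the exact prompts and targets may differ from your setup):

```shell
# Hypothetical non-interactive selection of Kafka support;
# TF_NEED_KAFKA=1 answers the new configure prompt, and
# --config=kafka picks up the with_kafka_support build setting.
TF_NEED_KAFKA=1 ./configure
bazel build --config=kafka //tensorflow/tools/pip_package:build_pip_package
```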
diff --git a/configure.py b/configure.py
index 083fed1..16763b8 100644
--- a/configure.py
+++ b/configure.py
@@ -1354,6 +1354,7 @@
environ_cp['TF_NEED_GCP'] = '0'
environ_cp['TF_NEED_HDFS'] = '0'
environ_cp['TF_NEED_JEMALLOC'] = '0'
+ environ_cp['TF_NEED_KAFKA'] = '0'
environ_cp['TF_NEED_OPENCL_SYCL'] = '0'
environ_cp['TF_NEED_COMPUTECPP'] = '0'
environ_cp['TF_NEED_OPENCL'] = '0'
@@ -1372,6 +1373,8 @@
'with_hdfs_support', True, 'hdfs')
set_build_var(environ_cp, 'TF_NEED_S3', 'Amazon S3 File System',
'with_s3_support', True, 's3')
+ set_build_var(environ_cp, 'TF_NEED_KAFKA', 'Apache Kafka Platform',
+ 'with_kafka_support', False, 'kafka')
set_build_var(environ_cp, 'TF_ENABLE_XLA', 'XLA JIT', 'with_xla_support',
False, 'xla')
set_build_var(environ_cp, 'TF_NEED_GDR', 'GDR', 'with_gdr_support',