Add support for HDFS IO engine
Signed-off-by: Jens Axboe <axboe@fb.com>
diff --git a/HOWTO b/HOWTO
index 87346ae..d728353 100644
--- a/HOWTO
+++ b/HOWTO
@@ -694,6 +694,8 @@
having to go through FUSE. This ioengine
defines engine specific options.
+ hdfs Read and write through Hadoop (HDFS).
+
external Prefix to specify loading an external
IO engine object file. Append the engine
filename, eg ioengine=external:/tmp/foo.o
diff --git a/Makefile b/Makefile
index 65e95be..8d86269 100644
--- a/Makefile
+++ b/Makefile
@@ -38,6 +38,13 @@
profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \
lib/tp.c
+ifdef CONFIG_LIBHDFS
+ HDFSFLAGS= -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/linux -I $(FIO_LIBHDFS_INCLUDE)
+ HDFSLIB= $(JAVA_HOME)/jre/lib/amd64/server/libjvm.so $(FIO_LIBHDFS_LIB)/liblibhdfs.a
+ CFLAGS += $(HDFSFLAGS)
+ SOURCE += engines/libhdfs.c
+endif
+
ifdef CONFIG_64BIT_LLP64
CFLAGS += -DBITS_PER_LONG=32
endif
@@ -268,7 +275,7 @@
$(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(T_IEEE_OBJS) $(LIBS)
fio: $(FIO_OBJS)
- $(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(FIO_OBJS) $(LIBS)
+ $(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(FIO_OBJS) $(LIBS) $(HDFSLIB)
gfio: $(GFIO_OBJS)
$(QUIET_LINK)$(CC) $(LDFLAGS) -o gfio $(GFIO_OBJS) $(LIBS) $(GTK_LDFLAGS)
diff --git a/configure b/configure
index 9ac1845..d8ab371 100755
--- a/configure
+++ b/configure
@@ -134,6 +134,7 @@
show_help="no"
exit_val=0
gfio="no"
+libhdfs="no"
# parse options
for opt do
@@ -160,6 +161,8 @@
;;
--disable-gfapi) disable_gfapi="yes"
;;
+ --enable-libhdfs) libhdfs="yes"
+ ;;
--help)
show_help="yes"
;;
@@ -178,6 +181,7 @@
echo "--esx Configure build options for esx"
echo "--enable-gfio Enable building of gtk gfio"
echo "--disable-numa Disable libnuma even if found"
+ echo "--enable-libhdfs Enable hdfs support"
exit $exit_val
fi
@@ -1226,6 +1230,29 @@
fi
fi
echo "s390_z196_facilities $s390_z196_facilities"
+
+##########################################
+# Check if we have required environment variables configured for libhdfs
+if test "$libhdfs" = "yes" ; then
+ hdfs_conf_error=0
+ if test "$JAVA_HOME" = "" ; then
+ echo "configure: JAVA_HOME should be defined to jdk/jvm path"
+ hdfs_conf_error=1
+ fi
+ if test "$FIO_LIBHDFS_INCLUDE" = "" ; then
+ echo "configure: FIO_LIBHDFS_INCLUDE should be defined to libhdfs inlude path"
+ hdfs_conf_error=1
+ fi
+ if test "$FIO_LIBHDFS_LIB" = "" ; then
+ echo "configure: FIO_LIBHDFS_LIB should be defined to libhdfs library path"
+ hdfs_conf_error=1
+ fi
+ if test "$hdfs_conf_error" = "1" ; then
+ exit 1
+ fi
+fi
+echo "HDFS engine $libhdfs"
+
#############################################################################
if test "$wordsize" = "64" ; then
@@ -1367,6 +1394,9 @@
if test "$gf_fadvise" = "yes" ; then
output_sym "CONFIG_GF_FADVISE"
fi
+if test "$libhdfs" = "yes" ; then
+ output_sym "CONFIG_LIBHDFS"
+fi
if test "$zlib" = "no" ; then
echo "Consider installing zlib-dev (zlib-devel), some fio features depend on it."
diff --git a/engines/libhdfs.c b/engines/libhdfs.c
new file mode 100644
index 0000000..773d46b
--- /dev/null
+++ b/engines/libhdfs.c
@@ -0,0 +1,240 @@
+/*
+ * libhdfs engine
+ *
+ * this engine helps perform read/write operations on hdfs cluster using
+ * libhdfs. hdfs doesnot support modification of data once file is created.
+ *
+ * so to mimic that create many files of small size (e.g 256k), and this
+ * engine select a file based on the offset generated by fio.
+ *
+ * thus, random reads and writes can also be achieved with this logic.
+ *
+ * NOTE: please set environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT
+ * to appropriate value to work this engine properly
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/uio.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "../fio.h"
+
+#include "hdfs.h"
+
+struct hdfsio_data {
+ char host[256];
+ int port;
+ hdfsFS fs;
+ hdfsFile fp;
+ unsigned long fsbs;
+ unsigned long fscount;
+ unsigned long curr_file_id;
+ unsigned int numjobs;
+ unsigned int fid_correction;
+};
+
+static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
+{
+ /* make sure that hdfsConnect is invoked before executing this function */
+ hdfsSetWorkingDirectory(hd->fs, "/.perftest");
+ hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
+ if (hd->fp) {
+ hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
+ hdfsCloseFile(hd->fs, hd->fp);
+ }
+ hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
+ if (hd->fp) {
+ hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
+ hdfsCloseFile(hd->fs, hd->fp);
+ }
+
+ return 0;
+}
+
+static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
+{
+ struct hdfsio_data *hd;
+ hdfsFileInfo *fi;
+ unsigned long f_id;
+ char fname[80];
+ int open_flags = 0;
+
+ hd = td->io_ops->data;
+
+ if (hd->curr_file_id == -1) {
+ /* see comment in fio_hdfsio_setup() function */
+ fio_hdfsio_setup_fs_params(hd);
+ }
+
+ /* find out file id based on the offset generated by fio */
+ f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
+
+ if (f_id == hd->curr_file_id) {
+ /* file is already open */
+ return 0;
+ }
+
+ if (hd->curr_file_id != -1) {
+ hdfsCloseFile(hd->fs, hd->fp);
+ }
+
+ if (io_u->ddir == DDIR_READ) {
+ open_flags = O_RDONLY;
+ } else if (io_u->ddir == DDIR_WRITE) {
+ open_flags = O_WRONLY;
+ } else {
+ printf("Invalid I/O Operation\n");
+ }
+
+ hd->curr_file_id = f_id;
+ do {
+ sprintf(fname, ".f%lu", f_id);
+ fi = hdfsGetPathInfo(hd->fs, fname);
+ if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
+ /* file has enough data to read OR file is opened in write mode */
+ hd->fp =
+ hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
+ hd->fsbs);
+ if (hd->fp) {
+ break;
+ }
+ }
+ /* file is empty, so try next file for reading */
+ f_id = (f_id + 1) % hd->fscount;
+ } while (1);
+
+ return 0;
+}
+
+static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
+{
+ if (ret != (int)io_u->xfer_buflen) {
+ if (ret >= 0) {
+ io_u->resid = io_u->xfer_buflen - ret;
+ io_u->error = 0;
+ return FIO_Q_COMPLETED;
+ } else
+ io_u->error = errno;
+ }
+
+ if (io_u->error)
+ td_verror(td, io_u->error, "xfer");
+
+ return FIO_Q_COMPLETED;
+}
+
+static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
+{
+ struct hdfsio_data *hd;
+ int ret = 0;
+
+ hd = td->io_ops->data;
+
+ if (io_u->ddir == DDIR_READ) {
+ ret =
+ hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
+ } else if (io_u->ddir == DDIR_WRITE) {
+ ret =
+ hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
+ io_u->xfer_buflen);
+ } else {
+ printf("Invalid I/O Operation\n");
+ }
+
+ return fio_io_end(td, io_u, ret);
+}
+
+int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
+{
+ struct hdfsio_data *hd;
+
+ hd = td->io_ops->data;
+ hd->fs = hdfsConnect(hd->host, hd->port);
+ hdfsSetWorkingDirectory(hd->fs, "/.perftest");
+ hd->fid_correction = (getpid() % hd->numjobs);
+
+ return 0;
+}
+
+int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
+{
+ struct hdfsio_data *hd;
+
+ hd = td->io_ops->data;
+ hdfsDisconnect(hd->fs);
+
+ return 0;
+}
+
+static int fio_hdfsio_setup(struct thread_data *td)
+{
+ struct hdfsio_data *hd;
+ struct fio_file *f;
+ static unsigned int numjobs = 1; /* atleast one job has to be there! */
+ numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;
+
+ if (!td->io_ops->data) {
+ hd = malloc(sizeof(*hd));;
+
+ memset(hd, 0, sizeof(*hd));
+ td->io_ops->data = hd;
+
+ /* separate host and port from filename */
+ *(strchr(td->o.filename, ',')) = ' ';
+ sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));
+
+ /* read fbs and fcount and based on that set f->real_file_size */
+ f = td->files[0];
+#if 0
+ /* IMHO, this should be done here instead of fio_hdfsio_prep()
+ * but somehow calling it here doesn't seem to work,
+ * some problem with libhdfs that needs to be debugged */
+ hd->fs = hdfsConnect(hd->host, hd->port);
+ fio_hdfsio_setup_fs_params(hd);
+ hdfsDisconnect(hd->fs);
+#else
+ /* so, as an alternate, using environment variables */
+ if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
+ hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
+ hd->fsbs = atol(getenv("FIO_HDFS_BS"));
+ } else {
+ fprintf(stderr,
+ "FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
+ return 1;
+ }
+#endif
+ f->real_file_size = hd->fscount * hd->fsbs;
+
+ td->o.nr_files = 1;
+ hd->curr_file_id = -1;
+ hd->numjobs = numjobs;
+ fio_file_set_size_known(f);
+ }
+
+ return 0;
+}
+
+static struct ioengine_ops ioengine_hdfs = {
+ .name = "libhdfs",
+ .version = FIO_IOOPS_VERSION,
+ .setup = fio_hdfsio_setup,
+ .prep = fio_hdfsio_prep,
+ .queue = fio_hdfsio_queue,
+ .open_file = fio_hdfsio_open_file,
+ .close_file = fio_hdfsio_close_file,
+ .flags = FIO_SYNCIO,
+};
+
+static void fio_init fio_hdfsio_register(void)
+{
+ register_ioengine(&ioengine_hdfs);
+}
+
+static void fio_exit fio_hdfsio_unregister(void)
+{
+ unregister_ioengine(&ioengine_hdfs);
+}
diff --git a/fio.1 b/fio.1
index 5291126..b5ff3cc 100644
--- a/fio.1
+++ b/fio.1
@@ -612,6 +612,9 @@
Using Glusterfs libgfapi async interface to direct access to Glusterfs volumes without
having to go through FUSE. This ioengine defines engine specific
options.
+.TP
+.B hdfs
+Read and write through Hadoop (HDFS)
.RE
.P
.RE
diff --git a/options.c b/options.c
index 3a3321f..484efc1 100644
--- a/options.c
+++ b/options.c
@@ -1541,7 +1541,11 @@
.help = "Glusterfs libgfapi(async) based engine"
},
#endif
-
+#ifdef CONFIG_LIBHDFS
+ { .ival = "hdfs",
+ .help = "Hadoop Distributed Filesystem (HDFS) engine"
+ },
+#endif
{ .ival = "external",
.help = "Load external engine (append name)",
},