Room RxJava2 Support, Step 1

This CL adds support for returning Flowable & Publisher from
Room DAO queries. We may introduce an Optional support later
for nullable queries.

I've also changed live data check to use type utils instead of string
matching.

Bug: 37009497
Test: RxRoomTest, RxJava2Test
Change-Id: I00e10efcf7599f3d33cd7e4b6b638b546fb5c1e9
diff --git a/app-toolkit/core-testing/build.gradle b/app-toolkit/core-testing/build.gradle
index 5aa8994..e347ff6 100644
--- a/app-toolkit/core-testing/build.gradle
+++ b/app-toolkit/core-testing/build.gradle
@@ -46,7 +46,7 @@
     compile(libs.junit) {
         exclude module: 'hamcrest-core'
     }
-    compile libs.mockito.all
+    compile libs.mockito_core
 }
 
 archivesBaseName = "core-testing"
diff --git a/app-toolkit/settings.gradle b/app-toolkit/settings.gradle
index 61f2e38..a2c1b55 100644
--- a/app-toolkit/settings.gradle
+++ b/app-toolkit/settings.gradle
@@ -72,6 +72,9 @@
 include ":room:testing"
 project(':room:testing').projectDir = new File(supportRoot, "room/testing")
 
+include ":room:rxjava2"
+project(':room:rxjava2').projectDir = new File(supportRoot, "room/rxjava2")
+
 include ':room:integration-tests:testapp'
 project(':room:integration-tests:testapp').projectDir = new File(supportRoot, "room/integration-tests/testapp")
 
diff --git a/room/common/src/main/java/com/android/support/room/Query.java b/room/common/src/main/java/com/android/support/room/Query.java
index 7b51cb3..e5999fd 100644
--- a/room/common/src/main/java/com/android/support/room/Query.java
+++ b/room/common/src/main/java/com/android/support/room/Query.java
@@ -64,6 +64,12 @@
  * query may return {@link android.database.Cursor Cursor} or any query result can be wrapped in
  * a {@link com.android.support.lifecycle.LiveData LiveData}.
  * <p>
+ * <b>RxJava2</b> If you are using RxJava2, you can also return {@code Flowable<T>} or
+ * {@code Publisher<T>} from query methods. Since Reactive Streams does not allow {@code null}, if
+ * the query returns a nullable type, it will not dispatch anything if the value is {@code null}
+ * (like fetching an {@link Entity} row that does not exist).
+ * You can return {@code Flowable<T[]>} or {@code Flowable<List<T>>} to workaround this limitation.
+ * <p>
  * UPDATE or DELETE queries can return {@code void} or {@code int}. If it is an {@code int},
  * the value is the number of rows affected by this query.
  * <p>
diff --git a/room/compiler/src/main/kotlin/com/android/support/room/ext/javapoet_ext.kt b/room/compiler/src/main/kotlin/com/android/support/room/ext/javapoet_ext.kt
index b5b05a9..a7f8cfa 100644
--- a/room/compiler/src/main/kotlin/com/android/support/room/ext/javapoet_ext.kt
+++ b/room/compiler/src/main/kotlin/com/android/support/room/ext/javapoet_ext.kt
@@ -93,3 +93,15 @@
     val SET = ClassName.get("java.util", "Set")
     val STRING = ClassName.get("java.lang", "String")
 }
+
+object RxJava2TypeNames {
+    val FLOWABLE = ClassName.get("io.reactivex", "Flowable")
+}
+
+object ReactiveStreamsTypeNames {
+    val PUBLISHER = ClassName.get("org.reactivestreams", "Publisher")
+}
+
+object RoomRxJava2TypeNames {
+    val RX_ROOM = ClassName.get("com.android.support.room", "RxRoom")
+}
diff --git a/room/compiler/src/main/kotlin/com/android/support/room/processor/ProcessorErrors.kt b/room/compiler/src/main/kotlin/com/android/support/room/processor/ProcessorErrors.kt
index 977c5ce..6f8bec6 100644
--- a/room/compiler/src/main/kotlin/com/android/support/room/processor/ProcessorErrors.kt
+++ b/room/compiler/src/main/kotlin/com/android/support/room/processor/ProcessorErrors.kt
@@ -422,4 +422,6 @@
                 to add $entity to the entities section of the @Database?
                 """.trim()
     }
+    val MISSING_ROOM_RXJAVA2_ARTIFACT = "To use RxJava2 features, you must add `rxjava2`" +
+            " artifact from Room as a dependency. com.android.support.room:rxjava2:<version>"
 }
diff --git a/room/compiler/src/main/kotlin/com/android/support/room/solver/TypeAdapterStore.kt b/room/compiler/src/main/kotlin/com/android/support/room/solver/TypeAdapterStore.kt
index 9eb37ba..872a232 100644
--- a/room/compiler/src/main/kotlin/com/android/support/room/solver/TypeAdapterStore.kt
+++ b/room/compiler/src/main/kotlin/com/android/support/room/solver/TypeAdapterStore.kt
@@ -19,6 +19,9 @@
 import com.android.support.room.Entity
 import com.android.support.room.ext.AndroidTypeNames
 import com.android.support.room.ext.LifecyclesTypeNames
+import com.android.support.room.ext.ReactiveStreamsTypeNames
+import com.android.support.room.ext.RoomRxJava2TypeNames
+import com.android.support.room.ext.RxJava2TypeNames
 import com.android.support.room.ext.hasAnnotation
 import com.android.support.room.log.RLog
 import com.android.support.room.parser.ParsedQuery
@@ -27,6 +30,7 @@
 import com.android.support.room.processor.EntityProcessor
 import com.android.support.room.processor.FieldProcessor
 import com.android.support.room.processor.PojoProcessor
+import com.android.support.room.processor.ProcessorErrors
 import com.android.support.room.solver.query.parameter.ArrayQueryParameterAdapter
 import com.android.support.room.solver.query.parameter.BasicQueryParameterAdapter
 import com.android.support.room.solver.query.parameter.CollectionQueryParameterAdapter
@@ -34,6 +38,7 @@
 import com.android.support.room.solver.query.result.ArrayQueryResultAdapter
 import com.android.support.room.solver.query.result.CursorQueryResultBinder
 import com.android.support.room.solver.query.result.EntityRowAdapter
+import com.android.support.room.solver.query.result.FlowableQueryResultBinder
 import com.android.support.room.solver.query.result.InstantQueryResultBinder
 import com.android.support.room.solver.query.result.ListQueryResultAdapter
 import com.android.support.room.solver.query.result.LiveDataQueryResultBinder
@@ -126,11 +131,26 @@
         typeConverters = converters
     }
 
+    val hasRxJava2Artifact by lazy {
+        context.processingEnv.elementUtils
+                .getTypeElement(RoomRxJava2TypeNames.RX_ROOM.toString()) != null
+    }
+
     // type mirrors that be converted into columns w/o an extra converter
     private val knownColumnTypeMirrors by lazy {
         columnTypeAdapters.map { it.out }
     }
 
+    private val liveDataTypeMirror: TypeMirror? by lazy {
+        context.processingEnv.elementUtils
+                .getTypeElement(LifecyclesTypeNames.LIVE_DATA.toString())?.asType()
+    }
+
+    private val flowableTypeMirror: TypeMirror? by lazy {
+        context.processingEnv.elementUtils
+                .getTypeElement(RxJava2TypeNames.FLOWABLE.toString())?.asType()
+    }
+
     /**
      * Searches 1 way to bind a value into a statement.
      */
@@ -235,13 +255,26 @@
         return findTypeConverter(listOf(input), listOf(output))
     }
 
-    private fun isLiveData(declared: DeclaredType): Boolean {
-        val typeElement = MoreElements.asType(declared.asElement())
-        val qName = typeElement.qualifiedName.toString()
-        // even though computable live data is internal, we still check for it as we may inherit
-        // it from some internal class.
-        return qName == LifecyclesTypeNames.COMPUTABLE_LIVE_DATA.toString() ||
-                qName == LifecyclesTypeNames.LIVE_DATA.toString()
+    @VisibleForTesting
+    fun isLiveData(declared: DeclaredType): Boolean {
+        if (liveDataTypeMirror == null) {
+            return false
+        }
+        val erasure = context.processingEnv.typeUtils.erasure(declared)
+        return context.processingEnv.typeUtils.isAssignable(liveDataTypeMirror, erasure)
+    }
+
+    @VisibleForTesting
+    fun isRxJava2Publisher(declared: DeclaredType): Boolean {
+        if (flowableTypeMirror == null) {
+            return false
+        }
+        val erasure = context.processingEnv.typeUtils.erasure(declared)
+        val match = context.processingEnv.typeUtils.isAssignable(flowableTypeMirror, erasure)
+        if (match && !hasRxJava2Artifact) {
+            context.logger.e(ProcessorErrors.MISSING_ROOM_RXJAVA2_ARTIFACT)
+        }
+        return match
     }
 
     fun findQueryResultBinder(typeMirror: TypeMirror, query: ParsedQuery): QueryResultBinder {
@@ -257,6 +290,10 @@
                     val liveDataTypeArg = declared.typeArguments.first()
                     LiveDataQueryResultBinder(liveDataTypeArg, query.tables.map { it.name },
                             findQueryResultAdapter(liveDataTypeArg, query))
+                } else if (isRxJava2Publisher(declared)) {
+                    val typeArg = declared.typeArguments.first()
+                    FlowableQueryResultBinder(typeArg, query.tables.map { it.name },
+                            findQueryResultAdapter(typeArg, query))
                 } else {
                     InstantQueryResultBinder(findQueryResultAdapter(typeMirror, query))
                 }
diff --git a/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/BaseObservableQueryResultBinder.kt b/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/BaseObservableQueryResultBinder.kt
new file mode 100644
index 0000000..8a5bc8d
--- /dev/null
+++ b/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/BaseObservableQueryResultBinder.kt
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.support.room.solver.query.result
+
+import com.android.support.room.ext.AndroidTypeNames
+import com.android.support.room.ext.L
+import com.android.support.room.ext.N
+import com.android.support.room.ext.T
+import com.android.support.room.solver.CodeGenScope
+import com.android.support.room.writer.DaoWriter
+import com.squareup.javapoet.MethodSpec
+import javax.lang.model.element.Modifier
+
+/**
+ * Base class for query result binders that observe the database. It includes common functionality
+ * like creating a finalizer to release the query or creating the actual adapter call code.
+ */
+abstract class BaseObservableQueryResultBinder(adapter: QueryResultAdapter?)
+    : QueryResultBinder(adapter) {
+
+    protected fun createFinalizeMethod(roomSQLiteQueryVar: String): MethodSpec {
+        return MethodSpec.methodBuilder("finalize").apply {
+            addModifiers(Modifier.PROTECTED)
+            addAnnotation(Override::class.java)
+            addStatement("$L.release()", roomSQLiteQueryVar)
+        }.build()
+    }
+
+    protected fun createRunQueryAndReturnStatements(builder: MethodSpec.Builder,
+                                                    roomSQLiteQueryVar: String,
+                                                    scope: CodeGenScope) {
+        val outVar = scope.getTmpVar("_result")
+        val cursorVar = scope.getTmpVar("_cursor")
+        builder.apply {
+            addStatement("final $T $L = $N.query($L)", AndroidTypeNames.CURSOR, cursorVar,
+                    DaoWriter.dbField, roomSQLiteQueryVar)
+            beginControlFlow("try").apply {
+                val adapterScope = scope.fork()
+                adapter?.convert(outVar, cursorVar, adapterScope)
+                addCode(adapterScope.builder().build())
+                addStatement("return $L", outVar)
+            }
+            nextControlFlow("finally").apply {
+                addStatement("$L.close()", cursorVar)
+            }
+            endControlFlow()
+        }
+    }
+}
diff --git a/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/FlowableQueryResultBinder.kt b/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/FlowableQueryResultBinder.kt
new file mode 100644
index 0000000..afcc331
--- /dev/null
+++ b/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/FlowableQueryResultBinder.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.support.room.solver.query.result
+
+import com.android.support.room.ext.L
+import com.android.support.room.ext.N
+import com.android.support.room.ext.RoomRxJava2TypeNames
+import com.android.support.room.ext.T
+import com.android.support.room.ext.arrayTypeName
+import com.android.support.room.ext.typeName
+import com.android.support.room.solver.CodeGenScope
+import com.android.support.room.writer.DaoWriter
+import com.squareup.javapoet.FieldSpec
+import com.squareup.javapoet.MethodSpec
+import com.squareup.javapoet.ParameterizedTypeName
+import com.squareup.javapoet.TypeSpec
+import javax.lang.model.element.Modifier
+import javax.lang.model.type.TypeMirror
+
+/**
+ * Binds the result as an RxJava2 Flowable.
+ */
+class FlowableQueryResultBinder(val typeArg: TypeMirror, val queryTableNames: List<String>,
+                                adapter: QueryResultAdapter?)
+    : BaseObservableQueryResultBinder(adapter) {
+    override fun convertAndReturn(roomSQLiteQueryVar: String, dbField: FieldSpec,
+                                  scope: CodeGenScope) {
+        val callableImpl = TypeSpec.anonymousClassBuilder("").apply {
+            val typeName = typeArg.typeName()
+            superclass(ParameterizedTypeName.get(java.util.concurrent.Callable::class.typeName(),
+                    typeName))
+            addMethod(MethodSpec.methodBuilder("call").apply {
+                returns(typeName)
+                addException(Exception::class.typeName())
+                addModifiers(Modifier.PUBLIC)
+                createRunQueryAndReturnStatements(this, roomSQLiteQueryVar, scope)
+            }.build())
+            addMethod(createFinalizeMethod(roomSQLiteQueryVar))
+        }.build()
+        scope.builder().apply {
+            val tableNamesList = queryTableNames.joinToString(",") { "\"$it\"" }
+            addStatement("return $T.createFlowable($N, new $T{$L}, $L)",
+                    RoomRxJava2TypeNames.RX_ROOM, DaoWriter.dbField,
+                    String::class.arrayTypeName(), tableNamesList, callableImpl)
+        }
+    }
+}
diff --git a/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/LiveDataQueryResultBinder.kt b/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/LiveDataQueryResultBinder.kt
index bb7612d..2596ca0 100644
--- a/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/LiveDataQueryResultBinder.kt
+++ b/room/compiler/src/main/kotlin/com/android/support/room/solver/query/result/LiveDataQueryResultBinder.kt
@@ -39,7 +39,7 @@
  */
 class LiveDataQueryResultBinder(val typeArg: TypeMirror, queryTableNames: List<String>,
                                 adapter: QueryResultAdapter?)
-    : QueryResultBinder(adapter) {
+    : BaseObservableQueryResultBinder(adapter) {
     @Suppress("JoinDeclarationAndAssignment")
     val tableNames = ((adapter?.accessedTableNames() ?: emptyList()) + queryTableNames).toSet()
     override fun convertAndReturn(roomSQLiteQueryVar : String, dbField: FieldSpec,
@@ -66,14 +66,6 @@
         }
     }
 
-    private fun createFinalizeMethod(roomSQLiteQueryVar: String): MethodSpec {
-        return MethodSpec.methodBuilder("finalize").apply {
-            addModifiers(Modifier.PROTECTED)
-            addAnnotation(Override::class.java)
-            addStatement("$L.release()", roomSQLiteQueryVar)
-        }.build()
-    }
-
     private fun createComputeMethod(roomSQLiteQueryVar: String, typeName: TypeName,
                                     observerField: FieldSpec, dbField: FieldSpec,
                                     scope: CodeGenScope): MethodSpec {
@@ -81,8 +73,6 @@
             addAnnotation(Override::class.java)
             addModifiers(Modifier.PROTECTED)
             returns(typeName)
-            val outVar = scope.getTmpVar("_result")
-            val cursorVar = scope.getTmpVar("_cursor")
 
             beginControlFlow("if ($N == null)", observerField).apply {
                 addStatement("$N = $L", observerField, createAnonymousObserver())
@@ -91,18 +81,7 @@
             }
             endControlFlow()
 
-            addStatement("final $T $L = $N.query($L)", AndroidTypeNames.CURSOR, cursorVar,
-                    DaoWriter.dbField, roomSQLiteQueryVar)
-            beginControlFlow("try").apply {
-                val adapterScope = scope.fork()
-                adapter?.convert(outVar, cursorVar, adapterScope)
-                addCode(adapterScope.builder().build())
-                addStatement("return $L", outVar)
-            }
-            nextControlFlow("finally").apply {
-                addStatement("$L.close()", cursorVar)
-            }
-            endControlFlow()
+            createRunQueryAndReturnStatements(this, roomSQLiteQueryVar, scope)
         }.build()
     }
 
diff --git a/room/compiler/src/test/data/common/input/Rx2Room.java b/room/compiler/src/test/data/common/input/Rx2Room.java
new file mode 100644
index 0000000..b26ef50
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/Rx2Room.java
@@ -0,0 +1,5 @@
+// mock rx2 helper
+package com.android.support.room;
+
+class RxRoom {
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/data/common/input/reactivestreams/Publisher.java b/room/compiler/src/test/data/common/input/reactivestreams/Publisher.java
new file mode 100644
index 0000000..4ecc9f5
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/reactivestreams/Publisher.java
@@ -0,0 +1,4 @@
+// fake reactivestreams publisher
+package org.reactivestreams;
+public interface Publisher<T> {
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/data/common/input/rxjava2/Flowable.java b/room/compiler/src/test/data/common/input/rxjava2/Flowable.java
new file mode 100644
index 0000000..2d9d4d0
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/rxjava2/Flowable.java
@@ -0,0 +1,6 @@
+// fake rx flowable
+package io.reactivex;
+import org.reactivestreams.Publisher;
+
+public abstract class Flowable<T> implements Publisher<T> {
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/kotlin/com/android/support/room/solver/TypeAdapterStoreTest.kt b/room/compiler/src/test/kotlin/com/android/support/room/solver/TypeAdapterStoreTest.kt
index 8268d9d..fd83856 100644
--- a/room/compiler/src/test/kotlin/com/android/support/room/solver/TypeAdapterStoreTest.kt
+++ b/room/compiler/src/test/kotlin/com/android/support/room/solver/TypeAdapterStoreTest.kt
@@ -16,15 +16,21 @@
 
 package com.android.support.room.solver
 
+import COMMON
 import com.android.support.room.Entity
 import com.android.support.room.ext.L
+import com.android.support.room.ext.LifecyclesTypeNames
+import com.android.support.room.ext.ReactiveStreamsTypeNames
 import com.android.support.room.ext.RoomTypeNames.STRING_UTIL
+import com.android.support.room.ext.RxJava2TypeNames
 import com.android.support.room.ext.T
 import com.android.support.room.processor.Context
+import com.android.support.room.processor.ProcessorErrors
 import com.android.support.room.solver.types.CompositeAdapter
 import com.android.support.room.solver.types.TypeConverter
 import com.android.support.room.testing.TestInvocation
 import com.android.support.room.testing.TestProcessor
+import com.google.auto.common.MoreTypes
 import com.google.common.truth.Truth
 import com.google.testing.compile.CompileTester
 import com.google.testing.compile.JavaFileObjects
@@ -37,6 +43,7 @@
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.JUnit4
+import simpleRun
 import testCodeGenScope
 import javax.annotation.processing.ProcessingEnvironment
 import javax.lang.model.type.TypeKind
@@ -166,6 +173,53 @@
         }
     }
 
+    @Test
+    fun testMissingRxRoom() {
+        simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, COMMON.FLOWABLE)) { invocation ->
+            val publisherElement = invocation.processingEnv.elementUtils
+                    .getTypeElement(ReactiveStreamsTypeNames.PUBLISHER.toString())
+            assertThat(publisherElement, notNullValue())
+            assertThat(invocation.context.typeAdapterStore.isRxJava2Publisher(
+                    MoreTypes.asDeclared(publisherElement.asType())), `is`(true))
+        }.failsToCompile().withErrorContaining(ProcessorErrors.MISSING_ROOM_RXJAVA2_ARTIFACT)
+    }
+
+    @Test
+    fun testFindPublisher() {
+        simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, COMMON.FLOWABLE, COMMON.RX2_ROOM)) {
+            invocation ->
+            val publisher = invocation.processingEnv.elementUtils
+                    .getTypeElement(ReactiveStreamsTypeNames.PUBLISHER.toString())
+            assertThat(publisher, notNullValue())
+            assertThat(invocation.context.typeAdapterStore.isRxJava2Publisher(
+                    MoreTypes.asDeclared(publisher.asType())), `is`(true))
+        }.compilesWithoutError()
+    }
+
+    @Test
+    fun testFindFlowable() {
+        simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, COMMON.FLOWABLE, COMMON.RX2_ROOM)) {
+            invocation ->
+            val flowable = invocation.processingEnv.elementUtils
+                    .getTypeElement(RxJava2TypeNames.FLOWABLE.toString())
+            assertThat(flowable, notNullValue())
+            assertThat(invocation.context.typeAdapterStore.isRxJava2Publisher(
+                    MoreTypes.asDeclared(flowable.asType())), `is`(true))
+        }.compilesWithoutError()
+    }
+
+    @Test
+    fun testFindLiveData() {
+        simpleRun(jfos = *arrayOf(COMMON.COMPUTABLE_LIVE_DATA, COMMON.LIVE_DATA)) {
+            invocation ->
+            val liveData = invocation.processingEnv.elementUtils
+                    .getTypeElement(LifecyclesTypeNames.LIVE_DATA.toString())
+            assertThat(liveData, notNullValue())
+            assertThat(invocation.context.typeAdapterStore.isLiveData(
+                    MoreTypes.asDeclared(liveData.asType())), `is`(true))
+        }.compilesWithoutError()
+    }
+
     private fun createIntListToStringBinders(invocation: TestInvocation): List<TypeConverter> {
         val intType = invocation.processingEnv.elementUtils
                 .getTypeElement(Integer::class.java.canonicalName)
diff --git a/room/compiler/src/test/kotlin/com/android/support/room/testing/test_util.kt b/room/compiler/src/test/kotlin/com/android/support/room/testing/test_util.kt
index b8f6bf7..62d6f6c 100644
--- a/room/compiler/src/test/kotlin/com/android/support/room/testing/test_util.kt
+++ b/room/compiler/src/test/kotlin/com/android/support/room/testing/test_util.kt
@@ -21,6 +21,9 @@
 import com.android.support.room.Query
 import com.android.support.room.Relation
 import com.android.support.room.ext.LifecyclesTypeNames
+import com.android.support.room.ext.ReactiveStreamsTypeNames
+import com.android.support.room.ext.RoomRxJava2TypeNames
+import com.android.support.room.ext.RxJava2TypeNames
 import com.android.support.room.processor.EntityProcessor
 import com.android.support.room.solver.CodeGenScope
 import com.android.support.room.testing.TestInvocation
@@ -69,6 +72,17 @@
         loadJavaCode("common/input/ComputableLiveData.java",
                 LifecyclesTypeNames.COMPUTABLE_LIVE_DATA.toString())
     }
+    val PUBLISHER by lazy {
+        loadJavaCode("common/input/reactivestreams/Publisher.java",
+                ReactiveStreamsTypeNames.PUBLISHER.toString())
+    }
+    val FLOWABLE by lazy {
+        loadJavaCode("common/input/rxjava2/Flowable.java", RxJava2TypeNames.FLOWABLE.toString())
+    }
+
+    val RX2_ROOM by lazy {
+        loadJavaCode("common/input/Rx2Room.java", RoomRxJava2TypeNames.RX_ROOM.toString())
+    }
 }
 fun testCodeGenScope(): CodeGenScope {
     return CodeGenScope(Mockito.mock(ClassWriter::class.java))
diff --git a/room/integration-tests/testapp/build.gradle b/room/integration-tests/testapp/build.gradle
index 81706d4..c1fc243 100644
--- a/room/integration-tests/testapp/build.gradle
+++ b/room/integration-tests/testapp/build.gradle
@@ -59,18 +59,20 @@
 
     androidTestCompile(libs.test_runner) {
         exclude module: 'support-annotations'
+        exclude module: 'hamcrest-core'
     }
     androidTestCompile(libs.espresso_core, {
         exclude group: 'com.android.support', module: 'support-annotations'
+        exclude module: "hamcrest-core"
     })
     // IJ's gradle integration just cannot figure this out ...
     androidTestCompile project(':lifecycle:extensions')
     androidTestCompile project(':lifecycle:common')
     androidTestCompile project(':lifecycle:runtime')
     androidTestCompile project(':room:testing')
-    testCompile libs.junit
-    testCompile libs.mockito_core
-
+    androidTestCompile project(':room:rxjava2')
+    androidTestCompile libs.rx_java
+    testCompile libs.mockito.all
 }
 
 createAndroidCheckstyle(project)
diff --git a/room/integration-tests/testapp/src/androidTest/java/com/android/support/room/integration/testapp/dao/UserDao.java b/room/integration-tests/testapp/src/androidTest/java/com/android/support/room/integration/testapp/dao/UserDao.java
index 49ec86f..60d009d 100644
--- a/room/integration-tests/testapp/src/androidTest/java/com/android/support/room/integration/testapp/dao/UserDao.java
+++ b/room/integration-tests/testapp/src/androidTest/java/com/android/support/room/integration/testapp/dao/UserDao.java
@@ -28,9 +28,13 @@
 import com.android.support.room.integration.testapp.vo.AvgWeightByAge;
 import com.android.support.room.integration.testapp.vo.User;
 
+import org.reactivestreams.Publisher;
+
 import java.util.Date;
 import java.util.List;
 
+import io.reactivex.Flowable;
+
 @SuppressWarnings("SameParameterValue")
 @Dao
 public interface UserDao {
@@ -108,4 +112,13 @@
 
     @Query("select mId from user where mId IN (:ids)")
     Cursor findUsersAsCursor(int... ids);
+
+    @Query("select * from user where mId = :id")
+    Flowable<User> flowableUserById(int id);
+
+    @Query("select COUNT(*) from user")
+    Flowable<Integer> flowableCountUsers();
+
+    @Query("select COUNT(*) from user")
+    Publisher<Integer> publisherCountUsers();
 }
diff --git a/room/integration-tests/testapp/src/androidTest/java/com/android/support/room/integration/testapp/test/RxJava2Test.java b/room/integration-tests/testapp/src/androidTest/java/com/android/support/room/integration/testapp/test/RxJava2Test.java
new file mode 100644
index 0000000..cf5623a
--- /dev/null
+++ b/room/integration-tests/testapp/src/androidTest/java/com/android/support/room/integration/testapp/test/RxJava2Test.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.support.room.integration.testapp.test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import android.support.test.filters.MediumTest;
+import android.support.test.filters.SmallTest;
+import android.support.test.runner.AndroidJUnit4;
+
+import com.android.support.executors.AppToolkitTaskExecutor;
+import com.android.support.executors.TaskExecutor;
+import com.android.support.room.integration.testapp.vo.User;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import io.reactivex.disposables.Disposable;
+import io.reactivex.schedulers.TestScheduler;
+import io.reactivex.subscribers.TestSubscriber;
+
+@SmallTest
+@RunWith(AndroidJUnit4.class)
+public class RxJava2Test extends TestDatabaseTest {
+
+    private TestScheduler mTestScheduler;
+
+    @Before
+    public void setupSchedulers() {
+        mTestScheduler = new TestScheduler();
+        mTestScheduler.start();
+        AppToolkitTaskExecutor.getInstance().setDelegate(new TaskExecutor() {
+            @Override
+            public void executeOnDiskIO(Runnable runnable) {
+                mTestScheduler.scheduleDirect(runnable);
+            }
+
+            @Override
+            public void postToMainThread(Runnable runnable) {
+                Assert.fail("no main thread in this test");
+            }
+
+            @Override
+            public boolean isMainThread() {
+                return false;
+            }
+        });
+    }
+
+    @After
+    public void clearSchedulers() {
+        mTestScheduler.shutdown();
+        AppToolkitTaskExecutor.getInstance().setDelegate(null);
+    }
+
+    private void drain() throws InterruptedException {
+        mTestScheduler.triggerActions();
+    }
+
+    @Test
+    public void observeOnce() throws InterruptedException {
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        TestSubscriber<User> consumer = new TestSubscriber<>();
+        Disposable disposable = mUserDao.flowableUserById(3).subscribeWith(consumer);
+        drain();
+        consumer.assertValue(user);
+        disposable.dispose();
+    }
+
+    @Test
+    public void observeChangeAndDispose() throws InterruptedException {
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        TestSubscriber<User> consumer = new TestSubscriber<>();
+        Disposable disposable = mUserDao.flowableUserById(3).observeOn(mTestScheduler)
+                .subscribeWith(consumer);
+        drain();
+        assertThat(consumer.values().get(0), is(user));
+        user.setName("rxy");
+        mUserDao.insertOrReplace(user);
+        drain();
+        User next = consumer.values().get(1);
+        assertThat(next, is(user));
+        disposable.dispose();
+        user.setName("foo");
+        mUserDao.insertOrReplace(user);
+        drain();
+        assertThat(consumer.valueCount(), is(2));
+    }
+
+    @Test
+    @MediumTest
+    public void observeEmpty() throws InterruptedException {
+        TestSubscriber<User> consumer = new TestSubscriber<>();
+        Disposable disposable = mUserDao.flowableUserById(3).observeOn(mTestScheduler)
+                .subscribeWith(consumer);
+        drain();
+        consumer.assertNoValues();
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        assertThat(consumer.values().get(0), is(user));
+        disposable.dispose();
+        user.setAge(88);
+        mUserDao.insertOrReplace(user);
+        drain();
+        assertThat(consumer.valueCount(), is(1));
+    }
+
+    @Test
+    public void flowableCountUsers() throws InterruptedException {
+        TestSubscriber<Integer> consumer = new TestSubscriber<>();
+        mUserDao.flowableCountUsers()
+                .observeOn(mTestScheduler)
+                .subscribe(consumer);
+        drain();
+        assertThat(consumer.values().get(0), is(0));
+        mUserDao.insertAll(TestUtil.createUsersArray(1, 3, 4, 6));
+        drain();
+        assertThat(consumer.values().get(1), is(4));
+        mUserDao.deleteByUids(3, 7);
+        drain();
+        assertThat(consumer.values().get(2), is(3));
+        mUserDao.deleteByUids(101);
+        drain();
+        assertThat(consumer.valueCount(), is(3));
+    }
+
+    @Test
+    @MediumTest
+    public void publisherCountUsers() throws InterruptedException {
+        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
+        mUserDao.publisherCountUsers().subscribe(subscriber);
+        drain();
+        subscriber.assertSubscribed();
+        subscriber.request(2);
+        drain();
+        subscriber.assertValue(0);
+        mUserDao.insert(TestUtil.createUser(2));
+        drain();
+        subscriber.assertValues(0, 1);
+        subscriber.cancel();
+        subscriber.assertNoErrors();
+    }
+}
diff --git a/room/rxjava2/build.gradle b/room/rxjava2/build.gradle
new file mode 100644
index 0000000..1cb2479
--- /dev/null
+++ b/room/rxjava2/build.gradle
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import com.android.builder.core.BuilderConstants
+apply plugin: 'com.android.library'
+apply plugin: 'maven'
+
+android {
+    compileSdkVersion tools.current_sdk
+    buildToolsVersion tools.build_tools_version
+
+    defaultConfig {
+        minSdkVersion flatfoot.min_sdk
+        targetSdkVersion tools.current_sdk
+        versionCode 1
+        versionName "1.0"
+
+        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
+
+    }
+    buildTypes {
+        release {
+            minifyEnabled false
+            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
+        }
+    }
+
+    testOptions {
+        unitTests.returnDefaultValues = true
+    }
+    compileOptions {
+        sourceCompatibility JavaVersion.VERSION_1_7
+        targetCompatibility JavaVersion.VERSION_1_7
+    }
+}
+
+dependencies {
+    compile project(":room:common")
+    compile project(":room:runtime")
+    compile project(":apptoolkit:core")
+    compile libs.support.core_utils
+    compile libs.rx_java
+    testCompile libs.junit
+    testCompile libs.mockito.all
+    testCompile project(":apptoolkit:core-testing")
+}
+
+archivesBaseName = "rxjava2"
+
+createAndroidCheckstyle(project)
+
+android.libraryVariants.all { variant ->
+    def name = variant.buildType.name
+    def suffix = name.capitalize()
+    project.tasks.create(name: "jar${suffix}", type: Jar){
+        dependsOn variant.javaCompile
+        from variant.javaCompile.destinationDir
+        destinationDir new File(project.buildDir, "libJar")
+    }
+}
diff --git a/room/rxjava2/src/main/AndroidManifest.xml b/room/rxjava2/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..870eae1
--- /dev/null
+++ b/room/rxjava2/src/main/AndroidManifest.xml
@@ -0,0 +1,19 @@
+<!--
+  ~ Copyright (C) 2017 The Android Open Source Project
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+          package="com.android.support.room.rxjava2">
+</manifest>
diff --git a/room/rxjava2/src/main/java/com/android/support/room/RxRoom.java b/room/rxjava2/src/main/java/com/android/support/room/RxRoom.java
new file mode 100644
index 0000000..4343502
--- /dev/null
+++ b/room/rxjava2/src/main/java/com/android/support/room/RxRoom.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.support.room;
+
+import android.support.annotation.Nullable;
+import android.support.annotation.RestrictTo;
+
+import com.android.support.executors.AppToolkitTaskExecutor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.reactivex.BackpressureStrategy;
+import io.reactivex.Flowable;
+import io.reactivex.FlowableEmitter;
+import io.reactivex.FlowableOnSubscribe;
+import io.reactivex.Scheduler;
+import io.reactivex.annotations.NonNull;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.disposables.Disposables;
+import io.reactivex.functions.Action;
+import io.reactivex.functions.Function;
+import io.reactivex.functions.Predicate;
+
+/**
+ * Helper class to add RxJava2 support to Room.
+ */
+@SuppressWarnings("WeakerAccess")
+public class RxRoom {
+    /**
+     * Data dispatched by the publisher created by {@link #createFlowable(RoomDatabase, String...)}.
+     */
+    public static final Object NOTHING = new Object();
+
+    /**
+     * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the
+     * observed tables is updated.
+     * <p>
+     * You can easily chain a database operation to downstream of this {@link Flowable} to ensure
+     * that it re-runs when database is modified.
+     * <p>
+     * Since database invalidation is batched, multiple changes in the database may results in just
+     * 1 emission.
+     *
+     * @param database   The database instance
+     * @param tableNames The list of table names that should be observed
+     * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables
+     * is modified (also once when the invalidation tracker connection is established).
+     */
+    public static Flowable<Object> createFlowable(final RoomDatabase database,
+            final String... tableNames) {
+        return Flowable.create(new FlowableOnSubscribe<Object>() {
+            @Override
+            public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
+                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
+                        tableNames) {
+                    @Override
+                    public void onInvalidated() {
+                        if (!emitter.isCancelled()) {
+                            emitter.onNext(NOTHING);
+                        }
+                    }
+                };
+                if (!emitter.isCancelled()) {
+                    database.getInvalidationTracker().addObserver(observer);
+                    emitter.setDisposable(Disposables.fromAction(new Action() {
+                        @Override
+                        public void run() throws Exception {
+                            database.getInvalidationTracker().removeObserver(observer);
+                        }
+                    }));
+                }
+
+                // emit once to avoid missing any data and also easy chaining
+                if (!emitter.isCancelled()) {
+                    emitter.onNext(NOTHING);
+                }
+            }
+        }, BackpressureStrategy.LATEST);
+    }
+
+    /**
+     * Helper method used by generated code to bind a Callable such that it will be run in
+     * our disk io thread and will automatically block null values since RxJava2 does not like null.
+     *
+     * @hide
+     */
+    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
+    public static <T> Flowable<T> createFlowable(final RoomDatabase database,
+            final String[] tableNames, final Callable<T> callable) {
+        return createFlowable(database, tableNames).observeOn(sAppToolkitIOScheduler)
+                .map(new Function<Object, Optional<T>>() {
+                    @Override
+                    public Optional<T> apply(@NonNull Object o) throws Exception {
+                        T data = callable.call();
+                        return new Optional<>(data);
+                    }
+                }).filter(new Predicate<Optional<T>>() {
+                    @Override
+                    public boolean test(@NonNull Optional<T> optional) throws Exception {
+                        return optional.mValue != null;
+                    }
+                }).map(new Function<Optional<T>, T>() {
+                    @Override
+                    public T apply(@NonNull Optional<T> optional) throws Exception {
+                        return optional.mValue;
+                    }
+                });
+    }
+
+    private static Scheduler sAppToolkitIOScheduler = new Scheduler() {
+        @Override
+        public Worker createWorker() {
+            final AtomicBoolean mDisposed = new AtomicBoolean(false);
+            return new Worker() {
+                @Override
+                public Disposable schedule(@NonNull Runnable run, long delay,
+                        @NonNull TimeUnit unit) {
+                    DisposableRunnable disposable = new DisposableRunnable(run, mDisposed);
+                    AppToolkitTaskExecutor.getInstance().executeOnDiskIO(run);
+                    return disposable;
+                }
+
+                @Override
+                public void dispose() {
+                    mDisposed.set(true);
+                }
+
+                @Override
+                public boolean isDisposed() {
+                    return mDisposed.get();
+                }
+            };
+        }
+    };
+
+    private static class DisposableRunnable implements Disposable, Runnable {
+        private final Runnable mActual;
+        private volatile boolean mDisposed = false;
+        private final AtomicBoolean mGlobalDisposed;
+
+        DisposableRunnable(Runnable actual, AtomicBoolean globalDisposed) {
+            mActual = actual;
+            mGlobalDisposed = globalDisposed;
+        }
+
+        @Override
+        public void dispose() {
+            mDisposed = true;
+        }
+
+        @Override
+        public boolean isDisposed() {
+            return mDisposed || mGlobalDisposed.get();
+        }
+
+        @Override
+        public void run() {
+            if (!isDisposed()) {
+                mActual.run();
+            }
+        }
+    }
+
+    static class Optional<T> {
+        @Nullable
+        final T mValue;
+
+        Optional(@Nullable T value) {
+            this.mValue = value;
+        }
+    }
+}
diff --git a/room/rxjava2/src/test/java/com/android/support/room/RxRoomTest.java b/room/rxjava2/src/test/java/com/android/support/room/RxRoomTest.java
new file mode 100644
index 0000000..4303611
--- /dev/null
+++ b/room/rxjava2/src/test/java/com/android/support/room/RxRoomTest.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.support.room;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.android.support.apptoolkit.testing.JunitTaskExecutorRule;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.reactivex.Flowable;
+import io.reactivex.annotations.NonNull;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.functions.Consumer;
+import io.reactivex.subscribers.TestSubscriber;
+
+@RunWith(JUnit4.class)
+public class RxRoomTest {
+    @Rule
+    public JunitTaskExecutorRule mExecutor = new JunitTaskExecutorRule(1, false);
+    private RoomDatabase mDatabase;
+    private InvalidationTracker mInvalidationTracker;
+    private List<InvalidationTracker.Observer> mAddedObservers = new ArrayList<>();
+
+    @Before
+    public void init() {
+        mDatabase = mock(RoomDatabase.class);
+        mInvalidationTracker = mock(InvalidationTracker.class);
+        when(mDatabase.getInvalidationTracker()).thenReturn(mInvalidationTracker);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                mAddedObservers.add((InvalidationTracker.Observer) invocation.getArguments()[0]);
+                return null;
+            }
+        }).when(mInvalidationTracker).addObserver(any(InvalidationTracker.Observer.class));
+    }
+
+    @Test
+    public void basicAddRemove() {
+        Flowable<Object> flowable = RxRoom.createFlowable(mDatabase, "a", "b");
+        verify(mInvalidationTracker, never()).addObserver(any(InvalidationTracker.Observer.class));
+        Disposable disposable = flowable.subscribe();
+        verify(mInvalidationTracker).addObserver(any(InvalidationTracker.Observer.class));
+        assertThat(mAddedObservers.size(), CoreMatchers.is(1));
+
+        InvalidationTracker.Observer observer = mAddedObservers.get(0);
+        disposable.dispose();
+
+        verify(mInvalidationTracker).removeObserver(observer);
+
+        disposable = flowable.subscribe();
+        verify(mInvalidationTracker, times(2))
+                .addObserver(any(InvalidationTracker.Observer.class));
+        assertThat(mAddedObservers.size(), CoreMatchers.is(2));
+        assertThat(mAddedObservers.get(1), CoreMatchers.not(CoreMatchers.sameInstance(observer)));
+        InvalidationTracker.Observer observer2 = mAddedObservers.get(1);
+        disposable.dispose();
+        verify(mInvalidationTracker).removeObserver(observer2);
+    }
+
+    @Test
+    public void basicNotify() throws InterruptedException {
+        Flowable<Object> flowable = RxRoom.createFlowable(mDatabase, "a", "b");
+        CountingConsumer consumer = new CountingConsumer();
+        Disposable disposable = flowable.subscribe(consumer);
+        assertThat(mAddedObservers.size(), CoreMatchers.is(1));
+        InvalidationTracker.Observer observer = mAddedObservers.get(0);
+        assertThat(consumer.mCount, CoreMatchers.is(1));
+        observer.onInvalidated();
+        assertThat(consumer.mCount, CoreMatchers.is(2));
+        observer.onInvalidated();
+        assertThat(consumer.mCount, CoreMatchers.is(3));
+        disposable.dispose();
+        observer.onInvalidated();
+        assertThat(consumer.mCount, CoreMatchers.is(3));
+    }
+
+    @Test
+    public void internalCallable() throws InterruptedException {
+        final AtomicReference<String> value = new AtomicReference<>(null);
+        final Flowable<String> flowable = RxRoom.createFlowable(mDatabase, new String[]{"a", "b"},
+                new Callable<String>() {
+                    @Override
+                    public String call() throws Exception {
+                        return value.get();
+                    }
+                });
+        final CountingConsumer consumer = new CountingConsumer();
+        flowable.subscribe(consumer);
+        InvalidationTracker.Observer observer = mAddedObservers.get(0);
+        drain();
+        // no value because it is null
+        assertThat(consumer.mCount, CoreMatchers.is(0));
+        value.set("bla");
+        observer.onInvalidated();
+        drain();
+        // get value
+        assertThat(consumer.mCount, CoreMatchers.is(1));
+        observer.onInvalidated();
+        drain();
+        // get value
+        assertThat(consumer.mCount, CoreMatchers.is(2));
+        value.set(null);
+        observer.onInvalidated();
+        drain();
+        // no value
+        assertThat(consumer.mCount, CoreMatchers.is(2));
+    }
+
+    private void drain() throws InterruptedException {
+        mExecutor.drainTasks(2);
+    }
+
+    @Test
+    public void exception() throws InterruptedException {
+        final Flowable<String> flowable = RxRoom.createFlowable(mDatabase, new String[]{"a"},
+                new Callable<String>() {
+                    @Override
+                    public String call() throws Exception {
+                        throw new Exception("i want exception");
+                    }
+                });
+        TestSubscriber<String> subscriber = new TestSubscriber<>();
+        flowable.subscribe(subscriber);
+        drain();
+        assertThat(subscriber.errorCount(), CoreMatchers.is(1));
+        assertThat(subscriber.errors().get(0).getMessage(), CoreMatchers.is("i want exception"));
+    }
+
+    private static class CountingConsumer implements Consumer<Object> {
+        int mCount = 0;
+
+        @Override
+        public void accept(@NonNull Object o) throws Exception {
+            mCount++;
+        }
+    }
+}