Adds ReactiveStreams extensions support for LiveData.
Test: LiveDataReactiveStreamsTest
Bug: 34847055
Change-Id: Ia00f3f99f2f5da43e7c0aafad664d5f9e2be5cff
diff --git a/app-toolkit/init.gradle b/app-toolkit/init.gradle
index f88900f..f7c684e 100644
--- a/app-toolkit/init.gradle
+++ b/app-toolkit/init.gradle
@@ -77,6 +77,8 @@
ext.espresso_version = "2.2.2"
ext.release_version = "1.0-SNAPSHOT"
ext.atsl_version = "0.6-alpha"
+ext.rxjava2_version = "2.0.6"
+ext.reactivestreams_version = "1.0.0"
// this Xerial version is newer than we want but we need it to fix
// https://github.com/xerial/sqlite-jdbc/issues/97
ext.xerial_version = "3.16.1"
diff --git a/app-toolkit/settings.gradle b/app-toolkit/settings.gradle
index 09a6b20..a3651a7 100644
--- a/app-toolkit/settings.gradle
+++ b/app-toolkit/settings.gradle
@@ -23,6 +23,9 @@
include ':lifecycle:extensions'
project(':lifecycle:extensions').projectDir = new File("../lifecycle/extensions")
+include ':lifecycle:reactivestreams'
+project(':lifecycle:reactivestreams').projectDir = new File("../lifecycle/reactivestreams")
+
include ':lifecycle:runtime'
project(':lifecycle:runtime').projectDir = new File("../lifecycle/runtime")
diff --git a/lifecycle/reactivestreams/.gitignore b/lifecycle/reactivestreams/.gitignore
new file mode 100644
index 0000000..796b96d
--- /dev/null
+++ b/lifecycle/reactivestreams/.gitignore
@@ -0,0 +1 @@
+/build
diff --git a/lifecycle/reactivestreams/build.gradle b/lifecycle/reactivestreams/build.gradle
new file mode 100644
index 0000000..5cb2a94
--- /dev/null
+++ b/lifecycle/reactivestreams/build.gradle
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'com.android.library'
+apply plugin: 'maven'
+
+android {
+ compileSdkVersion compile_sdk_version
+ buildToolsVersion build_tools_version
+
+ defaultConfig {
+ minSdkVersion min_sdk_version
+ targetSdkVersion target_sdk_version
+ versionCode 1
+ versionName "1.0"
+
+ testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
+ }
+ buildTypes {
+ release {
+ minifyEnabled false
+ proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
+ }
+ }
+
+ testOptions {
+ unitTests.returnDefaultValues = true
+ }
+}
+
+allprojects {
+ dependencies {
+ compile project(":lifecycle:extensions")
+ compile project(":lifecycle:runtime")
+ compile "org.reactivestreams:reactive-streams:$reactivestreams_version"
+
+ testCompile "junit:junit:$junit_version"
+ testCompile "io.reactivex.rxjava2:rxjava:$rxjava2_version"
+
+ testCompile("com.android.support.test:runner:$atsl_version") {
+ exclude module: 'support-annotations'
+ }
+ androidTestCompile "com.android.support:appcompat-v7:$support_lib_version"
+ }
+}
+
+createAndroidCheckstyle(project)
+
+uploadArchives {
+ repositories {
+ mavenDeployer {
+ repository(url : rootProject.ext.localMavenRepo)
+ pom.artifactId = "reactivestreams"
+ }
+ }
+}
diff --git a/lifecycle/reactivestreams/proguard-rules.pro b/lifecycle/reactivestreams/proguard-rules.pro
new file mode 100644
index 0000000..b7210d1
--- /dev/null
+++ b/lifecycle/reactivestreams/proguard-rules.pro
@@ -0,0 +1,17 @@
+# Add project specific ProGuard rules here.
+# By default, the flags in this file are appended to flags specified
+# in /Users/yboyar/android/sdk/tools/proguard/proguard-android.txt
+# You can edit the include path and order by changing the proguardFiles
+# directive in build.gradle.
+#
+# For more details, see
+# http://developer.android.com/guide/developing/tools/proguard.html
+
+# Add any project specific keep options here:
+
+# If your project uses WebView with JS, uncomment the following
+# and specify the fully qualified class name to the JavaScript interface
+# class:
+#-keepclassmembers class fqcn.of.javascript.interface.for.webview {
+# public *;
+#}
diff --git a/lifecycle/reactivestreams/src/androidTest/AndroidManifest.xml b/lifecycle/reactivestreams/src/androidTest/AndroidManifest.xml
new file mode 100644
index 0000000..8254525
--- /dev/null
+++ b/lifecycle/reactivestreams/src/androidTest/AndroidManifest.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ ~ Copyright (C) 2017 The Android Open Source Project
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+ package="com.android.support.lifecycle.reactivestreams.test">
+
+ <application>
+ <activity android:name="com.android.support.lifecycle.viewmodeltest.ViewModelActivity"
+ android:theme="@style/Base.Theme.AppCompat">
+ </activity>
+ </application>
+
+</manifest>
diff --git a/lifecycle/reactivestreams/src/main/AndroidManifest.xml b/lifecycle/reactivestreams/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..ea6ba2b
--- /dev/null
+++ b/lifecycle/reactivestreams/src/main/AndroidManifest.xml
@@ -0,0 +1,19 @@
+<!--
+ ~ Copyright (C) 2017 The Android Open Source Project
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+ package="com.android.support.lifecycle.reactivestreams">
+</manifest>
diff --git a/lifecycle/reactivestreams/src/main/java/com/android/support/lifecycle/LiveDataReactiveStreams.java b/lifecycle/reactivestreams/src/main/java/com/android/support/lifecycle/LiveDataReactiveStreams.java
new file mode 100644
index 0000000..dbce0b0
--- /dev/null
+++ b/lifecycle/reactivestreams/src/main/java/com/android/support/lifecycle/LiveDataReactiveStreams.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.support.lifecycle;
+
+import android.support.annotation.Nullable;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.lang.ref.WeakReference;
+
+/**
+ * Adapts {@link LiveData} input and output to the ReactiveStreams spec.
+ */
+public final class LiveDataReactiveStreams {
+ private LiveDataReactiveStreams() {
+ }
+
+ /**
+ * Adapts the given {@link LiveData} stream to a ReactiveStreams {@link Publisher}.
+ *
+ * <p>
+ * By using a good publisher implementation such as RxJava 2.x Flowables, most consumers will
+ * be able to let the library deal with backpressure using operators and not need to worry about
+ * ever manually calling {@link Subscription#request}.
+ *
+ * <p>
+ * On subscription to the publisher, the observer will attach to the given {@link LiveData}.
+ * Once {@link Subscription#request) is called on the subscription object, an observer will be
+ * connected to the data stream. Calling request(Long.MAX_VALUE) is equivalent to creating an
+ * unbounded stream with no backpressure. If request with a finite count reaches 0, the observer
+ * will buffer the latest item and emit it to the subscriber when data is again requested. Any
+ * other items emitted during the time there was no backpressure requested will be dropped.
+ */
+ public static <T> Publisher<T> toPublisher(
+ final LifecycleProvider lifecycle, final LiveData<T> liveData) {
+ final Object lock = new Object();
+
+ return new Publisher<T>() {
+ boolean mObserving;
+ boolean mCanceled;
+ long mRequested;
+ @Nullable
+ T mLatest;
+
+ @Override
+ public void subscribe(final Subscriber<? super T> subscriber) {
+ final Observer<T> observer = new Observer<T>() {
+ @Override
+ public void onChanged(@Nullable T t) {
+ if (mCanceled) {
+ return;
+ }
+ synchronized (lock) {
+ if (mCanceled) {
+ return;
+ }
+ if (mRequested > 0) {
+ mLatest = null;
+ subscriber.onNext(t);
+ if (mRequested != Long.MAX_VALUE) {
+ mRequested--;
+ }
+ } else {
+ mLatest = t;
+ }
+ }
+ }
+ };
+
+ subscriber.onSubscribe(new Subscription() {
+ @Override
+ public void request(long n) {
+ if (n < 0) {
+ return;
+ }
+ if (mCanceled) {
+ return;
+ }
+ synchronized (lock) {
+ if (mCanceled) {
+ return;
+ }
+ // Prevent overflowage.
+ mRequested =
+ mRequested + n >= mRequested ? mRequested + n : Long.MAX_VALUE;
+ if (!mObserving) {
+ mObserving = true;
+ liveData.observe(lifecycle, observer);
+ } else if (mLatest != null) {
+ observer.onChanged(mLatest);
+ mLatest = null;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ if (!mCanceled) {
+ synchronized (lock) {
+ if (!mCanceled) {
+ if (mObserving) {
+ liveData.removeObserver(observer);
+ mObserving = false;
+ }
+ mLatest = null;
+ mCanceled = true;
+ }
+ }
+ }
+ }
+ });
+ }
+
+ };
+ }
+
+ /**
+ * Creates an Observable {@link LiveData} stream from a ReactiveStreams publisher.
+ */
+ public static <T> LiveData<T> fromPublisher(final Publisher<T> publisher) {
+ LiveData<T> liveData = new LiveData<T>();
+ // Since we don't have a way to directly observe cancels, weakly hold the live data.
+ final WeakReference<LiveData<T>> liveDataRef = new WeakReference<>(liveData);
+
+ publisher.subscribe(new Subscriber<T>() {
+ @Override
+ public void onSubscribe(Subscription s) {
+ // Don't worry about backpressure. If the stream is too noisy then backpressure can
+ // be handled upstream.
+ s.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(T t) {
+ LiveData<T> liveData = liveDataRef.get();
+ if (liveData != null) {
+ liveData.setValue(t);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // Errors should be handled upstream, so propagate as a crash.
+ throw new RuntimeException(t);
+ }
+
+ @Override
+ public void onComplete() {
+ }
+ });
+
+ return liveData;
+ }
+
+}
diff --git a/lifecycle/reactivestreams/src/test/java/com/android/support/lifecycle/LiveDataReactiveStreamsTest.java b/lifecycle/reactivestreams/src/test/java/com/android/support/lifecycle/LiveDataReactiveStreamsTest.java
new file mode 100644
index 0000000..53f1fcc
--- /dev/null
+++ b/lifecycle/reactivestreams/src/test/java/com/android/support/lifecycle/LiveDataReactiveStreamsTest.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.support.lifecycle;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import android.support.annotation.Nullable;
+import android.support.test.filters.SmallTest;
+
+import com.android.support.executors.AppToolkitTaskExecutor;
+import com.android.support.executors.TaskExecutor;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import io.reactivex.Flowable;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.functions.Consumer;
+import io.reactivex.processors.PublishProcessor;
+import io.reactivex.processors.ReplayProcessor;
+import io.reactivex.schedulers.TestScheduler;
+import io.reactivex.subjects.AsyncSubject;
+
+@SmallTest
+public class LiveDataReactiveStreamsTest {
+ private static final Lifecycle sLifecycle = new Lifecycle() {
+ @Override
+ public void addObserver(LifecycleObserver observer) {
+ }
+
+ @Override
+ public void removeObserver(LifecycleObserver observer) {
+ }
+
+ @Override
+ public int getCurrentState() {
+ return Lifecycle.RESUMED;
+ }
+ };
+ private static final LifecycleProvider sLifecycleProvider = new LifecycleProvider() {
+
+ @Override
+ public Lifecycle getLifecycle() {
+ return sLifecycle;
+ }
+
+ };
+
+ private final List<String> mLiveDataOutput = new ArrayList<>();
+ private final Observer<String> mObserver = new Observer<String>() {
+ @Override
+ public void onChanged(@Nullable String s) {
+ mLiveDataOutput.add(s);
+ }
+ };
+
+ private final ReplayProcessor<String> mOutputProcessor = ReplayProcessor.create();
+
+ private static final TestScheduler sBackgroundScheduler = new TestScheduler();
+ private Thread mTestThread;
+
+ @Before
+ public void init() {
+ mTestThread = Thread.currentThread();
+ AppToolkitTaskExecutor.getInstance().setDelegate(new TaskExecutor() {
+
+ @Override
+ public void executeOnDiskIO(Runnable runnable) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void executeOnMainThread(final Runnable runnable) {
+ runnable.run();
+ }
+
+ @Override
+ public boolean isMainThread() {
+ return Thread.currentThread() == mTestThread;
+ }
+
+ });
+ }
+
+ @After
+ public void removeExecutorDelegate() {
+ AppToolkitTaskExecutor.getInstance().setDelegate(null);
+ }
+
+ @Test
+ public void convertsFromPublisher() {
+ PublishProcessor<String> processor = PublishProcessor.create();
+ LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
+
+ liveData.observe(sLifecycleProvider, mObserver);
+
+ processor.onNext("foo");
+ processor.onNext("bar");
+ processor.onNext("baz");
+
+ assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
+ }
+
+ @Test
+ public void convertsFromPublisherWithMultipleObservers() {
+ final List<String> output2 = new ArrayList<>();
+ PublishProcessor<String> processor = PublishProcessor.create();
+ LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
+
+ liveData.observe(sLifecycleProvider, mObserver);
+
+ processor.onNext("foo");
+ processor.onNext("bar");
+
+ // The second mObserver should only get the newest value and any later values.
+ liveData.observe(sLifecycleProvider, new Observer<String>() {
+ @Override
+ public void onChanged(@Nullable String s) {
+ output2.add(s);
+ }
+ });
+
+ processor.onNext("baz");
+
+ assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
+ assertThat(output2, is(Arrays.asList("bar", "baz")));
+ }
+
+ @Test
+ public void convertsFromAsyncPublisher() {
+ Flowable<String> input = Flowable.just("foo")
+ .concatWith(Flowable.just("bar", "baz").observeOn(sBackgroundScheduler));
+ LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(input);
+
+ liveData.observe(sLifecycleProvider, mObserver);
+
+ assertThat(mLiveDataOutput, is(Collections.singletonList("foo")));
+ sBackgroundScheduler.triggerActions();
+ assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
+ }
+
+ @Test
+ public void convertsToPublisherWithSyncData() {
+ LiveData<String> liveData = new LiveData<>();
+ liveData.setValue("foo");
+ assertThat(liveData.getValue(), is("foo"));
+
+ Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(sLifecycleProvider, liveData))
+ .subscribe(mOutputProcessor);
+
+ liveData.setValue("bar");
+ liveData.setValue("baz");
+
+ assertThat(
+ mOutputProcessor.getValues(new String[]{}),
+ is(new String[] {"foo", "bar", "baz"}));
+ }
+
+ @Test
+ public void convertingToPublisherIsCancelable() {
+ LiveData<String> liveData = new LiveData<>();
+ liveData.setValue("foo");
+ assertThat(liveData.getValue(), is("foo"));
+
+ Disposable disposable = Flowable
+ .fromPublisher(LiveDataReactiveStreams.toPublisher(sLifecycleProvider, liveData))
+ .subscribe(new Consumer<String>() {
+ @Override
+ public void accept(String s) throws Exception {
+ mLiveDataOutput.add(s);
+ }
+ });
+
+ liveData.setValue("bar");
+ liveData.setValue("baz");
+
+ assertThat(liveData.getObserverCount(), is(1));
+ disposable.dispose();
+
+ liveData.setValue("fizz");
+ liveData.setValue("buzz");
+
+ assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
+ // Canceling disposable should also remove livedata mObserver.
+ assertThat(liveData.getObserverCount(), is(0));
+ }
+
+ @Test
+ public void convertsToPublisherWithBackpressure() {
+ LiveData<String> liveData = new LiveData<>();
+
+ final AsyncSubject<Subscription> subscriptionSubject = AsyncSubject.create();
+
+ Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(sLifecycleProvider, liveData))
+ .subscribe(new Subscriber<String>() {
+ @Override
+ public void onSubscribe(Subscription s) {
+ subscriptionSubject.onNext(s);
+ subscriptionSubject.onComplete();
+ }
+
+ @Override
+ public void onNext(String s) {
+ mOutputProcessor.onNext(s);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ throw new RuntimeException(t);
+ }
+
+ @Override
+ public void onComplete() {
+ }
+ });
+
+ // Subscription should have happened synchronously. If it didn't, this will deadlock.
+ final Subscription subscription = subscriptionSubject.blockingSingle();
+
+ subscription.request(1);
+ assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {}));
+
+ liveData.setValue("foo");
+ assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {"foo"}));
+
+ subscription.request(2);
+ liveData.setValue("baz");
+ liveData.setValue("fizz");
+
+ assertThat(
+ mOutputProcessor.getValues(new String[]{}),
+ is(new String[] {"foo", "baz", "fizz"}));
+
+ // 'nyan' will be dropped as there is nothing currently requesting a stream.
+ liveData.setValue("nyan");
+ liveData.setValue("cat");
+
+ assertThat(
+ mOutputProcessor.getValues(new String[]{}),
+ is(new String[] {"foo", "baz", "fizz"}));
+
+ // When a new request comes in, the latest value will be pushed.
+ subscription.request(1);
+ assertThat(
+ mOutputProcessor.getValues(new String[]{}),
+ is(new String[] {"foo", "baz", "fizz", "cat"}));
+ }
+
+ @Test
+ public void convertsToPublisherWithAsyncData() {
+ LiveData<String> liveData = new LiveData<>();
+
+ Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(sLifecycleProvider, liveData))
+ .observeOn(sBackgroundScheduler)
+ .subscribe(mOutputProcessor);
+
+ liveData.setValue("foo");
+
+ assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {}));
+ sBackgroundScheduler.triggerActions();
+ assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {"foo"}));
+
+ liveData.setValue("bar");
+ liveData.setValue("baz");
+
+ assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {"foo"}));
+ sBackgroundScheduler.triggerActions();
+ assertThat(mOutputProcessor.getValues(
+ new String[]{}),
+ is(new String[] {"foo", "bar", "baz"}));
+ }
+}