Adds ReactiveStreams extensions support for LiveData.

Test: LiveDataReactiveStreamsTest
Bug: 34847055
Change-Id: Ia00f3f99f2f5da43e7c0aafad664d5f9e2be5cff
diff --git a/app-toolkit/init.gradle b/app-toolkit/init.gradle
index f88900f..f7c684e 100644
--- a/app-toolkit/init.gradle
+++ b/app-toolkit/init.gradle
@@ -77,6 +77,8 @@
 ext.espresso_version = "2.2.2"
 ext.release_version = "1.0-SNAPSHOT"
 ext.atsl_version = "0.6-alpha"
+ext.rxjava2_version = "2.0.6"
+ext.reactivestreams_version = "1.0.0"
 // this Xerial version is newer than we want but we need it to fix
 // https://github.com/xerial/sqlite-jdbc/issues/97
 ext.xerial_version = "3.16.1"
diff --git a/app-toolkit/settings.gradle b/app-toolkit/settings.gradle
index 09a6b20..a3651a7 100644
--- a/app-toolkit/settings.gradle
+++ b/app-toolkit/settings.gradle
@@ -23,6 +23,9 @@
 include ':lifecycle:extensions'
 project(':lifecycle:extensions').projectDir = new File("../lifecycle/extensions")
 
+include ':lifecycle:reactivestreams'
+project(':lifecycle:reactivestreams').projectDir = new File("../lifecycle/reactivestreams")
+
 include ':lifecycle:runtime'
 project(':lifecycle:runtime').projectDir = new File("../lifecycle/runtime")
 
diff --git a/lifecycle/reactivestreams/.gitignore b/lifecycle/reactivestreams/.gitignore
new file mode 100644
index 0000000..796b96d
--- /dev/null
+++ b/lifecycle/reactivestreams/.gitignore
@@ -0,0 +1 @@
+/build
diff --git a/lifecycle/reactivestreams/build.gradle b/lifecycle/reactivestreams/build.gradle
new file mode 100644
index 0000000..5cb2a94
--- /dev/null
+++ b/lifecycle/reactivestreams/build.gradle
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'com.android.library'
+apply plugin: 'maven'
+
+android {
+    compileSdkVersion compile_sdk_version
+    buildToolsVersion build_tools_version
+
+    defaultConfig {
+        minSdkVersion min_sdk_version
+        targetSdkVersion target_sdk_version
+        versionCode 1
+        versionName "1.0"
+
+        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
+    }
+    buildTypes {
+        release {
+            minifyEnabled false
+            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
+        }
+    }
+
+    testOptions {
+        unitTests.returnDefaultValues = true
+    }
+}
+
+allprojects {
+    dependencies {
+        compile project(":lifecycle:extensions")
+        compile project(":lifecycle:runtime")
+        compile "org.reactivestreams:reactive-streams:$reactivestreams_version"
+
+        testCompile "junit:junit:$junit_version"
+        testCompile "io.reactivex.rxjava2:rxjava:$rxjava2_version"
+
+        testCompile("com.android.support.test:runner:$atsl_version") {
+            exclude module: 'support-annotations'
+        }
+        androidTestCompile "com.android.support:appcompat-v7:$support_lib_version"
+    }
+}
+
+createAndroidCheckstyle(project)
+
+uploadArchives {
+    repositories {
+        mavenDeployer {
+            repository(url : rootProject.ext.localMavenRepo)
+            pom.artifactId = "reactivestreams"
+        }
+    }
+}
diff --git a/lifecycle/reactivestreams/proguard-rules.pro b/lifecycle/reactivestreams/proguard-rules.pro
new file mode 100644
index 0000000..b7210d1
--- /dev/null
+++ b/lifecycle/reactivestreams/proguard-rules.pro
@@ -0,0 +1,17 @@
+# Add project specific ProGuard rules here.
+# By default, the flags in this file are appended to flags specified
+# in /Users/yboyar/android/sdk/tools/proguard/proguard-android.txt
+# You can edit the include path and order by changing the proguardFiles
+# directive in build.gradle.
+#
+# For more details, see
+#   http://developer.android.com/guide/developing/tools/proguard.html
+
+# Add any project specific keep options here:
+
+# If your project uses WebView with JS, uncomment the following
+# and specify the fully qualified class name to the JavaScript interface
+# class:
+#-keepclassmembers class fqcn.of.javascript.interface.for.webview {
+#   public *;
+#}
diff --git a/lifecycle/reactivestreams/src/androidTest/AndroidManifest.xml b/lifecycle/reactivestreams/src/androidTest/AndroidManifest.xml
new file mode 100644
index 0000000..8254525
--- /dev/null
+++ b/lifecycle/reactivestreams/src/androidTest/AndroidManifest.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  ~ Copyright (C) 2017 The Android Open Source Project
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+          package="com.android.support.lifecycle.reactivestreams.test">
+
+    <application>
+        <activity android:name="com.android.support.lifecycle.viewmodeltest.ViewModelActivity"
+                  android:theme="@style/Base.Theme.AppCompat">
+        </activity>
+    </application>
+
+</manifest>
diff --git a/lifecycle/reactivestreams/src/main/AndroidManifest.xml b/lifecycle/reactivestreams/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..ea6ba2b
--- /dev/null
+++ b/lifecycle/reactivestreams/src/main/AndroidManifest.xml
@@ -0,0 +1,19 @@
+<!--
+  ~ Copyright (C) 2017 The Android Open Source Project
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+          package="com.android.support.lifecycle.reactivestreams">
+</manifest>
diff --git a/lifecycle/reactivestreams/src/main/java/com/android/support/lifecycle/LiveDataReactiveStreams.java b/lifecycle/reactivestreams/src/main/java/com/android/support/lifecycle/LiveDataReactiveStreams.java
new file mode 100644
index 0000000..dbce0b0
--- /dev/null
+++ b/lifecycle/reactivestreams/src/main/java/com/android/support/lifecycle/LiveDataReactiveStreams.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.support.lifecycle;
+
+import android.support.annotation.Nullable;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.lang.ref.WeakReference;
+
+/**
+ * Adapts {@link LiveData} input and output to the ReactiveStreams spec.
+ */
+public final class LiveDataReactiveStreams {
+    private LiveDataReactiveStreams() {
+    }
+
+    /**
+     * Adapts the given {@link LiveData} stream to a ReactiveStreams {@link Publisher}.
+     *
+     * <p>
+     * By using a good publisher implementation such as RxJava 2.x Flowables, most consumers will
+     * be able to let the library deal with backpressure using operators and not need to worry about
+     * ever manually calling {@link Subscription#request}.
+     *
+     * <p>
+     * On subscription to the publisher, the observer will attach to the given {@link LiveData}.
+     * Once {@link Subscription#request) is called on the subscription object, an observer will be
+     * connected to the data stream. Calling request(Long.MAX_VALUE) is equivalent to creating an
+     * unbounded stream with no backpressure. If request with a finite count reaches 0, the observer
+     * will buffer the latest item and emit it to the subscriber when data is again requested. Any
+     * other items emitted during the time there was no backpressure requested will be dropped.
+     */
+    public static <T> Publisher<T> toPublisher(
+            final LifecycleProvider lifecycle, final LiveData<T> liveData) {
+        final Object lock = new Object();
+
+        return new Publisher<T>() {
+            boolean mObserving;
+            boolean mCanceled;
+            long mRequested;
+            @Nullable
+            T mLatest;
+
+            @Override
+            public void subscribe(final Subscriber<? super T> subscriber) {
+                final Observer<T> observer = new Observer<T>() {
+                    @Override
+                    public void onChanged(@Nullable T t) {
+                        if (mCanceled) {
+                            return;
+                        }
+                        synchronized (lock) {
+                            if (mCanceled) {
+                                return;
+                            }
+                            if (mRequested > 0) {
+                                mLatest = null;
+                                subscriber.onNext(t);
+                                if (mRequested != Long.MAX_VALUE) {
+                                    mRequested--;
+                                }
+                            } else {
+                                mLatest = t;
+                            }
+                        }
+                    }
+                };
+
+                subscriber.onSubscribe(new Subscription() {
+                    @Override
+                    public void request(long n) {
+                        if (n < 0) {
+                            return;
+                        }
+                        if (mCanceled) {
+                            return;
+                        }
+                        synchronized (lock) {
+                            if (mCanceled) {
+                                return;
+                            }
+                            // Prevent overflowage.
+                            mRequested =
+                                    mRequested + n >= mRequested ? mRequested + n : Long.MAX_VALUE;
+                            if (!mObserving) {
+                                mObserving = true;
+                                liveData.observe(lifecycle, observer);
+                            } else if (mLatest != null) {
+                                observer.onChanged(mLatest);
+                                mLatest = null;
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void cancel() {
+                        if (!mCanceled) {
+                            synchronized (lock) {
+                                if (!mCanceled) {
+                                    if (mObserving) {
+                                        liveData.removeObserver(observer);
+                                        mObserving = false;
+                                    }
+                                    mLatest = null;
+                                    mCanceled = true;
+                                }
+                            }
+                        }
+                    }
+                });
+            }
+
+        };
+    }
+
+    /**
+     * Creates an Observable {@link LiveData} stream from a ReactiveStreams publisher.
+     */
+    public static <T> LiveData<T> fromPublisher(final Publisher<T> publisher) {
+        LiveData<T> liveData = new LiveData<T>();
+        // Since we don't have a way to directly observe cancels, weakly hold the live data.
+        final WeakReference<LiveData<T>> liveDataRef = new WeakReference<>(liveData);
+
+        publisher.subscribe(new Subscriber<T>() {
+            @Override
+            public void onSubscribe(Subscription s) {
+                // Don't worry about backpressure. If the stream is too noisy then backpressure can
+                // be handled upstream.
+                s.request(Long.MAX_VALUE);
+            }
+
+            @Override
+            public void onNext(T t) {
+                LiveData<T> liveData = liveDataRef.get();
+                if (liveData != null) {
+                    liveData.setValue(t);
+                }
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                // Errors should be handled upstream, so propagate as a crash.
+                throw new RuntimeException(t);
+            }
+
+            @Override
+            public void onComplete() {
+            }
+        });
+
+        return liveData;
+    }
+
+}
diff --git a/lifecycle/reactivestreams/src/test/java/com/android/support/lifecycle/LiveDataReactiveStreamsTest.java b/lifecycle/reactivestreams/src/test/java/com/android/support/lifecycle/LiveDataReactiveStreamsTest.java
new file mode 100644
index 0000000..53f1fcc
--- /dev/null
+++ b/lifecycle/reactivestreams/src/test/java/com/android/support/lifecycle/LiveDataReactiveStreamsTest.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.support.lifecycle;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import android.support.annotation.Nullable;
+import android.support.test.filters.SmallTest;
+
+import com.android.support.executors.AppToolkitTaskExecutor;
+import com.android.support.executors.TaskExecutor;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import io.reactivex.Flowable;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.functions.Consumer;
+import io.reactivex.processors.PublishProcessor;
+import io.reactivex.processors.ReplayProcessor;
+import io.reactivex.schedulers.TestScheduler;
+import io.reactivex.subjects.AsyncSubject;
+
+@SmallTest
+public class LiveDataReactiveStreamsTest {
+    private static final Lifecycle sLifecycle = new Lifecycle() {
+        @Override
+        public void addObserver(LifecycleObserver observer) {
+        }
+
+        @Override
+        public void removeObserver(LifecycleObserver observer) {
+        }
+
+        @Override
+        public int getCurrentState() {
+            return Lifecycle.RESUMED;
+        }
+    };
+    private static final LifecycleProvider sLifecycleProvider = new LifecycleProvider() {
+
+        @Override
+        public Lifecycle getLifecycle() {
+            return sLifecycle;
+        }
+
+    };
+
+    private final List<String> mLiveDataOutput = new ArrayList<>();
+    private final Observer<String> mObserver = new Observer<String>() {
+        @Override
+        public void onChanged(@Nullable String s) {
+            mLiveDataOutput.add(s);
+        }
+    };
+
+    private final ReplayProcessor<String> mOutputProcessor = ReplayProcessor.create();
+
+    private static final TestScheduler sBackgroundScheduler = new TestScheduler();
+    private Thread mTestThread;
+
+    @Before
+    public void init() {
+        mTestThread = Thread.currentThread();
+        AppToolkitTaskExecutor.getInstance().setDelegate(new TaskExecutor() {
+
+            @Override
+            public void executeOnDiskIO(Runnable runnable) {
+                throw new IllegalStateException();
+            }
+
+            @Override
+            public void executeOnMainThread(final Runnable runnable) {
+                runnable.run();
+            }
+
+            @Override
+            public boolean isMainThread() {
+                return Thread.currentThread() == mTestThread;
+            }
+
+        });
+    }
+
+    @After
+    public void removeExecutorDelegate() {
+        AppToolkitTaskExecutor.getInstance().setDelegate(null);
+    }
+
+    @Test
+    public void convertsFromPublisher() {
+        PublishProcessor<String> processor = PublishProcessor.create();
+        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
+
+        liveData.observe(sLifecycleProvider, mObserver);
+
+        processor.onNext("foo");
+        processor.onNext("bar");
+        processor.onNext("baz");
+
+        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
+    }
+
+    @Test
+    public void convertsFromPublisherWithMultipleObservers() {
+        final List<String> output2 = new ArrayList<>();
+        PublishProcessor<String> processor = PublishProcessor.create();
+        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
+
+        liveData.observe(sLifecycleProvider, mObserver);
+
+        processor.onNext("foo");
+        processor.onNext("bar");
+
+        // The second mObserver should only get the newest value and any later values.
+        liveData.observe(sLifecycleProvider, new Observer<String>() {
+            @Override
+            public void onChanged(@Nullable String s) {
+                output2.add(s);
+            }
+        });
+
+        processor.onNext("baz");
+
+        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
+        assertThat(output2, is(Arrays.asList("bar", "baz")));
+    }
+
+    @Test
+    public void convertsFromAsyncPublisher() {
+        Flowable<String> input = Flowable.just("foo")
+                .concatWith(Flowable.just("bar", "baz").observeOn(sBackgroundScheduler));
+        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(input);
+
+        liveData.observe(sLifecycleProvider, mObserver);
+
+        assertThat(mLiveDataOutput, is(Collections.singletonList("foo")));
+        sBackgroundScheduler.triggerActions();
+        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
+    }
+
+    @Test
+    public void convertsToPublisherWithSyncData() {
+        LiveData<String> liveData = new LiveData<>();
+        liveData.setValue("foo");
+        assertThat(liveData.getValue(), is("foo"));
+
+        Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(sLifecycleProvider, liveData))
+                .subscribe(mOutputProcessor);
+
+        liveData.setValue("bar");
+        liveData.setValue("baz");
+
+        assertThat(
+                mOutputProcessor.getValues(new String[]{}),
+                is(new String[] {"foo", "bar", "baz"}));
+    }
+
+    @Test
+    public void convertingToPublisherIsCancelable() {
+        LiveData<String> liveData = new LiveData<>();
+        liveData.setValue("foo");
+        assertThat(liveData.getValue(), is("foo"));
+
+        Disposable disposable = Flowable
+                .fromPublisher(LiveDataReactiveStreams.toPublisher(sLifecycleProvider, liveData))
+                .subscribe(new Consumer<String>() {
+                    @Override
+                    public void accept(String s) throws Exception {
+                        mLiveDataOutput.add(s);
+                    }
+                });
+
+        liveData.setValue("bar");
+        liveData.setValue("baz");
+
+        assertThat(liveData.getObserverCount(), is(1));
+        disposable.dispose();
+
+        liveData.setValue("fizz");
+        liveData.setValue("buzz");
+
+        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
+        // Canceling disposable should also remove livedata mObserver.
+        assertThat(liveData.getObserverCount(), is(0));
+    }
+
+    @Test
+    public void convertsToPublisherWithBackpressure() {
+        LiveData<String> liveData = new LiveData<>();
+
+        final AsyncSubject<Subscription> subscriptionSubject = AsyncSubject.create();
+
+        Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(sLifecycleProvider, liveData))
+                .subscribe(new Subscriber<String>() {
+                    @Override
+                    public void onSubscribe(Subscription s) {
+                        subscriptionSubject.onNext(s);
+                        subscriptionSubject.onComplete();
+                    }
+
+                    @Override
+                    public void onNext(String s) {
+                        mOutputProcessor.onNext(s);
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        throw new RuntimeException(t);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                    }
+                });
+
+        // Subscription should have happened synchronously. If it didn't, this will deadlock.
+        final Subscription subscription = subscriptionSubject.blockingSingle();
+
+        subscription.request(1);
+        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {}));
+
+        liveData.setValue("foo");
+        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {"foo"}));
+
+        subscription.request(2);
+        liveData.setValue("baz");
+        liveData.setValue("fizz");
+
+        assertThat(
+                mOutputProcessor.getValues(new String[]{}),
+                is(new String[] {"foo", "baz", "fizz"}));
+
+        // 'nyan' will be dropped as there is nothing currently requesting a stream.
+        liveData.setValue("nyan");
+        liveData.setValue("cat");
+
+        assertThat(
+                mOutputProcessor.getValues(new String[]{}),
+                is(new String[] {"foo", "baz", "fizz"}));
+
+        // When a new request comes in, the latest value will be pushed.
+        subscription.request(1);
+        assertThat(
+                mOutputProcessor.getValues(new String[]{}),
+                is(new String[] {"foo", "baz", "fizz", "cat"}));
+    }
+
+    @Test
+    public void convertsToPublisherWithAsyncData() {
+        LiveData<String> liveData = new LiveData<>();
+
+        Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(sLifecycleProvider, liveData))
+                .observeOn(sBackgroundScheduler)
+                .subscribe(mOutputProcessor);
+
+        liveData.setValue("foo");
+
+        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {}));
+        sBackgroundScheduler.triggerActions();
+        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {"foo"}));
+
+        liveData.setValue("bar");
+        liveData.setValue("baz");
+
+        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[] {"foo"}));
+        sBackgroundScheduler.triggerActions();
+        assertThat(mOutputProcessor.getValues(
+                new String[]{}),
+                is(new String[] {"foo", "bar", "baz"}));
+    }
+}