Merge "Import EncryptedFullBackupDataProcessor"
diff --git a/packages/BackupEncryption/src/com/android/server/backup/encryption/StreamUtils.java b/packages/BackupEncryption/src/com/android/server/backup/encryption/StreamUtils.java
index 91b2926..66be25b 100644
--- a/packages/BackupEncryption/src/com/android/server/backup/encryption/StreamUtils.java
+++ b/packages/BackupEncryption/src/com/android/server/backup/encryption/StreamUtils.java
@@ -18,9 +18,13 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 /** Utility methods for dealing with Streams */
 public class StreamUtils {
+    private static final int MAX_COPY_BUFFER_SIZE = 1024; // 1k copy buffer size.
+
     /**
      * Close a Closeable and silently ignore any IOExceptions.
      *
@@ -33,4 +37,28 @@
             // Silently ignore
         }
     }
+
+    /**
+     * Copy data from an InputStream to an OutputStream upto a given number of bytes.
+     *
+     * @param in The source InputStream
+     * @param out The destination OutputStream
+     * @param limit The maximum number of bytes to copy
+     * @throws IOException Thrown if there is a problem performing the copy.
+     */
+    public static void copyStream(InputStream in, OutputStream out, int limit) throws IOException {
+        int bufferSize = Math.min(MAX_COPY_BUFFER_SIZE, limit);
+        byte[] buffer = new byte[bufferSize];
+
+        int copied = 0;
+        while (copied < limit) {
+            int maxReadSize = Math.min(bufferSize, limit - copied);
+            int read = in.read(buffer, 0, maxReadSize);
+            if (read < 0) {
+                return; // Reached the stream end before the limit
+            }
+            out.write(buffer, 0, read);
+            copied += read;
+        }
+    }
 }
diff --git a/packages/BackupEncryption/src/com/android/server/backup/encryption/tasks/EncryptedFullBackupDataProcessor.java b/packages/BackupEncryption/src/com/android/server/backup/encryption/tasks/EncryptedFullBackupDataProcessor.java
new file mode 100644
index 0000000..0baec8b
--- /dev/null
+++ b/packages/BackupEncryption/src/com/android/server/backup/encryption/tasks/EncryptedFullBackupDataProcessor.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.backup.encryption.tasks;
+
+import static com.android.internal.util.Preconditions.checkNotNull;
+import static com.android.internal.util.Preconditions.checkState;
+
+import android.annotation.Nullable;
+import android.app.backup.BackupTransport;
+import android.content.Context;
+import android.util.Slog;
+
+import com.android.server.backup.encryption.FullBackupDataProcessor;
+import com.android.server.backup.encryption.StreamUtils;
+import com.android.server.backup.encryption.client.CryptoBackupServer;
+import com.android.server.backup.encryption.keys.RecoverableKeyStoreSecondaryKey;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.security.SecureRandom;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * Accepts backup data from a {@link InputStream} and passes it to the encrypted full data backup
+ * path.
+ */
+public class EncryptedFullBackupDataProcessor implements FullBackupDataProcessor {
+
+    private static final String TAG = "EncryptedFullBackupDP";
+
+    private final Context mContext;
+    private final ExecutorService mExecutorService;
+    private final CryptoBackupServer mCryptoBackupServer;
+    private final SecureRandom mSecureRandom;
+    private final RecoverableKeyStoreSecondaryKey mSecondaryKey;
+    private final String mPackageName;
+
+    @Nullable private InputStream mInputStream;
+    @Nullable private PipedOutputStream mOutputStream;
+    @Nullable private EncryptedFullBackupTask mBackupTask;
+    @Nullable private Future<Void> mBackupTaskFuture;
+    @Nullable private FullBackupCallbacks mFullBackupCallbacks;
+
+    public EncryptedFullBackupDataProcessor(
+            Context context,
+            ExecutorService executorService,
+            CryptoBackupServer cryptoBackupServer,
+            SecureRandom secureRandom,
+            RecoverableKeyStoreSecondaryKey secondaryKey,
+            String packageName) {
+        mContext = checkNotNull(context);
+        mExecutorService = checkNotNull(executorService);
+        mCryptoBackupServer = checkNotNull(cryptoBackupServer);
+        mSecureRandom = checkNotNull(secureRandom);
+        mSecondaryKey = checkNotNull(secondaryKey);
+        mPackageName = checkNotNull(packageName);
+    }
+
+    @Override
+    public boolean initiate(InputStream inputStream) throws IOException {
+        checkState(mBackupTask == null, "initiate() twice");
+
+        this.mInputStream = inputStream;
+        mOutputStream = new PipedOutputStream();
+
+        mBackupTask =
+                EncryptedFullBackupTask.newInstance(
+                        mContext,
+                        mCryptoBackupServer,
+                        mSecureRandom,
+                        mSecondaryKey,
+                        mPackageName,
+                        new PipedInputStream(mOutputStream));
+
+        return true;
+    }
+
+    @Override
+    public void start() {
+        checkState(mBackupTask != null, "start() before initiate()");
+        mBackupTaskFuture = mExecutorService.submit(mBackupTask);
+    }
+
+    @Override
+    public int pushData(int numBytes) {
+        checkState(
+                mBackupTaskFuture != null && mInputStream != null && mOutputStream != null,
+                "pushData() before start()");
+
+        // If the upload has failed then stop without pushing any more bytes.
+        if (mBackupTaskFuture.isDone()) {
+            Optional<Exception> exception = getTaskException();
+            Slog.e(TAG, "Encrypted upload failed", exception.orElse(null));
+            if (exception.isPresent()) {
+                reportNetworkFailureIfNecessary(exception.get());
+
+                if (exception.get().getCause() instanceof SizeQuotaExceededException) {
+                    return BackupTransport.TRANSPORT_QUOTA_EXCEEDED;
+                }
+            }
+
+            return BackupTransport.TRANSPORT_ERROR;
+        }
+
+        try {
+            StreamUtils.copyStream(mInputStream, mOutputStream, numBytes);
+        } catch (IOException e) {
+            Slog.e(TAG, "IOException when processing backup", e);
+            return BackupTransport.TRANSPORT_ERROR;
+        }
+
+        return BackupTransport.TRANSPORT_OK;
+    }
+
+    @Override
+    public void cancel() {
+        checkState(mBackupTaskFuture != null && mBackupTask != null, "cancel() before start()");
+        mBackupTask.cancel();
+        closeStreams();
+    }
+
+    @Override
+    public int finish() {
+        checkState(mBackupTaskFuture != null, "finish() before start()");
+
+        // getTaskException() waits for the task to finish. We must close the streams first, which
+        // causes the task to finish, otherwise it will block forever.
+        closeStreams();
+        Optional<Exception> exception = getTaskException();
+
+        if (exception.isPresent()) {
+            Slog.e(TAG, "Exception during encrypted full backup", exception.get());
+            reportNetworkFailureIfNecessary(exception.get());
+
+            if (exception.get().getCause() instanceof SizeQuotaExceededException) {
+                return BackupTransport.TRANSPORT_QUOTA_EXCEEDED;
+            }
+            return BackupTransport.TRANSPORT_ERROR;
+
+        } else {
+            if (mFullBackupCallbacks != null) {
+                mFullBackupCallbacks.onSuccess();
+            }
+
+            return BackupTransport.TRANSPORT_OK;
+        }
+    }
+
+    private void closeStreams() {
+        StreamUtils.closeQuietly(mInputStream);
+        StreamUtils.closeQuietly(mOutputStream);
+    }
+
+    @Override
+    public void handleCheckSizeRejectionZeroBytes() {
+        cancel();
+    }
+
+    @Override
+    public void handleCheckSizeRejectionQuotaExceeded() {
+        cancel();
+    }
+
+    @Override
+    public void handleSendBytesQuotaExceeded() {
+        cancel();
+    }
+
+    @Override
+    public void attachCallbacks(FullBackupCallbacks fullBackupCallbacks) {
+        this.mFullBackupCallbacks = fullBackupCallbacks;
+    }
+
+    private void reportNetworkFailureIfNecessary(Exception exception) {
+        if (!(exception.getCause() instanceof SizeQuotaExceededException)
+                && mFullBackupCallbacks != null) {
+            mFullBackupCallbacks.onTransferFailed();
+        }
+    }
+
+    private Optional<Exception> getTaskException() {
+        if (mBackupTaskFuture != null) {
+            try {
+                mBackupTaskFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                return Optional.of(e);
+            }
+        }
+        return Optional.empty();
+    }
+}
diff --git a/packages/BackupEncryption/test/robolectric/src/com/android/server/backup/encryption/StreamUtilsTest.java b/packages/BackupEncryption/test/robolectric/src/com/android/server/backup/encryption/StreamUtilsTest.java
new file mode 100644
index 0000000..a95e87e
--- /dev/null
+++ b/packages/BackupEncryption/test/robolectric/src/com/android/server/backup/encryption/StreamUtilsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.backup.encryption;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.robolectric.RobolectricTestRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+@RunWith(RobolectricTestRunner.class)
+public class StreamUtilsTest {
+    private static final int SOURCE_DATA_SIZE = 64;
+
+    private byte[] mSourceData;
+
+    private InputStream mSource;
+    private ByteArrayOutputStream mDestination;
+
+    @Before
+    public void setUp() {
+        mSourceData = new byte[SOURCE_DATA_SIZE];
+        for (byte i = 0; i < SOURCE_DATA_SIZE; i++) {
+            mSourceData[i] = i;
+        }
+        mSource = new ByteArrayInputStream(mSourceData);
+        mDestination = new ByteArrayOutputStream();
+    }
+
+    @Test
+    public void copyStream_copiesAllBytesIfAsked() throws IOException {
+        StreamUtils.copyStream(mSource, mDestination, mSourceData.length);
+        assertOutputHasBytes(mSourceData.length);
+    }
+
+    @Test
+    public void copyStream_stopsShortIfAsked() throws IOException {
+        StreamUtils.copyStream(mSource, mDestination, mSourceData.length - 10);
+        assertOutputHasBytes(mSourceData.length - 10);
+    }
+
+    @Test
+    public void copyStream_stopsShortIfAskedToCopyMoreThanAvailable() throws IOException {
+        StreamUtils.copyStream(mSource, mDestination, mSourceData.length + 10);
+        assertOutputHasBytes(mSourceData.length);
+    }
+
+    private void assertOutputHasBytes(int count) {
+        byte[] output = mDestination.toByteArray();
+        assertThat(output.length).isEqualTo(count);
+        for (int i = 0; i < count; i++) {
+            assertThat(output[i]).isEqualTo(mSourceData[i]);
+        }
+    }
+}
diff --git a/packages/BackupEncryption/test/robolectric/src/com/android/server/backup/encryption/tasks/EncryptedFullBackupDataProcessorTest.java b/packages/BackupEncryption/test/robolectric/src/com/android/server/backup/encryption/tasks/EncryptedFullBackupDataProcessorTest.java
new file mode 100644
index 0000000..675d03f
--- /dev/null
+++ b/packages/BackupEncryption/test/robolectric/src/com/android/server/backup/encryption/tasks/EncryptedFullBackupDataProcessorTest.java
@@ -0,0 +1,387 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.backup.encryption.tasks;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertThrows;
+
+import android.annotation.Nullable;
+import android.app.backup.BackupTransport;
+import android.platform.test.annotations.Presubmit;
+
+import androidx.test.core.app.ApplicationProvider;
+
+import com.android.server.backup.encryption.FullBackupDataProcessor;
+import com.android.server.backup.encryption.chunking.ProtoStore;
+import com.android.server.backup.encryption.client.CryptoBackupServer;
+import com.android.server.backup.encryption.keys.RecoverableKeyStoreSecondaryKey;
+import com.android.server.backup.encryption.keys.TertiaryKeyManager;
+import com.android.server.backup.encryption.protos.nano.ChunksMetadataProto;
+import com.android.server.backup.encryption.testing.QueuingNonAutomaticExecutorService;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Bytes;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.robolectric.RobolectricTestRunner;
+import org.robolectric.annotation.Config;
+import org.robolectric.annotation.Implementation;
+import org.robolectric.annotation.Implements;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+
+import javax.crypto.spec.SecretKeySpec;
+
+@RunWith(RobolectricTestRunner.class)
+@Presubmit
+@Config(
+        shadows = {
+            EncryptedFullBackupDataProcessorTest.ShadowEncryptedFullBackupTask.class,
+        })
+public class EncryptedFullBackupDataProcessorTest {
+
+    private static final String KEY_GENERATOR_ALGORITHM = "AES";
+
+    private static final String TEST_PACKAGE = "com.example.app1";
+    private static final byte[] TEST_DATA_1 = {1, 2, 3, 4};
+    private static final byte[] TEST_DATA_2 = {5, 6, 7, 8};
+
+    private final RecoverableKeyStoreSecondaryKey mTestSecondaryKey =
+            new RecoverableKeyStoreSecondaryKey(
+                    /*alias=*/ "test_key",
+                    new SecretKeySpec(
+                            new byte[] {
+                                1, 2, 3,
+                            },
+                            KEY_GENERATOR_ALGORITHM));
+
+    private QueuingNonAutomaticExecutorService mExecutorService;
+    private FullBackupDataProcessor mFullBackupDataProcessor;
+    @Mock private FullBackupDataProcessor.FullBackupCallbacks mFullBackupCallbacks;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        mExecutorService = new QueuingNonAutomaticExecutorService();
+        mFullBackupDataProcessor =
+                new EncryptedFullBackupDataProcessor(
+                        ApplicationProvider.getApplicationContext(),
+                        mExecutorService,
+                        mock(CryptoBackupServer.class),
+                        new SecureRandom(),
+                        mTestSecondaryKey,
+                        TEST_PACKAGE);
+    }
+
+    @After
+    public void tearDown() {
+        ShadowEncryptedFullBackupTask.reset();
+    }
+
+    @Test
+    public void initiate_callTwice_throws() throws Exception {
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(new byte[10]));
+
+        assertThrows(
+                IllegalStateException.class,
+                () -> mFullBackupDataProcessor.initiate(new ByteArrayInputStream(new byte[10])));
+    }
+
+    @Test
+    public void pushData_writesDataToTask() throws Exception {
+        byte[] inputData = Bytes.concat(TEST_DATA_1, TEST_DATA_2);
+
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(inputData));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        mFullBackupDataProcessor.pushData(TEST_DATA_2.length);
+        finishBackupTask();
+        mFullBackupDataProcessor.finish();
+
+        byte[] result = ByteStreams.toByteArray(ShadowEncryptedFullBackupTask.sInputStream);
+        assertThat(result).isEqualTo(Bytes.concat(TEST_DATA_1, TEST_DATA_2));
+    }
+
+    @Test
+    public void pushData_noError_returnsOk() throws Exception {
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+        int result = mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTask();
+        mFullBackupDataProcessor.finish();
+
+        assertThat(result).isEqualTo(BackupTransport.TRANSPORT_OK);
+    }
+
+    @Test
+    public void pushData_ioExceptionOnCopy_returnsError() throws Exception {
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+
+        // Close the stream so there's an IO error when the processor tries to write to it.
+        ShadowEncryptedFullBackupTask.sInputStream.close();
+        int result = mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+
+        finishBackupTask();
+        mFullBackupDataProcessor.finish();
+
+        assertThat(result).isEqualTo(BackupTransport.TRANSPORT_ERROR);
+    }
+
+    @Test
+    public void pushData_exceptionDuringUpload_returnsError() throws Exception {
+        byte[] inputData = Bytes.concat(TEST_DATA_1, TEST_DATA_2);
+
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(inputData));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTaskWithException(new IOException("Test exception"));
+        int result = mFullBackupDataProcessor.pushData(TEST_DATA_2.length);
+
+        assertThat(result).isEqualTo(BackupTransport.TRANSPORT_ERROR);
+    }
+
+    @Test
+    public void pushData_quotaExceptionDuringUpload_doesNotLogAndReturnsQuotaExceeded()
+            throws Exception {
+        mFullBackupDataProcessor.attachCallbacks(mFullBackupCallbacks);
+        byte[] inputData = Bytes.concat(TEST_DATA_1, TEST_DATA_2);
+
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(inputData));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTaskWithException(new SizeQuotaExceededException());
+        int result = mFullBackupDataProcessor.pushData(TEST_DATA_2.length);
+
+        assertThat(result).isEqualTo(BackupTransport.TRANSPORT_QUOTA_EXCEEDED);
+
+        verify(mFullBackupCallbacks, never()).onSuccess();
+        verify(mFullBackupCallbacks, never())
+                .onTransferFailed(); // FullBackupSession will handle this.
+    }
+
+    @Test
+    public void pushData_unexpectedEncryptedBackup_logs() throws Exception {
+        byte[] inputData = Bytes.concat(TEST_DATA_1, TEST_DATA_2);
+
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(inputData));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTaskWithException(new GeneralSecurityException());
+        int result = mFullBackupDataProcessor.pushData(TEST_DATA_2.length);
+
+        assertThat(result).isEqualTo(BackupTransport.TRANSPORT_ERROR);
+    }
+
+    @Test
+    public void pushData_permanentExceptionDuringUpload_callsErrorCallback() throws Exception {
+        mFullBackupDataProcessor.attachCallbacks(mFullBackupCallbacks);
+        byte[] inputData = Bytes.concat(TEST_DATA_1, TEST_DATA_2);
+
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(inputData));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTaskWithException(new IOException());
+        mFullBackupDataProcessor.pushData(TEST_DATA_2.length);
+
+        verify(mFullBackupCallbacks, never()).onSuccess();
+        verify(mFullBackupCallbacks).onTransferFailed();
+    }
+
+    @Test
+    public void pushData_beforeInitiate_throws() {
+        assertThrows(
+                IllegalStateException.class,
+                () -> mFullBackupDataProcessor.pushData(/*numBytes=*/ 10));
+    }
+
+    @Test
+    public void cancel_cancelsTask() throws Exception {
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        mFullBackupDataProcessor.cancel();
+
+        assertThat(ShadowEncryptedFullBackupTask.sCancelled).isTrue();
+    }
+
+    @Test
+    public void cancel_beforeInitiate_throws() {
+        assertThrows(IllegalStateException.class, () -> mFullBackupDataProcessor.cancel());
+    }
+
+    @Test
+    public void finish_noException_returnsTransportOk() throws Exception {
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTask();
+        int result = mFullBackupDataProcessor.finish();
+
+        assertThat(result).isEqualTo(BackupTransport.TRANSPORT_OK);
+    }
+
+    @Test
+    public void finish_exceptionDuringUpload_returnsTransportError() throws Exception {
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTaskWithException(new IOException("Test exception"));
+        int result = mFullBackupDataProcessor.finish();
+
+        assertThat(result).isEqualTo(BackupTransport.TRANSPORT_ERROR);
+    }
+
+    @Test
+    public void finish_successfulBackup_callsSuccessCallback() throws Exception {
+        mFullBackupDataProcessor.attachCallbacks(mFullBackupCallbacks);
+
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTask();
+        mFullBackupDataProcessor.finish();
+
+        verify(mFullBackupCallbacks).onSuccess();
+        verify(mFullBackupCallbacks, never()).onTransferFailed();
+    }
+
+    @Test
+    public void finish_backupFailedWithPermanentError_callsErrorCallback() throws Exception {
+        mFullBackupDataProcessor.attachCallbacks(mFullBackupCallbacks);
+
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTaskWithException(new IOException());
+        mFullBackupDataProcessor.finish();
+
+        verify(mFullBackupCallbacks, never()).onSuccess();
+        verify(mFullBackupCallbacks).onTransferFailed();
+    }
+
+    @Test
+    public void finish_backupFailedWithQuotaException_doesNotCallbackAndReturnsQuotaExceeded()
+            throws Exception {
+        mFullBackupDataProcessor.attachCallbacks(mFullBackupCallbacks);
+
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        finishBackupTaskWithException(new SizeQuotaExceededException());
+        int result = mFullBackupDataProcessor.finish();
+
+        assertThat(result).isEqualTo(BackupTransport.TRANSPORT_QUOTA_EXCEEDED);
+        verify(mFullBackupCallbacks, never()).onSuccess();
+        verify(mFullBackupCallbacks, never())
+                .onTransferFailed(); // FullBackupSession will handle this.
+    }
+
+    @Test
+    public void finish_beforeInitiate_throws() {
+        assertThrows(IllegalStateException.class, () -> mFullBackupDataProcessor.finish());
+    }
+
+    @Test
+    public void handleCheckSizeRejectionZeroBytes_cancelsTask() throws Exception {
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(new byte[10]));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.handleCheckSizeRejectionZeroBytes();
+
+        assertThat(ShadowEncryptedFullBackupTask.sCancelled).isTrue();
+    }
+
+    @Test
+    public void handleCheckSizeRejectionQuotaExceeded_cancelsTask() throws Exception {
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        mFullBackupDataProcessor.handleCheckSizeRejectionQuotaExceeded();
+
+        assertThat(ShadowEncryptedFullBackupTask.sCancelled).isTrue();
+    }
+
+    @Test
+    public void handleSendBytesQuotaExceeded_cancelsTask() throws Exception {
+        mFullBackupDataProcessor.initiate(new ByteArrayInputStream(TEST_DATA_1));
+        mFullBackupDataProcessor.start();
+        mFullBackupDataProcessor.pushData(TEST_DATA_1.length);
+        mFullBackupDataProcessor.handleSendBytesQuotaExceeded();
+
+        assertThat(ShadowEncryptedFullBackupTask.sCancelled).isTrue();
+    }
+
+    private void finishBackupTask() {
+        mExecutorService.runNext();
+    }
+
+    private void finishBackupTaskWithException(Exception exception) {
+        ShadowEncryptedFullBackupTask.sOnCallException = exception;
+        finishBackupTask();
+    }
+
+    @Implements(EncryptedFullBackupTask.class)
+    public static class ShadowEncryptedFullBackupTask {
+
+        private static InputStream sInputStream;
+        @Nullable private static Exception sOnCallException;
+        private static boolean sCancelled;
+
+        public void __constructor__(
+                ProtoStore<ChunksMetadataProto.ChunkListing> chunkListingStore,
+                TertiaryKeyManager tertiaryKeyManager,
+                EncryptedBackupTask task,
+                InputStream inputStream,
+                String packageName,
+                SecureRandom secureRandom) {
+            sInputStream = inputStream;
+        }
+
+        @Implementation
+        public Void call() throws Exception {
+            if (sOnCallException != null) {
+                throw sOnCallException;
+            }
+
+            return null;
+        }
+
+        @Implementation
+        public void cancel() {
+            sCancelled = true;
+        }
+
+        public static void reset() {
+            sOnCallException = null;
+            sCancelled = false;
+        }
+    }
+}
diff --git a/packages/BackupEncryption/test/robolectric/src/com/android/server/backup/encryption/testing/QueuingNonAutomaticExecutorService.java b/packages/BackupEncryption/test/robolectric/src/com/android/server/backup/encryption/testing/QueuingNonAutomaticExecutorService.java
new file mode 100644
index 0000000..9d2272e
--- /dev/null
+++ b/packages/BackupEncryption/test/robolectric/src/com/android/server/backup/encryption/testing/QueuingNonAutomaticExecutorService.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.backup.encryption.testing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ExecutorService which needs to be stepped through the jobs in its' queue.
+ *
+ * <p>This is a deliberately simple implementation because it's only used in testing. The queued
+ * jobs are run on the main thread to eliminate any race condition bugs.
+ */
+public class QueuingNonAutomaticExecutorService extends AbstractExecutorService {
+
+    private List<Runnable> mWaitingJobs = new ArrayList<>();
+    private int mWaitingJobCount = 0;
+
+    @Override
+    public void shutdown() {
+        mWaitingJobCount = mWaitingJobs.size();
+        mWaitingJobs = null; // This will force an error if jobs are submitted after shutdown
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        List<Runnable> queuedJobs = mWaitingJobs;
+        shutdown();
+        return queuedJobs;
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return mWaitingJobs == null;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return mWaitingJobs == null && mWaitingJobCount == 0;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        long expiry = System.currentTimeMillis() + unit.toMillis(timeout);
+        for (Runnable job : mWaitingJobs) {
+            if (System.currentTimeMillis() > expiry) {
+                return false;
+            }
+
+            job.run();
+        }
+        return true;
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        mWaitingJobs.add(command);
+    }
+
+    public void runNext() {
+        if (mWaitingJobs.isEmpty()) {
+            throw new IllegalStateException("Attempted to run jobs on an empty paused executor");
+        }
+
+        mWaitingJobs.remove(0).run();
+    }
+}