Merge "Extract RunUtil interrupt methods."
diff --git a/src/com/android/tradefed/command/CommandInterrupter.java b/src/com/android/tradefed/command/CommandInterrupter.java
new file mode 100644
index 0000000..78b86bd
--- /dev/null
+++ b/src/com/android/tradefed/command/CommandInterrupter.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.android.tradefed.command;
+
+import com.android.tradefed.log.LogUtil.CLog;
+import com.android.tradefed.util.RunInterruptedException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/** Service allowing TradeFederation commands to be interrupted or marked as uninterruptible. */
+public class CommandInterrupter {
+
+ /** Singleton. */
+ public static final CommandInterrupter INSTANCE = new CommandInterrupter();
+
+ private Map<Thread, Boolean> mMapIsInterruptAllowed = new HashMap<>();
+ private Map<Thread, String> mMapInterruptThreads = new HashMap<>();
+ private Map<Thread, Timer> mWatchdogInterrupt = new HashMap<>();
+
+ @VisibleForTesting
+ // FIXME: reduce visibility once RunUtil interrupt tests are removed
+ public CommandInterrupter() {}
+
+ /** Remove the thread that are not alive anymore from our tracking to keep the list small. */
+ private void cleanInterruptStateThreadMap() {
+ synchronized (mMapIsInterruptAllowed) {
+ for (Iterator<Thread> iterator = mMapIsInterruptAllowed.keySet().iterator();
+ iterator.hasNext();
+ ) {
+ Thread t = iterator.next();
+ if (!t.isAlive()) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * Allows/disallows run interrupts on the current thread. If it is allowed, run operations of
+ * the current thread can be interrupted from other threads via {@link #interrupt} method.
+ *
+ * @param allow whether to allow run interrupts on the current thread.
+ */
+ public void allowInterrupt(boolean allow) {
+ CLog.d("run interrupt allowed: %s", allow);
+ synchronized (mMapIsInterruptAllowed) {
+ mMapIsInterruptAllowed.put(Thread.currentThread(), allow);
+ }
+ checkInterrupted();
+ }
+
+ /**
+ * Give the interrupt status of the RunUtil.
+ *
+ * @return true if the Run can be interrupted, false otherwise.
+ */
+ public boolean isInterruptAllowed() {
+ synchronized (mMapIsInterruptAllowed) {
+ if (mMapIsInterruptAllowed.get(Thread.currentThread()) == null) {
+ // We don't add in this case to keep the map relatively small.
+ return false;
+ }
+ return mMapIsInterruptAllowed.get(Thread.currentThread());
+ }
+ }
+
+ /**
+ * Set as interruptible after some waiting time. {@link CommandScheduler#shutdownHard()} to
+ * enforce we terminate eventually.
+ *
+ * @param thread the thread that will become interruptible.
+ * @param timeMs time to wait before setting interruptible.
+ */
+ // FIXME: reduce visibility once RunUtil interrupt methods are removed
+ public void setInterruptibleInFuture(Thread thread, final long timeMs) {
+ CLog.w("Setting future interruption in %s ms", timeMs);
+ synchronized (mMapIsInterruptAllowed) {
+ if (Boolean.TRUE.equals(mMapIsInterruptAllowed.get(thread))) {
+ CLog.v("Thread is already interruptible. setInterruptibleInFuture is inop.");
+ return;
+ }
+ }
+ Timer timer = new Timer(true);
+ synchronized (mWatchdogInterrupt) {
+ mWatchdogInterrupt.put(thread, timer);
+ }
+ timer.schedule(new InterruptTask(thread), timeMs);
+ }
+
+ /**
+ * Interrupts the ongoing/forthcoming run operations on the given thread. The run operations on
+ * the given thread will throw {@link RunInterruptedException}.
+ *
+ * @param thread
+ * @param message the message for {@link RunInterruptedException}.
+ */
+ // FIXME: reduce visibility once RunUtil interrupt methods are removed
+ public synchronized void interrupt(Thread thread, String message) {
+ if (message == null) {
+ throw new IllegalArgumentException("message cannot be null.");
+ }
+ mMapInterruptThreads.put(thread, message);
+ checkInterrupted();
+ }
+
+ public synchronized void checkInterrupted() {
+ // Keep the map of thread's state clean of dead threads.
+ this.cleanInterruptStateThreadMap();
+
+ final Thread thread = Thread.currentThread();
+ if (isInterruptAllowed()) {
+ final String message = mMapInterruptThreads.remove(thread);
+ if (message != null) {
+ thread.interrupt();
+ throw new RunInterruptedException(message);
+ }
+ }
+ }
+
+ /** Allow to stop the Timer Thread for the run util instance if started. */
+ @VisibleForTesting
+ // FIXME: reduce visibility once RunUtil interrupt tests are removed
+ public void terminateTimer() {
+ if (mWatchdogInterrupt != null && !mWatchdogInterrupt.isEmpty()) {
+ for (Timer t : mWatchdogInterrupt.values()) {
+ t.purge();
+ t.cancel();
+ }
+ }
+ }
+
+ /** Timer that will execute a interrupt on the Thread registered. */
+ private class InterruptTask extends TimerTask {
+
+ private Thread mToInterrupt = null;
+
+ public InterruptTask(Thread t) {
+ mToInterrupt = t;
+ }
+
+ @Override
+ public void run() {
+ if (mToInterrupt != null) {
+ synchronized (mWatchdogInterrupt) {
+ // Ensure that the timer associated with the task is cancelled too.
+ mWatchdogInterrupt.get(mToInterrupt).cancel();
+ }
+
+ CLog.e("Interrupting with TimerTask");
+ synchronized (mMapIsInterruptAllowed) {
+ mMapIsInterruptAllowed.put(mToInterrupt, true);
+ }
+ mToInterrupt.interrupt();
+
+ synchronized (mWatchdogInterrupt) {
+ mWatchdogInterrupt.remove(mToInterrupt);
+ }
+ }
+ }
+ }
+}
diff --git a/src/com/android/tradefed/util/RunUtil.java b/src/com/android/tradefed/util/RunUtil.java
index bea62aa..923ebd5 100644
--- a/src/com/android/tradefed/util/RunUtil.java
+++ b/src/com/android/tradefed/util/RunUtil.java
@@ -16,6 +16,7 @@
package com.android.tradefed.util;
+import com.android.tradefed.command.CommandInterrupter;
import com.android.tradefed.log.LogUtil.CLog;
import com.google.common.annotations.VisibleForTesting;
@@ -29,15 +30,14 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+
/**
* A collection of helper methods for executing operations.
*/
@@ -55,22 +55,19 @@
private Map<String, String> mEnvVariables = new HashMap<String, String>();
private Set<String> mUnsetEnvVariables = new HashSet<String>();
private EnvPriority mEnvVariablePriority = EnvPriority.UNSET;
- // TODO: remove once confidence in new mechanism is better
- private ThreadLocal<Boolean> mIsInterruptAllowed =
- new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return Boolean.FALSE;
- }
- };
- private Map<Thread, Boolean> mMapIsInterruptAllowed = new HashMap<Thread, Boolean>();
- private Map<Thread, String> mMapInterruptThreads = new HashMap<Thread, String>();
- private Map<Thread, Timer> mWatchdogInterrupt = new HashMap<>();
+
+ private final CommandInterrupter mInterrupter;
/**
* Create a new {@link RunUtil} object to use.
*/
public RunUtil() {
+ this(CommandInterrupter.INSTANCE);
+ }
+
+ @VisibleForTesting
+ RunUtil(@Nonnull CommandInterrupter interrupter) {
+ mInterrupter = interrupter;
}
/**
@@ -88,20 +85,6 @@
return sDefaultInstance;
}
- /** Remove the thread that are not alive anymore from our tracking to keep the list small. */
- private void cleanInterruptStateThreadMap() {
- synchronized (mMapIsInterruptAllowed) {
- for (Iterator<Thread> iterator = mMapIsInterruptAllowed.keySet().iterator();
- iterator.hasNext();
- ) {
- Thread t = iterator.next();
- if (!t.isAlive()) {
- iterator.remove();
- }
- }
- }
- }
-
/**
* {@inheritDoc}
*/
@@ -334,7 +317,7 @@
@Override
public CommandStatus runTimed(long timeout, IRunUtil.IRunnableResult runnable,
boolean logErrors) {
- checkInterrupted();
+ mInterrupter.checkInterrupted();
RunnableNotifier runThread = new RunnableNotifier(runnable, logErrors);
if (logErrors) {
if (timeout > 0l) {
@@ -364,7 +347,7 @@
CLog.i("runTimed: received an interrupt but uninterruptible mode, ignoring");
}
}
- checkInterrupted();
+ mInterrupter.checkInterrupted();
} while ((timeout == 0l || (System.currentTimeMillis() - startTime) < timeout)
&& runThread.isAlive());
// Snapshot the status when out of the run loop because thread may terminate and return a
@@ -374,7 +357,7 @@
CLog.i("runTimed: Calling interrupt, status is %s", status);
runThread.cancel();
}
- checkInterrupted();
+ mInterrupter.checkInterrupted();
return status;
}
@@ -458,7 +441,7 @@
*/
@Override
public void sleep(long time) {
- checkInterrupted();
+ mInterrupter.checkInterrupted();
if (time <= 0) {
return;
}
@@ -468,87 +451,31 @@
// ignore
CLog.d("sleep interrupted");
}
- checkInterrupted();
+ mInterrupter.checkInterrupted();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public void allowInterrupt(boolean allow) {
- CLog.d("run interrupt allowed: %s", allow);
- mIsInterruptAllowed.set(allow);
- synchronized (mMapIsInterruptAllowed) {
- mMapIsInterruptAllowed.put(Thread.currentThread(), allow);
- }
- checkInterrupted();
+ mInterrupter.allowInterrupt(allow);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public boolean isInterruptAllowed() {
- synchronized (mMapIsInterruptAllowed) {
- if (mMapIsInterruptAllowed.get(Thread.currentThread()) == null) {
- // We don't add in this case to keep the map relatively small.
- return false;
- }
- return mMapIsInterruptAllowed.get(Thread.currentThread());
- }
+ return mInterrupter.isInterruptAllowed();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public void setInterruptibleInFuture(Thread thread, final long timeMs) {
- CLog.w("Setting future interruption in %s ms", timeMs);
- synchronized (mMapIsInterruptAllowed) {
- if (Boolean.TRUE.equals(mMapIsInterruptAllowed.get(thread))) {
- CLog.v("Thread is already interruptible. setInterruptibleInFuture is inop.");
- return;
- }
- }
- Timer timer = new Timer(true);
- synchronized (mWatchdogInterrupt) {
- mWatchdogInterrupt.put(thread, timer);
- }
- timer.schedule(new InterruptTask(thread), timeMs);
+ mInterrupter.setInterruptibleInFuture(thread, timeMs);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public synchronized void interrupt(Thread thread, String message) {
- if (message == null) {
- throw new IllegalArgumentException("message cannot be null.");
- }
- mMapInterruptThreads.put(thread, message);
- checkInterrupted();
- }
-
- private synchronized void checkInterrupted() {
- // Keep the map of thread's state clean of dead threads.
- this.cleanInterruptStateThreadMap();
-
- final Thread thread = Thread.currentThread();
- // TODO: remove once confidence in new mechanism is better
- // This should only turn on when a shutdownHard is called with a shutdown timeout, since
- // the old way cannot change the thread interruptible state.
- if (mIsInterruptAllowed.get() != isInterruptAllowed()) {
- CLog.e(
- "Mismatched between old/new Interruptible allowed, old: %s vs new:%s",
- mIsInterruptAllowed.get(), mMapIsInterruptAllowed.get(Thread.currentThread()));
- }
- if (isInterruptAllowed()) {
- final String message = mMapInterruptThreads.remove(thread);
- if (message != null) {
- thread.interrupt();
- throw new RunInterruptedException(message);
- }
- }
+ mInterrupter.interrupt(thread, message);
}
/**
@@ -792,42 +719,7 @@
/** Allow to stop the Timer Thread for the run util instance if started. */
@VisibleForTesting
void terminateTimer() {
- if (mWatchdogInterrupt != null && !mWatchdogInterrupt.isEmpty()) {
- for (Timer t : mWatchdogInterrupt.values()) {
- t.purge();
- t.cancel();
- }
- }
- }
-
- /** Timer that will execute a interrupt on the Thread registered. */
- private class InterruptTask extends TimerTask {
-
- private Thread mToInterrupt = null;
-
- public InterruptTask(Thread t) {
- mToInterrupt = t;
- }
-
- @Override
- public void run() {
- if (mToInterrupt != null) {
- synchronized (mWatchdogInterrupt) {
- // Ensure that the timer associated with the task is cancelled too.
- mWatchdogInterrupt.get(mToInterrupt).cancel();
- }
-
- CLog.e("Interrupting with TimerTask");
- synchronized (mMapIsInterruptAllowed) {
- mMapIsInterruptAllowed.put(mToInterrupt, true);
- }
- mToInterrupt.interrupt();
-
- synchronized (mWatchdogInterrupt) {
- mWatchdogInterrupt.remove(mToInterrupt);
- }
- }
- }
+ mInterrupter.terminateTimer();
}
/**
diff --git a/tests/src/com/android/tradefed/UnitTests.java b/tests/src/com/android/tradefed/UnitTests.java
index 259e1e1..04c0631 100644
--- a/tests/src/com/android/tradefed/UnitTests.java
+++ b/tests/src/com/android/tradefed/UnitTests.java
@@ -28,6 +28,7 @@
import com.android.tradefed.build.OtaZipfileBuildProviderTest;
import com.android.tradefed.command.CommandFileParserTest;
import com.android.tradefed.command.CommandFileWatcherTest;
+import com.android.tradefed.command.CommandInterrupterTest;
import com.android.tradefed.command.CommandOptionsTest;
import com.android.tradefed.command.CommandRunnerTest;
import com.android.tradefed.command.CommandSchedulerTest;
@@ -332,6 +333,7 @@
// command
CommandFileParserTest.class,
CommandFileWatcherTest.class,
+ CommandInterrupterTest.class,
CommandOptionsTest.class,
CommandRunnerTest.class,
CommandSchedulerTest.class,
diff --git a/tests/src/com/android/tradefed/command/CommandInterrupterTest.java b/tests/src/com/android/tradefed/command/CommandInterrupterTest.java
new file mode 100644
index 0000000..596b37c
--- /dev/null
+++ b/tests/src/com/android/tradefed/command/CommandInterrupterTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.android.tradefed.command;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.android.tradefed.util.RunInterruptedException;
+import com.android.tradefed.util.RunUtil;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link CommandInterrupter} */
+@RunWith(JUnit4.class)
+public class CommandInterrupterTest {
+
+ private static final String MESSAGE = "message";
+
+ private CommandInterrupter mInterrupter;
+
+ @Before
+ public void setUp() {
+ mInterrupter = new CommandInterrupter();
+ }
+
+ @Test
+ public void testAllowInterrupt() throws InterruptedException {
+ execute(
+ () -> {
+ // interrupts initially blocked
+ assertFalse(mInterrupter.isInterruptAllowed());
+
+ // thread can be made interruptible
+ mInterrupter.allowInterrupt(true);
+ assertTrue(mInterrupter.isInterruptAllowed());
+ });
+ }
+
+ @Test
+ public void testInterrupt() throws InterruptedException {
+ execute(
+ () -> {
+ try {
+ // can interrupt the thread
+ mInterrupter.allowInterrupt(true);
+ mInterrupter.interrupt(Thread.currentThread(), MESSAGE);
+ fail("RunInterruptedException was expected");
+ } catch (RunInterruptedException e) {
+ assertEquals(MESSAGE, e.getMessage());
+ }
+ });
+ }
+
+ @Test
+ public void testInterrupt_blocked() throws InterruptedException {
+ execute(
+ () -> {
+ // track whether interrupts were successfully blocked
+ boolean success = false;
+
+ try {
+ // not interrupted if interrupts disallowed
+ mInterrupter.allowInterrupt(false);
+ mInterrupter.interrupt(Thread.currentThread(), MESSAGE);
+ success = true;
+
+ // interrupted once interrupts allowed
+ mInterrupter.allowInterrupt(true);
+ fail("RunInterruptedException was expected");
+ } catch (RunInterruptedException e) {
+ assertEquals(MESSAGE, e.getMessage());
+ assertTrue(success);
+ }
+ });
+ }
+
+ @Test
+ public void testSetInterruptibleInFuture() throws InterruptedException {
+ execute(
+ () -> {
+ try {
+ // allow interruptions after a delay
+ mInterrupter.setInterruptibleInFuture(Thread.currentThread(), 200L);
+
+ // not yet marked as interruptible
+ RunUtil.getDefault().sleep(50);
+ assertFalse(mInterrupter.isInterruptAllowed());
+
+ // marked as interruptible after enough time has passed
+ RunUtil.getDefault().sleep(200L);
+ assertTrue(mInterrupter.isInterruptAllowed());
+ } finally {
+ mInterrupter.terminateTimer();
+ }
+ });
+ }
+
+ @Test
+ public void testSetInterruptibleInFuture_alreadyAllowed() throws InterruptedException {
+ execute(
+ () -> {
+ try {
+ // interrupts allowed
+ mInterrupter.allowInterrupt(true);
+
+ // unchanged after asynchronously allowing interrupts
+ mInterrupter.setInterruptibleInFuture(Thread.currentThread(), 200L);
+ assertTrue(mInterrupter.isInterruptAllowed());
+ } finally {
+ mInterrupter.terminateTimer();
+ }
+ });
+ }
+
+ // Execute test in separate thread
+ private static void execute(Runnable runnable) throws InterruptedException {
+ Thread thread = new Thread(runnable, "CommandInterrupterTest");
+ thread.setDaemon(true);
+ thread.start();
+ thread.join();
+ }
+}
diff --git a/tests/src/com/android/tradefed/util/RunUtilTest.java b/tests/src/com/android/tradefed/util/RunUtilTest.java
index 56806f6..7d1b221 100644
--- a/tests/src/com/android/tradefed/util/RunUtilTest.java
+++ b/tests/src/com/android/tradefed/util/RunUtilTest.java
@@ -23,11 +23,13 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
+import com.android.tradefed.command.CommandInterrupter;
import com.android.tradefed.util.IRunUtil.EnvPriority;
import com.android.tradefed.util.IRunUtil.IRunnableResult;
import com.android.tradefed.util.RunUtil.RunnableResult;
import org.easymock.EasyMock;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -58,10 +60,16 @@
@Before
public void setUp() throws Exception {
- mRunUtil = new RunUtil();
+ mRunUtil = new RunUtil(new CommandInterrupter());
mMockRunnableResult = null;
}
+ @After
+ public void tearDown() {
+ // clear interrupted status
+ Thread.interrupted();
+ }
+
/** Test class on {@link RunUtil} in order to avoid creating a real process. */
class SpyRunUtil extends RunUtil {
private boolean mShouldThrow = false;