Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 1 | /* |
| 2 | * Copyright (C) 2019 The Android Open Source Project |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package com.android.tradefed.cluster; |
| 17 | |
Daniel Peykov | 0cb492a | 2019-12-13 10:49:35 -0800 | [diff] [blame] | 18 | import com.android.ddmlib.testrunner.TestResult.TestStatus; |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 19 | import com.android.tradefed.build.BuildInfo; |
Julien Desprez | 5c33047 | 2020-02-05 14:36:20 -0800 | [diff] [blame] | 20 | import com.android.tradefed.cluster.ClusterHostEvent.HostEventType; |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 21 | import com.android.tradefed.command.CommandScheduler; |
| 22 | import com.android.tradefed.command.ICommandScheduler; |
| 23 | import com.android.tradefed.command.remote.DeviceDescriptor; |
| 24 | import com.android.tradefed.config.ConfigurationException; |
| 25 | import com.android.tradefed.config.IConfiguration; |
| 26 | import com.android.tradefed.device.DeviceAllocationState; |
| 27 | import com.android.tradefed.device.DeviceNotAvailableException; |
| 28 | import com.android.tradefed.device.FreeDeviceState; |
| 29 | import com.android.tradefed.device.IDeviceManager; |
| 30 | import com.android.tradefed.device.ITestDevice; |
| 31 | import com.android.tradefed.device.NoDeviceException; |
| 32 | import com.android.tradefed.device.battery.BatteryController; |
| 33 | import com.android.tradefed.device.battery.IBatteryInfo; |
| 34 | import com.android.tradefed.device.battery.IBatteryInfo.BatteryState; |
| 35 | import com.android.tradefed.invoker.IInvocationContext; |
| 36 | import com.android.tradefed.invoker.InvocationContext; |
Julien Desprez | 5c33047 | 2020-02-05 14:36:20 -0800 | [diff] [blame] | 37 | import com.android.tradefed.invoker.logger.InvocationMetricLogger.InvocationMetricKey; |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 38 | import com.android.tradefed.log.LogUtil.CLog; |
| 39 | import com.android.tradefed.result.CollectingTestListener; |
| 40 | import com.android.tradefed.result.ITestSummaryListener; |
| 41 | import com.android.tradefed.result.TestRunResult; |
| 42 | import com.android.tradefed.result.TestSummary; |
| 43 | import com.android.tradefed.util.FileUtil; |
| 44 | import com.android.tradefed.util.MultiMap; |
| 45 | import com.android.tradefed.util.QuotationAwareTokenizer; |
Julien Desprez | 8777066 | 2020-02-25 11:28:46 -0800 | [diff] [blame] | 46 | import com.android.tradefed.util.StreamUtil; |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 47 | |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 48 | import com.google.common.primitives.Ints; |
| 49 | |
| 50 | import org.json.JSONException; |
| 51 | |
| 52 | import java.io.File; |
| 53 | import java.io.IOException; |
| 54 | import java.util.Collection; |
| 55 | import java.util.Collections; |
| 56 | import java.util.HashMap; |
| 57 | import java.util.HashSet; |
| 58 | import java.util.LinkedList; |
| 59 | import java.util.List; |
| 60 | import java.util.Map; |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 61 | import java.util.Optional; |
| 62 | import java.util.Set; |
| 63 | import java.util.concurrent.RejectedExecutionHandler; |
| 64 | import java.util.concurrent.ScheduledFuture; |
| 65 | import java.util.concurrent.ScheduledThreadPoolExecutor; |
| 66 | import java.util.concurrent.ThreadFactory; |
| 67 | import java.util.concurrent.ThreadPoolExecutor; |
| 68 | import java.util.concurrent.TimeUnit; |
| 69 | |
| 70 | /** |
| 71 | * A {@link ICommandScheduler} to support TFC (Tradefed Cluster). This scheduler runs commands from |
| 72 | * TFC command-queue and uploads invocation events to TFC command-event-queue. |
| 73 | */ |
| 74 | public class ClusterCommandScheduler extends CommandScheduler { |
| 75 | |
/**
 * The {@link ScheduledThreadPoolExecutor} used to manage heartbeats. Starts out {@code null}
 * and is created on first use by {@code getHeartbeatThreadPool()}.
 */
private ScheduledThreadPoolExecutor mHeartbeatThreadPool = null;

/**
 * The {@link IClusterOptions} instance used to store cluster-related settings. Populated on
 * first use by {@code getClusterOptions()}.
 */
private IClusterOptions mClusterOptions;

/**
 * The {@link IClusterClient} instance used to interact with the TFC backend. Populated on
 * first use by {@code getClusterClient()}.
 */
private IClusterClient mClusterClient;
| 84 | |
| 85 | /** |
| 86 | * A {@link ThreadFactory} which returns threads in a dedicated heartbeat group. |
| 87 | * |
| 88 | * <p>This class is used as a factory by {@code mHeartbeatThreadPool} in order to segregate |
| 89 | * heartbeat threads from other "stray" threads to avoid tripping loose thread detection in |
| 90 | * {@link CommandScheduler}. |
| 91 | */ |
| 92 | private static class HeartbeatThreadFactory implements ThreadFactory { |
| 93 | private static final ThreadGroup HB_GROUP; |
| 94 | |
| 95 | static { |
| 96 | // fetch root thread group as this class may be initialized by an invocation thread |
| 97 | ThreadGroup tg = Thread.currentThread().getThreadGroup(); |
| 98 | while (tg.getParent() != null) { |
| 99 | tg = tg.getParent(); |
| 100 | } |
| 101 | HB_GROUP = new ThreadGroup(tg, "ClusterCommandScheduler.heartbeat"); |
| 102 | } |
| 103 | |
| 104 | @Override |
| 105 | public Thread newThread(Runnable r) { |
| 106 | Thread thread = new Thread(HB_GROUP, r); |
| 107 | // heartbeat should always get cancelled, but ensure it doesn't prevent JVM exit |
| 108 | thread.setDaemon(true); |
| 109 | return thread; |
| 110 | } |
| 111 | } |
| 112 | |
/**
 * {@inheritDoc}
 *
 * <p>Uploads a {@code RUNNING} host state event before delegating to the parent scheduler.
 */
@Override
public void start() {
    UploadHostEventWithState(HostState.RUNNING);
    super.start();
}
| 119 | |
| 120 | /** {@inheritDoc} */ |
| 121 | @Override |
| 122 | public void shutdown() { |
| 123 | UploadHostEventWithState(HostState.QUITTING); |
| 124 | getHeartbeatThreadPool().shutdown(); |
| 125 | super.shutdown(); |
| 126 | } |
| 127 | |
| 128 | @Override |
| 129 | public synchronized void shutdownHard() { |
| 130 | UploadHostEventWithState(HostState.KILLING); |
| 131 | getHeartbeatThreadPool().shutdown(); |
| 132 | super.shutdownHard(); |
| 133 | } |
| 134 | |
| 135 | /** |
| 136 | * A {@link com.android.tradefed.command.ICommandScheduler.IScheduledInvocationListener} to |
| 137 | * upload events to TFC. |
| 138 | */ |
| 139 | class InvocationEventHandler extends CollectingTestListener |
| 140 | implements IScheduledInvocationListener, ITestSummaryListener { |
| 141 | |
| 142 | private ScheduledFuture<?> mHeartbeat; |
| 143 | private final ClusterCommand mCommandTask; |
| 144 | private Set<String> mDeviceSerials = new HashSet<>(); |
| 145 | private String mSummary; |
Yichun Li | 6e2a24a | 2020-02-21 14:30:35 -0800 | [diff] [blame] | 146 | private Set<String> processedSummaries = new HashSet<>(); |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 147 | private String mError; |
| 148 | private File mWorkDir; |
| 149 | private InvocationStatus mInvocationStatus; |
| 150 | |
| 151 | /** |
| 152 | * Creates a {@link InvocationEventHandler} to track the given {@link ClusterCommand}. |
| 153 | * |
| 154 | * @param commandTask the {@link ClusterCommand} to track. |
| 155 | */ |
| 156 | public InvocationEventHandler(ClusterCommand commandTask) { |
| 157 | mCommandTask = commandTask; |
| 158 | } |
| 159 | |
| 160 | /** |
| 161 | * Sets a work directory for an invocation. |
| 162 | * |
| 163 | * @param dir a work directory. |
| 164 | */ |
| 165 | public void setWorkDir(File dir) { |
| 166 | mWorkDir = dir; |
| 167 | } |
| 168 | |
| 169 | private ClusterCommandEvent.Builder createEventBuilder() { |
| 170 | final ClusterCommandEvent.Builder builder = |
| 171 | ClusterCommandEvent.createEventBuilder(mCommandTask) |
| 172 | .setHostName(ClusterHostUtil.getHostName()); |
| 173 | if (!mDeviceSerials.isEmpty()) { |
| 174 | builder.setDeviceSerials(mDeviceSerials); |
| 175 | } |
| 176 | return builder; |
| 177 | } |
| 178 | |
| 179 | private void updateInvocationStatus() { |
| 180 | if (!getClusterOptions().shouldUploadInvocationStatus()) { |
| 181 | return; |
| 182 | } |
| 183 | final InvocationStatus obj = new InvocationStatus(); |
| 184 | final Collection<TestRunResult> testRunResults = this.getMergedTestRunResults(); |
| 185 | for (final TestRunResult result : testRunResults) { |
| 186 | final TestGroupStatus testGroupStatus = |
| 187 | new TestGroupStatus( |
| 188 | result.getName(), |
| 189 | result.getNumTests(), |
| 190 | result.getNumCompleteTests(), |
| 191 | result.getNumAllFailedTests(), |
Daniel Peykov | 0cb492a | 2019-12-13 10:49:35 -0800 | [diff] [blame] | 192 | result.getNumTestsInState(TestStatus.PASSED), |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 193 | result.isRunComplete(), |
Moon Kim | d15525e | 2020-04-28 16:49:10 -0700 | [diff] [blame] | 194 | result.getElapsedTime()); |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 195 | obj.addTestGroupStatus(testGroupStatus); |
| 196 | } |
| 197 | mInvocationStatus = obj; |
| 198 | } |
| 199 | |
| 200 | /** {@inheritDoc} */ |
| 201 | @Override |
| 202 | public void invocationInitiated(IInvocationContext context) { |
| 203 | for (ITestDevice device : context.getDevices()) { |
| 204 | mDeviceSerials.add(device.getSerialNumber()); |
| 205 | } |
| 206 | final ClusterCommandEvent event = |
| 207 | createEventBuilder() |
| 208 | .setType(ClusterCommandEvent.Type.InvocationInitiated) |
| 209 | .build(); |
| 210 | getClusterClient().getCommandEventUploader().postEvent(event); |
| 211 | getClusterClient().getCommandEventUploader().flush(); |
| 212 | mHeartbeat = startHeartbeat(); |
| 213 | // Check that devices are in charging state before starting the invocation. |
| 214 | for (ITestDevice device : context.getDevices()) { |
| 215 | try { |
| 216 | BatteryState state = BatteryController.getDeviceChargingState(device); |
| 217 | if (BatteryState.NOT_CHARGING.equals(state)) { |
| 218 | IBatteryInfo info = BatteryController.getBatteryInfoForDevice(device); |
| 219 | if (info != null) { |
| 220 | info.enableCharging(device); |
| 221 | } |
| 222 | } |
| 223 | } catch (DeviceNotAvailableException e) { |
| 224 | CLog.e(e); |
| 225 | } |
| 226 | } |
| 227 | } |
| 228 | |
| 229 | /** {@inheritDoc} */ |
| 230 | @Override |
| 231 | public void invocationStarted(IInvocationContext context) { |
| 232 | super.invocationStarted(context); |
| 233 | final ClusterCommandEvent event = |
| 234 | createEventBuilder() |
| 235 | .setType(ClusterCommandEvent.Type.InvocationStarted) |
| 236 | .build(); |
| 237 | getClusterClient().getCommandEventUploader().postEvent(event); |
| 238 | getClusterClient().getCommandEventUploader().flush(); |
| 239 | } |
| 240 | |
| 241 | @Override |
| 242 | public void testRunStarted(String name, int numTests) { |
| 243 | testRunStarted(name, numTests, 0); |
| 244 | } |
| 245 | |
| 246 | @Override |
| 247 | public void testRunStarted(String name, int numTests, int attemptNumber) { |
| 248 | testRunStarted(name, numTests, attemptNumber, System.currentTimeMillis()); |
| 249 | } |
| 250 | |
| 251 | /** {@inheritDoc} */ |
| 252 | @Override |
| 253 | public void testRunStarted(String name, int numTests, int attemptNumber, long startTime) { |
| 254 | super.testRunStarted(name, numTests, attemptNumber, startTime); |
| 255 | updateInvocationStatus(); |
| 256 | } |
| 257 | |
| 258 | /** {@inheritDoc} */ |
| 259 | @Override |
| 260 | public void invocationFailed(Throwable cause) { |
| 261 | super.invocationFailed(cause); |
| 262 | |
Julien Desprez | 8777066 | 2020-02-25 11:28:46 -0800 | [diff] [blame] | 263 | mError = StreamUtil.getStackTrace(cause); |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 264 | } |
| 265 | |
| 266 | /** {@inheritDoc} */ |
| 267 | @Override |
| 268 | public void invocationEnded(long elapsedTime) { |
| 269 | super.invocationEnded(elapsedTime); |
| 270 | |
| 271 | ClusterCommandEvent event = |
| 272 | createEventBuilder() |
| 273 | .setType(ClusterCommandEvent.Type.InvocationEnded) |
| 274 | .setData(ClusterCommandEvent.DATA_KEY_ERROR, mError) |
| 275 | .build(); |
| 276 | getClusterClient().getCommandEventUploader().postEvent(event); |
| 277 | getClusterClient().getCommandEventUploader().flush(); |
| 278 | } |
| 279 | |
| 280 | /** {@inheritDoc} */ |
| 281 | @Override |
| 282 | public void invocationComplete( |
| 283 | IInvocationContext metadata, Map<ITestDevice, FreeDeviceState> devicesStates) { |
| 284 | if (mWorkDir != null) { |
| 285 | FileUtil.recursiveDelete(mWorkDir); |
| 286 | } |
| 287 | |
| 288 | // TODO: handle multi-device where only one of the build could be missing. |
| 289 | if (getPrimaryBuildInfo() == null && mError == null) { |
| 290 | mError = "build not found"; |
| 291 | } |
| 292 | |
Julien Desprez | 5c33047 | 2020-02-05 14:36:20 -0800 | [diff] [blame] | 293 | String fetchBuildTimeMillis = "-1"; |
| 294 | String setupTimeMillis = "-1"; |
| 295 | if (metadata != null) { |
| 296 | fetchBuildTimeMillis = |
| 297 | metadata.getAttributes() |
| 298 | .getUniqueMap() |
| 299 | .get(InvocationMetricKey.FETCH_BUILD.toString()); |
| 300 | setupTimeMillis = |
| 301 | metadata.getAttributes() |
| 302 | .getUniqueMap() |
| 303 | .get(InvocationMetricKey.SETUP.toString()); |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 304 | } |
| 305 | |
| 306 | // Stop heartbeat thread before sending InvocationCompleted event. |
| 307 | if (mHeartbeat != null) { |
| 308 | mHeartbeat.cancel(true); |
| 309 | } |
| 310 | updateInvocationStatus(); |
| 311 | final ClusterCommandEvent event = |
| 312 | createEventBuilder() |
| 313 | .setType(ClusterCommandEvent.Type.InvocationCompleted) |
| 314 | .setInvocationStatus(mInvocationStatus) |
| 315 | .setData(ClusterCommandEvent.DATA_KEY_ERROR, mError) |
| 316 | .setData(ClusterCommandEvent.DATA_KEY_SUMMARY, mSummary) |
| 317 | .setData( |
| 318 | ClusterCommandEvent.DATA_KEY_FETCH_BUILD_TIME_MILLIS, |
Julien Desprez | 5c33047 | 2020-02-05 14:36:20 -0800 | [diff] [blame] | 319 | fetchBuildTimeMillis) |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 320 | .setData( |
Julien Desprez | 5c33047 | 2020-02-05 14:36:20 -0800 | [diff] [blame] | 321 | ClusterCommandEvent.DATA_KEY_SETUP_TIME_MILLIS, setupTimeMillis) |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 322 | .setData( |
| 323 | ClusterCommandEvent.DATA_KEY_TOTAL_TEST_COUNT, |
| 324 | Integer.toString(getNumTotalTests())) |
| 325 | .setData( |
| 326 | ClusterCommandEvent.DATA_KEY_FAILED_TEST_COUNT, |
| 327 | Integer.toString(getNumAllFailedTests())) |
| 328 | .setData( |
Daniel Peykov | 0cb492a | 2019-12-13 10:49:35 -0800 | [diff] [blame] | 329 | ClusterCommandEvent.DATA_KEY_PASSED_TEST_COUNT, |
| 330 | Integer.toString(getNumTestsInState(TestStatus.PASSED))) |
| 331 | .setData( |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 332 | ClusterCommandEvent.DATA_KEY_FAILED_TEST_RUN_COUNT, |
| 333 | Integer.toString(getNumAllFailedTestRuns())) |
| 334 | .build(); |
| 335 | getClusterClient().getCommandEventUploader().postEvent(event); |
| 336 | getClusterClient().getCommandEventUploader().flush(); |
| 337 | } |
| 338 | |
| 339 | /** {@inheritDoc} */ |
| 340 | @Override |
Yichun Li | b935e79 | 2020-02-10 18:31:48 -0800 | [diff] [blame] | 341 | public void putEarlySummary(List<TestSummary> summaries) { |
| 342 | if (getClusterOptions().shouldCollectEarlyTestSummary()) { |
| 343 | putSummary(summaries); |
| 344 | } |
| 345 | } |
| 346 | |
| 347 | /** {@inheritDoc} */ |
| 348 | @Override |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 349 | public void putSummary(List<TestSummary> summaries) { |
Yichun Li | b935e79 | 2020-02-10 18:31:48 -0800 | [diff] [blame] | 350 | StringBuilder sb = new StringBuilder(); |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 351 | for (final TestSummary summary : summaries) { |
Yichun Li | 6e2a24a | 2020-02-21 14:30:35 -0800 | [diff] [blame] | 352 | String summaryString = summary.getSummary().toString(); |
| 353 | if (!processedSummaries.contains(summaryString)) { |
| 354 | processedSummaries.add(summaryString); |
| 355 | sb.append(summaryString); |
| 356 | sb.append("\n"); |
| 357 | } |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 358 | } |
Yichun Li | b935e79 | 2020-02-10 18:31:48 -0800 | [diff] [blame] | 359 | mSummary = mSummary == null ? sb.toString() : mSummary + sb.toString(); |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 360 | } |
| 361 | |
| 362 | private ScheduledFuture<?> startHeartbeat() { |
| 363 | return getHeartbeatThreadPool() |
| 364 | .scheduleAtFixedRate( |
| 365 | new HeartbeatSender(), |
| 366 | 0, |
| 367 | getClusterOptions().getInvocationHeartbeatInterval(), |
| 368 | TimeUnit.MILLISECONDS); |
| 369 | } |
| 370 | |
| 371 | class HeartbeatSender implements Runnable { |
| 372 | @Override |
| 373 | public void run() { |
| 374 | try { |
| 375 | // check cluster command's status |
| 376 | if (getClusterOptions().checkCommandState()) { |
| 377 | ClusterCommand.State status = |
| 378 | getClusterClient() |
| 379 | .getCommandState( |
| 380 | mCommandTask.getRequestId(), |
| 381 | mCommandTask.getCommandId()); |
| 382 | if (ClusterCommand.State.CANCELED.equals(status)) { |
frankfeng | 9a79fe0 | 2020-03-13 10:48:30 -0700 | [diff] [blame] | 383 | // TODO: retrieve cancel reason from TFC. |
| 384 | String cause = |
| 385 | String.format( |
frankfeng | 34da1ef | 2020-04-09 17:53:40 -0700 | [diff] [blame] | 386 | "The cluster client %s has marked command " |
| 387 | + "(requestId=%s, commandId=%s) canceled", |
| 388 | getClusterClient().getClass().getSimpleName(), |
frankfeng | 9a79fe0 | 2020-03-13 10:48:30 -0700 | [diff] [blame] | 389 | mCommandTask.getRequestId(), |
| 390 | mCommandTask.getCommandId()); |
| 391 | CLog.w("Stop invocation due to: %s", cause); |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 392 | Optional.ofNullable(getInvocationContext()) |
| 393 | .map(IInvocationContext::getInvocationId) |
| 394 | .map(Ints::tryParse) |
frankfeng | 9a79fe0 | 2020-03-13 10:48:30 -0700 | [diff] [blame] | 395 | .ifPresent(invocationId -> stopInvocation(invocationId, cause)); |
Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 396 | } |
| 397 | } |
| 398 | |
| 399 | final ClusterCommandEvent event = |
| 400 | createEventBuilder() |
| 401 | .setType(ClusterCommandEvent.Type.TestRunInProgress) |
| 402 | .setInvocationStatus(mInvocationStatus) |
| 403 | .build(); |
| 404 | getClusterClient().getCommandEventUploader().postEvent(event); |
| 405 | } catch (Exception e) { |
| 406 | CLog.e("Error sending heartbeat to TFC:"); |
| 407 | CLog.e(e); |
| 408 | } |
| 409 | } |
| 410 | } |
| 411 | } |
| 412 | |
| 413 | synchronized ScheduledThreadPoolExecutor getHeartbeatThreadPool() { |
| 414 | if (mHeartbeatThreadPool == null) { |
| 415 | mHeartbeatThreadPool = new ScheduledThreadPoolExecutor(1, new HeartbeatThreadFactory()); |
| 416 | // instead of throwing some exception on shutdown we simply log it. |
| 417 | mHeartbeatThreadPool.setRejectedExecutionHandler( |
| 418 | new RejectedExecutionHandler() { |
| 419 | @Override |
| 420 | public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { |
| 421 | CLog.w( |
| 422 | "Rejecting Task %s rejected from executor %s", |
| 423 | r.toString(), e.toString()); |
| 424 | } |
| 425 | }); |
| 426 | // continue existing heartbeats after shutdown (until invocation is complete) |
| 427 | mHeartbeatThreadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(true); |
| 428 | } |
| 429 | return mHeartbeatThreadPool; |
| 430 | } |
| 431 | |
| 432 | /** {@inheritDoc} */ |
| 433 | @Override |
| 434 | protected void processReadyCommands(IDeviceManager manager) { |
| 435 | super.processReadyCommands(manager); |
| 436 | |
| 437 | if (isShuttingDown()) { |
| 438 | return; |
| 439 | } |
| 440 | |
| 441 | List<ClusterCommand> commands = null; |
| 442 | MultiMap<String, DeviceDescriptor> devices = getAvailableDevices(manager); |
| 443 | if (devices.isEmpty()) { |
| 444 | CLog.d("No devices are available for testing."); |
| 445 | return; |
| 446 | } |
| 447 | // Lease command tasks through the leasehosttasks API. |
| 448 | // Here we get all devices (available or not), so TFC will analyze the device tree to |
| 449 | // decide which group is allocated and which group is available. |
| 450 | devices = getDevices(manager, false); |
| 451 | commands = fetchHostCommands(devices); |
| 452 | if (commands.isEmpty()) { |
| 453 | CLog.d("No commands available for testing."); |
| 454 | return; |
| 455 | } |
| 456 | execCommands(commands); |
| 457 | } |
| 458 | |
/**
 * Returns a map containing available devices grouped by their types.
 *
 * <p>Equivalent to calling {@code getDevices(manager, true)}.
 *
 * @param manager a {@link IDeviceManager}.
 * @return a {@link MultiMap} of String to DeviceDescriptor containing available devices.
 */
MultiMap<String, DeviceDescriptor> getAvailableDevices(IDeviceManager manager) {
    return getDevices(manager, true);
}
| 468 | |
| 469 | /** |
| 470 | * Returns a map containing devices grouped by their types. |
| 471 | * |
| 472 | * @param manager a {@link IDeviceManager}. |
| 473 | * @param availableOnly only return available devices or all devices. |
| 474 | * @return a {@link MultiMap} of String to DeviceDescriptor containing available devices. |
| 475 | */ |
| 476 | MultiMap<String, DeviceDescriptor> getDevices(IDeviceManager manager, boolean availableOnly) { |
| 477 | // Getting available device types |
| 478 | final MultiMap<String, DeviceDescriptor> devices = new MultiMap<>(); |
| 479 | for (final DeviceDescriptor device : manager.listAllDevices()) { |
| 480 | if (availableOnly && device.getState() != DeviceAllocationState.Available) { |
| 481 | continue; |
| 482 | } |
| 483 | if (ClusterHostUtil.isIpPort(device.getSerial())) { |
| 484 | // Note(b/28802876): Skipping IP:PORT serials from cluster scheduling because they |
| 485 | // behave differently from physical devices and are not fully supported by TF. |
| 486 | continue; |
| 487 | } |
| 488 | String runTargetFormat = getClusterOptions().getRunTargetFormat(); |
| 489 | String runTarget = |
| 490 | ClusterHostUtil.getRunTarget( |
| 491 | device, runTargetFormat, getClusterOptions().getDeviceTag()); |
| 492 | CLog.d("%s is available", runTarget); |
| 493 | devices.put(runTarget, device); |
| 494 | } |
| 495 | return devices; |
| 496 | } |
| 497 | |
| 498 | /** |
| 499 | * Get available flashing permits. |
| 500 | * |
| 501 | * @return the number of available flashing permits. |
| 502 | */ |
| 503 | private int getAvailableFlashingPermits() { |
| 504 | // By default there is no limit on available flashing permits. |
| 505 | int availableFlashingPermits = Integer.MAX_VALUE; |
| 506 | final IClusterOptions options = getClusterOptions(); |
| 507 | |
| 508 | boolean checkFlashingPermitsLease = options.checkFlashingPermitsOnLease(); |
| 509 | if (checkFlashingPermitsLease) { |
| 510 | availableFlashingPermits = getDeviceManager().getAvailableFlashingPermits(); |
| 511 | CLog.i("available flasher permits %d", availableFlashingPermits); |
| 512 | } |
| 513 | return availableFlashingPermits; |
| 514 | } |
| 515 | |
| 516 | /** |
| 517 | * Fetches commands for devices from the Tradefed Cluster's leasehosttasks API. |
| 518 | * |
| 519 | * @param devices a {@link MultiMap} of String to DeviceDescriptor containing devices. |
| 520 | * @return a list of {@link ClusterCommand}s. |
| 521 | */ |
| 522 | List<ClusterCommand> fetchHostCommands(final MultiMap<String, DeviceDescriptor> devices) { |
| 523 | CLog.d("fetching cluster host commands from leasehosttasks..."); |
| 524 | int availableFlashingPermits = getAvailableFlashingPermits(); |
| 525 | |
| 526 | // Don't try to lease if there are no flasher permits available |
| 527 | if (availableFlashingPermits == 0) { |
| 528 | CLog.i("There is no available flashing permits. Not lease any additional commands."); |
| 529 | return Collections.<ClusterCommand>emptyList(); |
| 530 | } |
| 531 | |
| 532 | final IClusterOptions options = getClusterOptions(); |
| 533 | final MultiMap<String, String> deviceGroups = options.getDeviceGroup(); |
| 534 | final Map<String, String> deviceToGroup = new HashMap<>(); |
| 535 | for (String group : deviceGroups.keySet()) { |
| 536 | for (String deviceSerial : deviceGroups.get(group)) { |
| 537 | deviceToGroup.put(deviceSerial, group); |
| 538 | } |
| 539 | } |
| 540 | List<ClusterDeviceInfo> deviceInfos = new LinkedList<>(); |
| 541 | for (String runTarget : devices.keySet()) { |
| 542 | for (DeviceDescriptor d : devices.get(runTarget)) { |
| 543 | String groupName = deviceToGroup.getOrDefault(d.getSerial(), null); |
| 544 | ClusterDeviceInfo deviceInfo = |
| 545 | new ClusterDeviceInfo.Builder() |
| 546 | .setDeviceDescriptor(d) |
| 547 | .setRunTarget(runTarget) |
| 548 | .setGroupName(groupName) |
| 549 | .build(); |
| 550 | deviceInfos.add(deviceInfo); |
| 551 | } |
| 552 | } |
| 553 | try { |
| 554 | int count = Math.min(deviceInfos.size(), availableFlashingPermits); |
| 555 | List<ClusterCommand> commands = |
| 556 | getClusterClient() |
| 557 | .leaseHostCommands( |
| 558 | options.getClusterId(), |
| 559 | ClusterHostUtil.getHostName(), |
| 560 | deviceInfos, |
| 561 | options.getNextClusterIds(), |
| 562 | count); |
| 563 | return commands; |
| 564 | } catch (JSONException e) { |
| 565 | CLog.e(e); |
| 566 | return Collections.<ClusterCommand>emptyList(); |
| 567 | } |
| 568 | } |
| 569 | |
| 570 | /** |
| 571 | * Executes commands fetched from the cluster command queue. |
| 572 | * |
| 573 | * @param commands a list of {@link ClusterCommand}s fetched from the cluster command queue. |
| 574 | */ |
| 575 | void execCommands(final List<ClusterCommand> commands) { |
| 576 | for (final ClusterCommand commandTask : commands) { |
| 577 | try { |
| 578 | final InvocationEventHandler handler = new InvocationEventHandler(commandTask); |
| 579 | switch (commandTask.getRequestType()) { |
| 580 | case UNMANAGED: |
| 581 | execClusterCommand(commandTask, handler); |
| 582 | break; |
| 583 | case MANAGED: |
| 584 | execManagedClusterCommand(commandTask, handler); |
| 585 | break; |
| 586 | default: |
| 587 | throw new UnsupportedOperationException(); |
| 588 | } |
| 589 | } catch (NoDeviceException e) { |
| 590 | CLog.w( |
| 591 | "no device meets requirements for cluster command [%s]; returning...", |
| 592 | commandTask.getTaskId()); |
| 593 | CLog.w(e); |
| 594 | IClusterEventUploader<ClusterCommandEvent> eventUploader = |
| 595 | getClusterClient().getCommandEventUploader(); |
| 596 | eventUploader.postEvent( |
| 597 | ClusterCommandEvent.createEventBuilder(commandTask) |
| 598 | .setHostName(ClusterHostUtil.getHostName()) |
| 599 | .setType(ClusterCommandEvent.Type.AllocationFailed) |
| 600 | .build()); |
| 601 | eventUploader.flush(); |
| 602 | } catch (ConfigurationException | IOException | JSONException e) { |
| 603 | CLog.w("failed to execute cluster command [%s]: %s", commandTask.getTaskId(), e); |
| 604 | CLog.w(e); |
| 605 | IClusterEventUploader<ClusterCommandEvent> eventUploader = |
| 606 | getClusterClient().getCommandEventUploader(); |
| 607 | eventUploader.postEvent( |
| 608 | ClusterCommandEvent.createEventBuilder(commandTask) |
| 609 | .setHostName(ClusterHostUtil.getHostName()) |
| 610 | .setType(ClusterCommandEvent.Type.ConfigurationError) |
| 611 | .setData(ClusterCommandEvent.DATA_KEY_ERROR, e.toString()) |
| 612 | .build()); |
| 613 | eventUploader.flush(); |
| 614 | } |
| 615 | } |
| 616 | } |
| 617 | |
| 618 | void execClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler) |
| 619 | throws ConfigurationException, IllegalArgumentException, NoDeviceException { |
| 620 | String cmdLine = commandTask.getCommandLine(); |
| 621 | String[] args = QuotationAwareTokenizer.tokenizeLine(cmdLine); |
| 622 | // If it is a dry run command skip execution. |
| 623 | if (dryRunCommand(handler, args)) { |
| 624 | return; |
| 625 | } |
| 626 | // Append device serials to command. |
| 627 | // By assigning all applicable serials, TF will try one by one until allocation |
| 628 | // succeeds (or fails for all). This mitigates the issue where a single bad |
| 629 | // device can starve tests. |
| 630 | if (commandTask.getTargetDeviceSerials() != null) { |
| 631 | for (String serial : commandTask.getTargetDeviceSerials()) { |
| 632 | cmdLine += " --serial "; |
| 633 | cmdLine += serial; |
| 634 | } |
| 635 | } |
| 636 | CLog.i("executing cluster command: [%s] %s", commandTask.getTaskId(), cmdLine); |
| 637 | execCommand(handler, QuotationAwareTokenizer.tokenizeLine(cmdLine)); |
| 638 | } |
| 639 | |
| 640 | void execManagedClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler) |
| 641 | throws IOException, JSONException, ConfigurationException, NoDeviceException { |
| 642 | File workDir = null; |
| 643 | try { |
| 644 | workDir = new File(System.getProperty("java.io.tmpdir"), commandTask.getAttemptId()); |
| 645 | workDir.mkdirs(); |
| 646 | final String requestId = commandTask.getRequestId(); |
| 647 | final String commandId = commandTask.getCommandId(); |
| 648 | final IClusterClient client = getClusterClient(); |
| 649 | final TestEnvironment testEnvironment = client.getTestEnvironment(requestId); |
| 650 | final List<TestResource> testResources = client.getTestResources(requestId); |
| 651 | final TestContext testContext = client.getTestContext(requestId, commandId); |
| 652 | testResources.addAll(testContext.getTestResources()); |
| 653 | final File configFile = |
| 654 | new ClusterCommandConfigBuilder() |
| 655 | .setWorkDir(workDir) |
| 656 | .setClusterCommand(commandTask) |
| 657 | .setTestEnvironment(testEnvironment) |
| 658 | .setTestResources(testResources) |
| 659 | .setTestContext(testContext) |
| 660 | .build(); |
| 661 | CLog.i("executing cluster command: [%s] %s", commandTask.getTaskId(), configFile); |
| 662 | CLog.d("configFile: %s", FileUtil.readStringFromFile(configFile)); |
| 663 | // FIXME: Find a way to upload a config file after an invocation is completed for |
| 664 | // debugging. |
| 665 | handler.setWorkDir(workDir); |
| 666 | execCommand(handler, new String[] {configFile.getAbsolutePath()}); |
| 667 | // Unset workDir to avoid being cleaned up |
| 668 | workDir = null; |
| 669 | } finally { |
| 670 | if (workDir != null) { |
| 671 | FileUtil.recursiveDelete(workDir); |
| 672 | } |
| 673 | } |
| 674 | } |
| 675 | |
| 676 | /** |
| 677 | * Determines if a given command is a dry-run. If the command is a dry-run, validate it. If |
| 678 | * there are any configs issue, it will throw a ConfigurationException. |
| 679 | * |
| 680 | * @param handler {@link InvocationEventHandler} to report events for dry-run validation. |
| 681 | * @param args the command to validate. |
| 682 | * @return true if the command are a dry run, false otherwise. |
| 683 | * @throws ConfigurationException |
| 684 | */ |
| 685 | protected boolean dryRunCommand(final InvocationEventHandler handler, String[] args) |
| 686 | throws ConfigurationException { |
| 687 | IConfiguration config = |
| 688 | getConfigFactory().createConfigurationFromArgs(args, null, getKeyStoreClient()); |
| 689 | if (config.getCommandOptions().isDryRunMode()) { |
| 690 | IInvocationContext context = new InvocationContext(); |
| 691 | context.addDeviceBuildInfo("stub", new BuildInfo()); |
| 692 | handler.invocationStarted(context); |
| 693 | config.validateOptions(); |
| 694 | handler.invocationEnded(0); |
| 695 | IInvocationContext nullMeta = null; |
| 696 | handler.invocationComplete(nullMeta, null); |
| 697 | return true; |
| 698 | } |
| 699 | return false; |
| 700 | } |
| 701 | |
| 702 | /** Get the {@link IClusterOptions} instance used to store cluster-related settings. */ |
| 703 | IClusterOptions getClusterOptions() { |
| 704 | if (mClusterOptions == null) { |
| 705 | mClusterOptions = ClusterHostUtil.getClusterOptions(); |
| 706 | } |
| 707 | return mClusterOptions; |
| 708 | } |
| 709 | |
| 710 | /** Get the {@link IClusterClient} instance used to interact with the TFC backend. */ |
| 711 | IClusterClient getClusterClient() { |
| 712 | if (mClusterClient == null) { |
| 713 | mClusterClient = ClusterHostUtil.getClusterClient(); |
| 714 | } |
| 715 | return mClusterClient; |
| 716 | } |
| 717 | |
| 718 | /** Event triggered, to upload host states */ |
| 719 | private void UploadHostEventWithState(HostState state) { |
| 720 | try { |
| 721 | IClusterEventUploader<ClusterHostEvent> Uploader = |
| 722 | getClusterClient().getHostEventUploader(); |
| 723 | ClusterHostEvent.Builder builder = |
| 724 | new ClusterHostEvent.Builder() |
| 725 | .setClusterId(getClusterOptions().getClusterId()) |
| 726 | .setHostEventType(HostEventType.HostStateChanged) |
| 727 | .setHostName(ClusterHostUtil.getHostName()) |
| 728 | .setHostState(state); |
| 729 | CLog.d("event uploading with state %s", state.toString()); |
| 730 | ClusterHostEvent event = builder.build(); |
| 731 | Uploader.postEvent(event); |
| 732 | CLog.d("event %s uploaded with state %s", event.toString(), state.toString()); |
| 733 | Uploader.flush(); |
| 734 | } catch (RuntimeException e) { |
| 735 | CLog.e("failed to upload host state %s to TFC: %s", state.toString(), e); |
| 736 | } |
| 737 | } |
| 738 | } |