/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package com.android.tradefed.cluster;
17
18import com.android.tradefed.log.LogUtil.CLog;
19
20import java.io.IOException;
21import java.util.ArrayList;
22import java.util.List;
23import java.util.Queue;
24import java.util.concurrent.ConcurrentLinkedQueue;
25
26/** ClusterEventUploader class, which uploads {@link IClusterEvent} to TFC. */
27public abstract class ClusterEventUploader<T extends IClusterEvent>
28 implements IClusterEventUploader<T> {
29
30 // Default maximum event batch size
31 private static final int DEFAULT_MAX_BATCH_SIZE = 200;
32
33 // Default event upload interval in ms.
34 private static final long DEFAULT_EVENT_UPLOAD_INTERVAL = 60 * 1000;
35
36 private int mMaxBatchSize = DEFAULT_MAX_BATCH_SIZE;
37 private long mEventUploadInterval = DEFAULT_EVENT_UPLOAD_INTERVAL;
38 private long mLastEventUploadTime = 0;
39 private Queue<T> mEventQueue = new ConcurrentLinkedQueue<T>();
40
41 /** {@inheritDoc} */
42 @Override
43 public void setMaxBatchSize(int batchSize) {
44 mMaxBatchSize = batchSize;
45 }
46
47 /** {@inheritDoc} */
48 @Override
49 public int getMaxBatchSize() {
50 return mMaxBatchSize;
51 }
52
53 /** {@inheritDoc} */
54 @Override
55 public void setEventUploadInterval(long interval) {
56 mEventUploadInterval = interval;
57 }
58
59 /** {@inheritDoc} */
60 @Override
61 public long getEventUploadInterval() {
62 return mEventUploadInterval;
63 }
64
65 /** {@inheritDoc} */
66 @Override
67 public void postEvent(final T event) {
68 mEventQueue.add(event);
69 uploadEvents(false);
70 }
71
72 /** {@inheritDoc} */
73 @Override
74 public void flush() {
75 uploadEvents(true);
76 }
77
78 /**
79 * Upload events.
80 *
81 * @param uploadNow upload now or wait for uploading with other events.
82 */
83 private void uploadEvents(final boolean uploadNow) {
84 final long now = System.currentTimeMillis();
85 if (!uploadNow && now - mLastEventUploadTime < getEventUploadInterval()) {
86 return;
87 }
88 uploadEvents();
89 }
90
91 /** Synchronized actually upload events. Only one thread will upload the events. */
92 private synchronized void uploadEvents() {
93 // Tradefed Cluster is unable to process command events larger than 100 KiB (b/72104215).
94 // The ClusterCommandEvent Builder provides some protection against this by limiting data
95 // fields to 1 KiB, but the number of data fields is not bounded, so we're not entirely
96 // protected against this.
97
98 mLastEventUploadTime = System.currentTimeMillis();
99 List<T> events = new ArrayList<T>();
100 try {
101 // Upload batches of events until there are no more left
102 while (!mEventQueue.isEmpty()) {
103 // Limit the number of events to upload at once
104 int batchSize = getMaxBatchSize();
105 while (!mEventQueue.isEmpty() && events.size() < batchSize) {
106 events.add(mEventQueue.poll());
107 }
108 doUploadEvents(events);
109 events.clear();
110 }
111 } catch (IOException e) {
112 CLog.w("failed to upload events: %s", e);
113 CLog.w("events will be uploaded with the next event.");
114 mEventQueue.addAll(events);
115 }
116 }
117
118 protected abstract void doUploadEvents(List<T> events) throws IOException;
119}