Di Qian | 38c02a7 | 2019-11-18 19:14:07 -0800 | [diff] [blame] | 1 | /* |
| 2 | * Copyright (C) 2019 The Android Open Source Project |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package com.android.tradefed.cluster; |
| 17 | |
| 18 | import com.android.tradefed.log.LogUtil.CLog; |
| 19 | |
| 20 | import java.io.IOException; |
| 21 | import java.util.ArrayList; |
| 22 | import java.util.List; |
| 23 | import java.util.Queue; |
| 24 | import java.util.concurrent.ConcurrentLinkedQueue; |
| 25 | |
| 26 | /** ClusterEventUploader class, which uploads {@link IClusterEvent} to TFC. */ |
| 27 | public abstract class ClusterEventUploader<T extends IClusterEvent> |
| 28 | implements IClusterEventUploader<T> { |
| 29 | |
| 30 | // Default maximum event batch size |
| 31 | private static final int DEFAULT_MAX_BATCH_SIZE = 200; |
| 32 | |
| 33 | // Default event upload interval in ms. |
| 34 | private static final long DEFAULT_EVENT_UPLOAD_INTERVAL = 60 * 1000; |
| 35 | |
| 36 | private int mMaxBatchSize = DEFAULT_MAX_BATCH_SIZE; |
| 37 | private long mEventUploadInterval = DEFAULT_EVENT_UPLOAD_INTERVAL; |
| 38 | private long mLastEventUploadTime = 0; |
| 39 | private Queue<T> mEventQueue = new ConcurrentLinkedQueue<T>(); |
| 40 | |
| 41 | /** {@inheritDoc} */ |
| 42 | @Override |
| 43 | public void setMaxBatchSize(int batchSize) { |
| 44 | mMaxBatchSize = batchSize; |
| 45 | } |
| 46 | |
| 47 | /** {@inheritDoc} */ |
| 48 | @Override |
| 49 | public int getMaxBatchSize() { |
| 50 | return mMaxBatchSize; |
| 51 | } |
| 52 | |
| 53 | /** {@inheritDoc} */ |
| 54 | @Override |
| 55 | public void setEventUploadInterval(long interval) { |
| 56 | mEventUploadInterval = interval; |
| 57 | } |
| 58 | |
| 59 | /** {@inheritDoc} */ |
| 60 | @Override |
| 61 | public long getEventUploadInterval() { |
| 62 | return mEventUploadInterval; |
| 63 | } |
| 64 | |
| 65 | /** {@inheritDoc} */ |
| 66 | @Override |
| 67 | public void postEvent(final T event) { |
| 68 | mEventQueue.add(event); |
| 69 | uploadEvents(false); |
| 70 | } |
| 71 | |
| 72 | /** {@inheritDoc} */ |
| 73 | @Override |
| 74 | public void flush() { |
| 75 | uploadEvents(true); |
| 76 | } |
| 77 | |
| 78 | /** |
| 79 | * Upload events. |
| 80 | * |
| 81 | * @param uploadNow upload now or wait for uploading with other events. |
| 82 | */ |
| 83 | private void uploadEvents(final boolean uploadNow) { |
| 84 | final long now = System.currentTimeMillis(); |
| 85 | if (!uploadNow && now - mLastEventUploadTime < getEventUploadInterval()) { |
| 86 | return; |
| 87 | } |
| 88 | uploadEvents(); |
| 89 | } |
| 90 | |
| 91 | /** Synchronized actually upload events. Only one thread will upload the events. */ |
| 92 | private synchronized void uploadEvents() { |
| 93 | // Tradefed Cluster is unable to process command events larger than 100 KiB (b/72104215). |
| 94 | // The ClusterCommandEvent Builder provides some protection against this by limiting data |
| 95 | // fields to 1 KiB, but the number of data fields is not bounded, so we're not entirely |
| 96 | // protected against this. |
| 97 | |
| 98 | mLastEventUploadTime = System.currentTimeMillis(); |
| 99 | List<T> events = new ArrayList<T>(); |
| 100 | try { |
| 101 | // Upload batches of events until there are no more left |
| 102 | while (!mEventQueue.isEmpty()) { |
| 103 | // Limit the number of events to upload at once |
| 104 | int batchSize = getMaxBatchSize(); |
| 105 | while (!mEventQueue.isEmpty() && events.size() < batchSize) { |
| 106 | events.add(mEventQueue.poll()); |
| 107 | } |
| 108 | doUploadEvents(events); |
| 109 | events.clear(); |
| 110 | } |
| 111 | } catch (IOException e) { |
| 112 | CLog.w("failed to upload events: %s", e); |
| 113 | CLog.w("events will be uploaded with the next event."); |
| 114 | mEventQueue.addAll(events); |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | protected abstract void doUploadEvents(List<T> events) throws IOException; |
| 119 | } |