blob: 70f12c1102d871729666a07c5380264798277919 [file] [log] [blame]
Juanli Shena0aab7e2018-05-01 10:30:54 -07001/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#include <cstdlib>
20#include <set>
21#include <unordered_map>
22#include <vector>
23
24#include "src/cpp/server/load_reporter/load_data_store.h"
25
26namespace grpc {
27namespace load_reporter {
28
29// Some helper functions.
30namespace {
31
// Given a map from type K to a set of value type V, erases `value` from the set
// stored under `key`. If that set becomes empty, the key-set pair is removed as
// well, so the map never keeps an empty set behind for that key.
//
// Returns true if `value` was present (and has been erased); false if either
// the key or the value was not found.
template <typename K, typename V>
bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
                                    const K& key, const V& value) {
  auto it = map.find(key);
  if (it == map.end()) return false;
  // std::set::erase(key) returns the number of elements removed (0 or 1).
  const bool erased = it->second.erase(value) > 0;
  if (it->second.empty()) map.erase(it);
  return erased;
}
49
// Given a map from type K to a set of value type V, removes `key` together with
// its associated set and returns that set by value. Returns an empty set if
// `key` is not present; the map is left unchanged in that case.
template <typename K, typename V>
std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map,
                                     const K& key) {
  auto it = map.find(key);
  if (it == map.end()) return {};
  // Move the set out before erasing its node so the elements are not copied.
  std::set<V> extracted = std::move(it->second);
  map.erase(it);
  return extracted;
}
64
// Returns a pointer to a pseudo-randomly chosen element of a non-empty
// container. The index is std::rand() reduced modulo the container size, so the
// distribution carries a slight modulo bias. Reaching the element costs time
// linear in the chosen offset for non-random-access iterators (such as those of
// std::set / std::unordered_map).
template <typename C>
const typename C::value_type* RandomElement(const C& container) {
  GPR_ASSERT(!container.empty());
  const size_t offset = std::rand() % container.size();
  auto chosen = container.begin();
  std::advance(chosen, offset);
  return &*chosen;
}
73
74} // namespace
75
76void PerBalancerStore::MergeRow(const LoadRecordKey& key,
77 const LoadRecordValue& value) {
78 // During suspension, the load data received will be dropped.
79 if (!suspended_) {
80 load_record_map_[key].MergeFrom(value);
81 gpr_log(GPR_DEBUG,
82 "[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).",
83 this, key.ToString().c_str(), value.ToString().c_str());
84 } else {
85 gpr_log(GPR_DEBUG,
86 "[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).",
87 this, key.ToString().c_str(), value.ToString().c_str());
88 }
89 // We always keep track of num_calls_in_progress_, so that when this
90 // store is resumed, we still have a correct value of
91 // num_calls_in_progress_.
92 GPR_ASSERT(static_cast<int64_t>(num_calls_in_progress_) +
93 value.GetNumCallsInProgressDelta() >=
94 0);
95 num_calls_in_progress_ += value.GetNumCallsInProgressDelta();
96}
97
98void PerBalancerStore::Suspend() {
99 suspended_ = true;
100 load_record_map_.clear();
101 gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Suspended.", this);
102}
103
104void PerBalancerStore::Resume() {
105 suspended_ = false;
106 gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Resumed.", this);
107}
108
109uint64_t PerBalancerStore::GetNumCallsInProgressForReport() {
110 GPR_ASSERT(!suspended_);
111 last_reported_num_calls_in_progress_ = num_calls_in_progress_;
112 return num_calls_in_progress_;
113}
114
115void PerHostStore::ReportStreamCreated(const grpc::string& lb_id,
116 const grpc::string& load_key) {
117 GPR_ASSERT(lb_id != kInvalidLbId);
118 SetUpForNewLbId(lb_id, load_key);
119 // Prior to this one, there was no load balancer receiving report, so we may
120 // have unassigned orphaned stores to assign to this new balancer.
121 // TODO(juanlishen): If the load key of this new stream is the same with
122 // some previously adopted orphan store, we may want to take the orphan to
123 // this stream. Need to discuss with LB team.
124 if (assigned_stores_.size() == 1) {
125 for (const auto& p : per_balancer_stores_) {
126 const grpc::string& other_lb_id = p.first;
127 const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
128 if (other_lb_id != lb_id) {
129 orphaned_store->Resume();
130 AssignOrphanedStore(orphaned_store.get(), lb_id);
131 }
132 }
133 }
134 // The first connected balancer will adopt the kInvalidLbId.
135 if (per_balancer_stores_.size() == 1) {
136 SetUpForNewLbId(kInvalidLbId, "");
137 ReportStreamClosed(kInvalidLbId);
138 }
139}
140
141void PerHostStore::ReportStreamClosed(const grpc::string& lb_id) {
142 auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
143 GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end());
144 // Remove this closed stream from our records.
145 GPR_ASSERT(UnorderedMapOfSetEraseKeyValue(
146 load_key_to_receiving_lb_ids_, it_store_for_gone_lb->second->load_key(),
147 lb_id));
148 std::set<PerBalancerStore*> orphaned_stores =
149 UnorderedMapOfSetExtract(assigned_stores_, lb_id);
150 // The stores that were assigned to this balancer are orphaned now. They
151 // should be re-assigned to other balancers which are still receiving reports.
152 for (PerBalancerStore* orphaned_store : orphaned_stores) {
153 const grpc::string* new_receiver = nullptr;
154 auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key());
155 if (it != load_key_to_receiving_lb_ids_.end()) {
156 // First, try to pick from the active balancers with the same load key.
157 new_receiver = RandomElement(it->second);
158 } else if (!assigned_stores_.empty()) {
159 // If failed, pick from all the remaining active balancers.
160 new_receiver = &(RandomElement(assigned_stores_)->first);
161 }
162 if (new_receiver != nullptr) {
163 AssignOrphanedStore(orphaned_store, *new_receiver);
164 } else {
165 // Load data for an LB ID that can't be assigned to any stream should
166 // be dropped.
167 orphaned_store->Suspend();
168 }
169 }
170}
171
172PerBalancerStore* PerHostStore::FindPerBalancerStore(
173 const grpc::string& lb_id) const {
174 return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end()
175 ? per_balancer_stores_.find(lb_id)->second.get()
176 : nullptr;
177}
178
179const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores(
180 const grpc::string& lb_id) const {
181 auto it = assigned_stores_.find(lb_id);
182 if (it == assigned_stores_.end()) return nullptr;
183 return &(it->second);
184}
185
186void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
187 const grpc::string& new_receiver) {
188 auto it = assigned_stores_.find(new_receiver);
189 GPR_ASSERT(it != assigned_stores_.end());
190 it->second.insert(orphaned_store);
191 gpr_log(GPR_INFO,
192 "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB"
193 " ID of %s to new receiver %s",
194 this, orphaned_store, orphaned_store->lb_id().c_str(),
195 new_receiver.c_str());
196}
197
198void PerHostStore::SetUpForNewLbId(const grpc::string& lb_id,
199 const grpc::string& load_key) {
200 // The top-level caller (i.e., LoadReportService) should guarantee the
201 // lb_id is unique for each reporting stream.
202 GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end());
203 GPR_ASSERT(assigned_stores_.find(lb_id) == assigned_stores_.end());
204 load_key_to_receiving_lb_ids_[load_key].insert(lb_id);
205 std::unique_ptr<PerBalancerStore> per_balancer_store(
206 new PerBalancerStore(lb_id, load_key));
207 assigned_stores_[lb_id] = {per_balancer_store.get()};
208 per_balancer_stores_[lb_id] = std::move(per_balancer_store);
209}
210
211PerBalancerStore* LoadDataStore::FindPerBalancerStore(
212 const string& hostname, const string& lb_id) const {
213 auto it = per_host_stores_.find(hostname);
214 if (it != per_host_stores_.end()) {
215 const PerHostStore& per_host_store = it->second;
216 return per_host_store.FindPerBalancerStore(lb_id);
217 } else {
218 return nullptr;
219 }
220}
221
222void LoadDataStore::MergeRow(const grpc::string& hostname,
223 const LoadRecordKey& key,
224 const LoadRecordValue& value) {
225 PerBalancerStore* per_balancer_store =
226 FindPerBalancerStore(hostname, key.lb_id());
227 if (per_balancer_store != nullptr) {
228 per_balancer_store->MergeRow(key, value);
229 return;
230 }
231 // Unknown LB ID. Track it until its number of in-progress calls drops to
232 // zero.
233 int64_t in_progress_delta = value.GetNumCallsInProgressDelta();
234 if (in_progress_delta != 0) {
235 auto it_tracker = unknown_balancer_id_trackers_.find(key.lb_id());
236 if (it_tracker == unknown_balancer_id_trackers_.end()) {
237 gpr_log(
238 GPR_DEBUG,
239 "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).",
240 this, key.lb_id().c_str());
241 unknown_balancer_id_trackers_.insert(
242 {key.lb_id(), static_cast<uint64_t>(in_progress_delta)});
243 } else if ((it_tracker->second += in_progress_delta) == 0) {
244 unknown_balancer_id_trackers_.erase(it_tracker);
245 gpr_log(GPR_DEBUG,
246 "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).",
247 this, key.lb_id().c_str());
248 }
249 }
250}
251
252const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores(
253 const grpc::string& hostname, const grpc::string& lb_id) {
254 auto it = per_host_stores_.find(hostname);
255 if (it == per_host_stores_.end()) return nullptr;
256 return it->second.GetAssignedStores(lb_id);
257}
258
259void LoadDataStore::ReportStreamCreated(const grpc::string& hostname,
260 const grpc::string& lb_id,
261 const grpc::string& load_key) {
262 per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key);
263}
264
265void LoadDataStore::ReportStreamClosed(const grpc::string& hostname,
266 const grpc::string& lb_id) {
267 auto it_per_host_store = per_host_stores_.find(hostname);
268 GPR_ASSERT(it_per_host_store != per_host_stores_.end());
269 it_per_host_store->second.ReportStreamClosed(lb_id);
270}
271
272} // namespace load_reporter
273} // namespace grpc