blob: 52215cbde22f050f5661adf914724d57fc63b3a2 [file] [log] [blame]
Christopher Wilcox66fc1802018-10-16 09:06:47 -07001# Copyright 2018, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
Peter Lamut7515c072019-06-18 20:25:44 +020015import datetime
Christopher Wilcox66fc1802018-10-16 09:06:47 -070016import logging
17import threading
18
19import grpc
20import mock
21import pytest
22from six.moves import queue
23
Christopher Wilcox66fc1802018-10-16 09:06:47 -070024from google.api_core import bidi
Tres Seaver151a0c12018-10-17 15:31:54 -040025from google.api_core import exceptions
Christopher Wilcox66fc1802018-10-16 09:06:47 -070026
27
28class Test_RequestQueueGenerator(object):
Christopher Wilcox66fc1802018-10-16 09:06:47 -070029 def test_bounded_consume(self):
30 call = mock.create_autospec(grpc.Call, instance=True)
31 call.is_active.return_value = True
32
33 def queue_generator(rpc):
34 yield mock.sentinel.A
35 yield queue.Empty()
36 yield mock.sentinel.B
37 rpc.is_active.return_value = False
38 yield mock.sentinel.C
39
40 q = mock.create_autospec(queue.Queue, instance=True)
41 q.get.side_effect = queue_generator(call)
42
43 generator = bidi._RequestQueueGenerator(q)
44 generator.call = call
45
46 items = list(generator)
47
48 assert items == [mock.sentinel.A, mock.sentinel.B]
49
50 def test_yield_initial_and_exit(self):
51 q = mock.create_autospec(queue.Queue, instance=True)
52 q.get.side_effect = queue.Empty()
53 call = mock.create_autospec(grpc.Call, instance=True)
54 call.is_active.return_value = False
55
Christopher Wilcox6f4070d2018-11-29 11:02:52 -080056 generator = bidi._RequestQueueGenerator(q, initial_request=mock.sentinel.A)
Christopher Wilcox66fc1802018-10-16 09:06:47 -070057 generator.call = call
58
59 items = list(generator)
60
61 assert items == [mock.sentinel.A]
62
63 def test_yield_initial_callable_and_exit(self):
64 q = mock.create_autospec(queue.Queue, instance=True)
65 q.get.side_effect = queue.Empty()
66 call = mock.create_autospec(grpc.Call, instance=True)
67 call.is_active.return_value = False
68
69 generator = bidi._RequestQueueGenerator(
Christopher Wilcox6f4070d2018-11-29 11:02:52 -080070 q, initial_request=lambda: mock.sentinel.A
71 )
Christopher Wilcox66fc1802018-10-16 09:06:47 -070072 generator.call = call
73
74 items = list(generator)
75
76 assert items == [mock.sentinel.A]
77
78 def test_exit_when_inactive_with_item(self):
79 q = mock.create_autospec(queue.Queue, instance=True)
80 q.get.side_effect = [mock.sentinel.A, queue.Empty()]
81 call = mock.create_autospec(grpc.Call, instance=True)
82 call.is_active.return_value = False
83
84 generator = bidi._RequestQueueGenerator(q)
85 generator.call = call
86
87 items = list(generator)
88
89 assert items == []
90 # Make sure it put the item back.
91 q.put.assert_called_once_with(mock.sentinel.A)
92
93 def test_exit_when_inactive_empty(self):
94 q = mock.create_autospec(queue.Queue, instance=True)
95 q.get.side_effect = queue.Empty()
96 call = mock.create_autospec(grpc.Call, instance=True)
97 call.is_active.return_value = False
98
99 generator = bidi._RequestQueueGenerator(q)
100 generator.call = call
101
102 items = list(generator)
103
104 assert items == []
105
106 def test_exit_with_stop(self):
107 q = mock.create_autospec(queue.Queue, instance=True)
108 q.get.side_effect = [None, queue.Empty()]
109 call = mock.create_autospec(grpc.Call, instance=True)
110 call.is_active.return_value = True
111
112 generator = bidi._RequestQueueGenerator(q)
113 generator.call = call
114
115 items = list(generator)
116
117 assert items == []
118
119
Peter Lamut7515c072019-06-18 20:25:44 +0200120class Test_Throttle(object):
121 def test_repr(self):
122 delta = datetime.timedelta(seconds=4.5)
123 instance = bidi._Throttle(access_limit=42, time_window=delta)
124 assert repr(instance) == \
125 "_Throttle(access_limit=42, time_window={})".format(repr(delta))
126
127 def test_raises_error_on_invalid_init_arguments(self):
128 with pytest.raises(ValueError) as exc_info:
129 bidi._Throttle(
130 access_limit=10, time_window=datetime.timedelta(seconds=0.0)
131 )
132 assert "time_window" in str(exc_info.value)
133 assert "must be a positive timedelta" in str(exc_info.value)
134
135 with pytest.raises(ValueError) as exc_info:
136 bidi._Throttle(
137 access_limit=0, time_window=datetime.timedelta(seconds=10)
138 )
139 assert "access_limit" in str(exc_info.value)
140 assert "must be positive" in str(exc_info.value)
141
142 def test_does_not_delay_entry_attempts_under_threshold(self):
143 throttle = bidi._Throttle(
144 access_limit=3, time_window=datetime.timedelta(seconds=1)
145 )
146 entries = []
147
148 for _ in range(3):
149 with throttle as time_waited:
150 entry_info = {
151 "entered_at": datetime.datetime.now(),
152 "reported_wait": time_waited,
153 }
154 entries.append(entry_info)
155
156 # check the reported wait times ...
157 assert all(entry["reported_wait"] == 0.0 for entry in entries)
158
159 # .. and the actual wait times
160 delta = entries[1]["entered_at"] - entries[0]["entered_at"]
161 assert delta.total_seconds() < 0.1
162 delta = entries[2]["entered_at"] - entries[1]["entered_at"]
163 assert delta.total_seconds() < 0.1
164
165 def test_delays_entry_attempts_above_threshold(self):
166 throttle = bidi._Throttle(
167 access_limit=3, time_window=datetime.timedelta(seconds=1)
168 )
169 entries = []
170
171 for _ in range(6):
172 with throttle as time_waited:
173 entry_info = {
174 "entered_at": datetime.datetime.now(),
175 "reported_wait": time_waited,
176 }
177 entries.append(entry_info)
178
179 # For each group of 4 consecutive entries the time difference between
180 # the first and the last entry must have been greater than time_window,
181 # because a maximum of 3 are allowed in each time_window.
182 for i, entry in enumerate(entries[3:], start=3):
183 first_entry = entries[i - 3]
184 delta = entry["entered_at"] - first_entry["entered_at"]
185 assert delta.total_seconds() > 1.0
186
187 # check the reported wait times
188 # (NOTE: not using assert all(...), b/c the coverage check would complain)
189 for i, entry in enumerate(entries):
190 if i != 3:
191 assert entry["reported_wait"] == 0.0
192
193 # The delayed entry is expected to have been delayed for a significant
194 # chunk of the full second, and the actual and reported delay times
195 # should reflect that.
196 assert entries[3]["reported_wait"] > 0.7
197 delta = entries[3]["entered_at"] - entries[2]["entered_at"]
198 assert delta.total_seconds() > 0.7
199
200
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700201class _CallAndFuture(grpc.Call, grpc.Future):
202 pass
203
204
205def make_rpc():
206 """Makes a mock RPC used to test Bidi classes."""
207 call = mock.create_autospec(_CallAndFuture, instance=True)
208 rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
209
Christopher Wilcox2576a1b2019-03-27 10:58:39 -0700210 def rpc_side_effect(request, metadata=None):
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700211 call.is_active.return_value = True
212 call.request = request
Christopher Wilcox2576a1b2019-03-27 10:58:39 -0700213 call.metadata = metadata
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700214 return call
215
216 rpc.side_effect = rpc_side_effect
217
218 def cancel_side_effect():
219 call.is_active.return_value = False
220
221 call.cancel.side_effect = cancel_side_effect
222
223 return rpc, call
224
225
226class ClosedCall(object):
227 # NOTE: This is needed because defining `.next` on an **instance**
228 # rather than the **class** will not be iterable in Python 2.
229 # This is problematic since a `Mock` just sets members.
230
231 def __init__(self, exception):
232 self.exception = exception
233
234 def __next__(self):
235 raise self.exception
236
237 next = __next__ # Python 2
238
239 def is_active(self):
240 return False
241
242
243class TestBidiRpc(object):
244 def test_initial_state(self):
245 bidi_rpc = bidi.BidiRpc(None)
246
247 assert bidi_rpc.is_active is False
248
249 def test_done_callbacks(self):
250 bidi_rpc = bidi.BidiRpc(None)
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800251 callback = mock.Mock(spec=["__call__"])
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700252
253 bidi_rpc.add_done_callback(callback)
254 bidi_rpc._on_call_done(mock.sentinel.future)
255
256 callback.assert_called_once_with(mock.sentinel.future)
257
Christopher Wilcox2576a1b2019-03-27 10:58:39 -0700258 def test_metadata(self):
259 rpc, call = make_rpc()
260 bidi_rpc = bidi.BidiRpc(rpc, metadata=mock.sentinel.A)
261 assert bidi_rpc._rpc_metadata == mock.sentinel.A
262
263 bidi_rpc.open()
264 assert bidi_rpc.call == call
265 assert bidi_rpc.call.metadata == mock.sentinel.A
266
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700267 def test_open(self):
268 rpc, call = make_rpc()
269 bidi_rpc = bidi.BidiRpc(rpc)
270
271 bidi_rpc.open()
272
273 assert bidi_rpc.call == call
274 assert bidi_rpc.is_active
275 call.add_done_callback.assert_called_once_with(bidi_rpc._on_call_done)
276
277 def test_open_error_already_open(self):
278 rpc, _ = make_rpc()
279 bidi_rpc = bidi.BidiRpc(rpc)
280
281 bidi_rpc.open()
282
283 with pytest.raises(ValueError):
284 bidi_rpc.open()
285
286 def test_close(self):
287 rpc, call = make_rpc()
288 bidi_rpc = bidi.BidiRpc(rpc)
289 bidi_rpc.open()
290
291 bidi_rpc.close()
292
293 call.cancel.assert_called_once()
294 assert bidi_rpc.call == call
295 assert bidi_rpc.is_active is False
296 # ensure the request queue was signaled to stop.
297 assert bidi_rpc.pending_requests == 1
298 assert bidi_rpc._request_queue.get() is None
299
300 def test_close_no_rpc(self):
301 bidi_rpc = bidi.BidiRpc(None)
302 bidi_rpc.close()
303
304 def test_send(self):
305 rpc, call = make_rpc()
306 bidi_rpc = bidi.BidiRpc(rpc)
307 bidi_rpc.open()
308
309 bidi_rpc.send(mock.sentinel.request)
310
311 assert bidi_rpc.pending_requests == 1
312 assert bidi_rpc._request_queue.get() is mock.sentinel.request
313
314 def test_send_not_open(self):
315 rpc, call = make_rpc()
316 bidi_rpc = bidi.BidiRpc(rpc)
317
318 with pytest.raises(ValueError):
319 bidi_rpc.send(mock.sentinel.request)
320
321 def test_send_dead_rpc(self):
322 error = ValueError()
323 bidi_rpc = bidi.BidiRpc(None)
324 bidi_rpc.call = ClosedCall(error)
325
326 with pytest.raises(ValueError) as exc_info:
327 bidi_rpc.send(mock.sentinel.request)
328
329 assert exc_info.value == error
330
331 def test_recv(self):
332 bidi_rpc = bidi.BidiRpc(None)
333 bidi_rpc.call = iter([mock.sentinel.response])
334
335 response = bidi_rpc.recv()
336
337 assert response == mock.sentinel.response
338
339 def test_recv_not_open(self):
340 rpc, call = make_rpc()
341 bidi_rpc = bidi.BidiRpc(rpc)
342
343 with pytest.raises(ValueError):
344 bidi_rpc.recv()
345
346
347class CallStub(object):
348 def __init__(self, values, active=True):
349 self.values = iter(values)
350 self._is_active = active
351 self.cancelled = False
352
353 def __next__(self):
354 item = next(self.values)
355 if isinstance(item, Exception):
356 self._is_active = False
357 raise item
358 return item
359
360 next = __next__ # Python 2
361
362 def is_active(self):
363 return self._is_active
364
365 def add_done_callback(self, callback):
366 pass
367
368 def cancel(self):
369 self.cancelled = True
370
371
372class TestResumableBidiRpc(object):
Tres Seaver461f2cc2019-07-17 13:03:13 -0400373 def test_ctor_defaults(self):
374 start_rpc = mock.Mock()
375 should_recover = mock.Mock()
376 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700377
378 assert bidi_rpc.is_active is False
Tres Seaver461f2cc2019-07-17 13:03:13 -0400379 assert bidi_rpc._finalized is False
380 assert bidi_rpc._start_rpc is start_rpc
381 assert bidi_rpc._should_recover is should_recover
382 assert bidi_rpc._should_terminate is bidi._never_terminate
383 assert bidi_rpc._initial_request is None
384 assert bidi_rpc._rpc_metadata is None
385 assert bidi_rpc._reopen_throttle is None
386
387 def test_ctor_explicit(self):
388 start_rpc = mock.Mock()
389 should_recover = mock.Mock()
390 should_terminate = mock.Mock()
391 initial_request = mock.Mock()
392 metadata = {"x-foo": "bar"}
393 bidi_rpc = bidi.ResumableBidiRpc(
394 start_rpc,
395 should_recover,
396 should_terminate=should_terminate,
397 initial_request=initial_request,
398 metadata=metadata,
399 throttle_reopen=True,
400 )
401
402 assert bidi_rpc.is_active is False
403 assert bidi_rpc._finalized is False
404 assert bidi_rpc._should_recover is should_recover
405 assert bidi_rpc._should_terminate is should_terminate
406 assert bidi_rpc._initial_request is initial_request
407 assert bidi_rpc._rpc_metadata == metadata
408 assert isinstance(bidi_rpc._reopen_throttle, bidi._Throttle)
409
410 def test_done_callbacks_terminate(self):
411 cancellation = mock.Mock()
412 start_rpc = mock.Mock()
413 should_recover = mock.Mock(spec=["__call__"], return_value=True)
414 should_terminate = mock.Mock(spec=["__call__"], return_value=True)
415 bidi_rpc = bidi.ResumableBidiRpc(
416 start_rpc, should_recover, should_terminate=should_terminate
417 )
418 callback = mock.Mock(spec=["__call__"])
419
420 bidi_rpc.add_done_callback(callback)
421 bidi_rpc._on_call_done(cancellation)
422
423 should_terminate.assert_called_once_with(cancellation)
424 should_recover.assert_not_called()
425 callback.assert_called_once_with(cancellation)
426 assert not bidi_rpc.is_active
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700427
428 def test_done_callbacks_recoverable(self):
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800429 start_rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
Tres Seaver461f2cc2019-07-17 13:03:13 -0400430 should_recover = mock.Mock(spec=["__call__"], return_value=True)
431 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800432 callback = mock.Mock(spec=["__call__"])
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700433
434 bidi_rpc.add_done_callback(callback)
435 bidi_rpc._on_call_done(mock.sentinel.future)
436
437 callback.assert_not_called()
438 start_rpc.assert_called_once()
Tres Seaver461f2cc2019-07-17 13:03:13 -0400439 should_recover.assert_called_once_with(mock.sentinel.future)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700440 assert bidi_rpc.is_active
441
442 def test_done_callbacks_non_recoverable(self):
Tres Seaver461f2cc2019-07-17 13:03:13 -0400443 start_rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
444 should_recover = mock.Mock(spec=["__call__"], return_value=False)
445 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800446 callback = mock.Mock(spec=["__call__"])
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700447
448 bidi_rpc.add_done_callback(callback)
449 bidi_rpc._on_call_done(mock.sentinel.future)
450
451 callback.assert_called_once_with(mock.sentinel.future)
Tres Seaver461f2cc2019-07-17 13:03:13 -0400452 should_recover.assert_called_once_with(mock.sentinel.future)
453 assert not bidi_rpc.is_active
454
455 def test_send_terminate(self):
456 cancellation = ValueError()
457 call_1 = CallStub([cancellation], active=False)
458 call_2 = CallStub([])
459 start_rpc = mock.create_autospec(
460 grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
461 )
462 should_recover = mock.Mock(spec=["__call__"], return_value=False)
463 should_terminate = mock.Mock(spec=["__call__"], return_value=True)
464 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover, should_terminate=should_terminate)
465
466 bidi_rpc.open()
467
468 bidi_rpc.send(mock.sentinel.request)
469
470 assert bidi_rpc.pending_requests == 1
471 assert bidi_rpc._request_queue.get() is None
472
473 should_recover.assert_not_called()
474 should_terminate.assert_called_once_with(cancellation)
475 assert bidi_rpc.call == call_1
476 assert bidi_rpc.is_active is False
477 assert call_1.cancelled is True
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700478
479 def test_send_recover(self):
480 error = ValueError()
481 call_1 = CallStub([error], active=False)
482 call_2 = CallStub([])
483 start_rpc = mock.create_autospec(
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800484 grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
485 )
486 should_recover = mock.Mock(spec=["__call__"], return_value=True)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700487 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
488
489 bidi_rpc.open()
490
491 bidi_rpc.send(mock.sentinel.request)
492
493 assert bidi_rpc.pending_requests == 1
494 assert bidi_rpc._request_queue.get() is mock.sentinel.request
495
496 should_recover.assert_called_once_with(error)
497 assert bidi_rpc.call == call_2
498 assert bidi_rpc.is_active is True
499
500 def test_send_failure(self):
501 error = ValueError()
502 call = CallStub([error], active=False)
503 start_rpc = mock.create_autospec(
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800504 grpc.StreamStreamMultiCallable, instance=True, return_value=call
505 )
506 should_recover = mock.Mock(spec=["__call__"], return_value=False)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700507 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
508
509 bidi_rpc.open()
510
511 with pytest.raises(ValueError) as exc_info:
512 bidi_rpc.send(mock.sentinel.request)
513
514 assert exc_info.value == error
515 should_recover.assert_called_once_with(error)
516 assert bidi_rpc.call == call
517 assert bidi_rpc.is_active is False
518 assert call.cancelled is True
519 assert bidi_rpc.pending_requests == 1
520 assert bidi_rpc._request_queue.get() is None
521
Tres Seaver461f2cc2019-07-17 13:03:13 -0400522 def test_recv_terminate(self):
523 cancellation = ValueError()
524 call = CallStub([cancellation])
525 start_rpc = mock.create_autospec(
526 grpc.StreamStreamMultiCallable, instance=True, return_value=call
527 )
528 should_recover = mock.Mock(spec=["__call__"], return_value=False)
529 should_terminate = mock.Mock(spec=["__call__"], return_value=True)
530 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover, should_terminate=should_terminate)
531
532 bidi_rpc.open()
533
534 bidi_rpc.recv()
535
536 should_recover.assert_not_called()
537 should_terminate.assert_called_once_with(cancellation)
538 assert bidi_rpc.call == call
539 assert bidi_rpc.is_active is False
540 assert call.cancelled is True
541
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700542 def test_recv_recover(self):
543 error = ValueError()
544 call_1 = CallStub([1, error])
545 call_2 = CallStub([2, 3])
546 start_rpc = mock.create_autospec(
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800547 grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
548 )
549 should_recover = mock.Mock(spec=["__call__"], return_value=True)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700550 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
551
552 bidi_rpc.open()
553
554 values = []
555 for n in range(3):
556 values.append(bidi_rpc.recv())
557
558 assert values == [1, 2, 3]
559 should_recover.assert_called_once_with(error)
560 assert bidi_rpc.call == call_2
561 assert bidi_rpc.is_active is True
562
563 def test_recv_recover_already_recovered(self):
564 call_1 = CallStub([])
565 call_2 = CallStub([])
566 start_rpc = mock.create_autospec(
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800567 grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
568 )
Tres Seaver108f9d42018-10-17 15:57:41 -0400569 callback = mock.Mock()
570 callback.return_value = True
571 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, callback)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700572
573 bidi_rpc.open()
574
575 bidi_rpc._reopen()
576
577 assert bidi_rpc.call is call_1
578 assert bidi_rpc.is_active is True
579
580 def test_recv_failure(self):
581 error = ValueError()
582 call = CallStub([error])
583 start_rpc = mock.create_autospec(
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800584 grpc.StreamStreamMultiCallable, instance=True, return_value=call
585 )
586 should_recover = mock.Mock(spec=["__call__"], return_value=False)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700587 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
588
589 bidi_rpc.open()
590
591 with pytest.raises(ValueError) as exc_info:
592 bidi_rpc.recv()
593
594 assert exc_info.value == error
595 should_recover.assert_called_once_with(error)
596 assert bidi_rpc.call == call
597 assert bidi_rpc.is_active is False
598 assert call.cancelled is True
599
Tres Seaverac8716d2019-10-02 18:15:44 -0400600 def test_close(self):
601 call = mock.create_autospec(_CallAndFuture, instance=True)
602
603 def cancel_side_effect():
604 call.is_active.return_value = False
605
606 call.cancel.side_effect = cancel_side_effect
607 start_rpc = mock.create_autospec(
608 grpc.StreamStreamMultiCallable, instance=True, return_value=call
609 )
610 should_recover = mock.Mock(spec=["__call__"], return_value=False)
611 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
612 bidi_rpc.open()
613
614 bidi_rpc.close()
615
616 should_recover.assert_not_called()
617 call.cancel.assert_called_once()
618 assert bidi_rpc.call == call
619 assert bidi_rpc.is_active is False
620 # ensure the request queue was signaled to stop.
621 assert bidi_rpc.pending_requests == 1
622 assert bidi_rpc._request_queue.get() is None
623 assert bidi_rpc._finalized
624
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700625 def test_reopen_failure_on_rpc_restart(self):
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800626 error1 = ValueError("1")
627 error2 = ValueError("2")
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700628 call = CallStub([error1])
629 # Invoking start RPC a second time will trigger an error.
630 start_rpc = mock.create_autospec(
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800631 grpc.StreamStreamMultiCallable, instance=True, side_effect=[call, error2]
632 )
633 should_recover = mock.Mock(spec=["__call__"], return_value=True)
634 callback = mock.Mock(spec=["__call__"])
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700635
636 bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
637 bidi_rpc.add_done_callback(callback)
638
639 bidi_rpc.open()
640
641 with pytest.raises(ValueError) as exc_info:
642 bidi_rpc.recv()
643
644 assert exc_info.value == error2
645 should_recover.assert_called_once_with(error1)
646 assert bidi_rpc.call is None
647 assert bidi_rpc.is_active is False
648 callback.assert_called_once_with(error2)
649
Peter Lamut7515c072019-06-18 20:25:44 +0200650 def test_using_throttle_on_reopen_requests(self):
651 call = CallStub([])
652 start_rpc = mock.create_autospec(
653 grpc.StreamStreamMultiCallable, instance=True, return_value=call
654 )
655 should_recover = mock.Mock(spec=["__call__"], return_value=True)
656 bidi_rpc = bidi.ResumableBidiRpc(
657 start_rpc, should_recover, throttle_reopen=True
658 )
659
660 patcher = mock.patch.object(bidi_rpc._reopen_throttle.__class__, "__enter__")
661 with patcher as mock_enter:
662 bidi_rpc._reopen()
663
664 mock_enter.assert_called_once()
665
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700666 def test_send_not_open(self):
667 bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)
668
669 with pytest.raises(ValueError):
670 bidi_rpc.send(mock.sentinel.request)
671
672 def test_recv_not_open(self):
673 bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)
674
675 with pytest.raises(ValueError):
676 bidi_rpc.recv()
677
678 def test_finalize_idempotent(self):
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800679 error1 = ValueError("1")
680 error2 = ValueError("2")
681 callback = mock.Mock(spec=["__call__"])
682 should_recover = mock.Mock(spec=["__call__"], return_value=False)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700683
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800684 bidi_rpc = bidi.ResumableBidiRpc(mock.sentinel.start_rpc, should_recover)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700685
686 bidi_rpc.add_done_callback(callback)
687
688 bidi_rpc._on_call_done(error1)
689 bidi_rpc._on_call_done(error2)
690
691 callback.assert_called_once_with(error1)
692
693
694class TestBackgroundConsumer(object):
695 def test_consume_once_then_exit(self):
696 bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
697 bidi_rpc.is_active = True
698 bidi_rpc.recv.side_effect = [mock.sentinel.response_1]
699 recved = threading.Event()
700
701 def on_response(response):
702 assert response == mock.sentinel.response_1
703 bidi_rpc.is_active = False
704 recved.set()
705
706 consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
707
708 consumer.start()
709
710 recved.wait()
711
712 bidi_rpc.recv.assert_called_once()
713 assert bidi_rpc.is_active is False
714
715 consumer.stop()
716
717 bidi_rpc.close.assert_called_once()
718 assert consumer.is_active is False
719
720 def test_pause_resume_and_close(self):
721 # This test is relatively complex. It attempts to start the consumer,
722 # consume one item, pause the consumer, check the state of the world,
723 # then resume the consumer. Doing this in a deterministic fashion
724 # requires a bit more mocking and patching than usual.
725
726 bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
727 bidi_rpc.is_active = True
728
729 def close_side_effect():
730 bidi_rpc.is_active = False
731
732 bidi_rpc.close.side_effect = close_side_effect
733
734 # These are used to coordinate the two threads to ensure deterministic
735 # execution.
736 should_continue = threading.Event()
737 responses_and_events = {
738 mock.sentinel.response_1: threading.Event(),
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800739 mock.sentinel.response_2: threading.Event(),
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700740 }
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800741 bidi_rpc.recv.side_effect = [mock.sentinel.response_1, mock.sentinel.response_2]
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700742
743 recved_responses = []
744 consumer = None
745
746 def on_response(response):
747 if response == mock.sentinel.response_1:
748 consumer.pause()
749
750 recved_responses.append(response)
751 responses_and_events[response].set()
752 should_continue.wait()
753
754 consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
755
756 consumer.start()
757
758 # Wait for the first response to be recved.
759 responses_and_events[mock.sentinel.response_1].wait()
760
761 # Ensure only one item has been recved and that the consumer is paused.
762 assert recved_responses == [mock.sentinel.response_1]
763 assert consumer.is_paused is True
764 assert consumer.is_active is True
765
766 # Unpause the consumer, wait for the second item, then close the
767 # consumer.
768 should_continue.set()
769 consumer.resume()
770
771 responses_and_events[mock.sentinel.response_2].wait()
772
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800773 assert recved_responses == [mock.sentinel.response_1, mock.sentinel.response_2]
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700774
775 consumer.stop()
776
777 assert consumer.is_active is False
778
779 def test_wake_on_error(self):
780 should_continue = threading.Event()
781
782 bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
783 bidi_rpc.is_active = True
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800784 bidi_rpc.add_done_callback.side_effect = lambda _: should_continue.set()
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700785
786 consumer = bidi.BackgroundConsumer(bidi_rpc, mock.sentinel.on_response)
787
788 # Start the consumer paused, which should immediately put it into wait
789 # state.
790 consumer.pause()
791 consumer.start()
792
793 # Wait for add_done_callback to be called
794 should_continue.wait()
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800795 bidi_rpc.add_done_callback.assert_called_once_with(consumer._on_call_done)
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700796
797 # The consumer should now be blocked on waiting to be unpaused.
798 assert consumer.is_active
799 assert consumer.is_paused
800
801 # Trigger the done callback, it should unpause the consumer and cause
802 # it to exit.
803 bidi_rpc.is_active = False
804 consumer._on_call_done(bidi_rpc)
805
806 # It may take a few cycles for the thread to exit.
807 while consumer.is_active:
808 pass
809
810 def test_consumer_expected_error(self, caplog):
811 caplog.set_level(logging.DEBUG)
812
813 bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
814 bidi_rpc.is_active = True
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800815 bidi_rpc.recv.side_effect = exceptions.ServiceUnavailable("Gone away")
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700816
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800817 on_response = mock.Mock(spec=["__call__"])
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700818
819 consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
820
821 consumer.start()
822
823 # Wait for the consumer's thread to exit.
824 while consumer.is_active:
825 pass
826
827 on_response.assert_not_called()
828 bidi_rpc.recv.assert_called_once()
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800829 assert "caught error" in caplog.text
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700830
831 def test_consumer_unexpected_error(self, caplog):
832 caplog.set_level(logging.DEBUG)
833
834 bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
835 bidi_rpc.is_active = True
836 bidi_rpc.recv.side_effect = ValueError()
837
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800838 on_response = mock.Mock(spec=["__call__"])
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700839
840 consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
841
842 consumer.start()
843
844 # Wait for the consumer's thread to exit.
845 while consumer.is_active:
846 pass
847
848 on_response.assert_not_called()
849 bidi_rpc.recv.assert_called_once()
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800850 assert "caught unexpected exception" in caplog.text
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700851
852 def test_double_stop(self, caplog):
853 caplog.set_level(logging.DEBUG)
854 bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
855 bidi_rpc.is_active = True
Christopher Wilcox6f4070d2018-11-29 11:02:52 -0800856 on_response = mock.Mock(spec=["__call__"])
Christopher Wilcox66fc1802018-10-16 09:06:47 -0700857
858 def close_side_effect():
859 bidi_rpc.is_active = False
860
861 bidi_rpc.close.side_effect = close_side_effect
862
863 consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
864
865 consumer.start()
866 assert consumer.is_active is True
867
868 consumer.stop()
869 assert consumer.is_active is False
870
871 # calling stop twice should not result in an error.
872 consumer.stop()