blob: dbc4b5faccb019d8aaca1b687562ff1d83af210a [file] [log] [blame]
Yurii Karabas0ec34ca2020-11-24 20:08:54 +02001"""Event loop mixins."""
2
3import threading
4from . import events
5
# Module-wide lock shared by all _LoopBoundedMixin instances; it is held
# only while an instance records its event loop for the first time.
_global_lock = threading.Lock()
7
8
class _LoopBoundedMixin:
    """Mixin that ties an object to the first event loop it is used with.

    The loop is not chosen at construction time.  It is recorded the first
    time ``_get_loop()`` is called, and every later call checks that the
    caller's loop is that same loop.
    """

    # Class-level default meaning "not bound yet"; replaced by an instance
    # attribute the first time _get_loop() records a loop.
    _loop = None

    def _get_loop(self):
        """Return the current loop, binding this object to it on first use.

        The value comes from ``events._get_running_loop()``.  If the object
        has no loop recorded yet, that value is stored in ``self._loop``.
        If that call yields ``None`` (presumably when no loop is running --
        confirm against ``events``), nothing is effectively recorded and
        ``None`` is returned.

        Raises:
            RuntimeError: if the object already recorded a loop and the
                value obtained now is a different object.
        """
        loop = events._get_running_loop()

        if self._loop is None:
            # Unlocked check first so that already-bound objects skip the
            # lock; re-check under the lock so that only one caller
            # performs the first assignment.
            with _global_lock:
                if self._loop is None:
                    self._loop = loop
        if loop is not self._loop:
            raise RuntimeError(f'{type(self).__name__} have already bounded to another loop')
        return loop