blob: cf7b54007bbe75bc2f9fc9e9daf05e31489797aa [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Support for tasks, coroutines and the scheduler."""
2
# Public names of this module; this list is what a star-import exposes.
__all__ = ['coroutine', 'Task',
           'iscoroutinefunction', 'iscoroutine',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield',
           ]
9
10import collections
11import concurrent.futures
12import functools
13import inspect
14import linecache
Victor Stinner0f3e6bc2014-02-19 23:15:02 +010015import os
16import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017import traceback
18import weakref
19
20from . import events
21from . import futures
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
#
# The flag is enabled when the PYTHONASYNCIODEBUG environment variable
# is set to a non-empty string, unless the interpreter was told to
# ignore the environment (sys.flags.ignore_environment).
_DEBUG = (not sys.flags.ignore_environment
          and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.
    #
    # It delegates the generator protocol (__next__/send/throw/close) and
    # the generator introspection attributes (gi_frame/gi_running/gi_code)
    # to the wrapped generator, and logs an error on destruction if the
    # generator was never started.
    #
    # NOTE: this class deliberately has no docstring: '__doc__' is listed
    # in __slots__ (so that coroutine() can copy the wrapped function's
    # docstring onto each instance), and a class-level docstring would
    # conflict with that slot.

    __slots__ = ['gen', 'func', '__name__', '__doc__']

    def __init__(self, gen, func):
        # gen:  the generator object produced by calling the coroutine.
        # func: the original coroutine function, kept for error reporting.
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, value):
        return self.gen.send(value)

    def throw(self, exc):
        return self.gen.throw(exc)

    def close(self):
        return self.gen.close()

    # Task.get_stack() reads gi_frame from the object returned by
    # iter(coro); for a CoroWrapper that is the wrapper itself, so the
    # generator introspection attributes must be forwarded.

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __del__(self):
        # Be careful: self.gen may not exist if __init__ failed (e.g. the
        # isgenerator assertion fired) before the slot was assigned.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no bytecode of the generator has run yet,
        # i.e. nobody ever iterated over / yielded from the coroutine.
        if frame is not None and frame.f_lasti == -1:
            func = self.func
            code = func.__code__
            filename = code.co_filename
            lineno = code.co_firstlineno
            logger.error(
                'Coroutine %r defined at %s:%s was never yielded from',
                func.__name__, filename, lineno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070072
73
def coroutine(func):
    """Decorator that marks *func* as a coroutine function.

    A generator function is used as is.  Any other callable is wrapped
    in a generator function that calls it and, when the call returns a
    Future or a generator, delegates to that object with "yield from"
    and returns its result; other return values are returned directly.

    When the module-level _DEBUG flag is true at decoration time, each
    call additionally wraps the generator in a CoroWrapper, which logs
    an error if the coroutine is destroyed without ever being yielded
    from.

    The returned callable carries ``_is_coroutine = True`` so that
    iscoroutinefunction() can recognise it.
    """
    coro = func
    if not inspect.isgeneratorfunction(func):
        @functools.wraps(func)
        def coro(*args, **kw):
            outcome = func(*args, **kw)
            if not (isinstance(outcome, futures.Future)
                    or inspect.isgenerator(outcome)):
                return outcome
            return (yield from outcome)

    wrapper = coro
    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func)
            # Make the wrapper instance look like the coroutine it wraps.
            wrapped.__name__ = coro.__name__
            wrapped.__doc__ = coro.__doc__
            return wrapped

    # Marker consulted by iscoroutinefunction().
    wrapper._is_coroutine = True
    return wrapper
102
103
def iscoroutinefunction(func):
    """Tell whether *func* was decorated with @coroutine.

    Returns the value of the ``_is_coroutine`` marker attribute (set to
    True by the coroutine() decorator), or False when *func* has no
    such attribute.
    """
    try:
        return func._is_coroutine
    except AttributeError:
        return False
107
108
def iscoroutine(obj):
    """Return True if *obj* is a coroutine object.

    A coroutine object is either a CoroWrapper instance (what decorated
    coroutines return in _DEBUG mode) or a plain generator object.
    """
    if isinstance(obj, CoroWrapper):
        return True
    return inspect.isgenerator(obj)
112
113
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    The task drives the coroutine by calling _step() from the event
    loop.  Each step resumes the coroutine once; the coroutine's final
    return value or exception becomes the result or exception of this
    Future.
    """

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    # An entry exists only for the duration of a _step() call.
    _current_tasks = {}

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *coro* must be a coroutine object (see iscoroutine()), not a
        coroutine function.  *loop* is passed on to Future.__init__().
        """
        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
        super().__init__(loop=loop)
        self._coro = iter(coro)  # Use the iterator just in case.
        # Future this task is currently blocked on, if any (see the
        # invariant described at the top of the class).
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by
        # the next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def __repr__(self):
        """Return the Future repr with the coroutine name inserted.

        A pending task with a cancellation request outstanding is shown
        as CANCELLING instead of PENDING.
        """
        res = super().__repr__()
        if (self._must_cancel and
            self._state == futures._PENDING and
            '<PENDING' in res):
            res = res.replace('<PENDING', '<CANCELLING', 1)
        # Insert '(<name>)' just before the first '<' of the base repr,
        # or append it when the base repr contains no '<'.
        i = res.find('<')
        if i < 0:
            i = len(res)
        res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
        return res

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is active, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        frames = []
        f = self._coro.gi_frame
        if f is not None:
            # Walk from the newest frame outwards, then reverse so the
            # result is ordered oldest first.
            while f is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(f)
                f = f.f_back
            frames.reverse()
        elif self._exception is not None:
            # The coroutine has no frame any more; use the traceback of
            # the stored exception, which is already oldest first.
            tb = self._exception.__traceback__
            while tb is not None:
                if limit is not None:
                    if limit <= 0:
                        break
                    limit -= 1
                frames.append(tb.tb_frame)
                tb = tb.tb_next
        return frames

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output goes; by default it goes to sys.stderr.
        """
        extracted_list = []
        # File names whose linecache entry has already been validated.
        checked = set()
        for f in self.get_stack(limit=limit):
            lineno = f.f_lineno
            co = f.f_code
            filename = co.co_filename
            name = co.co_name
            if filename not in checked:
                checked.add(filename)
                linecache.checkcache(filename)
            line = linecache.getline(filename, lineno, f.f_globals)
            extracted_list.append((filename, lineno, name, line))
        exc = self._exception
        if not extracted_list:
            print('No stack for %r' % self, file=file)
        elif exc is not None:
            print('Traceback for %r (most recent call last):' % self,
                  file=file)
        else:
            print('Stack for %r (most recent call last):' % self,
                  file=file)
        traceback.print_list(extracted_list, file=file)
        if exc is not None:
            for line in traceback.format_exception_only(exc.__class__, exc):
                print(line, file=file, end='')

    def cancel(self):
        """Request that this task be cancelled.

        Returns False if the task is already done and True otherwise.
        If the task is waiting on a future, that future's cancel() is
        tried first.  If there is no such future, or its cancel()
        returns a false value, _must_cancel is set so that the next
        _step() call throws CancelledError into the coroutine.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, value=None, exc=None):
        """Resume the coroutine once and handle whatever it does.

        *exc*, if not None, is thrown into the coroutine; otherwise
        *value* is sent (or next() is used when value is None).  The
        outcome is handled as follows:

        - StopIteration: the task's result is set to the return value.
        - CancelledError: the task is marked cancelled.
        - other exceptions: stored as the task's exception
          (non-Exception BaseExceptions are also re-raised).
        - a yielded Future with _blocking set: the task waits on it.
        - a bare yield (None): another step is scheduled.
        - anything else: a step throwing RuntimeError is scheduled.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
        if self._must_cancel:
            # Deliver the pending cancellation request, replacing any
            # other value/exception that was going to be delivered.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Publish this task as the current one for its loop while the
        # coroutine runs; removed again in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(value).
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError as exc:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
                    self._fut_waiter = result
                    # cancel() may have been called while the coroutine
                    # was running; forward the request to the new waiter.
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step, None,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step, None,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
        # Drop the local reference to the task (same purpose as the
        # identical statement at the end of _wakeup()).
        self = None

    def _wakeup(self, future):
        """Done-callback for the future this task was waiting on.

        Resumes the coroutine with the future's result, or with the
        exception that future.result() raised.
        """
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.
340
341
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the return_when argument of wait(); these are the
# very same constant objects that concurrent.futures defines.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
347
348
349@coroutine
350def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
351 """Wait for the Futures and coroutines given by fs to complete.
352
353 Coroutines will be wrapped in Tasks.
354
355 Returns two sets of Future: (done, pending).
356
357 Usage:
358
359 done, pending = yield from asyncio.wait(fs)
360
361 Note: This does not raise TimeoutError! Futures that aren't done
362 when the timeout occurs are returned in the second set.
363 """
Victor Stinner208556c2014-02-11 11:54:08 +0100364 if isinstance(fs, futures.Future) or iscoroutine(fs):
365 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366 if not fs:
367 raise ValueError('Set of coroutines/Futures is empty.')
368
369 if loop is None:
370 loop = events.get_event_loop()
371
Yury Selivanov622be342014-02-06 22:06:16 -0500372 fs = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373
374 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
375 raise ValueError('Invalid return_when value: {}'.format(return_when))
376 return (yield from _wait(fs, timeout, return_when, loop))
377
378
def _release_waiter(waiter, value=True, *args):
    """Set *value* as the result of *waiter* unless it is already done.

    Extra positional arguments are accepted and ignored, so the function
    can be used directly as a done-callback (which receives the finished
    future as an additional argument).
    """
    if waiter.done():
        return
    waiter.set_result(value)
382
383
384@coroutine
385def wait_for(fut, timeout, *, loop=None):
386 """Wait for the single Future or coroutine to complete, with timeout.
387
388 Coroutine will be wrapped in Task.
389
Victor Stinner421e49b2014-01-23 17:40:59 +0100390 Returns result of the Future or coroutine. When a timeout occurs,
391 it cancels the task and raises TimeoutError. To avoid the task
392 cancellation, wrap it in shield().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700393
394 Usage:
395
396 result = yield from asyncio.wait_for(fut, 10.0)
397
398 """
399 if loop is None:
400 loop = events.get_event_loop()
401
Guido van Rossum48c66c32014-01-29 14:30:38 -0800402 if timeout is None:
403 return (yield from fut)
404
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405 waiter = futures.Future(loop=loop)
406 timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
407 cb = functools.partial(_release_waiter, waiter, True)
408
409 fut = async(fut, loop=loop)
410 fut.add_done_callback(cb)
411
412 try:
413 if (yield from waiter):
414 return fut.result()
415 else:
416 fut.remove_done_callback(cb)
Victor Stinner421e49b2014-01-23 17:40:59 +0100417 fut.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418 raise futures.TimeoutError()
419 finally:
420 timeout_handle.cancel()
421
422
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait() and _wait_for().

    The fs argument must be a collection of Futures.

    Returns a (done, pending) pair of sets partitioning fs.
    """
    assert fs, 'Set of Futures is empty.'
    gate = futures.Future(loop=loop)
    timer = None
    if timeout is not None:
        timer = loop.call_later(timeout, _release_waiter, gate)
    remaining = len(fs)

    def _should_wake(f):
        # Decide whether the completion of f satisfies return_when.
        if remaining <= 0 or return_when == FIRST_COMPLETED:
            return True
        if return_when != FIRST_EXCEPTION or f.cancelled():
            return False
        return f.exception() is not None

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if not _should_wake(f):
            return
        if timer is not None:
            timer.cancel()
        if not gate.done():
            gate.set_result(False)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from gate
    finally:
        if timer is not None:
            timer.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        (done if f.done() else pending).add(f)
    return done, pending
465
466
467# This is *not* a @coroutine! It is just an iterator (yielding Futures).
468def as_completed(fs, *, loop=None, timeout=None):
Guido van Rossum2303fec2014-02-12 17:58:19 -0800469 """Return an iterator whose values are coroutines.
470
471 When waiting for the yielded coroutines you'll get the results (or
472 exceptions!) of the original Futures (or coroutines), in the order
473 in which and as soon as they complete.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474
475 This differs from PEP 3148; the proper way to use this is:
476
477 for f in as_completed(fs):
478 result = yield from f # The 'yield from' may raise.
479 # Use result.
480
Guido van Rossum2303fec2014-02-12 17:58:19 -0800481 If a timeout is specified, the 'yield from' will raise
482 TimeoutError when the timeout occurs before all Futures are done.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483
484 Note: The futures 'f' are not necessarily members of fs.
485 """
Victor Stinner208556c2014-02-11 11:54:08 +0100486 if isinstance(fs, futures.Future) or iscoroutine(fs):
487 raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 loop = loop if loop is not None else events.get_event_loop()
489 deadline = None if timeout is None else loop.time() + timeout
Yury Selivanov622be342014-02-06 22:06:16 -0500490 todo = {async(f, loop=loop) for f in set(fs)}
Guido van Rossum2303fec2014-02-12 17:58:19 -0800491 from .queues import Queue # Import here to avoid circular import problem.
492 done = Queue(loop=loop)
493 timeout_handle = None
494
495 def _on_timeout():
496 for f in todo:
497 f.remove_done_callback(_on_completion)
498 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
499 todo.clear() # Can't do todo.remove(f) in the loop.
500
501 def _on_completion(f):
502 if not todo:
503 return # _on_timeout() was here first.
504 todo.remove(f)
505 done.put_nowait(f)
506 if not todo and timeout_handle is not None:
507 timeout_handle.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700508
509 @coroutine
510 def _wait_for_one():
Guido van Rossum2303fec2014-02-12 17:58:19 -0800511 f = yield from done.get()
512 if f is None:
513 # Dummy value from _on_timeout().
514 raise futures.TimeoutError
515 return f.result() # May raise f.exception().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700516
Guido van Rossum2303fec2014-02-12 17:58:19 -0800517 for f in todo:
518 f.add_done_callback(_on_completion)
519 if todo and timeout is not None:
520 timeout_handle = loop.call_later(timeout, _on_timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700521 for _ in range(len(todo)):
522 yield _wait_for_one()
523
524
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is the value returned to the caller once the delay has
    elapsed.  The timer is cancelled when the coroutine exits, whether
    normally or because of an exception.
    """
    waiter = futures.Future(loop=loop)
    timer = waiter._loop.call_later(delay, waiter.set_result, result)
    try:
        value = yield from waiter
        return value
    finally:
        timer.cancel()
534
535
536def async(coro_or_future, *, loop=None):
537 """Wrap a coroutine in a future.
538
539 If the argument is a Future, it is returned directly.
540 """
541 if isinstance(coro_or_future, futures.Future):
542 if loop is not None and loop is not coro_or_future._loop:
543 raise ValueError('loop argument must agree with Future')
544 return coro_or_future
545 elif iscoroutine(coro_or_future):
546 return Task(coro_or_future, loop=loop)
547 else:
548 raise TypeError('A Future or coroutine is required')
549
550
class _GatheringFuture(futures.Future):
    """Future returned by gather() that fans cancel() out to its children.

    cancel() is overridden so that it cancels every child instead of
    marking this future as cancelled right away, which is closer to how
    Task.cancel() behaves.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        # The child futures whose results are being gathered.
        self._children = children

    def cancel(self):
        """Cancel all children; return False if already done, else True."""
        if not self.done():
            for child in self._children:
                child.cancel()
            return True
        return False
569
570
571def gather(*coros_or_futures, loop=None, return_exceptions=False):
572 """Return a future aggregating results from the given coroutines
573 or futures.
574
575 All futures must share the same event loop. If all the tasks are
576 done successfully, the returned future's result is the list of
577 results (in the order of the original sequence, not necessarily
Yury Selivanovf317cb72014-02-06 12:03:53 -0500578 the order of results arrival). If *return_exceptions* is True,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579 exceptions in the tasks are treated the same as successful
580 results, and gathered in the result list; otherwise, the first
581 raised exception will be immediately propagated to the returned
582 future.
583
584 Cancellation: if the outer Future is cancelled, all children (that
585 have not completed yet) are also cancelled. If any child is
586 cancelled, this is treated as if it raised CancelledError --
587 the outer Future is *not* cancelled in this case. (This is to
588 prevent the cancellation of one child to cause other children to
589 be cancelled.)
590 """
Yury Selivanov622be342014-02-06 22:06:16 -0500591 arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
592 children = [arg_to_fut[arg] for arg in coros_or_futures]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700593 n = len(children)
594 if n == 0:
595 outer = futures.Future(loop=loop)
596 outer.set_result([])
597 return outer
598 if loop is None:
599 loop = children[0]._loop
600 for fut in children:
601 if fut._loop is not loop:
602 raise ValueError("futures are tied to different event loops")
603 outer = _GatheringFuture(children, loop=loop)
604 nfinished = 0
605 results = [None] * n
606
607 def _done_callback(i, fut):
608 nonlocal nfinished
609 if outer._state != futures._PENDING:
610 if fut._exception is not None:
611 # Mark exception retrieved.
612 fut.exception()
613 return
614 if fut._state == futures._CANCELLED:
615 res = futures.CancelledError()
616 if not return_exceptions:
617 outer.set_exception(res)
618 return
619 elif fut._exception is not None:
620 res = fut.exception() # Mark exception retrieved.
621 if not return_exceptions:
622 outer.set_exception(res)
623 return
624 else:
625 res = fut._result
626 results[i] = res
627 nfinished += 1
628 if nfinished == n:
629 outer.set_result(results)
630
631 for i, fut in enumerate(children):
632 fut.add_done_callback(functools.partial(_done_callback, i))
633 return outer
634
635
636def shield(arg, *, loop=None):
637 """Wait for a future, shielding it from cancellation.
638
639 The statement
640
641 res = yield from shield(something())
642
643 is exactly equivalent to the statement
644
645 res = yield from something()
646
647 *except* that if the coroutine containing it is cancelled, the
648 task running in something() is not cancelled. From the POV of
649 something(), the cancellation did not happen. But its caller is
650 still cancelled, so the yield-from expression still raises
651 CancelledError. Note: If something() is cancelled by other means
652 this will still cancel shield().
653
654 If you want to completely ignore cancellation (not recommended)
655 you can combine shield() with a try/except clause, as follows:
656
657 try:
658 res = yield from shield(something())
659 except CancelledError:
660 res = None
661 """
662 inner = async(arg, loop=loop)
663 if inner.done():
664 # Shortcut.
665 return inner
666 loop = inner._loop
667 outer = futures.Future(loop=loop)
668
669 def _done_callback(inner):
670 if outer.cancelled():
671 # Mark inner's result as retrieved.
672 inner.cancelled() or inner.exception()
673 return
674 if inner.cancelled():
675 outer.cancel()
676 else:
677 exc = inner.exception()
678 if exc is not None:
679 outer.set_exception(exc)
680 else:
681 outer.set_result(inner.result())
682
683 inner.add_done_callback(_done_callback)
684 return outer