Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel.
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index dbe06c4..166bc80 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -390,22 +390,64 @@
         __await__ = __iter__ # make compatible with 'await' expression
 
 
-def wrap_future(fut, *, loop=None):
+def _set_concurrent_future_state(concurrent, source):
+    """Copy state from a future to a concurrent.futures.Future."""
+    assert source.done()  # only a finished source has state to copy
+    if source.cancelled():
+        concurrent.cancel()  # parameter, shadows the concurrent package
+    if not concurrent.set_running_or_notify_cancel():
+        return  # falsy result: leave concurrent without result/exception
+    exception = source.exception()
+    if exception is not None:
+        concurrent.set_exception(exception)
+    else:
+        result = source.result()
+        concurrent.set_result(result)
+
+
+def _chain_future(source, destination):
+    """Chain two futures so that when one completes, so does the other.
+
+    The result (or exception) of source is copied to destination; if
+    destination is cancelled, source is cancelled too.  Both arguments may
+    be asyncio or concurrent.futures futures; otherwise TypeError is raised.
+    """
+    if not isinstance(source, (Future, concurrent.futures.Future)):
+        raise TypeError('A future is required for source argument')
+    if not isinstance(destination, (Future, concurrent.futures.Future)):
+        raise TypeError('A future is required for destination argument')
+    source_loop = source._loop if isinstance(source, Future) else None
+    dest_loop = destination._loop if isinstance(destination, Future) else None
+
+    def _set_state(future, other):  # copy other's outcome into future
+        if isinstance(future, Future):
+            future._copy_state(other)
+        else:
+            _set_concurrent_future_state(future, other)
+
+    def _call_check_cancel(destination):  # done-callback of destination
+        if destination.cancelled():
+            if source_loop is None or source_loop is dest_loop:
+                source.cancel()  # same loop or no loop: call directly
+            else:
+                source_loop.call_soon_threadsafe(source.cancel)
+
+    def _call_set_state(source):  # done-callback of source
+        if dest_loop is None or dest_loop is source_loop:
+            _set_state(destination, source)  # same loop or no loop
+        else:
+            dest_loop.call_soon_threadsafe(_set_state, destination, source)
+
+    destination.add_done_callback(_call_check_cancel)
+    source.add_done_callback(_call_set_state)
+
+
+def wrap_future(future, *, loop=None):
     """Wrap concurrent.futures.Future object."""
-    if isinstance(fut, Future):
-        return fut
-    assert isinstance(fut, concurrent.futures.Future), \
-        'concurrent.futures.Future is expected, got {!r}'.format(fut)
-    if loop is None:
-        loop = events.get_event_loop()
+    if isinstance(future, Future):
+        return future  # already a Future of this module: return unchanged
+    assert isinstance(future, concurrent.futures.Future), \
+        'concurrent.futures.Future is expected, got {!r}'.format(future)
     new_future = Future(loop=loop)
-
-    def _check_cancel_other(f):
-        if f.cancelled():
-            fut.cancel()
-
-    new_future.add_done_callback(_check_cancel_other)
-    fut.add_done_callback(
-        lambda future: loop.call_soon_threadsafe(
-            new_future._copy_state, future))
+    _chain_future(future, new_future)  # propagate result and cancellation
     return new_future