Issue #16129: Add `Py_SetStandardStreamEncoding`

This new pre-initialization API allows embedding
applications like Blender to force a particular
encoding and error handler for the standard IO streams.

Also refactors Modules/_testembed.c to let us start
testing multiple embedding scenarios.

(Initial patch by Bastien Montagne)
diff --git a/Doc/c-api/init.rst b/Doc/c-api/init.rst
index 9ed2a3a..1a54321 100644
--- a/Doc/c-api/init.rst
+++ b/Doc/c-api/init.rst
@@ -86,6 +86,33 @@
 =======================
 
 
+.. c:function:: int Py_SetStandardStreamEncoding(const char *encoding, const char *errors)
+
+   .. index::
+      single: Py_Initialize()
+      single: main()
+      triple: stdin; stdout; stderr
+
+   This function should be called before :c:func:`Py_Initialize`. It
+   specifies which encoding and error handling to use with standard io,
+   with the same meanings as in :func:`str.encode`.
+
+   It overrides :envvar:`PYTHONIOENCODING` values, and allows embedding code
+   to control io encoding when the environment variable does not work.
+
+   ``encoding`` and/or ``errors`` may be NULL to use
+   :envvar:`PYTHONIOENCODING` and/or default values (depending on other
+   settings).
+
+   Note that :data:`sys.stderr` always uses the "backslashreplace" error
+   handler, regardless of this (or any other) setting.
+
+   If :c:func:`Py_Finalize` is called, this function will need to be called
+   again in order to affect subsequent calls to :c:func:`Py_Initialize`.
+
+   Returns 0 if successful, or a nonzero value on error (e.g. when called after the interpreter has already been initialized).
+
+
 .. c:function:: void Py_SetProgramName(wchar_t *name)
 
    .. index::
diff --git a/Doc/whatsnew/3.4.rst b/Doc/whatsnew/3.4.rst
index 3610dbf..befa00d 100644
--- a/Doc/whatsnew/3.4.rst
+++ b/Doc/whatsnew/3.4.rst
@@ -564,7 +564,10 @@
 
 Changes to Python's build process and to the C API include:
 
-* None yet.
+* The new :c:func:`Py_SetStandardStreamEncoding` pre-initialization API
+  allows applications embedding the CPython interpreter to reliably force
+  a particular encoding and error handler for the standard streams.
+  (Contributed by Bastien Montagne and Nick Coghlan in :issue:`16129`.)
 
 
 Deprecated
diff --git a/Include/pythonrun.h b/Include/pythonrun.h
index 8fdb5b5..70c412b 100644
--- a/Include/pythonrun.h
+++ b/Include/pythonrun.h
@@ -38,6 +38,8 @@
 PyAPI_FUNC(PyThreadState *) Py_NewInterpreter(void);
 PyAPI_FUNC(void) Py_EndInterpreter(PyThreadState *);
 
+PyAPI_FUNC(int) Py_SetStandardStreamEncoding(const char *encoding, const char *errors);
+
 #ifndef Py_LIMITED_API
 PyAPI_FUNC(int) PyRun_SimpleStringFlags(const char *, PyCompilerFlags *);
 PyAPI_FUNC(int) PyRun_AnyFileFlags(FILE *, const char *, PyCompilerFlags *);
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
index bbbacc2..e1f9ae9 100644
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -9,6 +9,7 @@
 import sys
 import time
 import unittest
+import textwrap
 from test import support
 try:
     import _posixsubprocess
@@ -218,36 +219,81 @@
         self.assertEqual(_testcapi.argparsing("Hello", "World"), 1)
 
 
-class EmbeddingTest(unittest.TestCase):
+@unittest.skipIf(
+    sys.platform.startswith('win'),
+    "interpreter embedding tests aren't built under Windows")
+class EmbeddingTests(unittest.TestCase):
+    # XXX only tested under Unix checkouts
 
-    @unittest.skipIf(
-        sys.platform.startswith('win'),
-        "test doesn't work under Windows")
-    def test_subinterps(self):
-        # XXX only tested under Unix checkouts
+    def setUp(self):
         basepath = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
-        oldcwd = os.getcwd()
+        self.test_exe = exe = os.path.join(basepath, "Modules", "_testembed")
+        if not os.path.exists(exe):
+            self.skipTest("%r doesn't exist" % exe)
         # This is needed otherwise we get a fatal error:
         # "Py_Initialize: Unable to get the locale encoding
         # LookupError: no codec search functions registered: can't find encoding"
+        self.oldcwd = os.getcwd()
         os.chdir(basepath)
-        try:
-            exe = os.path.join(basepath, "Modules", "_testembed")
-            if not os.path.exists(exe):
-                self.skipTest("%r doesn't exist" % exe)
-            p = subprocess.Popen([exe],
-                                 stdout=subprocess.PIPE,
-                                 stderr=subprocess.PIPE)
-            (out, err) = p.communicate()
-            self.assertEqual(p.returncode, 0,
-                             "bad returncode %d, stderr is %r" %
-                             (p.returncode, err))
-            if support.verbose:
-                print()
-                print(out.decode('latin1'))
-                print(err.decode('latin1'))
-        finally:
-            os.chdir(oldcwd)
+
+    def tearDown(self):
+        os.chdir(self.oldcwd)
+
+    def run_embedded_interpreter(self, *args):
+        """Runs a test in the embedded interpreter"""
+        cmd = [self.test_exe]
+        cmd.extend(args)
+        p = subprocess.Popen(cmd,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+        (out, err) = p.communicate()
+        self.assertEqual(p.returncode, 0,
+                         "bad returncode %d, stderr is %r" %
+                         (p.returncode, err))
+        return out.decode("latin1"), err.decode("latin1")
+
+    def test_subinterps(self):
+        # This is just a "don't crash" test
+        out, err = self.run_embedded_interpreter()
+        if support.verbose:
+            print()
+            print(out)
+            print(err)
+
+    def test_forced_io_encoding(self):
+        # Checks forced configuration of embedded interpreter IO streams
+        out, err = self.run_embedded_interpreter("forced_io_encoding")
+        if support.verbose:
+            print()
+            print(out)
+            print(err)
+        expected_output = textwrap.dedent("""\
+        --- Use defaults ---
+        Expected encoding: default
+        Expected errors: default
+        stdin: {0.stdin.encoding}:strict
+        stdout: {0.stdout.encoding}:strict
+        stderr: {0.stderr.encoding}:backslashreplace
+        --- Set errors only ---
+        Expected encoding: default
+        Expected errors: surrogateescape
+        stdin: {0.stdin.encoding}:surrogateescape
+        stdout: {0.stdout.encoding}:surrogateescape
+        stderr: {0.stderr.encoding}:backslashreplace
+        --- Set encoding only ---
+        Expected encoding: latin-1
+        Expected errors: default
+        stdin: latin-1:strict
+        stdout: latin-1:strict
+        stderr: latin-1:backslashreplace
+        --- Set encoding and errors ---
+        Expected encoding: latin-1
+        Expected errors: surrogateescape
+        stdin: latin-1:surrogateescape
+        stdout: latin-1:surrogateescape
+        stderr: latin-1:backslashreplace""").format(sys)
+
+        self.assertEqual(out.strip(), expected_output)
 
 class SkipitemTest(unittest.TestCase):
 
@@ -358,7 +404,7 @@
 
 def test_main():
     support.run_unittest(CAPITest, TestPendingCalls, Test6012,
-                         EmbeddingTest, SkipitemTest, TestThreadState,
+                         EmbeddingTests, SkipitemTest, TestThreadState,
                          SubinterpreterTest)
 
     for name in dir(_testcapi):
diff --git a/Misc/ACKS b/Misc/ACKS
index 7be6f58..ab39bcf 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -872,6 +872,7 @@
 Florian Mladitsch
 Doug Moen
 The Dragon De Monsyne
+Bastien Montagne
 Skip Montanaro
 Peter Moody
 Paul Moore
diff --git a/Misc/NEWS b/Misc/NEWS
index f9ede51..28ba51f 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -120,6 +120,14 @@
 - Issue #4366: Fix building extensions on all platforms when --enable-shared
   is used.
 
+C API
+-----
+
+- Issue #16129: Added a `Py_SetStandardStreamEncoding` pre-initialization API
+  to allow embedding applications like Blender to force a particular
+  encoding and error handler for the standard IO streams (initial patch by
+  Bastien Montagne).
+
 Tests
 -----
 
diff --git a/Modules/_testembed.c b/Modules/_testembed.c
index 51b439f..a21d251 100644
--- a/Modules/_testembed.c
+++ b/Modules/_testembed.c
@@ -1,7 +1,26 @@
 #include <Python.h>
 #include <stdio.h>
 
-void print_subinterp(void)
+/*********************************************************
+ * Embedded interpreter tests that need a custom exe
+ *
+ * Executed via 'EmbeddingTests' in Lib/test/test_capi.py
+ *********************************************************/
+
+static void _testembed_Py_Initialize(void)
+{
+    /* HACK: the "./" at front avoids a search along the PATH in
+       Modules/getpath.c */
+    Py_SetProgramName(L"./_testembed");
+    Py_Initialize();
+}
+
+
+/*****************************************************
+ * Test repeated initialisation and subinterpreters
+ *****************************************************/
+
+static void print_subinterp(void)
 {
     /* Just output some debug stuff */
     PyThreadState *ts = PyThreadState_Get();
@@ -14,7 +33,7 @@
     );
 }
 
-int main(int argc, char *argv[])
+static void test_repeated_init_and_subinterpreters(void)
 {
     PyThreadState *mainstate, *substate;
 #ifdef WITH_THREAD
@@ -24,10 +43,7 @@
 
     for (i=0; i<3; i++) {
         printf("--- Pass %d ---\n", i);
-        /* HACK: the "./" at front avoids a search along the PATH in
-           Modules/getpath.c */
-        Py_SetProgramName(L"./_testembed");
-        Py_Initialize();
+        _testembed_Py_Initialize();
         mainstate = PyThreadState_Get();
 
 #ifdef WITH_THREAD
@@ -54,5 +70,71 @@
         PyEval_RestoreThread(mainstate);
         Py_Finalize();
     }
+}
+
+/*****************************************************
+ * Test forcing a particular IO encoding
+ *****************************************************/
+
+static void check_stdio_details(const char *encoding, const char * errors)
+{
+    /* Output info for the test case to check */
+    if (encoding) {
+        printf("Expected encoding: %s\n", encoding);
+    } else {
+        printf("Expected encoding: default\n");
+    }
+    if (errors) {
+        printf("Expected errors: %s\n", errors);
+    } else {
+        printf("Expected errors: default\n");
+    }
+    fflush(stdout);
+    /* Force the given IO encoding */
+    Py_SetStandardStreamEncoding(encoding, errors);
+    _testembed_Py_Initialize();
+    PyRun_SimpleString(
+        "import sys;"
+        "print('stdin: {0.encoding}:{0.errors}'.format(sys.stdin));"
+        "print('stdout: {0.encoding}:{0.errors}'.format(sys.stdout));"
+        "print('stderr: {0.encoding}:{0.errors}'.format(sys.stderr));"
+        "sys.stdout.flush()"
+    );
+    Py_Finalize();
+}
+
+static void test_forced_io_encoding(void)
+{
+    /* Check various combinations */
+    printf("--- Use defaults ---\n");
+    check_stdio_details(NULL, NULL);
+    printf("--- Set errors only ---\n");
+    check_stdio_details(NULL, "surrogateescape");
+    printf("--- Set encoding only ---\n");
+    check_stdio_details("latin-1", NULL);
+    printf("--- Set encoding and errors ---\n");
+    check_stdio_details("latin-1", "surrogateescape");
+
+    /* Check calling after initialization fails */
+    Py_Initialize();
+
+    if (Py_SetStandardStreamEncoding(NULL, NULL) == 0) {
+        printf("Unexpected success calling Py_SetStandardStreamEncoding");
+    }
+    Py_Finalize();
+}
+
+/* Different embedding tests */
+int main(int argc, char *argv[])
+{
+
+    /* TODO: Check the argument string to allow for more test cases */
+    if (argc > 1) {
+        /* For now: assume "forced_io_encoding" */
+        test_forced_io_encoding();
+    } else {
+        /* Run the original embedding test case by default */
+        test_repeated_init_and_subinterpreters();
+    }
     return 0;
 }
diff --git a/Python/pythonrun.c b/Python/pythonrun.c
index 06f30b0..3bcc474 100644
--- a/Python/pythonrun.c
+++ b/Python/pythonrun.c
@@ -134,6 +134,40 @@
     return initialized;
 }
 
+/* Helper to allow an embedding application to override the normal
+ * mechanism that attempts to figure out an appropriate IO encoding
+ */
+
+static char *_Py_StandardStreamEncoding = NULL;
+static char *_Py_StandardStreamErrors = NULL;
+
+int
+Py_SetStandardStreamEncoding(const char *encoding, const char *errors)
+{
+    if (Py_IsInitialized()) {
+        /* This is too late to have any effect */
+        return -1;
+    }
+    if (encoding) {
+        _Py_StandardStreamEncoding = _PyMem_RawStrdup(encoding);
+        if (!_Py_StandardStreamEncoding) {
+            PyErr_NoMemory();
+            return -1;
+        }
+    }
+    if (errors) {
+        _Py_StandardStreamErrors = _PyMem_RawStrdup(errors);
+        if (!_Py_StandardStreamErrors) {
+            if (_Py_StandardStreamEncoding) {
+                PyMem_RawFree(_Py_StandardStreamEncoding);
+            }
+            PyErr_NoMemory();
+            return -1;
+        }
+    }
+    return 0;
+}
+
 /* Global initializations.  Can be undone by Py_Finalize().  Don't
    call this twice without an intervening Py_Finalize() call.  When
    initializations fail, a fatal error is issued and the function does
@@ -1088,23 +1122,29 @@
     }
     Py_DECREF(wrapper);
 
-    pythonioencoding = Py_GETENV("PYTHONIOENCODING");
-    encoding = errors = NULL;
-    if (pythonioencoding) {
-        pythonioencoding = _PyMem_Strdup(pythonioencoding);
-        if (pythonioencoding == NULL) {
-            PyErr_NoMemory();
-            goto error;
+    encoding = _Py_StandardStreamEncoding;
+    errors = _Py_StandardStreamErrors;
+    if (!encoding || !errors) {
+        pythonioencoding = Py_GETENV("PYTHONIOENCODING");
+        if (pythonioencoding) {
+            char *err;
+            pythonioencoding = _PyMem_Strdup(pythonioencoding);
+            if (pythonioencoding == NULL) {
+                PyErr_NoMemory();
+                goto error;
+            }
+            err = strchr(pythonioencoding, ':');
+            if (err) {
+                *err = '\0';
+                err++;
+                if (*err && !errors) {
+                    errors = err;
+                }
+            }
+            if (*pythonioencoding && !encoding) {
+                encoding = pythonioencoding;
+            }
         }
-        errors = strchr(pythonioencoding, ':');
-        if (errors) {
-            *errors = '\0';
-            errors++;
-            if (!*errors)
-                errors = NULL;
-        }
-        if (*pythonioencoding)
-            encoding = pythonioencoding;
     }
 
     /* Set sys.stdin */
@@ -1184,6 +1224,15 @@
         status = -1;
     }
 
+    /* We won't need them anymore. */
+    if (_Py_StandardStreamEncoding) {
+        PyMem_RawFree(_Py_StandardStreamEncoding);
+        _Py_StandardStreamEncoding = NULL;
+    }
+    if (_Py_StandardStreamErrors) {
+        PyMem_RawFree(_Py_StandardStreamErrors);
+        _Py_StandardStreamErrors = NULL;
+    }
     PyMem_Free(pythonioencoding);
     Py_XDECREF(bimod);
     Py_XDECREF(iomod);