bpo-41486: Faster bz2/lzma/zlib via new output buffering (GH-21740)

Faster bz2/lzma/zlib via new output buffering.
Also adds a .readall() method to the _compression.DecompressReader class
to take full advantage of this in the consume-all-output-at-once scenario.

Often a 5-20% speedup in common scenarios due to less data copying.

Contributed by Ma Lin.
diff --git a/Modules/zlibmodule.c b/Modules/zlibmodule.c
index 1ddaefd..8a20dfc 100644
--- a/Modules/zlibmodule.c
+++ b/Modules/zlibmodule.c
@@ -9,6 +9,79 @@
 #include "structmember.h"         // PyMemberDef
 #include "zlib.h"
 
+// Wrappers binding the blocks output buffer to zlib's next_out/avail_out
+#include "pycore_blocks_output_buffer.h"
+
+#if OUTPUT_BUFFER_MAX_BLOCK_SIZE > UINT32_MAX
+    #error "The maximum block size accepted by zlib is UINT32_MAX."
+#endif
+
+/* Initialize `buffer` via _BlocksOutputBuffer_InitAndGrow(); the result is
+   also stored in *avail_out.  Returns >= 0 on success, -1 on failure. */
+static inline Py_ssize_t
+Buffer_InitAndGrow(_BlocksOutputBuffer *buffer, Py_ssize_t max_length,
+                   Bytef **next_out, uint32_t *avail_out)
+{
+    Py_ssize_t allocated;
+
+    allocated = _BlocksOutputBuffer_InitAndGrow(
+                    buffer, max_length, (void**) next_out);
+    *avail_out = (uint32_t) allocated;
+    return allocated;
+}
+
+/* Like _BlocksOutputBuffer_InitWithSize(), but rejects an init_size outside
+   0..UINT32_MAX with ValueError.  Returns >= 0 on success, -1 on failure. */
+static inline Py_ssize_t
+Buffer_InitWithSize(_BlocksOutputBuffer *buffer, Py_ssize_t init_size,
+                    Bytef **next_out, uint32_t *avail_out)
+{
+    Py_ssize_t allocated;
+
+    if (init_size < 0 || (size_t)init_size > UINT32_MAX) {
+        PyErr_SetString(PyExc_ValueError,
+                        "Initial buffer size should (0 <= size <= UINT32_MAX)");
+        return -1;
+    }
+
+    allocated = _BlocksOutputBuffer_InitWithSize(
+                    buffer, init_size, (void**) next_out);
+    *avail_out = (uint32_t) allocated;
+    return allocated;
+}
+
+/* Call _BlocksOutputBuffer_Grow() with the current *avail_out, then store the
+   result back into it.  Returns >= 0 on success, -1 on failure. */
+static inline Py_ssize_t
+Buffer_Grow(_BlocksOutputBuffer *buffer,
+            Bytef **next_out, uint32_t *avail_out)
+{
+    Py_ssize_t allocated;
+
+    allocated = _BlocksOutputBuffer_Grow(
+                    buffer, (void**) next_out, (Py_ssize_t) *avail_out);
+    *avail_out = (uint32_t) allocated;
+    return allocated;
+}
+/* Wrap _BlocksOutputBuffer_GetDataSize(); avail_out is cast to Py_ssize_t. */
+static inline Py_ssize_t
+Buffer_GetDataSize(_BlocksOutputBuffer *buffer, uint32_t avail_out)
+{
+    return _BlocksOutputBuffer_GetDataSize(buffer, (Py_ssize_t) avail_out);
+}
+/* Wrap _BlocksOutputBuffer_Finish(); callers treat a NULL result as failure. */
+static inline PyObject *
+Buffer_Finish(_BlocksOutputBuffer *buffer, uint32_t avail_out)
+{
+    return _BlocksOutputBuffer_Finish(buffer, (Py_ssize_t) avail_out);
+}
+/* Wrap _BlocksOutputBuffer_OnError(); called from this file's error paths. */
+static inline void
+Buffer_OnError(_BlocksOutputBuffer *buffer)
+{
+    _BlocksOutputBuffer_OnError(buffer);
+}
+
 
 #define ENTER_ZLIB(obj) do {                      \
     if (!PyThread_acquire_lock((obj)->lock, 0)) { \
@@ -149,56 +222,6 @@ arrange_input_buffer(z_stream *zst, Py_ssize_t *remains)
     *remains -= zst->avail_in;
 }
 
-static Py_ssize_t
-arrange_output_buffer_with_maximum(z_stream *zst, PyObject **buffer,
-                                   Py_ssize_t length,
-                                   Py_ssize_t max_length)
-{
-    Py_ssize_t occupied;
-
-    if (*buffer == NULL) {
-        if (!(*buffer = PyBytes_FromStringAndSize(NULL, length)))
-            return -1;
-        occupied = 0;
-    }
-    else {
-        occupied = zst->next_out - (Byte *)PyBytes_AS_STRING(*buffer);
-
-        if (length == occupied) {
-            Py_ssize_t new_length;
-            assert(length <= max_length);
-            /* can not scale the buffer over max_length */
-            if (length == max_length)
-                return -2;
-            if (length <= (max_length >> 1))
-                new_length = length << 1;
-            else
-                new_length = max_length;
-            if (_PyBytes_Resize(buffer, new_length) < 0)
-                return -1;
-            length = new_length;
-        }
-    }
-
-    zst->avail_out = (uInt)Py_MIN((size_t)(length - occupied), UINT_MAX);
-    zst->next_out = (Byte *)PyBytes_AS_STRING(*buffer) + occupied;
-
-    return length;
-}
-
-static Py_ssize_t
-arrange_output_buffer(z_stream *zst, PyObject **buffer, Py_ssize_t length)
-{
-    Py_ssize_t ret;
-
-    ret = arrange_output_buffer_with_maximum(zst, buffer, length,
-                                             PY_SSIZE_T_MAX);
-    if (ret == -2)
-        PyErr_NoMemory();
-
-    return ret;
-}
-
 /*[clinic input]
 zlib.compress
 
@@ -215,16 +238,20 @@ static PyObject *
 zlib_compress_impl(PyObject *module, Py_buffer *data, int level)
 /*[clinic end generated code: output=d80906d73f6294c8 input=638d54b6315dbed3]*/
 {
-    PyObject *RetVal = NULL;
-    Py_ssize_t obuflen = DEF_BUF_SIZE;
+    PyObject *RetVal;
     int flush;
     z_stream zst;
+    _BlocksOutputBuffer buffer = {.list = NULL};
 
     zlibstate *state = get_zlib_state(module);
 
     Byte *ibuf = data->buf;
     Py_ssize_t ibuflen = data->len;
 
+    if (Buffer_InitAndGrow(&buffer, -1, &zst.next_out, &zst.avail_out) < 0) {
+        goto error;
+    }
+
     zst.opaque = NULL;
     zst.zalloc = PyZlib_Malloc;
     zst.zfree = PyZlib_Free;
@@ -252,10 +279,11 @@ zlib_compress_impl(PyObject *module, Py_buffer *data, int level)
         flush = ibuflen == 0 ? Z_FINISH : Z_NO_FLUSH;
 
         do {
-            obuflen = arrange_output_buffer(&zst, &RetVal, obuflen);
-            if (obuflen < 0) {
-                deflateEnd(&zst);
-                goto error;
+            if (zst.avail_out == 0) {
+                if (Buffer_Grow(&buffer, &zst.next_out, &zst.avail_out) < 0) {
+                    deflateEnd(&zst);
+                    goto error;
+                }
             }
 
             Py_BEGIN_ALLOW_THREADS
@@ -276,15 +304,16 @@ zlib_compress_impl(PyObject *module, Py_buffer *data, int level)
 
     err = deflateEnd(&zst);
     if (err == Z_OK) {
-        if (_PyBytes_Resize(&RetVal, zst.next_out -
-                            (Byte *)PyBytes_AS_STRING(RetVal)) < 0)
+        RetVal = Buffer_Finish(&buffer, zst.avail_out);
+        if (RetVal == NULL) {
             goto error;
+        }
         return RetVal;
     }
     else
         zlib_error(state, zst, err, "while finishing compression");
  error:
-    Py_XDECREF(RetVal);
+    Buffer_OnError(&buffer);
     return NULL;
 }
 
@@ -307,11 +336,12 @@ zlib_decompress_impl(PyObject *module, Py_buffer *data, int wbits,
                      Py_ssize_t bufsize)
 /*[clinic end generated code: output=77c7e35111dc8c42 input=a9ac17beff1f893f]*/
 {
-    PyObject *RetVal = NULL;
+    PyObject *RetVal;
     Byte *ibuf;
     Py_ssize_t ibuflen;
     int err, flush;
     z_stream zst;
+    _BlocksOutputBuffer buffer = {.list = NULL};
 
     zlibstate *state = get_zlib_state(module);
 
@@ -322,6 +352,10 @@ zlib_decompress_impl(PyObject *module, Py_buffer *data, int wbits,
         bufsize = 1;
     }
 
+    if (Buffer_InitWithSize(&buffer, bufsize, &zst.next_out, &zst.avail_out) < 0) {
+        goto error;
+    }
+
     ibuf = data->buf;
     ibuflen = data->len;
 
@@ -350,10 +384,11 @@ zlib_decompress_impl(PyObject *module, Py_buffer *data, int wbits,
         flush = ibuflen == 0 ? Z_FINISH : Z_NO_FLUSH;
 
         do {
-            bufsize = arrange_output_buffer(&zst, &RetVal, bufsize);
-            if (bufsize < 0) {
-                inflateEnd(&zst);
-                goto error;
+            if (zst.avail_out == 0) {
+                if (Buffer_Grow(&buffer, &zst.next_out, &zst.avail_out) < 0) {
+                    inflateEnd(&zst);
+                    goto error;
+                }
             }
 
             Py_BEGIN_ALLOW_THREADS
@@ -393,14 +428,13 @@ zlib_decompress_impl(PyObject *module, Py_buffer *data, int wbits,
         goto error;
     }
 
-    if (_PyBytes_Resize(&RetVal, zst.next_out -
-                        (Byte *)PyBytes_AS_STRING(RetVal)) < 0)
-        goto error;
-
-    return RetVal;
+    RetVal = Buffer_Finish(&buffer, zst.avail_out);
+    if (RetVal != NULL) {
+        return RetVal;
+    }
 
  error:
-    Py_XDECREF(RetVal);
+    Buffer_OnError(&buffer);
     return NULL;
 }
 
@@ -633,9 +667,9 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
                             Py_buffer *data)
 /*[clinic end generated code: output=6731b3f0ff357ca6 input=04d00f65ab01d260]*/
 {
-    PyObject *RetVal = NULL;
-    Py_ssize_t obuflen = DEF_BUF_SIZE;
+    PyObject *RetVal;
     int err;
+    _BlocksOutputBuffer buffer = {.list = NULL};
     zlibstate *state = PyType_GetModuleState(cls);
 
     ENTER_ZLIB(self);
@@ -643,13 +677,18 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
     self->zst.next_in = data->buf;
     Py_ssize_t ibuflen = data->len;
 
+    if (Buffer_InitAndGrow(&buffer, -1, &self->zst.next_out, &self->zst.avail_out) < 0) {
+        goto error;
+    }
+
     do {
         arrange_input_buffer(&self->zst, &ibuflen);
 
         do {
-            obuflen = arrange_output_buffer(&self->zst, &RetVal, obuflen);
-            if (obuflen < 0)
-                goto error;
+            if (self->zst.avail_out == 0) {
+                if (Buffer_Grow(&buffer, &self->zst.next_out, &self->zst.avail_out) < 0)
+                    goto error;
+            }
 
             Py_BEGIN_ALLOW_THREADS
             err = deflate(&self->zst, Z_NO_FLUSH);
@@ -665,12 +704,14 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
 
     } while (ibuflen != 0);
 
-    if (_PyBytes_Resize(&RetVal, self->zst.next_out -
-                        (Byte *)PyBytes_AS_STRING(RetVal)) == 0)
+    RetVal = Buffer_Finish(&buffer, self->zst.avail_out);
+    if (RetVal != NULL) {
         goto success;
+    }
 
  error:
-    Py_CLEAR(RetVal);
+    Buffer_OnError(&buffer);
+    RetVal = NULL;
  success:
     LEAVE_ZLIB(self);
     return RetVal;
@@ -746,8 +787,9 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
 /*[clinic end generated code: output=b024a93c2c922d57 input=bfb37b3864cfb606]*/
 {
     int err = Z_OK;
-    Py_ssize_t ibuflen, obuflen = DEF_BUF_SIZE, hard_limit;
-    PyObject *RetVal = NULL;
+    Py_ssize_t ibuflen;
+    PyObject *RetVal;
+    _BlocksOutputBuffer buffer = {.list = NULL};
 
     PyObject *module = PyType_GetModule(cls);
     if (module == NULL)
@@ -758,33 +800,28 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
         PyErr_SetString(PyExc_ValueError, "max_length must be non-negative");
         return NULL;
     } else if (max_length == 0)
-        hard_limit = PY_SSIZE_T_MAX;
-    else
-        hard_limit = max_length;
+        max_length = -1;
 
     ENTER_ZLIB(self);
 
     self->zst.next_in = data->buf;
     ibuflen = data->len;
 
-    /* limit amount of data allocated to max_length */
-    if (max_length && obuflen > max_length)
-        obuflen = max_length;
+    if (Buffer_InitAndGrow(&buffer, max_length, &self->zst.next_out, &self->zst.avail_out) < 0) {
+        goto abort;
+    }
 
     do {
         arrange_input_buffer(&self->zst, &ibuflen);
 
         do {
-            obuflen = arrange_output_buffer_with_maximum(&self->zst, &RetVal,
-                                                         obuflen, hard_limit);
-            if (obuflen == -2) {
-                if (max_length > 0) {
+            if (self->zst.avail_out == 0) {
+                if (Buffer_GetDataSize(&buffer, self->zst.avail_out) == max_length) {
                     goto save;
                 }
-                PyErr_NoMemory();
-            }
-            if (obuflen < 0) {
-                goto abort;
+                if (Buffer_Grow(&buffer, &self->zst.next_out, &self->zst.avail_out) < 0) {
+                    goto abort;
+                }
             }
 
             Py_BEGIN_ALLOW_THREADS
@@ -828,12 +865,14 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
         goto abort;
     }
 
-    if (_PyBytes_Resize(&RetVal, self->zst.next_out -
-                        (Byte *)PyBytes_AS_STRING(RetVal)) == 0)
+    RetVal = Buffer_Finish(&buffer, self->zst.avail_out);
+    if (RetVal != NULL) {
         goto success;
+    }
 
  abort:
-    Py_CLEAR(RetVal);
+    Buffer_OnError(&buffer);
+    RetVal = NULL;
  success:
     LEAVE_ZLIB(self);
     return RetVal;
@@ -858,8 +897,8 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
 /*[clinic end generated code: output=c7efd13efd62add2 input=286146e29442eb6c]*/
 {
     int err;
-    Py_ssize_t length = DEF_BUF_SIZE;
-    PyObject *RetVal = NULL;
+    PyObject *RetVal;
+    _BlocksOutputBuffer buffer = {.list = NULL};
 
     zlibstate *state = PyType_GetModuleState(cls);
     /* Flushing with Z_NO_FLUSH is a no-op, so there's no point in
@@ -872,11 +911,15 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
 
     self->zst.avail_in = 0;
 
+    if (Buffer_InitAndGrow(&buffer, -1, &self->zst.next_out, &self->zst.avail_out) < 0) {
+        goto error;
+    }
+
     do {
-        length = arrange_output_buffer(&self->zst, &RetVal, length);
-        if (length < 0) {
-            Py_CLEAR(RetVal);
-            goto error;
+        if (self->zst.avail_out == 0) {
+            if (Buffer_Grow(&buffer, &self->zst.next_out, &self->zst.avail_out) < 0) {
+                goto error;
+            }
         }
 
         Py_BEGIN_ALLOW_THREADS
@@ -885,7 +928,6 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
 
         if (err == Z_STREAM_ERROR) {
             zlib_error(state, self->zst, err, "while flushing");
-            Py_CLEAR(RetVal);
             goto error;
         }
     } while (self->zst.avail_out == 0);
@@ -898,7 +940,6 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
         err = deflateEnd(&self->zst);
         if (err != Z_OK) {
             zlib_error(state, self->zst, err, "while finishing compression");
-            Py_CLEAR(RetVal);
             goto error;
         }
         else
@@ -910,15 +951,18 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
         */
     } else if (err != Z_OK && err != Z_BUF_ERROR) {
         zlib_error(state, self->zst, err, "while flushing");
-        Py_CLEAR(RetVal);
         goto error;
     }
 
-    if (_PyBytes_Resize(&RetVal, self->zst.next_out -
-                        (Byte *)PyBytes_AS_STRING(RetVal)) < 0)
-        Py_CLEAR(RetVal);
+    RetVal = Buffer_Finish(&buffer, self->zst.avail_out);
+    if (RetVal != NULL) {
+        goto success;
+    }
 
- error:
+error:
+    Buffer_OnError(&buffer);
+    RetVal = NULL;
+success:
     LEAVE_ZLIB(self);
     return RetVal;
 }
@@ -1120,8 +1164,9 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
 {
     int err, flush;
     Py_buffer data;
-    PyObject *RetVal = NULL;
+    PyObject *RetVal;
     Py_ssize_t ibuflen;
+    _BlocksOutputBuffer buffer = {.list = NULL};
 
     PyObject *module = PyType_GetModule(cls);
     if (module == NULL) {
@@ -1144,14 +1189,19 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
     self->zst.next_in = data.buf;
     ibuflen = data.len;
 
+    if (Buffer_InitWithSize(&buffer, length, &self->zst.next_out, &self->zst.avail_out) < 0) {
+        goto abort;
+    }
+
     do {
         arrange_input_buffer(&self->zst, &ibuflen);
         flush = ibuflen == 0 ? Z_FINISH : Z_NO_FLUSH;
 
         do {
-            length = arrange_output_buffer(&self->zst, &RetVal, length);
-            if (length < 0)
-                goto abort;
+            if (self->zst.avail_out == 0) {
+                if (Buffer_Grow(&buffer, &self->zst.next_out, &self->zst.avail_out) < 0)
+                    goto abort;
+            }
 
             Py_BEGIN_ALLOW_THREADS
             err = inflate(&self->zst, flush);
@@ -1193,13 +1243,14 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
         }
     }
 
-    if (_PyBytes_Resize(&RetVal, self->zst.next_out -
-                        (Byte *)PyBytes_AS_STRING(RetVal)) == 0) {
+    RetVal = Buffer_Finish(&buffer, self->zst.avail_out);
+    if (RetVal != NULL) {
         goto success;
     }
 
  abort:
-    Py_CLEAR(RetVal);
+    Buffer_OnError(&buffer);
+    RetVal = NULL;
  success:
     PyBuffer_Release(&data);
     LEAVE_ZLIB(self);