bpo-41486: Faster bz2/lzma/zlib via new output buffering (GH-21740)

Faster bz2/lzma/zlib via new output buffering.
Also adds a .readall() method to the _compression.DecompressReader class
to take best advantage of this in the consume-all-output-at-once scenario.

Often a 5-20% speedup in common scenarios due to less data copying.

Contributed by Ma Lin.
diff --git a/Modules/_bz2module.c b/Modules/_bz2module.c
index bfcdac6..9893a63 100644
--- a/Modules/_bz2module.c
+++ b/Modules/_bz2module.c
@@ -8,6 +8,59 @@
 #include <bzlib.h>
 #include <stdio.h>
 
+// Blocks output buffer wrappers
+#include "pycore_blocks_output_buffer.h"
+
+#if OUTPUT_BUFFER_MAX_BLOCK_SIZE > UINT32_MAX
+    #error "The maximum block size accepted by libbzip2 is UINT32_MAX."
+#endif
+
+/* Wrap _BlocksOutputBuffer_InitAndGrow() for bz_stream's 32-bit avail_out.
+   On success, return value >= 0; on failure, return -1. */
+static inline Py_ssize_t
+Buffer_InitAndGrow(_BlocksOutputBuffer *buffer, Py_ssize_t max_length,
+                   char **next_out, uint32_t *avail_out)
+{
+    Py_ssize_t allocated;
+
+    allocated = _BlocksOutputBuffer_InitAndGrow(
+                    buffer, max_length, (void**) next_out);
+    *avail_out = (uint32_t) allocated;  /* UINT32_MAX if allocated is -1 */
+    return allocated;
+}
+
+/* Wrap _BlocksOutputBuffer_Grow(), handing it the current *avail_out.
+   On success, return value >= 0; on failure, return -1. */
+static inline Py_ssize_t
+Buffer_Grow(_BlocksOutputBuffer *buffer,
+            char **next_out, uint32_t *avail_out)
+{
+    Py_ssize_t allocated;
+
+    allocated = _BlocksOutputBuffer_Grow(
+                    buffer, (void**) next_out, (Py_ssize_t) *avail_out);
+    *avail_out = (uint32_t) allocated;  /* UINT32_MAX if allocated is -1 */
+    return allocated;
+}
+
+static inline Py_ssize_t
+Buffer_GetDataSize(_BlocksOutputBuffer *buffer, uint32_t avail_out)
+{   /* Forwards to the generic helper, widening avail_out to Py_ssize_t. */
+    return _BlocksOutputBuffer_GetDataSize(buffer, (Py_ssize_t) avail_out);
+}
+
+static inline PyObject *
+Buffer_Finish(_BlocksOutputBuffer *buffer, uint32_t avail_out)
+{   /* Forwards to the generic helper; callers treat NULL as failure. */
+    return _BlocksOutputBuffer_Finish(buffer, (Py_ssize_t) avail_out);
+}
+
+static inline void
+Buffer_OnError(_BlocksOutputBuffer *buffer)
+{   /* Forwards to the generic helper; used on the callers' error paths. */
+    _BlocksOutputBuffer_OnError(buffer);
+}
+
 
 #ifndef BZ_CONFIG_ERROR
 #define BZ2_bzCompress bzCompress
@@ -115,52 +168,22 @@ catch_bz2_error(int bzerror)
     }
 }
 
-#if BUFSIZ < 8192
-#define INITIAL_BUFFER_SIZE 8192
-#else
-#define INITIAL_BUFFER_SIZE BUFSIZ
-#endif
-
-static int
-grow_buffer(PyObject **buf, Py_ssize_t max_length)
-{
-    /* Expand the buffer by an amount proportional to the current size,
-       giving us amortized linear-time behavior. Use a less-than-double
-       growth factor to avoid excessive allocation. */
-    size_t size = PyBytes_GET_SIZE(*buf);
-    size_t new_size = size + (size >> 3) + 6;
-
-    if (max_length > 0 && new_size > (size_t) max_length)
-        new_size = (size_t) max_length;
-
-    if (new_size > size) {
-        return _PyBytes_Resize(buf, new_size);
-    } else {  /* overflow */
-        PyErr_SetString(PyExc_OverflowError,
-                        "Unable to allocate buffer - output too large");
-        return -1;
-    }
-}
-
 
 /* BZ2Compressor class. */
 
 static PyObject *
 compress(BZ2Compressor *c, char *data, size_t len, int action)
 {
-    size_t data_size = 0;
     PyObject *result;
+    _BlocksOutputBuffer buffer = {.list = NULL};
 
-    result = PyBytes_FromStringAndSize(NULL, INITIAL_BUFFER_SIZE);
-    if (result == NULL)
-        return NULL;
-
+    if (Buffer_InitAndGrow(&buffer, -1, &c->bzs.next_out, &c->bzs.avail_out) < 0) {
+        goto error;
+    }
     c->bzs.next_in = data;
     c->bzs.avail_in = 0;
-    c->bzs.next_out = PyBytes_AS_STRING(result);
-    c->bzs.avail_out = INITIAL_BUFFER_SIZE;
+
     for (;;) {
-        char *this_out;
         int bzerror;
 
         /* On a 64-bit system, len might not fit in avail_in (an unsigned int).
@@ -175,21 +198,15 @@ compress(BZ2Compressor *c, char *data, size_t len, int action)
             break;
 
         if (c->bzs.avail_out == 0) {
-            size_t buffer_left = PyBytes_GET_SIZE(result) - data_size;
-            if (buffer_left == 0) {
-                if (grow_buffer(&result, -1) < 0)
-                    goto error;
-                c->bzs.next_out = PyBytes_AS_STRING(result) + data_size;
-                buffer_left = PyBytes_GET_SIZE(result) - data_size;
+            if (Buffer_Grow(&buffer, &c->bzs.next_out, &c->bzs.avail_out) < 0) {
+                goto error;
             }
-            c->bzs.avail_out = (unsigned int)Py_MIN(buffer_left, UINT_MAX);
         }
 
         Py_BEGIN_ALLOW_THREADS
-        this_out = c->bzs.next_out;
         bzerror = BZ2_bzCompress(&c->bzs, action);
-        data_size += c->bzs.next_out - this_out;
         Py_END_ALLOW_THREADS
+
         if (catch_bz2_error(bzerror))
             goto error;
 
@@ -197,13 +214,14 @@ compress(BZ2Compressor *c, char *data, size_t len, int action)
         if (action == BZ_FINISH && bzerror == BZ_STREAM_END)
             break;
     }
-    if (data_size != (size_t)PyBytes_GET_SIZE(result))
-        if (_PyBytes_Resize(&result, data_size) < 0)
-            goto error;
-    return result;
+
+    result = Buffer_Finish(&buffer, c->bzs.avail_out);
+    if (result != NULL) {
+        return result;
+    }
 
 error:
-    Py_XDECREF(result);
+    Buffer_OnError(&buffer);
     return NULL;
 }
 
@@ -420,36 +438,29 @@ decompress_buf(BZ2Decompressor *d, Py_ssize_t max_length)
     /* data_size is strictly positive, but because we repeatedly have to
        compare against max_length and PyBytes_GET_SIZE we declare it as
        signed */
-    Py_ssize_t data_size = 0;
     PyObject *result;
+    _BlocksOutputBuffer buffer = {.list = NULL};
     bz_stream *bzs = &d->bzs;
 
-    if (max_length < 0 || max_length >= INITIAL_BUFFER_SIZE)
-        result = PyBytes_FromStringAndSize(NULL, INITIAL_BUFFER_SIZE);
-    else
-        result = PyBytes_FromStringAndSize(NULL, max_length);
-    if (result == NULL)
-        return NULL;
+    if (Buffer_InitAndGrow(&buffer, max_length, &bzs->next_out, &bzs->avail_out) < 0) {
+        goto error;
+    }
 
-    bzs->next_out = PyBytes_AS_STRING(result);
     for (;;) {
         int bzret;
-        size_t avail;
-
         /* On a 64-bit system, buffer length might not fit in avail_out, so we
            do decompression in chunks of no more than UINT_MAX bytes
            each. Note that the expression for `avail` is guaranteed to be
            positive, so the cast is safe. */
-        avail = (size_t) (PyBytes_GET_SIZE(result) - data_size);
-        bzs->avail_out = (unsigned int)Py_MIN(avail, UINT_MAX);
         bzs->avail_in = (unsigned int)Py_MIN(d->bzs_avail_in_real, UINT_MAX);
         d->bzs_avail_in_real -= bzs->avail_in;
 
         Py_BEGIN_ALLOW_THREADS
         bzret = BZ2_bzDecompress(bzs);
-        data_size = bzs->next_out - PyBytes_AS_STRING(result);
-        d->bzs_avail_in_real += bzs->avail_in;
         Py_END_ALLOW_THREADS
+
+        d->bzs_avail_in_real += bzs->avail_in;
+
         if (catch_bz2_error(bzret))
             goto error;
         if (bzret == BZ_STREAM_END) {
@@ -458,22 +469,21 @@ decompress_buf(BZ2Decompressor *d, Py_ssize_t max_length)
         } else if (d->bzs_avail_in_real == 0) {
             break;
         } else if (bzs->avail_out == 0) {
-            if (data_size == max_length)
+            if (Buffer_GetDataSize(&buffer, bzs->avail_out) == max_length)
                 break;
-            if (data_size == PyBytes_GET_SIZE(result) &&
-                grow_buffer(&result, max_length) == -1)
+            if (Buffer_Grow(&buffer, &bzs->next_out, &bzs->avail_out) < 0) {
                 goto error;
-            bzs->next_out = PyBytes_AS_STRING(result) + data_size;
+            }
         }
     }
-    if (data_size != PyBytes_GET_SIZE(result))
-        if (_PyBytes_Resize(&result, data_size) == -1)
-            goto error;
 
-    return result;
+    result = Buffer_Finish(&buffer, bzs->avail_out);
+    if (result != NULL) {
+        return result;
+    }
 
 error:
-    Py_XDECREF(result);
+    Buffer_OnError(&buffer);
     return NULL;
 }
 
@@ -668,7 +678,7 @@ static int
 _bz2_BZ2Decompressor___init__(PyObject *self, PyObject *args, PyObject *kwargs)
 {
     int return_value = -1;
-    
+
     if (!_PyArg_NoPositional("BZ2Decompressor", args)) {
         goto exit;
     }