| /* |
| * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| |
| package jdk.incubator.http; |
| |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.net.URI; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.nio.file.Files; |
| import java.nio.file.OpenOption; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionStage; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Flow; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import jdk.incubator.http.internal.common.MinimalFuture; |
| import jdk.incubator.http.internal.common.Utils; |
| import jdk.incubator.http.internal.common.Log; |
| |
| class ResponseProcessors { |
| |
| static class ConsumerProcessor implements HttpResponse.BodyProcessor<Void> { |
| private final Consumer<Optional<byte[]>> consumer; |
| private Flow.Subscription subscription; |
| private final CompletableFuture<Void> result = new MinimalFuture<>(); |
| |
| ConsumerProcessor(Consumer<Optional<byte[]>> consumer) { |
| this.consumer = consumer; |
| } |
| |
| @Override |
| public CompletionStage<Void> getBody() { |
| return result; |
| } |
| |
| @Override |
| public void onSubscribe(Flow.Subscription subscription) { |
| this.subscription = subscription; |
| subscription.request(1); |
| } |
| |
| @Override |
| public void onNext(ByteBuffer item) { |
| byte[] buf = new byte[item.remaining()]; |
| item.get(buf); |
| consumer.accept(Optional.of(buf)); |
| subscription.request(1); |
| } |
| |
| @Override |
| public void onError(Throwable throwable) { |
| result.completeExceptionally(throwable); |
| } |
| |
| @Override |
| public void onComplete() { |
| consumer.accept(Optional.empty()); |
| result.complete(null); |
| } |
| |
| } |
| |
| static class PathProcessor implements HttpResponse.BodyProcessor<Path> { |
| |
| private final Path file; |
| private final CompletableFuture<Path> result = new MinimalFuture<>(); |
| |
| private Flow.Subscription subscription; |
| private FileChannel out; |
| private final OpenOption[] options; |
| |
| PathProcessor(Path file, OpenOption... options) { |
| this.file = file; |
| this.options = options; |
| } |
| |
| @Override |
| public void onSubscribe(Flow.Subscription subscription) { |
| this.subscription = subscription; |
| try { |
| out = FileChannel.open(file, options); |
| } catch (IOException e) { |
| result.completeExceptionally(e); |
| subscription.cancel(); |
| return; |
| } |
| subscription.request(1); |
| } |
| |
| @Override |
| public void onNext(ByteBuffer item) { |
| try { |
| out.write(item); |
| } catch (IOException ex) { |
| Utils.close(out); |
| subscription.cancel(); |
| result.completeExceptionally(ex); |
| } |
| subscription.request(1); |
| } |
| |
| @Override |
| public void onError(Throwable e) { |
| result.completeExceptionally(e); |
| Utils.close(out); |
| } |
| |
| @Override |
| public void onComplete() { |
| Utils.close(out); |
| result.complete(file); |
| } |
| |
| @Override |
| public CompletionStage<Path> getBody() { |
| return result; |
| } |
| } |
| |
| static class ByteArrayProcessor<T> implements HttpResponse.BodyProcessor<T> { |
| private final Function<byte[], T> finisher; |
| private final CompletableFuture<T> result = new MinimalFuture<>(); |
| private final List<ByteBuffer> received = new ArrayList<>(); |
| |
| private Flow.Subscription subscription; |
| |
| ByteArrayProcessor(Function<byte[],T> finisher) { |
| this.finisher = finisher; |
| } |
| |
| @Override |
| public void onSubscribe(Flow.Subscription subscription) { |
| if (this.subscription != null) { |
| subscription.cancel(); |
| return; |
| } |
| this.subscription = subscription; |
| // We can handle whatever you've got |
| subscription.request(Long.MAX_VALUE); |
| } |
| |
| @Override |
| public void onNext(ByteBuffer item) { |
| // incoming buffers are allocated by http client internally, |
| // and won't be used anywhere except this place. |
| // So it's free simply to store them for further processing. |
| if(item.hasRemaining()) { |
| received.add(item); |
| } |
| } |
| |
| @Override |
| public void onError(Throwable throwable) { |
| received.clear(); |
| result.completeExceptionally(throwable); |
| } |
| |
| static private byte[] join(List<ByteBuffer> bytes) { |
| int size = Utils.remaining(bytes); |
| byte[] res = new byte[size]; |
| int from = 0; |
| for (ByteBuffer b : bytes) { |
| int l = b.remaining(); |
| b.get(res, from, l); |
| from += l; |
| } |
| return res; |
| } |
| |
| @Override |
| public void onComplete() { |
| try { |
| result.complete(finisher.apply(join(received))); |
| received.clear(); |
| } catch (IllegalArgumentException e) { |
| result.completeExceptionally(e); |
| } |
| } |
| |
| @Override |
| public CompletionStage<T> getBody() { |
| return result; |
| } |
| } |
| |
| static class MultiProcessorImpl<V> implements HttpResponse.MultiProcessor<MultiMapResult<V>,V> { |
| private final MultiMapResult<V> results; |
| private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler; |
| private final boolean completion; // aggregate completes on last PP received or overall completion |
| |
| MultiProcessorImpl(Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) { |
| this.results = new MultiMapResult<V>(new ConcurrentHashMap<>()); |
| this.pushHandler = pushHandler; |
| this.completion = completion; |
| } |
| |
| @Override |
| public Optional<HttpResponse.BodyHandler<V>> onRequest(HttpRequest request) { |
| return pushHandler.apply(request); |
| } |
| |
| @Override |
| public void onResponse(HttpResponse<V> response) { |
| results.put(response.request(), CompletableFuture.completedFuture(response)); |
| } |
| |
| @Override |
| public void onError(HttpRequest request, Throwable t) { |
| results.put(request, MinimalFuture.failedFuture(t)); |
| } |
| |
| @Override |
| public CompletableFuture<MultiMapResult<V>> completion( |
| CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) { |
| if (completion) |
| return onComplete.thenApply((ignored)-> results); |
| else |
| return onFinalPushPromise.thenApply((ignored) -> results); |
| } |
| } |
| |
| static class MultiFile { |
| |
| final Path pathRoot; |
| |
| MultiFile(Path destination) { |
| if (!destination.toFile().isDirectory()) |
| throw new UncheckedIOException(new IOException("destination is not a directory")); |
| pathRoot = destination; |
| } |
| |
| Optional<HttpResponse.BodyHandler<Path>> handlePush(HttpRequest request) { |
| final URI uri = request.uri(); |
| String path = uri.getPath(); |
| while (path.startsWith("/")) |
| path = path.substring(1); |
| Path p = pathRoot.resolve(path); |
| if (Log.trace()) { |
| Log.logTrace("Creating file body processor for URI={0}, path={1}", |
| uri, p); |
| } |
| try { |
| Files.createDirectories(p.getParent()); |
| } catch (IOException ex) { |
| throw new UncheckedIOException(ex); |
| } |
| |
| final HttpResponse.BodyHandler<Path> proc = |
| HttpResponse.BodyHandler.asFile(p); |
| |
| return Optional.of(proc); |
| } |
| } |
| |
| /** |
| * Currently this consumes all of the data and ignores it |
| */ |
| static class NullProcessor<T> implements HttpResponse.BodyProcessor<T> { |
| |
| Flow.Subscription subscription; |
| final CompletableFuture<T> cf = new MinimalFuture<>(); |
| final Optional<T> result; |
| |
| NullProcessor(Optional<T> result) { |
| this.result = result; |
| } |
| |
| @Override |
| public void onSubscribe(Flow.Subscription subscription) { |
| this.subscription = subscription; |
| subscription.request(Long.MAX_VALUE); |
| } |
| |
| @Override |
| public void onNext(ByteBuffer item) { |
| // TODO: check whether this should consume the buffer, as in: |
| item.position(item.limit()); |
| } |
| |
| @Override |
| public void onError(Throwable throwable) { |
| cf.completeExceptionally(throwable); |
| } |
| |
| @Override |
| public void onComplete() { |
| if (result.isPresent()) { |
| cf.complete(result.get()); |
| } else { |
| cf.complete(null); |
| } |
| } |
| |
| @Override |
| public CompletionStage<T> getBody() { |
| return cf; |
| } |
| } |
| } |