| /* |
| * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| package java.util.stream; |
| |
| import java.util.Spliterator; |
| import java.util.concurrent.CountedCompleter; |
| import java.util.concurrent.ForkJoinPool; |
| |
| /** |
| * Abstract base class for most fork-join tasks used to implement stream ops. |
| * Manages splitting logic, tracking of child tasks, and intermediate results. |
| * Each task is associated with a {@link Spliterator} that describes the portion |
| * of the input associated with the subtree rooted at this task. |
| * Tasks may be leaf nodes (which will traverse the elements of |
| * the {@code Spliterator}) or internal nodes (which split the |
| * {@code Spliterator} into multiple child tasks). |
| * |
| * @implNote |
| * <p>This class is based on {@link CountedCompleter}, a form of fork-join task |
| * where each task has a semaphore-like count of uncompleted children, and the |
| * task is implicitly completed and notified when its last child completes. |
| * Internal node tasks will likely override the {@code onCompletion} method from |
| * {@code CountedCompleter} to merge the results from child tasks into the |
| * current task's result. |
| * |
| * <p>Splitting and setting up the child task links is done by {@code compute()} |
| * for internal nodes. At {@code compute()} time for leaf nodes, it is |
| * guaranteed that the parent's child-related fields (including sibling links |
| * for the parent's children) will be set up for all children. |
| * |
| * <p>For example, a task that performs a reduce would override {@code doLeaf()} |
| * to perform a reduction on that leaf node's chunk using the |
| * {@code Spliterator}, and override {@code onCompletion()} to merge the results |
| * of the child tasks for internal nodes: |
| * |
| * <pre>{@code |
| * protected S doLeaf() { |
| * spliterator.forEach(...); |
| * return localReductionResult; |
| * } |
| * |
| * public void onCompletion(CountedCompleter caller) { |
| * if (!isLeaf()) { |
| * ReduceTask<P_IN, P_OUT, T, R> child = children; |
| * R result = child.getLocalResult(); |
| * child = child.nextSibling; |
| * for (; child != null; child = child.nextSibling) |
| * result = combine(result, child.getLocalResult()); |
| * setLocalResult(result); |
| * } |
| * } |
| * }</pre> |
| * |
| * <p>Serialization is not supported as there is no intention to serialize |
| * tasks managed by stream ops. |
| * |
| * @param <P_IN> Type of elements input to the pipeline |
| * @param <P_OUT> Type of elements output from the pipeline |
| * @param <R> Type of intermediate result, which may be different from operation |
| * result type |
| * @param <K> Type of parent, child and sibling tasks |
| * @since 1.8 |
| */ |
| @SuppressWarnings("serial") |
| abstract class AbstractTask<P_IN, P_OUT, R, |
| K extends AbstractTask<P_IN, P_OUT, R, K>> |
| extends CountedCompleter<R> { |
| |
| /** |
| * Default target factor of leaf tasks for parallel decomposition. |
| * To allow load balancing, we over-partition, currently to approximately |
| * four tasks per processor, which enables others to help out |
| * if leaf tasks are uneven or some processors are otherwise busy. |
| */ |
| static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; |
| |
| /** The pipeline helper, common to all tasks in a computation */ |
| protected final PipelineHelper<P_OUT> helper; |
| |
| /** |
| * The spliterator for the portion of the input associated with the subtree |
| * rooted at this task |
| */ |
| protected Spliterator<P_IN> spliterator; |
| |
| /** Target leaf size, common to all tasks in a computation */ |
| protected long targetSize; // may be lazily initialized |
| |
| /** |
| * The left child. |
| * null if no children |
| * if non-null rightChild is non-null |
| */ |
| protected K leftChild; |
| |
| /** |
| * The right child. |
| * null if no children |
| * if non-null leftChild is non-null |
| */ |
| protected K rightChild; |
| |
| /** The result of this node, if completed */ |
| private R localResult; |
| |
| /** |
| * Constructor for root nodes. |
| * |
| * @param helper The {@code PipelineHelper} describing the stream pipeline |
| * up to this operation |
| * @param spliterator The {@code Spliterator} describing the source for this |
| * pipeline |
| */ |
| protected AbstractTask(PipelineHelper<P_OUT> helper, |
| Spliterator<P_IN> spliterator) { |
| super(null); |
| this.helper = helper; |
| this.spliterator = spliterator; |
| this.targetSize = 0L; |
| } |
| |
| /** |
| * Constructor for non-root nodes. |
| * |
| * @param parent this node's parent task |
| * @param spliterator {@code Spliterator} describing the subtree rooted at |
| * this node, obtained by splitting the parent {@code Spliterator} |
| */ |
| protected AbstractTask(K parent, |
| Spliterator<P_IN> spliterator) { |
| super(parent); |
| this.spliterator = spliterator; |
| this.helper = parent.helper; |
| this.targetSize = parent.targetSize; |
| } |
| |
| /** |
| * Constructs a new node of type T whose parent is the receiver; must call |
| * the AbstractTask(T, Spliterator) constructor with the receiver and the |
| * provided Spliterator. |
| * |
| * @param spliterator {@code Spliterator} describing the subtree rooted at |
| * this node, obtained by splitting the parent {@code Spliterator} |
| * @return newly constructed child node |
| */ |
| protected abstract K makeChild(Spliterator<P_IN> spliterator); |
| |
| /** |
| * Computes the result associated with a leaf node. Will be called by |
| * {@code compute()} and the result passed to @{code setLocalResult()} |
| * |
| * @return the computed result of a leaf node |
| */ |
| protected abstract R doLeaf(); |
| |
| /** |
| * Returns a suggested target leaf size based on the initial size estimate. |
| * |
| * @return suggested target leaf size |
| */ |
| public static long suggestTargetSize(long sizeEstimate) { |
| long est = sizeEstimate / LEAF_TARGET; |
| return est > 0L ? est : 1L; |
| } |
| |
| /** |
| * Returns the targetSize, initializing it via the supplied |
| * size estimate if not already initialized. |
| */ |
| protected final long getTargetSize(long sizeEstimate) { |
| long s; |
| return ((s = targetSize) != 0 ? s : |
| (targetSize = suggestTargetSize(sizeEstimate))); |
| } |
| |
| /** |
| * Returns the local result, if any. Subclasses should use |
| * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage |
| * results. This returns the local result so that calls from within the |
| * fork-join framework will return the correct result. |
| * |
| * @return local result for this node previously stored with |
| * {@link #setLocalResult} |
| */ |
| @Override |
| public R getRawResult() { |
| return localResult; |
| } |
| |
| /** |
| * Does nothing; instead, subclasses should use |
| * {@link #setLocalResult(Object)}} to manage results. |
| * |
| * @param result must be null, or an exception is thrown (this is a safety |
| * tripwire to detect when {@code setRawResult()} is being used |
| * instead of {@code setLocalResult()} |
| */ |
| @Override |
| protected void setRawResult(R result) { |
| if (result != null) |
| throw new IllegalStateException(); |
| } |
| |
| /** |
| * Retrieves a result previously stored with {@link #setLocalResult} |
| * |
| * @return local result for this node previously stored with |
| * {@link #setLocalResult} |
| */ |
| protected R getLocalResult() { |
| return localResult; |
| } |
| |
| /** |
| * Associates the result with the task, can be retrieved with |
| * {@link #getLocalResult} |
| * |
| * @param localResult local result for this node |
| */ |
| protected void setLocalResult(R localResult) { |
| this.localResult = localResult; |
| } |
| |
| /** |
| * Indicates whether this task is a leaf node. (Only valid after |
| * {@link #compute} has been called on this node). If the node is not a |
| * leaf node, then children will be non-null and numChildren will be |
| * positive. |
| * |
| * @return {@code true} if this task is a leaf node |
| */ |
| protected boolean isLeaf() { |
| return leftChild == null; |
| } |
| |
| /** |
| * Indicates whether this task is the root node |
| * |
| * @return {@code true} if this task is the root node. |
| */ |
| protected boolean isRoot() { |
| return getParent() == null; |
| } |
| |
| /** |
| * Returns the parent of this task, or null if this task is the root |
| * |
| * @return the parent of this task, or null if this task is the root |
| */ |
| @SuppressWarnings("unchecked") |
| protected K getParent() { |
| return (K) getCompleter(); |
| } |
| |
| /** |
| * Decides whether or not to split a task further or compute it |
| * directly. If computing directly, calls {@code doLeaf} and pass |
| * the result to {@code setRawResult}. Otherwise splits off |
| * subtasks, forking one and continuing as the other. |
| * |
| * <p> The method is structured to conserve resources across a |
| * range of uses. The loop continues with one of the child tasks |
| * when split, to avoid deep recursion. To cope with spliterators |
| * that may be systematically biased toward left-heavy or |
| * right-heavy splits, we alternate which child is forked versus |
| * continued in the loop. |
| */ |
| @Override |
| public void compute() { |
| Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators |
| long sizeEstimate = rs.estimateSize(); |
| long sizeThreshold = getTargetSize(sizeEstimate); |
| boolean forkRight = false; |
| @SuppressWarnings("unchecked") K task = (K) this; |
| while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { |
| K leftChild, rightChild, taskToFork; |
| task.leftChild = leftChild = task.makeChild(ls); |
| task.rightChild = rightChild = task.makeChild(rs); |
| task.setPendingCount(1); |
| if (forkRight) { |
| forkRight = false; |
| rs = ls; |
| task = leftChild; |
| taskToFork = rightChild; |
| } |
| else { |
| forkRight = true; |
| task = rightChild; |
| taskToFork = leftChild; |
| } |
| taskToFork.fork(); |
| sizeEstimate = rs.estimateSize(); |
| } |
| task.setLocalResult(task.doLeaf()); |
| task.tryComplete(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @implNote |
| * Clears spliterator and children fields. Overriders MUST call |
| * {@code super.onCompletion} as the last thing they do if they want these |
| * cleared. |
| */ |
| @Override |
| public void onCompletion(CountedCompleter<?> caller) { |
| spliterator = null; |
| leftChild = rightChild = null; |
| } |
| |
| /** |
| * Returns whether this node is a "leftmost" node -- whether the path from |
| * the root to this node involves only traversing leftmost child links. For |
| * a leaf node, this means it is the first leaf node in the encounter order. |
| * |
| * @return {@code true} if this node is a "leftmost" node |
| */ |
| protected boolean isLeftmostNode() { |
| @SuppressWarnings("unchecked") |
| K node = (K) this; |
| while (node != null) { |
| K parent = node.getParent(); |
| if (parent != null && parent.leftChild != node) |
| return false; |
| node = parent; |
| } |
| return true; |
| } |
| } |