blob: 1e37edcab3ec167e6ef1c39f9e1a4818c1a003a9 [file] [log] [blame]
/*
* Copyright (C) 2010 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.collect;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Equivalence;
import com.google.common.base.Function;
import com.google.common.collect.MapMaker.RemovalCause;
import com.google.common.collect.MapMaker.RemovalListener;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReferenceArray;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* Adds computing functionality to {@link MapMakerInternalMap}.
*
* @author Bob Lee
* @author Charles Fry
*/
class ComputingConcurrentHashMap<K, V> extends MapMakerInternalMap<K, V> {
final Function<? super K, ? extends V> computingFunction;
/**
* Creates a new, empty map with the specified strategy, initial capacity, load factor and
* concurrency level.
*/
ComputingConcurrentHashMap(MapMaker builder,
Function<? super K, ? extends V> computingFunction) {
super(builder);
this.computingFunction = checkNotNull(computingFunction);
}
@Override
Segment<K, V> createSegment(int initialCapacity, int maxSegmentSize) {
return new ComputingSegment<K, V>(this, initialCapacity, maxSegmentSize);
}
@Override
ComputingSegment<K, V> segmentFor(int hash) {
return (ComputingSegment<K, V>) super.segmentFor(hash);
}
V getOrCompute(K key) throws ExecutionException {
int hash = hash(checkNotNull(key));
return segmentFor(hash).getOrCompute(key, hash, computingFunction);
}
@SuppressWarnings("serial") // This class is never serialized.
static final class ComputingSegment<K, V> extends Segment<K, V> {
ComputingSegment(MapMakerInternalMap<K, V> map, int initialCapacity, int maxSegmentSize) {
super(map, initialCapacity, maxSegmentSize);
}
V getOrCompute(K key, int hash, Function<? super K, ? extends V> computingFunction)
throws ExecutionException {
try {
outer: while (true) {
// don't call getLiveEntry, which would ignore computing values
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
V value = getLiveValue(e);
if (value != null) {
recordRead(e);
return value;
}
}
// at this point e is either null, computing, or expired;
// avoid locking if it's already computing
if (e == null || !e.getValueReference().isComputingReference()) {
boolean createNewEntry = true;
ComputingValueReference<K, V> computingValueReference = null;
lock();
try {
preWriteCleanup();
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isComputingReference()) {
createNewEntry = false;
} else {
V value = e.getValueReference().get();
if (value == null) {
enqueueNotification(entryKey, hash, value, RemovalCause.COLLECTED);
} else if (map.expires() && map.isExpired(e)) {
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accomodate an incorrect expiration queue.
enqueueNotification(entryKey, hash, value, RemovalCause.EXPIRED);
} else {
recordLockedRead(e);
return value;
}
// immediately reuse invalid entries
evictionQueue.remove(e);
expirationQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
if (createNewEntry) {
computingValueReference = new ComputingValueReference<K, V>(computingFunction);
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(computingValueReference);
table.set(index, e);
} else {
e.setValueReference(computingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
if (createNewEntry) {
// This thread solely created the entry.
return compute(key, hash, e, computingValueReference);
}
}
// The entry already exists. Wait for the computation.
checkState(!Thread.holdsLock(e), "Recursive computation");
// don't consider expiration as we're concurrent with computation
V value = e.getValueReference().waitForValue();
if (value != null) {
recordRead(e);
return value;
}
// else computing thread will clearValue
continue outer;
}
} finally {
postReadCleanup();
}
}
V compute(K key, int hash, ReferenceEntry<K, V> e,
ComputingValueReference<K, V> computingValueReference)
throws ExecutionException {
V value = null;
long start = System.nanoTime();
long end = 0;
try {
// Synchronizes on the entry to allow failing fast when a recursive computation is
// detected. This is not fool-proof since the entry may be copied when the segment
// is written to.
synchronized (e) {
value = computingValueReference.compute(key, hash);
end = System.nanoTime();
}
if (value != null) {
// putIfAbsent
V oldValue = put(key, hash, value, true);
if (oldValue != null) {
// the computed value was already clobbered
enqueueNotification(key, hash, value, RemovalCause.REPLACED);
}
}
return value;
} finally {
if (end == 0) {
end = System.nanoTime();
}
if (value == null) {
clearValue(key, hash, computingValueReference);
}
}
}
}
/**
* Used to provide computation exceptions to other threads.
*/
private static final class ComputationExceptionReference<K, V> implements ValueReference<K, V> {
final Throwable t;
ComputationExceptionReference(Throwable t) {
this.t = t;
}
@Override
public V get() {
return null;
}
@Override
public ReferenceEntry<K, V> getEntry() {
return null;
}
@Override
public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
return this;
}
@Override
public boolean isComputingReference() {
return false;
}
@Override
public V waitForValue() throws ExecutionException {
throw new ExecutionException(t);
}
@Override
public void clear(ValueReference<K, V> newValue) {}
}
/**
* Used to provide computation result to other threads.
*/
private static final class ComputedReference<K, V> implements ValueReference<K, V> {
final V value;
ComputedReference(@Nullable V value) {
this.value = value;
}
@Override
public V get() {
return value;
}
@Override
public ReferenceEntry<K, V> getEntry() {
return null;
}
@Override
public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
return this;
}
@Override
public boolean isComputingReference() {
return false;
}
@Override
public V waitForValue() {
return get();
}
@Override
public void clear(ValueReference<K, V> newValue) {}
}
private static final class ComputingValueReference<K, V> implements ValueReference<K, V> {
final Function<? super K, ? extends V> computingFunction;
@GuardedBy("ComputingValueReference.this") // writes
volatile ValueReference<K, V> computedReference = unset();
public ComputingValueReference(Function<? super K, ? extends V> computingFunction) {
this.computingFunction = computingFunction;
}
@Override
public V get() {
// All computation lookups go through waitForValue. This method thus is
// only used by put, to whom we always want to appear absent.
return null;
}
@Override
public ReferenceEntry<K, V> getEntry() {
return null;
}
@Override
public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
return this;
}
@Override
public boolean isComputingReference() {
return true;
}
/**
* Waits for a computation to complete. Returns the result of the computation.
*/
@Override
public V waitForValue() throws ExecutionException {
if (computedReference == UNSET) {
boolean interrupted = false;
try {
synchronized (this) {
while (computedReference == UNSET) {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
return computedReference.waitForValue();
}
@Override
public void clear(ValueReference<K, V> newValue) {
// The pending computation was clobbered by a manual write. Unblock all
// pending gets, and have them return the new value.
setValueReference(newValue);
// TODO(fry): could also cancel computation if we had a thread handle
}
V compute(K key, int hash) throws ExecutionException {
V value;
try {
value = computingFunction.apply(key);
} catch (Throwable t) {
setValueReference(new ComputationExceptionReference<K, V>(t));
throw new ExecutionException(t);
}
setValueReference(new ComputedReference<K, V>(value));
return value;
}
void setValueReference(ValueReference<K, V> valueReference) {
synchronized (this) {
if (computedReference == UNSET) {
computedReference = valueReference;
notifyAll();
}
}
}
}
// Serialization Support
private static final long serialVersionUID = 4;
@Override
Object writeReplace() {
return new ComputingSerializationProxy<K, V>(keyStrength, valueStrength, keyEquivalence,
valueEquivalence, expireAfterWriteNanos, expireAfterAccessNanos, maximumSize,
concurrencyLevel, removalListener, this, computingFunction);
}
static final class ComputingSerializationProxy<K, V> extends AbstractSerializationProxy<K, V> {
final Function<? super K, ? extends V> computingFunction;
ComputingSerializationProxy(Strength keyStrength, Strength valueStrength,
Equivalence<Object> keyEquivalence, Equivalence<Object> valueEquivalence,
long expireAfterWriteNanos, long expireAfterAccessNanos, int maximumSize,
int concurrencyLevel, RemovalListener<? super K, ? super V> removalListener,
ConcurrentMap<K, V> delegate, Function<? super K, ? extends V> computingFunction) {
super(keyStrength, valueStrength, keyEquivalence, valueEquivalence, expireAfterWriteNanos,
expireAfterAccessNanos, maximumSize, concurrencyLevel, removalListener, delegate);
this.computingFunction = computingFunction;
}
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
writeMapTo(out);
}
@SuppressWarnings("deprecation") // self-use
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
MapMaker mapMaker = readMapMaker(in);
delegate = mapMaker.makeComputingMap(computingFunction);
readEntries(in);
}
Object readResolve() {
return delegate;
}
private static final long serialVersionUID = 4;
}
}