diff --git a/build.gradle b/build.gradle index 570ceb242f..8d0d2fa087 100644 --- a/build.gradle +++ b/build.gradle @@ -26,7 +26,7 @@ dependencies { testCompile 'junit:junit:4.12' testCompile 'org.mockito:mockito-core:1.10.19' - testCompile 'com.google.guava:guava:19.0' + testCompile 'com.google.guava:guava:24.0-jre' testCompile 'com.pushtorefresh.java-private-constructor-checker:checker:1.2.0' perfCompile 'org.openjdk.jmh:jmh-core:1.11.3' diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 5d1896008e..99e84ec609 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -7275,7 +7275,7 @@ public final void forEach(final Action1 onNext, final Action1ReactiveX operators documentation: GroupBy */ public final Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) { - return lift(new OperatorGroupBy(keySelector, elementSelector)); + return lift(new OperatorGroupByEvicting(keySelector, elementSelector)); } /** @@ -7334,7 +7334,12 @@ public final Observable> groupBy(final Func1ReactiveX operators documentation: GroupBy * @since 1.3 + * @deprecated since 1.3.7, use {@link #groupBy(Func1, Func1, int, boolean, Func1)} + * instead which uses much less memory. Please take note of the + * usage difference involving the evicting action which now expects + * the value from the map instead of the key. */ + @Deprecated public final Observable> groupBy(final Func1 keySelector, final Func1 elementSelector, final Func1, Map> evictingMapFactory) { if (evictingMapFactory == null) { @@ -7369,6 +7374,72 @@ public final Observable> groupBy(final Func1 + * {@code + * Func1, Map> mapFactory + * = action -> CacheBuilder.newBuilder() + * .maximumSize(1000) + * .expireAfterAccess(12, TimeUnit.HOURS) + * .removalListener(entry -> action.call(entry.getValue())) + * . build().asMap(); + * } + * + * + * @param + * the key type + * @param + * the element type + * @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a + * unique key value and each of which emits those items from the source Observable that share that + * key value + * @throws NullPointerException + * if {@code evictingMapFactory} is null + * @see ReactiveX operators documentation: GroupBy + * @since 1.3.7 + */ + @Experimental + public final Observable> groupBy(final Func1 keySelector, + final Func1 elementSelector, int bufferSize, boolean delayError, + final Func1, Map> evictingMapFactory) { + if (evictingMapFactory == null) { + throw new NullPointerException("evictingMapFactory cannot be null"); + } + return lift(new OperatorGroupByEvicting( + keySelector, elementSelector, bufferSize, delayError, evictingMapFactory)); + } + + /** + * Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these + * grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows only a single + * {@link Subscriber} during its lifetime and if this {@code Subscriber} unsubscribes before the + * source terminates, the next emission by the source having the same key will trigger a new + * {@code GroupedObservable} emission. + *

+ * + *

+ * Note: A {@link GroupedObservable} will cache the items it is to emit until such time as it + * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those + * {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may + * discard their buffers by applying an operator like {@link #ignoreElements} to them. + *

+ *
Scheduler:
+ *
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param keySelector + * a function that extracts the key for each item * @param * the key type * @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a @@ -7377,7 +7448,7 @@ public final Observable> groupBy(final Func1ReactiveX operators documentation: GroupBy */ public final Observable> groupBy(final Func1 keySelector) { - return lift(new OperatorGroupBy(keySelector)); + return lift(new OperatorGroupByEvicting(keySelector)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java index 8892f0d4d0..5d6ec2f556 100644 --- a/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -42,13 +42,16 @@ * the source and group value type * @param * the value type of the groups + * @deprecated + * since 1.3.7, use {@link OperatorGroupByEvicting} instead */ +@Deprecated public final class OperatorGroupBy implements Operator, T> { final Func1 keySelector; final Func1 valueSelector; final int bufferSize; final boolean delayError; - final Func1, Map> mapFactory; + final Func1, Map> mapFactory; //nullable @SuppressWarnings({ "unchecked", "rawtypes" }) public OperatorGroupBy(Func1 keySelector) { @@ -116,6 +119,10 @@ public static final class GroupBySubscriber final int bufferSize; final boolean delayError; final Map> groups; + + // double store the groups to workaround the bug in the + // signature of groupBy with evicting map factory + final Map> groupsCopy; final Queue> queue; final GroupByProducer producer; final Queue evictedKeys; @@ -134,7 +141,7 @@ public static final class GroupBySubscriber volatile boolean done; final AtomicInteger wip; - + public GroupBySubscriber(Subscriber> actual, Func1 keySelector, Func1 valueSelector, int bufferSize, boolean delayError, Func1, Map> mapFactory) { @@ -158,6 +165,7 @@ public GroupBySubscriber(Subscriber> actual, Fun this.evictedKeys = new ConcurrentLinkedQueue(); this.groups = createMap(mapFactory, new EvictionAction(evictedKeys)); } + this.groupsCopy = new ConcurrentHashMap>(); } static class EvictionAction implements Action1 { @@ -211,6 +219,9 @@ public void onNext(T t) { if (!cancelled.get()) { group = GroupedUnicast.createWith(key, bufferSize, this, delayError); groups.put(mapKey, group); + if (evictedKeys != null) { + groupsCopy.put(mapKey, group); + } groupCount.getAndIncrement(); @@ -234,7 +245,9 @@ public void onNext(T t) { if (evictedKeys != null) { K evictedKey; while ((evictedKey = evictedKeys.poll()) != null) { - GroupedUnicast g = groups.get(evictedKey); + GroupedUnicast g = groupsCopy.remove(evictedKey); + // do a null check on g because cancel(K) could have cleared + // the map if (g != null) { g.onComplete(); } @@ -270,6 +283,7 @@ public void onCompleted() { } groups.clear(); if (evictedKeys != null) { + groupsCopy.clear(); evictedKeys.clear(); } @@ -304,6 +318,9 @@ public void cancel(K key) { unsubscribe(); } } + if (evictedKeys != null) { + groupsCopy.remove(mapKey); + } } void drain() { @@ -364,6 +381,7 @@ void errorAll(Subscriber> a, Queue q, Throwab List> list = new ArrayList>(groups.values()); groups.clear(); if (evictedKeys != null) { + groupsCopy.clear(); evictedKeys.clear(); } diff --git a/src/main/java/rx/internal/operators/OperatorGroupByEvicting.java b/src/main/java/rx/internal/operators/OperatorGroupByEvicting.java new file mode 100644 index 0000000000..d02ecf5d39 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorGroupByEvicting.java @@ -0,0 +1,605 @@ +/** + * Copyright 2018 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import rx.*; +import rx.Observable.*; +import rx.exceptions.Exceptions; +import rx.functions.*; +import rx.internal.producers.ProducerArbiter; +import rx.internal.util.*; +import rx.observables.GroupedObservable; +import rx.plugins.RxJavaHooks; +import rx.observers.Subscribers; +import rx.subscriptions.Subscriptions; + +/** + * Groups the items emitted by an Observable according to a specified criterion, and emits these + * grouped items as Observables, one Observable per group. + *

+ * + * + * @param + * the key type + * @param + * the source and group value type + * @param + * the value type of the groups + */ +public final class OperatorGroupByEvicting implements Operator, T>{ + + final Func1 keySelector; + final Func1 valueSelector; + final int bufferSize; + final boolean delayError; + final Func1, Map> mapFactory; //nullable + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorGroupByEvicting(Func1 keySelector) { + this(keySelector, (Func1)UtilityFunctions.identity(), RxRingBuffer.SIZE, false, null); + } + + public OperatorGroupByEvicting(Func1 keySelector, Func1 valueSelector) { + this(keySelector, valueSelector, RxRingBuffer.SIZE, false, null); + } + + public OperatorGroupByEvicting(Func1 keySelector, Func1 valueSelector, int bufferSize, boolean delayError, Func1, Map> mapFactory) { + this.keySelector = keySelector; + this.valueSelector = valueSelector; + this.bufferSize = bufferSize; + this.delayError = delayError; + this.mapFactory = mapFactory; + } + + @SuppressWarnings("unchecked") + @Override + public Subscriber call(Subscriber> child) { + Map> groups; + Queue> evictedGroups; + + if (mapFactory == null) { + evictedGroups = null; + groups = new ConcurrentHashMap>(); + } else { + evictedGroups = new ConcurrentLinkedQueue>(); + Action1 evictionAction = (Action1)(Action1) + new EvictionAction(evictedGroups); + try { + groups = (Map>)(Map) + mapFactory.call((Action1)(Action1) evictionAction); + } catch (Throwable ex) { + //Can reach here because mapFactory.call() may throw + Exceptions.throwOrReport(ex, child); + Subscriber parent2 = Subscribers.empty(); + parent2.unsubscribe(); + return parent2; + } + } + final GroupBySubscriber parent = new GroupBySubscriber( + child, keySelector, valueSelector, bufferSize, delayError, groups, evictedGroups); + + child.add(Subscriptions.create(new Action0() { + @Override + public void call() { + parent.cancel(); + } + })); + + child.setProducer(parent.producer); + + return parent; + } + + public static final class GroupByProducer implements Producer { + final GroupBySubscriber parent; + + public GroupByProducer(GroupBySubscriber parent) { + this.parent = parent; + } + @Override + public void request(long n) { + parent.requestMore(n); + } + } + + public static final class GroupBySubscriber + extends Subscriber { + final Subscriber> actual; + final Func1 keySelector; + final Func1 valueSelector; + final int bufferSize; + final boolean delayError; + final Map> groups; + final Queue> queue; + final GroupByProducer producer; + final Queue> evictedGroups; + + static final Object NULL_KEY = new Object(); + + final ProducerArbiter s; + + final AtomicBoolean cancelled; + + final AtomicLong requested; + + final AtomicInteger groupCount; + + Throwable error; + volatile boolean done; + + final AtomicInteger wip; + + public GroupBySubscriber(Subscriber> actual, Func1 keySelector, + Func1 valueSelector, int bufferSize, boolean delayError, Map> groups, + Queue> evictedGroups) { + this.actual = actual; + this.keySelector = keySelector; + this.valueSelector = valueSelector; + this.bufferSize = bufferSize; + this.delayError = delayError; + this.queue = new ConcurrentLinkedQueue>(); + this.s = new ProducerArbiter(); + this.s.request(bufferSize); + this.producer = new GroupByProducer(this); + this.cancelled = new AtomicBoolean(); + this.requested = new AtomicLong(); + this.groupCount = new AtomicInteger(1); + this.wip = new AtomicInteger(); + this.groups = groups; + this.evictedGroups = evictedGroups; + } + + @Override + public void setProducer(Producer s) { + this.s.setProducer(s); + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + + final Queue> q = this.queue; + final Subscriber> a = this.actual; + + K key; + try { + key = keySelector.call(t); + } catch (Throwable ex) { + unsubscribe(); + errorAll(a, q, ex); + return; + } + + boolean newGroup = false; + @SuppressWarnings("unchecked") + K mapKey = key != null ? key : (K) NULL_KEY; + GroupedUnicast group = groups.get(mapKey); + if (group == null) { + // if the main has been cancelled, stop creating groups + // and skip this value + if (!cancelled.get()) { + group = GroupedUnicast.createWith(key, bufferSize, this, delayError); + groups.put(mapKey, group); + + groupCount.getAndIncrement(); + + newGroup = false; + q.offer(group); + drain(); + } else { + return; + } + } + + V v; + try { + v = valueSelector.call(t); + } catch (Throwable ex) { + unsubscribe(); + errorAll(a, q, ex); + return; + } + + group.onNext(v); + + if (evictedGroups != null) { + GroupedUnicast evictedGroup; + while ((evictedGroup = evictedGroups.poll()) != null) { + evictedGroup.onComplete(); + } + } + + if (newGroup) { + q.offer(group); + drain(); + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaHooks.onError(t); + return; + } + error = t; + done = true; + groupCount.decrementAndGet(); + drain(); + } + + @Override + public void onCompleted() { + if (done) { + return; + } + + for (GroupedUnicast e : groups.values()) { + e.onComplete(); + } + groups.clear(); + if (evictedGroups != null) { + evictedGroups.clear(); + } + + done = true; + groupCount.decrementAndGet(); + drain(); + } + + public void requestMore(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } + + BackpressureUtils.getAndAddRequest(requested, n); + drain(); + } + + public void cancel() { + // cancelling the main source means we don't want any more groups + // but running groups still require new values + if (cancelled.compareAndSet(false, true)) { + if (groupCount.decrementAndGet() == 0) { + unsubscribe(); + } + } + } + + public void cancel(K key) { + Object mapKey = key != null ? key : NULL_KEY; + if (groups.remove(mapKey) != null) { + if (groupCount.decrementAndGet() == 0) { + unsubscribe(); + } + } + } + + void drain() { + if (wip.getAndIncrement() != 0) { + return; + } + + int missed = 1; + + final Queue> q = this.queue; + final Subscriber> a = this.actual; + + for (;;) { + + if (checkTerminated(done, q.isEmpty(), a, q)) { + return; + } + + long r = requested.get(); + boolean unbounded = r == Long.MAX_VALUE; + long e = 0L; + + while (r != 0) { + boolean d = done; + + GroupedObservable t = q.poll(); + + boolean empty = t == null; + + if (checkTerminated(d, empty, a, q)) { + return; + } + + if (empty) { + break; + } + + a.onNext(t); + + r--; + e--; + } + + if (e != 0L) { + if (!unbounded) { + requested.addAndGet(e); + } + s.request(-e); + } + + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + void errorAll(Subscriber> a, Queue q, Throwable ex) { + q.clear(); + List> list = new ArrayList>(groups.values()); + groups.clear(); + if (evictedGroups != null) { + evictedGroups.clear(); + } + + for (GroupedUnicast e : list) { + e.onError(ex); + } + + a.onError(ex); + } + + boolean checkTerminated(boolean d, boolean empty, + Subscriber> a, Queue q) { + if (d) { + Throwable err = error; + if (err != null) { + errorAll(a, q, err); + return true; + } else + if (empty) { + actual.onCompleted(); + return true; + } + } + return false; + } + } + + static class EvictionAction implements Action1> { + + final Queue> evictedGroups; + + EvictionAction(Queue> evictedGroups) { + this.evictedGroups = evictedGroups; + } + + @Override + public void call(GroupedUnicast group) { + evictedGroups.offer(group); + } + } + + static final class GroupedUnicast extends GroupedObservable { + + public static GroupedUnicast createWith(K key, int bufferSize, GroupBySubscriber parent, boolean delayError) { + State state = new State(bufferSize, parent, key, delayError); + return new GroupedUnicast(key, state); + } + + final State state; + + protected GroupedUnicast(K key, State state) { + super(key, state); + this.state = state; + } + + public void onNext(T t) { + state.onNext(t); + } + + public void onError(Throwable e) { + state.onError(e); + } + + public void onComplete() { + state.onComplete(); + } + } + + static final class State extends AtomicInteger implements Producer, Subscription, OnSubscribe { + /** */ + private static final long serialVersionUID = -3852313036005250360L; + + final K key; + final Queue queue; + final GroupBySubscriber parent; + final boolean delayError; + + final AtomicLong requested; + + volatile boolean done; + Throwable error; + + final AtomicBoolean cancelled; + + final AtomicReference> actual; + + final AtomicBoolean once; + + + public State(int bufferSize, GroupBySubscriber parent, K key, boolean delayError) { + this.queue = new ConcurrentLinkedQueue(); + this.parent = parent; + this.key = key; + this.delayError = delayError; + this.cancelled = new AtomicBoolean(); + this.actual = new AtomicReference>(); + this.once = new AtomicBoolean(); + this.requested = new AtomicLong(); + } + + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= required but it was " + n); + } + if (n != 0L) { + BackpressureUtils.getAndAddRequest(requested, n); + drain(); + } + } + + @Override + public boolean isUnsubscribed() { + return cancelled.get(); + } + + @Override + public void unsubscribe() { + if (cancelled.compareAndSet(false, true)) { + if (getAndIncrement() == 0) { + parent.cancel(key); + } + } + } + + @Override + public void call(Subscriber s) { + if (once.compareAndSet(false, true)) { + s.add(this); + s.setProducer(this); + actual.lazySet(s); + drain(); + } else { + s.onError(new IllegalStateException("Only one Subscriber allowed!")); + } + } + + public void onNext(T t) { + if (t == null) { + error = new NullPointerException(); + done = true; + } else { + queue.offer(NotificationLite.next(t)); + } + drain(); + } + + public void onError(Throwable e) { + error = e; + done = true; + drain(); + } + + public void onComplete() { + done = true; + drain(); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + int missed = 1; + + final Queue q = queue; + final boolean delayError = this.delayError; + Subscriber a = actual.get(); + for (;;) { + if (a != null) { + if (checkTerminated(done, q.isEmpty(), a, delayError)) { + return; + } + + long r = requested.get(); + boolean unbounded = r == Long.MAX_VALUE; + long e = 0; + + while (r != 0L) { + boolean d = done; + Object v = q.poll(); + boolean empty = v == null; + + if (checkTerminated(d, empty, a, delayError)) { + return; + } + + if (empty) { + break; + } + + a.onNext(NotificationLite.getValue(v)); + + r--; + e--; + } + + if (e != 0L) { + if (!unbounded) { + requested.addAndGet(e); + } + parent.s.request(-e); + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + if (a == null) { + a = actual.get(); + } + } + } + + boolean checkTerminated(boolean d, boolean empty, Subscriber a, boolean delayError) { + if (cancelled.get()) { + queue.clear(); + parent.cancel(key); + return true; + } + + if (d) { + if (delayError) { + if (empty) { + Throwable e = error; + if (e != null) { + a.onError(e); + } else { + a.onCompleted(); + } + return true; + } + } else { + Throwable e = error; + if (e != null) { + queue.clear(); + a.onError(e); + return true; + } else + if (empty) { + a.onCompleted(); + return true; + } + } + } + + return false; + } + } +} diff --git a/src/test/java/rx/internal/operators/OperatorGroupByTest.java b/src/test/java/rx/internal/operators/OperatorGroupByTest.java index 7fd6ab0a2e..b9c2bc6ece 100644 --- a/src/test/java/rx/internal/operators/OperatorGroupByTest.java +++ b/src/test/java/rx/internal/operators/OperatorGroupByTest.java @@ -39,8 +39,10 @@ import rx.functions.*; import rx.internal.util.*; import rx.observables.GroupedObservable; +import rx.observers.AssertableSubscriber; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; public class OperatorGroupByTest { @@ -2047,4 +2049,230 @@ public Observable call(GroupedObservable v) { assertTrue("" + c, c > 0); assertTrue("" + c, c < 10000); } + + @Test + public void groupByEvictingMapFactoryThrows() { + final RuntimeException ex = new RuntimeException("boo"); + Func1, Map> evictingMapFactory = // + new Func1, Map>() { + + @Override + public Map call(final Action1 notify) { + throw ex; + } + }; + Observable.just(1) + .groupBy(UtilityFunctions.identity(), UtilityFunctions.identity(), 16, true, evictingMapFactory) + .test() + .assertNoValues() + .assertError(ex); + } + + @Test + public void groupByEvictingMapFactoryExpiryCompletesGroupedFlowable() { + final List completed = new CopyOnWriteArrayList(); + Func1, Map> evictingMapFactory = createEvictingMapFactorySynchronousOnly(1); + PublishSubject subject = PublishSubject.create(); + AssertableSubscriber ts = subject + .groupBy(UtilityFunctions.identity(), UtilityFunctions.identity(), 16, true, evictingMapFactory) + .flatMap(addCompletedKey(completed)) + .test(); + subject.onNext(1); + subject.onNext(2); + subject.onNext(3); + ts.assertValues(1, 2, 3) + .assertNoTerminalEvent(); + assertEquals(Arrays.asList(1, 2), completed); + //ensure coverage of the code that clears the evicted queue + subject.onCompleted(); + ts.assertCompleted(); + ts.assertValueCount(3); + } + + private static final Func1 mod5 = new Func1() { + + @Override + public Integer call(Integer n) { + return n % 5; + } + }; + + @Test + public void groupByEvictingMapFactoryWithExpiringGuavaCacheDemonstrationCodeForUseInJavadoc() { + //javadoc will be a version of this using lambdas and without assertions + final List completed = new CopyOnWriteArrayList(); + //size should be less than 5 to notice the effect + Func1, Map> evictingMapFactory = createEvictingMapFactoryGuava(3); + int numValues = 1000; + Observable.range(1, numValues) + .groupBy(mod5, UtilityFunctions.identity(), 16, true, evictingMapFactory) + .flatMap(addCompletedKey(completed)) + .test() + .assertCompleted() + .assertValueCount(numValues); + //the exact eviction behaviour of the guava cache is not specified so we make some approximate tests + assertTrue(completed.size() > numValues * 0.9); + } + + @Test + public void groupByEvictingMapFactoryEvictionQueueClearedOnErrorCoverageOnly() { + Func1, Map> evictingMapFactory = createEvictingMapFactorySynchronousOnly(1); + PublishSubject subject = PublishSubject.create(); + AssertableSubscriber ts = subject + .groupBy(UtilityFunctions.identity(), UtilityFunctions.identity(), 16, true, evictingMapFactory) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(GroupedObservable g) { + return g; + } + }) + .test(); + RuntimeException ex = new RuntimeException(); + //ensure coverage of the code that clears the evicted queue + subject.onError(ex); + ts.assertNoValues() + .assertError(ex); + } + + private static Func1, Observable> addCompletedKey( + final List completed) { + return new Func1, Observable>() { + @Override + public Observable call(final GroupedObservable g) { + return g.doOnCompleted(new Action0() { + @Override + public void call() { + completed.add(g.getKey()); + } + }); + } + }; + } + + //not thread safe + private static final class SingleThreadEvictingHashMap implements Map { + + private final List list = new ArrayList(); + private final Map map = new HashMap(); + private final int maxSize; + private final Action1 evictedListener; + + SingleThreadEvictingHashMap(int maxSize, Action1 evictedListener) { + this.maxSize = maxSize; + this.evictedListener = evictedListener; + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return map.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return map.containsValue(value); + } + + @Override + public V get(Object key) { + return map.get(key); + } + + @Override + public V put(K key, V value) { + list.remove(key); + V v; + if (maxSize > 0 && list.size() == maxSize) { + //remove first + K k = list.get(0); + list.remove(0); + v = map.remove(k); + } else { + v = null; + } + list.add(key); + V result = map.put(key, value); + if (v != null) { + evictedListener.call(v); + } + return result; + } + + @Override + public V remove(Object key) { + list.remove(key); + return map.remove(key); + } + + @Override + public void putAll(Map m) { + for (Entry entry: m.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void clear() { + list.clear(); + map.clear(); + } + + @Override + public Set keySet() { + return map.keySet(); + } + + @Override + public Collection values() { + return map.values(); + } + + @Override + public Set> entrySet() { + return map.entrySet(); + } + } + + private static Func1, Map> createEvictingMapFactoryGuava(final int maxSize) { + Func1, Map> evictingMapFactory = // + new Func1, Map>() { + + @Override + public Map call(final Action1 notify) { + return CacheBuilder.newBuilder() // + .maximumSize(maxSize) // + .removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + notify.call(notification.getValue()); + }}) + . build() + .asMap(); + }}; + return evictingMapFactory; + } + + private static Func1, Map> createEvictingMapFactorySynchronousOnly(final int maxSize) { + Func1, Map> evictingMapFactory = // + new Func1, Map>() { + + @Override + public Map call(final Action1 notify) { + return new SingleThreadEvictingHashMap(maxSize, new Action1() { + @Override + public void call(Object object) { + notify.call(object); + }}); + }}; + return evictingMapFactory; + } }