diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/WriteBehindStore.java b/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/WriteBehindStore.java index 6e57dddab324..2286ad0ff390 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/WriteBehindStore.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/WriteBehindStore.java @@ -182,18 +182,19 @@ public Object add(Data key, Object value, @Override public void addForcibly(DelayedEntry delayedEntry) { + delayedEntry.setSequence(sequence.incrementAndGet()); writeBehindQueue.addLast(delayedEntry, true); stagingArea.put(delayedEntry.getKey(), delayedEntry); - delayedEntry.setSequence(sequence.incrementAndGet()); } public void add(DelayedEntry delayedEntry) { + delayedEntry.setSequence(sequence.incrementAndGet()); + writeBehindQueue.addLast(delayedEntry, false); stagingArea.put(delayedEntry.getKey(), delayedEntry); - delayedEntry.setSequence(sequence.incrementAndGet()); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/entry/AddedDelayedEntry.java b/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/entry/AddedDelayedEntry.java index 2ba4ea0ee040..a55d19bc76a8 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/entry/AddedDelayedEntry.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/entry/AddedDelayedEntry.java @@ -32,7 +32,7 @@ class AddedDelayedEntry implements DelayedEntry { private final long expirationTime; private final int partitionId; private long storeTime; - private long sequence; + private volatile long sequence; AddedDelayedEntry(K key, V value, long expirationTime, long storeTime, int partitionId) { this.key = key; diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/entry/DeletedDelayedEntry.java b/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/entry/DeletedDelayedEntry.java index 081a8bab4f2b..48214ed0cb01 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/entry/DeletedDelayedEntry.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/mapstore/writebehind/entry/DeletedDelayedEntry.java @@ -32,7 +32,7 @@ class DeletedDelayedEntry implements DelayedEntry { private final K key; private final int partitionId; private long storeTime; - private long sequence; + private volatile long sequence; private UUID txnId; DeletedDelayedEntry(K key, long storeTime, int partitionId) { diff --git a/hazelcast/src/test/java/com/hazelcast/map/impl/mapstore/writebehind/WriteBehindStoreTest.java b/hazelcast/src/test/java/com/hazelcast/map/impl/mapstore/writebehind/WriteBehindStoreTest.java new file mode 100644 index 000000000000..96883aa823f4 --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/map/impl/mapstore/writebehind/WriteBehindStoreTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.map.impl.mapstore.writebehind; + +import com.hazelcast.internal.serialization.Data; +import com.hazelcast.internal.serialization.InternalSerializationService; +import com.hazelcast.internal.serialization.impl.HeapData; +import com.hazelcast.map.impl.mapstore.MapStoreContext; +import com.hazelcast.map.impl.mapstore.writebehind.entry.DelayedEntries; +import com.hazelcast.map.impl.mapstore.writebehind.entry.DelayedEntry; +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.annotation.ParallelJVMTest; +import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.test.starter.ReflectionUtils; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.Serializable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(HazelcastParallelClassRunner.class) +@Category({QuickTest.class, ParallelJVMTest.class}) +public class WriteBehindStoreTest { + + private MapStoreContext mapStoreContext = mock(MapStoreContext.class, RETURNS_DEEP_STUBS); + private WriteBehindProcessor writeBehindProcessor = mock(WriteBehindProcessor.class, RETURNS_DEEP_STUBS); + + @Test + public void shouldNotChangeSequenceAfterAddingToQueue() throws IllegalAccessException { + // given + when(mapStoreContext.getMapServiceContext().getNodeEngine().getSerializationService()).thenReturn(mock(InternalSerializationService.class)); + WriteBehindStore store = new WriteBehindStore(mapStoreContext, 1, writeBehindProcessor); + DummyQueue queue = mock(DummyQueue.class, CALLS_REAL_METHODS); + ReflectionUtils.setFieldValueReflectively(store, "writeBehindQueue", queue); + DelayedEntry delayedEntry = DelayedEntries.newAddedDelayedEntry(mock(HeapData.class), new Entry(1, 1), 0, 0, 1, null); + assertEquals(delayedEntry.getSequence(), 0L); + + // when + store.add(delayedEntry); + + // then + assertEquals(delayedEntry.getSequence(), 1L); + assertEquals(queue.getSequence(), 1L); + + } + + private static final class Entry implements Serializable { + + private int id; + private int version; + + Entry() { + // serialization + } + + Entry(int id, int version) { + this.id = id; + this.version = version; + } + + Entry newVersion() { + return new Entry(id, version + 1); + } + + } + + abstract static class DummyQueue implements WriteBehindQueue { + + long sequence = 0; + + public long getSequence() { + return sequence; + } + + @Override + public void addLast(Object o, boolean addWithoutCapacityCheck) { + if (o instanceof DelayedEntry) { + sequence = ((DelayedEntry) o).getSequence(); + } + } + + + } +}