Skip to content

Commit

Permalink
Write LOADED to event journal on map load
Browse files Browse the repository at this point in the history
- Extend `MapEventJournal` with `writeLoadEvent`
- Extend `RecordStoreMutationObserver` with `onLoadRecord`
- Call `mutationObserver.onLoadRecord()` from `DefaultRecordStore.loadRecordOrNull()`
- Call either `onLoadRecord()` or `onPutRecord()` from `DefaultRecordStore.putFromLoadInternal()` based on `DefaultRecordStore.canPublishLoadEvent()`
- Update event journal tests

Implements hazelcast#13667
  • Loading branch information
blazember committed Sep 3, 2018
1 parent ac02477 commit f57849a
Show file tree
Hide file tree
Showing 16 changed files with 287 additions and 15 deletions.
Expand Up @@ -58,7 +58,9 @@ protected CacheManager createCacheManager() {

@After
public final void terminate() {
factory.terminateAll();
if (factory != null) {
factory.terminateAll();
}
HazelcastClientManager.shutdownAll();
}
}
Expand Up @@ -63,7 +63,7 @@
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.record.RecordComparator;
import com.hazelcast.map.impl.recordstore.DefaultRecordStore;
import com.hazelcast.map.impl.recordstore.EventJournalUpdaterRecordStoreMutationObserver;
import com.hazelcast.map.impl.recordstore.EventJournalWriterRecordStoreMutationObserver;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.map.impl.recordstore.RecordStoreMutationObserver;
import com.hazelcast.map.listener.MapPartitionLostListener;
Expand Down Expand Up @@ -873,7 +873,7 @@ public Collection<RecordStoreMutationObserver<Record>> createRecordStoreMutation

private void addEventJournalUpdaterObserver(Collection<RecordStoreMutationObserver<Record>> observers, String mapName, int
partitionId) {
RecordStoreMutationObserver<Record> observer = new EventJournalUpdaterRecordStoreMutationObserver(getEventJournal(),
RecordStoreMutationObserver<Record> observer = new EventJournalWriterRecordStoreMutationObserver(getEventJournal(),
getMapContainer(mapName), partitionId);
observers.add(observer);
}
Expand Down
Expand Up @@ -105,6 +105,20 @@ void writeAddEvent(EventJournalConfig journalConfig, ObjectNamespace namespace,
*/
void writeEvictEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value);

/**
* Writes an {@link com.hazelcast.core.EntryEventType#LOADED} to the event journal.
* If there is no event journal configured for this map, the method will do nothing.
* If an event is added to the event journal, all parked operations waiting for
* new events on that journal will be unparked.
*
* @param journalConfig the event journal config for the map in which the event occurred
* @param namespace the map namespace
* @param partitionId the entry key partition
* @param key the entry key
* @param value the entry value
*/
void writeLoadEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value);

/**
* Returns {@code true} if the object has a configured and enabled event journal.
*
Expand Down
Expand Up @@ -38,6 +38,7 @@

import static com.hazelcast.core.EntryEventType.ADDED;
import static com.hazelcast.core.EntryEventType.EVICTED;
import static com.hazelcast.core.EntryEventType.LOADED;
import static com.hazelcast.core.EntryEventType.REMOVED;
import static com.hazelcast.core.EntryEventType.UPDATED;

Expand Down Expand Up @@ -83,6 +84,12 @@ public void writeEvictEvent(EventJournalConfig journalConfig, ObjectNamespace na
addToEventRingbuffer(journalConfig, namespace, partitionId, EVICTED, key, value, null);
}

@Override
public void writeLoadEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key,
Object value) {
addToEventRingbuffer(journalConfig, namespace, partitionId, LOADED, key, null, value);
}

@Override
public long newestSequence(ObjectNamespace namespace, int partitionId) {
return getRingbufferOrFail(namespace, partitionId).tailSequence();
Expand Down
Expand Up @@ -73,6 +73,13 @@ public void onEvictRecord(Data key, R record) {
}
}

@Override
public void onLoadRecord(Data key, R record) {
for (RecordStoreMutationObserver<R> mutationObserver : mutationObservers) {
mutationObserver.onLoadRecord(key, record);
}
}

@Override
public void onDestroy(boolean internal) {
for (RecordStoreMutationObserver<R> mutationObserver : mutationObservers) {
Expand Down
Expand Up @@ -329,7 +329,7 @@ public Record loadRecordOrNull(Data key, boolean backup, Address callerAddress)
if (value != null) {
record = createRecord(value, DEFAULT_TTL, DEFAULT_MAX_IDLE, getNow());
storage.put(key, record);
mutationObserver.onPutRecord(key, record);
mutationObserver.onLoadRecord(key, record);
if (!backup) {
saveIndex(record, null);
mapEventPublisher.publishEvent(callerAddress, name, EntryEventType.LOADED,
Expand Down Expand Up @@ -917,7 +917,11 @@ private Object putFromLoadInternal(Data key, Object value, long ttl, long maxIdl
value = mapServiceContext.interceptPut(name, null, value);
record = createRecord(value, ttl, maxIdle, now);
storage.put(key, record);
mutationObserver.onPutRecord(key, record);
if (canPublishLoadEvent()) {
mutationObserver.onLoadRecord(key, record);
} else {
mutationObserver.onPutRecord(key, record);
}
} else {
oldValue = record.getValue();
value = mapServiceContext.interceptPut(name, oldValue, value);
Expand Down
Expand Up @@ -23,14 +23,14 @@
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.ObjectNamespace;

public class EventJournalUpdaterRecordStoreMutationObserver implements RecordStoreMutationObserver {
public class EventJournalWriterRecordStoreMutationObserver implements RecordStoreMutationObserver {
private final MapEventJournal eventJournal;
private final int partitionId;
private final EventJournalConfig eventJournalConfig;
private final ObjectNamespace objectNamespace;

public EventJournalUpdaterRecordStoreMutationObserver(MapEventJournal eventJournal, MapContainer mapContainer,
int partitionId) {
public EventJournalWriterRecordStoreMutationObserver(MapEventJournal eventJournal, MapContainer mapContainer,
int partitionId) {
this.eventJournal = eventJournal;
this.partitionId = partitionId;
this.eventJournalConfig = mapContainer.getEventJournalConfig();
Expand Down Expand Up @@ -68,6 +68,11 @@ public void onEvictRecord(Data key, Record record) {
eventJournal.writeEvictEvent(eventJournalConfig, objectNamespace, partitionId, record.getKey(), record.getValue());
}

@Override
public void onLoadRecord(Data key, Record record) {
eventJournal.writeLoadEvent(eventJournalConfig, objectNamespace, partitionId, record.getKey(), record.getValue());
}

@Override
public void onDestroy(boolean internal) {
if (!internal) {
Expand Down
Expand Up @@ -73,6 +73,14 @@ public interface RecordStoreMutationObserver<R extends Record> {
*/
void onEvictRecord(Data key, R record);

/**
* Called when a record is loaded into the observed {@link RecordStore}
*
* @param key The key of the record
* @param record The record
*/
void onLoadRecord(Data key, R record);

/**
* Called when the observed {@link RecordStore} is being destroyed.
* The observer should release all resources. The implementations of
Expand Down
Expand Up @@ -95,6 +95,24 @@ public void receiveExpirationEventsWhenPutOnExpiringStructure() {
// not tested
}

@Override
@Ignore
public void receiveLoadedEventsWhenLoad() {
// not tested
}

@Override
@Ignore
public void receiveLoadedEventsWhenLoadAll() {
// not tested
}

@Override
@Ignore
public void receiveAddedEventsWhenLoadAll() {
// not tested
}

protected CacheManager createCacheManager() {
CachingProvider cachingProvider = HazelcastServerCachingProvider.createCachingProvider(getRandomInstance());
return cachingProvider.getCacheManager();
Expand Down
Expand Up @@ -67,6 +67,16 @@ public V put(K key, V value, long ttl, TimeUnit timeunit) {
return cache.getAndPut(key, value, new HazelcastExpiryPolicy(ttl, ttl, ttl, timeunit));
}

@Override
public void load(K key) {
throw new UnsupportedOperationException();
}

@Override
public void loadAll(Set<K> keys) {
throw new UnsupportedOperationException();
}

@Override
public ObjectNamespace getNamespace() {
return CacheService.getObjectNamespace(cache.getPrefixedName());
Expand Down

0 comments on commit f57849a

Please sign in to comment.