
Committing/rolling-back an already committed transaction after one of the nodes is killed #12705

Closed
acieplak opened this issue Mar 23, 2018 · 2 comments · Fixed by #13499

@acieplak

Hello,
Recently I discovered an issue with MapInterceptor and TransactionContext in a multi-node environment.
Here are the steps to reproduce:

  1. Start one Hazelcast instance
  2. Start a second Hazelcast instance
  3. Submit a task to both members that creates some objects and puts them into the map within a transactional context
  4. Commit the changes and wait until they are propagated to the IMap and written to the MapStore
  5. Kill one of the nodes

The problem:
After the node is killed, the surviving node tries to recover the data and invokes MapStore#storeAll with inconsistent data.

Hazelcast version: 3.9.3
Java version: 1.8

Here is my Hazelcast configuration:

public class Hazelcast {
    private static final Logger log = LoggerFactory.getLogger(Hazelcast.class);

    public static final String MAP_NAME = "DummyMap";
    public static final String EXECUTOR_SERVICE = "DummyExecutorService";

    // assumes a static import of com.hazelcast.core.Hazelcast.newHazelcastInstance
    public static HazelcastInstance createHazelcastInstance() {
        HazelcastInstance hazelcastInstance = newHazelcastInstance(configuration());
        log.info("Started Hazelcast Node");
        return hazelcastInstance;
    }

    private static Config configuration() {
        return new Config()
                .setProperty("hazelcast.logging.type", "slf4j")
                .addMapConfig(new MapConfig(MAP_NAME)
                        .setInMemoryFormat(InMemoryFormat.BINARY)
                        .setReadBackupData(true)
                        .setBackupCount(1)
                        .setMapStoreConfig(mapStoreConfig())
                ).addListenerConfig(mapInterceptorConfiguration());
    }

    private static ListenerConfig mapInterceptorConfiguration() {
        // attaches the interceptor as soon as the IMap proxy is created on this member
        DistributedObjectListener registerer = new DistributedObjectListener() {
            private String interceptorId;

            @Override
            public void distributedObjectCreated(DistributedObjectEvent event) {
                DistributedObject distributedObject = event.getDistributedObject();
                if (distributedObject instanceof IMap) {
                    IMap cache = (IMap) distributedObject;
                    interceptorId = cache.addInterceptor(new HazelcastMapInterceptor());
                }
            }

            @Override
            public void distributedObjectDestroyed(DistributedObjectEvent event) {
                DistributedObject distributedObject = event.getDistributedObject();
                if (distributedObject instanceof IMap) {
                    IMap cache = (IMap) distributedObject;
                    cache.removeInterceptor(interceptorId);
                }
            }
        };
        return new ListenerConfig().setImplementation(registerer);
    }

    private static MapStoreConfig mapStoreConfig() {
        return new MapStoreConfig()
                .setWriteBatchSize(1000)
                .setWriteDelaySeconds(1)
                .setEnabled(true)
                .setImplementation(new HazelcastCacheStore());
    }
}
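
With the listener above, the interceptor is attached lazily: distributedObjectCreated fires when the IMap proxy is first created on a member. A minimal usage sketch, assuming the classes above:

HazelcastInstance hz = Hazelcast.createHazelcastInstance();
// the first getMap(...) creates the distributed object, which fires
// distributedObjectCreated and installs HazelcastMapInterceptor
IMap<Long, DummyObject> map = hz.getMap(Hazelcast.MAP_NAME);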

I have a DummyObject class, which is a simple POJO with two fields: key (used as the map key) and dateTime:

public class DummyObject {
    private Long key;
    private LocalDateTime dateTime;
    .....
}
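
For reference, a minimal sketch of what the elided parts of the POJO presumably look like; the constructor, accessors, Serializable and toString are my assumptions, not part of the original report:

public class DummyObject implements Serializable {
    private Long key;
    private LocalDateTime dateTime;

    public DummyObject(Long key) { // assumed constructor
        this.key = key;
    }

    public Long getKey() { return key; }
    public LocalDateTime getDateTime() { return dateTime; }
    public void setDateTime(LocalDateTime dateTime) { this.dateTime = dateTime; }

    @Override
    public String toString() {
        return "DummyObject{key=" + key + ", dateTime=" + dateTime + '}';
    }
}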

The MapInterceptor's main responsibility is interceptPut, as given below:

// in HazelcastMapInterceptor; now() is a static import of LocalDateTime.now()
@Override
public Object interceptPut(Object oldValue, Object newValue) {
    if (newValue instanceof DummyObject) {
        ((DummyObject) newValue).setDateTime(now());
        log.info("trigger for object: {}", newValue);
    }
    return newValue;
}
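
For context, this is the behavior the report expects on a plain (non-transactional) put; continuing the usage sketch from the configuration section, with the assumed DummyObject constructor:

// on a plain put the interceptor stamps dateTime before the value is stored
map.put(1L, new DummyObject(1L));
assert map.get(1L).getDateTime() != null; // per the report, this holds without transactions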

Moreover, I've created a MapStore with a simple synchronized in-memory map as the data store (to simplify the case). The store has a constraint on DummyObject.dateTime (it must not be null), whose violation is logged:

public class HazelcastCacheStore implements MapStore<Long, DummyObject> {

    private final Map<Long, DummyObject> dummyStore = synchronizedMap(new LinkedHashMap<>());

    // the null-dateTime constraint is only logged, not thrown, to keep the repro simple
    private static void checkDummyObject(DummyObject target) {
        if (target.getDateTime() == null) {
            log.info("dateTime can't be null for object: {}", target);
        }
    }

    @Override
    public void storeAll(Map<Long, DummyObject> map) {
        log.info("try to store object: {}", printDummy(map.values()));
        for (DummyObject target : map.values()) {
            checkDummyObject(target);
        }
        dummyStore.putAll(map);
        log.info("store contains: {}", printDummy(dummyStore.values()));
    }

    .....
}
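
The "....." elides the rest of the MapStore contract. Inside HazelcastCacheStore, a minimal sketch of those methods, backed by the same map, might look like this (assuming the 3.x MapStore signatures; the bodies are my assumption, only storeAll comes from the original):

    @Override
    public void store(Long key, DummyObject value) {
        checkDummyObject(value);
        dummyStore.put(key, value);
    }

    @Override
    public void delete(Long key) {
        dummyStore.remove(key);
    }

    @Override
    public void deleteAll(Collection<Long> keys) {
        keys.forEach(dummyStore::remove);
    }

    @Override
    public DummyObject load(Long key) {
        return dummyStore.get(key);
    }

    @Override
    public Map<Long, DummyObject> loadAll(Collection<Long> keys) {
        Map<Long, DummyObject> result = new HashMap<>();
        for (Long key : keys) {
            DummyObject value = dummyStore.get(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    @Override
    public Iterable<Long> loadAllKeys() {
        return dummyStore.keySet();
    }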

My main test method:

public static void main(String[] args) throws Exception {
    HazelcastInstance hazelcast = createHazelcastInstance();
    IMap<Long, DummyObject> map = hazelcast.getMap(MAP_NAME);

    Map<Member, Future<Void>> memberFutureMap = hazelcast
        .getExecutorService(EXECUTOR_SERVICE)
        .submitToAllMembers(new TransactionalPut());
    for (Future<Void> future : memberFutureMap.values()) {
        future.get();
    }
    
    sleep(5_000); // wait past the 1-second write delay so the MapStore flush runs
    log.info("IMap after execution: {}", printDummy(map));
    System.exit(0);
}

Additionally, one more node is already running in the cluster.

The TransactionalPut task is defined as follows:

public class TransactionalPut implements Callable<Void>, Serializable, HazelcastInstanceAware {

    private static final Logger log = LoggerFactory.getLogger(TransactionalPut.class);
    private static final ThreadLocal<TransactionContext> context = new ThreadLocal<>();

    private transient HazelcastInstance hazelcastInstance;

    @Override
    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }

    @Override
    public Void call() throws Exception {
        Set<DummyObject> dummyObjects = createDummyObjects();

        log.info("beginning transaction for objects: {}", printDummy(dummyObjects));
        context.set(hazelcastInstance.newTransactionContext(transactionOptions()));
        context.get().beginTransaction();
        TransactionalMap<Long, DummyObject> map = context.get().getMap(MAP_NAME);

        try {
            dummyObjects.forEach(d -> map.put(d.getKey(), d));
            log.info("committing transaction for object: {}", printDummy(dummyObjects));
            context.get().commitTransaction();

            return null;
        } catch (Exception e) {
            log.info("rolling back transaction for object: {}", printDummy(dummyObjects));
            context.get().rollbackTransaction();
            throw e;
        }
    }

    private static TransactionOptions transactionOptions() {
        // TWO_PHASE with durability 2: the transaction log is backed up on 2 other members
        return new TransactionOptions()
                .setTransactionType(TWO_PHASE)
                .setDurability(2)
                .setTimeout(1, HOURS);
    }
}
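
The snippets above also reference two helpers that are not shown in the issue. A plausible minimal sketch; the names come from the original, the bodies and signatures are my assumptions:

// assumed helpers; only the names appear in the original snippets
private static Set<DummyObject> createDummyObjects() {
    Set<DummyObject> objects = new HashSet<>();
    for (long key = 1; key <= 5; key++) {
        objects.add(new DummyObject(key)); // dateTime left null; the interceptor is expected to set it
    }
    return objects;
}

private static String printDummy(Collection<?> values) {
    return values.stream().map(Object::toString).collect(Collectors.joining(", "));
}

private static String printDummy(Map<?, ?> map) { // overload for the IMap used in main
    return printDummy(map.values());
}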

Summary:
Just before the node was killed, I could see the correct values in the map.
After the node shut down, I got logs from TransactionManagerService saying "Committing/rolling-back alive transactions of Member...".
I believe this should not happen, because I had already committed this transaction; as proof, I can see the correct values in the IMap.
Moreover, just after the node was killed, Hazelcast tried to store the values without the changes applied by the MapInterceptor (DummyObject.dateTime is null).
This only happens when I use a transaction; without one, the map ends up in the expected state.

@acieplak (Author)

Hey,
Any lead on this one?
I think this might be quite important, because it can put backups in an inconsistent state relative to the cache. If anything in my description is confusing, please ask and I will try to explain it more precisely.
Regards

@acieplak (Author)
acieplak commented Apr 4, 2018

Hello again,
There is still no answer, so maybe @ahmetmircik (as you're first on the Hazelcast people list) could take a look at this one?
Regards
