Timeout on parallel subscribe two consumers #996

Open
philmu opened this issue Sep 29, 2023 · 3 comments
Labels
defect Suspected defect such as a bug or regression

@philmu

philmu commented Sep 29, 2023

Observed behavior

When I subscribe two consumers in parallel from different threads against an idle local NATS server, I sometimes receive an `IOException` (timeout) after two seconds.

Expected behavior

No timeout

Server and client version

Server: 2.10.1
Client: jnats 2.16.14

Host environment

Xubuntu 22.04.3 6.2.0-33-generic x86_64

Steps to reproduce

After a few iterations, my test code (see below) fails:

```
Iteration: 0
Stream [1] established.
Stream [0] established.
Iteration: 1
Stream [0] established.
Stream [1] established.
Iteration: 2
Stream [0] established.
Stream [1] established.
Iteration: 3
Stream [1] established.
Stream [0] not established.
java.io.IOException: Timeout or no response waiting for NATS JetStream server
	at io.nats.client.impl.NatsJetStreamImpl.responseRequired(NatsJetStreamImpl.java:216)
	at io.nats.client.impl.NatsJetStreamImpl.makeRequestResponseRequired(NatsJetStreamImpl.java:200)
	at io.nats.client.impl.NatsJetStreamImpl._getStreamNames(NatsJetStreamImpl.java:151)
	at io.nats.client.impl.NatsJetStreamImpl.lookupStreamBySubject(NatsJetStreamImpl.java:191)
	at io.nats.client.impl.NatsJetStream.createSubscription(NatsJetStream.java:301)
	at io.nats.client.impl.NatsJetStream.subscribe(NatsJetStream.java:563)
	at NatsTimeoutOnParallelSubscribe.lambda$0(NatsTimeoutOnParallelSubscribe.java:54)
	at java.base/java.lang.Thread.run(Thread.java:833)
Error on subscribe
```

After some debugging, I found that the timeout occurs at https://github.com/nats-io/nats.java/blob/2.16.14/src/main/java/io/nats/client/impl/NatsConnection.java#L1098
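The trace shows `subscribe` making a request (`_getStreamNames` through `makeRequestResponseRequired`) and `responseRequired` failing because no reply arrived in time. In core NATS, a message is delivered only to subscriptions that exist when it is published; a reply sent to an inbox nobody is subscribed to yet is dropped, not queued. The following is a minimal sketch of that failure mode using a hypothetical in-memory `AtMostOnceBus` (not the jnats API); the subject `_INBOX.demo` and the payload are made up for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical in-memory bus (not jnats) modelling core NATS delivery:
// a message is handed only to a subscriber that exists when it is published.
final class AtMostOnceBus {
    private final Map<String, Consumer<String>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String subject, Consumer<String> handler) {
        subscribers.put(subject, handler);
    }

    void publish(String subject, String payload) {
        Consumer<String> handler = subscribers.get(subject);
        if (handler != null) {
            handler.accept(payload);
        }
        // no subscriber: the message is dropped, not queued for later
    }

    // Simulates one request whose reply is sent immediately. If the reply inbox
    // is subscribed only afterwards, the waiting requester can only time out.
    static String requestReply(boolean subscribeBeforeReply, long timeoutMillis) {
        AtMostOnceBus bus = new AtMostOnceBus();
        String inbox = "_INBOX.demo";
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (subscribeBeforeReply) {
            bus.subscribe(inbox, reply::complete);
        }
        bus.publish(inbox, "stream names");        // the "server" answers right away
        if (!subscribeBeforeReply) {
            bus.subscribe(inbox, reply::complete); // too late, the reply was dropped
        }
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

`requestReply(true, 100)` returns the reply, while `requestReply(false, 100)` returns `"timeout"` even though the "server" answered instantly, which is the same symptom as an idle local server that appears not to respond.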

I think there is a multithreading problem in the block at https://github.com/nats-io/nats.java/blob/2.16.14/src/main/java/io/nats/client/impl/NatsConnection.java#L1161

As a pragmatic workaround, I wrapped that block in a `synchronized` block, and the timeout exception is gone:

```java
synchronized (this) {
    if (inboxDispatcher.get() == null) {
        NatsDispatcher d = new NatsDispatcher(this, this::deliverReply);

        if (inboxDispatcher.compareAndSet(null, d)) {
            String id = this.nuid.next();
            this.dispatchers.put(id, d);
            d.start(id);
            d.subscribe(this.mainInbox);
        }
    }
}
```
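One possible reading of why synchronizing helps (a hypothesis, not confirmed in this thread): without `synchronized (this)`, `compareAndSet(null, d)` makes the dispatcher visible to other threads before `d.subscribe(this.mainInbox)` has run. A second thread then fails the `inboxDispatcher.get() == null` check, skips the block, and may send its request while the main inbox is still unsubscribed. The sketch below forces that interleaving deterministically with two latches. The class `CasThenSubscribeRace`, its `Dispatcher` stand-in and the latch hooks are hypothetical and model only the pattern, not the real `NatsConnection`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the unsynchronized pattern (not jnats code): the
// dispatcher is published by compareAndSet *before* it subscribes to the inbox.
final class CasThenSubscribeRace {

    // Stand-in for NatsDispatcher; only tracks which subjects it subscribed to.
    static final class Dispatcher {
        private final Set<String> subjects = ConcurrentHashMap.newKeySet();

        void subscribe(String subject) { subjects.add(subject); }

        boolean isSubscribed(String subject) { return subjects.contains(subject); }
    }

    private final AtomicReference<Dispatcher> inboxDispatcher = new AtomicReference<>();
    private final String mainInbox = "_INBOX.main";
    // Test-only hooks that pause the initializing thread between CAS and subscribe.
    private final CountDownLatch published = new CountDownLatch(1);
    private final CountDownLatch resume = new CountDownLatch(1);

    Dispatcher getDispatcher() throws InterruptedException {
        if (inboxDispatcher.get() == null) {
            Dispatcher d = new Dispatcher();
            if (inboxDispatcher.compareAndSet(null, d)) {
                published.countDown(); // d is now visible to every other thread...
                resume.await();        // ...while this thread is delayed here
                d.subscribe(mainInbox);
            }
        }
        return inboxDispatcher.get();
    }

    // Returns true if a second caller got the dispatcher before it was subscribed.
    static boolean secondCallerSeesUnsubscribedDispatcher() {
        CasThenSubscribeRace race = new CasThenSubscribeRace();
        Thread initializer = new Thread(() -> {
            try {
                race.getDispatcher();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            initializer.start();
            race.published.await();                 // wait until the CAS has succeeded
            Dispatcher seen = race.getDispatcher(); // skips init: reference is non-null
            boolean exposedEarly = !seen.isSubscribed(race.mainInbox);
            race.resume.countDown();                // let the initializer finish
            initializer.join();
            return exposedEarly;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With `synchronized (this)` around the whole block, the second caller cannot reach the null check until the first caller has left the block, so it never observes the half-initialized dispatcher. That matches the reported effect of the workaround, but it remains an unverified explanation.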

Test code:

```java
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;

public class NatsTimeoutOnParallelSubscribe {

    private final static String NATS_SERVER = "nats://localhost:4222";
    private final static String NATS_TOKEN = "KxX1FqaJ";

    private final static String STREAM_NAME = "NatsTimeout";
    private final static String STREAM_SUBJECT = "test";

    public static void main(String[] args) throws IOException, JetStreamApiException, InterruptedException {
        NatsTimeoutOnParallelSubscribe nt = new NatsTimeoutOnParallelSubscribe();
        nt.createStream();
        int i = 0;
        while (true) {
            System.out.printf("Iteration: %d\n", i++);
            nt.parallelSubscribe();
            Thread.sleep(1000);
        }
    }

    private void parallelSubscribe() throws IOException, JetStreamApiException, InterruptedException {
        final Connection natsCon = getConnection();
        final JetStream jetStream = natsCon.jetStream();
        final int streams = 2;
        CountDownLatch cdl = new CountDownLatch(streams);

        for (int i = 0; i < streams; i++) {
            final int c = i;
            new Thread(() -> {
                try {
                    jetStream.subscribe(STREAM_SUBJECT);
                    cdl.countDown();
                    System.out.printf("Stream [%d] established.\n", c);
                } catch (Exception e) {
                    System.err.printf("Stream [%d] not established.\n", c);
                    e.printStackTrace();
                }
            }).start();
        }
        if (!cdl.await(3, TimeUnit.SECONDS)) {
            System.err.printf("Error on subscribe\n");
            System.exit(1);
        }

        natsCon.close();
    }

    private void createStream() throws IOException, JetStreamApiException, InterruptedException {
        final Connection natsCon = getConnection();
        JetStreamManagement jsm = natsCon.jetStreamManagement();
        List<StreamInfo> currentStreams = jsm.getStreams();

        Optional<StreamInfo> oldStream = currentStreams.stream()
                .filter(si -> si.getConfiguration().getName().equals(STREAM_NAME)).findFirst();

        if (oldStream.isPresent()) {
            System.out.printf("use existing stream\n");
        } else {
            StreamInfo info = jsm.addStream(StreamConfiguration.builder().name(STREAM_NAME)
                    .storageType(StorageType.Memory).subjects(STREAM_SUBJECT).description("Test Stream").build());
            System.out.printf("stream created: %s\n", info);
        }

        natsCon.close();
    }

    private Connection getConnection() {

        io.nats.client.Options.Builder builder = new Options.Builder().server(NATS_SERVER)
                .token(NATS_TOKEN.toCharArray());
        try {
            return Nats.connect(builder.build());
        } catch (IllegalStateException | IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```

@philmu philmu added the defect Suspected defect such as a bug or regression label Sep 29, 2023
@scottf
Contributor

scottf commented Sep 29, 2023

Is this repeatable against a local, non-cluster server? This seems unusual: the subscribe request/reply should take milliseconds, not accounting for things like network latency or server load.

@philmu
Author

philmu commented Sep 30, 2023

Yes, this happens on my local (non-cluster) machine.
I don't think this is a server-side problem, but rather a multithreading problem on the client side. With the synchronized block, the issue is gone.

@scottf
Contributor

scottf commented Sep 30, 2023

I'm very concerned about putting a synchronized block around that code, since it does not explain why there is a timeout, even if the problem appears to go away when you synchronize it. If there is an issue, I think there is another cause to be figured out. I will have to look into this.
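If the cause turns out to be that the inbox dispatcher becomes visible to other threads before it has subscribed to the main inbox (a hypothesis, not something confirmed in this thread), an alternative to synchronizing on `this` for every call is to finish setup before publishing the reference. A minimal sketch with hypothetical `SafeLazyInbox` and `Dispatcher` classes (not jnats code), using double-checked locking on a `volatile` field and a private lock object:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical classes for illustration only; not part of jnats.
final class SafeLazyInbox {

    // Stand-in for a dispatcher that must be subscribed before anyone uses it.
    static final class Dispatcher {
        private final Set<String> subjects = ConcurrentHashMap.newKeySet();

        void subscribe(String subject) { subjects.add(subject); }

        boolean isSubscribed(String subject) { return subjects.contains(subject); }
    }

    private final String mainInbox;
    private final Object initLock = new Object();
    // volatile: a non-null value is written only after setup has completed
    private volatile Dispatcher inboxDispatcher;

    SafeLazyInbox(String mainInbox) { this.mainInbox = mainInbox; }

    Dispatcher getReadyDispatcher() {
        Dispatcher d = inboxDispatcher;      // fast path: no lock once initialized
        if (d == null) {
            synchronized (initLock) {
                d = inboxDispatcher;         // re-check while holding the lock
                if (d == null) {
                    d = new Dispatcher();
                    d.subscribe(mainInbox);  // finish setup first...
                    inboxDispatcher = d;     // ...then publish the reference
                }
            }
        }
        return d;
    }

    // Starts several callers at once; true if all got the same, already-subscribed dispatcher.
    static boolean concurrentCallersSeeReadyDispatcher(int threads) {
        SafeLazyInbox inbox = new SafeLazyInbox("_INBOX.main");
        Set<Dispatcher> seen = ConcurrentHashMap.newKeySet();
        AtomicBoolean allReady = new AtomicBoolean(true);
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    allReady.set(false);
                    return;
                }
                Dispatcher d = inbox.getReadyDispatcher();
                seen.add(d);
                if (!d.isSubscribed("_INBOX.main")) {
                    allReady.set(false);
                }
            });
            workers.add(t);
            t.start();
        }
        start.countDown(); // release all callers together
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return allReady.get() && seen.size() == 1;
    }
}
```

Once initialized, callers read the `volatile` field without taking a lock, and the private `initLock` means unrelated code that synchronizes on the enclosing object cannot contend with the initialization.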
