Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

how to handle a forvever reader for a client unless the client is closed by program #1148

Open
someview opened this issue Dec 25, 2023 · 1 comment

Comments

@someview
Copy link

client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
	log.Fatal(err)
}

defer client.Close()

reader, err := client.CreateReader(pulsar.ReaderOptions{
	Topic:          "topic-1",
	StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
	log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
	msg, err := reader.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
		msg.ID(), string(msg.Payload()))
}

This is example the doc shows. I‘m consufed with how to handle the reader. If a connection is aborted(not program close it), would the reader break?

@frederickdark
Copy link

Thanks for your question!
I'm facing the same situation with a consumer:

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        fmt.Println("Error creating the client:", err)
        return
    }
    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "persistent://public/default/my-topic",
        SubscriptionName: "my-sub",
    })
    if err != nil {
        fmt.Println("Error creating the consumer:", err)
        return
    }
    defer consumer.Close()

    for {
        // From this point on, no matter what happens with the client's connection,
        // my consumer doesn't receive an error about it and keeps waiting for new messages
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            fmt.Println("Error receiving the message:", err)
            break
        }
        fmt.Printf("Message received: %s\n", string(msg.Payload()))
        consumer.Ack(msg)
    }

A'm I missing something?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants