golang confluent kafka producer client running into error - Failed to create thread: Resource temporarily unavailable (11) #1177

Open
Mukthar-am opened this issue Apr 16, 2024 · 0 comments

Description

I am running a Go-based web server that acts as an ingestion gateway for Kafka. The service receives HTTP connections from different clients and ingests their messages into AWS MSK.

I have implemented batching: each batch holds 50 messages of 1 KB each and is produced with the async producer from confluent-kafka-go.

My goal is a higher ingestion rate. During performance testing, the rate climbs as high as 500 messages per second, but after a few minutes the Go server crashes with the error below.

Error

Failed to create thread: Resource temporarily unavailable (11)

How to reproduce

func AsyncBatchProduce(messages [][]byte, topic string) {
LOG.Debug("Init async BATCH kafka producer with producer configs", zap.Any("BrokerConfigs", kConf))

kafkaProducerInstance, producerError := kafka.NewProducer(kConf)
if producerError != nil {
	LOG.Error("failed to create new kafka async batch producer instance", zap.Any("create-producer-error", producerError))
} else {
	LOG.Info("successfully obtained kafka async batch producer instance")
}

// Produce messages to topic (asynchronously)
deliveryChan := make(chan kafka.Event, 10000)
defer close(deliveryChan)

// Channel to receive errors
produceErrChan := make(chan error)

// dump all the messages into the producer instance
for mId, message := range messages {
	LOG.Debug("batch producer iterator", zap.Any("message-id", mId))
	err := kafkaProducerInstance.Produce(
		&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          message,
		}, deliveryChan)

	if err != nil {
		LOG.Error("producer instance error", zap.Any("producer-error", err))
		produceErrChan <- err
	}
}

// Delivery report handler for produced messages
go func() {
	for e := range kafkaProducerInstance.Events() {

		switch ev := e.(type) {
		case *kafka.Message:
			part := ev.TopicPartition
			LOG.Debug(part.String())

			if ev.TopicPartition.Error != nil {
				LOG.Error("= Failed persisting message to Kafka",
					zap.Any("topic", &ev.TopicPartition.Topic),
					zap.Any("persistence-error", ev.TopicPartition.Error))
			} else {
				LOG.Info("= Successfully persisted message to ",
					zap.Any("topic", &ev.TopicPartition.Topic),
					zap.Any("partition", ev.TopicPartition.Partition),
					zap.Any("offset", ev.TopicPartition.Offset))
			}

		case kafka.Error:
			LOG.Info("encountered kafka broker connection errors, opting fallback persistence route")

			// create kafkaConnectionError.txt file, flagging that the kafka system is down
			fileutils.WriteToFileIfNotExists(kafkaConnectionErrorFile, kafkaConnectionErrorFileContent)

			kafkaProducerInstance.Flush(produceTimeout)
			fileWriteChannelizer.HandleBatchedMessage(messages)

			return
		default:
			LOG.Error("Event Ignored", zap.Any("kafka.Message", ev))
		}
	}
}()

// Check for errors
select {
case err := <-produceErrChan:
	if opError, ok := err.(*kafka.Error); ok && opError.IsFatal() && opError.Code() == kafka.ErrLeaderNotAvailable {
		LOG.Error("Connection refused error", zap.Any("opError", opError))
	} else {
		LOG.Error("Error sending message", zap.Any("err", err))
	}
default:
	LOG.Debug("submitted message successfully to async message committer")
}

channelOut := <-deliveryChan
messageReport := channelOut.(*kafka.Message)

if messageReport.TopicPartition.Error != nil {
	LOG.Info("topic partition error", zap.Any("partition error", messageReport.TopicPartition.Error.Error()))

} else {
	// ensure to delete the kafkaConnectionErrorFile, flagging the cron to transfer s3 backed messages to kafka
	fileutils.DeleteFile(kafkaConnectionErrorFile)
	LOG.Info("successfully persisted message",
		zap.Any("batch-size", len(messages)),
		zap.Any("topic", messageReport.TopicPartition.Topic),
		zap.Any("offset", messageReport.TopicPartition.Offset),
		zap.Any("partition", messageReport.TopicPartition.Partition))
}

//defer close(deliveryChan)

// Wait for message deliveries before shutting down
kafkaProducerInstance.Flush(produceTimeout)

}
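
One plausible contributor to the thread error, offered as an assumption rather than a confirmed diagnosis: the function above calls kafka.NewProducer for every batch and never calls Close on the instance. Each producer instance starts its own background threads inside librdkafka, so instances that are never closed can pile up until the process reaches its thread limit. The sketch below shows the create-once, reuse-everywhere pattern using only the standard library. The names batchProducer, fakeProducer, newProducer, getProducer and produceBatch are hypothetical, and fakeProducer stands in for *kafka.Producer so the example runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// batchProducer is a hypothetical stand-in for the small subset of
// *kafka.Producer behaviour this sketch needs.
type batchProducer interface {
	Produce(topic string, value []byte) error
	Close()
}

// fakeProducer records what it was asked to send; it is NOT a Kafka client.
type fakeProducer struct {
	mu     sync.Mutex
	sent   int
	closed bool
}

func (p *fakeProducer) Produce(topic string, value []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return fmt.Errorf("producer is closed")
	}
	p.sent++
	return nil
}

func (p *fakeProducer) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
}

var (
	producerOnce sync.Once
	shared       batchProducer
	created      int // number of producer instances ever constructed
)

// newProducer is where real code would call kafka.NewProducer(kConf)
// (assumption: one instance is enough for the whole process).
func newProducer() batchProducer {
	created++
	return &fakeProducer{}
}

// getProducer builds the shared producer on first use and returns the
// same instance on every later call, including concurrent ones.
func getProducer() batchProducer {
	producerOnce.Do(func() { shared = newProducer() })
	return shared
}

// produceBatch sends one batch through the shared producer instead of
// constructing a new producer per batch.
func produceBatch(messages [][]byte, topic string) error {
	p := getProducer()
	for _, m := range messages {
		if err := p.Produce(topic, m); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	batch := [][]byte{[]byte("a"), []byte("b"), []byte("c")}

	// Simulate many concurrent HTTP handlers, each producing one batch.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = produceBatch(batch, "events")
		}()
	}
	wg.Wait()

	// Close exactly once, at shutdown.
	getProducer().Close()
	fmt.Println("producers created:", created) // prints "producers created: 1"
}
```

In the real service, newProducer would wrap kafka.NewProducer(kConf), and the single instance would be flushed and closed once at shutdown rather than at the end of every batch.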

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): github.com/confluentinc/confluent-kafka-go v1.9.2 and librdkafka not part of my project
  • Client configuration: ConfigMap{...}
  • Operating system: Ubuntu 18.04.6 LTS
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue