Messages on a 40-partition topic are severely imbalanced and individual partitions cannot be consumed by the consumer #12800

Closed
hepyu opened this issue Nov 15, 2021 · 18 comments
Labels: deprecated/question (questions should be asked in GitHub Discussions)

Comments

@hepyu commented Nov 15, 2021

This looks like a pulsar-broker bug, but I cannot confirm it.

Contents:
(1) Why this issue is not written in English
(2) Pulsar deployment version, details and architecture
(3) Problem, observed behaviour, usage description and sanitized code
    1. Problem
    2. Observed behaviour, usage description and sanitized code

(1) Why this issue is not written in English

My English is not good. I can read it without problems, but if I described the whole problem and its symptoms in English I could not guarantee the accuracy of the description, so I wrote it in Chinese.

(2) Pulsar deployment version, details and architecture

The Pulsar version is 2.8.0, running on OpenJDK 11 (exact version 11.0.12).
It is deployed on AWS (overseas region) on c5a.2xlarge instances (8c16g), 3 machines in total, each running one broker, one bookie and one ZooKeeper. The startup parameters were not modified; everything is at the default values.

Deployment details:
pulsar-7: deploying a production-grade 5-node Pulsar cluster on AWS
https://mp.weixin.qq.com/s/YwCr-l2WcM4fJVg7NIx_HA

(3) Problem, observed behaviour, usage description and sanitized code

1. Problem
On a topic with 40 partitions, messages are severely imbalanced across partitions and individual partitions cannot be consumed by the consumer. Most recently, two partitions each accumulated a backlog of roughly 300,000 messages.
[screenshot: backlog of the two stuck partitions]
Average size of messages published to this partitioned topic:
[screenshot: average message size]
pulsar_rate_in:
[screenshot: pulsar_rate_in]
pulsar_rate_out: the spike is when we noticed consumption had stopped, although the problem may have started earlier; the rate is so high because we restarted the consumer.
[screenshot: pulsar_rate_out]

2. Observed behaviour, usage description and sanitized code
The producer sends in batches (on average about 8 messages per batch, at most 40), asynchronously, and uses key-based sharding to route messages to the different partitions of this topic. Sanitized code:

package test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.pulsar.client.api.Producer;

@Service
public class PushService implements IPushService {

	private final static Logger logger = LoggerFactory.getLogger(PushService.class);

	@Resource
	private IMqBackoffService mqBackoffService;

	@Resource(name = "XXXStatusProducer")
	private Producer<byte[]> YYYStatusUUUProducer;

	@Override
	public void pushZZZPPPRRRStatus(Long userId, List<YYYStatusResp> RRRList) {
		String content = GsonUtil.beanToJsonString(RRRList);
		try {
			// Asynchronous, batched send; messages are sharded by key to the different partitions of the topic.
			YYYStatusUUUProducer.newMessage().key(String.valueOf(userId))
					.value(content.getBytes(StandardCharsets.UTF_8)).sendAsync().exceptionally((e -> {
						logger.error("send sync ZZZ PPP RRR status error,content:{}", content, e);
						// 如果发送失败,会将异步发送失败的消息存到aws的aurora数据库,使用数据库的本地事务(shardingjdbc4.1.1),事务使用的注解方式.
						mqBackoffService.saveYYYStatus(RRRList);
						return null;
					}));

		} catch (Exception e) {
			logger.error("send ZZZ PPP RRR status error,content:{}", content, e);
			mqBackoffService.saveYYYStatus(RRRList);
		}
	}

}

The consumer uses Key_Shared subscription. Sanitized code:

package test;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.List;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

@Component
public class XXXResultMessageListener implements MessageListener<byte[]> {

	private static final Logger log = LoggerFactory.getLogger(XXXResultMessageListener.class);

	private final IoooService oooService;

	public XXXResultMessageListener(IoooService oooService) {
		this.oooService = oooService;
	}

	@SneakyThrows
	@LogTid
	@Override
	public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
		String body = new String(msg.getValue());
		try {
			if (StrUtil.isBlank(body)) {
				log.warn("YYY result received msg value is null messageId:{}", msg.getMessageId());
				consumer.acknowledge(msg.getMessageId());
				return;
			}
			log.info("YYY result,topicName:{},message:{}", msg.getTopicName(), body);
			List<TTTUUUStatusResp> TTTUUUStatusRespList = JSONUtil.toList(body, TTTUUUStatusResp.class);
			if (CollUtil.isNotEmpty(TTTUUUStatusRespList)) {
				TTTUUUStatusResp TTTUUUStatusResp = TTTUUUStatusRespList.get(0);
				if (UserTypeEnum.PPP.getCode().equals(TTTUUUStatusResp.getUserType())) {
					// Only sets a flag that controls whether logs are printed.
					ThreadFilter.setPrintFlag(false);
				}
				// Synchronous processing: the next message is fetched only after this one completes.
				// The Aurora local database transactions use ShardingJDBC 4.1.1 with annotation-based transactions.
				oooService.vvvBatchReceived(TTTUUUStatusRespList);
			}
			consumer.acknowledge(msg.getMessageId());
		} catch (Exception e) {
			if (e instanceof BaseRuntimeException) {
				// BaseRuntimeException is a business exception; per business rules it needs no handling, just ack.
				consumer.acknowledge(msg.getMessageId());
			} else {
				log.error("YYY result Failed to process message error,data is {},e:{}", body, e.getStackTrace(), e);
				// If business processing fails, tell Pulsar to redeliver the message.
				consumer.negativeAcknowledge(msg);
			}
			throw e;
		} finally {
			ThreadFilter.removePrintFlag();
		}

	}

}

@hepyu (Author) commented Nov 15, 2021

One more note:
On this 40-partition topic, 2 partitions are now severely backlogged, each with about 300,000 unconsumed messages. After restarting the broker they start being consumed again, but after 10,000-20,000 messages consumption stops; another restart consumes a bit more and then it stops again, over and over. We have always kept the load very high, close to the limit, and we previously saw backlogs of several hundred thousand messages whenever processing capacity fell behind, but they always drained smoothly once capacity caught up. Last week, on Nov 12, consumption suddenly stopped altogether.

Given all of the above, this is why I suspect a pulsar-broker bug.

@hepyu (Author) commented Nov 15, 2021

Note 2:
I also inspected all pulsar-broker threads for the problematic partition; there is no deadlock.

@hepyu (Author) commented Nov 15, 2021

Note 3:
After repeated investigation and restarts, the two partitions now each have a backlog of just under 100,000.
[screenshot: remaining backlog of the two partitions]

The arthas dashboard looks like this:
[screenshot: arthas dashboard]
Is it running out of memory?

threads:
[screenshot: arthas thread view]

@hepyu (Author) commented Nov 15, 2021

Note 4:

I went through the 2.8.1 release fix list and noticed this fix in 2.8.1:
[broker] Fix issue where Key_Shared consumers could get stuck #10920
#10920

I then looked at the stats of the problematic partition and compared them with the stats in that issue. It is not the same symptom, but something is clearly wrong:
1. consumers is empty, i.e. there is no consumer at all, so of course nothing can be consumed. In issue #10920 the consumers list was not empty.
2. Under subscriptions, msgBacklog and msgBacklogNoDelayed have equal, correct values, but backlogSize is 0, which cannot be right, can it?
[screenshot: stats of the problematic partition]

The sanitized topic stats are attached below (only the subscription name was changed, everything else is as captured):

{
  "count" : 0,
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 10241853528,
  "msgOutCounter" : 7697026,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 501755617,
  "backlogSize" : 501604741,
  "offloadedStorageSize" : 0,
  "publishers" : [ {
    "count" : 0,
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 83,
    "producerNameOffset" : 133,
    "producerNameLength" : 25,
    "addressOffset" : 79,
    "addressLength" : 17,
    "connectedSinceOffset" : 96,
    "connectedSinceLength" : 32,
    "clientVersionOffset" : 128,
    "clientVersionLength" : 5,
    "stringBuffer" : "/10.0.45.29:411742021-11-15T13:18:47.778522+08:002.8.0pulsar-biz-cluster-29-150/10.0.45.29:411742021-11-15T13:18:47.778522+08:002.8.0pulsar-biz-cluster-29-150",
    "metadata" : { },
    "address" : "/10.0.45.29:41174",
    "connectedSince" : "2021-11-15T13:18:47.778522+08:00",
    "clientVersion" : "2.8.0",
    "producerName" : "pulsar-biz-cluster-29-150"
  }, {
    "count" : 0,
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 71,
    "producerNameOffset" : 135,
    "producerNameLength" : 25,
    "addressOffset" : 80,
    "addressLength" : 18,
    "connectedSinceOffset" : 98,
    "connectedSinceLength" : 32,
    "clientVersionOffset" : 130,
    "clientVersionLength" : 5,
    "stringBuffer" : "/10.0.44.204:493302021-11-15T13:23:34.155674+08:002.8.0pulsar-biz-cluster-29-168/10.0.44.204:493302021-11-15T13:23:34.155674+08:002.8.0pulsar-biz-cluster-29-168",
    "metadata" : { },
    "address" : "/10.0.44.204:49330",
    "connectedSince" : "2021-11-15T13:23:34.155674+08:00",
    "clientVersion" : "2.8.0",
    "producerName" : "pulsar-biz-cluster-29-168"
  }, {
    "count" : 0,
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 83,
    "producerNameOffset" : 135,
    "producerNameLength" : 25,
    "addressOffset" : 80,
    "addressLength" : 18,
    "connectedSinceOffset" : 98,
    "connectedSinceLength" : 32,
    "clientVersionOffset" : 130,
    "clientVersionLength" : 5,
    "stringBuffer" : "/10.0.47.110:590002021-11-15T13:18:45.181149+08:002.8.0pulsar-biz-cluster-29-129/10.0.47.110:590002021-11-15T13:18:45.181149+08:002.8.0pulsar-biz-cluster-29-129",
    "metadata" : { },
    "address" : "/10.0.47.110:59000",
    "connectedSince" : "2021-11-15T13:18:45.181149+08:00",
    "clientVersion" : "2.8.0",
    "producerName" : "pulsar-biz-cluster-29-129"
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "xxx-web" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 10241853528,
      "msgOutCounter" : 7697026,
      "msgRateRedeliver" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 90862,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 90862,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1636951642402,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 1636948251670,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 46,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 810,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled",
  "nonContiguousDeletedMessagesRanges" : 46,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 810
}

@hepyu (Author) commented Nov 15, 2021

Note 5:

Following up on note 4, I looked at the other, healthy partitions of this topic. For a healthy partition the stats show:
1. consumers contains a consumer instance, as expected.
2. Under subscriptions, msgBacklog, msgBacklogNoDelayed and backlogSize all have equal, correct values, all 0.
[screenshot: stats of a healthy partition]

In other words, in this scenario some unknown cause makes a severely backlogged/imbalanced partition lose its consumer?

The sanitized topic stats are attached below (only the subscription name was changed, everything else is as captured):

{
  "count" : 0,
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 0,
  "msgOutCounter" : 0,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 0,
  "backlogSize" : 0,
  "offloadedStorageSize" : 0,
  "publishers" : [ {
    "count" : 0,
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 80,
    "producerNameOffset" : 133,
    "producerNameLength" : 25,
    "addressOffset" : 79,
    "addressLength" : 17,
    "connectedSinceOffset" : 96,
    "connectedSinceLength" : 32,
    "clientVersionOffset" : 128,
    "clientVersionLength" : 5,
    "stringBuffer" : "/10.0.45.29:519842021-11-15T13:18:47.778107+08:002.8.0pulsar-biz-cluster-27-792/10.0.45.29:519842021-11-15T13:18:47.778107+08:002.8.0pulsar-biz-cluster-27-792",
    "metadata" : { },
    "address" : "/10.0.45.29:51984",
    "connectedSince" : "2021-11-15T13:18:47.778107+08:00",
    "clientVersion" : "2.8.0",
    "producerName" : "pulsar-biz-cluster-27-792"
  }, {
    "count" : 0,
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 68,
    "producerNameOffset" : 135,
    "producerNameLength" : 25,
    "addressOffset" : 80,
    "addressLength" : 18,
    "connectedSinceOffset" : 98,
    "connectedSinceLength" : 32,
    "clientVersionOffset" : 130,
    "clientVersionLength" : 5,
    "stringBuffer" : "/10.0.44.204:424662021-11-15T13:23:34.155559+08:002.8.0pulsar-biz-cluster-27-829/10.0.44.204:424662021-11-15T13:23:34.155559+08:002.8.0pulsar-biz-cluster-27-829",
    "metadata" : { },
    "address" : "/10.0.44.204:42466",
    "connectedSince" : "2021-11-15T13:23:34.155559+08:00",
    "clientVersion" : "2.8.0",
    "producerName" : "pulsar-biz-cluster-27-829"
  }, {
    "count" : 0,
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 80,
    "producerNameOffset" : 135,
    "producerNameLength" : 25,
    "addressOffset" : 80,
    "addressLength" : 18,
    "connectedSinceOffset" : 98,
    "connectedSinceLength" : 32,
    "clientVersionOffset" : 130,
    "clientVersionLength" : 5,
    "stringBuffer" : "/10.0.47.110:367542021-11-15T13:18:45.180869+08:002.8.0pulsar-biz-cluster-27-740/10.0.47.110:367542021-11-15T13:18:45.180869+08:002.8.0pulsar-biz-cluster-27-740",
    "metadata" : { },
    "address" : "/10.0.47.110:36754",
    "connectedSince" : "2021-11-15T13:18:45.180869+08:00",
    "clientVersion" : "2.8.0",
    "producerName" : "pulsar-biz-cluster-27-740"
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "xxx-web" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 0,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1636949155301,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "8d143",
        "availablePermits" : 1000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 1000,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "320266:0",
        "addressOffset" : 53,
        "addressLength" : 16,
        "connectedSinceOffset" : 69,
        "connectedSinceLength" : 32,
        "clientVersionOffset" : 101,
        "clientVersionLength" : 5,
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "keyHashRanges" : [ "[0, 65536]" ],
        "metadata" : { },
        "stringBuffer" : "/10.0.36.1:501262021-11-15T12:05:51.210651+08:002.8.0/10.0.36.1:501262021-11-15T12:05:51.210651+08:002.8.0",
        "address" : "/10.0.36.1:50126",
        "connectedSince" : "2021-11-15T12:05:51.210651+08:00",
        "clientVersion" : "2.8.0"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled",
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0
}

@hepyu (Author) commented Nov 15, 2021

Note 6:
The situation in note 5 happened because the consumer JVMs had effectively hung. There are two consumer machines (8c16g) and both hung, but strangely the partitions associated with the first machine were still visible in the topic stats, while the partitions associated with the second machine were not (the topic stats showed an empty consumers list).

The hang is caused by garbage collection: we are using G1. When we used ZGC before, the consumer never died even with backlogs of several hundred thousand; it always recovered after a while.

I have now restarted the consumer (still on G1). The problematic partition's topic stats are below. It runs for a while and then stops again (the consumer hangs), but consumers still appears in the topic stats; I expect it will disappear again after a while and am keeping an eye on it.
[screenshot: problematic partition after consumer restart]

{
  "count" : 0,
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 21133.17315796585,
  "msgThroughputOut" : 3.738464191451498E7,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 19747414784,
  "msgOutCounter" : 13204468,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 463806140,
  "backlogSize" : 454731490,
  "offloadedStorageSize" : 0,
  "publishers" : [ {
    "count" : 0,
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 83,
    "producerNameOffset" : 133,
    "producerNameLength" : 25,
    "addressOffset" : 79,
    "addressLength" : 17,
    "connectedSinceOffset" : 96,
    "connectedSinceLength" : 32,
    "clientVersionOffset" : 128,
    "clientVersionLength" : 5,
    "stringBuffer" : "/10.0.45.29:411742021-11-15T13:18:47.778522+08:002.8.0pulsar-biz-cluster-29-150/10.0.45.29:411742021-11-15T13:18:47.778522+08:002.8.0pulsar-biz-cluster-29-150",
    "metadata" : { },
    "address" : "/10.0.45.29:41174",
    "connectedSince" : "2021-11-15T13:18:47.778522+08:00",
    "clientVersion" : "2.8.0",
    "producerName" : "pulsar-biz-cluster-29-150"
  }, {
    "count" : 0,
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 71,
    "producerNameOffset" : 135,
    "producerNameLength" : 25,
    "addressOffset" : 80,
    "addressLength" : 18,
    "connectedSinceOffset" : 98,
    "connectedSinceLength" : 32,
    "clientVersionOffset" : 130,
    "clientVersionLength" : 5,
    "stringBuffer" : "/10.0.44.204:493302021-11-15T13:23:34.155674+08:002.8.0pulsar-biz-cluster-29-168/10.0.44.204:493302021-11-15T13:23:34.155674+08:002.8.0pulsar-biz-cluster-29-168",
    "metadata" : { },
    "address" : "/10.0.44.204:49330",
    "connectedSince" : "2021-11-15T13:23:34.155674+08:00",
    "clientVersion" : "2.8.0",
    "producerName" : "pulsar-biz-cluster-29-168"
  }, {
    "count" : 0,
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 83,
    "producerNameOffset" : 135,
    "producerNameLength" : 25,
    "addressOffset" : 80,
    "addressLength" : 18,
    "connectedSinceOffset" : 98,
    "connectedSinceLength" : 32,
    "clientVersionOffset" : 130,
    "clientVersionLength" : 5,
    "stringBuffer" : "/10.0.47.110:590002021-11-15T13:18:45.181149+08:002.8.0pulsar-biz-cluster-29-129/10.0.47.110:590002021-11-15T13:18:45.181149+08:002.8.0pulsar-biz-cluster-29-129",
    "metadata" : { },
    "address" : "/10.0.47.110:59000",
    "connectedSince" : "2021-11-15T13:18:45.181149+08:00",
    "clientVersion" : "2.8.0",
    "producerName" : "pulsar-biz-cluster-29-129"
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "xxx-web" : {
      "msgRateOut" : 21133.17315796585,
      "msgThroughputOut" : 3.738464191451498E7,
      "bytesOutCounter" : 19747414784,
      "msgOutCounter" : 13204468,
      "msgRateRedeliver" : 20815.939672078995,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 62630,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 62630,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 9701,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1636966578645,
      "lastConsumedTimestamp" : 1636966578572,
      "lastAckedTimestamp" : 1636966579538,
      "lastMarkDeleteAdvancedTimestamp" : 1636966354819,
      "consumers" : [ {
        "msgRateOut" : 9538.202975124479,
        "msgThroughputOut" : 1.6648729276349474E7,
        "bytesOutCounter" : 4681539476,
        "msgOutCounter" : 2743390,
        "msgRateRedeliver" : 9137.052795176734,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "8fc51",
        "availablePermits" : -238,
        "unackedMessages" : 1167,
        "avgMessagesPerEntry" : 12,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "319543:108",
        "addressOffset" : 55,
        "addressLength" : 18,
        "connectedSinceOffset" : 73,
        "connectedSinceLength" : 32,
        "clientVersionOffset" : 105,
        "clientVersionLength" : 5,
        "lastAckedTimestamp" : 1636966578552,
        "lastConsumedTimestamp" : 1636966578572,
        "keyHashRanges" : [ "[32769, 65536]" ],
        "metadata" : { },
        "stringBuffer" : "/10.0.42.143:410382021-11-15T16:48:37.950836+08:002.8.0/10.0.42.143:410382021-11-15T16:48:37.950836+08:002.8.0",
        "address" : "/10.0.42.143:41038",
        "connectedSince" : "2021-11-15T16:48:37.950836+08:00",
        "clientVersion" : "2.8.0"
      }, {
        "msgRateOut" : 11594.970182841374,
        "msgThroughputOut" : 2.0735912638165507E7,
        "bytesOutCounter" : 4824021780,
        "msgOutCounter" : 2764052,
        "msgRateRedeliver" : 11678.886876902261,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "95259",
        "availablePermits" : 948,
        "unackedMessages" : 8534,
        "avgMessagesPerEntry" : 20,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "319652:1214",
        "addressOffset" : 53,
        "addressLength" : 16,
        "connectedSinceOffset" : 69,
        "connectedSinceLength" : 32,
        "clientVersionOffset" : 101,
        "clientVersionLength" : 5,
        "lastAckedTimestamp" : 1636966579538,
        "lastConsumedTimestamp" : 1636966576817,
        "keyHashRanges" : [ "[0, 32768]" ],
        "metadata" : { },
        "stringBuffer" : "/10.0.36.1:474282021-11-15T16:48:53.143161+08:002.8.0/10.0.36.1:474282021-11-15T16:48:53.143161+08:002.8.0",
        "address" : "/10.0.36.1:47428",
        "connectedSince" : "2021-11-15T16:48:53.143161+08:00",
        "clientVersion" : "2.8.0"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 29,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 522,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled",
  "nonContiguousDeletedMessagesRanges" : 29,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 522
}

@hepyu (Author) commented Nov 15, 2021

Note 7:
As predicted in note 6, once the consumer hangs because of GC, consumers in the partition's topic stats becomes empty; the pulsar-broker presumably uses heartbeats to decide that the consumer has disconnected.

The conclusion so far, based on reproduction and all the observations: when the consumer uses G1 (already tuned) and a huge volume of messages arrives in a short period, the consumer hangs and then gets disconnected from the pulsar-broker, so nothing is consumed.

[screenshot: consumer state after the GC hang]

Next we will switch the consumer's GC to ZGC and verify whether it returns to normal.

@hepyu (Author) commented Nov 15, 2021

Note 8:

ZGC does not help either. It turned out the difference is that we now send 40 messages per batch, whereas when we used ZGC before it was 10 messages per batch.

The figure below shows the rate at which Pulsar pushes messages to the consumer right after it recovers. A single consumer's TPS is only around 600 (the processing is heavy), so it clearly cannot keep up, yet it keeps receiving this huge volume of messages. Won't that blow up the in-memory queue?

My question is: my consumption capacity is only around 600, so why does Pulsar push me such a huge volume of messages? I never asked for that many.
[screenshot: push rate to the consumer right after recovery]

@hepyu (Author) commented Nov 15, 2021

Where we stand at the moment:
The Pulsar consumer uses a push model by default. Once a large backlog has built up, restarting the consumer makes the pulsar-broker push a huge volume of messages to it, which blows up the consumer's memory.

@hpvd commented Nov 15, 2021

just a little translation to make this issue searchable in english language: (an English translation of the issue description above)

@codelipenghui (Contributor) commented

@hepyu I think there are 2 questions:

  1. The consumer stops consuming data from the topic; it looks like you have already pointed out the root cause.
  2. For the second one, I think you can try to reduce the receiver queue size of the consumer; by default you will receive 1000 batches (possibly more than 1000 messages) when starting the consumer.
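For reference, a minimal sketch (not the reporter's actual code) of how the receiver queue size is configured on the consumer builder; the service URL, topic and subscription names below are placeholders:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class SmallReceiverQueueSketch {
	public static void main(String[] args) throws Exception {
		PulsarClient client = PulsarClient.builder()
				.serviceUrl("pulsar://localhost:6650")
				.build();

		Consumer<byte[]> consumer = client.newConsumer()
				.topic("persistent://public/default/my-topic")
				.subscriptionName("xxx-web")
				.subscriptionType(SubscriptionType.Key_Shared)
				// Default is 1000 entries, and one entry can be a whole batch. A smaller
				// value limits how far ahead the broker pushes messages to this consumer.
				.receiverQueueSize(100)
				.subscribe();

		// ... attach a MessageListener or call receive() as usual ...

		consumer.close();
		client.close();
	}
}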

@codelipenghui added the status/triaged and deprecated/question labels and removed the type/bug label on Nov 17, 2021
@wangjialing218 (Contributor) commented

You could try enabling pooled messages in the consumer settings; this can reduce the GC impact on the consumer.
For more reference: https://github.com/apache/pulsar/wiki/PIP-83-:-Pulsar-client:-Message-consumption-with-pooled-buffer
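A sketch of what the pooled-message suggestion could look like, assuming a 2.8+ client (topic, subscription and process() are placeholders): with poolMessages(true) the message payload is backed by a pooled buffer, so release() must be called once processing is finished.

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class PooledMessageSketch {
	public static void main(String[] args) throws Exception {
		PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
		Consumer<byte[]> consumer = client.newConsumer()
				.topic("persistent://public/default/my-topic")
				.subscriptionName("xxx-web")
				.subscriptionType(SubscriptionType.Key_Shared)
				.poolMessages(true) // PIP-83: reuse buffers to reduce GC pressure on the consumer
				.subscribe();

		while (true) {
			Message<byte[]> msg = consumer.receive();
			try {
				process(msg.getValue()); // placeholder for the business logic
				consumer.acknowledge(msg);
			} finally {
				msg.release(); // return the pooled buffer; required with poolMessages(true)
			}
		}
	}

	private static void process(byte[] payload) {
		// placeholder
	}
}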

@hepyu (Author) commented Nov 19, 2021

@hepyu I think there are 2 questions:

  1. The consumer stops consuming data from the topic; it looks like you have already pointed out the root cause.
  2. For the second one, I think you can try to reduce the receiver queue size of the consumer; by default you will receive 1000 batches (possibly more than 1000 messages) when starting the consumer.

Adjustment 2 does not help either. Similar tweaks, such as limiting the consumption rate, are also useless; they only make the service die more slowly.

The root cause of this issue:
In Key_Shared mode, the pulsar-client keeps receiving the messages pushed by the pulsar-broker into a receiverQueue. receiverQueueSize only limits the size of that queue, and the limit does not really help; it only slows down the death of the service. On the client, each key_shard is handled by its own single-thread pool, and that thread pool has an unbounded queue. As soon as the receiverQueue forwards a message to a key_shard thread pool's unbounded queue, it goes back to receiving more pushed messages from the pulsar-broker, until the unbounded queues of all the key_shard thread pools are filled to their limit and the service is eaten alive.

There are only two ways to solve this:
1. Add more consumer nodes and increase the capacity of each node. In our business scenario that is impossible, because the processing speed is fixed and I cannot add nodes indefinitely to work around it. If we kept this approach, the only remedy when the problem occurs would be to temporarily add nodes and keep restarting, which is absurd.
2. Switch to synchronous pull-style consumption (see the sketch below), which definitely avoids the problem. For our business this is the right choice.
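A minimal sketch of option 2 (synchronous pull), under the assumption of a small receiver queue; the topic, subscription name and handle() are placeholders rather than the real business code:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class SyncPullSketch {
	public static void main(String[] args) throws Exception {
		PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
		Consumer<byte[]> consumer = client.newConsumer()
				.topic("persistent://public/default/my-topic")
				.subscriptionName("xxx-web")
				.subscriptionType(SubscriptionType.Key_Shared)
				.receiverQueueSize(10) // keep the client-side buffer small
				.subscribe();

		while (true) {
			Message<byte[]> msg = consumer.receive(); // blocks until the next message arrives
			try {
				handle(msg.getValue()); // placeholder for the (slow) business processing
				consumer.acknowledge(msg);
			} catch (Exception e) {
				consumer.negativeAcknowledge(msg); // ask the broker to redeliver
			}
		}
	}

	private static void handle(byte[] payload) {
		// placeholder
	}
}

Since receive() is only called after the previous message has been processed and acknowledged, the broker is never asked for more than the consumer can actually handle.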

There is nothing wrong with Pulsar's design as such; the push model is meant for scenarios where the consumer has high-TPS consumption capacity. But I have a suggestion: could the push path be improved to handle the problem we ran into, for example by making the key_shard thread pool's queue bound configurable and combining that with feedback to the broker to control the push intensity? That said, this looks quite involved.

I am closing this issue.

@hepyu hepyu closed this as completed Nov 19, 2021
@hepyu (Author) commented Nov 19, 2021

One more note:
Someone earlier suggested reducing the ack timeout. That does not work either and actually kills the service faster: messages time out while they are still being processed or still waiting in the queue, the pulsar-broker redelivers them, and the disaster gets even worse.

@hepyu (Author) commented Nov 22, 2021

Appendix:
How Pulsar's push model (flow control) works:
https://pulsar.apache.org/docs/zh-CN/next/develop-binary-protocol/#%E6%B5%81%E9%87%8F%E6%8E%A7%E5%88%B6

The solution we finally adopted is to not use the mechanism provided by the client and to implement it ourselves.
Each key_shard is consumed by one thread: messages taken out of the receiveQueue are handed to a blocking queue of capacity 1. This avoids the problem in this issue, because the pulsar-broker is now asked to push messages to the consumer only at the pace of the actual consumption capacity.
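A minimal sketch of the approach described above, not the production code: one dispatcher thread pulls from the consumer and hands each message to a capacity-1 blocking queue per key, and one worker per key processes and acknowledges. The blocking put() keeps the dispatcher from asking the broker for more messages than the workers can process (in this simplified form a single slow key also stalls the dispatcher). Topic, subscription name and handle() are placeholders:

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class KeyShardHandoffSketch {

	private final Map<String, BlockingQueue<Message<byte[]>>> queues = new ConcurrentHashMap<>();

	public static void main(String[] args) throws Exception {
		PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
		Consumer<byte[]> consumer = client.newConsumer()
				.topic("persistent://public/default/my-topic")
				.subscriptionName("xxx-web")
				.subscriptionType(SubscriptionType.Key_Shared)
				.receiverQueueSize(10) // small client-side buffer
				.subscribe();
		new KeyShardHandoffSketch().dispatch(consumer);
	}

	private void dispatch(Consumer<byte[]> consumer) throws Exception {
		while (true) {
			Message<byte[]> msg = consumer.receive(); // pull, not push
			// All messages carry a key in this scenario (key-sharding producer).
			BlockingQueue<Message<byte[]>> queue = queues.computeIfAbsent(msg.getKey(), key -> {
				BlockingQueue<Message<byte[]>> q = new ArrayBlockingQueue<>(1);
				Thread worker = new Thread(() -> work(consumer, q), "key-shard-worker-" + key);
				worker.setDaemon(true);
				worker.start();
				return q;
			});
			// Capacity-1 handoff: blocks until the worker has taken the previous message,
			// which throttles further receive() calls to the real consumption speed.
			queue.put(msg);
		}
	}

	private void work(Consumer<byte[]> consumer, BlockingQueue<Message<byte[]>> queue) {
		while (true) {
			Message<byte[]> msg;
			try {
				msg = queue.take();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return;
			}
			try {
				handle(msg.getValue()); // placeholder for the business processing
				consumer.acknowledge(msg);
			} catch (Exception e) {
				consumer.negativeAcknowledge(msg); // redeliver on failure
			}
		}
	}

	private void handle(byte[] payload) {
		// placeholder
	}
}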

@hepyu (Author) commented Nov 22, 2021

Appendix: How Pulsar's push model (flow control) works: https://pulsar.apache.org/docs/zh-CN/next/develop-binary-protocol/#%E6%B5%81%E9%87%8F%E6%8E%A7%E5%88%B6

The solution we finally adopted is to not use the mechanism provided by the client and to implement it ourselves. Each key_shard is consumed by one thread: messages taken out of the receiveQueue are handed to a blocking queue of capacity 1. This avoids the problem in this issue, because the pulsar-broker is now asked to push messages to the consumer only at the pace of the actual consumption capacity.

@wangjialing218 @hpvd @codelipenghui
I strongly suggest that pulsar-client provide a similar mechanism. It would be very practical.

@xuesongxs (Contributor) commented

We are using Pulsar 2.8.1 with Key_Shared consistent hashing enabled in broker.conf, and the problem of messages not being received has not reappeared:
subscriptionKeySharedUseConsistentHashing=true

@hepyu (Author) commented Dec 1, 2021

We are using Pulsar 2.8.1 with Key_Shared consistent hashing enabled in broker.conf, and the problem of messages not being received has not reappeared: subscriptionKeySharedUseConsistentHashing=true

Actually, as this issue was finally diagnosed, it is not really that "messages cannot be received". After a large backlog builds up, the push model pushes a huge volume of messages and fills up the unbounded queues in the service's pulsar-client, causing a memory leak (the consumption capacity is far too low), which produces the appearance of "not receiving messages".

It has already been solved; see the comments above.

Labels: deprecated/question (questions should be asked in GitHub Discussions)
Projects: None yet
Development: No branches or pull requests

5 participants