This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-12800: With messages severely unbalanced across a 40-partition topic, individual partitions cannot be consumed by the consumer #3272

Closed
sijie opened this issue Nov 15, 2021 · 0 comments

Comments

@sijie
Member

sijie commented Nov 15, 2021

Original Issue: apache#12800


This looks like a bug in pulsar-broker, but I cannot confirm it.

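Contents:
(1). Why this is not written in English
(2). Pulsar version, deployment details, and architecture
(3). Problem, observations and usage, and anonymized code
1. Problem
2. Observations and usage, with anonymized code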

(1). Why this is not written in English

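My English is not good. I can read it without trouble, but if I described the whole problem and its symptoms in English I could not guarantee the description would be accurate, so I am writing in Chinese.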

(2). Pulsar version, deployment details, and architecture

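The Pulsar version is 2.8.0, running on OpenJDK 11 (specifically 11.0.12).
It is deployed on AWS in an overseas region on c5a.2xlarge instances (8 vCPU, 16 GB): 3 machines in total, each running one broker, one bookie, and one ZooKeeper. The startup parameters were left unchanged; everything uses the defaults.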

Deployment details:
pulsar-7: Deploying a production-grade 5-node Pulsar cluster on AWS
https://mp.weixin.qq.com/s/YwCr-l2WcM4fJVg7NIx_HA

(3). Problem, observations and usage, and anonymized code

1. Problem
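When messages are severely unbalanced across the 40 partitions of the topic, individual partitions cannot be consumed by the consumer. Most recently, two partitions each built up a backlog of roughly 300,000 messages.
[image]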
Average size of the messages sent to this partitioned topic:
[image]
pulsar_rate_in:
[image]
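pulsar_rate_out: the spike marks the time we noticed that consumption had stopped, although the problem may have started earlier. The rate is this high because we restarted the consumer.
[image]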
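Two partitions sitting at roughly 300,000 messages while the rest stay low is the signal described above. A minimal sketch for flagging that kind of imbalance, assuming the per-partition backlog values have already been collected from whatever monitoring is in place (for example the per-subscription msgBacklog in topic stats); the class `BacklogSkew` and its thresholds are hypothetical and not part of Pulsar:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper: flags partitions whose backlog is far above the
 * typical (median) backlog across all partitions of one topic.
 */
final class BacklogSkew {

    private BacklogSkew() {}

    /** Median of the values; for an even count, the average of the two middle values. */
    static double median(long[] values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("no partitions");
        }
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 1
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    /**
     * Returns the indices of partitions whose backlog exceeds both
     * factor * median and an absolute floor (the floor ignores noise on quiet topics).
     */
    static List<Integer> laggingPartitions(long[] backlogPerPartition, double factor, long floor) {
        double med = median(backlogPerPartition);
        List<Integer> lagging = new ArrayList<>();
        for (int i = 0; i < backlogPerPartition.length; i++) {
            long backlog = backlogPerPartition[i];
            if (backlog > floor && backlog > factor * med) {
                lagging.add(i);
            }
        }
        return lagging;
    }
}
```

Comparing against the median rather than the mean keeps one or two runaway partitions from raising the baseline they are measured against.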

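2. Observations and usage, with anonymized code
The producer uses batched sending (about 8 messages per batch on average, at most 40 per batch), sends asynchronously, and routes messages by key (key-sharding) to the different partitions of this topic. Anonymized code: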

package test;

import com.google.common.collect.Lists;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.pulsar.client.api.Producer;

@Service
public class PushService implements IPushService {

	private final static Logger logger = LoggerFactory.getLogger(PushService.class);

	@Resource
	private IMqBackoffService mqBackoffService;

	@Resource(name = "XXXStatusProducer")
	private Producer<byte[]> YYYStatusUUUProducer;

	@Override
	public void pushZZZPPPRRRStatus(Long userId, List<YYYStatusResp> RRRList) {
		String content = GsonUtil.beanToJsonString(RRRList);
		try {
			// Asynchronous, batched send; messages are routed by key (sharding) to different partitions of the topic.
			YYYStatusUUUProducer.newMessage().key(String.valueOf(userId))
					.value(content.getBytes(StandardCharsets.UTF_8)).sendAsync().exceptionally((e -> {
						logger.error("send sync ZZZ PPP RRR status error,content:{}", content, e);
						// If the async send fails, the message is saved to the AWS Aurora database instead, using a local database transaction (shardingjdbc 4.1.1) declared via annotations.
						mqBackoffService.saveYYYStatus(RRRList);
						return null;
					}));

		} catch (Exception e) {
			logger.error("send ZZZ PPP RRR status error,content:{}", content, e);
			mqBackoffService.saveYYYStatus(RRRList);
		}
	}

}
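Key-sharding means every message for one userId lands on the same partition. A minimal sketch of that routing, assuming the Java client's default behavior for keyed messages, which to my understanding takes the non-negative `String.hashCode()` of the key modulo the partition count (the JavaStringHash scheme); `KeyRouting` is a hypothetical helper, not a Pulsar class:

```java
import java.util.Map;

/** Hypothetical approximation of key-hash partition routing. */
final class KeyRouting {

    private KeyRouting() {}

    /** Non-negative String.hashCode() of the key, modulo the number of partitions. */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int hash = key.hashCode() & Integer.MAX_VALUE; // clear the sign bit
        return hash % numPartitions;
    }

    /** Sums a (key -> message count) workload into per-partition message counts. */
    static long[] messagesPerPartition(Map<String, Long> messagesByKey, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (Map.Entry<String, Long> entry : messagesByKey.entrySet()) {
            counts[partitionFor(entry.getKey(), numPartitions)] += entry.getValue();
        }
        return counts;
    }
}
```

With this kind of routing, a handful of very active userIds concentrate their traffic on a few partitions, which could account for uneven per-partition ingress on its own. It would not, by itself, explain a partition that stops being consumed.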

The consumer also consumes with key-sharding. Anonymized code:

package test;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.List;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

@Component
public class XXXResultMessageListener implements MessageListener<byte[]> {

	private static final Logger log = LoggerFactory.getLogger(XXXResultMessageListener.class);

	private final IoooService oooService;

	public XXXResultMessageListener(IoooService oooService) {
		this.oooService = oooService;
	}

	@SneakyThrows
	@LogTid
	@Override
	public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
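		// Decode explicitly as UTF-8 to match the producer; new String(byte[]) would use the platform default charset.
		String body = new String(msg.getValue(), java.nio.charset.StandardCharsets.UTF_8);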
		try {
			if (StrUtil.isBlank(body)) {
				log.warn("YYY result received msg value is null messageId:{}", msg.getMessageId());
				consumer.acknowledge(msg.getMessageId());
				return;
			}
			log.info("YYY result,topicName:{},message:{}", msg.getTopicName(), body);
			List<TTTUUUStatusResp> TTTUUUStatusRespList = JSONUtil.toList(body, TTTUUUStatusResp.class);
			if (CollUtil.isNotEmpty(TTTUUUStatusRespList)) {
				TTTUUUStatusResp TTTUUUStatusResp = TTTUUUStatusRespList.get(0);
				if (UserTypeEnum.PPP.getCode().equals(TTTUUUStatusResp.getUserType())) {
					// Only sets a flag that controls whether logs are printed.
					ThreadFilter.setPrintFlag(false);
				}
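				// Processed synchronously: the next message is taken only after this completes. This involves local transactions on the Aurora database (shardingjdbc 4.1.1), all declared via annotations.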
				oooService.vvvBatchReceived(TTTUUUStatusRespList);
			}
			consumer.acknowledge(msg.getMessageId());
		} catch (Exception e) {
			if (e instanceof BaseRuntimeException) {
				// BaseRuntimeException is a business exception; the business rules say it needs no handling, so ack it directly.
				consumer.acknowledge(msg.getMessageId());
			} else {
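				// Pass the exception as the last argument so SLF4J logs the full stack trace.
				log.error("YYY result Failed to process message error,data is {}", body, e);
				// If business processing fails, tell Pulsar to redeliver the message.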
				consumer.negativeAcknowledge(msg);
			}
			throw e;
		} finally {
			ThreadFilter.removePrintFlag();
		}

	}

}
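The producer above batches messages and routes them by key, and the consumer relies on key-sharding. If the subscription is Key_Shared (an assumption; the issue does not name the subscription type), the Pulsar documentation recommends key-based batching on the producer (`BatcherBuilder.KEY_BASED`), because to my understanding a default batch can mix keys and is dispatched as a unit. The plain-Java sketch below only models the idea of grouping pending messages so that each batch carries a single key; `KeyBasedBatcher` and its methods are hypothetical and are not Pulsar's implementation:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of key-based batching: pending messages are grouped by key,
 * and every flushed batch holds messages for exactly one key, in send order.
 */
final class KeyBasedBatcher {

    /** A pending message: its routing key and payload. */
    static final class Msg {
        final String key;
        final String payload;

        Msg(String key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    private final int maxMessagesPerBatch;
    // LinkedHashMap keeps keys in first-seen order, so flush output is deterministic.
    private final Map<String, List<Msg>> pending = new LinkedHashMap<>();

    KeyBasedBatcher(int maxMessagesPerBatch) {
        if (maxMessagesPerBatch <= 0) {
            throw new IllegalArgumentException("maxMessagesPerBatch must be positive");
        }
        this.maxMessagesPerBatch = maxMessagesPerBatch;
    }

    /** Queues a message under its key. */
    void add(String key, String payload) {
        pending.computeIfAbsent(key, k -> new ArrayList<>()).add(new Msg(key, payload));
    }

    /** Emits all pending messages as single-key batches of at most maxMessagesPerBatch, then clears. */
    List<List<Msg>> flush() {
        List<List<Msg>> batches = new ArrayList<>();
        for (List<Msg> msgs : pending.values()) {
            for (int i = 0; i < msgs.size(); i += maxMessagesPerBatch) {
                int end = Math.min(i + maxMessagesPerBatch, msgs.size());
                batches.add(new ArrayList<>(msgs.subList(i, end)));
            }
        }
        pending.clear();
        return batches;
    }
}
```

In the real client the corresponding setting is, to my understanding, `.batcherBuilder(BatcherBuilder.KEY_BASED)` on the `ProducerBuilder`. Whether the batching mode plays any part in the stall reported here has not been established.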
