Broker OOM and memory leak, possibly due to ledger loss #12716

Closed
YanshuoH opened this issue Nov 10, 2021 · 12 comments
Labels: type/bug


@YanshuoH

Hi,

Describe the bug

I recently encountered a memory leak in the Pulsar broker and have tried to identify the issue using the logs below.

I've set io.netty.leakDetectionLevel to paranoid and io.netty.leakDetection.targetRecords=40, but no leak messages appeared.
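Both settings are read by Netty as JVM system properties (-D flags) when its ResourceLeakDetector class initializes, so they only take effect if they actually reach the broker JVM's command line (Netty also offers ResourceLeakDetector.setLevel(...) for setting the level in code). The sketch here is a stdlib-only approximation, based on my reading of Netty 4.1's ResourceLeakDetector, of how the settings are resolved: the newer io.netty.leakDetection.level overrides the legacy io.netty.leakDetectionLevel, unrecognized values fall back to SIMPLE, and targetRecords defaults to 4. LeakLevelCheck is a hypothetical helper, not part of Netty, and it ignores the older io.netty.noResourceLeakDetection flag; check the source of your Netty version before relying on it.

```java
import java.util.Properties;

/**
 * Hypothetical helper (not part of Netty): approximates how Netty 4.1's
 * ResourceLeakDetector resolves its settings from system properties.
 */
final class LeakLevelCheck {
    enum Level { DISABLED, SIMPLE, ADVANCED, PARANOID }

    static final String PROP_LEVEL_OLD = "io.netty.leakDetectionLevel";   // legacy name
    static final String PROP_LEVEL = "io.netty.leakDetection.level";      // current name
    static final String PROP_TARGET_RECORDS = "io.netty.leakDetection.targetRecords";
    static final int DEFAULT_TARGET_RECORDS = 4;

    private LeakLevelCheck() {}

    /** The current name wins over the legacy one; unknown values fall back to SIMPLE. */
    static Level resolveLevel(Properties props) {
        String value = props.getProperty(PROP_LEVEL_OLD, Level.SIMPLE.name());
        value = props.getProperty(PROP_LEVEL, value).trim();
        for (Level level : Level.values()) {
            // Accept the level name in any case, or its ordinal ("0".."3").
            if (value.equalsIgnoreCase(level.name())
                    || value.equals(String.valueOf(level.ordinal()))) {
                return level;
            }
        }
        return Level.SIMPLE;
    }

    /** Access records kept per tracked buffer; invalid values fall back to the default. */
    static int resolveTargetRecords(Properties props) {
        String value = props.getProperty(PROP_TARGET_RECORDS);
        if (value == null) {
            return DEFAULT_TARGET_RECORDS;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return DEFAULT_TARGET_RECORDS;
        }
    }
}
```

For example, calling LeakLevelCheck.resolveLevel(System.getProperties()) inside a JVM started with -Dio.netty.leakDetection.level=paranoid returns PARANOID; if it returns SIMPLE, the flag most likely never reached that JVM.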

We have 4 Pulsar clusters:

  • all on version 2.7.3
  • all running on Kubernetes 1.20

Only one cluster has encountered this issue.

The only difference in the affected cluster is that I had enabled geo-replication on all namespaces. I disabled it, but the memory leak remains.

14:44:01.497 [pulsar-io-23-2] ERROR org.apache.pulsar.PulsarBrokerStarter - -- Shutting down - Received OOM exception: failed to allocate 16777216 byte(s) of direct memory (used: 1073741824, max: 1073741824)
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1073741824, max: 1073741824)
	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:776) ~[io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731) ~[io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:645) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:621) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:204) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:188) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:138) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:128) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:378) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) [org.apache.bookkeeper-bookkeeper-common-allocator-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) [org.apache.bookkeeper-bookkeeper-common-allocator-4.12.0.jar:4.12.0]
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) [io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) [io.netty-netty-buffer-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) [io.netty-netty-transport-native-unix-common-4.1.60.Final-linux-x86_64.jar:4.1.60.Final]
	at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75) [io.netty-netty-transport-native-epoll-4.1.60.Final-linux-x86_64.jar:4.1.60.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:780) [io.netty-netty-transport-native-epoll-4.1.60.Final-linux-x86_64.jar:4.1.60.Final]
	at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425) [io.netty-netty-transport-native-epoll-4.1.60.Final-linux-x86_64.jar:4.1.60.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) [io.netty-netty-transport-native-epoll-4.1.60.Final-linux-x86_64.jar:4.1.60.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
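The numbers in this error can be decoded directly. The trace passes through PoolArena.newChunk, so the 16777216-byte (16 MiB) request is Netty allocating a whole new pool chunk, which as far as I know matches the default chunk size in Netty 4.1.60 (8 KiB pages, maxOrder 11). The limit of 1073741824 bytes (1 GiB) is Netty's direct-memory cap, which by default follows the JVM's -XX:MaxDirectMemorySize. Since 1 GiB is exactly 64 × 16 MiB, the counter was already at the cap and the next chunk was refused. DirectMemoryCounter is a hypothetical, simplified sketch of that check, loosely modeled on PlatformDependent.incrementMemoryCounter, not Netty's actual code.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of the bookkeeping behind Netty's OutOfDirectMemoryError:
 * a global counter of reserved direct memory, checked against a hard limit
 * before every allocation.
 */
final class DirectMemoryCounter {
    private final AtomicLong used = new AtomicLong();
    private final long max;

    DirectMemoryCounter(long max) {
        this.max = max;
    }

    /** Reserves capacity bytes, or throws with a message shaped like the broker log. */
    void increment(int capacity) {
        for (;;) {
            long current = used.get();
            long next = current + capacity;
            if (next > max) {
                throw new OutOfMemoryError("failed to allocate " + capacity
                        + " byte(s) of direct memory (used: " + current + ", max: " + max + ")");
            }
            // CAS loop so concurrent allocators cannot overshoot the limit.
            if (used.compareAndSet(current, next)) {
                return;
            }
        }
    }

    /** Called when the underlying direct memory is actually freed. */
    void decrement(int capacity) {
        used.addAndGet(-capacity);
    }

    long used() {
        return used.get();
    }
}
```

Because the counter only goes down when memory is really freed, buffers that are never released keep it pinned near the cap, which is why a leak eventually surfaces as this error rather than as a normal heap OOM.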

Screenshots

This is only a test cluster without much memory configured, but I believe the screenshots show the leak.

[Four screenshots taken 2021-11-10, not reproduced here]

Additional context

As a wild guess, the memory leak may come from data loss on the bookie side.
I found some error logs there:

11:51:51.568 [BookieReadThreadPool-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.proto.ReadEntryProcessorV3 - No ledger found while reading entry: 0 from ledger: 242603
11:51:52.828 [bookie-io-1-1] INFO  org.apache.bookkeeper.proto.BookieRequestHandler - Channels disconnected: [id: 0x5d119911, L:/10.119.139.211:3181 ! R:/10.119.131.152:34376]
11:52:10.560 [bookie-io-1-2] INFO  org.apache.bookkeeper.proto.AuthHandler - Authentication success on server side
11:52:10.560 [bookie-io-1-2] INFO  org.apache.bookkeeper.proto.BookieRequestHandler - Channel connected  [id: 0xb66b4346, L:/10.119.139.211:3181 - R:/10.119.164.211:54744]
11:52:11.149 [BookieReadThreadPool-OrderedExecutor-0-0] ERROR org.apache.bookkeeper.proto.ReadLacProcessorV3 - No ledger found while performing readLac from ledger: 242603
org.apache.bookkeeper.bookie.Bookie$NoLedgerException: Ledger 242603 not found
	at org.apache.bookkeeper.bookie.LedgerDescriptor.createReadOnly(LedgerDescriptor.java:52) ~[org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.bookie.HandleFactoryImpl.getReadOnlyHandle(HandleFactoryImpl.java:61) ~[org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.bookie.Bookie.getExplicitLac(Bookie.java:1364) ~[org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.proto.ReadLacProcessorV3.getReadLacResponse(ReadLacProcessorV3.java:71) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.proto.ReadLacProcessorV3.safeRun(ReadLacProcessorV3.java:118) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.12.0.jar:4.12.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
11:52:11.150 [BookieReadThreadPool-OrderedExecutor-0-0] ERROR org.apache.bookkeeper.proto.ReadLacProcessorV3 - No ledger found while trying to read last entry: 242603
org.apache.bookkeeper.bookie.Bookie$NoLedgerException: Ledger 242603 not found
	at org.apache.bookkeeper.bookie.LedgerDescriptor.createReadOnly(LedgerDescriptor.java:52) ~[org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.bookie.HandleFactoryImpl.getReadOnlyHandle(HandleFactoryImpl.java:61) ~[org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.bookie.Bookie.readEntry(Bookie.java:1441) ~[org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.proto.ReadLacProcessorV3.getReadLacResponse(ReadLacProcessorV3.java:86) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.proto.ReadLacProcessorV3.safeRun(ReadLacProcessorV3.java:118) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.12.0.jar:4.12.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
11:52:11.163 [BookieReadThreadPool-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.proto.ReadEntryProcessorV3 - No ledger found while reading entry: 0 from ledger: 242603

This is because we hit a Kubernetes node issue a long time ago, and some of the ledgers were unintentionally deleted.

I can confirm that all the bookies are otherwise working fine, with no memory leak (apart from the exceptions above).

Let me know if you have any clues, or if there's anything I can do to diagnose the memory leak.

Thanks.

@codelipenghui (Contributor)

@yonyong @hangc0276 Please help check the issue.

@YanshuoH (Author)

After some JVM investigation, I took a heap dump with jmap -dump:format=b,file=dump.bin 1 and found something interesting in the dominator tree (using MAT):
[Screenshot of the MAT dominator tree, 2021-11-18, not reproduced here]

Path to GC roots:

[Screenshot, 2021-11-18, not reproduced here]

I'm not sure whether pulsar_expired_token is the culprit or whether this is expected.

Let me know if you need more information from the dump.
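For reference, a heap dump like this can also be triggered from inside the JVM through the HotSpot-specific HotSpotDiagnosticMXBean, which helps when the container image ships without JDK tools. HeapDumper is a hypothetical helper name; dumpHeap(String, boolean) is the real MXBean method. The target file must not already exist, and some JDK versions also require the name to end in .hprof.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;

/** Hypothetical helper: writes an HPROF heap dump of the current (HotSpot) JVM. */
final class HeapDumper {
    private HeapDumper() {}

    /**
     * @param path     target file; must not exist yet (use a .hprof suffix to be safe)
     * @param liveOnly if true, only objects reachable from GC roots are dumped
     */
    static void dump(String path, boolean liveOnly) throws IOException {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(path, liveOnly);
    }
}
```

Keep in mind that a heap dump contains Java objects only: it shows the ByteBuf wrappers and pool chunk objects that reference direct memory, but not the off-heap bytes themselves, so MAT's retained sizes understate how much direct memory a leaked buffer pins.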

@YanshuoH (Author) commented Nov 18, 2021

Finally, after turning on leak detection correctly, we can see where the leak comes from.
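Netty lists a leaked buffer's access records newest first (#1 is the most recent). Reading from the bottom of the records in the leak log, the leaked buffer was being written while serializing a replicated-subscriptions snapshot request marker (Markers.newReplicatedSubscriptionsSnapshotRequest, #14 onward), was published through ReplicatedSubscriptionsController.writeMarker and PersistentTopic.publishMessage (#13), and was retained again via retainedDuplicate in OpAddEntry.initiate (#12). A LEAK message means the buffer was garbage-collected while its reference count was still above zero, so some reference, possibly the creator's own, was never released. RefCountedBuffer is a hypothetical, minimal sketch of that contract (not Netty's ByteBuf); it does not claim to show where the missing release is in Pulsar.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical reference-counted buffer (not Netty's ByteBuf): memory is freed
 * only once every reference, including the creator's, has been released.
 */
final class RefCountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1); // creator holds the first reference
    private volatile boolean freed;

    /** Takes an additional reference (analogous to retain()/retainedDuplicate() in Netty). */
    RefCountedBuffer retain() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("retain() after free");
            }
            if (refCnt.compareAndSet(current, current + 1)) {
                return this;
            }
        }
    }

    /** Drops one reference; returns true if this call freed the buffer. */
    boolean release() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("release() after free");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    freed = true; // last reference gone: memory would be returned here
                    return true;
                }
                return false;
            }
        }
    }

    int refCnt() {
        return refCnt.get();
    }

    boolean isFreed() {
        return freed;
    }
}
```

In this sketch, if the code that created a buffer hands a retained copy to another component and that component releases it, the count only drops back to 1; unless the creator also calls release(), the buffer is never freed. With a real Netty buffer, that is the state in which the leak detector reports that ByteBuf.release() was not called before it was garbage-collected.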

The leak log is quite long:

13:41:14.876 [bookkeeper-io-12-2] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.buffer.AbstractPooledDerivedByteBuf.deallocate(AbstractPooledDerivedByteBuf.java:86)
	io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	org.apache.bookkeeper.mledger.impl.EntryImpl.deallocate(EntryImpl.java:164)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release0(AbstractCASReferenceCounted.java:104)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release(AbstractCASReferenceCounted.java:87)
	org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:121)
	org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:226)
	org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:477)
	org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:150)
	org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#2:
	org.apache.bookkeeper.mledger.impl.EntryImpl.deallocate(EntryImpl.java:164)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release0(AbstractCASReferenceCounted.java:104)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release(AbstractCASReferenceCounted.java:87)
	org.apache.bookkeeper.mledger.util.RangeCache.removeRange(RangeCache.java:144)
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.invalidateEntries(EntryCacheImpl.java:157)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.doCacheEviction(ManagedLedgerImpl.java:1921)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$doCacheEviction$5(ManagedLedgerFactoryImpl.java:288)
	java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.doCacheEviction(ManagedLedgerFactoryImpl.java:284)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.cacheEvictionTask(ManagedLedgerFactoryImpl.java:269)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#3:
	io.netty.buffer.AdvancedLeakAwareByteBuf.retainedDuplicate(AdvancedLeakAwareByteBuf.java:100)
	org.apache.bookkeeper.mledger.impl.EntryImpl.create(EntryImpl.java:98)
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry0(EntryCacheImpl.java:280)
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:249)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1744)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalReadFromLedger(ManagedLedgerImpl.java:1716)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntries(ManagedLedgerImpl.java:1539)
	org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.notifyEntriesAvailable(ManagedCursorImpl.java:2622)
	org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#4:
	io.netty.buffer.AbstractPooledDerivedByteBuf.deallocate(AbstractPooledDerivedByteBuf.java:86)
	io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	org.apache.bookkeeper.mledger.impl.EntryImpl.deallocate(EntryImpl.java:164)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release0(AbstractCASReferenceCounted.java:104)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release(AbstractCASReferenceCounted.java:87)
	org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:121)
	org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:531)
	org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:477)
	org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:150)
	org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#5:
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:178)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#6:
	org.apache.bookkeeper.mledger.impl.EntryImpl.deallocate(EntryImpl.java:164)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release0(AbstractCASReferenceCounted.java:104)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release(AbstractCASReferenceCounted.java:87)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:174)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#7:
	io.netty.buffer.AbstractPooledDerivedByteBuf.deallocate(AbstractPooledDerivedByteBuf.java:86)
	io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	org.apache.bookkeeper.util.ByteBufList.deallocate(ByteBufList.java:262)
	io.netty.util.AbstractReferenceCounted.handleRelease(AbstractReferenceCounted.java:86)
	io.netty.util.AbstractReferenceCounted.release(AbstractReferenceCounted.java:76)
	io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
	org.apache.bookkeeper.client.PendingAddOp.maybeRecycle(PendingAddOp.java:493)
	org.apache.bookkeeper.client.PendingAddOp.submitCallback(PendingAddOp.java:434)
	org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1832)
	org.apache.bookkeeper.client.PendingAddOp.sendAddSuccessCallbacks(PendingAddOp.java:415)
	org.apache.bookkeeper.client.PendingAddOp.writeComplete(PendingAddOp.java:409)
	org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.writeComplete(PerChannelBookieClient.java:2149)
	org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleResponse(PerChannelBookieClient.java:2206)
	org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleV2Response(PerChannelBookieClient.java:2185)
	org.apache.bookkeeper.proto.PerChannelBookieClient$ReadV2ResponseCallback.safeRun(PerChannelBookieClient.java:1380)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#8:
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.insert(EntryCacheImpl.java:113)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:173)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#9:
	org.apache.bookkeeper.mledger.impl.EntryImpl.create(EntryImpl.java:88)
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.insert(EntryCacheImpl.java:112)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:173)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#10:
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.insert(EntryCacheImpl.java:108)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:173)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#11:
	org.apache.bookkeeper.mledger.impl.EntryImpl.create(EntryImpl.java:77)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:170)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#12:
	io.netty.buffer.AdvancedLeakAwareByteBuf.retainedDuplicate(AdvancedLeakAwareByteBuf.java:100)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.initiate(OpAddEntry.java:105)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:688)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$3(ManagedLedgerImpl.java:621)
	org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#13:
	org.apache.bookkeeper.mledger.impl.OpAddEntry.create(OpAddEntry.java:79)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncAddEntry(ManagedLedgerImpl.java:618)
	org.apache.pulsar.broker.service.persistent.PersistentTopic.publishMessage(PersistentTopic.java:361)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.writeMarker(ReplicatedSubscriptionsController.java:231)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:86)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#14:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:592)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1986)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#15:
	io.netty.buffer.AdvancedLeakAwareByteBuf.setInt(AdvancedLeakAwareByteBuf.java:304)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1983)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#16:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeInt32NoTag(ByteBufCodedOutputStream.java:220)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeInt32(ByteBufCodedOutputStream.java:111)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4393)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#17:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeInt32(ByteBufCodedOutputStream.java:110)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4393)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#18:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:97)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeInt32(ByteBufCodedOutputStream.java:110)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4393)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#19:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint64(ByteBufCodedOutputStream.java:149)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64NoTag(ByteBufCodedOutputStream.java:142)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:121)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4345)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#20:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint64(ByteBufCodedOutputStream.java:152)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64NoTag(ByteBufCodedOutputStream.java:142)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:121)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4345)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#21:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:120)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4345)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#22:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint64(ByteBufCodedOutputStream.java:149)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64NoTag(ByteBufCodedOutputStream.java:142)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:121)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4342)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#23:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:120)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4342)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#24:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:616)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawBytes(ByteBufCodedOutputStream.java:182)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytesNoTag(ByteBufCodedOutputStream.java:167)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytes(ByteBufCodedOutputStream.java:161)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4339)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#25:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytesNoTag(ByteBufCodedOutputStream.java:166)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytes(ByteBufCodedOutputStream.java:161)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4339)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#26:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytes(ByteBufCodedOutputStream.java:160)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4339)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1969)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#27:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeInt(AdvancedLeakAwareByteBuf.java:562)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1968)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
#28:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeShort(AdvancedLeakAwareByteBuf.java:550)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1961)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
	org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164)
	org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.buffer(ByteBufAllocatorImpl.java:135)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1955)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)
: 5 leak records were discarded because they were duplicates
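How to read the record: the buffer was created in `Commands.serializeMetadataAndPayload` (the "Created at" section). It was then written to while a replicated-subscriptions snapshot-request marker was being built, and it was never released. The `FutureTask.runAndReset` and `ScheduledThreadPoolExecutor` frames indicate that `startNewSnapshot` runs as a periodic task. A missed `release()` on that path would therefore leak one buffer per run, not just once.

The sketch below models that accumulation. It uses a hypothetical `RefCountedBuffer` class as a stand-in for Netty's `ByteBuf`, and a hypothetical `publishMarker` step. It is not Pulsar's code and does not claim to show where the actual fix belongs.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted direct buffer such as Netty's ByteBuf.
// It is not Netty or Pulsar code; it only counts buffers that were never released.
final class MarkerLeakDemo {
    static final AtomicInteger OUTSTANDING = new AtomicInteger();

    static final class RefCountedBuffer {
        private int refCnt = 1; // a freshly allocated buffer is owned by its creator

        RefCountedBuffer() {
            OUTSTANDING.incrementAndGet();
        }

        void release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("buffer already released");
            }
            if (--refCnt == 0) {
                OUTSTANDING.decrementAndGet(); // a real pool would reclaim the memory here
            }
        }
    }

    // Hypothetical publish step that reads the marker but does not take ownership of it.
    static void publishMarker(RefCountedBuffer marker) {
        // e.g. copy the bytes somewhere else
    }

    // One run of a periodic task, such as a snapshot request scheduled every N ms.
    static void runOnce(boolean releaseWhenDone) {
        RefCountedBuffer marker = new RefCountedBuffer();
        try {
            publishMarker(marker);
        } finally {
            if (releaseWhenDone) {
                marker.release(); // the creator still holds the only reference count
            }
        }
    }

    // Simulates `runs` scheduled executions and returns how many buffers remain unreleased.
    static int simulate(int runs, boolean releaseWhenDone) {
        OUTSTANDING.set(0);
        for (int i = 0; i < runs; i++) {
            runOnce(releaseWhenDone);
        }
        return OUTSTANDING.get();
    }

    public static void main(String[] args) {
        System.out.println("without release: " + simulate(3600, false) + " buffers outstanding");
        System.out.println("with release:    " + simulate(3600, true) + " buffers outstanding");
    }
}
```

Running it prints 3600 outstanding buffers without the release and 0 with it. At one run per second, 3600 runs would be one hour. The actual interval is a broker setting; check `replicatedSubscriptionsSnapshotFrequencyMillis` in your broker.conf.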

I don't mean to pollute this thread, but this may be related to #11396.

@YanshuoH
Author

Sorry.

I can confirm that after upgrading to 2.8.1, the memory leak still exists.

[Screenshot: Screen Shot 2021-11-19 at 9.17.27 AM]

hangc0276 self-assigned this on Nov 19, 2021
@hangc0276
Contributor

@yonyong @hangc0276 Please help check the issue.

I will check this issue.

@hangc0276
Copy link
Contributor

@YanshuoH Thanks for your report. Would you please paste your broker and BookKeeper configurations?

@YanshuoH
Author

Sure.

One strange (and annoying) thing is that I've set leak detection to paranoid, but no leak report has appeared.
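One possible reason paranoid mode stays silent: Netty's leak detector reports a buffer only after that buffer has been garbage-collected without being released. A buffer that is still referenced somewhere, for example by a queue, cache, or pending operation, keeps consuming direct memory. Because it is never collected, it is never reported.

The model below illustrates that rule with `java.lang.ref.PhantomReference`. The class and method names are hypothetical, and Netty's own implementation differs in detail.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Simplified model of a GC-based leak detector (the idea behind Netty's ResourceLeakDetector,
// not its code): a leak is reported only after a tracked object becomes unreachable and is
// collected without close() having been called.
final class GcLeakDetector {
    static final class Tracked {
        final String name;

        Tracked(String name) {
            this.name = name;
        }
    }

    private final ReferenceQueue<Tracked> queue = new ReferenceQueue<>();
    // Strongly holds the handles (not the tracked objects) until they are closed or reported.
    private final Set<Handle> open = Collections.newSetFromMap(new IdentityHashMap<>());

    final class Handle extends PhantomReference<Tracked> {
        final String name;

        Handle(Tracked referent, ReferenceQueue<Tracked> q) {
            super(referent, q);
            this.name = referent.name;
        }

        // Equivalent of the reference count reaching zero: stop tracking, nothing to report.
        void close() {
            open.remove(this);
            clear();
        }
    }

    Handle track(Tracked obj) {
        Handle h = new Handle(obj, queue);
        open.add(h);
        return h;
    }

    // Names of tracked objects that were garbage-collected while still open.
    List<String> pollLeaks() {
        List<String> leaks = new ArrayList<>();
        Reference<? extends Tracked> ref;
        while ((ref = queue.poll()) != null) {
            Handle h = (Handle) ref;
            if (open.remove(h)) {
                leaks.add(h.name);
            }
        }
        return leaks;
    }

    public static void main(String[] args) throws InterruptedException {
        GcLeakDetector detector = new GcLeakDetector();
        List<Tracked> cache = new ArrayList<>(); // e.g. a backlog or cache that is never drained

        Tracked retained = new Tracked("retained");
        detector.track(retained);
        cache.add(retained); // unreleased but still reachable: memory grows, never reported

        detector.track(new Tracked("dropped")); // unreachable and unreleased: reportable after GC

        System.gc(); // only a hint; collection timing is up to the JVM
        Thread.sleep(100);
        System.out.println("reported: " + detector.pollLeaks() + ", still cached: " + cache.size());
    }
}
```

Only the "dropped" object can ever show up in the report. The "retained" one keeps its memory for as long as the cache holds it, and the detector never sees it as a leak.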

Broker Conf
apiVersion: v1
kind: ConfigMap
metadata:
  name: pulsar-broker
  namespace: pulsar
  labels:
    app: pulsar
    release: 2.6.1
    cluster: 2.6.1-pulsar
    component: broker
data:
  # Metadata settings
  zookeeperServers: "zookeeper.pulsar:2181"
  configurationStoreServers: "zookeeper.pulsar:2181"

  # Broker settings
  clusterName: 2.6.1-pulsar
  exposeTopicLevelMetricsInPrometheus: "false"
  exposeConsumerLevelMetricsInPrometheus: "false"
  numHttpServerThreads: "8"
  zooKeeperSessionTimeoutMillis: "30000"
  statusFilePath: "/pulsar/status"
  defaultRetentionTimeInMinutes: "10080" # 7 days
  defaultRetentionSizeInMB: "1000" # ~1 GB
  brokerDeleteInactiveTopicsEnabled: "false"
  brokerDeduplicationEnabled: "false"
  subscriptionExpirationTimeMinutes: "60"

  # topic auto creation: this is crucial for geo-replication.
  # partitioning must be the same between clusters.
  allowAutoTopicCreation: "false"
  allowAutoTopicCreationType: "partitioned"
  defaultNumPartitions: "1"

  # bundles
  defaultNumberOfNamespaceBundles: "8"
  # disable bundle splitting to avoid bundle churn during peak hours
  loadBalancerAutoBundleSplitEnabled: "false"
  loadBalancerAutoUnloadSplitBundlesEnabled: "true"
  loadBalancerNamespaceBundleMaxTopics: "1000"
  loadBalancerNamespaceBundleMaxSessions: "1000"
  loadBalancerNamespaceBundleMaxMsgRate: "20000"
  loadBalancerNamespaceMaximumBundles: "128"

  # throttle
  # Max memory size for broker handling messages sending from producers.
  # If the size of in-flight messages exceeds this value, the broker stops reading data
  # from the connection. In-flight messages are messages that have been sent to the broker
  # but not yet acknowledged to the client, usually because they are waiting to be written to bookies.
  # It's shared across all the topics running in the same broker.
  # Use -1 to disable the memory limitation. Default is 1/2 of direct memory.
  maxMessagePublishBufferSizeInMB: ""
  # Interval between checks of whether the message publish buffer size exceeds the max message publish buffer size
  # Use 0 or negative number to disable the max publish buffer limiting.
  messagePublishBufferCheckIntervalInMillis: "100"

  # shedding
  loadBalancerSheddingEnabled: "true"
  loadBalancerLoadSheddingStrategy: "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder"
  loadBalancerSheddingIntervalMinutes: "1"
  loadBalancerSheddingGracePeriodMinutes: "10"
  loadBalancerBrokerOverloadedThresholdPercentage: "50"
  # The broker resource usage threshold.
  # When the broker resource usage is greater than the pulsar cluster average resource usage,
  # the threshold shedder will be triggered to offload bundles from the broker.
  # It only takes effect in ThresholdShedder strategy.
  loadBalancerBrokerThresholdShedderPercentage: "10"
  # The weight given to historical usage when calculating new resource usage.
  # It only takes effect in ThresholdShedder strategy.
  loadBalancerHistoryResourcePercentage: "0.9"
  # The inbound bandwidth usage weight when calculating new resource usage.
  # It only takes effect in ThresholdShedder strategy.
  loadBalancerBandwithInResourceWeight: "1.0"
  # The outbound bandwidth usage weight when calculating new resource usage.
  # It only takes effect in ThresholdShedder strategy.
  loadBalancerBandwithOutResourceWeight: "1.0"
  # The CPU usage weight when calculating new resource usage.
  # It only takes effect in ThresholdShedder strategy.
  loadBalancerCPUResourceWeight: "1.0"
  # The heap memory usage weight when calculating new resource usage.
  # It only takes effect in ThresholdShedder strategy.
  loadBalancerMemoryResourceWeight: "1.0"
  # The direct memory usage weight when calculating new resource usage.
  # It only takes effect in ThresholdShedder strategy.
  loadBalancerDirectMemoryResourceWeight: "1.0"
  # Minimum bundle throughput (MB) required for unloading, to avoid unloading bundles too frequently.
  # It only takes effect in ThresholdShedder strategy.
  loadBalancerBundleUnloadMinThroughputThreshold: "10"

  # Consumer
  # Precise dispatcher flow control according to history message number of each entry
  preciseDispatcherFlowControl: "true"

  # Authorization & Authentication
  authenticationEnabled: "true"
  authenticationProviders: "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"
  authorizationEnabled: "true"
  authorizationProvider: "org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider"
  superUserRoles: "admin,ops,pulsar"
  proxyRoles: "pulsar-proxy"
  # This is not very important. Traffic will come from pulsar-proxy.
  # Enabling this allows us to enforce RBAC in authorization.
  tokenSecretKey: "data:;base64,redacted" # redacted
  brokerClientAuthenticationPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
  brokerClientAuthenticationParameters: "token:redacted"

  # Ledger offload
  managedLedgerOffloadDriver: "aws-s3"
  # this is the actual bucket
  s3ManagedLedgerOffloadServiceEndpoint: "https://pulsar-cluster.s3.cn-northwest-1.amazonaws.com.cn"
  # this is the directory
  s3ManagedLedgerOffloadBucket: "pulsar-primary"
  s3ManagedLedgerOffloadRegion: "cn-northwest-1"
  #  s3ManagedLedgerOffloadReadBufferSizeInBytes: "1000000" # 1 MB by default
  #  s3ManagedLedgerOffloadMaxBlockSizeInBytes: "64000000" # 64 MB by default
  # currently the WebIdentity is not working
  #  s3ManagedLedgerOffloadRole: "arn:aws-cn:iam::651844176281:role/pulsar-broker-sa"
  #  s3ManagedLedgerOffloadRoleSessionName: "pulsar-s3-offload"
  managedLedgerOffloadAutoTriggerSizeThresholdBytes: "10000000" # 10 MB
  #  managedLedgerOffloadAutoTriggerSizeThresholdBytes: "1000000000" # 1 GB

  # Function Worker Settings
  # function worker configuration
  functionsWorkerEnabled: "false"
  PF_functionRuntimeFactoryClassName: "org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory"
  PF_pulsarFunctionsCluster: 2.6.1-pulsar
  PF_connectorsDirectory: ./connectors
  PF_containerFactory: k8s
  PF_numFunctionPackageReplicas: "2"
  # support version >= 2.5.0
  PF_functionRuntimeFactoryConfigs_pulsarRootDir: /pulsar
  PF_kubernetesContainerFactory_pulsarRootDir: /pulsar
  PF_functionRuntimeFactoryConfigs_pulsarDockerImageName: "651844176281.dkr.ecr.cn-northwest-1.amazonaws.com.cn/apachepulsar/pulsar:2.8.1"
  PF_functionRuntimeFactoryConfigs_submittingInsidePod: "true"
  PF_functionRuntimeFactoryConfigs_installUserCodeDependencies: "true"
  PF_functionRuntimeFactoryConfigs_jobNamespace: pulsar
  PF_functionRuntimeFactoryConfigs_expectedMetricsCollectionInterval: "30"
  PF_functionRuntimeFactoryConfigs_pulsarAdminUrl: "http://pulsar-broker:8080/"
  PF_functionRuntimeFactoryConfigs_pulsarServiceUrl: "pulsar://pulsar-broker:6650/"
  PF_functionRuntimeFactoryConfigs_changeConfigMap: "pulsar-functions-worker"
  PF_functionRuntimeFactoryConfigs_changeConfigMapNamespace: pulsar
  # support version < 2.5.0
  PF_kubernetesContainerFactory_pulsarDockerImageName: "651844176281.dkr.ecr.cn-northwest-1.amazonaws.com.cn/apachepulsar/pulsar:2.8.1"
  PF_kubernetesContainerFactory_submittingInsidePod: "true"
  PF_kubernetesContainerFactory_installUserCodeDependencies: "true"
  PF_kubernetesContainerFactory_jobNamespace: pulsar
  PF_kubernetesContainerFactory_expectedMetricsCollectionInterval: "30"
  PF_kubernetesContainerFactory_pulsarAdminUrl: "http://pulsar-broker:8080/"
  PF_kubernetesContainerFactory_pulsarServiceUrl: "pulsar://pulsar-broker:6650/"
  PF_kubernetesContainerFactory_changeConfigMap: "pulsar-functions-worker"
  PF_kubernetesContainerFactory_changeConfigMapNamespace: pulsar

  # prometheus needs to access /metrics endpoint
  webServicePort: "8080"
  brokerServicePort: "6650"

  # Java Env
  PULSAR_GC: >
    -XX:+UseG1GC
    -XX:MaxGCPauseMillis=10
    -XX:+ParallelRefProcEnabled
    -XX:+UnlockExperimentalVMOptions
    -XX:+DoEscapeAnalysis
    -XX:ParallelGCThreads=4
    -XX:ConcGCThreads=4
    -XX:G1NewSizePercent=50
    -XX:+DisableExplicitGC
    -XX:-ResizePLAB
    -XX:+ExitOnOutOfMemoryError
    -XX:+PerfDisableSharedMem
  PULSAR_MEM: |
    -Xms512m -Xmx2048m -XX:MaxDirectMemorySize=3072m
  PULSAR_EXTRA_OPTS: >
    -Dio.netty.leakDetectionLevel=paranoid
    -Dio.netty.recycler.linkCapacity=1024
    -Dio.netty.leakDetection.targetRecords=100
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:NativeMemoryTracking=detail
    -Dpulsar.allocator.leak_detection=Paranoid
    -XX:+UnlockDiagnosticVMOptions

  # Ledger Quorum
  managedLedgerDefaultAckQuorum: "2"
  managedLedgerDefaultEnsembleSize: "2"
  managedLedgerDefaultWriteQuorum: "2"

  # quotas
  backlogQuotaCheckEnabled: "false"
  backlogQuotaDefaultRetentionPolicy: "consumer_backlog_eviction"
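For reference, here is what a few of the broker sizes above work out to. This is plain arithmetic under two assumptions. First, Pulsar's "MB" settings are taken to mean 1024 × 1024 bytes. Second, the empty `maxMessagePublishBufferSizeInMB` is assumed to fall back to the documented default of half the direct memory, which is `-XX:MaxDirectMemorySize=3072m` from `PULSAR_MEM`. The class and method names are illustrative.

```java
// Back-of-the-envelope arithmetic for a few sizes in the broker ConfigMap.
// Assumptions: "MB" in Pulsar settings means 1024 * 1024 bytes, and an empty
// maxMessagePublishBufferSizeInMB falls back to the documented default (1/2 of direct memory).
final class BrokerConfigArithmetic {
    static final long MIB = 1024L * 1024L;

    static double minutesToDays(long minutes) {
        return minutes / (60.0 * 24.0);
    }

    static double mbToGiB(long mb) {
        return mb / 1024.0;
    }

    static double bytesToMiB(long bytes) {
        return bytes / (double) MIB;
    }

    // Empty => half of -XX:MaxDirectMemorySize; "-1" => limit disabled.
    static long publishBufferLimitMb(String configured, long maxDirectMemoryMb) {
        if (configured == null || configured.trim().isEmpty()) {
            return maxDirectMemoryMb / 2;
        }
        return Long.parseLong(configured.trim());
    }

    public static void main(String[] args) {
        System.out.println("defaultRetentionTimeInMinutes=10080 -> " + minutesToDays(10080) + " days");
        System.out.println("defaultRetentionSizeInMB=1000 -> " + mbToGiB(1000) + " GiB");
        System.out.println("maxMessagePublishBufferSizeInMB (empty) -> " + publishBufferLimitMb("", 3072) + " MB");
        System.out.println("offload trigger 10000000 bytes -> " + bytesToMiB(10_000_000L) + " MiB");
    }
}
```

Under these assumptions, retention is 7 days or about 0.98 GiB per topic. The offload trigger is roughly 9.5 MiB. Producers' in-flight messages can hold up to 1536 MB, half of the broker's direct-memory limit, before the broker stops reading from their connections.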

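The broker runs with `-XX:MaxDirectMemorySize=3072m` and `-XX:NativeMemoryTracking=detail`, so `jcmd <pid> VM.native_memory summary` can break down native memory from outside the process. From inside a JVM, the standard management API can report the configured limit and the JDK's own direct-buffer pool.

Two caveats apply. Netty's allocator (the `allocateDirectNoCleaner` frames in the OOM trace) keeps its own usage counter, and such buffers may not be reflected in the `direct` `BufferPoolMXBean`, so treat that figure as a lower bound. The `HotSpotDiagnosticMXBean` call is also HotSpot-specific. The class name is illustrative.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

final class DirectMemoryReport {
    // Value of -XX:MaxDirectMemorySize in bytes; 0 means it was not set explicitly
    // (HotSpot then derives the limit from the maximum heap size).
    static long maxDirectMemoryOption() {
        HotSpotDiagnosticMXBean hotspot = ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        return Long.parseLong(hotspot.getVMOption("MaxDirectMemorySize").getValue());
    }

    // Bytes used by the JDK's "direct" buffer pool, or -1 if the pool is not exposed.
    // Buffers allocated outside the JDK's accounting (e.g. Netty no-cleaner buffers) may be missing here.
    static long directPoolUsedBytes() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("MaxDirectMemorySize option: " + maxDirectMemoryOption() + " bytes");
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s pool: count=%d used=%d capacity=%d%n",
                    pool.getName(), pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}
```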
Bookie Conf
apiVersion: v1
kind: ConfigMap
metadata:
  name: bookie
  namespace: pulsar
  labels:
    app: pulsar
    release: 2.6.3
    component: bookie
    cluster: 2.6.1-pulsar
data:
  # common config
  zkServers: "zookeeper.pulsar:2181"
  zkLedgersRootPath: "/ledgers"
  # enable bookkeeper http server
  httpServerEnabled: "true"
  httpServerPort: "8000"
  # config the stats provider
  statsProviderClass: org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
  # use hostname as the bookie id
  useHostNameAsBookieID: "true"
  # disable auto recovery on bookies since we will start AutoRecovery in separated pods
  autoRecoveryDaemonEnabled: "false"
  # Do not retain journal files, as they increase disk utilization
  journalMaxBackups: "0"
  journalDirectories: "/pulsar/data/bookkeeper/journal"
  PULSAR_PREFIX_journalDirectories: "/pulsar/data/bookkeeper/journal"
  ledgerDirectories: "/pulsar/data/bookkeeper/ledgers"
  allowStorageExpansion: "true"

  BOOKIE_GC: >
    -XX:+UseG1GC
    -XX:MaxGCPauseMillis=10
    -XX:+ParallelRefProcEnabled
    -XX:+UnlockExperimentalVMOptions
    -XX:+DoEscapeAnalysis
    -XX:ParallelGCThreads=4
    -XX:ConcGCThreads=4
    -XX:G1NewSizePercent=50
    -XX:+DisableExplicitGC
    -XX:-ResizePLAB
    -XX:+ExitOnOutOfMemoryError
    -XX:+PerfDisableSharedMem
    -XX:+PrintGCDetails
    -verbosegc
    -Xlog:gc:/var/log/bookie-gc.log
  BOOKIE_MEM: >
    -Xms512m -Xmx2048m -XX:MaxDirectMemorySize=1024m

  # db storage
  # Used to control the maximum entry read-ahead cache size.
  # default: 25% of the total direct memory
  dbStorage_readAheadCacheMaxSizeMb: ""
  # Used to specify the size of the RocksDB block-cache.
  # default: 10% of the total direct memory size
  dbStorage_rocksDB_blockCacheSize: ""
  dbStorage_rocksDB_writeBufferSizeMB: "64"
  # Used to specify the max size of the write cache (in MB)
  # default: 25% of the total direct memory
  dbStorage_writeCacheMaxSizeMb: ""
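The three cache sizes above are left empty, so they fall back to the percentages in the comments. Those percentages are computed from `-XX:MaxDirectMemorySize=1024m` in `BOOKIE_MEM`. The arithmetic below rounds down. Treating the empty strings as "use the default" is an assumption about how the ConfigMap is applied. The block-cache setting is shown in bytes because, unlike the two `...Mb` settings, its name carries no MB unit.

```java
// Default cache sizes derived from the bookie's -XX:MaxDirectMemorySize=1024m,
// using the percentages given in the ConfigMap comments (integer arithmetic, rounded down).
final class BookieCacheDefaults {
    static final long MIB = 1024L * 1024L;

    static long percentOf(long total, int percent) {
        return total * percent / 100;
    }

    public static void main(String[] args) {
        long maxDirectBytes = 1024 * MIB;
        System.out.println("dbStorage_writeCacheMaxSizeMb     ~ " + percentOf(maxDirectBytes, 25) / MIB + " MB");
        System.out.println("dbStorage_readAheadCacheMaxSizeMb ~ " + percentOf(maxDirectBytes, 25) / MIB + " MB");
        System.out.println("dbStorage_rocksDB_blockCacheSize  ~ " + percentOf(maxDirectBytes, 10) + " bytes");
    }
}
```

Between them, the write cache and read-ahead cache would be sized at about 512 MB, half of the bookie's 1024 MB direct-memory limit.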

@hangc0276
Contributor

Ok, I will take a look.

@YanshuoH
Author

Additional remarks.

I have:

  • set-subscription-expiration-time for all namespaces
  • clear-backlog for inactive topics (no producer & no consumer)
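Both actions shrink the state kept for idle subscriptions. An expired subscription is deleted, and clearing the backlog moves the subscription's cursor to the end of the topic.

The expiration rule can be modelled roughly as below. The "no connected consumers" condition is an assumption about Pulsar's behaviour. The class, method, and subscription names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of a subscription-expiration sweep; not Pulsar's code.
// A subscription counts as expired when no consumer is connected and it has been inactive
// for longer than the limit (e.g. subscriptionExpirationTimeMinutes = 60).
final class SubscriptionExpiry {
    static List<String> expired(Map<String, Instant> lastActive,
                                Map<String, Integer> connectedConsumers,
                                Duration limit,
                                Instant now) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Instant> e : lastActive.entrySet()) {
            boolean noConsumers = connectedConsumers.getOrDefault(e.getKey(), 0) == 0;
            boolean inactiveTooLong = Duration.between(e.getValue(), now).compareTo(limit) > 0;
            if (noConsumers && inactiveTooLong) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2021-11-19T15:00:00Z");
        Map<String, Instant> lastActive = new TreeMap<>();
        lastActive.put("sub-idle", now.minus(Duration.ofHours(2)));       // idle, nobody connected
        lastActive.put("sub-connected", now.minus(Duration.ofHours(2)));  // idle, but a consumer is attached
        lastActive.put("sub-recent", now.minus(Duration.ofMinutes(10)));  // active recently
        Map<String, Integer> consumers = new TreeMap<>();
        consumers.put("sub-connected", 1);
        System.out.println("expired: " + expired(lastActive, consumers, Duration.ofMinutes(60), now));
    }
}
```

With a 60-minute limit, only `sub-idle` is selected.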

[Screenshot: Screen Shot 2021-11-19 at 3.19.13 PM]

Memory usage now seems to have stabilized (the load across brokers looks unbalanced, but that should be unrelated to the leak).
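On the unbalanced load: with the ThresholdShedder settings in the broker ConfigMap above, a broker is shed only when its smoothed usage rises clearly above the cluster average, so some imbalance is expected to persist.

The sketch below models that rule as the config comments describe it. It rests on two assumptions: a broker's usage is the maximum of its weighted per-resource usages, and the 10% threshold is added to the average as an absolute fraction. Check Pulsar's `ThresholdShedder` source before relying on either. Names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the ThresholdShedder rule as described by the config comments; not Pulsar's source.
// Usage values are fractions in [0, 1].
final class ThresholdShedderSketch {
    static final double HISTORY_PERCENTAGE = 0.9;    // loadBalancerHistoryResourcePercentage
    static final double THRESHOLD = 10 / 100.0;      // loadBalancerBrokerThresholdShedderPercentage = 10
    static final double CPU_WEIGHT = 1.0;            // loadBalancerCPUResourceWeight
    static final double MEMORY_WEIGHT = 1.0;         // loadBalancerMemoryResourceWeight
    static final double DIRECT_MEMORY_WEIGHT = 1.0;  // loadBalancerDirectMemoryResourceWeight
    static final double BANDWIDTH_IN_WEIGHT = 1.0;   // loadBalancerBandwithInResourceWeight
    static final double BANDWIDTH_OUT_WEIGHT = 1.0;  // loadBalancerBandwithOutResourceWeight

    // A broker's current usage is taken as its most heavily used resource after weighting.
    static double weightedUsage(double cpu, double memory, double directMemory, double bwIn, double bwOut) {
        double usage = Math.max(cpu * CPU_WEIGHT, memory * MEMORY_WEIGHT);
        usage = Math.max(usage, directMemory * DIRECT_MEMORY_WEIGHT);
        usage = Math.max(usage, bwIn * BANDWIDTH_IN_WEIGHT);
        return Math.max(usage, bwOut * BANDWIDTH_OUT_WEIGHT);
    }

    // History smoothing: keep 90% of the previous value and blend in 10% of the new sample.
    static double smooth(Double previous, double sample) {
        if (previous == null) {
            return sample;
        }
        return previous * HISTORY_PERCENTAGE + sample * (1 - HISTORY_PERCENTAGE);
    }

    // Brokers whose smoothed usage is more than THRESHOLD above the cluster average.
    static List<String> overloaded(Map<String, Double> smoothedUsage) {
        double avg = smoothedUsage.values().stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Double> e : smoothedUsage.entrySet()) {
            if (e.getValue() > avg + THRESHOLD) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Double> usage = new LinkedHashMap<>();
        usage.put("broker-1", weightedUsage(0.45, 0.60, 0.80, 0.30, 0.35)); // direct memory dominates
        usage.put("broker-2", weightedUsage(0.40, 0.50, 0.45, 0.30, 0.30));
        usage.put("broker-3", weightedUsage(0.35, 0.55, 0.40, 0.25, 0.30));
        System.out.println("overloaded: " + overloaded(usage));
    }
}
```

Here the average is about 0.62, so only `broker-1` (0.80) crosses the 0.72 line. Brokers at 0.50 and 0.55 are left alone. With the 0.9 history weight, a single spike moves the smoothed value only a tenth of the way toward the new reading, which also slows shedding.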

@xuesongxs
Contributor

Broker OOM also occurs in Pulsar v2.8.1.

@xuesongxs
Contributor

@yonyong @hangc0276 Please help check the issue.

I will check this issue.

Is there any progress on this bug? I also encountered a broker OOM in Pulsar v2.8.1.

@YanshuoH
Author

After upgrading to v2.8.2, the OOM issue disappeared.
