-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
NonPersistentDispatcherSingleActiveConsumer.java
113 lines (100 loc) · 4.38 KB
/
NonPersistentDispatcherSingleActiveConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
@Slf4j
public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer
implements NonPersistentDispatcher {
private final NonPersistentTopic topic;
private final Rate msgDrop;
private final Subscription subscription;
private final RedeliveryTracker redeliveryTracker;
public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.msgDrop = new Rate();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}
@Override
public void sendMessages(List<Entry> entries) {
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false);
currentConsumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
} else {
entries.forEach(entry -> {
int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
if (totalMsgs > 0) {
msgDrop.recordEvent(totalMsgs);
}
entry.release();
});
}
}
@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
}
@Override
public Rate getMessageDropRate() {
return msgDrop;
}
@Override
public boolean hasPermits() {
return ACTIVE_CONSUMER_UPDATER.get(this) != null && ACTIVE_CONSUMER_UPDATER.get(this).getAvailablePermits() > 0;
}
@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
// No-op
}
@Override
public RedeliveryTracker getRedeliveryTracker() {
return redeliveryTracker;
}
@Override
protected void scheduleReadOnActiveConsumer() {
// No-op
}
@Override
protected void readMoreEntries(Consumer consumer) {
// No-op
}
@Override
protected void cancelPendingRead() {
// No-op
}
}