-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
NegativeAcksTracker.java
96 lines (80 loc) · 3.83 KB
/
NegativeAcksTracker.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;
class NegativeAcksTracker {
private HashMap<MessageId, Long> nackedMessages = null;
private final ConsumerBase<?> consumer;
private final Timer timer;
private final long nackDelayNanos;
private final long timerIntervalNanos;
private Timeout timeout;
// Set a min delay to allow for grouping nacks within a single batch
private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
this.timer = consumer.getClient().timer();
this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_NANOS);
this.timerIntervalNanos = nackDelayNanos / 3;
}
private synchronized void triggerRedelivery(Timeout t) {
if (nackedMessages.isEmpty()) {
this.timeout = null;
return;
}
// Group all the nacked messages into one single re-delivery request
Set<MessageId> messagesToRedeliver = new HashSet<>();
long now = System.nanoTime();
nackedMessages.forEach((msgId, timestamp) -> {
if (timestamp < now) {
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
}
});
messagesToRedeliver.forEach(nackedMessages::remove);
consumer.onNegativeAcksSend(messagesToRedeliver);
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
}
public synchronized void add(MessageId messageId) {
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex());
}
if (nackedMessages == null) {
nackedMessages = new HashMap<>();
}
nackedMessages.put(messageId, System.nanoTime() + nackDelayNanos);
if (this.timeout == null) {
// Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
// nack immediately following the current one will be batched into the same redeliver request.
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
}
}
}