Skip to content

Commit

Permalink
[fix] [ml] Fix the incorrect total size if use ML interceptor (#19404)
Browse files Browse the repository at this point in the history
(cherry picked from commit 33f40f6)
  • Loading branch information
poorbarcode authored and codelipenghui committed Feb 7, 2023
1 parent d4f8c40 commit a0a9eb5
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex offloadMutex = new CallbackMutex();
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionImpl.latest);
private volatile LedgerHandle currentLedger;
protected volatile LedgerHandle currentLedger;
private long currentLedgerEntries = 0;
private long currentLedgerSize = 0;
protected long currentLedgerSize = 0;
private long lastLedgerCreatedTimestamp = 0;
private long lastLedgerCreationFailureTimestamp = 0;
private long lastLedgerCreationInitiationTimestamp = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,22 @@ public void setCloseWhenDone(boolean closeWhenDone) {

public void initiate() {
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) {

ByteBuf duplicateBuffer = data.retainedDuplicate();

// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
lastInitTime = System.nanoTime();
if (ml.getManagedLedgerInterceptor() != null) {
payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this, duplicateBuffer);
long originalDataLen = data.readableBytes();
payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this,
duplicateBuffer);
if (payloadProcessorHandle != null) {
duplicateBuffer = payloadProcessorHandle.getProcessedPayload();
// If data len of entry changes, correct "dataLength" and "currentLedgerSize".
if (originalDataLen != duplicateBuffer.readableBytes()) {
this.dataLength = duplicateBuffer.readableBytes();
this.ml.currentLedgerSize += (dataLength - originalDataLen);
}
}
}
ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import static org.testng.Assert.assertEquals;
import static org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest.TestPayloadProcessor;
import java.util.HashSet;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest;
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

/***
* Differ to {@link MangedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified
* by "default".
*/
@Slf4j
@Test(groups = "broker")
public class MangedLedgerInterceptorImplTest2 extends MockedBookKeeperTestCase {

private void switchLedgerManually(ManagedLedgerImpl ledger){
LedgerHandle originalLedgerHandle = ledger.currentLedger;
ledger.ledgerClosed(ledger.currentLedger);
ledger.createLedgerAfterClosed();
Awaitility.await().until(() -> {
return ledger.state == ManagedLedgerImpl.State.LedgerOpened && ledger.currentLedger != originalLedgerHandle;
});
}

@Test
public void testCurrentLedgerSizeCorrectIfHasInterceptor() throws Exception {
final String mlName = "ml1";
final String cursorName = "cursor1";

// Registry interceptor.
ManagedLedgerConfig config = new ManagedLedgerConfig();
Set<ManagedLedgerPayloadProcessor> processors = new HashSet();
processors.add(new TestPayloadProcessor());
ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet(), processors);
config.setManagedLedgerInterceptor(interceptor);
config.setMaxEntriesPerLedger(100);

// Add one entry.
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName);
ledger.addEntry(new byte[1]);

// Mark "currentLedgerSize" and switch ledger.
long currentLedgerSize = ledger.getCurrentLedgerSize();
switchLedgerManually(ledger);

// verify.
assertEquals(currentLedgerSize, MangedLedgerInterceptorImplTest.calculatePreciseSize(ledger));

// cleanup.
cursor.close();
ledger.close();
factory.getEntryCacheManager().clear();
factory.shutdown();
config.setManagedLedgerInterceptor(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand All @@ -31,6 +32,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
Expand Down Expand Up @@ -59,7 +61,7 @@
public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class);

public class TestPayloadProcessor implements ManagedLedgerPayloadProcessor {
public static class TestPayloadProcessor implements ManagedLedgerPayloadProcessor {
@Override
public Processor inputProcessor() {
return new Processor() {
Expand Down Expand Up @@ -157,6 +159,47 @@ public void testMessagePayloadProcessor() throws Exception {
config.setManagedLedgerInterceptor(null);
}

@Test
public void testTotalSizeCorrectIfHasInterceptor() throws Exception {
final String mlName = "ml1";
final String cursorName = "cursor1";

// Registry interceptor.
ManagedLedgerConfig config = new ManagedLedgerConfig();
Set<ManagedLedgerPayloadProcessor> processors = new HashSet();
processors.add(new TestPayloadProcessor());
ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet(), processors);
config.setManagedLedgerInterceptor(interceptor);
config.setMaxEntriesPerLedger(2);

// Add many entries and consume.
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName);
for (int i = 0; i < 5; i++){
cursor.delete(ledger.addEntry(new byte[1]));
}

// Trim ledgers.
CompletableFuture<Void> trimLedgerFuture = new CompletableFuture<>();
ledger.trimConsumedLedgersInBackground(trimLedgerFuture);
trimLedgerFuture.join();

// verify.
assertEquals(ledger.getTotalSize(), calculatePreciseSize(ledger));

// cleanup.
cursor.close();
ledger.close();
factory.getEntryCacheManager().clear();
factory.shutdown();
config.setManagedLedgerInterceptor(null);
}

public static long calculatePreciseSize(ManagedLedgerImpl ledger){
return ledger.getLedgersInfo().values().stream()
.map(info -> info.getSize()).reduce((l1,l2) -> l1 + l2).orElse(0L) + ledger.getCurrentLedgerSize();
}

@Test(timeOut = 20000)
public void testRecoveryIndex() throws Exception {
final int MOCK_BATCH_SIZE = 2;
Expand Down

0 comments on commit a0a9eb5

Please sign in to comment.