diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index d2109eaac9dcd..02ac4e97becd5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -21,6 +21,7 @@ import lombok.Data; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; +import org.apache.pulsar.common.api.proto.CompressionType; /** * Configuration for a {@link ManagedLedgerFactory}. @@ -75,4 +76,9 @@ public class ManagedLedgerFactoryConfig { * cluster name for prometheus stats */ private String clusterName; + + /** + * ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data. + */ + private String managedLedgerInfoCompressionType = CompressionType.NONE.name(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index b135577cceb44..16d577ab5232a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -172,7 +172,7 @@ private ManagedLedgerFactoryImpl(MetadataStore metadataStore, this.bookkeeperFactory = bookKeeperGroupFactory; this.isBookkeeperManaged = isBookkeeperManaged; this.metadataStore = metadataStore; - this.store = new MetaStoreImpl(metadataStore, scheduledExecutor); + this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType()); this.config = config; this.mbean = new ManagedLedgerFactoryMBeanImpl(this); this.entryCacheManager = new EntryCacheManager(this); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index caa21a149c764..cfe3aaf6e67d8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -25,15 +25,23 @@ import java.util.Optional; import java.util.concurrent.CompletionException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; @@ -47,9 +55,30 @@ public class MetaStoreImpl implements MetaStore { private final MetadataStore store; private final OrderedExecutor executor; + private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778; // 0100 0111 0111 1000 + private final CompressionType compressionType; + public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) { this.store = store; this.executor = executor; + this.compressionType = CompressionType.NONE; + } + + public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compressionType) { + this.store = store; + this.executor = executor; + CompressionType finalCompressionType; + if (compressionType != null) { + try { + finalCompressionType = CompressionType.valueOf(compressionType); + } catch (Exception e) { + log.error("Failed to get compression type {} error msg: {}.", compressionType, e.getMessage()); + throw e; + } + } else { + finalCompressionType = CompressionType.NONE; + } + this.compressionType = finalCompressionType; } @Override @@ -62,7 +91,7 @@ public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, if (optResult.isPresent()) { ManagedLedgerInfo info; try { - info = ManagedLedgerInfo.parseFrom(optResult.get().getValue()); + info = parseManagedLedgerInfo(optResult.get().getValue()); info = updateMLInfoTimestamp(info); callback.operationComplete(info, optResult.get().getStat()); } catch (InvalidProtocolBufferException e) { @@ -101,9 +130,8 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St log.debug("[{}] Updating metadata version={} with content={}", ledgerName, stat, mlInfo); } - byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format String path = PREFIX + ledgerName; - store.put(path, serializedMlInfo, Optional.of(stat.getVersion())) + store.put(path, compressLedgerInfo(mlInfo), Optional.of(stat.getVersion())) .thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion), executor.chooseThread(ledgerName)) .exceptionally(ex -> { executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex)))); @@ -264,4 +292,88 @@ private static MetaStoreException getException(Throwable t) { return new MetaStoreException(t); } } + + /** + * Compress ManagedLedgerInfo data. + * + * compression data structure + * [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD] + */ + public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) { + if (compressionType == null || compressionType.equals(CompressionType.NONE)) { + return managedLedgerInfo.toByteArray(); + } + ByteBuf metadataByteBuf = null; + ByteBuf encodeByteBuf = null; + try { + MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata + .newBuilder() + .setCompressionType(compressionType) + .setUncompressedSize(managedLedgerInfo.getSerializedSize()) + .build(); + metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer( + mlInfoMetadata.getSerializedSize() + 6, mlInfoMetadata.getSerializedSize() + 6); + metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA); + metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize()); + metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray()); + + encodeByteBuf = getCompressionCodec().encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray())); + + CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + compositeByteBuf.addComponent(true, metadataByteBuf); + compositeByteBuf.addComponent(true, encodeByteBuf); + byte[] dataBytes = new byte[compositeByteBuf.readableBytes()]; + compositeByteBuf.readBytes(dataBytes); + return dataBytes; + } finally { + if (metadataByteBuf != null) { + metadataByteBuf.release(); + } + if (encodeByteBuf != null) { + encodeByteBuf.release(); + } + } + } + + public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException { + ByteBuf byteBuf = Unpooled.wrappedBuffer(data); + if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) { + ByteBuf decodeByteBuf = null; + try { + int metadataSize = byteBuf.readInt(); + byte[] metadataBytes = new byte[metadataSize]; + byteBuf.readBytes(metadataBytes); + MLDataFormats.ManagedLedgerInfoMetadata metadata = + MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes); + + long unpressedSize = metadata.getUncompressedSize(); + decodeByteBuf = getCompressionCodec().decode(byteBuf, (int) unpressedSize); + byte[] decodeBytes; + // couldn't decode data by ZLIB compression byteBuf array() directly + if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) { + decodeBytes = decodeByteBuf.array(); + } else { + decodeBytes = new byte[decodeByteBuf.readableBytes() - decodeByteBuf.readerIndex()]; + decodeByteBuf.readBytes(decodeBytes); + } + return ManagedLedgerInfo.parseFrom(decodeBytes); + } catch (Exception e) { + log.error("Failed to parse managedLedgerInfo metadata, " + + "fall back to parse managedLedgerInfo directly.", e); + return ManagedLedgerInfo.parseFrom(data); + } finally { + if (decodeByteBuf != null) { + decodeByteBuf.release(); + } + } + } else { + return ManagedLedgerInfo.parseFrom(data); + } + } + + private CompressionCodec getCompressionCodec() { + return CompressionCodecProvider.getCompressionCodec( + org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name())); + } + } diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index a5be8e4a4e379..a3528b664e29f 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -124,3 +124,16 @@ message ManagedCursorInfo { // Store which index in the batch message has been deleted repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7; } + +enum CompressionType { + NONE = 0; + LZ4 = 1; + ZLIB = 2; + ZSTD = 3; + SNAPPY = 4; +} + +message ManagedLedgerInfoMetadata { + required CompressionType compressionType = 1; + required int32 uncompressedSize = 2; +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java new file mode 100644 index 0000000000000..2f27489aeb9f6 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import com.google.protobuf.InvalidProtocolBufferException; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.offload.OffloadUtils; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pulsar.common.api.proto.CompressionType; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * ManagedLedgerInfo metadata test. + */ +@Slf4j +public class ManagedLedgerInfoMetadataTest { + + @DataProvider(name = "compressionTypeProvider") + private Object[][] compressionTypeProvider() { + return new Object[][] { + {null}, + {"INVALID_TYPE"}, + {CompressionType.NONE.name()}, + {CompressionType.LZ4.name()}, + {CompressionType.ZLIB.name()}, + {CompressionType.ZSTD.name()}, + {CompressionType.SNAPPY.name()} + }; + } + + @Test(dataProvider = "compressionTypeProvider") + public void testEncodeAndDecode(String compressionType) throws IOException { + long ledgerId = 10000; + List ledgerInfoList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder builder = MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder(); + builder.setLedgerId(ledgerId); + builder.setEntries(RandomUtils.nextInt()); + builder.setSize(RandomUtils.nextLong()); + builder.setTimestamp(System.currentTimeMillis()); + + UUID uuid = UUID.randomUUID(); + builder.getOffloadContextBuilder() + .setUidMsb(uuid.getMostSignificantBits()) + .setUidLsb(uuid.getLeastSignificantBits()); + Map offloadDriverMetadata = new HashMap<>(); + offloadDriverMetadata.put("bucket", "test-bucket"); + offloadDriverMetadata.put("managedLedgerOffloadDriver", "pulsar-offload-dev"); + offloadDriverMetadata.put("serviceEndpoint", "https://s3.eu-west-1.amazonaws.com"); + offloadDriverMetadata.put("region", "eu-west-1"); + OffloadUtils.setOffloadDriverMetadata( + builder, + "aws-s3", + offloadDriverMetadata + ); + + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = builder.build(); + ledgerInfoList.add(ledgerInfo); + ledgerId ++; + } + + MLDataFormats.ManagedLedgerInfo managedLedgerInfo = MLDataFormats.ManagedLedgerInfo.newBuilder() + .addAllLedgerInfo(ledgerInfoList) + .build(); + + MetaStoreImpl metaStore; + try { + metaStore = new MetaStoreImpl(null, null, compressionType); + if ("INVALID_TYPE".equals(compressionType)) { + Assert.fail("The managedLedgerInfo compression type is invalid, should fail."); + } + } catch (Exception e) { + if ("INVALID_TYPE".equals(compressionType)) { + Assert.assertEquals(e.getClass(), IllegalArgumentException.class); + Assert.assertEquals( + "No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType." + + compressionType, e.getMessage()); + return; + } else { + throw e; + } + } + + byte[] compressionBytes = metaStore.compressLedgerInfo(managedLedgerInfo); + log.info("[{}] Uncompressed data size: {}, compressed data size: {}", + compressionType, managedLedgerInfo.getSerializedSize(), compressionBytes.length); + if (compressionType == null || compressionType.equals(CompressionType.NONE.name())) { + Assert.assertEquals(compressionBytes.length, managedLedgerInfo.getSerializedSize()); + } + + // parse compression data and unCompression data, check their results. + MLDataFormats.ManagedLedgerInfo info1 = metaStore.parseManagedLedgerInfo(compressionBytes); + MLDataFormats.ManagedLedgerInfo info2 = metaStore.parseManagedLedgerInfo(managedLedgerInfo.toByteArray()); + Assert.assertEquals(info1, info2); + } + + @Test + public void testParseEmptyData() throws InvalidProtocolBufferException { + MetaStoreImpl metaStore = new MetaStoreImpl(null, null); + MLDataFormats.ManagedLedgerInfo managedLedgerInfo = metaStore.parseManagedLedgerInfo(new byte[0]); + Assert.assertEquals(managedLedgerInfo.toString(), ""); + } + +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 08024a9f16a57..38e432e2a11cc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1601,6 +1601,11 @@ public class ServiceConfiguration implements PulsarConfiguration { private String managedLedgerDataReadPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST .getValue(); + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n" + + "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.") + private String managedLedgerInfoCompressionType = "NONE"; + /*** --- Load balancer --- ****/ @FieldContext( category = CATEGORY_LOAD_BALANCER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 3b684bf6180f3..431cb729b22fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -69,6 +69,7 @@ public void initialize(ServiceConfiguration conf, MetadataStore metadataStore, conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds()); managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution()); managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds()); + managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType()); Configuration configuration = new ClientConfiguration(); if (conf.isBookkeeperClientExposeStatsToPrometheus()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java new file mode 100644 index 0000000000000..626522de70d91 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * ManagedLedgerInfo compression configuration test. + */ +public class ManagedLedgerCompressionTest extends BrokerTestBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setManagedLedgerInfoCompressionType(CompressionType.NONE.name()); + super.baseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 1000 * 10) + public void testRestartBrokerEnableManagedLedgerInfoCompression() throws Exception { + String topic = newTopicName(); + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("test") + .subscribe(); + + int messageCnt = 100; + for (int i = 0; i < messageCnt; i++) { + producer.newMessage().value("test".getBytes()).send(); + } + for (int i = 0; i < messageCnt; i++) { + Message message = consumer.receive(1000, TimeUnit.SECONDS); + consumer.acknowledge(message); + Assert.assertNotNull(message); + } + + stopBroker(); + conf.setManagedLedgerInfoCompressionType(CompressionType.ZSTD.name()); + startBroker(); + + for (int i = 0; i < messageCnt; i++) { + producer.newMessage().value("test".getBytes()).send(); + } + for (int i = 0; i < messageCnt; i++) { + Message message = consumer.receive(1000, TimeUnit.SECONDS); + Assert.assertNotNull(message); + consumer.acknowledge(message); + } + + stopBroker(); + conf.setManagedLedgerInfoCompressionType("INVALID"); + try { + startBroker(); + Assert.fail("The managedLedgerInfo compression type is invalid, should fail."); + } catch (Exception e) { + Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class); + Assert.assertEquals( + "No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType.INVALID", + e.getCause().getMessage()); + } + } + +}