diff --git a/.github/workflows/ci-unit-broker-jdk8.yaml b/.github/workflows/ci-unit-broker-jdk8.yaml new file mode 100644 index 0000000000000..623c3ac254179 --- /dev/null +++ b/.github/workflows/ci-unit-broker-jdk8.yaml @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: CI - Unit - Broker - JDK8 +on: + pull_request: + branches: + - master + push: + branches: + - branch-* + +env: + MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 + +jobs: + + unit-tests: + name: + runs-on: ubuntu-latest + timeout-minutes: 60 + + steps: + - name: checkout + uses: actions/checkout@v2 + + - name: Tune Runner VM + uses: ./.github/actions/tune-runner-vm + + - name: Detect changed files + id: changes + uses: apache/pulsar-test-infra/paths-filter@master + with: + filters: .github/changes-filter.yaml + + - name: Check changed files + id: check_changes + run: echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}" + + - name: Cache local Maven repository + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + uses: actions/cache@v2 + with: + path: | + ~/.m2/repository/*/*/* + !~/.m2/repository/org/apache/pulsar + key: ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-m2-dependencies-core-modules- + + - name: Set up JDK 8 + uses: actions/setup-java@v2 + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + with: + distribution: 'adopt' + java-version: 8 + + - name: Replace maven's wagon-http version + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: sudo ./build/replace_maven-wagon-http-version.sh + + - name: build modules + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: mvn -B -ntp -q clean install -Pcore-modules,-main -DskipTests + + - name: run unit test 'BROKER_GROUP_1' + if: ${{ steps.check_changes.outputs.docs_only != 'true' }} + run: ./build/run_unit_group.sh BROKER_JDK8 + + - name: print JVM thread dumps when cancelled + if: cancelled() + run: ./build/pulsar_ci_tool.sh print_thread_dumps + + - name: package surefire artifacts + if: failure() + run: | + rm -rf artifacts + mkdir artifacts + find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \; + zip -r artifacts.zip artifacts + + - uses: actions/upload-artifact@master + name: upload surefire-artifacts + if: failure() + with: + name: surefire-artifacts + path: artifacts.zip diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh index 00b3fb0797603..449e91b0a399e 100755 --- a/build/run_unit_group.sh +++ b/build/run_unit_group.sh @@ -53,6 +53,10 @@ function broker_client_impl() { $MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='broker-impl' } +function broker_jdk8() { + $MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='broker-jdk8' -Dpulsar.allocator.pooled=true +} + # prints summaries of failed tests to console # by using the targer/surefire-reports files # works only when testForkCount > 1 since that is when surefire will create reports for individual test classes @@ -177,6 +181,10 @@ case $TEST_GROUP in other ;; + BROKER_JDK8) + broker_jdk8 + ;; + *) echo -n "INVALID TEST GROUP" exit 1 diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index a2951143d70d2..b21b41ec92c21 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -310,6 +310,7 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) { return managedLedgerInfo.toByteArray(); } ByteBuf metadataByteBuf = null; + ByteBuf uncompressedByteBuf = null; ByteBuf encodeByteBuf = null; try { MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata @@ -322,10 +323,13 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) { metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA); metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize()); metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray()); - + byte[] byteArray = managedLedgerInfo.toByteArray(); + // The reason for copy the data to a direct buffer here is to ensure the metadata compression feature can + // work on JDK1.8, for more details to see: https://github.com/apache/pulsar/issues/11593 + uncompressedByteBuf = Unpooled.directBuffer(byteArray.length); + uncompressedByteBuf.writeBytes(byteArray); encodeByteBuf = getCompressionCodec(compressionType) - .encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray())); - + .encode(uncompressedByteBuf); CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponent(true, metadataByteBuf); compositeByteBuf.addComponent(true, encodeByteBuf); @@ -336,6 +340,9 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) { if (metadataByteBuf != null) { metadataByteBuf.release(); } + if (uncompressedByteBuf != null) { + uncompressedByteBuf.release(); + } if (encodeByteBuf != null) { encodeByteBuf.release(); } @@ -346,6 +353,7 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto ByteBuf byteBuf = Unpooled.wrappedBuffer(data); if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) { ByteBuf decodeByteBuf = null; + ByteBuf compressedByteBuf = null; try { int metadataSize = byteBuf.readInt(); byte[] metadataBytes = new byte[metadataSize]; @@ -354,8 +362,12 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes); long unpressedSize = metadata.getUncompressedSize(); + // The reason for copy the data to a direct buffer here is to ensure the metadata compression feature + // can work on JDK1.8, for more details to see: https://github.com/apache/pulsar/issues/11593 + compressedByteBuf = Unpooled.directBuffer(byteBuf.readableBytes()); + compressedByteBuf.writeBytes(byteBuf); decodeByteBuf = getCompressionCodec(metadata.getCompressionType()) - .decode(byteBuf, (int) unpressedSize); + .decode(compressedByteBuf, (int) unpressedSize); byte[] decodeBytes; // couldn't decode data by ZLIB compression byteBuf array() directly if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) { @@ -373,6 +385,10 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto if (decodeByteBuf != null) { decodeByteBuf.release(); } + if (compressedByteBuf != null) { + compressedByteBuf.release(); + } + byteBuf.release(); } } else { return ManagedLedgerInfo.parseFrom(data); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java index 9a3bc2fb4453b..3a2390be929a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java @@ -33,6 +33,8 @@ /** * ManagedLedgerInfo compression configuration test. */ + +@Test(groups = {"broker", "broker-jdk8"}) public class ManagedLedgerCompressionTest extends BrokerTestBase { @BeforeClass