Skip to content

Commit

Permalink
Fix java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava…
Browse files Browse the repository at this point in the history
…/nio/ByteBuffer when enabling topic metadata compression (#11594)

### Motivation

Related to #11593. This PR is copying the bytes to the DirectBuf to get around issue #11593.

The created issue airlift/aircompressor#133 for tracking fix from [aircompressor](https://github.com/airlift/aircompressor)

### Verifying this change

Make the `ManagedLedgerCompressionTest` running on JDK8

(cherry picked from commit 53c7c13)
  • Loading branch information
codelipenghui authored and hangc0276 committed Aug 11, 2021
1 parent 105fe87 commit cf4bd92
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 4 deletions.
103 changes: 103 additions & 0 deletions .github/workflows/ci-unit-broker-jdk8.yaml
@@ -0,0 +1,103 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: CI - Unit - Broker - JDK8
on:
pull_request:
branches:
- master
push:
branches:
- branch-*

env:
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3

jobs:

unit-tests:
name:
runs-on: ubuntu-latest
timeout-minutes: 60

steps:
- name: checkout
uses: actions/checkout@v2

- name: Tune Runner VM
uses: ./.github/actions/tune-runner-vm

- name: Detect changed files
id: changes
uses: apache/pulsar-test-infra/paths-filter@master
with:
filters: .github/changes-filter.yaml

- name: Check changed files
id: check_changes
run: echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}"

- name: Cache local Maven repository
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
uses: actions/cache@v2
with:
path: |
~/.m2/repository/*/*/*
!~/.m2/repository/org/apache/pulsar
key: ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-m2-dependencies-core-modules-
- name: Set up JDK 8
uses: actions/setup-java@v2
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
with:
distribution: 'adopt'
java-version: 8

- name: Replace maven's wagon-http version
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: sudo ./build/replace_maven-wagon-http-version.sh

- name: build modules
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: mvn -B -ntp -q clean install -Pcore-modules,-main -DskipTests

- name: run unit test 'BROKER_GROUP_1'
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: ./build/run_unit_group.sh BROKER_JDK8

- name: print JVM thread dumps when cancelled
if: cancelled()
run: ./build/pulsar_ci_tool.sh print_thread_dumps

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
8 changes: 8 additions & 0 deletions build/run_unit_group.sh
Expand Up @@ -53,6 +53,10 @@ function broker_client_impl() {
$MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='broker-impl'
}

function broker_jdk8() {
$MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='broker-jdk8' -Dpulsar.allocator.pooled=true
}

# prints summaries of failed tests to console
# by using the targer/surefire-reports files
# works only when testForkCount > 1 since that is when surefire will create reports for individual test classes
Expand Down Expand Up @@ -177,6 +181,10 @@ case $TEST_GROUP in
other
;;

BROKER_JDK8)
broker_jdk8
;;

*)
echo -n "INVALID TEST GROUP"
exit 1
Expand Down
Expand Up @@ -310,6 +310,7 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
return managedLedgerInfo.toByteArray();
}
ByteBuf metadataByteBuf = null;
ByteBuf uncompressedByteBuf = null;
ByteBuf encodeByteBuf = null;
try {
MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
Expand All @@ -322,10 +323,13 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());

byte[] byteArray = managedLedgerInfo.toByteArray();
// The reason for copy the data to a direct buffer here is to ensure the metadata compression feature can
// work on JDK1.8, for more details to see: https://github.com/apache/pulsar/issues/11593
uncompressedByteBuf = Unpooled.directBuffer(byteArray.length);
uncompressedByteBuf.writeBytes(byteArray);
encodeByteBuf = getCompressionCodec(compressionType)
.encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));

.encode(uncompressedByteBuf);
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponent(true, metadataByteBuf);
compositeByteBuf.addComponent(true, encodeByteBuf);
Expand All @@ -336,6 +340,9 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
if (metadataByteBuf != null) {
metadataByteBuf.release();
}
if (uncompressedByteBuf != null) {
uncompressedByteBuf.release();
}
if (encodeByteBuf != null) {
encodeByteBuf.release();
}
Expand All @@ -346,6 +353,7 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
ByteBuf decodeByteBuf = null;
ByteBuf compressedByteBuf = null;
try {
int metadataSize = byteBuf.readInt();
byte[] metadataBytes = new byte[metadataSize];
Expand All @@ -354,8 +362,12 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);

long unpressedSize = metadata.getUncompressedSize();
// The reason for copy the data to a direct buffer here is to ensure the metadata compression feature
// can work on JDK1.8, for more details to see: https://github.com/apache/pulsar/issues/11593
compressedByteBuf = Unpooled.directBuffer(byteBuf.readableBytes());
compressedByteBuf.writeBytes(byteBuf);
decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
.decode(byteBuf, (int) unpressedSize);
.decode(compressedByteBuf, (int) unpressedSize);
byte[] decodeBytes;
// couldn't decode data by ZLIB compression byteBuf array() directly
if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
Expand All @@ -373,6 +385,10 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
if (decodeByteBuf != null) {
decodeByteBuf.release();
}
if (compressedByteBuf != null) {
compressedByteBuf.release();
}
byteBuf.release();
}
} else {
return ManagedLedgerInfo.parseFrom(data);
Expand Down
Expand Up @@ -33,6 +33,8 @@
/**
* ManagedLedgerInfo compression configuration test.
*/

@Test(groups = {"broker", "broker-jdk8"})
public class ManagedLedgerCompressionTest extends BrokerTestBase {

@BeforeClass
Expand Down

0 comments on commit cf4bd92

Please sign in to comment.