diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index aa33d7ae197d1..1642b54337fc0 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -1498,12 +1498,16 @@ jobs:
with:
action: wait
- # This job is required for pulls to be merged.
+ # This job is required for pulls to be merged. This job is referenced by name in .asf.yaml file in the
+ # protected_branches section for master branch required_status_checks.
# It depends on all other jobs in this workflow.
- # It cleans up the binaries in the same job in order to not spin up another runner for basically doing nothing.
+ # This job also cleans up the binaries at the end of the workflow.
pulsar-ci-checks-completed:
name: "Pulsar CI checks completed"
- if: ${{ always() && needs.preconditions.result == 'success' }}
+ # Run always, but skip when a scheduled workflow run is cancelled in repositories other than
+ # apache/pulsar. This lets the scheduled workflow jobs show as cancelled instead of failed, since
+ # scheduled jobs are only enabled for the apache/pulsar repository.
+ if: ${{ always() && !(cancelled() && github.repository != 'apache/pulsar' && github.event_name == 'schedule') }}
runs-on: ubuntu-22.04
timeout-minutes: 10
needs: [
@@ -1521,10 +1525,11 @@ jobs:
]
steps:
- name: Check that all required jobs were completed successfully
- if: ${{ needs.preconditions.outputs.docs_only != 'true' }}
+ if: ${{ needs.preconditions.result != 'success' || needs.preconditions.outputs.docs_only != 'true' }}
run: |
if [[ ! ( \
- "${{ needs.unit-tests.result }}" == "success" \
+ "${{ needs.preconditions.result }}" == "success" \
+ && "${{ needs.unit-tests.result }}" == "success" \
&& "${{ needs.integration-tests.result }}" == "success" \
&& "${{ needs.system-tests.result }}" == "success" \
&& "${{ needs.macos-build.result }}" == "success" \
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index cd4d02af3d7b4..58f99e9ea86b5 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -41,7 +41,7 @@
1.8
3.1.0
2.23.1
- 1.7.32
+ 2.0.13
7.7.1
3.11
4.1
@@ -100,6 +100,12 @@
org.testng
testng
${testng.version}
+
+
+ org.slf4j
+ *
+
+
org.apache.logging.log4j
@@ -111,7 +117,7 @@
org.apache.logging.log4j
- log4j-slf4j-impl
+ log4j-slf4j2-impl
org.slf4j
diff --git a/conf/broker.conf b/conf/broker.conf
index fd6bba0f45d2c..d97e3a5ef89ad 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -88,6 +88,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
# Number of threads to config Netty Acceptor. Default is 1
numAcceptorThreads=
@@ -1660,10 +1670,10 @@ s3ManagedLedgerOffloadBucket=
# For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing)
s3ManagedLedgerOffloadServiceEndpoint=
-# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum)
+# For Amazon S3 ledger offload, Max block size in bytes. (64MiB by default, 5MiB minimum)
s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
-# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
+# For Amazon S3 ledger offload, Read buffer size in bytes (1MiB by default)
s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
# For Google Cloud Storage ledger offload, region where offload bucket is located.
@@ -1673,10 +1683,11 @@ gcsManagedLedgerOffloadRegion=
# For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into
gcsManagedLedgerOffloadBucket=
-# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by default, 5MB minimum)
-gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+# For Google Cloud Storage ledger offload, Max block size in bytes. (128MiB by default, 5MiB minimum)
+# Since JClouds limits the maximum number of blocks to 32, the maximum size of a ledger is 32 times the block size.
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728
-# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by default)
+# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MiB by default)
gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
# For Google Cloud Storage, path to json file containing service account credentials.
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 3871c74a88778..6f995576ebd64 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,16 @@ workerHostname: localhost
workerPort: 6750
workerPortTls: 6751
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled: false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor: false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses: null
+
# The Configuration metadata store url
# Examples:
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5a9d433f39ceb..6e6c960e8009e 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -63,6 +63,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
# Enables zero-copy transport of data across network interfaces using the splice system call.
# Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.
proxyZeroCopyModeEnabled=true
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 5c94d63817a12..b04e5ccefa640 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -51,6 +51,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 490cff2722ee5..9051f3b590c8e 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -46,6 +46,16 @@ statusFilePath=
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
# Name of the pulsar cluster to connect to
clusterName=
diff --git a/distribution/licenses/LICENSE-Reactive-gRPC.txt b/distribution/licenses/LICENSE-Reactive-gRPC.txt
new file mode 100644
index 0000000000000..bc589401e7bdf
--- /dev/null
+++ b/distribution/licenses/LICENSE-Reactive-gRPC.txt
@@ -0,0 +1,29 @@
+BSD 3-Clause License
+
+Copyright (c) 2019, Salesforce.com, Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+* Neither the name of the copyright holder nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index 599a9755f9155..1c9ea68685308 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -180,7 +180,7 @@
org.apache.logging.log4j
- log4j-slf4j-impl
+ log4j-slf4j2-impl
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 4dc6e4341672c..c5c243796b6f3 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -262,7 +262,7 @@ The Apache Software License, Version 2.0
- com.fasterxml.jackson.module-jackson-module-parameter-names-2.14.2.jar
* Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
- * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.9.0.jar
+ * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.17.0.jar
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
* Gson
- com.google.code.gson-gson-2.8.9.jar
@@ -350,40 +350,40 @@ The Apache Software License, Version 2.0
* Log4J
- org.apache.logging.log4j-log4j-api-2.23.1.jar
- org.apache.logging.log4j-log4j-core-2.23.1.jar
- - org.apache.logging.log4j-log4j-slf4j-impl-2.23.1.jar
+ - org.apache.logging.log4j-log4j-slf4j2-impl-2.23.1.jar
- org.apache.logging.log4j-log4j-web-2.23.1.jar
* Java Native Access JNA
- net.java.dev.jna-jna-jpms-5.12.1.jar
- net.java.dev.jna-jna-platform-jpms-5.12.1.jar
* BookKeeper
- - org.apache.bookkeeper-bookkeeper-common-4.16.5.jar
- - org.apache.bookkeeper-bookkeeper-common-allocator-4.16.5.jar
- - org.apache.bookkeeper-bookkeeper-proto-4.16.5.jar
- - org.apache.bookkeeper-bookkeeper-server-4.16.5.jar
- - org.apache.bookkeeper-bookkeeper-tools-framework-4.16.5.jar
- - org.apache.bookkeeper-circe-checksum-4.16.5.jar
- - org.apache.bookkeeper-cpu-affinity-4.16.5.jar
- - org.apache.bookkeeper-statelib-4.16.5.jar
- - org.apache.bookkeeper-stream-storage-api-4.16.5.jar
- - org.apache.bookkeeper-stream-storage-common-4.16.5.jar
- - org.apache.bookkeeper-stream-storage-java-client-4.16.5.jar
- - org.apache.bookkeeper-stream-storage-java-client-base-4.16.5.jar
- - org.apache.bookkeeper-stream-storage-proto-4.16.5.jar
- - org.apache.bookkeeper-stream-storage-server-4.16.5.jar
- - org.apache.bookkeeper-stream-storage-service-api-4.16.5.jar
- - org.apache.bookkeeper-stream-storage-service-impl-4.16.5.jar
- - org.apache.bookkeeper.http-http-server-4.16.5.jar
- - org.apache.bookkeeper.http-vertx-http-server-4.16.5.jar
- - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.16.5.jar
- - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.16.5.jar
- - org.apache.distributedlog-distributedlog-common-4.16.5.jar
- - org.apache.distributedlog-distributedlog-core-4.16.5-tests.jar
- - org.apache.distributedlog-distributedlog-core-4.16.5.jar
- - org.apache.distributedlog-distributedlog-protocol-4.16.5.jar
- - org.apache.bookkeeper.stats-codahale-metrics-provider-4.16.5.jar
- - org.apache.bookkeeper-bookkeeper-slogger-api-4.16.5.jar
- - org.apache.bookkeeper-bookkeeper-slogger-slf4j-4.16.5.jar
- - org.apache.bookkeeper-native-io-4.16.5.jar
+ - org.apache.bookkeeper-bookkeeper-common-4.17.0.jar
+ - org.apache.bookkeeper-bookkeeper-common-allocator-4.17.0.jar
+ - org.apache.bookkeeper-bookkeeper-proto-4.17.0.jar
+ - org.apache.bookkeeper-bookkeeper-server-4.17.0.jar
+ - org.apache.bookkeeper-bookkeeper-tools-framework-4.17.0.jar
+ - org.apache.bookkeeper-circe-checksum-4.17.0.jar
+ - org.apache.bookkeeper-cpu-affinity-4.17.0.jar
+ - org.apache.bookkeeper-statelib-4.17.0.jar
+ - org.apache.bookkeeper-stream-storage-api-4.17.0.jar
+ - org.apache.bookkeeper-stream-storage-common-4.17.0.jar
+ - org.apache.bookkeeper-stream-storage-java-client-4.17.0.jar
+ - org.apache.bookkeeper-stream-storage-java-client-base-4.17.0.jar
+ - org.apache.bookkeeper-stream-storage-proto-4.17.0.jar
+ - org.apache.bookkeeper-stream-storage-server-4.17.0.jar
+ - org.apache.bookkeeper-stream-storage-service-api-4.17.0.jar
+ - org.apache.bookkeeper-stream-storage-service-impl-4.17.0.jar
+ - org.apache.bookkeeper.http-http-server-4.17.0.jar
+ - org.apache.bookkeeper.http-vertx-http-server-4.17.0.jar
+ - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.17.0.jar
+ - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.17.0.jar
+ - org.apache.distributedlog-distributedlog-common-4.17.0.jar
+ - org.apache.distributedlog-distributedlog-core-4.17.0-tests.jar
+ - org.apache.distributedlog-distributedlog-core-4.17.0.jar
+ - org.apache.distributedlog-distributedlog-protocol-4.17.0.jar
+ - org.apache.bookkeeper.stats-codahale-metrics-provider-4.17.0.jar
+ - org.apache.bookkeeper-bookkeeper-slogger-api-4.17.0.jar
+ - org.apache.bookkeeper-bookkeeper-slogger-slf4j-4.17.0.jar
+ - org.apache.bookkeeper-native-io-4.17.0.jar
* Apache HTTP Client
- org.apache.httpcomponents-httpclient-4.5.13.jar
- org.apache.httpcomponents-httpcore-4.4.15.jar
@@ -430,23 +430,23 @@ The Apache Software License, Version 2.0
- org.jetbrains.kotlin-kotlin-stdlib-jdk8-1.8.20.jar
- org.jetbrains-annotations-13.0.jar
* gRPC
- - io.grpc-grpc-all-1.55.3.jar
- - io.grpc-grpc-auth-1.55.3.jar
- - io.grpc-grpc-context-1.55.3.jar
- - io.grpc-grpc-core-1.55.3.jar
- - io.grpc-grpc-netty-1.55.3.jar
- - io.grpc-grpc-protobuf-1.55.3.jar
- - io.grpc-grpc-protobuf-lite-1.55.3.jar
- - io.grpc-grpc-stub-1.55.3.jar
- - io.grpc-grpc-alts-1.55.3.jar
- - io.grpc-grpc-api-1.55.3.jar
- - io.grpc-grpc-grpclb-1.55.3.jar
- - io.grpc-grpc-netty-shaded-1.55.3.jar
- - io.grpc-grpc-services-1.55.3.jar
- - io.grpc-grpc-xds-1.55.3.jar
- - io.grpc-grpc-rls-1.55.3.jar
- - io.grpc-grpc-servlet-1.55.3.jar
- - io.grpc-grpc-servlet-jakarta-1.55.3.jar
+ - io.grpc-grpc-all-1.56.0.jar
+ - io.grpc-grpc-auth-1.56.0.jar
+ - io.grpc-grpc-context-1.56.0.jar
+ - io.grpc-grpc-core-1.56.0.jar
+ - io.grpc-grpc-netty-1.56.0.jar
+ - io.grpc-grpc-protobuf-1.56.0.jar
+ - io.grpc-grpc-protobuf-lite-1.56.0.jar
+ - io.grpc-grpc-stub-1.56.0.jar
+ - io.grpc-grpc-alts-1.56.0.jar
+ - io.grpc-grpc-api-1.56.0.jar
+ - io.grpc-grpc-grpclb-1.56.0.jar
+ - io.grpc-grpc-netty-shaded-1.56.0.jar
+ - io.grpc-grpc-services-1.56.0.jar
+ - io.grpc-grpc-xds-1.56.0.jar
+ - io.grpc-grpc-rls-1.56.0.jar
+ - io.grpc-grpc-servlet-1.56.0.jar
+ - io.grpc-grpc-servlet-jakarta-1.56.0.jar
* Perfmark
- io.perfmark-perfmark-api-0.26.0.jar
* OpenCensus
@@ -481,7 +481,12 @@ The Apache Software License, Version 2.0
* Prometheus
- io.prometheus-simpleclient_httpserver-0.16.0.jar
* Oxia
- - io.streamnative.oxia-oxia-client-0.1.0-shaded.jar
+ - io.streamnative.oxia-oxia-client-0.1.6.jar
+ - io.streamnative.oxia-oxia-client-metrics-api-0.1.6.jar
+ * OpenHFT
+ - net.openhft-zero-allocation-hashing-0.16.jar
+ * Project reactor
+ - io.projectreactor-reactor-core-3.5.2.jar
* Java JSON WebTokens
- io.jsonwebtoken-jjwt-api-0.11.1.jar
- io.jsonwebtoken-jjwt-impl-0.11.1.jar
@@ -504,8 +509,8 @@ The Apache Software License, Version 2.0
* Google HTTP Client
- com.google.http-client-google-http-client-gson-1.41.0.jar
- com.google.http-client-google-http-client-1.41.0.jar
- - com.google.auto.value-auto-value-annotations-1.9.jar
- - com.google.re2j-re2j-1.6.jar
+ - com.google.auto.value-auto-value-annotations-1.10.1.jar
+ - com.google.re2j-re2j-1.7.jar
* Jetcd
- io.etcd-jetcd-api-0.7.5.jar
- io.etcd-jetcd-common-0.7.5.jar
@@ -548,6 +553,9 @@ BSD 3-clause "New" or "Revised" License
* JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
* JLine -- jline-jline-2.14.6.jar -- ../licenses/LICENSE-JLine.txt
* JLine3 -- org.jline-jline-3.21.0.jar -- ../licenses/LICENSE-JLine.txt
+ * Reactive gRPC
+ - com.salesforce.servicelibs-reactive-grpc-common-1.2.4.jar -- ../licenses/LICENSE-Reactive-gRPC.txt
+ - com.salesforce.servicelibs-reactor-grpc-stub-1.2.4.jar -- ../licenses/LICENSE-Reactive-gRPC.txt
BSD 2-Clause License
* HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- ../licenses/LICENSE-HdrHistogram.txt
@@ -555,8 +563,8 @@ BSD 2-Clause License
MIT License
* Java SemVer -- com.github.zafarkhaja-java-semver-0.9.0.jar -- ../licenses/LICENSE-SemVer.txt
* SLF4J -- ../licenses/LICENSE-SLF4J.txt
- - org.slf4j-slf4j-api-1.7.32.jar
- - org.slf4j-jcl-over-slf4j-1.7.32.jar
+ - org.slf4j-slf4j-api-2.0.13.jar
+ - org.slf4j-jcl-over-slf4j-2.0.13.jar
* The Checker Framework
- org.checkerframework-checker-qual-3.33.0.jar
* oshi
@@ -566,8 +574,8 @@ MIT License
- com.auth0-jwks-rsa-0.22.0.jar
Protocol Buffers License
* Protocol Buffers
- - com.google.protobuf-protobuf-java-3.19.6.jar -- ../licenses/LICENSE-protobuf.txt
- - com.google.protobuf-protobuf-java-util-3.19.6.jar -- ../licenses/LICENSE-protobuf.txt
+ - com.google.protobuf-protobuf-java-3.22.3.jar -- ../licenses/LICENSE-protobuf.txt
+ - com.google.protobuf-protobuf-java-util-3.22.3.jar -- ../licenses/LICENSE-protobuf.txt
CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
diff --git a/distribution/shell/pom.xml b/distribution/shell/pom.xml
index 5f4fc549ccc62..144f7b1ff6d83 100644
--- a/distribution/shell/pom.xml
+++ b/distribution/shell/pom.xml
@@ -51,7 +51,7 @@
org.apache.logging.log4j
- log4j-slf4j-impl
+ log4j-slf4j2-impl
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index 069e61b89b55a..41b38f17dce77 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -333,6 +333,7 @@ The Apache Software License, Version 2.0
- listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
* J2ObjC Annotations -- j2objc-annotations-1.3.jar
* Netty Reactive Streams -- netty-reactive-streams-2.0.6.jar
+ * Swagger -- swagger-annotations-1.6.2.jar
* DataSketches
- memory-0.8.3.jar
- sketches-core-0.8.3.jar
@@ -385,7 +386,7 @@ The Apache Software License, Version 2.0
* Log4J
- log4j-api-2.23.1.jar
- log4j-core-2.23.1.jar
- - log4j-slf4j-impl-2.23.1.jar
+ - log4j-slf4j2-impl-2.23.1.jar
- log4j-web-2.23.1.jar
* OpenTelemetry
- opentelemetry-api-1.34.1.jar
@@ -393,9 +394,9 @@ The Apache Software License, Version 2.0
- opentelemetry-extension-incubator-1.34.1-alpha.jar
* BookKeeper
- - bookkeeper-common-allocator-4.16.5.jar
- - cpu-affinity-4.16.5.jar
- - circe-checksum-4.16.5.jar
+ - bookkeeper-common-allocator-4.17.0.jar
+ - cpu-affinity-4.17.0.jar
+ - circe-checksum-4.17.0.jar
* AirCompressor
- aircompressor-0.20.jar
* AsyncHttpClient
@@ -423,13 +424,13 @@ BSD 3-clause "New" or "Revised" License
MIT License
* SLF4J -- ../licenses/LICENSE-SLF4J.txt
- - slf4j-api-1.7.32.jar
+ - slf4j-api-2.0.13.jar
* The Checker Framework
- checker-qual-3.33.0.jar
Protocol Buffers License
* Protocol Buffers
- - protobuf-java-3.19.6.jar -- ../licenses/LICENSE-protobuf.txt
+ - protobuf-java-3.22.3.jar -- ../licenses/LICENSE-protobuf.txt
CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 08c3509d58a8e..d0cc9f5e576bd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3817,7 +3817,7 @@ public PositionImpl getValidPositionAfterSkippedEntries(final PositionImpl posit
Long nextLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
// This means it has jumped to the last position
if (nextLedgerId == null) {
- if (currentLedgerEntries == 0) {
+ if (currentLedgerEntries == 0 && currentLedger != null) {
return PositionImpl.get(currentLedger.getId(), 0);
}
return lastConfirmedEntry.getNext();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1932b1d90ae5b..f16e71bf264b8 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4697,5 +4697,66 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
&& cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());
}
+ @Test
+ public void testRecoverCursorWithTerminateManagedLedger() throws Exception {
+ String mlName = "my_test_ledger";
+ String cursorName = "c1";
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
+
+ // Write some data.
+ Position p0 = ledger.addEntry("entry-0".getBytes());
+ Position p1 = ledger.addEntry("entry-1".getBytes());
+
+ // Read message.
+ List entries = c1.readEntries(2);
+ assertEquals(entries.size(), 2);
+ assertEquals(entries.get(0).getPosition(), p0);
+ assertEquals(entries.get(1).getPosition(), p1);
+ entries.forEach(Entry::release);
+
+ // Mark delete the last message.
+ c1.markDelete(p1);
+ Position markDeletedPosition = c1.getMarkDeletedPosition();
+ Assert.assertEquals(markDeletedPosition, p1);
+
+ // Terminate the managed ledger.
+ Position lastPosition = ledger.terminate();
+ assertEquals(lastPosition, p1);
+
+ // Close the ledger.
+ ledger.close();
+
+ // Reopen the ledger.
+ ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+ BookKeeper mockBookKeeper = mock(BookKeeper.class);
+ final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ledger,
+ cursorName);
+
+ CompletableFuture recoverFuture = new CompletableFuture<>();
+ // Recover the cursor.
+ cursor.recover(new VoidCallback() {
+ @Override
+ public void operationComplete() {
+ recoverFuture.complete(null);
+ }
+
+ @Override
+ public void operationFailed(ManagedLedgerException exception) {
+ recoverFuture.completeExceptionally(exception);
+ }
+ });
+
+ recoverFuture.join();
+ assertTrue(recoverFuture.isDone());
+ assertFalse(recoverFuture.isCompletedExceptionally());
+
+ // Verify the cursor state.
+ assertEquals(cursor.getMarkDeletedPosition(), markDeletedPosition);
+ assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git a/pom.xml b/pom.xml
index c7fba94abd8ea..585347fb1f855 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@ flexible messaging model and an intuitive client API.
1.26.0
- 4.16.5
+ 4.17.0
3.9.2
1.5.0
1.10.0
@@ -153,7 +153,7 @@ flexible messaging model and an intuitive client API.
0.16.0
4.3.8
7.9.2
- 1.7.32
+ 2.0.13
4.4
2.23.1
1.78
@@ -168,9 +168,9 @@ flexible messaging model and an intuitive client API.
0.5.0
1.14.12
1.17
- 3.19.6
+ 3.22.3
${protobuf3.version}
- 1.55.3
+ 1.56.0
1.41.0
0.26.0
${grpc.version}
@@ -248,7 +248,7 @@ flexible messaging model and an intuitive client API.
4.5.13
4.4.15
0.7.5
- 0.1.0
+ 0.1.6
2.0
1.10.12
5.3.3
@@ -278,6 +278,7 @@ flexible messaging model and an intuitive client API.
1.5.4
5.4.0
2.33.2
+ 1.0.3
0.6.1
@@ -351,6 +352,10 @@ flexible messaging model and an intuitive client API.
org.yaml
*
+
+ org.slf4j
+ *
+
@@ -459,6 +464,10 @@ flexible messaging model and an intuitive client API.
org.bouncycastle
*
+
+ log4j-slf4j-impl
+ org.apache.logging.log4j
+
slf4j-log4j12
org.slf4j
@@ -1192,7 +1201,6 @@ flexible messaging model and an intuitive client API.
io.streamnative.oxia
oxia-client
${oxia.version}
- shaded
io.streamnative.oxia
@@ -1582,6 +1590,10 @@ flexible messaging model and an intuitive client API.
org.apache.zookeeper
*
+
+ log4j-slf4j-impl
+ org.apache.logging.log4j
+
@@ -2701,5 +2713,13 @@ flexible messaging model and an intuitive client API.
false
+
+
+ oracle.releases
+ https://download.oracle.com/maven
+
+ false
+
+
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2b58cbc2d1178..156c83bd6960c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -250,6 +250,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " when getting topic statistics data.")
private boolean haProxyProtocolEnabled;
+ @FieldContext(category = CATEGORY_SERVER,
+ doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+ + "requests. Default is false.")
+ private boolean webServiceHaProxyProtocolEnabled = false;
+
+ @FieldContext(category = CATEGORY_SERVER, doc =
+ "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+ + "Default is false.")
+ private boolean webServiceTrustXForwardedFor = false;
+
+ @FieldContext(category = CATEGORY_SERVER, doc =
+ "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+ + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+ + "is enabled.")
+ private Boolean webServiceLogDetailedAddresses;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty Acceptor."
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 1ba353dccaa1c..975b23192f949 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -24,6 +24,8 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -54,10 +56,14 @@ public class NamespaceResources extends BaseResources {
private static final String NAMESPACE_BASE_PATH = "/namespace";
public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) {
+ this(configurationStore, operationTimeoutSec, ForkJoinPool.commonPool());
+ }
+
+ public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
- partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
+ partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec, executor);
}
public CompletableFuture> listNamespacesAsync(String tenant) {
@@ -234,9 +240,11 @@ public void setIsolationDataWithCreate(String cluster,
public static class PartitionedTopicResources extends BaseResources {
private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics";
+ private final Executor executor;
- public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec) {
+ public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec);
+ this.executor = executor;
}
public CompletableFuture updatePartitionedTopicAsync(TopicName tn, Function runWithMarkDeleteAsync(TopicName topic,
future.complete(deleteResult);
}
});
- });
+ }, executor);
return future;
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fe7ffe0bc7b43..cc64eeb52f6eb 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.resources;
import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import lombok.Getter;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -57,13 +59,19 @@ public class PulsarResources {
public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore) {
this(localMetadataStore, configurationMetadataStore, DEFAULT_OPERATION_TIMEOUT_SEC);
}
+
+ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
+ int operationTimeoutSec) {
+ this(localMetadataStore, configurationMetadataStore, operationTimeoutSec, ForkJoinPool.commonPool());
+ }
+
public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
- int operationTimeoutSec) {
+ int operationTimeoutSec, Executor executor) {
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore,
operationTimeoutSec);
- namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
+ namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec, executor);
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
tenantResources = null;
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index 0963f25c3d31f..413184764f52b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -120,7 +120,7 @@ public CompletableFuture clearTenantPersistence(String tenant) {
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
- return store.delete(path, Optional.empty());
+ return store.deleteRecursive(path);
} else {
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
index e5daa5852b51f..fc88647eb49ea 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
@@ -18,9 +18,23 @@
*/
package org.apache.pulsar.broker.web;
+import java.net.InetSocketAddress;
import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.CustomRequestLog;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.RequestLog;
+import org.eclipse.jetty.server.Response;
+import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
+import org.eclipse.jetty.util.HostPort;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
/**
* Class to standardize initialization of a Jetty request logger for all pulsar components.
@@ -58,7 +72,184 @@ public class JettyRequestLogFactory {
* Build a new Jetty request logger using the format defined in this class.
* @return a request logger
*/
- public static CustomRequestLog createRequestLogger() {
- return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
+ public static RequestLog createRequestLogger() {
+ return createRequestLogger(false, null);
+ }
+
+ /**
+ * Build a new Jetty request logger using the format defined in this class.
+ * @param showDetailedAddresses whether to show detailed addresses and ports in logs
+ * @return a request logger
+ */
+ public static RequestLog createRequestLogger(boolean showDetailedAddresses, Server server) {
+ if (!showDetailedAddresses) {
+ return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
+ } else {
+ return new OriginalClientIPRequestLog(server);
+ }
+ }
+
+ /**
+ * Logs the original and real remote (client) and local (server) IP addresses
+ * when detailed addresses are enabled.
+ * Tracks the real addresses of remote and local using a registered Connection.Listener
+ * when detailed addresses are enabled.
+ * This is necessary when Proxy Protocol is used to pass the original client IP.
+ */
+ @Slf4j
+ private static class OriginalClientIPRequestLog extends ContainerLifeCycle implements RequestLog {
+ private final ThreadLocal<StringBuilder> requestLogStringBuilder = ThreadLocal.withInitial(StringBuilder::new);
+ private final CustomRequestLog delegate;
+ private final Slf4jRequestLogWriter delegateLogWriter;
+
+ OriginalClientIPRequestLog(Server server) {
+ delegate = new CustomRequestLog(this::write, LOG_FORMAT);
+ addBean(delegate);
+ delegateLogWriter = new Slf4jRequestLogWriter();
+ addBean(delegateLogWriter);
+ if (server != null) {
+ for (Connector connector : server.getConnectors()) {
+ // adding the listener is only necessary for connectors that use ProxyConnectionFactory
+ if (connector.getDefaultConnectionFactory() instanceof ProxyConnectionFactory) {
+ connector.addBean(proxyProtocolOriginalEndpointListener);
+ }
+ }
+ }
+ }
+
+ void write(String requestEntry) {
+ StringBuilder sb = requestLogStringBuilder.get();
+ sb.setLength(0);
+ sb.append(requestEntry);
+ }
+
+ @Override
+ public void log(Request request, Response response) {
+ delegate.log(request, response);
+ StringBuilder sb = requestLogStringBuilder.get();
+ sb.append(" [R:");
+ sb.append(request.getRemoteHost());
+ sb.append(':');
+ sb.append(request.getRemotePort());
+ InetSocketAddress realRemoteAddress = lookupRealAddress(request.getHttpChannel().getRemoteAddress());
+ if (realRemoteAddress != null) {
+ String realRemoteHost = HostPort.normalizeHost(realRemoteAddress.getHostString());
+ int realRemotePort = realRemoteAddress.getPort();
+ if (!realRemoteHost.equals(request.getRemoteHost()) || realRemotePort != request.getRemotePort()) {
+ sb.append(" via ");
+ sb.append(realRemoteHost);
+ sb.append(':');
+ sb.append(realRemotePort);
+ }
+ }
+ sb.append("]->[L:");
+ InetSocketAddress realLocalAddress = lookupRealAddress(request.getHttpChannel().getLocalAddress());
+ if (realLocalAddress != null) {
+ String realLocalHost = HostPort.normalizeHost(realLocalAddress.getHostString());
+ int realLocalPort = realLocalAddress.getPort();
+ sb.append(realLocalHost);
+ sb.append(':');
+ sb.append(realLocalPort);
+ if (!realLocalHost.equals(request.getLocalAddr()) || realLocalPort != request.getLocalPort()) {
+ sb.append(" dst ");
+ sb.append(request.getLocalAddr());
+ sb.append(':');
+ sb.append(request.getLocalPort());
+ }
+ } else {
+ sb.append(request.getLocalAddr());
+ sb.append(':');
+ sb.append(request.getLocalPort());
+ }
+ sb.append(']');
+ try {
+ delegateLogWriter.write(sb.toString());
+ } catch (Exception e) {
+ log.warn("Failed to write request log", e);
+ }
+ }
+
+ private InetSocketAddress lookupRealAddress(InetSocketAddress socketAddress) {
+ if (socketAddress == null) {
+ return null;
+ }
+ if (proxyProtocolRealAddressMapping.isEmpty()) {
+ return socketAddress;
+ }
+ AddressEntry entry = proxyProtocolRealAddressMapping.get(new AddressKey(socketAddress.getHostString(),
+ socketAddress.getPort()));
+ if (entry != null) {
+ return entry.realAddress;
+ } else {
+ return socketAddress;
+ }
+ }
+
+ private final Connection.Listener proxyProtocolOriginalEndpointListener =
+ new ProxyProtocolOriginalEndpointListener();
+
+ private final ConcurrentHashMap<AddressKey, AddressEntry> proxyProtocolRealAddressMapping =
+ new ConcurrentHashMap<>();
+
+ // Use a record as key since InetSocketAddress hash code changes if the address gets resolved
+ record AddressKey(String hostString, int port) {
+
+ }
+
+ record AddressEntry(InetSocketAddress realAddress, AtomicInteger referenceCount) {
+
+ }
+
+ // Tracks the real addresses of remote and local when detailed addresses are enabled.
+ // This is necessary when Proxy Protocol is used to pass the original client IP.
+ // The Proxy Protocol implementation in Jetty wraps the original endpoint with a ProxyEndPoint
+ // and the real endpoint information isn't available in the request object.
+ // This listener is added to all connectors to track the real addresses of the client and server.
+ class ProxyProtocolOriginalEndpointListener implements Connection.Listener {
+ @Override
+ public void onOpened(Connection connection) {
+ handleConnection(connection, true);
+ }
+
+ @Override
+ public void onClosed(Connection connection) {
+ handleConnection(connection, false);
+ }
+
+ private void handleConnection(Connection connection, boolean increment) {
+ if (connection.getEndPoint() instanceof ProxyConnectionFactory.ProxyEndPoint) {
+ ProxyConnectionFactory.ProxyEndPoint proxyEndPoint =
+ (ProxyConnectionFactory.ProxyEndPoint) connection.getEndPoint();
+ EndPoint originalEndpoint = proxyEndPoint.unwrap();
+ mapAddress(proxyEndPoint.getLocalAddress(), originalEndpoint.getLocalAddress(), increment);
+ mapAddress(proxyEndPoint.getRemoteAddress(), originalEndpoint.getRemoteAddress(), increment);
+ }
+ }
+
+ private void mapAddress(InetSocketAddress current, InetSocketAddress real, boolean increment) {
+ // don't add the mapping if the current address is the same as the real address
+ if (real != null && current != null && current.equals(real)) {
+ return;
+ }
+ AddressKey key = new AddressKey(current.getHostString(), current.getPort());
+ proxyProtocolRealAddressMapping.compute(key, (__, entry) -> {
+ if (entry == null) {
+ if (increment) {
+ entry = new AddressEntry(real, new AtomicInteger(1));
+ }
+ } else {
+ if (increment) {
+ entry.referenceCount.incrementAndGet();
+ } else {
+ if (entry.referenceCount.decrementAndGet() == 0) {
+ // remove the entry if the reference count drops to 0
+ entry = null;
+ }
+ }
+ }
+ return entry;
+ });
+ }
+ }
}
}
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index e15e024ea8158..3548877912199 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -176,6 +176,13 @@
test
+    <dependency>
+      <groupId>io.github.hakky54</groupId>
+      <artifactId>consolecaptor</artifactId>
+      <version>${consolecaptor.version}</version>
+      <scope>test</scope>
+    </dependency>
+
io.streamnative.oxia
oxia-testcontainers
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 7613a13db22de..96f3653ea9966 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -444,6 +444,9 @@ public CompletableFuture closeAsync() {
return closeFuture;
}
LOG.info("Closing PulsarService");
+ if (brokerService != null) {
+ brokerService.unloadNamespaceBundlesGracefully();
+ }
state = State.Closing;
// close the service in reverse order v.s. in which they are started
@@ -562,6 +565,11 @@ public CompletableFuture closeAsync() {
transactionBufferClient.close();
}
+ if (topicPoliciesService != null) {
+ topicPoliciesService.close();
+ topicPoliciesService = null;
+ }
+
if (client != null) {
client.close();
client = null;
@@ -992,7 +1000,7 @@ protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception
@VisibleForTesting
protected PulsarResources newPulsarResources() {
PulsarResources pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore,
- config.getMetadataStoreOperationTimeoutSeconds());
+ config.getMetadataStoreOperationTimeoutSeconds(), getExecutor());
pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
return pulsarResources;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index a1bfeb2142ffc..45455f16d4dc1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -23,6 +23,7 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -43,9 +44,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
@@ -621,35 +624,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
getNamespaceReplicatedClustersAsync(namespaceName)
- .thenAccept(clusters -> {
- for (String cluster : clusters) {
- if (!cluster.equals(pulsar().getConfiguration().getClusterName())) {
- // this call happens in the background without async composition. completion is logged.
- pulsar().getPulsarResources().getClusterResources()
- .getClusterAsync(cluster)
- .thenCompose(clusterDataOp ->
- ((TopicsImpl) pulsar().getBrokerService()
- .getClusterPulsarAdmin(cluster,
- clusterDataOp).topics())
- .createPartitionedTopicAsync(
- topicName.getPartitionedTopicName(),
- numPartitions,
- true, null))
- .whenComplete((__, ex) -> {
- if (ex != null) {
- log.error(
- "[{}] Failed to create partitioned topic {} in cluster {}.",
- clientAppId(), topicName, cluster, ex);
- } else {
- log.info(
- "[{}] Successfully created partitioned topic {} in "
- + "cluster {}",
- clientAppId(), topicName, cluster);
- }
- });
- }
+ .thenAccept(clusters -> {
+ // this call happens in the background without async composition. completion is logged.
+ internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
+ });
+ }
+
+ protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
+ Set<String> clusters, int numPartitions) {
+ final String shortTopicName = topicName.getPartitionedTopicName();
+ Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
+ for (String cluster : clusters) {
+ if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+ continue;
+ }
+ ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources();
+ CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>();
+ tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+ clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+ if (ex1 != null) {
+ // Unexpected error, such as NPE. Catch all errors to avoid leaving "createRemoteTopicFuture" stuck.
+ log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster"
+ + " {}.", clientAppId(), topicName, cluster, ex1);
+ createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
+ return;
+ }
+ // Get cluster data success.
+ TopicsImpl topics =
+ (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics();
+ topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
+ .whenComplete((ignore, ex2) -> {
+ if (ex2 == null) {
+ // Create success.
+ log.info("[{}] Successfully created partitioned topic {} in cluster {}",
+ clientAppId(), topicName, cluster);
+ createRemoteTopicFuture.complete(null);
+ return;
+ }
+ // Create topic on the remote cluster error.
+ Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2);
+ // The topic has been created before, check the partitions count is expected.
+ if (unwrapEx2 instanceof PulsarAdminException.ConflictException) {
+ topics.getPartitionedTopicMetadataAsync(shortTopicName).whenComplete((topicMeta, ex3) -> {
+ if (ex3 != null) {
+ // Unexpected error, such as NPE. Catch all errors to avoid leaving
+ // "createRemoteTopicFuture" stuck.
+ log.error("[{}] Failed to check remote-cluster's topic metadata when creating"
+ + " partitioned topic {} in cluster {}.",
+ clientAppId(), topicName, cluster, ex3);
+ createRemoteTopicFuture.completeExceptionally(new RestException(ex3));
+ }
+ // Call get partitioned metadata of remote cluster success. NOTE(review): a "return;" is missing after the completeExceptionally above — when ex3 != null, execution falls through and "topicMeta.partitions" below throws NPE.
+ if (topicMeta.partitions == numPartitions) {
+ log.info("[{}] Skip created partitioned topic {} in cluster {}, because that {}",
+ clientAppId(), topicName, cluster, unwrapEx2.getMessage());
+ createRemoteTopicFuture.complete(null);
+ } else {
+ String errorMsg = String.format("[%s] There is an exists topic %s with different"
+ + " partitions %s on the remote cluster %s, you want to create it"
+ + " with partitions %s",
+ clientAppId(), shortTopicName, topicMeta.partitions, cluster,
+ numPartitions);
+ log.error(errorMsg);
+ createRemoteTopicFuture.completeExceptionally(
+ new RestException(Status.PRECONDITION_FAILED, errorMsg));
+ }
+ });
+ } else {
+ // An HTTP error was responded from the remote cluster.
+ log.error("[{}] Failed to create partitioned topic {} in cluster {}.",
+ clientAppId(), topicName, cluster, ex2);
+ createRemoteTopicFuture.completeExceptionally(new RestException(unwrapEx2));
}
});
+ });
+ }
+ return tasksForAllClusters;
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index bbadc7bb3316d..5f2dccc3e9c24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2019,7 +2019,7 @@ protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessag
}
protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){
- validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
@@ -2125,9 +2125,10 @@ protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long
f.complete(null);
})
.exceptionally(t -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(t);
log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}",
clientAppId(), namespaceName, t);
- f.completeExceptionally(new RestException(t));
+ f.completeExceptionally(new RestException(cause));
return null;
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1f8d06571908e..682f41dcdb61f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -205,6 +205,7 @@ protected CompletableFuture> internalGetPartitionedTopicListAsync()
protected CompletableFuture