From ee0f69ad44fdb2eb3b2b96be813f509b4c967fbe Mon Sep 17 00:00:00 2001 From: Malla Sandeep Date: Wed, 17 Apr 2024 15:41:40 +0530 Subject: [PATCH 1/2] Added support for pulsar.service.url in source and sink configs for LocalRunner --- .../java/org/apache/pulsar/functions/LocalRunner.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index 3b1c86a68c285..71b3bd23c14d4 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -441,6 +441,15 @@ public void start(boolean blocking) throws Exception { String serviceUrl = DEFAULT_SERVICE_URL; if (brokerServiceUrl != null) { serviceUrl = brokerServiceUrl; + } else { + if (sourceConfig != null && sourceConfig.getConfigs() != null + && sourceConfig.getConfigs().get("pulsar.service.url") != null){ + serviceUrl = (String) sourceConfig.getConfigs().get("pulsar.service.url"); + } + if (sinkConfig != null && sinkConfig.getConfigs() != null + && sinkConfig.getConfigs().get("pulsar.service.url") != null){ + serviceUrl = (String) sinkConfig.getConfigs().get("pulsar.service.url"); + } } if (webServiceUrl == null) { webServiceUrl = DEFAULT_WEB_SERVICE_URL; From c5082ed0c19295b41e7424166bcec5dca292ab9d Mon Sep 17 00:00:00 2001 From: Malla Sandeep Date: Wed, 17 Apr 2024 15:48:24 +0530 Subject: [PATCH 2/2] refactoring added spaces where required --- .../main/java/org/apache/pulsar/functions/LocalRunner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index 71b3bd23c14d4..c19a8acd4b71f 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -443,11 +443,11 @@ public void start(boolean blocking) throws Exception { serviceUrl = brokerServiceUrl; } else { if (sourceConfig != null && sourceConfig.getConfigs() != null - && sourceConfig.getConfigs().get("pulsar.service.url") != null){ + && sourceConfig.getConfigs().get("pulsar.service.url") != null) { serviceUrl = (String) sourceConfig.getConfigs().get("pulsar.service.url"); } if (sinkConfig != null && sinkConfig.getConfigs() != null - && sinkConfig.getConfigs().get("pulsar.service.url") != null){ + && sinkConfig.getConfigs().get("pulsar.service.url") != null) { serviceUrl = (String) sinkConfig.getConfigs().get("pulsar.service.url"); } }