Skip to content

Commit

Permalink
[Sink] Fix the marshal and unmarshal the sink config (apache#12625)
Browse files Browse the repository at this point in the history
### Motivation

Recently, I tests the sink feature by `pulsarctl`, when getting sink info, found the following issues:

- Cannot get the `cleanupSubscription` field correctly
- Cannot get the `processingGuarantees` field correctly
- The `resources` always `null`


### Modifications

- Set value to `cleanupSubscription` field in `SinkConfig.convertFromDetails()`
- Set value to `resources` field in `SinkConfig.convertFromDetails()`
- Improves the set value to `processingGuarantees` in `SinkConfig.convertFromDetails()`
- Explicitly set the value for `processingGuarantees` 
- Added unit tests
  • Loading branch information
nodece authored and fangxiaobing committed Dec 19, 2021
1 parent 889c6c5 commit 3cbe7d8
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
if (sinkConfig.getProcessingGuarantees() != null) {
functionDetailsBuilder.setProcessingGuarantees(
convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
} else {
functionDetailsBuilder.setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE);
}

// set source spec
Expand Down Expand Up @@ -283,19 +285,24 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
sinkConfig.setSourceSubscriptionName(functionDetails.getSource().getSubscriptionName());
}
if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) {
sinkConfig.setRetainOrdering(true);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
} else {
sinkConfig.setRetainOrdering(false);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);

switch (functionDetails.getSource().getSubscriptionType()) {
case FAILOVER:
sinkConfig.setRetainOrdering(true);
break;
default:
sinkConfig.setRetainOrdering(false);
}

sinkConfig.setProcessingGuarantees(convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));

sinkConfig.setAutoAck(functionDetails.getAutoAck());
sinkConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());

// Set subscription position
sinkConfig.setSourceSubscriptionPosition(
convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
sinkConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());

if (functionDetails.getSource().getTimeoutMs() != 0) {
sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
Expand Down Expand Up @@ -331,6 +338,7 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
resources.setCpu(functionDetails.getResources().getCpu());
resources.setRam(functionDetails.getResources().getRam());
resources.setDisk(functionDetails.getResources().getDisk());
sinkConfig.setResources(resources);
}

if (isNotBlank(functionDetails.getRuntimeFlags())) {
Expand Down Expand Up @@ -581,6 +589,10 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
}
if (newConfig.getCleanupSubscription() != null) {
mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
}

return mergedConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.utils;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -46,8 +47,11 @@
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATMOST_ONCE;
import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
import static org.mockito.ArgumentMatchers.any;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
Expand Down Expand Up @@ -105,12 +109,59 @@ public void testConvertBackFidelity() throws IOException {
sinkConfig.setCleanupSubscription(false);
sinkConfig.setTimeoutMs(2000l);
sinkConfig.setRuntimeFlags("-DKerberos");
sinkConfig.setCleanupSubscription(true);

sinkConfig.setResources(Resources.getDefaultResources());

Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(
new Gson().toJson(sinkConfig),
new Gson().toJson(convertedConfig)
new Gson().toJson(convertedConfig),
new Gson().toJson(sinkConfig)
);
}

@Test
public void testParseRetainOrderingField() throws IOException {
List<Boolean> testcases = Lists.newArrayList(true, false, null);
for (Boolean testcase : testcases) {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setRetainOrdering(testcase);
Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(result.getRetainOrdering(), testcase != null ? testcase : Boolean.valueOf(false));
}
}

@Test
public void testParseProcessingGuaranteesField() throws IOException {
List<FunctionConfig.ProcessingGuarantees> testcases = Lists.newArrayList(
EFFECTIVELY_ONCE,
ATMOST_ONCE,
ATLEAST_ONCE,
null
);

for (FunctionConfig.ProcessingGuarantees testcase : testcases) {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setProcessingGuarantees(testcase);
Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(result.getProcessingGuarantees(), testcase == null ? ATLEAST_ONCE : testcase);
}
}

@Test
public void testCleanSubscriptionField() throws IOException {
List<Boolean> testcases = Lists.newArrayList(true, false, null);

for (Boolean testcase : testcases) {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setCleanupSubscription(testcase);
Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(result.getCleanupSubscription(), testcase == null ? Boolean.valueOf(true) : testcase);
}
}

@Test
Expand Down Expand Up @@ -339,6 +390,22 @@ public void testMergeRuntimeFlags() {
);
}

@Test
public void testMergeDifferentCleanupSubscription() {
SinkConfig sinkConfig = createSinkConfig();
SinkConfig newSinkConfig = createUpdatedSinkConfig("cleanupSubscription", false);
SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
assertEquals(
mergedConfig.getCleanupSubscription().booleanValue(),
false
);
mergedConfig.setCleanupSubscription(sinkConfig.getCleanupSubscription());
assertEquals(
new Gson().toJson(sinkConfig),
new Gson().toJson(mergedConfig)
);
}

@Test
public void testValidateConfig() throws IOException {
mockStatic(ConnectorUtils.class);
Expand Down Expand Up @@ -384,6 +451,7 @@ private SinkConfig createSinkConfig() {
sinkConfig.setTimeoutMs(2000l);
sinkConfig.setCleanupSubscription(true);
sinkConfig.setArchive("DummyArchive.nar");
sinkConfig.setCleanupSubscription(true);
return sinkConfig;
}

Expand Down

0 comments on commit 3cbe7d8

Please sign in to comment.