Skip to content

Commit

Permalink
Fix: Cast exception occurs if function/source/sink type is ByteBuffer (
Browse files Browse the repository at this point in the history
…#11611)

Co-authored-by: Jerry Peng <jerryp@splunk.com>
  • Loading branch information
jerrypeng and Jerry Peng committed Aug 10, 2021
1 parent f81824d commit b7e027b
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 6 deletions.
Expand Up @@ -37,6 +37,7 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -74,19 +75,20 @@
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -851,7 +853,7 @@ public void testPulsarSourceLocalRunMultipleInstances() throws Throwable {
runWithNarClassLoader(() -> testPulsarSourceLocalRun(null, 2));
}

private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
Expand All @@ -869,7 +871,9 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) thro
SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);

sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
if (className != null) {
sinkConfig.setClassName(className);
} else if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
sinkConfig.setClassName("org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink");
}

Expand Down Expand Up @@ -935,6 +939,14 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) thro
if (m != null) {
metricsMap.put(m.tags.get("instance_id"), m);
}
} else if (line.startsWith("pulsar_sink_sink_exceptions_total")) {
Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
assertFalse(metrics.isEmpty());
PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_sink_exceptions_total");
if (m == null) {
m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
}
assertEquals(m.value, 0);
}
});
Assert.assertEquals(metricsMap.size(), parallelism);
Expand Down Expand Up @@ -972,7 +984,11 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) thro
}

private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
testPulsarSourceLocalRun(jarFilePathUrl, 1);
testPulsarSinkLocalRun(jarFilePathUrl, 1);
}

private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
testPulsarSinkLocalRun(jarFilePathUrl, parallelism, null);
}

@Test(timeOut = 20000, groups = "builtin")
Expand Down Expand Up @@ -1001,6 +1017,31 @@ public void testPulsarSinkStatsMultipleInstances() throws Throwable {
runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 2));
}

public static class StatsNullSink implements Sink<ByteBuffer> {
volatile long bytesTotal = 0;

@Override
public void open(Map map, final SinkContext sinkContext) throws Exception {

}

@Override
public void write(Record<ByteBuffer> record) throws Exception {
bytesTotal += record.getValue().capacity();
record.ack();
}

@Override
public void close() throws Exception {

}
}

@Test
public void test() throws Throwable{
runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName()));
}

private void runWithNarClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try (NarClassLoader classLoader = NarClassLoader.getFromArchive(getPulsarIODataGeneratorNar(), Collections.emptySet(), originalClassLoader, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR)) {
Expand Down
Expand Up @@ -145,7 +145,11 @@ private static <T> Schema<T> newSchemaInstance(Class<T> clazz, SchemaType type)
private static <T> Schema<T> newSchemaInstance(Class<T> clazz, SchemaType type, ConsumerConfig conf) {
switch (type) {
case NONE:
return (Schema<T>) Schema.BYTES;
if (ByteBuffer.class.isAssignableFrom(clazz)) {
return (Schema<T>) Schema.BYTEBUFFER;
} else {
return (Schema<T>) Schema.BYTES;
}

case AUTO_CONSUME:
case AUTO:
Expand Down

0 comments on commit b7e027b

Please sign in to comment.