Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test long vals #717

Merged
merged 15 commits into from Apr 1, 2022
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Expand Up @@ -29,7 +29,7 @@ jobs:
DAPR_RUNTIME_VER: 1.6.0-rc.2
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.6.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF: 5a307f3deaa1b322f7945179adad0403de80eb7e
DAPR_REF: 4cf499448ef6ee87c83db6a11b84e83237e92665
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/validate.yml
Expand Up @@ -40,7 +40,7 @@ jobs:
DAPR_RUNTIME_VER: 1.6.0-rc.2
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.6.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF: 5a307f3deaa1b322f7945179adad0403de80eb7e
DAPR_REF: 4cf499448ef6ee87c83db6a11b84e83237e92665
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down
101 changes: 101 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java
Expand Up @@ -26,6 +26,7 @@
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -35,8 +36,13 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;

import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.TestUtils.assertThrowsDaprException;
Expand All @@ -51,6 +57,7 @@ public class PubSubIT extends BaseIT {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final TypeRef<List<CloudEvent>> CLOUD_EVENT_LIST_TYPE_REF = new TypeRef<>() {};
private static final TypeRef<List<CloudEvent<ConvertToLong>>> CLOUD_EVENT_LONG_LIST_TYPE_REF = new TypeRef<>() {};
private static final TypeRef<List<CloudEvent<MyObject>>> CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF = new TypeRef<>() {};

//Number of messages to be sent: 10
Expand All @@ -66,6 +73,8 @@ public class PubSubIT extends BaseIT {
// Topic to test binary data
private static final String BINARY_TOPIC_NAME = "binarytopic";

private static final String LONG_TOPIC_NAME = "testinglongvalues";

/**
* Parameters for this test.
* Param #1: useGrpc.
Expand Down Expand Up @@ -402,6 +411,71 @@ public void testPubSubTTLMetadata() throws Exception {
daprRun.stop();
}

@Test
public void testLongValues() throws Exception {
final DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
SubscriberService.SUCCESS_MESSAGE,
SubscriberService.class,
true,
60000));
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}

Random random = new Random(590518626939830271L);
Set<ConvertToLong> values = new HashSet<>();
values.add(new ConvertToLong().setVal(590518626939830271L));
ConvertToLong val;
for (int i = 0; i < NUM_MESSAGES - 1; i++) {
do {
val = new ConvertToLong().setVal(random.nextLong());
} while (values.contains(val));
values.add(val);
}
Iterator<ConvertToLong> valuesIt = values.iterator();
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
ConvertToLong value = valuesIt.next();
System.out.println("The long value sent " + value.getValue());
//Publishing messages
client.publishEvent(
PUBSUB_NAME,
LONG_TOPIC_NAME,
value,
Collections.singletonMap(Metadata.TTL_IN_SECONDS, "1")).block();

try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return;
}
}
}

Set<ConvertToLong> actual = new HashSet<>();
try (DaprClient client = new DaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking results for topic " + LONG_TOPIC_NAME);
final List<CloudEvent<ConvertToLong>> messages = client.invokeMethod(
daprRun.getAppName(),
"messages/testinglongvalues",
null,
HttpExtension.GET, CLOUD_EVENT_LONG_LIST_TYPE_REF).block();
Assert.assertNotNull(messages);
for (CloudEvent<ConvertToLong> message : messages) {
actual.add(message.getData());
}
}, 2000);
Assert.assertEquals(values, actual);
}
}

public static class MyObject {
private String id;

Expand All @@ -413,4 +487,31 @@ public void setId(String id) {
this.id = id;
}
}

public static class ConvertToLong {
private Long value;

public ConvertToLong setVal(Long value) {
this.value = value;
return this;
}

public Long getValue() {
return value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConvertToLong that = (ConvertToLong) o;
return Objects.equals(value, that.value);
}

@Override
public int hashCode() {
return Objects.hash(value);
}
}

}
Expand Up @@ -115,6 +115,20 @@ public Mono<Void> handleMessageTTLTopic(@RequestBody(required = false) CloudEven
});
}

@Topic(name = "testinglongvalues", pubsubName = "messagebus")
@PostMapping(path = "/testinglongvalues")
public Mono<Void> handleMessageLongValues(@RequestBody(required = false) CloudEvent<PubSubIT.ConvertToLong> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
Long message = cloudEvent.getData().getValue();
System.out.println("Subscriber got: " + message);
messagesByTopic.compute("testinglongvalues", merge(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

private BiFunction<String, List<CloudEvent<?>>, List<CloudEvent<?>>> merge(final CloudEvent<?> item) {
return (key, value) -> {
final List<CloudEvent<?>> list = value == null ? new ArrayList<>() : value;
Expand Down