Skip to content

Commit

Permalink
test: streamSetAll
Browse files Browse the repository at this point in the history
Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>
  • Loading branch information
jeroiraz committed Nov 25, 2022
1 parent afc9380 commit f057c14
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/main/java/io/codenotary/immudb4j/ImmuClient.java
Expand Up @@ -1256,7 +1256,7 @@ public synchronized TxHeader streamSet(byte[] key, byte[] value)
return TxHeader.valueOf(txHdr);
}

public synchronized TxHeader streamSetAll(List<KVPair> kvList) throws InterruptedException {
public synchronized TxHeader streamSetAll(List<KVPair> kvList) throws InterruptedException, CorruptedDataException {
final LatchHolder<ImmudbProto.TxHeader> latchHolder = new LatchHolder<>();
final StreamObserver<Chunk> streamObserver = nonBlockingStub.streamSet(txHeaderStreamObserver(latchHolder));

Expand All @@ -1266,8 +1266,14 @@ public synchronized TxHeader streamSetAll(List<KVPair> kvList) throws Interrupte
}

streamObserver.onCompleted();

final ImmudbProto.TxHeader txHdr = latchHolder.awaitValue();

if (txHdr.getNentries() != kvList.size()) {
throw new CorruptedDataException();
}

return TxHeader.valueOf(latchHolder.awaitValue());
return TxHeader.valueOf(txHdr);
}

//
Expand Down
63 changes: 63 additions & 0 deletions src/test/java/io/codenotary/immudb4j/StreamSetAllTest.java
@@ -0,0 +1,63 @@
/*
Copyright 2022 CodeNotary, Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io.codenotary.immudb4j;

import io.codenotary.immudb4j.exceptions.CorruptedDataException;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;

public class StreamSetAllTest extends ImmuClientIntegrationTest {

@Test(testName = "setAll & getAll")
public void t1() {
immuClient.openSession("defaultdb", "immudb", "immudb");

String key1 = "sga-key1";
byte[] val1 = new byte[] { 1 };
String key2 = "sga-key2";
byte[] val2 = new byte[] { 2, 3 };
String key3 = "sga-key3";
byte[] val3 = new byte[] { 3, 4, 5 };

final List<KVPair> kvs = KVListBuilder.newBuilder()
.add(new KVPair(key1, val1))
.add(new KVPair(key2, val2))
.add(new KVPair(key3, val3))
.entries();

try {
TxHeader txHdr = immuClient.streamSetAll(kvs);
Assert.assertNotNull(txHdr);
} catch (InterruptedException|CorruptedDataException e) {
Assert.fail("Failed at SetAll.", e);
}

List<String> keys = Arrays.asList(key1, key2, key3);
List<Entry> got = immuClient.getAll(keys);

Assert.assertEquals(kvs.size(), got.size());

for (int i = 0; i < kvs.size(); i++) {
Assert.assertEquals(got.get(i).getValue(), kvs.get(i).getValue(), String.format("Expected: %s got: %s", kvs.get(i), got.get(i)));
}

immuClient.closeSession();
}

}
1 change: 1 addition & 0 deletions tests.sh
Expand Up @@ -25,6 +25,7 @@ TESTS="${TESTS} ScanTest"
TESTS="${TESTS} SetAllAndGetAllTest"
TESTS="${TESTS} SetAndGetTest"
TESTS="${TESTS} StreamSetAndGetTest"
TESTS="${TESTS} StreamSetAllTest"
TESTS="${TESTS} ShutdownTest"
TESTS="${TESTS} StateTest"
TESTS="${TESTS} TxTest"
Expand Down

0 comments on commit f057c14

Please sign in to comment.