Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use java 8 api instead of exposing guava types #30

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ dependencies {
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
testImplementation "org.junit.jupiter:junit-jupiter-api:${jUnitVersion}"
testImplementation "org.mockito:mockito-core:3.4.0"
testImplementation "org.assertj:assertj-core:3.25.3"
testImplementation "org.slf4j:slf4j-nop:${slf4jVersion}"
testImplementation "org.testcontainers:qdrant:${testcontainersVersion}"
testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}"
Expand Down
846 changes: 370 additions & 476 deletions src/main/java/io/qdrant/client/QdrantClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright © 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qdrant.client.futureconverter;


import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.CompletableFuture;

/**
* Converts between {@link java.util.concurrent.CompletableFuture} and Guava {@link com.google.common.util.concurrent.ListenableFuture}.
*/
public class FutureConverter {

/**
* Converts {@link java.util.concurrent.CompletableFuture} to {@link com.google.common.util.concurrent.ListenableFuture}.
*/
public static <T> ListenableFuture<T> toListenableFuture(CompletableFuture<T> completableFuture) {
return GuavaFutureUtils.createListenableFuture(Java8FutureUtils.createValueSourceFuture(completableFuture));
}

/**
* Converts {@link com.google.common.util.concurrent.ListenableFuture} to {@link java.util.concurrent.CompletableFuture}.
*/
public static <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> listenableFuture) {
return Java8FutureUtils.createCompletableFuture(GuavaFutureUtils.createValueSourceFuture(listenableFuture));
}
}
64 changes: 64 additions & 0 deletions src/main/java/io/qdrant/client/futureconverter/FutureWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright © 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qdrant.client.futureconverter;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Wraps future and delegates to wrapped.
*/
public class FutureWrapper<T> implements Future<T> {
private final Future<T> wrappedFuture;

protected FutureWrapper(Future<T> wrappedFuture) {
if (wrappedFuture == null) {
throw new NullPointerException("Wrapped future can not be null");
}
this.wrappedFuture = wrappedFuture;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return wrappedFuture.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return wrappedFuture.isCancelled();
}

@Override
public boolean isDone() {
return wrappedFuture.isDone();
}

@Override
public T get() throws InterruptedException, ExecutionException {
return wrappedFuture.get();
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return wrappedFuture.get(timeout, unit);
}

protected Future<T> getWrappedFuture() {
return wrappedFuture;
}
}
153 changes: 153 additions & 0 deletions src/main/java/io/qdrant/client/futureconverter/GuavaFutureUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright © 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qdrant.client.futureconverter;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executor;
import java.util.function.Consumer;


public class GuavaFutureUtils {
// *************************************** Converting to ListenableFuture ******************************************

/**
* Creates listenable future from ValueSourceFuture. We have to send all Future API calls to ValueSourceFuture.
*/
public static <T> ListenableFuture<T> createListenableFuture(ValueSourceFuture<T> valueSourceFuture) {
if (valueSourceFuture instanceof ListenableFutureBackedValueSourceFuture) {
return ((ListenableFutureBackedValueSourceFuture<T>) valueSourceFuture).getWrappedFuture();
} else {
return new ValueSourceFutureBackedListenableFuture<>(valueSourceFuture);
}
}

public static <T> ListenableFuture<T> createListenableFuture(ValueSource<T> valueSource) {
if (valueSource instanceof ListenableFutureBackedValueSourceFuture) {
return ((ListenableFutureBackedValueSourceFuture<T>) valueSource).getWrappedFuture();
} else {
return new ValueSourceBackedListenableFuture<>(valueSource);
}
}

/**
* If we have ValueSourceFuture, we can use it as the implementation and this class only converts
* listener registration.
*/
private static class ValueSourceFutureBackedListenableFuture<T> extends FutureWrapper<T> implements ListenableFuture<T> {
ValueSourceFutureBackedListenableFuture(ValueSourceFuture<T> valueSourceFuture) {
super(valueSourceFuture);
}

@Override
protected ValueSourceFuture<T> getWrappedFuture() {
return (ValueSourceFuture<T>) super.getWrappedFuture();
}

@Override
public void addListener(Runnable listener, Executor executor) {
getWrappedFuture().addCallbacks(value -> executor.execute(listener), ex -> executor.execute(listener));
}
}


/**
* If we only get ValueSource we have to create a ValueSourceFuture. Here we wrap Guavas SettableFuture
* and use it for listener handling and value storage.
*/
private static class ValueSourceBackedListenableFuture<T> extends FutureWrapper<T> implements ListenableFuture<T> {
private final ValueSource<T> valueSource;

private ValueSourceBackedListenableFuture(ValueSource<T> valueSource) {
super(com.google.common.util.concurrent.SettableFuture.create());
this.valueSource = valueSource;
valueSource.addCallbacks(value -> getWrappedFuture().set(value), ex -> getWrappedFuture().setException(ex));
}

@Override
public void addListener(Runnable listener, Executor executor) {
getWrappedFuture().addListener(listener, executor);
}

@Override
protected com.google.common.util.concurrent.SettableFuture<T> getWrappedFuture() {
return (com.google.common.util.concurrent.SettableFuture<T>) super.getWrappedFuture();
}


@Override
public boolean cancel(boolean mayInterruptIfRunning) {
valueSource.cancel(mayInterruptIfRunning);
return super.cancel(mayInterruptIfRunning);
}

private ValueSource<T> getValueSource() {
return valueSource;
}
}


// *************************************** Converting from ListenableFuture ******************************************
public static <T> ValueSourceFuture<T> createValueSourceFuture(ListenableFuture<T> listenableFuture) {
if (listenableFuture instanceof ValueSourceFutureBackedListenableFuture) {
return ((ValueSourceFutureBackedListenableFuture<T>) listenableFuture).getWrappedFuture();
} else {
return new ListenableFutureBackedValueSourceFuture<>(listenableFuture);
}
}

public static <T> ValueSource<T> createValueSource(ListenableFuture<T> listenableFuture) {
if (listenableFuture instanceof ValueSourceBackedListenableFuture) {
return ((ValueSourceBackedListenableFuture<T>) listenableFuture).getValueSource();
} else {
return new ListenableFutureBackedValueSourceFuture<>(listenableFuture);
}
}

/**
* Wraps ListenableFuture and exposes it as ValueSourceFuture.
*/
private static class ListenableFutureBackedValueSourceFuture<T> extends ValueSourceFuture<T> {
private ListenableFutureBackedValueSourceFuture(ListenableFuture<T> wrappedFuture) {
super(wrappedFuture);
}

@Override
public void addCallbacks(Consumer<T> successCallback, Consumer<Throwable> failureCallback) {
Futures.addCallback(getWrappedFuture(), new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
successCallback.accept(result);
}

@Override
public void onFailure(Throwable t) {
failureCallback.accept(t);

}
}, MoreExecutors.directExecutor());
}


@Override
protected ListenableFuture<T> getWrappedFuture() {
return (ListenableFuture<T>) super.getWrappedFuture();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright © 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qdrant.client.futureconverter;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class Java8FutureUtils {

public static <T> CompletableFuture<T> createCompletableFuture(ValueSource<T> valueSource) {
if (valueSource instanceof CompletableFuturebackedValueSource) {
return ((CompletableFuturebackedValueSource<T>) valueSource).getWrappedFuture();
} else {
return new ValueSourcebackedCompletableFuture<T>(valueSource);
}
}

public static <T> ValueSourceFuture<T> createValueSourceFuture(CompletableFuture<T> completableFuture) {
if (completableFuture instanceof ValueSourcebackedCompletableFuture &&
((ValueSourcebackedCompletableFuture<T>) completableFuture).getValueSource() instanceof ValueSourceFuture) {
return (ValueSourceFuture<T>) ((ValueSourcebackedCompletableFuture<T>) completableFuture).getValueSource();
} else {
return new CompletableFuturebackedValueSource<>(completableFuture);
}
}

public static <T> ValueSource<T> createValueSource(CompletableFuture<T> completableFuture) {
if (completableFuture instanceof ValueSourcebackedCompletableFuture) {
return ((ValueSourcebackedCompletableFuture<T>) completableFuture).getValueSource();
} else {
return new CompletableFuturebackedValueSource<>(completableFuture);
}
}

/**
* CompletableFuture that takes values from the ValueSource. CompletableFuture is a class, not
* an interface so we can not just forward events from the ValueSource, we to always instantiate the class.
*/
private static final class ValueSourcebackedCompletableFuture<T> extends CompletableFuture<T> {
private final ValueSource<T> valueSource;

private ValueSourcebackedCompletableFuture(ValueSource<T> valueSource) {
this.valueSource = valueSource;
valueSource.addCallbacks(this::complete, this::completeExceptionally);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isDone()) {
return false;
}
boolean result = valueSource.cancel(mayInterruptIfRunning);
super.cancel(mayInterruptIfRunning);
return result;
}

private ValueSource<T> getValueSource() {
return valueSource;
}
}

private static final class CompletableFuturebackedValueSource<T> extends ValueSourceFuture<T> {
private CompletableFuturebackedValueSource(CompletableFuture<T> completableFuture) {
super(completableFuture);
}


@Override
public void addCallbacks(Consumer<T> successCallback, Consumer<Throwable> failureCallback) {
getWrappedFuture().whenComplete((v, t) -> {
if (t == null) {
successCallback.accept(v);
} else {
failureCallback.accept(t);
}
});
}

@Override
protected CompletableFuture<T> getWrappedFuture() {
return (CompletableFuture<T>) super.getWrappedFuture();
}
}
}