Skip to content

Commit

Permalink
Implement OIDC auth for async (#1131)
Browse files Browse the repository at this point in the history
  • Loading branch information
katcharov committed Nov 8, 2023
1 parent 60d2f06 commit dd9c52b
Show file tree
Hide file tree
Showing 17 changed files with 1,248 additions and 94 deletions.
36 changes: 0 additions & 36 deletions driver-core/src/main/com/mongodb/assertions/Assertions.java
Expand Up @@ -17,7 +17,6 @@

package com.mongodb.assertions;

import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.lang.Nullable;

import java.util.Collection;
Expand Down Expand Up @@ -78,25 +77,6 @@ public static <T> Iterable<T> notNullElements(final String name, final Iterable<
return values;
}

/**
* Throw IllegalArgumentException if the value is null.
*
* @param name the parameter name
* @param value the value that should not be null
* @param callback the callback that also is passed the exception if the value is null
* @param <T> the value type
* @return the value
* @throws java.lang.IllegalArgumentException if value is null
*/
public static <T> T notNull(final String name, final T value, final SingleResultCallback<?> callback) {
if (value == null) {
IllegalArgumentException exception = new IllegalArgumentException(name + " can not be null");
callback.onResult(null, exception);
throw exception;
}
return value;
}

/**
* Throw IllegalStateException if the condition if false.
*
Expand All @@ -110,22 +90,6 @@ public static void isTrue(final String name, final boolean condition) {
}
}

/**
* Throw IllegalStateException if the condition if false.
*
* @param name the name of the state that is being checked
* @param condition the condition about the parameter to check
* @param callback the callback that also is passed the exception if the condition is not true
* @throws java.lang.IllegalStateException if the condition is false
*/
public static void isTrue(final String name, final boolean condition, final SingleResultCallback<?> callback) {
if (!condition) {
IllegalStateException exception = new IllegalStateException("state should be: " + name);
callback.onResult(null, exception);
throw exception;
}
}

/**
* Throw IllegalArgumentException if the condition if false.
*
Expand Down
22 changes: 20 additions & 2 deletions driver-core/src/main/com/mongodb/internal/Locks.java
Expand Up @@ -17,6 +17,8 @@
package com.mongodb.internal;

import com.mongodb.MongoInterruptedException;
import com.mongodb.internal.async.AsyncRunnable;
import com.mongodb.internal.async.SingleResultCallback;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -36,7 +38,23 @@ public static void withLock(final Lock lock, final Runnable action) {
});
}

public static <V> V withLock(final StampedLock lock, final Supplier<V> supplier) {
public static void withLockAsync(final StampedLock lock, final AsyncRunnable runnable,
final SingleResultCallback<Void> callback) {
long stamp;
try {
stamp = lock.writeLockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
callback.onResult(null, new MongoInterruptedException("Interrupted waiting for lock", e));
return;
}

runnable.thenAlwaysRunAndFinish(() -> {
lock.unlockWrite(stamp);
}, callback);
}

public static void withLock(final StampedLock lock, final Runnable runnable) {
long stamp;
try {
stamp = lock.writeLockInterruptibly();
Expand All @@ -45,7 +63,7 @@ public static <V> V withLock(final StampedLock lock, final Supplier<V> supplier)
throw new MongoInterruptedException("Interrupted waiting for lock", e);
}
try {
return supplier.get();
runnable.run();
} finally {
lock.unlockWrite(stamp);
}
Expand Down
26 changes: 26 additions & 0 deletions driver-core/src/main/com/mongodb/internal/async/AsyncConsumer.java
@@ -0,0 +1,26 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.async;

/**
* See tests for usage (AsyncFunctionsTest).
* <p>
* This class is not part of the public API and may be removed or changed at any time
*/
@FunctionalInterface
public interface AsyncConsumer<T> extends AsyncFunction<T, Void> {
}
33 changes: 33 additions & 0 deletions driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java
@@ -0,0 +1,33 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.async;

import com.mongodb.lang.Nullable;

/**
* See tests for usage (AsyncFunctionsTest).
* <p>
* This class is not part of the public API and may be removed or changed at any time
*/
@FunctionalInterface
public interface AsyncFunction<T, R> {
/**
* This should not be called externally, but should be implemented as a
* lambda. To "finish" an async chain, use one of the "finish" methods.
*/
void unsafeFinish(@Nullable T value, SingleResultCallback<R> callback);
}
158 changes: 158 additions & 0 deletions driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java
@@ -0,0 +1,158 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.async;

import com.mongodb.internal.async.function.RetryState;
import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;

import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* See tests for usage (AsyncFunctionsTest).
* <p>
* This class is not part of the public API and may be removed or changed at any time
*/
@FunctionalInterface
public interface AsyncRunnable extends AsyncSupplier<Void>, AsyncConsumer<Void> {

static AsyncRunnable beginAsync() {
return (c) -> c.onResult(null, null);
}

/**
* Must be invoked at end of async chain
* @param runnable the sync code to invoke (under non-exceptional flow)
* prior to the callback
* @param callback the callback provided by the method the chain is used in
*/
default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
this.finish((r, e) -> {
if (e != null) {
callback.onResult(null, e);
return;
}
try {
runnable.run();
} catch (Throwable t) {
callback.onResult(null, t);
return;
}
callback.onResult(null, null);
});
}

/**
* See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
* will always be executed, including on the exceptional path.
* @param runnable the runnable
* @param callback the callback
*/
default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
this.finish((r, e) -> {
try {
runnable.run();
} catch (Throwable t) {
if (e != null) {
t.addSuppressed(e);
}
callback.onResult(null, t);
return;
}
callback.onResult(null, e);
});
}

/**
* @param runnable The async runnable to run after this runnable
* @return the composition of this runnable and the runnable, a runnable
*/
default AsyncRunnable thenRun(final AsyncRunnable runnable) {
return (c) -> {
this.unsafeFinish((r, e) -> {
if (e == null) {
runnable.unsafeFinish(c);
} else {
c.onResult(null, e);
}
});
};
}

/**
* @param condition the condition to check
* @param runnable The async runnable to run after this runnable,
* if and only if the condition is met
* @return the composition of this runnable and the runnable, a runnable
*/
default AsyncRunnable thenRunIf(final Supplier<Boolean> condition, final AsyncRunnable runnable) {
return (callback) -> {
this.unsafeFinish((r, e) -> {
if (e != null) {
callback.onResult(null, e);
return;
}
boolean matched;
try {
matched = condition.get();
} catch (Throwable t) {
callback.onResult(null, t);
return;
}
if (matched) {
runnable.unsafeFinish(callback);
} else {
callback.onResult(null, null);
}
});
};
}

/**
* @param supplier The supplier to supply using after this runnable
* @return the composition of this runnable and the supplier, a supplier
* @param <R> The return type of the resulting supplier
*/
default <R> AsyncSupplier<R> thenSupply(final AsyncSupplier<R> supplier) {
return (c) -> {
this.unsafeFinish((r, e) -> {
if (e == null) {
supplier.unsafeFinish(c);
} else {
c.onResult(null, e);
}
});
};
}

/**
* @param runnable the runnable to loop
* @param shouldRetry condition under which to retry
* @return the composition of this, and the looping branch
* @see RetryingAsyncCallbackSupplier
*/
default AsyncRunnable thenRunRetryingWhile(
final AsyncRunnable runnable, final Predicate<Throwable> shouldRetry) {
return thenRun(callback -> {
new RetryingAsyncCallbackSupplier<Void>(
new RetryState(),
(rs, lastAttemptFailure) -> shouldRetry.test(lastAttemptFailure),
cb -> runnable.finish(cb) // finish is required here, to handle exceptions
).get(callback);
});
}
}

0 comments on commit dd9c52b

Please sign in to comment.