Skip to content

Commit

Permalink
feat: dataplane self-registration (#4166)
Browse files Browse the repository at this point in the history
* feat: dataplane self-registration

* pr remarks
  • Loading branch information
ndr-brt committed May 14, 2024
1 parent a6a17f5 commit 818d64f
Show file tree
Hide file tree
Showing 88 changed files with 1,422 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static java.lang.String.format;
Expand Down Expand Up @@ -113,8 +114,8 @@ public void beforeTestExecution(ExtensionContext extensionContext) throws Except
var savedProperties = (Properties) System.getProperties().clone();
properties.forEach(System::setProperty);

var runtimeException = new AtomicReference<Exception>();
var latch = new CountDownLatch(1);

runtimeThread = new Thread(() -> {
try {

Expand All @@ -127,6 +128,7 @@ public void beforeTestExecution(ExtensionContext extensionContext) throws Except

latch.countDown();
} catch (Exception e) {
runtimeException.set(e);
throw new EdcException(e);
}
});
Expand All @@ -136,7 +138,7 @@ public void beforeTestExecution(ExtensionContext extensionContext) throws Except
runtimeThread.start();

if (!latch.await(20, SECONDS)) {
throw new EdcException("Failed to start EDC runtime");
throw new EdcException("Failed to start EDC runtime", runtimeException.get());
}

MONITOR.info("Runtime %s started".formatted(name));
Expand Down
1 change: 1 addition & 0 deletions core/common/lib/transform-lib/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
api(project(":spi:common:core-spi"))
api(project(":spi:common:json-ld-spi"))
api(project(":spi:common:transform-spi"))
api(project(":spi:data-plane-selector:data-plane-selector-spi"))

api(libs.jakartaJson)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
*/

package org.eclipse.edc.connector.dataplane.selector.transformer;
package org.eclipse.edc.transform.transformer.edc.from;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.JsonBuilderFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
*/

package org.eclipse.edc.connector.dataplane.selector.transformer;
package org.eclipse.edc.transform.transformer.edc.to;

import jakarta.json.JsonArray;
import jakarta.json.JsonNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
*/

package org.eclipse.edc.connector.dataplane.selector.transformer;
package org.eclipse.edc.transform.transformer.edc.from;

import jakarta.json.Json;
import jakarta.json.JsonString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
*/

package org.eclipse.edc.connector.dataplane.selector.transformer;
package org.eclipse.edc.transform.transformer.edc.to;

import jakarta.json.Json;
import jakarta.json.JsonObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.ServiceResult;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.edc.validator.spi.DataAddressValidatorRegistry;
Expand All @@ -52,7 +51,6 @@
import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.joining;
import static org.eclipse.edc.connector.controlplane.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.CONSUMER;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.PROVIDER;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED;
Expand Down Expand Up @@ -153,9 +151,6 @@ public ServiceResult<TransferProcess> findById(String id, TokenRepresentation to

@NotNull
private ServiceResult<TransferProcess> requestedAction(TransferRequestMessage message, String assetId) {
var destination = message.getDataDestination() != null
? message.getDataDestination() : DataAddress.Builder.newInstance().type(HTTP_PROXY).build();

var existingTransferProcess = transferProcessStore.findForCorrelationId(message.getConsumerPid());
if (existingTransferProcess != null) {
return ServiceResult.success(existingTransferProcess);
Expand All @@ -165,7 +160,7 @@ private ServiceResult<TransferProcess> requestedAction(TransferRequestMessage me
.protocol(message.getProtocol())
.correlationId(message.getConsumerPid())
.counterPartyAddress(message.getCallbackAddress())
.dataDestination(destination)
.dataDestination(message.getDataDestination())
.assetId(assetId)
.contractId(message.getContractId())
.transferType(message.getTransferType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.controlplane.services.transferprocess.TransferProcessProtocolServiceImpl.TRANSFER_PROCESS_REQUEST_SCOPE;
import static org.eclipse.edc.connector.controlplane.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.CONSUMER;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.PROVIDER;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETED;
Expand Down Expand Up @@ -223,36 +222,6 @@ void notifyRequested_invalidDestination_shouldNotInitiateTransfer() {
verifyNoInteractions(listener);
}

@Test
void notifyRequested_missingDestination_shouldInitiateTransfer() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferRequestMessage.Builder.newInstance()
.consumerPid("consumerPid")
.processId("consumerPid")
.protocol("protocol")
.contractId("agreementId")
.callbackAddress("http://any")
.build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), eq(TRANSFER_PROCESS_REQUEST_SCOPE), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(negotiationStore.findContractAgreement(any())).thenReturn(contractAgreement());
when(validationService.validateAgreement(any(ParticipantAgent.class), any())).thenReturn(Result.success(null));

var result = service.notifyRequested(message, tokenRepresentation);

assertThat(result).isSucceeded().satisfies(transferProcess -> {
assertThat(transferProcess.getCorrelationId()).isEqualTo("consumerPid");
assertThat(transferProcess.getCounterPartyAddress()).isEqualTo("http://any");
assertThat(transferProcess.getAssetId()).isEqualTo("assetId");
assertThat(transferProcess.getDataDestination().getType()).isEqualTo(HTTP_PROXY);
});
verify(listener).preCreated(any());
verify(store).save(argThat(t -> t.getState() == INITIAL.code()));
verify(listener).initiated(any());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
}

@Test
void notifyCompleted_shouldTransitionToCompleted() {
var participantAgent = participantAgent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.util.List;
import java.util.Optional;

public class EmbeddedDataPlaneSelectorService implements DataPlaneSelectorService {

Expand All @@ -48,14 +49,15 @@ public List<DataPlaneInstance> getAll() {

@Override
public DataPlaneInstance select(DataAddress source, DataAddress destination, String selectionStrategy, String transferType) {
var strategy = selectionStrategyRegistry.find(selectionStrategy);
var sanitizedSelectionStrategy = Optional.ofNullable(selectionStrategy).orElse(DEFAULT_STRATEGY);
var strategy = selectionStrategyRegistry.find(sanitizedSelectionStrategy);
if (strategy == null) {
throw new IllegalArgumentException("Strategy " + selectionStrategy + " was not found");
throw new IllegalArgumentException("Strategy " + sanitizedSelectionStrategy + " was not found");
}

return transactionContext.execute(() -> {
try (var stream = store.getAll()) {
var dataPlanes = stream.filter(dataPlane -> dataPlane.canHandle(source, destination, transferType)).toList();
var dataPlanes = stream.filter(dataPlane -> dataPlane.canHandle(source, transferType)).toList();
return strategy.apply(dataPlanes);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void select_shouldUseChosenSelector() {
when(selectionStrategy.apply(any())).thenAnswer(it -> instances.get(0));
when(selectionStrategyRegistry.find(any())).thenReturn(selectionStrategy);

var result = selector.select(createAddress("srcTestType"), createAddress("destTestType"), "strategy");
var result = selector.select(createAddress("srcTestType"), createAddress("destTestType"), "strategy", "transferType");

assertThat(result).isNotNull().extracting(DataPlaneInstance::getId).isEqualTo("instance0");
verify(selectionStrategyRegistry).find("strategy");
Expand All @@ -58,7 +58,7 @@ void select_shouldThrowException_whenStrategyNotFound() {
when(store.getAll()).thenReturn(instances.stream());
when(selectionStrategyRegistry.find(any())).thenReturn(null);

assertThatThrownBy(() -> selector.select(createAddress("srcTestType"), createAddress("destTestType"), "strategy"))
assertThatThrownBy(() -> selector.select(createAddress("srcTestType"), createAddress("destTestType"), "strategy", "transferType"))
.isInstanceOf(IllegalArgumentException.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

Expand All @@ -43,4 +44,9 @@ public Result<Endpoint> generateFor(DataAddress sourceDataAddress) {
public void addGeneratorFunction(String destinationType, Function<DataAddress, Endpoint> generatorFunction) {
generatorFunctions.put(destinationType, generatorFunction);
}

@Override
public Set<String> supportedDestinationTypes() {
return generatorFunctions.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;

/**
* Default pipeline service implementation.
Expand Down Expand Up @@ -140,6 +142,16 @@ public void registerFactory(DataSinkFactory factory) {
sinkFactories.add(factory);
}

@Override
public Set<String> supportedSourceTypes() {
return sourceFactories.stream().map(DataSourceFactory::supportedType).collect(toSet());
}

@Override
public Set<String> supportedSinkTypes() {
return sinkFactories.stream().map(DataSinkFactory::supportedType).collect(toSet());
}

@Nullable
private DataSourceFactory getSourceFactory(DataFlowStartMessage request) {
return sourceFactories.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;


Expand All @@ -42,4 +43,12 @@ void generateFor_noFunction() {
.isEqualTo("No Endpoint generator function registered for source data type 'testtype'");
}

}
@Test
void supportedTypes() {
generatorService.addGeneratorFunction("type", dataAddress -> new Endpoint("any", "any"));

var result = generatorService.supportedDestinationTypes();

assertThat(result).containsOnly("type");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -160,6 +161,31 @@ void canHandle_shouldReturnTrue_whenSourceAndDestinationCanBeHandled(String sour
assertThat(result).isEqualTo(expected);
}

@Nested
class SupportedTypes {

@Test
void shouldReturnSourceTypesFromFactories() {
when(sourceFactory.supportedType()).thenReturn("source");

var result = service.supportedSourceTypes();

assertThat(result).containsOnly("source");
verifyNoInteractions(sinkFactory);
}

@Test
void shouldReturnSinkTypesFromFactories() {
when(sinkFactory.supportedType()).thenReturn("sink");

var result = service.supportedSinkTypes();

assertThat(result).containsOnly("sink");
verifyNoInteractions(sourceFactory);
}

}

private DataFlow dataFlow(String sourceType, String destinationType) {
return DataFlow.Builder.newInstance()
.id("1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

plugins {
`java-library`
id("io.swagger.core.v3.swagger-gradle-plugin")
id(libs.plugins.swagger.get().pluginId)
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

plugins {
`java-library`
id("io.swagger.core.v3.swagger-gradle-plugin")
id(libs.plugins.swagger.get().pluginId)
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

plugins {
`java-library`
id("io.swagger.core.v3.swagger-gradle-plugin")
id(libs.plugins.swagger.get().pluginId)
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

plugins {
`java-library`
id("io.swagger.core.v3.swagger-gradle-plugin")
id(libs.plugins.swagger.get().pluginId)
}

dependencies {
Expand All @@ -34,4 +34,4 @@ edcBuild {
swagger {
apiGroup.set("dsp-api")
}
}
}
4 changes: 2 additions & 2 deletions docs/developer/openapi.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ val rsApi: String by project

plugins {
`java-library`
id("io.swagger.core.v3.swagger-gradle-plugin") //<-- add this
id(libs.plugins.swagger.get().pluginId) //<-- add this
}

dependencies {
Expand All @@ -55,7 +55,7 @@ add your module to one of the categories, simply add this block to your module's
```kotlin
plugins {
`java-library`
id("io.swagger.core.v3.swagger-gradle-plugin")
id(libs.plugins.swagger.get().pluginId)
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion extensions/common/api/api-observability/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

plugins {
`java-library`
id("io.swagger.core.v3.swagger-gradle-plugin")
id(libs.plugins.swagger.get().pluginId)
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ plugins {
}

dependencies {
implementation(project(":extensions:common:api:api-core"))
api(project(":spi:control-plane:transfer-spi"))
implementation(project(":extensions:common:api:api-core"))
implementation(project(":extensions:common:http:lib:jersey-providers-lib"))

testImplementation(project(":core:common:junit"))
}
Expand Down

0 comments on commit 818d64f

Please sign in to comment.