eclipse-edc / Connector

EDC core services including data plane and control plane
Apache License 2.0

Fatal error while resuming PULL transfer process #4591

Open AndrYurk opened 3 weeks ago

AndrYurk commented 3 weeks ago

Bug Report

Describe the Bug

It is not possible to resume a PULL transfer process from the provider side: the consumer rejects the provider's request, which results in a fatal error on the provider.

Expected Behavior

A resume called from the provider should restart the transfer process without exceptions.

Observed Behavior

Currently the system throws an exception on the consumer side while trying to change the state from "SUSPENDED" to "STARTED".

[PROVIDER] DEBUG 2024-10-29T16:08:13.635279121 Resumption requested for TransferProcess with ID dd7d56ca-38e9-4203-afc8-b826592b9a5e
[CONSUMER] DEBUG 2024-10-29T16:08:13.850276352 [ExecutionManager] Run pre-execution task
[CONSUMER] DEBUG 2024-10-29T16:08:13.850403681 [ExecutionManager] No WorkItems found, skipping execution
[PROVIDER] DEBUG 2024-10-29T16:08:14.242948008 TransferProcess: ID dd7d56ca-38e9-4203-afc8-b826592b9a5e. Resume data flow
[PROVIDER] DEBUG 2024-10-29T16:08:14.246843283 [DataPlaneManagerImpl] DataFlow dd7d56ca-38e9-4203-afc8-b826592b9a5e is now in state STARTED
[PROVIDER] DEBUG 2024-10-29T16:08:14.246929808 TransferProcess: ID dd7d56ca-38e9-4203-afc8-b826592b9a5e. send transfer start to http://localhost:3326/protocol
[CONSUMER] DEBUG 2024-10-29T16:08:14.27007983 DSP: Incoming TransferStartMessage for class org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess process: 96942366-01d6-4660-a39d-01fea334106e
[CONSUMER] DEBUG 2024-10-29T16:08:14.274445504 DSP: Service call failed: Cannot process TransferStartMessage because transfer cannot be started
[PROVIDER] DEBUG 2024-10-29T16:08:14.490688961 TransferProcess: ID dd7d56ca-38e9-4203-afc8-b826592b9a5e. send transfer start to http://localhost:3326/protocol
[PROVIDER] SEVERE 2024-10-29T16:08:14.490804542 TransferProcess: ID dd7d56ca-38e9-4203-afc8-b826592b9a5e. Fatal error while send transfer start to http://localhost:3326/protocol. Error details: {"@type":"dspace:TransferError","dspace:code":"409","dspace:reason":"Cannot process TransferStartMessage because transfer cannot be started","@context":{"tx":"https://w3id.org/tractusx/v0.0.1/ns/","dcat":"http://www.w3.org/ns/dcat#","dct":"http://purl.org/dc/terms/","odrl":"http://www.w3.org/ns/odrl/2/","dspace":"https://w3id.org/dspace/v0.8/","@vocab":"https://w3id.org/edc/v0.0.1/ns/","edc":"https://w3id.org/edc/v0.0.1/ns/"}}
[PROVIDER] WARNING 2024-10-29T16:08:14.490933233 {"@type":"dspace:TransferError","dspace:code":"409","dspace:reason":"Cannot process TransferStartMessage because transfer cannot be started","@context":{"tx":"https://w3id.org/tractusx/v0.0.1/ns/","dcat":"http://www.w3.org/ns/dcat#","dct":"http://purl.org/dc/terms/","odrl":"http://www.w3.org/ns/odrl/2/","dspace":"https://w3id.org/dspace/v0.8/","@vocab":"https://w3id.org/edc/v0.0.1/ns/","edc":"https://w3id.org/edc/v0.0.1/ns/"}}
[PROVIDER] DEBUG 2024-10-29T16:08:14.492351832 [TransferProcessManagerImpl] TransferProcess dd7d56ca-38e9-4203-afc8-b826592b9a5e is now in state TERMINATED
[PROVIDER] DEBUG 2024-10-29T16:08:15.242057389 [PolicyMonitorManagerImpl] PolicyMonitorEntry dd7d56ca-38e9-4203-afc8-b826592b9a5e is now in state COMPLETED

Steps to Reproduce

End-to-end test for tractusx-edc/edc-tests/edc-controlplane/transfer-tests/TransferPullEndToEndTest:

        @Test
        void transferData_withSuspendResumeByProvider() {
            var assetId = "api-asset-1";

            var requestDefinition = request().withMethod("GET").withPath(MOCK_BACKEND_PATH);

            Map<String, Object> dataAddress = Map.of(
                    "baseUrl", privateBackendUrl,
                    "type", "HttpData",
                    "contentType", "application/json"
            );

            PROVIDER.createAsset(assetId, Map.of(), dataAddress);

            var accessPolicyId = PROVIDER.createPolicyDefinition(createAccessPolicy(CONSUMER.getBpn()));
            var contractPolicyId = PROVIDER.createPolicyDefinition(createContractPolicy(CONSUMER.getBpn()));
            PROVIDER.createContractDefinition(assetId, "def-1", accessPolicyId, contractPolicyId);
            var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER).withTransferType("HttpData-PULL").execute();

            CONSUMER.waitForTransferProcess(consumerTransferProcessId, TransferProcessStates.STARTED);

            // wait until EDC is available on the consumer side
            server.when(requestDefinition).respond(response().withStatusCode(200).withBody("test response"));

            var edr = CONSUMER.edrs().waitForEdr(consumerTransferProcessId);

            // consumer can fetch data with a valid token
            var data = CONSUMER.data().pullData(edr, Map.of());
            assertThat(data).isNotNull().isEqualTo("test response");

            server.verify(requestDefinition, VerificationTimes.exactly(1));
            var providerTransferProcessId  = PROVIDER.getTransferProcesses().stream()
                    .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
                    .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();

            PROVIDER.suspendTransfer(providerTransferProcessId, "reason");
            PROVIDER.waitForTransferProcess(providerTransferProcessId, TransferProcessStates.SUSPENDED);

            // consumer cannot fetch data with the prev token (suspended)
            CONSUMER.data().pullDataRequest(edr, Map.of()).statusCode(403);
            server.verify(requestDefinition, VerificationTimes.exactly(1));

            PROVIDER.resumeTransfer(providerTransferProcessId);
            PROVIDER.waitForTransferProcess(providerTransferProcessId, TransferProcessStates.STARTED);

            var newEdr = CONSUMER.edrs().waitForEdr(consumerTransferProcessId);

            // consumer can now re-fetch data with a new EDR token
            data = CONSUMER.data().pullData(newEdr, Map.of());
            assertThat(data).isNotNull().isEqualTo("test response");

            server.verify(requestDefinition, VerificationTimes.exactly(2));

            // consumer cannot fetch data with the prev token (suspended) after the transfer process has been resumed
            CONSUMER.data().pullDataRequest(edr, Map.of()).statusCode(403);
            server.verify(requestDefinition, VerificationTimes.exactly(2));

        }

Manual scenario:
1) Create a simple asset, policy, and contract definition (asset, policy_definition, contract_definition) on the provider EDC.
2) Get the policy_ID and initiate contract negotiation (query_catalog, init_contract_negotiation) on the consumer EDC.
3) Get the contractAgreementId (get_contract_negotiations) using the ID of the initiated negotiation.
4) Initiate an HTTP PULL transfer (pull_transfer) on the consumer EDC using the contractAgreementId.
5) Get the correlationId (get_transfer_process) using the pull_transfer_ID on the consumer EDC (state should be "STARTED").
6) Suspend the transfer process (suspend_process) using the correlationId on the provider EDC.
7) Check the process status (get_transfer_process) using the correlationId on the provider EDC. State should be "SUSPENDED".
8) Resume the transfer process (resume_process) using the correlationId on the provider EDC.
9) Check the process status (get_transfer_process) using the correlationId on the provider EDC. State is "TERMINATED" and "errorDetail" contains the error.
10) Check the process status (get_transfer_process) using the pull_transfer_ID on the consumer EDC. State is "SUSPENDED".

Attachment: Resume error request command.txt

Context Information

Reproduced on EDC 0.7.2 as well as on 0.10.0

Detailed Description

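When the provider initiates the resume process, its state changes to "RESUMING" and, after the data flow has started successfully (startTransferFlow), it sends a TransferStartMessage to the consumer. Based on that message, the consumer tries to change its state to "STARTED", but according to org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess#canBeStartedConsumer this is not possible from state "SUSPENDED". The consumer answers with a 409 TransferError, and the provider then moves its transfer process to "TERMINATED":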

{
    "@id": "8abbbf1a-91f4-41aa-9491-bd617a78176e",
    "@type": "TransferProcess",
    "state": "TERMINATED",
    "stateTimestamp": 1730367263386,
    "type": "PROVIDER",
    "callbackAddresses": [],
    "correlationId": "524feeac-55f4-4913-9a4c-d34faf58c0e7",
    "assetId": "test",
    "contractId": "f7b63151-66de-4f83-a64a-5d7ab1bb92ae",
    "transferType": "HttpData-PULL",
    "errorDetail": "{\"@type\":\"dspace:TransferError\",\"dspace:code\":\"409\",\"dspace:reason\":\"Cannot process TransferStartMessage because transfer cannot be started\",\"dspace:processId\":\"524feeac-55f4-4913-9a4c-d34faf58c0e7\",\"@context\":{\"@vocab\":\"https://w3id.org/edc/v0.0.1/ns/\",\"edc\":\"https://w3id.org/edc/v0.0.1/ns/\",\"tx\":\"https://w3id.org/tractusx/v0.0.1/ns/\",\"tx-auth\":\"https://w3id.org/tractusx/auth/\",\"cx-policy\":\"https://w3id.org/catenax/policy/\",\"dcat\":\"http://www.w3.org/ns/dcat#\",\"dct\":\"http://purl.org/dc/terms/\",\"odrl\":\"http://www.w3.org/ns/odrl/2/\",\"dspace\":\"https://w3id.org/dspace/v0.8/\"}}",
    "@context": {
        "@vocab": "https://w3id.org/edc/v0.0.1/ns/",
        "edc": "https://w3id.org/edc/v0.0.1/ns/",
        "tx": "https://w3id.org/tractusx/v0.0.1/ns/",
        "tx-auth": "https://w3id.org/tractusx/auth/",
        "cx-policy": "https://w3id.org/catenax/policy/",
        "odrl": "http://www.w3.org/ns/odrl/2/"
    }
}

Possible Implementation

Allow the consumer state to change from "SUSPENDED" to "STARTED" in org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess#canBeStartedConsumer.
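
To illustrate the idea, here is a minimal, self-contained sketch of such a guard. It is not the actual EDC code: the class ResumeGuardSketch, the reduced State enum, and both allow-lists are hypothetical stand-ins. Which states the real canBeStartedConsumer accepts today is an assumption; this report only establishes that "SUSPENDED" is not among them.

```java
import java.util.EnumSet;
import java.util.Set;

public class ResumeGuardSketch {

    // Reduced, hypothetical stand-in for the real transfer process states.
    enum State { REQUESTED, STARTED, SUSPENDED, TERMINATED }

    // Assumption: states from which the consumer accepts a TransferStartMessage today.
    // The issue only confirms that SUSPENDED is missing from this set.
    static final Set<State> CURRENT_ALLOWED = EnumSet.of(State.REQUESTED, State.STARTED);

    // Proposed change: additionally accept SUSPENDED, so a provider-side resume can succeed.
    static final Set<State> PROPOSED_ALLOWED =
            EnumSet.of(State.REQUESTED, State.STARTED, State.SUSPENDED);

    // Simplified model of the guard: is the current state in the allow-list?
    static boolean canBeStartedConsumer(State current, Set<State> allowed) {
        return allowed.contains(current);
    }

    // Simplified model of the consumer handling an incoming TransferStartMessage.
    // Returns the new state, or throws if the guard rejects the transition
    // (the real connector answers with a 409 TransferError instead).
    static State onTransferStartMessage(State current, Set<State> allowed) {
        if (!canBeStartedConsumer(current, allowed)) {
            throw new IllegalStateException(
                    "Cannot process TransferStartMessage because transfer cannot be started");
        }
        return State.STARTED;
    }

    public static void main(String[] args) {
        // With the current allow-list, a suspended consumer rejects the message.
        try {
            onTransferStartMessage(State.SUSPENDED, CURRENT_ALLOWED);
        } catch (IllegalStateException e) {
            System.out.println("current guard: rejected (" + e.getMessage() + ")");
        }
        // With the proposed allow-list, the same message moves the consumer to STARTED.
        System.out.println("proposed guard: "
                + onTransferStartMessage(State.SUSPENDED, PROPOSED_ALLOWED));
    }
}
```

In this sketch "TERMINATED" stays outside both allow-lists, so widening the guard for "SUSPENDED" would not allow restarting a transfer that has already ended.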

github-actions[bot] commented 3 weeks ago

Thanks for your contribution :fire: We will take a look asap :rocket: