boto / boto3

AWS SDK for Python
https://aws.amazon.com/sdk-for-python/
Apache License 2.0

[AppFlow] - Bad task filter formatting when updated programmatically using boto3 #4305

Open santanagabriel opened 2 days ago

santanagabriel commented 2 days ago

Describe the bug

We are trying to update an AppFlow filter programmatically, using the Boto3 library on Lambda.

The flow has an SAP instance as the data source and an S3 bucket as the destination.

After successfully updating the flow filter with the "is exactly" condition and multiple criteria (as shown in the screenshot below), the following filter is sent in the URL to the SAP instance on the first run:

( SalesOrder eq '2001,1790' )

However, when saving the flow manually in the AWS Console (without making any changes), a different filter format is sent in the URL to SAP:

( ( SalesOrder eq '2001 ' ) or ( SalesOrder eq ' 1790' ) )
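The console-saved format appears to expand the comma-separated `VALUE` into individual OR-ed equality clauses. The sketch below is a hypothetical illustration of that expansion (not AppFlow's actual code, and it ignores the whitespace quirks visible in the real URL):

```python
def expand_filter(field: str, csv_value: str) -> str:
    """Expand a comma-separated VALUE into OR-ed OData equality clauses."""
    clauses = [f"( {field} eq '{v}' )" for v in csv_value.split(",")]
    return f"( {' or '.join(clauses)} )"

print(expand_filter("SalesOrder", "2001,1790"))
# → ( ( SalesOrder eq '2001' ) or ( SalesOrder eq '1790' ) )
```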

[Screenshot: AppFlow filter configuration with the "is exactly" condition and multiple criteria]

Regression Issue

Expected Behavior

Send the correct filter format in the URL to SAP on the first run after the filter is updated using the Boto3 library:

( ( SalesOrder eq '2001 ' ) or ( SalesOrder eq ' 1790' ) )

Current Behavior

On the first run after a filter update using Boto3, an incorrect filter format is sent in the URL to the SAP instance:

( SalesOrder eq '2001,1790' )

Reproduction Steps

Configure a new flow in the AWS Console using the SAP OData connector to call an API (e.g. API_SALES_ORDER_SRV/A_SalesOrderItem).

Programmatically add a new filter to the flow, with the "is exactly" condition and multiple criteria:

```python
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

appflow_client = boto3.client("appflow")


def lambda_handler(event, context):
    new_filter_task = {
        "sourceFields": [flow_filter],
        "connectorOperator": {"SAPOData": "EQUAL_TO"},
        "taskType": "Filter",
        "taskProperties": {
            "DATA_TYPE": "Edm.String",
            "VALUE": "2001,1790",
        },
    }

    try:
        filter_tasks = []
        # Add the new filter task
        filter_tasks.append(new_filter_task)

        # Add a MAP_ALL task if it doesn't exist
        map_all_task = {"taskType": "Map_all", "sourceFields": [], "taskProperties": {}}
        filter_tasks.append(map_all_task)

        current_flow = appflow_client.describe_flow(flowName=flow_name)

        update_params = {
            "flowName": flow_name,
            "sourceFlowConfig": {
                "connectorType": current_flow["sourceFlowConfig"]["connectorType"],
                "sourceConnectorProperties": current_flow["sourceFlowConfig"][
                    "sourceConnectorProperties"
                ],
                "connectorProfileName": current_flow["sourceFlowConfig"].get(
                    "connectorProfileName"
                ),
            },
            "destinationFlowConfigList": current_flow["destinationFlowConfigList"],
            "tasks": filter_tasks,
            "triggerConfig": {
                "triggerType": current_flow["triggerConfig"]["triggerType"]
            },
        }

        appflow_client.update_flow(**update_params)
        # appflow_client.create_flow(**update_params)
        logger.info(
            f"Flow '{flow_name}' updated successfully. All previous filters removed and new filter added."
        )
        return True
    except ClientError as e:
        logger.error(f"Error updating flow '{flow_name}': {e}", exc_info=True)
        return False
```

Possible Solution

No response

Additional Information/Context

No response

SDK version used

1.34.155

Environment details (OS name and version, etc.)

Python 3.12

tim-finnigan commented 1 day ago

Thanks for reaching out. The boto3 update_flow command calls the UpdateFlow API, so this issue appears to be with the service API rather than the SDK directly. If there's an issue with the API behavior, then we can reach out to the AppFlow team internally regarding this.

First could you provide a complete minimal snippet for reproducing the issue? The current snippet is missing the value for flow_filter.

Have you also tried reproducing this outside of Lambda? The default boto3 version included in the Python runtime for Lambda is often a few versions behind latest, so you may be missing recent updates/fixes. (The latest boto3 version per the CHANGELOG is 1.35.41; you can run print(boto3.__version__) in your function to confirm which version you're using.)

Can you also share your debug logs (with any sensitive info redacted)? You can get them by adding boto3.set_stream_logger('') to your script after importing boto3. That would help with further investigation into the issue.

santanagabriel commented 22 hours ago

Hi Tim, thanks for your quick reply!

I have installed the latest boto3 version (1.35.41) locally in VS Code using a requirements.txt and pip install -r requirements.txt -t .

I still get the same behavior as previously described, whether running the function locally in VS Code or directly in the AWS Console.

Below is a minimal snippet for reproducing the issue. (Please be aware that in this snippet we read a previously created flow in order to update it.)

```python
import boto3
from botocore.exceptions import ClientError

boto3.set_stream_logger("")
print(boto3.__version__)

appflow_client = boto3.client("appflow")


def lambda_handler(event, context):
    filter_tasks = []
    # Add a new filter task with 2 criteria for the same field (SalesOrder), OR-ed together.
    # The main objective is to get only these two documents from SAP, nothing else.
    new_filter_task = {
        "sourceFields": ["SalesOrder"],
        "connectorOperator": {"SAPOData": "EQUAL_TO"},
        "taskType": "Filter",
        "taskProperties": {
            "DATA_TYPE": "Edm.String",
            "VALUE": "1790,2001",
        },
    }
    filter_tasks.append(new_filter_task)

    # Add a mandatory MAP_ALL task
    map_all_task = {"taskType": "Map_all", "sourceFields": [], "taskProperties": {}}
    filter_tasks.append(map_all_task)

    try:
        # Flow must exist in AppFlow before running this code
        flow_name = "nas-salesorder-header"  # Replace with your flow name if necessary

        # Get the other parameters from the existing flow to allow the update
        current_flow = appflow_client.describe_flow(flowName=flow_name)

        update_params = {
            "flowName": flow_name,
            "sourceFlowConfig": {
                "connectorType": current_flow["sourceFlowConfig"]["connectorType"],
                "sourceConnectorProperties": current_flow["sourceFlowConfig"][
                    "sourceConnectorProperties"
                ],
                "connectorProfileName": current_flow["sourceFlowConfig"].get(
                    "connectorProfileName"
                ),
            },
            "destinationFlowConfigList": current_flow["destinationFlowConfigList"],
            "tasks": filter_tasks,
            "triggerConfig": {
                "triggerType": current_flow["triggerConfig"]["triggerType"]
            },
        }
        # Trigger the update
        appflow_client.update_flow(**update_params)
        print(f"Flow '{flow_name}' updated successfully.")
        return True
    except ClientError as e:
        # Note: print() does not accept exc_info, so log the error message directly
        print(f"Error updating flow '{flow_name}': {e}")
        return False
```

Here is the redacted log of my last execution: redacted_logs.json

Please let me know if you need any additional information.

Thanks in advance.