MicrosoftDocs / azure-docs

Open source documentation of Microsoft Azure
https://docs.microsoft.com/azure
Creative Commons Attribution 4.0 International

Azure Python SDK to create ADF data flow and pipeline #110147

Closed SiddheshDesai06 closed 12 months ago

SiddheshDesai06 commented 1 year ago

I am trying to follow the code in this document: https://learn.microsoft.com/en-us/azure/data-factory/quickstart-create-data-factory-python, but the code gives errors. I modified it so that dataset creation succeeds, but the datasets are not linked to the output blob path, and pipeline creation also fails.

My modified code:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))

def main():

    # Azure subscription ID
    subscription_id = 'subid'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = 'siliconrg123'

    # The data factory name. It must be globally unique.
    df_name = 'siliconadf432912'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='clientid', client_secret='clientsecret', tenant_id='tenantid') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}

    # create the resource group
    # Comment out if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'AzureBlobStorage1'
    # ls_type = 'AzureStorageLinkedService'

    # # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=siliconstrg1;AccountKey=xxxxxxxxxxxxx;EndpointSuffix=core.windows.net')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type=LinkedServiceReference,reference_name=ls_name)
    blob_path = 'adf-container/inputfolder'
    blob_filename = 'input.txt'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        #linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
        linked_service_name=LinkedServiceReference)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adf-container2/inputfolder'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=LinkedServiceReference))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(type=DatasetReference,reference_name=ds_name)
    dsOut_ref = DatasetReference(type=DatasetReference,reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

# Start the main method
main()

I modified the lines below:

ds_ls  =  LinkedServiceReference(type=LinkedServiceReference,reference_name=ls_name)
#linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))

linked_service_name=LinkedServiceReference))

Output:

The ADF resource and datasets were created, but I got this error.

[Screenshots: the datasets were created, but the output blob path is not shown.]

And I got this error after running the above code:

  File "c:\adfpython\adf.py", line 114, in main
    raise HttpResponseError(response=response, error_format=ARMErrorFormat)
azure.core.exceptions.HttpResponseError: (BadRequest) The document creation or update failed because of invalid reference 'ds_in'.
Code: BadRequest
Message: The document creation or update failed because of invalid reference 'ds_in'.
Target: /subscriptions/xxxxxxx/resourceGroups/siliconrg123/providers/Microsoft.DataFactory/factories/siliconadf432912/pipelines/copyPipeline
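For context, the "invalid reference" almost certainly comes from the two modified lines: the datasets were created without a `linked_service_name`, and `type=LinkedServiceReference` passes the class itself where the service expects the literal string. In the SDK, the quickstart's intended form is `LinkedServiceReference(type="LinkedServiceReference", reference_name=ls_name)` passed as `linked_service_name` along with `folder_path` and `file_name`. Below is a minimal sketch of the JSON shape the Data Factory service expects for the input dataset (field names follow the ADF REST API; values mirror the variables in the script above):

```python
import json

# Sketch of the dataset payload the quickstart intends to produce.
# Without the "linkedServiceName" block the dataset document still saves,
# but the pipeline's DatasetReference to 'ds_in' cannot be resolved,
# which matches the BadRequest error above.
ds_in_payload = {
    "properties": {
        "type": "AzureBlob",
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",  # must match ls_name
            "type": "LinkedServiceReference",      # the literal string, not the class
        },
        "typeProperties": {
            "folderPath": "adf-container/inputfolder",
            "fileName": "input.txt",
        },
    }
}
print(json.dumps(ds_in_payload, indent=2))
```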


SaibabaBalapur-MSFT commented 1 year ago

@SiddheshDesai06 Thanks for your feedback! We will investigate and update as appropriate.

Naveenommi-MSFT commented 1 year ago

Hi @SiddheshDesai06

The reference 'ds_in' is not valid; you need to debug it. Can you do it in the GUI without the SDK? Many customers use the Python SDK successfully.

SiddheshDesai06 commented 1 year ago

Hi @Naveenommi-MSFT Yes, I am able to do it in the GUI, but when I run the default code from the Microsoft document it gives errors. Can you give me a workable sample with the Python SDK? Thank you!

SiddheshDesai06 commented 1 year ago

Hi @Naveenommi-MSFT The invalid reference 'ds_in' is just one error. If you run the quickstart code (https://learn.microsoft.com/en-us/azure/data-factory/quickstart-create-data-factory-python#full-script), replacing the variables in def main() with your own:

# Azure subscription ID
subscription_id = '<subscription ID>'

# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'

# The data factory name. It must be globally unique.
df_name = '<factory name>'

# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)

rg_params = {'location':'westus'}
df_params = {'location':'westus'}

you will face many errors, so the quickstart code itself has an issue.

I tried resolving the errors but the error continued when I used the quick start code as it is with my variables.

Naveenommi-MSFT commented 1 year ago

@ssabat We have reproduced the issue and got a similar error; please see the screenshot below. Could you please review, add comments, and update as appropriate?

[Screenshot of the reproduced error]

Naveenommi-MSFT commented 1 year ago

@SiddheshDesai06 Thank you for bringing this to our attention. I've delegated this to the content author @ssabat, who will review it and share their feedback.

ssabat commented 1 year ago

@SiddheshDesai06 @Naveenommi-MSFT This sample is very popular. Please follow the steps exactly as documented before customizing it for your use case.

Create the application by following the steps in this link, using Authentication Option 2 (application secret), and assign the application to the Contributor role by following instructions in the same article. Make note of the following values as shown in the article to use in later steps: Application (client) ID, client secret value, and tenant ID.

You are missing the role assignments/binding, and the HTTP errors occur because the above is not set up correctly.
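Before running the full quickstart, it can help to verify the service principal setup in isolation. The sketch below (with the same placeholder IDs as the quickstart; not runnable without real credentials) fails fast at the token request if the app registration is wrong, and fails at the listing call if the Contributor role assignment is missing:

```python
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient

# Placeholders: substitute your own app registration and subscription values.
credential = ClientSecretCredential(
    tenant_id="<tenant ID>",
    client_id="<service principal ID>",
    client_secret="<service principal key>",
)

# Fails here if the client ID / secret / tenant are wrong.
credential.get_token("https://management.azure.com/.default")

# Fails here if the service principal lacks a role assignment on the subscription.
client = ResourceManagementClient(credential, "<subscription ID>")
for rg in client.resource_groups.list():
    print(rg.name)
```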

sid24desai commented 1 year ago

I tried assigning the values from my own application secrets, but the code is still full of errors.

ssabat commented 12 months ago

Thanks @SiddheshDesai06 for your feedback. This is sample code only, and it has been used many times with custom modifications. @sid24desai, please open a support ticket and we will work with you to find the root cause of the errors.

please-close