NetApp / netapp-dataops-toolkit

The NetApp DataOps Toolkit is a Python library that makes it simple for developers, data scientists, DevOps engineers, and data engineers to perform various data management tasks, such as near-instantaneously provisioning, cloning, or snapshotting a data volume or JupyterLab workspace.
BSD 3-Clause "New" or "Revised" License

Question about configuring Airflow authentication settings programmatically? #3

Closed. sakaia closed this issue 3 years ago

sakaia commented 3 years ago

I am trying to use the NetApp Data Science Toolkit by rewriting the snapshot step (AI Training with Airflow). The rewritten code works successfully, but I have a question.

As you know, airflowConnectionName is defined via the Airflow UI. But the NetApp Data Science Toolkit uses a different approach for authentication (~/.ntap_dsutil/config.json).

Can we change the authentication settings from within a Python script?
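
One workaround I am considering is to rebuild ~/.ntap_dsutil/config.json from the Airflow connection at the start of the task, before calling any toolkit functions. The sketch below is untested: the key names ("hostname", "username", "password", "verifySSLCert") and the base64 encoding of the password are assumptions taken from the config.json that ./ntap_dsutil.py config generated for me, and the helper name writeDsutilConfigFromAirflowConnection is just illustrative.

# Rough workaround sketch: rebuild ~/.ntap_dsutil/config.json from an Airflow connection
# before the toolkit is used. Key names and the base64-encoded password are assumptions;
# check the config.json that './ntap_dsutil.py config' generates on your own system.
import base64, json, os
from airflow.hooks.base_hook import BaseHook

def writeDsutilConfigFromAirflowConnection(airflowConnectionName: str, verifySSLCert: bool = True) :
    # Look up the ONTAP credentials stored in the named Airflow connection
    connection = BaseHook.get_connection(airflowConnectionName)

    configPath = os.path.expanduser('~/.ntap_dsutil/config.json')
    os.makedirs(os.path.dirname(configPath), exist_ok=True)

    # Start from the existing config (if any) so SVM/aggregate/export policy settings are preserved
    config = {}
    if os.path.isfile(configPath) :
        with open(configPath) as configFile :
            config = json.load(configFile)

    config['hostname'] = connection.host
    config['username'] = connection.login
    config['password'] = base64.b64encode(connection.password.encode()).decode()  # assumption: stored base64-encoded
    config['verifySSLCert'] = verifySSLCert

    with open(configPath, 'w') as configFile :
        json.dump(config, configFile)

The idea would be to call this helper at the top of netappSnapshotDSTK, with the connection name passed in via op_kwargs. Is there a supported way to do this, or do you plan to add one?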

For reference, here is my current code for the DSTK (quick and dirty). It assumes the ntap_dsutil.py file exists in the same directory.

# Define function that triggers the creation of a NetApp snapshot
def netappSnapshotDSTK(**kwargs) -> str :
    # Parse args
    for key, value in kwargs.items() :
        if key == 'pvName' :
            pvName = value

    # Import needed functions/classes
    from ntap_dsutil import createSnapshot
    from datetime import datetime

    # Convert pv name to ONTAP volume name
    # The following will not work if you specified a custom storagePrefix when creating your
    # Trident backend. If you specified a custom storagePrefix, you will need to update this
    # code to match your prefix.
    volumeName = 'trident_%s' % pvName.replace("-", "_")
    print('\npv name: ', pvName)
    print('ONTAP volume name: ', volumeName)

    timestamp = datetime.today().strftime("%Y%m%d_%H%M%S")
    snapshotName = 'airflow_%s' % timestamp
    createSnapshot(volumeName=volumeName, snapshotName=snapshotName, printOutput=True)

    # Return name of newly created snapshot
    return snapshotName
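
For completeness, this is roughly how I wire the function above into the DAG (a minimal sketch; the task_id, the PVC name, and the dag variable are placeholders from my environment, not from TR-4798):

# Minimal sketch of the Airflow task definition (task_id, pvName, and dag are placeholders)
from airflow.operators.python_operator import PythonOperator

createNetappSnapshotTask = PythonOperator(
    task_id='create-netapp-snapshot',
    python_callable=netappSnapshotDSTK,
    op_kwargs={'pvName': 'my-dataset-pvc'},  # placeholder PVC name
    dag=dag  # assumes a DAG object named 'dag' is defined earlier in the DAG file
)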

Original code from TR-4798:

# Define function that triggers the creation of a NetApp snapshot
def netappSnapshot(**kwargs) -> str :
    # Parse args
    for key, value in kwargs.items() :
        if key == 'pvName' :
            pvName = value
        elif key == 'verifySSLCert' :
            verifySSLCert = value
        elif key == 'airflowConnectionName' :
            airflowConnectionName = value

    # Install netapp_ontap package
    import sys, subprocess
    result = subprocess.check_output([sys.executable, '-m', 'pip', 'install', '--user', 'netapp-ontap'])
    print(str(result).replace('\\n', '\n'))

    # Import needed functions/classes
    from netapp_ontap import config as netappConfig
    from netapp_ontap.host_connection import HostConnection as NetAppHostConnection
    from netapp_ontap.resources import Volume, Snapshot
    from datetime import datetime
    import json

    # Retrieve ONTAP cluster admin account details from Airflow connection
    from airflow.hooks.base_hook import BaseHook
    connections = BaseHook.get_connections(conn_id = airflowConnectionName)
    ontapConnection = connections[0] # Assumes that you only have one connection with the specified conn_id configured in Airflow
    ontapClusterAdminUsername = ontapConnection.login
    ontapClusterAdminPassword = ontapConnection.password
    ontapClusterMgmtHostname = ontapConnection.host
    # Configure connection to ONTAP cluster/instance
    netappConfig.CONNECTION = NetAppHostConnection(
        host = ontapClusterMgmtHostname,
        username = ontapClusterAdminUsername,
        password = ontapClusterAdminPassword,
        verify = verifySSLCert
    )

    # Convert pv name to ONTAP volume name
    # The following will not work if you specified a custom storagePrefix when creating your
    # Trident backend. If you specified a custom storagePrefix, you will need to update this
    # code to match your prefix.
    volumeName = 'trident_%s' % pvName.replace("-", "_")
    print('\npv name: ', pvName)
    print('ONTAP volume name: ', volumeName)

    # Create snapshot; print API response
    volume = Volume.find(name = volumeName)
    timestamp = datetime.today().strftime("%Y%m%d_%H%M%S")
    snapshot = Snapshot.from_dict({
        'name': 'airflow_%s' % timestamp,
        'comment': 'Snapshot created by an Apache Airflow DAG',
        'volume': volume.to_dict()
    })
    response = snapshot.post()
    print("\nAPI Response:")
    print(response.http_response.text)

    # Retrieve snapshot details
    snapshot.get()
    # Convert snapshot details to JSON string and print
    snapshotDetails = snapshot.to_dict()
    print("\nSnapshot Details:")
    print(json.dumps(snapshotDetails, indent=2))

    # Return name of newly created snapshot
    return snapshotDetails['name']
mboglesby commented 3 years ago

@sakaia we plan to update the examples in TR-4798 to include usage of the NetApp Data Science Toolkit. When we do that, we will release some enhancements to the toolkit to simplify the integration with Airflow. I'll follow up here when those are released.