Chaffelson / nipyapi

A convenient Python wrapper for Apache NiFi

create connections between processors #83

Closed Chaffelson closed 5 years ago

Chaffelson commented 5 years ago

From an email: I tried something like

print nipyapi.nifi.ProcessGroupsApi().create_connection(
    id=pg_id,
    body=nipyapi.nifi.models.connection_entity.ConnectionEntity(
        source_group_id=pg_id,
        source_type="PROCESSOR",
        source_id=in_http.id,
        destination_group_id=pg_id,
        destination_type="PROCESSOR",
        destination_id=out_kafka.id
    )
)

but always got "Connection details must be specified".

Could you please provide me with a working example?

gaving commented 5 years ago

I'm also interested in this. I've been googling for an example of relationships but can't see any in the demos.

gaving commented 5 years ago

An attempt with debugging on:-

For:-

log.info("Creating connections")

nipyapi.nifi.ProcessGroupsApi().create_connection(
    id=process_group_0.id,
    body=nipyapi.nifi.models.connection_entity.ConnectionEntity(
        source_group_id=process_group_0.id,
        source_type="PROCESSOR",
        source_id=processor_0.id,
        destination_group_id=process_group_0.id,
        destination_type="PROCESSOR",
        destination_id=processor_1.id,
    ),
)

Result:-

INFO:__main__:Creating connections
header: Date header: X-Frame-Options header: Location header: Content-Type header: Vary header: Vary header: Content-Length header: Server send: b'POST /nifi-api/process-groups/bfab4680-0166-1000-655b-58cf8ede9c60/connections HTTP/1.1\r\nHost: internal.local:8080\r\nAccept-Encoding: identity\r\nContent-Length: 287\r\nAccept: application/json\r\nContent-Type: application/json\r\nUser-Agent: Swagger-Codegen/1.0.0/python\r\n\r\n'
send: b'{"sourceId": "bfab4708-0166-1000-162c-0b7ebd9c8bcc", "sourceGroupId": "bfab4680-0166-1000-655b-58cf8ede9c60", "sourceType": "PROCESSOR", "destinationId": "bfab47a8-0166-1000-1513-bc933cf8e366", "destinationGroupId": "bfab4680-0166-1000-655b-58cf8ede9c60", "destinationType": "PROCESSOR"}'
reply: 'HTTP/1.1 400 Bad Request\r\n'
2018-10-29 11:52:45,255 DEBUG http://internal.local:8080 "POST /nifi-api/process-groups/bfab4680-0166-1000-655b-58cf8ede9c60/connections HTTP/1.1" 400 37
2018-10-29 11:52:45,255 DEBUG http://internal.local:8080 "POST /nifi-api/process-groups/bfab4680-0166-1000-655b-58cf8ede9c60/connections HTTP/1.1" 400 37
DEBUG:urllib3.connectionpool:http://internal.local:8080 "POST /nifi-api/process-groups/bfab4680-0166-1000-655b-58cf8ede9c60/connections HTTP/1.1" 400 37
header: Date header: X-Frame-Options header: Content-Type header: Vary header: Content-Length header: Server
Traceback (most recent call last):
  File "test.py", line 88, in <module>
    destination_id=processor_1.id,
  File "C:\Python37\lib\site-packages\nipyapi\nifi\apis\process_groups_api.py", line 180, in create_connection
    (data) = self.create_connection_with_http_info(id, body, **kwargs)
  File "C:\Python37\lib\site-packages\nipyapi\nifi\apis\process_groups_api.py", line 267, in create_connection_with_http_info
    collection_formats=collection_formats)
  File "C:\Python37\lib\site-packages\nipyapi\nifi\api_client.py", line 326, in call_api
    _return_http_data_only, collection_formats, _preload_content, _request_timeout)
  File "C:\Python37\lib\site-packages\nipyapi\nifi\api_client.py", line 153, in __call_api
    _request_timeout=_request_timeout)
  File "C:\Python37\lib\site-packages\nipyapi\nifi\api_client.py", line 371, in request
    body=body)
  File "C:\Python37\lib\site-packages\nipyapi\nifi\rest.py", line 268, in POST
    body=body)
  File "C:\Python37\lib\site-packages\nipyapi\nifi\rest.py", line 224, in request
    raise ApiException(http_resp=r)
nipyapi.nifi.rest.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Date': 'Mon, 29 Oct 2018 11:52:45 GMT', 'X-Frame-Options': 'SAMEORIGIN', 'Content-Type': 'text/plain', 'Vary': 'Accept-Encoding', 'Content-Length': '37', 'Server': 'Jetty(9.4.3.v20170317)'})
HTTP response body: Connection details must be specified.
gaving commented 5 years ago

Finally managed to get this going by specifying a component attribute:-

    log.info("Creating connections")
    nipyapi.nifi.ProcessGroupsApi().create_connection(
        id=process_group_0.id,
        body=nipyapi.nifi.models.connection_entity.ConnectionEntity(
            revision=nipyapi.nifi.models.revision_dto.RevisionDTO(version=0),
            source_id=sql_processor.id,
            source_group_id=process_group_0.id,
            source_type="PROCESSOR",
            destination_id=mongo_processor.id,
            destination_group_id=process_group_0.id,
            destination_type="PROCESSOR",
            component=nipyapi.nifi.models.connection_dto.ConnectionDTO(
                source=nipyapi.nifi.models.connectable_dto.ConnectableDTO(
                    id=sql_processor.id,
                    group_id=process_group_0.id,
                    type="PROCESSOR"
                ),
                destination=nipyapi.nifi.models.connectable_dto.ConnectableDTO(
                    id=mongo_processor.id,
                    group_id=process_group_0.id,
                    type="PROCESSOR"
                ),
                selected_relationships=["failure", "success"],
            ),
        ),
    )

Not pretty, but it works.
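Going by the 400 response above, NiFi appears to reject a connection request with "Connection details must be specified." whenever the `component` object is missing; the top-level `sourceId`/`destinationId` fields seen in the debug log are not enough on their own. A new connection also carries a revision with version 0. The following is a minimal sketch of the JSON body the working call effectively sends, built as plain dicts. `connectable` and `build_connection_body` are hypothetical helpers (not part of nipyapi), and the camelCase field names (`groupId`, `selectedRelationships`) are assumed from the NiFi REST API's JSON form of `ConnectableDTO` and `ConnectionDTO`.

```python
import json
from typing import Dict, List, Optional


def connectable(obj_id: str, group_id: str, obj_type: str) -> Dict[str, str]:
    """JSON form of a ConnectableDTO: one end of a connection."""
    return {"id": obj_id, "groupId": group_id, "type": obj_type}


def build_connection_body(
    source: Dict[str, str],
    destination: Dict[str, str],
    relationships: Optional[List[str]] = None,
) -> Dict:
    """Hypothetical helper: body for POST /process-groups/{id}/connections.

    The 'component' block is what the server checks for; without it the
    request fails with "Connection details must be specified."
    """
    component = {"source": source, "destination": destination}
    if relationships:
        # Processors expose relationships; ports do not, so omit the key for them
        component["selectedRelationships"] = list(relationships)
    # New components are created at revision version 0
    return {"revision": {"version": 0}, "component": component}


body = build_connection_body(
    connectable("sql-proc-id", "pg-id", "PROCESSOR"),
    connectable("mongo-proc-id", "pg-id", "PROCESSOR"),
    ["failure", "success"],
)
print(json.dumps(body, indent=2))
```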


Chaffelson commented 5 years ago

Nice work! It looks like most of that could be abstracted away. I think you would only need the two objects to be connected (or even just their GUIDs) plus the relationships or other connection parameters to include, unless you think a more complex and complete implementation is necessary. If the two entities are passed in, it should be possible to check that they share the same parent Process Group and that a connection is reasonable to attempt.
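The parent-group check suggested here could look something like the sketch below. It assumes nipyapi-style entities that expose `.id` and `.component.parent_group_id` (the attributes used in the code elsewhere in this thread); `common_parent_id` and `fake_entity` are hypothetical, and `SimpleNamespace` objects stand in for real `ProcessorEntity` instances so the logic can run without a NiFi server.

```python
from types import SimpleNamespace


def common_parent_id(source, target):
    """Return the Process Group id shared by two canvas entities.

    Raises ValueError when the parents differ, since a plain same-group
    connection would not be reasonable to attempt in that case.
    """
    src_pg = source.component.parent_group_id
    tgt_pg = target.component.parent_group_id
    if src_pg != tgt_pg:
        raise ValueError(
            "Source is in group {0} but target is in group {1}".format(src_pg, tgt_pg)
        )
    return src_pg


def fake_entity(obj_id, parent_group_id):
    # Hypothetical stand-in for a nipyapi ProcessorEntity, for illustration only
    return SimpleNamespace(
        id=obj_id, component=SimpleNamespace(parent_group_id=parent_group_id)
    )


sql = fake_entity("sql-proc-id", "pg-1")
mongo = fake_entity("mongo-proc-id", "pg-1")
print(common_parent_id(sql, mongo))  # pg-1
```

The returned id is what would be passed as the `id` argument of `ProcessGroupsApi().create_connection`.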

gaving commented 5 years ago

@Chaffelson Sounds good! For my purposes I've wrapped this in a method, so it suits my needs, but yes, it could be a little less verbose.

Chaffelson commented 5 years ago

Hi @gaving, I've pushed commit https://github.com/Chaffelson/nipyapi/commit/8fd6317700e7c83203e51ec74ef3efb8de6d8e63 to the next branch for this feature set. If you want to review it and give me your thoughts, I'd appreciate it.

gaving commented 5 years ago

Hi @Chaffelson! Looks great, I'll hopefully get a chance to play with it tomorrow.

gaving commented 5 years ago

@Chaffelson That was satisfying!


I've removed my custom code and method. The only thing I see is that infer_object_label_from_class only supports processors at the minute, so I can't connect up my output port.

Do you have plans to implement that for other objects? I was going to submit a pull request but can't work out how to check whether a nipyapi.nifi.PortEntity is an input or output port, in order to return INPUT_PORT or OUTPUT_PORT respectively (and I guess the same applies to remote ports).
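Since input and output ports share one entity class, the type has to be read from the object rather than inferred from the class. A minimal sketch, assuming the port entity exposes its direction through a `port_type` attribute (NiFi's PortEntity JSON carries a `portType` field; check that your nipyapi version maps it). The three classes below are hypothetical stand-ins for the nipyapi.nifi models so the dispatch can run offline, and `infer_label` is an illustrative name, not the library function.

```python
class ProcessorEntity:  # stand-in for nipyapi.nifi.ProcessorEntity (illustration only)
    pass


class FunnelEntity:  # stand-in for nipyapi.nifi.FunnelEntity (illustration only)
    pass


class PortEntity:  # stand-in for nipyapi.nifi.PortEntity (illustration only)
    def __init__(self, port_type):
        # Assumed to mirror the 'portType' field of NiFi's PortEntity JSON
        self.port_type = port_type


def infer_label(obj):
    """Map a canvas object to the connectable type string NiFi expects."""
    if isinstance(obj, ProcessorEntity):
        return "PROCESSOR"
    if isinstance(obj, FunnelEntity):
        return "FUNNEL"
    if isinstance(obj, PortEntity):
        # One class covers both directions, so ask the instance
        if obj.port_type not in ("INPUT_PORT", "OUTPUT_PORT"):
            raise ValueError("Unexpected port type: {0}".format(obj.port_type))
        return obj.port_type
    raise ValueError("Unsupported object type: {0}".format(type(obj).__name__))
```

Remote ports would need a similar lookup, which this sketch does not attempt.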

Chaffelson commented 5 years ago

I wanted to check this was heading in the right direction before diving into the other potential connection objects, since to do them properly I should add create/list/get/delete functions for each of them. I will head into those if I get time this weekend, but if you find a solution in the meantime please do share it.

gaving commented 5 years ago

No problem; it's pretty much what I wrapped my workaround in. It makes sense for the signature to be <source>, <target>, <relationships>, and you have reduced my parameters by inferring the other details from the objects inside the method.

So I'm pretty happy with the API at this end, but I'll keep my eyes peeled for any changes or the next release.

Thanks again for the solution.

Chaffelson commented 5 years ago

I've pushed a new commit with an updated infer function and some other useful helpers for retrieving various canvas components. Let me know what you think.

chris-heathwood-mood commented 5 years ago

Just in case you need it: to link process groups, you have to grab the output port inside the source process group and link that across, without specifying selected_relationships, e.g.:

nipyapi.nifi.ProcessGroupsApi().create_connection(
    id=process_group.id,
    body=nipyapi.nifi.models.connection_entity.ConnectionEntity(
        revision=nipyapi.nifi.models.revision_dto.RevisionDTO(version=0),
        source_id=output_port_from_process_group.id,
        source_group_id=deployed_process_group.id,
        source_type="OUTPUT_PORT",
        destination_id=output_port.id,
        destination_group_id=process_group.id,
        destination_type="OUTPUT_PORT",
        component=nipyapi.nifi.models.connection_dto.ConnectionDTO(
            source=nipyapi.nifi.models.connectable_dto.ConnectableDTO(
                id=output_port_from_process_group.id,
                group_id=deployed_process_group.id,
                type="OUTPUT_PORT"
            ),
            destination=nipyapi.nifi.models.connectable_dto.ConnectableDTO(
                id=output_port.id,
                group_id=process_group.id,
                type="OUTPUT_PORT"
            ),
        ),
    ),
)
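The edge case above comes down to which Process Group owns the connection: an output port sits inside a child group, so a connection leaving it has to be created in that child's parent. The sketch below isolates that rule, which is the same one the updated create_connection function applies. `hosting_group_id` is a hypothetical name, and the `parents` dict is an assumed stand-in for looking groups up through the API.

```python
from typing import Dict, Optional


def hosting_group_id(source_type: str, source_group_id: str,
                     parents: Dict[str, Optional[str]]) -> str:
    """Pick the Process Group that should own a new connection.

    `parents` maps each group id to its parent id (None for the root group).
    """
    if source_type == "OUTPUT_PORT":
        parent = parents.get(source_group_id)
        # At the root there is no enclosing group, so keep the root itself
        return parent if parent is not None else source_group_id
    # Processors, funnels, etc. connect within their own group
    return source_group_id


parents = {
    "root": None,
    "process_group": "root",
    "deployed_process_group": "process_group",
}
print(hosting_group_id("OUTPUT_PORT", "deployed_process_group", parents))  # process_group
```

With the ids from the example above, the port inside deployed_process_group yields process_group, which matches the `id=process_group.id` argument used there.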
Chaffelson commented 5 years ago

@chris-heathwood-mood thanks for the contribution there, I wasn't aware of that edge case. I have updated the create_connection function to cover more ground, though I still consider it experimental until we have coverage of all the various object types.

def create_connection(source, target, relationships=None, name=None):
    """
    Creates a connection between two objects for the given relationships

    Args:
        source: Object to initiate the connection, e.g. ProcessorEntity
        target: Object to terminate the connection, e.g. FunnelEntity
        relationships (list): list of strings of relationships to connect, may
            be collected from the object 'relationships' property (optional)
        name (str): Defaults to None, String of Name for Connection (optional)

    Returns:
        (ConnectionEntity): for the created connection

    """
    # determine source and destination strings by class supplied
    source_type = nipyapi.utils.infer_object_label_from_class(source)
    target_type = nipyapi.utils.infer_object_label_from_class(target)
    if source_type not in ['OUTPUT_PORT', 'INPUT_PORT']:
        source_rels = [x.name for x in source.component.relationships]
        if relationships:
            assert all(i in source_rels for i in relationships), \
                "One or more relationships [{0}] not in list of valid Source " \
                "Relationships [{1}]" \
                    .format(str(relationships), str(source_rels))
        else:
            # if no relationships supplied, we connect them all
            relationships = source_rels
    if source_type == 'OUTPUT_PORT':
        # the hosting process group for an Output port connection to another
        # process group is the common parent process group
        parent_pg = get_process_group(source.component.parent_group_id, 'id')
        if parent_pg.id == get_root_pg_id():
            parent_id = parent_pg.id
        else:
            parent_id = parent_pg.component.parent_group_id
    else:
        parent_id = source.component.parent_group_id
    try:
        return nipyapi.nifi.ProcessGroupsApi().create_connection(
            id=parent_id,
            body=nipyapi.nifi.ConnectionEntity(
                revision=nipyapi.nifi.RevisionDTO(
                    version=0
                ),
                source_type=source_type,
                destination_type=target_type,
                component=nipyapi.nifi.ConnectionDTO(
                    source=nipyapi.nifi.ConnectableDTO(
                        id=source.id,
                        group_id=source.component.parent_group_id,
                        type=source_type
                    ),
                    name=name,
                    destination=nipyapi.nifi.ConnectableDTO(
                        id=target.id,
                        group_id=target.component.parent_group_id,
                        type=target_type
                    ),
                    selected_relationships=relationships
                )
            )
        )
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)
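The relationship handling in that function (validate what was requested, otherwise connect everything) can be pulled out into a small helper. This is a sketch of that logic with one deliberate change: it raises ValueError instead of using `assert`, because assert statements are skipped when Python runs with `-O`. `resolve_relationships` is a hypothetical name, not part of nipyapi.

```python
from typing import List, Optional


def resolve_relationships(source_rels: List[str],
                          requested: Optional[List[str]] = None) -> List[str]:
    """Validate requested relationships against those the source offers.

    If nothing is requested, every source relationship is connected.
    """
    if not requested:
        return list(source_rels)
    unknown = [r for r in requested if r not in source_rels]
    if unknown:
        raise ValueError(
            "Relationships {0} not in list of valid source relationships {1}".format(
                unknown, source_rels
            )
        )
    return list(requested)
```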
Chaffelson commented 5 years ago

I'm going to mark this as closed and ready for the 0.12 release. Please reopen or create a new ticket if you want to see behavioural changes.

cdavid15 commented 5 years ago

@Chaffelson can this be re-opened to allow for further customisation of the connection?

I am trying to set the back_pressure_object_threshold and back_pressure_data_size_threshold attributes of the ConnectionDTO class. Could these be passed down through create_connection, as opposed to having to fall back to the ProcessGroupsApi? That would provide a nice level of abstraction.
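One way such a pass-through could work, sketched with plain JSON dicts rather than nipyapi model objects: any extra keyword argument is copied into the connection's component under its camelCase REST field name. `connection_component` and `to_camel` are hypothetical helpers; the target fields `backPressureObjectThreshold` and `backPressureDataSizeThreshold` are assumed to be the REST counterparts of the ConnectionDTO attributes named above (the data size threshold is a string such as "1 GB"). With the nipyapi models, the equivalent would be forwarding `**kwargs` into the `nipyapi.nifi.ConnectionDTO(...)` constructor.

```python
from typing import Any, Dict


def to_camel(name: str) -> str:
    """snake_case -> camelCase, e.g. back_pressure_object_threshold -> backPressureObjectThreshold."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def connection_component(source: Dict[str, str], destination: Dict[str, str],
                         **options: Any) -> Dict[str, Any]:
    """Hypothetical builder that forwards extra connection settings.

    None values are dropped so that NiFi applies its own defaults.
    """
    component: Dict[str, Any] = {"source": source, "destination": destination}
    for key, value in options.items():
        if value is not None:
            component[to_camel(key)] = value
    return component


comp = connection_component(
    {"id": "a", "groupId": "pg", "type": "PROCESSOR"},
    {"id": "b", "groupId": "pg", "type": "PROCESSOR"},
    back_pressure_object_threshold=5000,
    back_pressure_data_size_threshold="500 MB",
)
```

Forwarding keywords keeps the wrapper's signature short while still exposing every ConnectionDTO setting, at the cost of no up-front validation of the names.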