Chaffelson / nipyapi

A convenient Python wrapper for Apache NiFi

UX: Upload/Download a bucket/item/version of the flow from the registry #30

Closed aperepel closed 6 years ago

aperepel commented 6 years ago

As a deployer I would like to issue a command to download a specific (or latest if version omitted) flow representation and save this artifact in a file. The file is to be in a format understood by another instance of the registry for import.

aperepel commented 6 years ago

Bonus points for allowing the deployer to specify a path of mnemonic, friendly names instead of using the UUIDs in the URI. Let the system look up the actual IDs transparently.

Chaffelson commented 6 years ago

Working on this now, my logic flow is something like:

  1. Get any version of any flow in any bucket (Effectively VersionedFlowSnapshot.flow_contents)
  2. Deterministically serialise that Flow into YAML that can be diff'd
  3. Export / Import the YAML safely

Bonus: Might be able to make this work by coding enforcement of unique names for ProcessGroups and getting the user to pass in a set of parameters. Will think on it as I get import/export working
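The deterministic-serialisation step above can be sketched like this. This is a minimal illustration, not the nipyapi implementation: it uses the stdlib `json` module with `sort_keys=True` to get a stable, diff-able encoding (`yaml.safe_dump` sorts mapping keys the same way for the YAML case), and the function name `encode_flow` is hypothetical.

```python
import json


def encode_flow(flow_contents: dict) -> str:
    """Serialise a flow dict deterministically so two exports can be diff'd.

    Sorting keys means the same logical flow always produces identical text,
    regardless of the insertion order of the underlying dict.
    """
    return json.dumps(flow_contents, sort_keys=True, indent=2)


# Two dicts with the same content but different key order encode identically
a = encode_flow({"name": "pg_0", "identifier": "198075d5", "processors": []})
b = encode_flow({"processors": [], "identifier": "198075d5", "name": "pg_0"})
assert a == b
```

The same property is what makes version-to-version diffs of an exported flow meaningful, whichever text format is chosen.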

aperepel commented 6 years ago

Question. If the registry already has a JSON representation and metadata in it, why do we need to serialize to YAML? Or is this JSON not usable for import?

Chaffelson commented 6 years ago

@kevdoran may also have opinions here. The Json needs to be pruned before it can be imported, as it contains the source object IDs and other instance-specific information - so I don't think it makes sense to export the 'raw' object anyway. Since we'd be doing some form of transformation on the object between the Registry and import/export regardless, it may as well be a useful one.

Obviously transforming it from Json to another format introduces some complexity, and will require testing to ensure that no useful information is lost on the round trip. I think this is the same testing that will validate which format is more useful, so I'm pushing forward with it.

The main argument for Yaml is that we are moving towards support for tools like Ansible, which like to define configurations in Yaml. It makes sense to me to standardise all import/export functions on a single format, to reduce code complexity and confusion for the user; if I can use the same Yaml for Registry import/export that we use for user-defined objects, then it's more consistent. Obviously I need to thoroughly test both options, as at this point I do not know what will work with the API calls, but here's an un-pruned example object in both formats for you to compare. First is Json:

pprint(ver_flow_snapshot_0.flow_contents)
{'comments': '',
 'component_type': 'PROCESS_GROUP',
 'connections': [],
 'controller_services': [],
 'funnels': [],
 'group_identifier': None,
 'identifier': '198075d5-84c8-3fe0-8afe-c831857a2155',
 'input_ports': [],
 'labels': [],
 'name': 'nipyapi_console_process_group_0',
 'output_ports': [],
 'position': {'x': 400.0, 'y': 400.0},
 'process_groups': [],
 'processors': [{'annotation_data': None,
                 'auto_terminated_relationships': ['success'],
                 'bulletin_level': 'WARN',
                 'bundle': {'artifact': 'nifi-standard-nar',
                            'group': 'org.apache.nifi',
                            'version': '1.5.0'},
                 'comments': '',
                 'component_type': 'PROCESSOR',
                 'concurrently_schedulable_task_count': 1,
                 'execution_node': 'ALL',
                 'group_identifier': '198075d5-84c8-3fe0-8afe-c831857a2155',
                 'identifier': 'cbf1d237-0b72-38cf-8bbf-3ecf44743781',
                 'name': 'nipyapi_console_processor_0',
                 'penalty_duration': '30 sec',
                 'position': {'x': 400.0, 'y': 400.0},
                 'properties': {'Batch Size': '1',
                                'Data Format': 'Text',
                                'File Size': '0B',
                                'Unique FlowFiles': 'false',
                                'character-set': 'UTF-8'},
                 'property_descriptors': {'Batch Size': {'display_name': 'Batch '
                                                                         'Size',
                                                         'identifies_controller_service': False,
                                                         'name': 'Batch Size'},
                                          'Data Format': {'display_name': 'Data '
                                                                          'Format',
                                                          'identifies_controller_service': False,
                                                          'name': 'Data '
                                                                  'Format'},
                                          'File Size': {'display_name': 'File '
                                                                        'Size',
                                                        'identifies_controller_service': False,
                                                        'name': 'File Size'},
                                          'Unique FlowFiles': {'display_name': 'Unique '
                                                                               'FlowFiles',
                                                               'identifies_controller_service': False,
                                                               'name': 'Unique '
                                                                       'FlowFiles'},
                                          'character-set': {'display_name': 'Character '
                                                                            'Set',
                                                            'identifies_controller_service': False,
                                                            'name': 'character-set'},
                                          'generate-ff-custom-text': {'display_name': 'Custom '
                                                                                      'Text',
                                                                      'identifies_controller_service': False,
                                                                      'name': 'generate-ff-custom-text'}},
                 'run_duration_millis': 0,
                 'scheduling_period': '1s',
                 'scheduling_strategy': 'TIMER_DRIVEN',
                 'style': {},
                 'type': 'org.apache.nifi.processors.standard.GenerateFlowFile',
                 'yield_duration': '1 sec'}],
 'remote_process_groups': [],
 'variables': {},
 'versioned_flow_coordinates': None}

And here's the Yaml version:

identifier: 198075d5-84c8-3fe0-8afe-c831857a2155
name: nipyapi_console_process_group_0
comments: ''
position:
  x: 400.0
  y: 400.0
process_groups: []
remote_process_groups: []
processors:
- identifier: cbf1d237-0b72-38cf-8bbf-3ecf44743781
  name: nipyapi_console_processor_0
  comments: ''
  position:
    x: 400.0
    y: 400.0
  bundle:
    group: org.apache.nifi
    artifact: nifi-standard-nar
    version: 1.5.0
  style: {}
  type: org.apache.nifi.processors.standard.GenerateFlowFile
  properties:
    character-set: UTF-8
    File Size: 0B
    Batch Size: '1'
    Unique FlowFiles: 'false'
    Data Format: Text
  property_descriptors:
    character-set:
      name: character-set
      display_name: Character Set
      identifies_controller_service: false
    File Size:
      name: File Size
      display_name: File Size
      identifies_controller_service: false
    generate-ff-custom-text:
      name: generate-ff-custom-text
      display_name: Custom Text
      identifies_controller_service: false
    Batch Size:
      name: Batch Size
      display_name: Batch Size
      identifies_controller_service: false
    Unique FlowFiles:
      name: Unique FlowFiles
      display_name: Unique FlowFiles
      identifies_controller_service: false
    Data Format:
      name: Data Format
      display_name: Data Format
      identifies_controller_service: false
  annotation_data:
  scheduling_period: 1s
  scheduling_strategy: TIMER_DRIVEN
  execution_node: ALL
  penalty_duration: 30 sec
  yield_duration: 1 sec
  bulletin_level: WARN
  run_duration_millis: 0
  concurrently_schedulable_task_count: 1
  auto_terminated_relationships:
  - success
  component_type: PROCESSOR
  group_identifier: 198075d5-84c8-3fe0-8afe-c831857a2155
input_ports: []
output_ports: []
connections: []
labels: []
funnels: []
controller_services: []
versioned_flow_coordinates:
variables: {}
component_type: PROCESS_GROUP
group_identifier:

Both need to be deterministically ordered (probably alpha sort), and pruned of extra information, but I think the Yaml is fundamentally more approachable from a user perspective.
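The sort-and-prune step described above could look something like this. A minimal sketch, not the nipyapi code: it assumes the pruning targets are the `identifier` and `group_identifier` fields visible in the examples above, and the helper name `prune_ids` is hypothetical.

```python
def prune_ids(obj):
    """Recursively drop instance-specific ID fields and alpha-sort dict keys,
    so an export can be diff'd and re-imported into a different Registry.
    Returns a new structure; the input is not modified."""
    if isinstance(obj, dict):
        return {k: prune_ids(v) for k, v in sorted(obj.items())
                if k not in ("identifier", "group_identifier")}
    if isinstance(obj, list):
        return [prune_ids(item) for item in obj]
    return obj


sample = {"identifier": "198075d5", "name": "pg_0",
          "processors": [{"group_identifier": "198075d5", "name": "proc_0"}]}
assert prune_ids(sample) == {"name": "pg_0",
                             "processors": [{"name": "proc_0"}]}
```

Because Python dicts preserve insertion order, building the dict from `sorted(obj.items())` gives the deterministic alpha ordering mentioned above for free.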

aperepel commented 6 years ago

I'd imagine this is core functionality a registry must have. Shouldn't we be working with the NiFi team to add it to the product, maybe? My only concern is that every other tool will have to come up with its own way, making things fragile. For diff purposes, NiFi already detects changes with the current format, so the underlying format is less critical.

Chaffelson commented 6 years ago

I have just pushed an initial version of this functionality. There is more to do before I would release it, and before you ask, I will also add a tidy convenience wrapper, but the core functionality is there and working. Relevant snippets here for convenience if you want to test it:

from nipyapi import versioning

help(versioning.export_flow)

Help on function export_flow in module nipyapi.versioning:

export_flow(flow_snapshot, file_path=None, mode='json')
    Exports the flow_contents of a given VersionedFlowSnapshot in the provided format mode
    :param flow_snapshot: The VersionedFlowSnapshot to export
    :param file_path: Optional; String for the file to be written. No file will be written if left blank; the encoded object is still returned
    :param mode: String 'json' or 'yaml' to specify the encoding format
    :return: String of the encoded Snapshot

help(versioning.import_flow)

Help on function import_flow in module nipyapi.versioning:

import_flow(bucket_id, encoded_flow=None, file_path=None, flow_name=None, flow_id=None)
    Imports a given encoded_flow version into the bucket and flow described; may optionally be passed a file to read the encoded flow_contents from.
    Note that only one of encoded_flow or file_path, and only one of flow_name or flow_id, should be specified.
    :param bucket_id: ID of the bucket to write the encoded_flow version to
    :param encoded_flow: Optional; String of the encoded flow to import; if not specified, file_path is read from. EXOR file_path
    :param file_path: Optional; String of the file path to read the encoded flow from; if not specified, encoded_flow is read from. EXOR encoded_flow
    :param flow_name: Optional; If this is to be the first version in a new flow object, then this is the String name for the flow object. EXOR flow_id
    :param flow_id: Optional; If this is a new version for an existing flow object, then this is the ID of that object. EXOR flow_name
    :return: the new VersionedFlowSnapshot
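The 'EXOR' contract in the docstring above (exactly one of `encoded_flow`/`file_path`, exactly one of `flow_name`/`flow_id`) can be enforced with a small validator. A hedged sketch only: the helper name `exactly_one` is hypothetical and is not part of the nipyapi API.

```python
def exactly_one(**kwargs):
    """Enforce an EXOR parameter contract: of the keyword arguments passed,
    exactly one must be non-None. Returns the name of the supplied one."""
    supplied = [name for name, value in kwargs.items() if value is not None]
    if len(supplied) != 1:
        raise ValueError(
            "exactly one of %s must be set, got %s"
            % (sorted(kwargs), supplied)
        )
    return supplied[0]


# Mirrors the import_flow contract: encoded_flow EXOR file_path
assert exactly_one(encoded_flow="{...}", file_path=None) == "encoded_flow"
```

Passing both (or neither) of a pair then fails fast with a clear message instead of silently preferring one source.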

Chaffelson commented 6 years ago

Fine tuning tests and edge cases on this today

Chaffelson commented 6 years ago

This functionality is now implemented; however, it raised several concerns about how some other parts of the code base are used ineffectively, which I now think need some refactoring before moving to 0.8.0. Fortunately, none of the refactoring is particularly major, I'm just a bit of a pedant.

Chaffelson commented 6 years ago

Refactoring complete; this code is now in master from PR #46. Do you want to test this @aperepel ?

Chaffelson commented 6 years ago

Closing this issue as the code is ready for release - please reopen if you find any problems.