marklogic / nifi

Mirror of Apache NiFi to support ongoing MarkLogic integration efforts
https://marklogic.github.io/nifi/
Apache License 2.0

PutMarkLogic relationship for when all batches are complete #196

Closed ics-joshuabowers closed 7 months ago

ics-joshuabowers commented 1 year ago

We are using NiFi to insert millions of records into MarkLogic from a relational database. One of our key requirements is the ability to insert a record once every batch within a single FlowFile is complete (i.e. if there are 12 batches in a FlowFile, all 12 must complete successfully for this relationship to be triggered). This way we can know whether or not all the records made it to MarkLogic successfully. It also lets us know when NiFi has completed one FlowFile, which then allows us to trigger other automations either within NiFi or in MarkLogic.

This feature is critical to our use of NiFi, and we would like to know whether there is already a way to do this or, if not, whether it could be added.

Thank you for your help and time!

rjrudin commented 1 year ago

Hey @joshuabowersChurch - based on the support ticket, wanted to confirm first that you're using PutMarkLogicRecord (PMLR)? And assuming that's the case - how many docs do you tend to extract from a single record?

My initial thought was that a user would have a new option (defaulting to false for backwards compatibility) to tell PMLR to wait for all the docs from a single record to be written to MarkLogic. And after that finishes, it would send a FlowFile to a new relationship that captures at least the URIs of all the written documents. And then control would proceed to the next processor in the NiFi flow.

ics-joshuabowers commented 1 year ago

Hi @rjrudin, first of all, thank you for your quick response! We are currently using PMLR. The number of records varies from 1.5 million to just under 3 million. The results come from the ConvertAvroToJSON processor, and to speed up performance I did not split the batch into individual records, so they all arrive in a single FlowFile.

I think the idea you described would fit our needs. Thank you again for jumping into this so quickly!

rjrudin commented 1 year ago

For something like millions of records - depending of course on a variety of factors, that could take dozens of seconds to minutes to complete. Do you see any issue with the PMLR processor hanging onto that one incoming FlowFile for that long - or is that pretty much exactly what you're expecting should happen from an overall flow perspective - i.e. "This FlowFile should not proceed to the next processor until we're certain that everything has been written to MarkLogic"?

ics-joshuabowers commented 1 year ago

You are correct that the processor would take a few minutes to work through all those records, but for what we are doing that is acceptable. So we do expect PMLR to hold on to the FlowFile until it is completed. This way we have the guarantee that it completes before moving through the rest of the flow.

rjrudin commented 1 year ago

Hey @ics-joshuabowers - just an FYI, we did a 1.16.3.3 release today that fixed a few bugs. We are then planning another release soon where we'll update the NiFi dependencies to the latest version - 1.22.0 (but we expect the connector to continue working on much older versions of NiFi). We will address this in that next release.

ics-joshuabowers commented 1 year ago

Thanks for the update @rjrudin!

rjrudin commented 10 months ago

@ics-joshuabowers Apologies for the delay here, we've been working on some other projects. With NiFi 2.0 on the horizon, we are considering a 1.23.0 release that builds against the latest NiFi 1.23.x dependencies. To our knowledge, this should be backwards compatible as long as we don't depend on newer connector APIs, and we don't see any reason to do so. We'll be testing against NiFi 1.16 for backwards compatibility too.

Which version of NiFi are you on today?

ics-joshuabowers commented 10 months ago

Hi @rjrudin, thank you for the update. We are currently running NiFi version 1.23.2. FYI, we've been using version 1.16.3.3 of this plugin with NiFi version 1.23.2 and have not seen any issues so far.

rjrudin commented 8 months ago

@ics-joshuabowers We are finally getting to this! What was your thought on including all the URIs in the new FlowFile sent to the new relationship? My concern is that we could run out of memory if we're holding 3 million URIs in memory and then writing them to a single FlowFile.
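To put a rough number on that concern, here is a back-of-the-envelope estimate. The average URI length of 60 characters and the one-byte-per-character encoding are assumptions made purely for illustration; they are not measurements from this flow, and in-memory object overhead in the JVM would add to the raw figure.

```python
def estimate_uri_payload_bytes(uri_count: int,
                               avg_uri_length_chars: int,
                               bytes_per_char: int = 1) -> int:
    """Raw size of the URI text alone, ignoring per-object overhead."""
    return uri_count * avg_uri_length_chars * bytes_per_char


if __name__ == "__main__":
    # 3 million URIs is the upper bound mentioned in this thread;
    # 60 characters per URI is an assumed average, not a measured value.
    raw_bytes = estimate_uri_payload_bytes(3_000_000, 60)
    print(f"raw URI text: {raw_bytes / 1024 ** 2:.0f} MiB")
```

Even under these modest assumptions the raw text alone lands in the hundreds of megabytes, which is why collecting every URI into one FlowFile looks risky.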

ics-joshuabowers commented 8 months ago

Hi @rjrudin, storing all the URIs in the flow would be problematic. Beyond that I don't have any ideas. Thank you for taking this on and working with me.

rjrudin commented 7 months ago

Hey @ics-joshuabowers - I was testing out PutMarkLogicRecord and I think the ORIGINAL relationship may be meeting your need here. I used the test instructions at https://marklogic.github.io/nifi/bulk-record-insert-into-marklogic , where there's a single zip file that contains a JSON file with 1000 objects in it. I used a thread count of 1 and a batch size of 100, and I verified the following:

  1. All 1k objects were written to MarkLogic.
  2. 10 calls were made to MarkLogic and 10 FlowFiles were sent to the BATCH_SUCCESS relationship.
  3. After all docs were written, the original/incoming FlowFile was sent to the ORIGINAL relationship.

Here are the attributes of the FlowFile sent to ORIGINAL:

FlowFile Properties
Key: 'entryDate'
        Value: 'Mon Jan 29 16:33:30 UTC 2024'
Key: 'lineageStartDate'
        Value: 'Mon Jan 29 16:33:30 UTC 2024'
Key: 'fileSize'
        Value: '1116492'
FlowFile Attribute Map Content
Key: 'absolute.path'
        Value: '/opt/nifi/nifi-current/IOT-Data.json/'
Key: 'file.creationTime'
        Value: '2024-01-29T16:10:42+0000'
Key: 'file.encryptionMethod'
        Value: 'NONE'
Key: 'file.group'
        Value: 'nifi'
Key: 'file.lastAccessTime'
        Value: '2024-01-29T16:10:43+0000'
Key: 'file.lastModifiedTime'
        Value: '2024-01-29T16:10:42+0000'
Key: 'file.owner'
        Value: 'nifi'
Key: 'file.permissions'
        Value: 'rw-r--r--'
Key: 'file.size'
        Value: '76055'
Key: 'filename'
        Value: 'IOT-Data.json'
Key: 'fragment.count'
        Value: '1'
Key: 'fragment.identifier'
        Value: '7e9c60b0-191e-4698-a723-45beb683ca26'
Key: 'fragment.index'
        Value: '1'
Key: 'mime.type'
        Value: 'application/octet-stream'
Key: 'path'
        Value: '/'
Key: 'segment.original.filename'
        Value: 'IOT-Data.json'
Key: 'uuid'
        Value: '941158ac-f0ae-43ed-82c8-707c019806e8'

For your use case, it seems that having this FlowFile sent to ORIGINAL and also having some reference to the input source - in this case, the file IOT-Data.json - may suffice? Please let me know.
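The behavior verified in the test above (1000 records, batch size 100, ten BATCH_SUCCESS FlowFiles, then one FlowFile to ORIGINAL) can be illustrated with a small simulation. This is only a sketch of the observed routing order, not the connector's implementation: the function name `route_record_batches`, the `write_batch` callback, and the event tuples are all hypothetical, and error handling is deliberately left out (an exception from `write_batch` simply propagates, so no "original" event is produced).

```python
from itertools import islice
from typing import Callable, Iterable, List, Tuple

Event = Tuple[str, int]  # (relationship name, number of records)


def route_record_batches(records: Iterable[object],
                         batch_size: int,
                         write_batch: Callable[[List[object]], None]) -> List[Event]:
    """Write records in batches and report the routing events in order.

    Hypothetical stand-in for the observed behavior: one "batch_success"
    event per written batch, then a single "original" event after the
    last batch has been written.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    events: List[Event] = []
    total_written = 0
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        write_batch(batch)  # assumed to raise if the write fails
        events.append(("batch_success", len(batch)))
        total_written += len(batch)

    # Reached only when every batch above was written without an exception.
    events.append(("original", total_written))
    return events


if __name__ == "__main__":
    written: List[List[object]] = []
    events = route_record_batches(range(1000), 100, written.append)
    batch_events = [e for e in events if e[0] == "batch_success"]
    print(len(batch_events), "batch events; last event:", events[-1])
```

In this model, a downstream step that listens only for the final "original" event sees it exactly once per input, after all batches have been written, which is the signal the use case asks for.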

ics-joshuabowers commented 7 months ago

Thank you @rjrudin for following up on this. It does look like the BATCH_SUCCESS relationship, along with the ORIGINAL relationship, meets our needs. Thank you for sharing this. I'll close the ticket, as this resolves our need.