nv-morpheus / Morpheus


[FEA]: Add parquet support to write_to_file_stage.py #980

Open · tgrunzweig-cpacket opened this issue 1 year ago

tgrunzweig-cpacket commented 1 year ago

Is this a new feature, an improvement, or a change to existing functionality?

New Feature

How would you describe the priority of this feature request

Medium

Please provide a clear description of problem this feature solves

I want Morpheus to write inference outputs in Parquet format.

The current implementation of write_to_file_stage.py has some JSON- and CSV-specific handling in _convert_to_strings(), and raises NotImplementedError for other file types.

The inference output files in DFP can end up being quite large and are typically processed further, so an efficient, machine-readable format (like Parquet) would make deployments more efficient.

Describe your ideal solution

write_to_file_stage.py

    def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):

        # Ensure our directory exists
        os.makedirs(os.path.realpath(os.path.dirname(self._output_file)), exist_ok=True)

        def write_to_file(x: MessageMeta):

            # new: write Parquet directly when the output file has a .parquet extension
            if os.path.splitext(self._output_file)[1] == '.parquet':
                x.df.to_parquet(self._output_file)

            else:
                # Open up the file handle
                with open(self._output_file, "a") as out_file:
                    # end new
                    lines = self._convert_to_strings(x.df)
                    ....

Describe any alternatives you have considered

No response

Additional context

No response

jarmak-nv commented 1 year ago

Hi @Tzahi-cpacket!

Thanks for submitting this issue - our team has been notified and we'll get back to you as soon as we can! In the meantime, feel free to add any relevant information to this issue.

mdemoret-nv commented 1 year ago

@Tzahi-cpacket After looking into this more, one issue is that our write_to_file_stage.py will iteratively write to files as messages are received and keep the file stream open for the duration of the stage. With CSV and JSON lines format, this works well to offload data to disk. However, the only library I found that supports appending to Parquet files is fastparquet. So this leaves a few options:

  1. Keep all messages in memory until pipeline shutdown and then concat them into a single DataFrame which can be written to disk
  2. Write to parquet using the fastparquet engine in pandas
    1. This would require converting every DataFrame from GPU->CPU before writing to disk which will have a performance penalty
  3. Write to a partitioned parquet dataset
    1. Each message would be treated as a different partition
    2. A rough outline of how this would work can be found here: https://docs.rapids.ai/api/cudf/legacy/api_docs/api/cudf.io.parquet.parquetdatasetwriter/ (see the sketch just below this list)
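
For option 3, a minimal sketch of what a per-message partitioned write could look like with cuDF's ParquetDatasetWriter; the output directory, the synthetic batch_id partition column, and the write_message helper are illustrative assumptions, not part of the current stage:

    import cudf
    from cudf.io.parquet import ParquetDatasetWriter

    # Illustrative output location and partition column (not part of the current stage).
    writer = ParquetDatasetWriter("./inference_output", partition_cols=["batch_id"])

    def write_message(df: cudf.DataFrame, batch_id: int):
        # Tag this message's rows and append them as a new partition
        # under ./inference_output/batch_id=<n>/
        df = df.copy()
        df["batch_id"] = batch_id
        writer.write_table(df)

    # Each incoming message's DataFrame becomes its own partition.
    write_message(cudf.DataFrame({"score": [0.1, 0.9]}), batch_id=0)
    write_message(cudf.DataFrame({"score": [0.4]}), batch_id=1)

    # The writer must be closed (e.g. at pipeline shutdown) to finalize the dataset metadata.
    writer.close()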

Are any of these options preferable over another?

tgrunzweig-cpacket commented 1 year ago

Perhaps we can discuss some questions regarding this?

  1. Are you not concerned that the files (current CSV or JSON implementation, future single Parquet file) become huge over time? This is not just about our use case; I mean in general.

  2. There is something to be said for a "perpetual use case", where the pipeline runs for an extended period of time: weeks, maybe months. Once we implement a "directory watcher" as the input stage, the pipeline should never shut down, right? As far as we understand, we can even update the models using a second pipeline while the inference pipeline is running. So the pipeline will just keep going, producing output forever.

  3. Is it technically possible to build an output DataFrame in memory while the pipeline is running, and flush it to a new file whenever its size reaches a threshold or on a time interval ("every 10 minutes")? (A rough sketch of this idea follows after this list.)

  4. We don't have to write Parquet specifically; it's just a reasonable standard for fast I/O on medium- and large-sized data. Do you have other suggestions? As long as there is an open-source library for reading the format, we could probably work with it. CSV and JSON are just not very size-efficient once you go beyond small data sets.
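
On question 3, a minimal sketch of such a size/time-threshold flush; the RollingParquetWriter class, its thresholds, and its file naming are illustrative assumptions, not an existing Morpheus API (pandas is used here, but the same idea applies to cuDF):

    import os
    import time

    import pandas as pd

    class RollingParquetWriter:
        """Buffers DataFrames and flushes them to a new Parquet file when either
        a row-count threshold or a time threshold is exceeded (illustrative only)."""

        def __init__(self, out_dir: str, max_rows: int = 1_000_000, max_seconds: float = 600.0):
            self._out_dir = out_dir
            self._max_rows = max_rows
            self._max_seconds = max_seconds
            self._buffer = []
            self._rows = 0
            self._last_flush = time.time()
            self._file_index = 0
            os.makedirs(out_dir, exist_ok=True)

        def write(self, df: pd.DataFrame):
            self._buffer.append(df)
            self._rows += len(df)
            if self._rows >= self._max_rows or (time.time() - self._last_flush) >= self._max_seconds:
                self.flush()

        def flush(self):
            # Concatenate everything buffered so far and start a new output file.
            if not self._buffer:
                return
            out_path = os.path.join(self._out_dir, f"part-{self._file_index:05d}.parquet")
            pd.concat(self._buffer).to_parquet(out_path)
            self._buffer.clear()
            self._rows = 0
            self._last_flush = time.time()
            self._file_index += 1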

Cheers, Tzahi
