nodestream-proj / nodestream

A Declarative framework for Building, Maintaining, and Analyzing Graph Data
https://nodestream-proj.github.io/docs/
Apache License 2.0

Added preliminary support for reading parquet files #246

Closed (leszek-bq closed 7 months ago)

leszek-bq commented 9 months ago

This is only a quick POC. Most likely this should not be merged as is.

There are several things to consider.

  1. Hardcoded check for parquet file in the open function in SupportedFileFormat class.

    • Parquet files are binary. Provide a mechanism in the project configuration YAML files to specify how a file should be opened and with what encoding.
  2. Currently, using pandas to read the parquet files

    • Research other ways to read parquet files.
      Examples: pyarrow.parquet.read_table(parquet_file), pyarrow.parquet.ParquetFile(parquet_file)
  3. Currently, the entire file is read in memory first.

    • Return an Iterable that reads files in batches
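
As a rough illustration of point 3, here is a minimal sketch of batch-wise reading, assuming pyarrow is the backend. The function names iter_rows and read_parquet_rows are hypothetical and not part of nodestream; ParquetFile.iter_batches and RecordBatch.to_pylist are the pyarrow calls it relies on.

```python
from typing import Any, Dict, Iterable, Iterator


def iter_rows(batches: Iterable[Any]) -> Iterator[Dict[str, Any]]:
    """Lazily flatten record batches into one dict per row."""
    for batch in batches:
        # pyarrow.RecordBatch.to_pylist() returns a list of {column: value} dicts.
        yield from batch.to_pylist()


def read_parquet_rows(path: str, batch_size: int = 1024) -> Iterator[Dict[str, Any]]:
    """Stream rows from a parquet file instead of loading it all at once.

    Hypothetical helper: the name and default batch size are assumptions.
    """
    import pyarrow.parquet as pq  # imported lazily so pyarrow stays optional

    parquet_file = pq.ParquetFile(path)
    # iter_batches reads batch_size rows at a time rather than the whole table.
    yield from iter_rows(parquet_file.iter_batches(batch_size=batch_size))
```

Only one batch is held in memory at a time, so peak usage is bounded by batch_size rather than by the file size.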

Note: Correct the target branch.

zprobst commented 9 months ago

I think this is a great first pass. I am excited to have this as an option for file consumption. Here are some of my reactions to the central points you raised in the PR.

1. Hardcoded check for parquet file in the open function in SupportedFileFormat class.

Indeed, this needs to be revised. As a suggestion, I've come up with this:

 @SUPPORTED_FILE_FORMAT_REGISTRY.connect_baseclass
 class SupportedFileFormat(Pluggable, ABC):
+    reader = None
+
     def __init__(self, file: Union[Path, StringIO]) -> None:
         self.file = file

     @contextmanager
     def read_handle(self) -> StringIO:
         if isinstance(self.file, Path):
-            with open(self.file, "r", encoding="utf-8") as fp:
+            with open(self.file, "rb") as fp:
                 yield fp
         else:
             yield self.file

     def read_file(self) -> Iterable[JsonLikeDocument]:
         with self.read_handle() as fp:
-            return self.read_file_from_handle(fp)
+            if self.reader is not None:
+                reader = self.reader(fp)
+            else:
+                reader = fp
+            return self.read_file_from_handle(reader)

     @classmethod
     @contextmanager
     def open(cls, file: Path) -> "SupportedFileFormat":
-        with open(file, "r", encoding="utf-8") as fp:
+        with open(file, "rb") as fp:
             yield cls.from_file_pointer_and_format(fp, file.suffix)

     @classmethod
@@ -61,21 +67,31 @@ class JsonFileFormat(SupportedFileFormat, alias=".json"):
         return [json.load(fp)]

-class LineSeperatedJsonFileFormat(SupportedFileFormat, alias=".jsonl"):
-    def read_file_from_handle(self, fp: StringIO) -> Iterable[JsonLikeDocument]:
-        return (json.loads(line) for line in fp)
+class LineSeparatedJsonFileFormat(SupportedFileFormat, alias=".jsonl"):
+    reader = TextIOWrapper
+
+    def read_file_from_handle(
+        self, reader: TextIOWrapper
+    ) -> Iterable[JsonLikeDocument]:
+        return (json.loads(line) for line in reader.readlines())

 class TextFileFormat(SupportedFileFormat, alias=".txt"):
-    def read_file_from_handle(self, fp: StringIO) -> Iterable[JsonLikeDocument]:
-        return ({"line": line} for line in fp)
+    reader = TextIOWrapper

+    def read_file_from_handle(
+        self, reader: TextIOWrapper
+    ) -> Iterable[JsonLikeDocument]:
+        return ({"line": line} for line in reader.readlines())

-class CommaSeperatedValuesFileFormat(SupportedFileFormat, alias=".csv"):
-    def read_file_from_handle(self, fp: StringIO) -> Iterable[JsonLikeDocument]:
-        if not isinstance(fp, TextIOWrapper):
-            return DictReader(TextIOWrapper(fp))
-        return DictReader(fp)
+
+class CommaSeparatedValuesFileFormat(SupportedFileFormat, alias=".csv"):
+    reader = TextIOWrapper
+
+    def read_file_from_handle(
+        self, reader: TextIOWrapper
+    ) -> Iterable[JsonLikeDocument]:
+        return DictReader(reader)

This change gets the tests to pass and opens the file in rb mode, so it should handle the parquet addition just fine. One notable omission from this change is opening files with utf-8 encoding. I am not unilaterally against supporting other file encodings, but I consider it out of scope. And this gets around the issue of hard-coded checks. What do you think about a variation of this?
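
To make the idea concrete outside the diff, here is a small self-contained sketch of the same pattern: every file is opened in binary mode, and each format optionally declares a reader that wraps the raw handle. The class names (FileFormat, JsonLines, RawBytes) and the utf8_text helper are hypothetical stand-ins, not nodestream's actual classes. Pinning the encoding to utf-8 through functools.partial is an assumption about how the omitted encoding could be restored.

```python
import json
from functools import partial
from io import BytesIO, TextIOWrapper
from typing import Any, BinaryIO, Callable, Dict, Iterator, Optional

# Hypothetical helper: decode a binary handle as utf-8 text.
utf8_text = partial(TextIOWrapper, encoding="utf-8")


class FileFormat:
    # Optional callable wrapping the binary handle; None means "use raw bytes".
    reader: Optional[Callable[[BinaryIO], Any]] = None

    def __init__(self, handle: BinaryIO) -> None:
        self.handle = handle

    def read_file(self) -> Iterator[Dict[str, Any]]:
        # Look the reader up on the class so a plain function is not bound as a method.
        wrap = type(self).reader
        source = wrap(self.handle) if wrap is not None else self.handle
        yield from self.read_file_from_handle(source)

    def read_file_from_handle(self, source: Any) -> Iterator[Dict[str, Any]]:
        raise NotImplementedError


class JsonLines(FileFormat):
    reader = utf8_text  # text format: decode the bytes before parsing

    def read_file_from_handle(self, source):
        return (json.loads(line) for line in source if line.strip())


class RawBytes(FileFormat):
    # Binary format (parquet would look like this): no reader, work on raw bytes.
    def read_file_from_handle(self, source):
        return iter([{"size": len(source.read())}])


docs = list(JsonLines(BytesIO(b'{"a": 1}\n{"a": 2}\n')).read_file())
```

With this shape, the base class needs no per-format check: a binary format simply leaves reader unset, while text formats opt in to decoding.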

Currently, using pandas to read the parquet files

I am not inherently against this. Internally, pandas seems to call out to one or more parquet backends (including pyarrow). However, that leads to your third point...

Currently, the entire file is read in memory first

This could be better. However, the pandas API wants to return a data frame representing the entire file - not segments. There are other areas of suboptimal performance in nodestream. I will noodle on any recommendations I have for this, but I am also not opposed to punting on this problem as an MVP.

zprobst commented 9 months ago

I've changed the base to the 0.12 branch. With the recommendations above, I think we may be breaking (or at least subtly changing the meaning of) the API for FileFormat, which puts this onto the next breaking release.

zprobst commented 9 months ago

This could be better. However, the pandas API wants to return a data frame representing the entire file - not segments. There are other areas of suboptimal performance in nodestream. I will noodle on any recommendations I have for this, but I am also not opposed to punting on this problem as an MVP.

Okay, so I think maybe this is the ideal way to go, dropping the use of pandas. It does seem to give us a more workable API to yield each item in the parquet file.

Cole-Greer commented 7 months ago

I've rebased the branch to 0.12, removed the inefficient intermediate json serialization step, and added a simple test.

zprobst commented 7 months ago

@Cole-Greer do you mind taking a look at the build failures?

I think we just need to run the linters/formatters with make lint format

Cole-Greer commented 7 months ago

@Cole-Greer do you mind taking a look at the build failures?

I think we just need to run the linters/formatters with make lint format

Sorry about that, should be good to go now.