elixir-explorer / explorer

Series (one-dimensional) and dataframes (two-dimensional) for fast and elegant data exploration in Elixir
https://hexdocs.pm/explorer
MIT License
1.12k stars · 123 forks

Large memory usage when using `Explorer.DataFrame.concat_columns` on 30k (small) data frames. Memory leak? #937

Closed · vegabook closed 4 months ago

vegabook commented 4 months ago

Here are 30,682 small Parquet files in a zip archive. Each is a single-column vector of 1024 f64 floats, about 240 MB in total.

Loading them with from_parquet! works fine, yielding 30,682 dataframes:

require Explorer.DataFrame, as: DF
# j = path_to_where_you_unzipped_the_files
d = Enum.map(File.ls!(j), fn x ->  Path.join(j, x) |> DF.from_parquet! end)

However, concatenating them in one call quickly overwhelms the 32 GB of RAM on my machine, and the process gets terminated:

dd = DF.concat_columns(d)

A pairwise reduction, by contrast, works but takes about 30 minutes:

[h | r] = d
dd = Enum.reduce(r, h, fn n, acc -> DF.concat_columns(acc, n) end)

This last one never goes above about 2 GB of RAM, since the intermediate accumulator from each iteration can be garbage collected. But it's super slow.

I'm doing all of the above because Explorer doesn't seem to natively support Parquet datasets.

josevalim commented 4 months ago

Today our concat_columns is implemented as a join, but perhaps the most efficient approach is to retrieve all of the columns and build the resulting dataframe from them directly.

You could emulate this in your code by doing something like:

d = Enum.with_index(File.ls!(j), fn x, i ->  {"row#{i}", Path.join(j, x) |> DF.from_parquet! |> DF.pull("column")} end)

Replacing "column" by the actual column name from Parquet. And then you call DF.new.

We also need to measure whether the slowdown is not also on our Elixir side. This should be a fun problem. :)

josevalim commented 4 months ago

@vegabook this takes about 25 seconds on my machine:

[path] = System.argv()
require Explorer.DataFrame, as: DF

path
|> File.ls!()
|> Task.async_stream(fn dir ->
  IO.inspect(dir)
  df = DF.from_parquet!(Path.join(path, dir))
  for name <- df.names, do: {name, df[name]}
end)
|> Enum.flat_map(fn {:ok, list} -> list end)
|> DF.new()
|> IO.inspect()

There is likely a faster implementation available.
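For readers unfamiliar with the pattern: Task.async_stream/3 wraps each task's return value in an {:ok, result} tuple and yields results in input order by default, which is why the script unwraps with Enum.flat_map before handing the pairs to DF.new. A stdlib-only sketch with made-up stand-in values:

```elixir
# Each "task" returns a list of {name, value} tuples, standing in for the
# per-file column extraction in the script above. Task.async_stream/3 runs
# them concurrently and yields {:ok, list} tuples in input order.
pairs =
  1..4
  |> Task.async_stream(fn i -> [{"col#{i}", i * 1.0}] end,
    max_concurrency: System.schedulers_online()
  )
  |> Enum.flat_map(fn {:ok, list} -> list end)

# pairs == [{"col1", 1.0}, {"col2", 2.0}, {"col3", 3.0}, {"col4", 4.0}]
```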

vegabook commented 4 months ago


Hey your first answer worked great:

xx = Enum.with_index(File.ls!(j), fn x, i -> {"col#{i}", Path.join(j, x) |> DF.from_parquet! |> DF.pull(0)} end) |> DF.new

Less than 10 seconds on a fairly weak machine. So basically: build a list of {name, series} tuples, then feed the list to DF.new. Great, thank you.

Closing the issue.

josevalim commented 4 months ago

I will reopen this since we can optimize it!