blaze / castra

Partitioned storage system based on blosc. **No longer actively maintained.**
BSD 3-Clause "New" or "Revised" License
153 stars 21 forks source link

Support for on-disk appends, partitioning #36

Open jcrist opened 9 years ago

jcrist commented 9 years ago

I've been working on a refactor of Castra - before I spend any more time on this, I should probably get some feedback. Here's the plan:

Issues I'm attempting to solve:

  1. Castra provides no validation that partitions split evenly - indices like [[1, 2, 3, 3], [3, 3, 4, 5, 6], ...] were possible (and happened to me)
  2. Castra provides no easy way to say "partition weekly", without manually doing the partitioning elsewhere (issue #3)

The plan:

  1. Add partitionby=None to the init signature. This will live in meta. If None, no repartitioning is done by Castra. Can also be a time period (things you can pass to resample in pandas).
  2. extend checks current partitions for equality overlap (even if partitionby=None). There are 3 cases that can happen here:
    1. Start of new frame is before end of existing partition. This errors
    2. Start of new frame is equal to end of existing partition. The equal parts are split off and appended to existing partition. Remainder is stored as new partition.
    3. Start of new frame is after existing partition. New frame is written to disk (current behavior)
  3. If partitionby != None, then data is partitioned by Castra into blocks. extend should still take large dataframes (calling extend on a row is a bad idea), but will group them into partitions based on the rule passed to partitionby. Using the functionality provided by bloscpack, the on disk partitions can be appended to with little overhead. This makes writing in cases where this happens slightly slower, but has no penalty on reads.
  4. Add extend_sequence function. This takes an iterable of dataframes (can be a generator), and does the partitioning in memory instead of on disk. This will be faster than calling extend in a loop (no on disk appends), but will result in the same disk file format.

This method means that the disk will match what's in memory after calls to extend or extend_sequence complete, will allow castra to do partitioning for the user, and will ensure that the partitions are valid. I have a crude version of this working now, and have found writes to be only slightly penalized when appends happen (no penalty if they don't), and no penalty for reading from disk.

mrocklin commented 9 years ago

Validating that input blocks don't overlap with existing blocks sounds like a clear win.

Appending to existing partitions might be useful but might also be more than we want to maintain within castra itself. @esc and I considered buffering blocks and appending to existing blocks this when we first built castra but decided against it in order to keep castra very simple. Our plan was to add this stuff on top of castra in external code. This came out of dealing with the bcolz codebase which, while much more fully featured that castra, is also more expensive to maintain. It may be that it's time to revisit this decision; I just wanted to share historical reasons on how we've tried to keep the core simple.

I like the extend sequence idea. It matches how I tend to use castra today, e.g. for df in dfs: mycastra.extend(df), and aligns well with the idea that buffering logic can exist external to the existing model.

Appending onto existing blocks sounds like it might be tricky. I understand that you've been diving into bloscpack to do this. I suspect that this would marry castra and bloscpack more tightly than they are currently. This tight coupling concerns me, especially if we want to switch to using other compression libraries. This concern about marrying the two is motivated a bit by bloscpack not releasing the GIL, see https://github.com/Blosc/python-blosc/issues/101. I would be -0.5 on any change that removed this option going forward.

jcrist commented 9 years ago

My thoughts:

Castras should have the following invariants:

  1. New data always comes after old data
  2. Partitions must not overlap (i.e. don't do this: [[1, 2, 3, 3], [3, 3, 4, 5, 6]...]

Additionally, having the index be a time series partitioned by some period is a common pattern. We should try to make this as easy as possible for users, while also ensuring the two invariants above.

In my mind, the following use case should work:

# Create a castra partitioned by day:
c = Castra('filepath', template=temp, partitionby='d')
# Add some existing data
c.extend_sequence(some_iterator)
c.close()
# Get new data at a later time, and add it, while keeping the partitioning scheme
c = Castra('filepath')
c.extend(df)

I really want to support this functionality, as it's something I would expect from a tool like this. Saying "this castra is partitioned by day" means to me that both extend and extend_sequence should respect that.

mrocklin commented 9 years ago

I'm not sure that castras should manage partition sizes; this may be a pandoras box (although if you have an implementation that does this well that could be a good counterargument.)

All use cases that I've come across would be satisfied by moving the partitionby keyword argument toextend_sequence`.

# Create a castra
c = Castra('filepath', template=temp)
# Add some existing data,  partitioned by day
c.extend_sequence(some_iterator, partitionby='d')
c.close()

Direct use of extend is up to the user to coordinate:

c = Castra('filepath')
c.extend(df)  # user manages partition size directly

This keeps a lot of logic out of the actual castra object and yet satisfies most use cases I can think of. It's also something that I think can be done very cheaply.

jcrist commented 9 years ago

If you have a castra that already exists on disk up to May 15, and you have a dataframe from May 16 to June 16, what does c.extend_sequence([df], partitionby='M') do? What does c.extend(df) do?

Or a simpler case, suppose you have a castra that has an index up to May 16, 0:00:00, and you have a dataframe with a few more datapoints at that same time. How can you add that dataframe to the castra, without modifying the existing partitions?

mrocklin commented 9 years ago

In the firs case I would expect to add two partitions with extend_sequence and one partition with extend

In the second case I would expect castra to throw an error.

mrocklin commented 9 years ago

If there is an application where the second case ends up being really important (e.g. log files that come in slightly out of order) then that sounds like a motivating use case. Do you have such a case?

jcrist commented 9 years ago

I don't, and I don't think castra should handle "out-of-order" data specifically. I do think it should work on overlapping order though (end castra index == start of next frame)

I do think that adding periodic new data to an existing castra is something that should work, and should be easy to do. Sometimes these datasets overlap. My main use-case is also covered by extend_sequence, but this seemed like a good thing to add, especially if it's moderately cheap.

mrocklin commented 9 years ago

I just tried using df.to_castra(..., sorted_index_column='pickup_datetime') on the nyctaxi dataset and got this error

ValueError: Index of new dataframe less than known data

So it looks like we are erring at least in the case of <. This should probably be changed to <=.