man-group / arctic

High performance datastore for time series and tick data
https://arctic.readthedocs.io/en/latest/
GNU Lesser General Public License v2.1

[ChunkStore] Decompression error on concurrent read & write #655

Open ddebie opened 6 years ago

ddebie commented 6 years ago

Arctic Version

1.70.0

Arctic Store

ChunkStore

Platform and version

CentOS 7.2

Description of problem and/or code sample that reproduces the issue

I'm getting an error when I have two threads simultaneously reading and writing from one symbol. The reading thread will periodically throw a LZ4BlockError:

LZ4BlockError: Decompression failed: corrupt input or insufficient space in destination buffer. Error code: 24049

Here is the code to reproduce it:

import multiprocessing as mp

from arctic import Arctic

# LIB_NAME and create_data (which builds a DataFrame of random rows) are defined elsewhere.

def get_library():
    return Arctic('some_host:1234').get_library(LIB_NAME)

def write_loop():
    library = get_library()
    while True:
        data = create_data(100)  # Creates 100 rows of random data
        library.append('SYMBOL', data)

def read_loop():
    library = get_library()
    while True:
        df = library.read('SYMBOL')

proc = mp.Process(target=write_loop)
proc.start()

try:
    read_loop()
finally:
    proc.terminate()

From a quick check, it seems that the data being passed to decompress(_str) in _compression.py is not valid lz4 - could the block metadata and data be out of sync?

bmoscon commented 6 years ago

I'm pretty sure none of the libraries are threadsafe. There are multiple documents that make up the stored data and you can't read and write to them at the same time (though you can write new data to chunkstore and read back older data that is not part of that chunk at the same time).

correct me if I'm wrong @jamesblackburn

yschimke commented 6 years ago

This seems like something Arctic should be handling, or at least documenting so clients know what they might observe. It could also document ways to resolve it, e.g. retrying on a class of errors (similar to SQL optimistic lock failures), or configuration options such as isolation levels.
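
For illustration, a minimal client-side retry along the lines suggested above might look like this (a sketch, not an Arctic API; the exception import path and the backoff policy are assumptions):

import time

from lz4.block import LZ4BlockError

def read_with_retry(library, symbol, retries=5, backoff=0.1):
    # Retry reads that race a concurrent append; re-raise after the last attempt.
    for attempt in range(retries):
        try:
            return library.read(symbol)
        except LZ4BlockError:
            if attempt == retries - 1:
                raise
            time.sleep(backoff * (attempt + 1))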

ddebie commented 6 years ago

Thanks for your responses, guys.

I've looked into this a bit more and from what I can tell, it's due to writes not being atomic - there is a short period of time where the metadata document is out of sync with the data. So it seems that the problem exists even when running from a single thread.

Even with a single thread, if an application dies while writing, it's possible to cause permanent data corruption (and I've been able to reproduce this pretty consistently). If I run just the writing code (below), and then randomly kill the process, eventually the symbol will become corrupted and unreadable.

def write_loop():
    library = get_library()
    while True:
        data = create_data(100)  # Creates 100 rows of random data
        library.append('SYMBOL', data)

Is there any way around this? It seems to me that without writes being atomic, permanent data corruption is possible.

bmoscon commented 6 years ago

I'm well aware of the issue you're seeing, but it's not easily solved in arctic. It is more easily solved by the application using arctic, though something might be doable in arctic itself. Multi-document atomic writes are supported by mongo, but only in version 4.0+.

ddebie commented 5 years ago

How would you recommend solving this in the application using arctic? As I understand it, if the application dies or even if the box goes down at the wrong time, the data will be corrupted. How can the application avoid this?

Also, are there any plans for Arctic to utilize the new multi-document atomic writes in mongo at some point?

bmoscon commented 5 years ago

Either don't read while writing, or make sure you are not reading from the chunk being written to. If your chunk size were hourly, you could make sure you were never reading from the current hour (for example). Obviously there is nothing you can do to prevent power outages or the like.

There are no plans for arctic to support multi-document atomic writes at the moment.
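
A rough sketch of the "don't read the chunk being written" approach with a daily chunk size, using ChunkStore's chunk_range argument (the cutoff logic and the chunk size are assumptions for illustration):

from datetime import datetime, timedelta

from arctic.date import DateRange

def read_completed_chunks(library, symbol):
    # Read everything up to (but not including) today's chunk, so the chunk
    # currently being appended to is never touched. Assumes a 'D' chunk size.
    today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    return library.read(symbol, chunk_range=DateRange(None, today - timedelta(microseconds=1)))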

yschimke commented 5 years ago

@bmoscon Can we leave this open to track what is a valid enhancement request?

bmoscon commented 5 years ago

of course, at some point we'll want to do this

ddebie commented 5 years ago

Either don't read while writing, or make sure you are not reading from the chunk being written to. If your chunk size were hourly, you could make sure you were never reading from the current hour (for example). Obviously there is nothing you can do to prevent power outages or the like.

There are no plans for arctic to support multi-document atomic writes at the moment.

The issue exists even when you're not reading/writing concurrently, though. Even if I do just a single write, it is possible to completely corrupt the data so that it's no longer readable. In my view such an issue is pretty critical; I'm not sure how Arctic can be safely used in a production environment where data integrity matters. We have been considering switching our tick databases to Arctic and have liked what we've seen so far, but this would definitely be a blocker for us.

bmoscon commented 5 years ago

I assume you're referring to a power outage? The only two collections that matter for chunkstore are the symbol and the data collections. The symbol document doesn't really have anything of importance that isn't written once when it's created (chunk size, column data, etc.). After that is written, if you are not reading and writing concurrently you really shouldn't have any issues, even with a power outage, as mongo has supported single-document atomic writes since 3.2, I believe. The only information in the symbol document that gets updated is things like row counts, number of chunks, etc., but those are only for statistical purposes.

ddebie commented 5 years ago

Yep, I'm referring to anything that could kill a process while it is writing to Arctic: a power outage, a hardware failure, even just killing the python process. This example reproduces the data corruption for me:

import pandas as pd
from arctic import Arctic

library = Arctic('some_host:1234').get_library('compression_test')

while True:
    data = create_data(100) # Create dataframe with 100 rows random data
    library.append('SYMBOL', data)

So I first initialize the library, then run the above code and repeatedly stop the process with Ctrl+Z. Eventually the data becomes corrupted (for me it takes about 10 stops before corruption occurs, but that will depend on your system/hardware). Here is a sample output from running the above code:

[65]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
^Z
[66]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
^Z
[67]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
^Z
[68]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
^Z
[69]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
Traceback (most recent call last):
  File "arctic-bug2.py", line 24, in <module>
    library.append('SYMBOL', data)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 538, in append
    self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine, audit=audit)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 447, in __update
    df = self.read(symbol, chunk_range=chunker.to_range(start, end), filter_data=False)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 301, in read
    return deser(chunks[symbol[0]], **kwargs) if skip_filter else chunker.filter(deser(chunks[symbol[0]], **kwargs), chunk_range)
  File "/usr/local/lib/python2.7/site-packages/arctic/serialization/numpy_arrays.py", line 219, in deserialize
    df = pd.concat([self.converter.objify(d, columns) for d in data], ignore_index=not index)
  File "/usr/local/lib/python2.7/site-packages/arctic/serialization/numpy_arrays.py", line 147, in objify
    d = decompress(doc[DATA][doc[METADATA][LENGTHS][col][0]: doc[METADATA][LENGTHS][col][1] + 1])
  File "/usr/local/lib/python2.7/site-packages/arctic/_compression.py", line 135, in decompress
    return lz4_decompress(_str)
_block.LZ4BlockError: Decompression failed: corrupt input or insufficient space in destination buffer. Error code: 1774823
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
Traceback (most recent call last):
  File "arctic-bug2.py", line 24, in <module>
    library.append('SYMBOL', data)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 538, in append
    self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine, audit=audit)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 447, in __update
    df = self.read(symbol, chunk_range=chunker.to_range(start, end), filter_data=False)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 301, in read
    return deser(chunks[symbol[0]], **kwargs) if skip_filter else chunker.filter(deser(chunks[symbol[0]], **kwargs), chunk_range)
  File "/usr/local/lib/python2.7/site-packages/arctic/serialization/numpy_arrays.py", line 219, in deserialize
    df = pd.concat([self.converter.objify(d, columns) for d in data], ignore_index=not index)
  File "/usr/local/lib/python2.7/site-packages/arctic/serialization/numpy_arrays.py", line 147, in objify
    d = decompress(doc[DATA][doc[METADATA][LENGTHS][col][0]: doc[METADATA][LENGTHS][col][1] + 1])
  File "/usr/local/lib/python2.7/site-packages/arctic/_compression.py", line 135, in decompress
    return lz4_decompress(_str)
_block.LZ4BlockError: Decompression failed: corrupt input or insufficient space in destination buffer. Error code: 1774823

At this point the symbol is fully corrupted and you can no longer read or write it.

bmoscon commented 5 years ago

ok, that should not happen. I can look into that and fix it. I'll let you know

jamesblackburn commented 5 years ago

@ddebie also try VersionStore (the default), which is what we use in production here. It shouldn't be possible to corrupt the symbol with Ctrl-C etc.

bmoscon commented 5 years ago

@jamesblackburn I found the issue. Bulk operations are not atomic in mongodb. The individual operations are, but if the process dies in the middle, the earlier operations will not be rolled back. Not sure how to handle this, other than writing to internal metadata before the bulk write; on arctic start-up we could check whether that metadata matches what was actually written and remove invalid/corrupt chunks if necessary?

streamtv85 commented 5 years ago

Hi, are there any plans to fix this? It happens to me from time to time when using ChunkStore, and to recover I have to delete all the data for the symbol, not just the last chunk. Are there any known workarounds that don't require deleting the whole symbol?

bmoscon commented 5 years ago

Yes, I do plan on fixing it. You can delete the last chunk, but you'll also need to update the metadata for the symbol as well; otherwise it will think there is data for a chunk that doesn't exist.

streamtv85 commented 5 years ago

The problem is that when trying to delete the last chunk I get the same LZ4BlockError, but I do not get the error when deleting the whole symbol. The easiest way to recover the data seemed to me:

The only problem is that it may take a long time for large sets of data. Is there a more efficient way to recover the broken symbol?

bmoscon commented 5 years ago

try using delete(self, symbol, chunk_range=None) with chunk_range set to the appropriate range
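
For example, something along these lines (a sketch; the dates are placeholders and the correct range depends on your chunk size):

from datetime import datetime

from arctic.date import DateRange

# Remove only the chunk(s) covering the suspect range instead of the whole symbol.
library.delete('SYMBOL', chunk_range=DateRange(datetime(2019, 1, 31), datetime(2019, 1, 31, 23, 59, 59)))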

gambitvk commented 5 years ago

Hi guys, thanks for opening up this work. I am also interested in using ArcticDB for our work to handle market data; just wondering if there is a time frame for when this issue could be fixed? It would make selling Arctic to my boss a much easier job.

cheers

yschimke commented 5 years ago

@shashank88 Do you think testing with Forward pointers will make any difference here? Does it make it safer in single writer with concurrent readers mode?

bmoscon commented 5 years ago

I don't think the forward pointers would do anything. The issue is that data is being written to two collections in two separate transactions (or in some cases, you might have multiple updates to a collection in separate transactions). If you kill the process in the middle of this, bad things can happen.

shashank88 commented 5 years ago

Yeah, I don't think forward pointers will help solve this, based on what @bmoscon said. I haven't dug into this issue till now; I will take a look.

gambitvk commented 5 years ago

Sorry, I am a bit new to Mongo; just a quick thought based on experience with other DBMSs. We would normally use a transaction to group anything that needs to be atomic, and it looks like MongoDB does provide such a facility. Could this be used to solve this problem?

Thanks Guys, keen to see this get fixed!!

https://docs.mongodb.com/manual/core/transactions/

// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );

employeesCollection = session.getDatabase("hr").employees;
eventsCollection = session.getDatabase("reporting").events;

// Start a transaction.
session.startTransaction( { readConcern: { level: "snapshot" }, writeConcern: { w: "majority" } } );

// Operations inside the transaction.
try {
    employeesCollection.updateOne( { employee: 3 }, { $set: { status: "Inactive" } } );
    eventsCollection.insertOne( { employee: 3, status: { new: "Inactive", old: "Active" } } );
} catch (error) {
    // Abort transaction on error.
    session.abortTransaction();
    throw error;
}

// Commit the transaction using write concern set at transaction start.
session.commitTransaction();

session.endSession();
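
For reference, the same pattern from Python with pymongo looks roughly like this (a sketch; as noted in the reply below, it still requires MongoDB 4.0+ and a replica set):

from pymongo import MongoClient
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern

client = MongoClient('some_host:1234')
employees = client.hr.employees
events = client.reporting.events

with client.start_session() as session:
    # Commits when the inner block exits cleanly; aborts if an exception is raised.
    with session.start_transaction(read_concern=ReadConcern('snapshot'),
                                   write_concern=WriteConcern('majority')):
        employees.update_one({'employee': 3}, {'$set': {'status': 'Inactive'}},
                             session=session)
        events.insert_one({'employee': 3, 'status': {'new': 'Inactive', 'old': 'Active'}},
                          session=session)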

bmoscon commented 5 years ago

1. Mongo transactions are only supported in 4.0+. Adding this would require dropping support for all previous versions; not sure that's doable.
2. It requires a replica set. No other setup is supported. Eventually they'll support transactions on sharded clusters, but they do not currently. The requirement for a specific configuration is likely a dealbreaker.
jamesblackburn commented 5 years ago

I think there may be a bit of confusion here regarding the different store types. VersionStore (the default store type) and TickStore should not experience this issue.

In VersionStore we deliberately write chunks (which are immutable) before writing the metadata documents i.e. publishing the data.

We have tests for incomplete writes and concurrent reads and writes: https://github.com/manahl/arctic/blob/master/tests/integration/test_arctic_multithreading.py#L47

In VersionStore there is an edge case with large sharded clusters where, if a mongod crashes while your Arctic library only has single-mongod consistency (w=1) and a data rollback occurs, the latest version can have the wrong number of chunks. With sharded clusters you should use majority write concern to guarantee resilience.
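
For example, one way to get majority write concern is to hand Arctic a pre-configured pymongo client instead of a host string (a sketch; the host and options are placeholders):

from pymongo import MongoClient

from arctic import Arctic

# Writes must be acknowledged by a majority of replica-set members (and journalled)
# before they are considered durable.
client = MongoClient('some_host:1234', w='majority', journal=True)
store = Arctic(client)
library = store.get_library('compression_test')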

bmoscon commented 5 years ago

Sorry, I thought this was already clear, but maybe it wasn't. This 100% only affects chunkstore. I'm working on a fix now, should have it fixed within a week or two. I'll likely only fix the data corruption issue (i.e. you write, and the write dies in the middle). Writing and reading concurrently is much harder to fix and I'll address that later.

yschimke commented 5 years ago

@bmoscon confusion probably comes from me, sorry.

bmoscon commented 5 years ago

@yschimke no worries - I wasn't blaming anyone. The title now says [ChunkStore], so I think we should be a-ok :D

bmoscon commented 5 years ago

OK, I have a fix that looks like it will work; doing some more testing. Basically it creates its own transaction record, does the write, and then removes the transaction record. If the transaction record is present during a read or a write, it will raise an error, and the state can be self-corrected (i.e. the partially written data removed).
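
Roughly, the marker-document pattern described above looks like the following pymongo sketch (collection and field names here are hypothetical, not Arctic's actual implementation):

from pymongo import MongoClient

db = MongoClient('some_host:1234')['compression_test']

def guarded_bulk_write(symbol, chunk_docs):
    # Record that a multi-document write is in flight.
    marker = db.transactions.insert_one({'symbol': symbol, 'state': 'in_progress'})
    db.chunks.insert_many(chunk_docs)  # the non-atomic bulk write
    # Only a fully completed write clears the marker; a crash leaves it behind.
    db.transactions.delete_one({'_id': marker.inserted_id})

def assert_no_partial_write(symbol):
    # Reads and writes can refuse to proceed (or trigger cleanup of the partial
    # data) while a leftover marker exists for the symbol.
    if db.transactions.find_one({'symbol': symbol}) is not None:
        raise RuntimeError('partial write detected for %r - cleanup required' % symbol)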

michael-stefanski commented 5 years ago

Did you commit the fix? Is it in another branch? I have this issue in production and I can help with more testing.

bmoscon commented 5 years ago

https://github.com/manahl/arctic/tree/chunkstore-transactions

I haven't had time to test the cleanup portion of it yet.

PatternMatching commented 5 years ago

With Mongo 4.2, it appears that distributed transactions are now supported uniformly across the different kinds of configurations (sharded clusters as well as replica sets).

MongoDB Documentation Reference

Agreed that dropping support for versions of Mongo prior to 4.2 would be problematic, but this still seems like the most robust solution for this issue, which is a pretty huge problem at scale when you are dealing with concurrency as well as (it would appear) replication lag. I know it is fairly ugly in practice, but would coding around the Mongo version to avoid a breaking change really be considerably more expensive than dropping backwards compatibility, given the utility the fix would bring to ChunkStore?

harryd31 commented 5 years ago

Yeah, I definitely agree with @PatternMatching about coding around the Mongo version. It would be a great help for this issue.

flamby commented 3 years ago

Hi,

I'm suffering a lot from the concurrent read/write issue once I reduce storage_interval below 15 seconds, and I wonder whether the chunkstore-transactions branch would have fixed it (MongoDB sharding and tuning/scaling don't help in that regard).

Or do you plan to support Mongo 4.2 transactions at some point? That sounds like a better and simpler fix.

Thanks