wbolster / plyvel

Plyvel, a fast and feature-rich Python interface to LevelDB
https://plyvel.readthedocs.io/
Other
529 stars 75 forks source link

multiprocessing, itertools.groupby, and plyvel: pickle error #73

Closed fasiha closed 6 years ago

fasiha commented 6 years ago

I may be hitting a very unusual niche here but I'm encountering a pickling error when trying to parallelize a plyvel iterator using itertools.groupby (which uses a group function to break an iterable into another iterable that produces (group key, iterable) (the linked docs have a nice analogy, groupby is like uniq)).

If I don't use groupby, I have no problems: I can combine multiprocessing with plyvel's iterable. But as soon as I introduce a groupby I encounter

TypeError: self._iter,self.comparator,self.start_slice,self.stop_slice cannot be converted to a Python object for pickling

Complete example (sorry, this isn't pep8-compliant since I removed some newlines to make it a bit shorter):

# create a test database
import plyvel
db = plyvel.DB('/tmp/testdb', create_if_missing=True)

for path in ['aaa', 'bb', 'c']:
    for n in range(4):
        db.put(bytes(path + '/' + str(n), 'utf8'), b'1')

# iterate over all keys with a process pool: this is fine

from multiprocessing import Pool
simpleiter = db.iterator(include_value=False)
def simplefun(key):
    print(key)
    return key

with Pool(2) as p:
    for x in p.imap_unordered(simplefun, simpleiter):
        print('done')

# Now, the problem:

import itertools as it
def grouper(key):
    return key[:key.find(b'/')]

groupiter = it.groupby(db.iterator(include_value=False), grouper)

def groupedfun(keyIter):
    key, iterator = keyIter
    print(key)
    return len(list(iterator))

with Pool(2) as p:
    for x in p.imap_unordered(groupedfun, groupiter):
        print('done')

Stacktrace:

Traceback (most recent call last):
  File "baz.py", line 45, in <module>
    for x in p.map(groupedfun, groupiter):
  File "/Users/f/.pyenv/versions/3.6.4/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/f/.pyenv/versions/3.6.4/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/Users/f/.pyenv/versions/3.6.4/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/Users/f/.pyenv/versions/3.6.4/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/f/.pyenv/versions/3.6.4/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "stringsource", line 2, in plyvel._plyvel.Iterator.__reduce_cython__
TypeError: self._iter,self.comparator,self.start_slice,self.stop_slice cannot be converted to a Python object for pickling

(Note that I don't think my toy groupedfun function is problematic, since the follow straight map works fine:

groupiter = it.groupby(db.iterator(include_value=False), grouper)
for x in map(groupedfun, groupiter):
    print('straight map', x)

But trying to do this with multiprocessing leads to the error.)

I tried the above using concurrent.futures, joblib, and multiprocessing.Pool.map, but I cannot parallelize a groupby version of plyvel iterable. Any hints?

Again, I love plyvel, thanks for your hard work!

wbolster commented 6 years ago

this is not a bug in plyvel. actually, your code is incorrect and i think it can even lead to data loss.

leveldb is not multi-process safe, as its docs clearly state:

A database may only be opened by one process at a time. — https://github.com/google/leveldb/blob/master/doc/index.md#concurrency

fasiha commented 6 years ago

If I can impose on your generosity some more by asking a noob question—notice in the code above how the database is only opened in the main process once, and not inside the process pool. So the image in my mind is that the main process generates the stream of keys which would be sent to the process pool via inter-process communication, thereby preserving the process-safety of the LevelDB. Is this image wrong? Looking at the error message again, it's clear that Python is trying to pickle a Plyvel object, and I'd assumed it was an object representing database contents, but perhaps it's trying to pickle the entire iterable?

I'll dig some more around the semantics of the multiprocessing module and if all else fails, manually convert chunks of the iterable into lists and send them to the process pool like in the imagine I had in my mind.

Thank you 🙇‍♂️!

fasiha commented 6 years ago

From the multiprocessing docs,

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager.

Maybe objects on the iterables generated by itertools.groupby (from operating on the original Plyvel iterable) somehow run afoul of this and I'll try their advice.

wbolster commented 6 years ago

both reading from and writing to the database must happen in the ‘parent’ process.

itertools.groupby() returns special ‘grouper’ objects, which are wrappers around the underlying iterators, which in plyvel's case would be leveldb iterators which cannot be shared between processes.

you may consume each group into an actual list though:

it = db.iter(...)
it = itertools.groupby(it, ...)
it = (list(group) for key, group in it)

pool.map(f, it)
fasiha commented 6 years ago

Just to make sure I understand, this involves loading the whole Plyvel iterable (and thus the whole LevelDB database) into memory before using multiprocessing, yes? Certainly that's the logical conclusion to this idea of "read some entries from LevelDB, send them to the appropriate process, repeat", where "some"→"all" :)

A final question—is it wrong to use multiprocessing with just a plain db.iterator (no groupby), as I showed in my example, the one that didn't throw a pickling error? Recall that was this example:

from multiprocessing import Pool
simpleiter = db.iterator(include_value=False)
def simplefun(key):
    print(key)
    return key
with Pool(2) as p:
    for x in p.imap_unordered(simplefun, simpleiter):
        print('done') # works fine

Is this exposing my database to multiple processes, with the potential for data loss/race conditions/etc.? (I'm asking if basically my mental image of Plyvel getting a value from LevelDB and giving it to the Python pool manager is still accurate.)

Thanks for bearing with my many questions and incomplete understanding!

fasiha commented 6 years ago

Ah, I think I see more clearly now what you were suggesting! The grouper object returned by groupby can be converted into an iterable that hydrates each sub-iterable, so it wouldn't load the full database into memory, just keyvals that belonged to that group! Very cool, that does seem to work! (Now if only my grouped chunks weren't 10 GB+!)

wbolster commented 6 years ago

exactly! the db.iterator is lazy, the groupby is lazy, the group is lazy but it is turned into a list. so each chunk is now a normal list, which can cross process boundaries so pool.imap can be used.