rainwoodman / sharedmem

A different flavor of multiprocessing in Python
GNU General Public License v3.0

Can it support pandas.DataFrame or dict? How to append the data in a subprocess? #6

Closed dindom999 closed 8 years ago

dindom999 commented 8 years ago

I need to read more than 20,000 DataFrame files (in /dev/shm/) and combine them into one dict. Right now it works like this:


import os
import pickle

import pandas as pd

def loaddata():
    # Accumulate every pickled dict of DataFrames in /dev/shm/tdata/
    # into one dict keyed by symbol.
    _tick_dict_by_symbol = {}
    _path = '/dev/shm/tdata/'
    for x in os.listdir(_path):
        fn = os.path.join(_path, x)
        with open(fn, 'rb') as f:
            tlist = pickle.load(f)
            for y in tlist.keys():
                if y not in _tick_dict_by_symbol:
                    _tick_dict_by_symbol[y] = pd.DataFrame()
                _tick_dict_by_symbol[y] = _tick_dict_by_symbol[y].append(
                    tlist[y], ignore_index=True)

It runs very slowly on a server with 16 cores and 40 GB of memory, so I hope to switch to multiprocessing. But I don't know how to share big data (about 3 GB) between processes, which is how I got here. I read your docs; it seems sharedmem does not support dict or DataFrame, am I right?

rainwoodman commented 8 years ago

You are right. sharedmem creates a shared buffer that is then viewed as a numpy array, and provides an interface for looping over it. But dealing with unstructured (or poorly structured) data, like a DataFrame or dict, is beyond what a buffer can do.

On the other hand, it seems like you are dealing with a serious amount of data!

Is it homogeneous, structured numerical data? If so, it may be worthwhile to think about using raw binary data and numpy arrays as the fundamental storage, because these are much easier (and faster!) to manipulate than a DataFrame or dict, and can always be cast to the latter if necessary. Also, sharedmem will 'likely' be able to help you in that case.
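
For example, something along these lines would give you a table in shared memory (untested sketch; the field names are just placeholders for whatever your records contain):

import numpy
import sharedmem

# Placeholder record layout -- replace the fields with your actual columns.
dtype = numpy.dtype([('price', 'f8'), ('amount', 'f8'), ('time', 'i8')])

# sharedmem.empty takes a dtype argument, so the whole table can live in a
# shared buffer that child processes see after fork.
table = sharedmem.empty(1024 * 1024, dtype=dtype)
table['price'][:] = 0.0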

dindom999 commented 8 years ago

The raw data struct is a transaction list:

[
{'itemname':str,'bid1':float,'bid2':float,'bid3':float,'bid1amount':float,'bid2amount':float,'bid3amount':float,'dealprice':float,'time':int}

....
]

They are all numbers except 'itemname'.

I receive about 3000-5000 items each second, and I need to group them by "itemname" immediately (that is why I store it in memory, not a database, and need multiprocessing). After that, the new data struct is a dict, like this (or something else, if it is better to share):


{'itemname':
 ['bid1','bid2','bid3','bid1amount','bid2amount','bid3amount','dealprice','time']
......
}
rainwoodman commented 8 years ago

Relational database with numpy arrays.

I hope this sheds some light on how to do this without DataFrames.

First, define a numpy array with a structured dtype:

import numpy

dtype = numpy.dtype([
    ('itemname', 'S10'),
    ('itemid', 'i8'),
    ('bid1', 'f4'),
    ('bid2', 'f4'),
    ('bid3', 'f4'),
    ('bid1amount', 'f4'),
    ('bid2amount', 'f4'),
    ('bid3amount', 'f4'),
    ('dealprice', 'f4'),
    ('time', 'i8'),
])

Then sort your data by 'itemname'.

Then use numpy.unique to convert 'itemname' to integer labels (return_inverse=True) and store them in 'itemid'.

For per-category statistics, detect edges in the labels with something like (data['itemid'][1:] != data['itemid'][:-1]).nonzero(). Those are the positions at which you do numpy.add.reduceat on the column you are interested in.
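
Roughly, the steps look like this (untested sketch; 'data' is a filled array of the dtype above):

# Sort the records so rows of the same item are contiguous.
order = numpy.argsort(data['itemname'])
data = data[order]

# Convert names to integer labels and store them in 'itemid'.
names, labels = numpy.unique(data['itemname'], return_inverse=True)
data['itemid'] = labels

# Segment starts: row 0 plus every row where the label changes.
edges = numpy.concatenate(([0],
    1 + (data['itemid'][1:] != data['itemid'][:-1]).nonzero()[0]))

# Per-item sums of the column you care about, e.g. the deal price.
totals = numpy.add.reduceat(data['dealprice'], edges)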

You may want to refer to some code I put together for additional inspiration.

https://github.com/rainwoodman/npyquery/blob/master/numdb.py#L39

dindom999 commented 8 years ago

Thanks for your reply! Sorry I didn't point out my key problem: how to append the data in a subprocess? With your sharedmem lib, changing the data struct to a numpy array is easy, but I don't know how to share the whole data set across processes, because of the GIL.

My system has 2 jobs (processes): 1) get the raw data every second, group it by itemname, and append the grouped data to the whole data set, just as I described before; 2) count over the whole data set every 5 seconds and push the result to a web server, so that clients can query it at any time.

Because 3000-5000 new items are appended each second, a day's worth of data grows to about 3 GB. It was slow when I tried mongodb or pickled files in /dev/shm.

I hope the main process can hold the whole data set, subprocess A can group new data and append it directly to the whole data set, and subprocess B can count over the same instance of the whole data set.

Can the sharedmem lib do that? I studied your example:

>>> input = numpy.arange(1024 * 1024 * 128, dtype='f8')
>>> output = sharedmem.empty(1024 * 1024 * 128, dtype='f8')

It confuses me: does the output array have to stay a fixed size the whole time? Can it not be appended to?

dindom999 commented 8 years ago

I ran this test code:

# npd is a big transaction data set (np.ndarray)
In [235]: len(npd)
Out[235]: 4017865

In [236]: shm_np=sharedmem.copy(npd)
In [237]: shm_np=np.concatenate([shm_np,shm_np[:2000]])
In [238]: len(shm_np)
Out[238]: 4019865
# so far so good, but...
In[239]: 
with sharedmem.MapReduce() as pool:
    def work(shm_np):
        with pool.critical:
            shm_np=np.concatenate([shm_np,shm_np[:2000]])
        return True
    pool.map(work, shm_np)

Traceback (most recent call last):

  File "<ipython-input-239-62094bdf28be>", line 6, in <module>
    pool.map(work, shm_np)

  File "/home/stock/anaconda2/lib/python2.7/site-packages/sharedmem/sharedmem.py", line 699, in map
    return [realreduce(realfunc(i)) for i in sequence]

  File "/home/stock/anaconda2/lib/python2.7/site-packages/sharedmem/sharedmem.py", line 695, in realfunc
    else: return func(i)

  File "<ipython-input-239-62094bdf28be>", line 4, in work
    shm_np=np.concatenate([shm_np,shm_np[:2000]])

IndexError: invalid index
rainwoodman commented 8 years ago
  1. Concatenating two shared memory arrays doesn't give you a sharedmem array. numpy will make a copy as a regular array. If you look at the type of the result, it is no longer an anonymous memory map.
  2. Have you tried using numpy.tofile (after seeking to the end of the file) to concatenate binary arrays at the file system level? This is what I usually do when performance is critical.
  3. If you use tofile(), then you don't really need to allocate any array with sharedmem.copy(). For the synchronization (critical section) you can use regular Python multiprocessing, or the one from sharedmem's MapReduce().
  4. It's unclear to me whether it is easy for you to fetch the data back from the child processes. A more reasonable setup is to have multiple buffers in your main process (which fetches the 3000/sec data), and whenever a buffer is full, spawn a child process with sharedmem.background to append the data to the file on disk; see the sketch below. You may still want to use a Lock from multiprocessing to protect the file. In the main process, the buffer can be freed immediately because the child process already has a copy via fork. This way the main process has a short choke-up every time a buffer fills (making a fork); the response time is probably acceptable, given what you have described.
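
A rough sketch of point 4 (untested; the dtype, buffer size, file name and the fill_buffer() helper are all made up, substitute your own):

import numpy
import sharedmem
from multiprocessing import Lock

dtype = numpy.dtype([('itemname', 'S10'), ('dealprice', 'f4'), ('time', 'i8')])
BUFSIZE = 100000
filelock = Lock()

def flush(buf):
    # Runs in a child process; it got its own view of buf via fork.
    with filelock:
        with open('/dev/shm/tdata.raw', 'ab') as ff:
            buf.tofile(ff)

buf = numpy.empty(BUFSIZE, dtype=dtype)
while True:
    fill_buffer(buf)                  # hypothetical: block until buf is full
    sharedmem.background(flush, buf)  # fork a child to append buf to disk
    # The main process can start refilling buf right away; the forked child
    # keeps a copy-on-write view of the old contents.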
rainwoodman commented 8 years ago

BTW, the way you used map was wrong. Did you mean map(work, [shm_np])?
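
For job 2 (counting over the whole array), the usual pattern is to map over chunk offsets rather than over the array itself, something like this (untested; assumes a 'dealprice' column):

chunksize = 1024 * 1024
with sharedmem.MapReduce() as pool:
    def work(i):
        # Each worker sums one slice of the shared array.
        return shm_np['dealprice'][i:i + chunksize].sum()
    partial = pool.map(work, range(0, len(shm_np), chunksize))
total = sum(partial)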

dindom999 commented 8 years ago

I tried joblib.dump last week, which is similar to np.tofile(). It works fine now, but it is not perfect, because the data will keep growing over time. I don't know how to "concatenate binary arrays at the file system level", though; maybe it is worth a try!

%timeit npd_wholeday.tofile('/dev/shm/_npd.rnpy')

1 loops, best of 3: 913 ms per loop

from joblib import load, dump
%timeit dump(npd_wholeday,'/dev/shm/_npd.npy')

1 loops, best of 3: 899 ms per loop

BTW, the way you used map was wrong. Did you mean map(work, [shm_np])?

Sorry for my bad test. map should work for job 2 (counting the data) but not for job 1 (collecting the data); collecting the data doesn't need multiprocessing.

rainwoodman commented 8 years ago

To concatenate binary arrays with files, simply do this:

import numpy

with open('f.raw', 'wb') as ff:
    numpy.arange(100).tofile(ff)

# and later, append to the same file
with open('f.raw', 'ab') as ff:
    numpy.arange(100).tofile(ff)
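
Reading it all back is then a single call; the dtype just has to match what was written (here the default integer dtype of numpy.arange):

# Both writes above end up in one contiguous array.
data = numpy.fromfile('f.raw', dtype=numpy.arange(1).dtype)
assert len(data) == 200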
dindom999 commented 8 years ago

Thanks a lot! You are a nice guy!