Closed muxueqz closed 11 years ago
这个问题与 DPark 无关,是你的使用 Mongodb 的代码的问题,只要 db 对象是正确pickle的,就可以在DPark 中正常使用。
save_to_mongo() 函数里为啥要调用 disconnect() ? db 是全局对象,断开后就没法再用了吧
disconnect()删除掉也一样,这是我先前想每次insert建立一次连接,insert完再关闭。
这么说,是pymongo的问题? 哪个数据库是可以pickle的呢?我测试了一下,sqlite3/cx_Oracle都不行
试试这样: https://gist.github.com/davies/5062304
如果某些对象不能Pickle,可以把初始化代码放到函数里面, 为了避免每一行都初始化一次,可以先 .glom() 再 map(), 或者用 mapPartition()
这样能缓解一些,但偶尔还是会有"ConnectionFailure: could not connect to x.x.x.x:27017: [Errno 99] Cannot assign requested address" 一直不太明白glom和mapPartition的作用,但我想你的思路是把一行一行的数据zip一下?
请问glom的作用是什么呢? "将self中的每一块作为一个元素,并组合成一个新的RDD后返回"没看明白,在测试代码可以跑,但实际项目中的代码却运行失败。
Traceback (most recent call last): File "/opt/17173/logv3/lib/python2.7/site-packages/DPark-0.1-py2.7.egg/dpark/schedule.py", line 342, in run_task result = task.run(aid) File "/opt/17173/logv3/lib/python2.7/site-packages/DPark-0.1-py2.7.egg/dpark/task.py", line 52, in run return self.func(self.rdd.iterator(self.split)) File "/opt/17173/logv3/lib/python2.7/site-packages/DPark-0.1-py2.7.egg/dpark/rdd.py", line 189, in mf f(i) File "pv_day.py", line 353, in db_reduce key, line = lines ValueError: too many values to unpack
一个RDD 相当于两层list, 即 [[x,x,x], [x,x,], ], mapPartition() 即把整个 split 总所有的记录当做一个对象传给参数中的函数,返回值要求是一个list, 用来组成新的 RDD的一个 Split。
RDD([Split1(x1,x2), Split2(x3,x4)]).mapPartition(func) = RDD(func(Split1(x1,x2)), func(Split2(x3,x4)))
glom() == mapPartition(lambda x: [x]), 即把原来 RDD 中的每个 Split 变成新的 RDD 中的一个元素。
代码在 https://gist.github.com/5062200
运行后报错 TypeError: 'Database' object is not callable. If you meant to call the 'getnewargs' method on a 'Connection' object it is failing because no such method exists.
暂时只想到加个代理层(比如nginx + lua做个http接口)来使用mongodb,不知有没有更好的办法?