dusktreader opened 8 years ago
I've just verified that this occurs when using tornado gen.coroutines as well.
Hmmm, I haven't looked closely, but this seems a bit strange. The error is coming from Titan. I'll take a closer look tomorrow if you haven't figured it out.
Yeah, I've verified that it seems to be happening in Titan:
Revised source:
#!/usr/bin/env python
from tornado import gen
from tornado.ioloop import IOLoop

from goblin import properties, connection
from goblin.models import Vertex


class MyVertex(Vertex):
    prop1 = properties.Integer()


connection.setup('ws://localhost:8182')


@gen.coroutine
def clear():
    result = yield connection.execute_query('g.V().drop().iterate()')
    stream = yield result.read()
    return stream


@gen.coroutine
def create(i):
    result = yield MyVertex.create(prop1=i)
    return result


@gen.coroutine
def find(vid):
    result = yield MyVertex.get(vid)
    return result


@gen.coroutine
def find_all(n):
    result = yield connection.execute_query(
        'g.V().hasLabel(x).limit(n)',
        bindings={
            'x': MyVertex.get_label(),
            'n': n,
        },
    )
    stream = yield result.read()
    return stream


loop = IOLoop.current()


def smash(n):
    print("Clearing old vertices")
    loop.run_sync(clear)
    print("Smashing {} vertices".format(n))
    for i in range(n * 2):
        v = loop.run_sync(lambda: create(i))
    loop.run_sync(lambda: find_all(n))


n = 60
while True:
    smash(n)
    n += 1
Output:
$ bin/smash2
Clearing old vertices
Smashing 60 vertices
Clearing old vertices
Smashing 61 vertices
Clearing old vertices
Smashing 62 vertices
Clearing old vertices
Smashing 63 vertices
Clearing old vertices
Smashing 64 vertices
Clearing old vertices
Smashing 65 vertices
Traceback (most recent call last):
  File "bin/smash2", line 53, in <module>
    smash(n)
  File "bin/smash2", line 49, in smash
    loop.run_sync(lambda: find_all(n))
  File "/Users/tbeck/.virtualenvs/czar/lib/python3.5/site-packages/tornado/ioloop.py", line 453, in run_sync
    return future_cell[0].result()
  File "/Users/tbeck/.virtualenvs/czar/lib/python3.5/site-packages/tornado/concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Users/tbeck/.virtualenvs/czar/lib/python3.5/site-packages/tornado/gen.py", line 1014, in run
    yielded = self.gen.throw(*exc_info)
  File "bin/smash2", line 39, in find_all
    stream = yield result.read()
  File "/Users/tbeck/.virtualenvs/czar/lib/python3.5/site-packages/tornado/gen.py", line 1008, in run
    value = future.result()
  File "/Users/tbeck/.virtualenvs/czar/lib/python3.5/site-packages/tornado/concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
RuntimeError: 599 Error during serialization: Could not execute operation due to backend exception (through reference chain: java.util.ArrayList[0]
A good place to check out this kind of stuff is the Gremlin-Users and Thinkaurelius Google groups. It seems this problem has happened to other people; building and upgrading to Titan 1.1 seems to fix it: https://groups.google.com/forum/#!topic/gremlin-users/H5IlmZ4Zgyc
I can't reproduce this on my machine. But, I have confirmed that your script continues to run ad infinitum with Titan 1.1. I also noticed a drastic performance increase when upgrading.
Thanks for the heads up. I'll try the newer version
@asyncio.coroutine
def clear():
    result = yield from connection.execute_query('g.V().drop().iterate()')
    stream = yield from result.read()
    return stream
It looks like you have chained two futures together. The first future resolves to a gremlinclient.connection.Stream. Each successive read() then gives you another future, which resolves to a namedtuple("Message", ["status_code", "data", "message", "metadata"]).
If your result set has more than 64 records, you will see stream.status_code equal to 206 (Partial Content). This means each call to Stream.read() gives you only 64 records; you need to keep calling read() until it resolves to None.
If I am correct, you could try to break the two calls into two coroutines, call the second one continuously, and see if that works.
The following example should work.
@gen.coroutine
def execute_query_async(query):
    resp = yield connection.execute_query(query)
    raise gen.Return(resp)


@gen.coroutine
def fetch_result_async(resp):
    msg = yield resp.read()
    raise gen.Return(msg)


result = []
ioloop = IOLoop.current()
resp = ioloop.run_sync(lambda: execute_query_async("g.V()"))
msg = ioloop.run_sync(lambda: fetch_result_async(resp))
while msg:
    result.extend(msg.data)
    msg = ioloop.run_sync(lambda: fetch_result_async(resp))
This 64-record partial response size is a Gremlin Server config parameter; it can be changed in the corresponding yaml file, e.g. gremlin-server.yaml:
resultIterationBatchSize: 64
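To make the batching concrete, here is a plain-Python sketch of how a 64-record batch size splits a result set across successive read() calls. No Gremlin Server is needed; `batches` is just an illustrative stand-in for how the server streams results:

```python
BATCH_SIZE = 64  # mirrors resultIterationBatchSize in gremlin-server.yaml

def batches(records, batch_size=BATCH_SIZE):
    """Serve a result list the way the server streams it: full batches
    first, then a final None marking the end of the stream."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]
    yield None  # what stream.read() resolves to when the stream is drained

records = list(range(150))
collected = []
reads = 0
for msg in batches(records):
    reads += 1
    if msg is None:
        break
    collected.extend(msg)

print(reads, len(collected))  # 4 150
```

With 150 records and the default batch size, the client must issue four reads: three carrying data (64 + 64 + 22 records) and a final one resolving to None.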
Ahh makes sense @wangxiaoyu thanks for the input! Unless there are objections I will go ahead and close this issue...
I found out the correct way to chain coroutines, and the issue resolved. Thanks for all the help.
Please help with this problem! @wangxiaoyu's answer is not working for me. I'm still getting only 64 :(
Here is the Django code:
def all(request, element_slug, for_django_view=None):
    # data = vertex.all()
    # do()
    try:
        result = loop.run_sync(el_all)
        # print("foloo", type(followers), dir(followers[0]), followers[0].as_dict())
    finally:
        # connection.tear_down()
        # loop.close()
        pass
    data = [o.as_dict() for o in result]
    print("GET_ELEMENT data", data)
    response = HttpResponse(
        json.dumps(data, ensure_ascii=False),
        content_type='application/json',
    )
    if for_django_view:
        return data
    return response
Hi @blackboxx. I'm not sure I understand the code sample you posted. To retrieve all vertices for a model class, the standard pattern would be something like (untested, based on docs):
def get_all_users():
    users = []
    stream = yield from User.all()  # or any defined model
    # read until the stream returns ``None``
    while True:
        resp = yield from stream.read()
        if resp is None:
            break
        users += resp
    return users  # could be ``raise gen.Return(users)`` if using Python 2.7 w/Tornado


users = loop.run_sync(get_all_users)  # in a Django view
Let me know if that helps.
Hello @davebshow and all! My main problem was that I got only 64 items instead of all of them. Your example is not working for me; if I use it, I get this error:
raise BadYieldError("yielded unknown object %r" % (yielded,))
tornado.gen.BadYieldError: yielded unknown object <generator object elall at 0x104c71bf8>
If I wrap the get_all_users function with @gen.coroutine, I get another error:
TypeError: 'Future' object is not iterable
But here is my example; it is working, though with problems:
@gen.coroutine
def el_all():
    all_elements = []
    ea = El.all()
    stream = yield ea
    resp = []
    while True:
        try:
            print("READING")
            resp = yield stream.read()
            print("OK READING")
        except Exception:
            pass
        if resp is None:
            print("!!!!RESP IS NONE", len(all_elements))
            break
        all_elements += resp
    return all_elements
As a result I'm now getting not just 64 but varying numbers, though still not the real number of objects. Here are my terminal prints:
READING OK READING READING READING OK READING !!!!RESP IS NONE 128 GET_ELEMENT data 128
READING OK READING READING OK READING READING OK READING !!!!RESP IS NONE 87 GET_ELEMENT data 87
READING READING OK READING !!!!RESP IS NONE 0 GET_ELEMENT data 0
READING OK READING READING OK READING READING OK READING READING OK READING !!!!RESP IS NONE 151 GET_ELEMENT data 151
READING OK READING READING READING OK READING !!!!RESP IS NONE 128 GET_ELEMENT data 128
So, tears in my eyes. Please help.
Welcome to the async world and happy debugging.
Since each of your resp is a future, they can resolve out of order. If you want to collect all of them, you need to make sure all of them have resolved and been added to all_elements before breaking out of the loop.
Runs 1 and 5: resp1 has 64 records, resp2 has 64 records, ..., respFinal is None while resp3 has not resolved. Then you break the loop and get 128 records.
Run 2: resp1 has 64 records, resp3 has 23 records, respFinal is None, resp2 has not resolved yet. 64 + 23 = 87 records.
Run 3: respFinal is None, all the others have not resolved. 0 records.
Run 4: resp1 has 64 records, resp2 has 64 records, resp3 has 23 records, respFinal is None. 64 + 64 + 23 = 151 records.
I also suspect that the content of resp gets overwritten in runs 1 and 5, since the number of "OK READING" printouts doesn't match the length of all_elements.
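One concrete way the `except: pass` in the loop can corrupt the count is that a failed read leaves `resp` holding the previous batch, which then gets appended a second time. Here is a minimal synchronous sketch; FlakyStream is a hypothetical stand-in for gremlinclient's Stream, with the transient error simulated:

```python
class FlakyStream:
    """Stand-in for a result stream whose read() sometimes raises,
    mimicking the transient Titan serialization error."""
    def __init__(self, batches, fail_on):
        self._batches = list(batches) + [None]  # None ends the stream
        self._fail_on = set(fail_on)
        self._calls = 0

    def read(self):
        self._calls += 1
        if self._calls in self._fail_on:
            raise RuntimeError("599 serialization error")
        return self._batches.pop(0)

# The loop from the thread: a swallowed exception leaves ``resp``
# holding the last successful batch, so that batch is counted twice.
stream = FlakyStream([list(range(64)), list(range(64, 128))], fail_on={2})
all_elements, resp = [], []
while True:
    try:
        resp = stream.read()
    except Exception:
        pass  # resp is now stale, but the loop continues anyway
    if resp is None:
        break
    all_elements += resp

print(len(all_elements))  # 192, not 128: the first batch was appended twice
```

The observed counts in the thread differ run to run, but the mechanism is the same: once an exception is silently swallowed, the loop's accounting no longer matches what the server actually sent.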
Honestly, I don't know why my example code is not working for you, as you reported in your first post. However, I suspect a single call to run_sync() may not cut it, since the result set is sent back asynchronously and needs to be read() over and over. If you format your code a little more cleanly, I can take another look.
Hello @wangxiaoyu, I formatted the code so it looks better. Please explain more about how you fetch all elements.
I simplified your code:

while True:
    resp = yield stream.read()
    if resp is None:
        break
    all_elements += resp

It is a lot like yield from, except that it doesn't handle async correctly. There is nothing you can do about this if you can't use yield from, which has been available since Python 3.3 per PEP 380. That leaves you maybe two choices:

1) yield stream.read() within a gen.coroutine and call it synchronized, so that you can handle each 64-record response one by one and know exactly when to stop. Pretty much like my example, which of course offsets the performance benefit of async.

2) Break the loop only when a resp is None AND all responses submitted prior to that one have been processed. Unfortunately there is no easy way to do that, since you need to keep track of every resp that was submitted, and you also need to control how many resp are submitted before you meet the stop condition. You don't need to yield; instead you can add all your stream.read() responses (which may still be futures) into a Tornado queue, then process the queue into the final result set and make sure all are resolved (some of them will be None if there are no more records. Remember async, right?). At the top iteration level, you may not want to use a while loop that waits for the first resp to be None, since you could submit too many read() calls. Either you know how many results you want (paging, which is preferred), or you at least have some control over the async flow.

@wangxiaoyu VERY BIG THANK YOU! My brain is working now. I will post my working example when I create one.
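For what it's worth, the sequential-read pattern (choice 1 above) can be sketched with the stdlib asyncio standing in for Tornado. Here fake_read is a hypothetical stand-in for Stream.read() that serves two 64-record batches and then None, so no Gremlin Server is needed:

```python
import asyncio

# Hypothetical stand-in for a stream's read(): serves two 64-record
# batches, then None, like the server's 206 partial responses.
_batches = [list(range(64)), list(range(64, 128)), None]

async def fake_read():
    await asyncio.sleep(0)  # yield to the event loop, as a real read would
    return _batches.pop(0)

async def read_all(read):
    """Await each read before issuing the next, so batches arrive in
    order and the first None reliably marks the end of the stream."""
    results = []
    while True:
        msg = await read()
        if msg is None:
            return results
        results += msg

results = asyncio.run(read_all(fake_read))
print(len(results))  # 128
```

Because each read is awaited before the next one is issued, no response can be skipped or double-counted; the trade-off, as noted above, is that the reads are fully serialized.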
Hello @wangxiaoyu and all. I really want to get all vertices, but I'm still not able to. Please help with a working example.
@blackboxx, sorry to take so long getting back to you. What version of python are you using?
@blackboxx, in your example code in this comment, I notice you've wrapped the call to read in a try/except. Had you encountered exceptions? What happens if you print the exception in the except block, instead of passing?
@leifurhauks, I'm using Python 3.5, and my problem is I'm getting 0, 64, or 128 vertices instead of all of them.
Can you show me the output if you adjust your el_all code to print the exception in the except block? I'm investigating this as a possible bug, but it's proving difficult to reproduce.
It would also help to know a bit more about your configuration, e.g. your Gremlin Server config (conf/gremlin-server/gremlin-server.yaml).
I'm very busy outside work today / this weekend, but I will keep looking into this on Monday.
@blackboxx , I think the way your loop terminates before all vertices have been processed may be caused by a bug in goblin. I would like to isolate and fix it, but it would really help to have the information I mentioned about your setup, so I can try to reproduce your results.
Thanks for your input so far. Goblin is still in alpha stage, and we really appreciate feedback that helps us identify and fix problems.
I am still a bit confused by this issue...I can run the following script using Python 2.7 and Titan 1.0 with the default config:
from tornado import gen
from tornado.ioloop import IOLoop

from goblin import connection
from goblin.models import Vertex
from goblin.properties import Integer


class TestVertex(Vertex):
    number = Integer()


@gen.coroutine
def doit():
    results = []
    for i in range(100):
        yield TestVertex.create(number=i)
    stream = yield TestVertex.all()
    while True:
        msg = yield stream.read()
        if msg is None:
            break
        results += msg
    raise gen.Return(results)


if __name__ == '__main__':
    connection.setup('ws://localhost:8182')
    loop = IOLoop.current()
    resp = loop.run_sync(lambda: doit())
    print("got {0} vertices".format(len(resp)))
    connection.tear_down()
Which outputs:
got 100 vertices
Occasionally I see the backend error:
RuntimeError: 599 Error during serialization: Could not execute operation due to backend exception (through reference chain: java.util.ArrayList[0]
However, upon building Titan 1.1 from the GitHub repo, I don't see this error at all and can run this script repeatedly, creating nodes in multiples of 100, without seeing any error...
@blackboxx we really need to see the exception that's being caught in your example code before we can proceed any further.
+1 @leifurhauks
I believe that it is the RuntimeError: 599 Error during serialization: Could not execute operation due to backend exception (through reference chain: java.util.ArrayList[0] error that is causing this inconsistency, which is a known bug in Titan 1.0, as discussed here: https://groups.google.com/forum/#!topic/gremlin-users/H5IlmZ4Zgyc
I've found that the all() method reliably raises an exception when there are more than 64 vertices of one type in the graph db. I stripped my code down to the bare minimum and created an executable Python script to demonstrate the issue.
Here is the source code
And here is the output:
This is a serious issue because it is preventing me from using goblin until it has been resolved.