aws-samples / amazon-neptune-samples

Samples and documentation for using the Amazon Neptune graph database service
MIT No Attribution
355 stars 142 forks source link

Cannot run the event loop while another loop is running #35

Open murphycrosby opened 4 years ago

murphycrosby commented 4 years ago

I had to build neptune_python_utils on python3.6.

Running this on SageMaker Jupyter.

neptune_endpoint = 'neptunecluster.cluster-cghdntee9kjh.us-east-1.neptune.amazonaws.com'
neptune_port = 8182
neptune.clear(neptune_endpoint=neptune_endpoint, neptune_port=neptune_port)

clearing data...
clearing property graph data [edge_batch_size=200, edge_count=Unknown]...

RuntimeError Traceback (most recent call last)

in () ----> 1 neptune.clear(neptune_endpoint=neptune_endpoint, neptune_port=neptune_port)

~/SageMaker/util/neptune.py in clear(self, neptune_endpoint, neptune_port, batch_size, edge_batch_size, vertex_batch_size) 60 def clear(self, neptune_endpoint=None, neptune_port=None, batch_size=200, edge_batch_size=None, vertex_batch_size=None): 61 print('clearing data...') ---> 62 self.clearGremlin(neptune_endpoint, neptune_port, batch_size, edge_batch_size, vertex_batch_size) 63 self.clearSparql(neptune_endpoint, neptune_port) 64 print('done')

~/SageMaker/util/neptune.py in clearGremlin(self, neptune_endpoint, neptune_port, batch_size, edge_batch_size, vertex_batch_size) 77 else: 78 print('clearing property graph data [edge_batch_size={}, edge_count={}]...'.format(edge_batch_size, edge_count)) ---> 79 g.E().limit(edge_batch_size).drop().toList() 80 edge_count = g.E().count().next() 81 has_edges = (edge_count > 0)

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/process/traversal.py in toList(self) 56 57 def toList(self): ---> 58 return list(iter(self)) 59 60 def toSet(self):

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/process/traversal.py in __next__(self) 46 def __next__(self): 47 if self.traversers is None: ---> 48 self.traversal_strategies.apply_strategies(self) 49 if self.last_traverser is None: 50 self.last_traverser = next(self.traversers)

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/process/traversal.py in apply_strategies(self, traversal) 571 def apply_strategies(self, traversal): 572 for traversal_strategy in self.traversal_strategies: --> 573 traversal_strategy.apply(traversal) 574 575 def apply_async_strategies(self, traversal):

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/remote_connection.py in apply(self, traversal) 147 def apply(self, traversal): 148 if traversal.traversers is None: --> 149 remote_traversal = self.remote_connection.submit(traversal.bytecode) 150 traversal.remote_results = remote_traversal 151 traversal.side_effects = remote_traversal.side_effects

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/driver_remote_connection.py in submit(self, bytecode) 53 54 def submit(self, bytecode): ---> 55 result_set = self._client.submit(bytecode) 56 results = result_set.all().result() 57 side_effects = RemoteTraversalSideEffects(result_set.request_id, self._client,

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/client.py in submit(self, message, bindings) 109 110 def submit(self, message, bindings=None): --> 111 return self.submitAsync(message, bindings=bindings).result() 112 113 def submitAsync(self, message, bindings=None):

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/client.py in submitAsync(self, message, bindings) 125 message.args.update({'bindings': bindings}) 126 conn = self._pool.get(True) --> 127 return conn.write(message)

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/connection.py in write(self, request_message) 53 def write(self, request_message): 54 if not self._inited: ---> 55 self.connect() 56 request_id = str(uuid.uuid4()) 57 result_set = resultset.ResultSet(queue.Queue(), request_id)

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/connection.py in connect(self) 43 self._transport.close() 44 self._transport = self._transport_factory() ---> 45 self._transport.connect(self._url, self._headers) 46 self._protocol.connection_made(self._transport) 47 self._inited = True

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/tornado/transport.py in connect(self, url, headers) 34 url = httpclient.HTTPRequest(url, headers=headers) 35 self._ws = self._loop.run_sync( ---> 36 lambda: websocket.websocket_connect(url)) 37 38 def write(self, message):

~/anaconda3/envs/python3/lib/python3.6/site-packages/tornado/ioloop.py in run_sync(self, func, timeout) 569 self.stop() 570 timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback) --> 571 self.start() 572 if timeout is not None: 573 self.remove_timeout(timeout_handle)

~/anaconda3/envs/python3/lib/python3.6/site-packages/tornado/platform/asyncio.py in start(self) 130 self._setup_logging() 131 asyncio.set_event_loop(self.asyncio_loop) --> 132 self.asyncio_loop.run_forever() 133 finally: 134 asyncio.set_event_loop(old_loop)

~/anaconda3/envs/python3/lib/python3.6/asyncio/base_events.py in run_forever(self) 410 if events._get_running_loop() is not None: 411 raise RuntimeError( --> 412 'Cannot run the event loop while another loop is running') 413 self._set_coroutine_wrapper(self._debug) 414 self._thread_id = threading.get_ident()

RuntimeError: Cannot run the event loop while another loop is running

iansrobinson commented 4 years ago

Hi Murphy

Thanks for reporting this. I'll look into it over the next week.

Kind regards

ian

On Tue, 25 Feb 2020 at 13:46, Murphy Crosby notifications@github.com wrote:

I had to build neptune_python_utils on python3.6.

neptune_endpoint = 'neptunecluster.cluster-cghdntee9kjh.us-east-1.neptune.amazonaws.com'
neptune_port = 8182
neptune.clear(neptune_endpoint=neptune_endpoint, neptune_port=neptune_port)

clearing data...
clearing property graph data [edge_batch_size=200, edge_count=Unknown]...

RuntimeError Traceback (most recent call last) in () ----> 1 neptune.clear(neptune_endpoint=neptune_endpoint, neptune_port=neptune_port)

~/SageMaker/util/neptune.py in clear(self, neptune_endpoint, neptune_port, batch_size, edge_batch_size, vertex_batch_size) 60 def clear(self, neptune_endpoint=None, neptune_port=None, batch_size=200, edge_batch_size=None, vertex_batch_size=None): 61 print('clearing data...') ---> 62 self.clearGremlin(neptune_endpoint, neptune_port, batch_size, edge_batch_size, vertex_batch_size) 63 self.clearSparql(neptune_endpoint, neptune_port) 64 print('done')

~/SageMaker/util/neptune.py in clearGremlin(self, neptune_endpoint, neptune_port, batch_size, edge_batch_size, vertex_batch_size) 77 else: 78 print('clearing property graph data [edge_batch_size={}, edge_count={}]...'.format(edge_batch_size, edge_count)) ---> 79 g.E().limit(edge_batch_size).drop().toList() 80 edge_count = g.E().count().next() 81 has_edges = (edge_count > 0)

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/process/traversal.py in toList(self) 56 57 def toList(self): ---> 58 return list(iter(self)) 59 60 def toSet(self):

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/process/traversal.py in __next__(self) 46 def __next__(self): 47 if self.traversers is None: ---> 48 self.traversal_strategies.apply_strategies(self) 49 if self.last_traverser is None: 50 self.last_traverser = next(self.traversers)

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/process/traversal.py in apply_strategies(self, traversal) 571 def apply_strategies(self, traversal): 572 for traversal_strategy in self.traversal_strategies: --> 573 traversal_strategy.apply(traversal) 574 575 def apply_async_strategies(self, traversal):

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/remote_connection.py in apply(self, traversal) 147 def apply(self, traversal): 148 if traversal.traversers is None: --> 149 remote_traversal = self.remote_connection.submit(traversal.bytecode) 150 traversal.remote_results = remote_traversal 151 traversal.side_effects = remote_traversal.side_effects

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/driver_remote_connection.py in submit(self, bytecode) 53 54 def submit(self, bytecode): ---> 55 result_set = self._client.submit(bytecode) 56 results = result_set.all().result() 57 side_effects = RemoteTraversalSideEffects(result_set.request_id, self._client,

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/client.py in submit(self, message, bindings) 109 110 def submit(self, message, bindings=None): --> 111 return self.submitAsync(message, bindings=bindings).result() 112 113 def submitAsync(self, message, bindings=None):

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/client.py in submitAsync(self, message, bindings) 125 message.args.update({'bindings': bindings}) 126 conn = self._pool.get(True) --> 127 return conn.write(message)

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/connection.py in write(self, request_message) 53 def write(self, request_message): 54 if not self._inited: ---> 55 self.connect() 56 request_id = str(uuid.uuid4()) 57 result_set = resultset.ResultSet(queue.Queue(), request_id)

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/connection.py in connect(self) 43 self._transport.close() 44 self._transport = self._transport_factory() ---> 45 self._transport.connect(self._url, self._headers) 46 self._protocol.connection_made(self._transport) 47 self._inited = True

~/anaconda3/envs/python3/lib/python3.6/site-packages/gremlin_python/driver/tornado/transport.py in connect(self, url, headers) 34 url = httpclient.HTTPRequest(url, headers=headers) 35 self._ws = self._loop.run_sync( ---> 36 lambda: websocket.websocket_connect(url)) 37 38 def write(self, message):

~/anaconda3/envs/python3/lib/python3.6/site-packages/tornado/ioloop.py in run_sync(self, func, timeout) 569 self.stop() 570 timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback) --> 571 self.start() 572 if timeout is not None: 573 self.remove_timeout(timeout_handle)

~/anaconda3/envs/python3/lib/python3.6/site-packages/tornado/platform/asyncio.py in start(self) 130 self._setup_logging() 131 asyncio.set_event_loop(self.asyncio_loop) --> 132 self.asyncio_loop.run_forever() 133 finally: 134 asyncio.set_event_loop(old_loop)

~/anaconda3/envs/python3/lib/python3.6/asyncio/base_events.py in run_forever(self) 410 if events._get_running_loop() is not None: 411 raise RuntimeError( --> 412 'Cannot run the event loop while another loop is running') 413 self._set_coroutine_wrapper(self._debug) 414 self._thread_id = threading.get_ident()

RuntimeError: Cannot run the event loop while another loop is running

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/aws-samples/amazon-neptune-samples/issues/35?email_source=notifications&email_token=AACKOME67RA53LTQBGSGMQLREWGTFA5CNFSM4K3TMK7KYY3PNVWWK3TUL52HS4DFUVEXG43VMWVGG33NNVSW45C7NFSM4IQGR4GQ, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKOMATNWCKGXXVS37ERFLREWGTFANCNFSM4K3TMK7A .

iansrobinson commented 4 years ago

Hi Murphy

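I think the issue here may be down to the version of tornado you are using. Our Neptune-SageMaker examples, and Neptune's own Neptune Workbench, all use tornado==4.5.1. Your traceback goes through tornado/platform/asyncio.py, which suggests a newer tornado release (5.x or later) that runs its IOLoop on top of asyncio; because the Jupyter kernel already has an asyncio event loop running, the Gremlin driver's run_sync() call then fails with "Cannot run the event loop while another loop is running".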

ian