typedb / typedb-driver-python

TypeDB Driver for Python
https://typedb.com
Apache License 2.0

Opening a session always spawns a new thread #222

Closed alexjpwalker closed 2 years ago

alexjpwalker commented 3 years ago

Problem to Solve

Opening a session always spawns a new thread. We tried using a ThreadPool to reuse threads from a pool, but we couldn't find a way to make its threads daemon threads. As a result, the application would hang for 5 seconds after any execution finished (e.g. a test run or a user's program run).

Proposed Solution

Either find a way to make a ThreadPool use daemon threads, or find a way to kill the pulse thread on closing a Session.
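The issue doesn't say which pool class was tried. For the first option, one approach is a fixed pool of daemon workers built directly from `queue.Queue` and `threading.Thread(daemon=True)`. The sketch below is hypothetical (`DaemonThreadPool` and its methods are not part of the driver); it only illustrates that daemon workers do not keep the interpreter alive at exit.

```python
import queue
import threading
import traceback
from typing import Callable, List


class DaemonThreadPool:
    """Hypothetical fixed-size pool whose workers are daemon threads."""

    def __init__(self, size: int) -> None:
        self._tasks: "queue.Queue[Callable[[], None]]" = queue.Queue()
        self.workers: List[threading.Thread] = [
            threading.Thread(target=self._run, name=f"pool-worker-{i}", daemon=True)
            for i in range(size)
        ]
        for worker in self.workers:
            worker.start()

    def _run(self) -> None:
        # Workers loop forever; as daemon threads they do not block
        # interpreter shutdown and are simply stopped at exit.
        while True:
            task = self._tasks.get()
            try:
                task()
            except Exception:
                # Keep the worker alive; a real pool would log properly.
                traceback.print_exc()
            finally:
                self._tasks.task_done()

    def submit(self, task: Callable[[], None]) -> None:
        self._tasks.put(task)

    def wait(self) -> None:
        # Block until every submitted task has completed.
        self._tasks.join()
```

The trade-off is that daemon threads are stopped abruptly at interpreter exit, so any in-flight task (such as a pulse) is simply abandoned rather than finished.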

nicolamassarenti commented 3 years ago

Hi @alexjpwalker,

I am encountering the same problem: when ingesting 10k+ entities/relations, the number of threads explodes (more than 30k created, with only a few of them active), and after a while the application crashes.

Below are some screenshots:

[Screenshot: beginning of the ingestion]

[Screenshot: shortly before the application crash]

The error I get is:

Traceback (most recent call last):
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/src/connectors/grakn.py", line 54, in _create_transaction
    transaction = session.transaction(transaction_type)
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/venv-grakn-3.8/lib/python3.8/site-packages/grakn/core/session.py", line 83, in transaction
    return _CoreTransaction(self, transaction_type, options)
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/venv-grakn-3.8/lib/python3.8/site-packages/grakn/core/transaction.py", line 55, in __init__
    self._bidirectional_stream = BidirectionalStream(stub, session.transmitter())
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/venv-grakn-3.8/lib/python3.8/site-packages/grakn/stream/bidirectional_stream.py", line 42, in __init__
    self._response_iterator = stub.transaction(self._request_iterator)
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/venv-grakn-3.8/lib/python3.8/site-packages/grakn/common/rpc/stub.py", line 84, in transaction
    return resilient_call(lambda: self._stub.transaction(request_iterator))
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/venv-grakn-3.8/lib/python3.8/site-packages/grakn/common/rpc/stub.py", line 38, in resilient_call
    return function()
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/venv-grakn-3.8/lib/python3.8/site-packages/grakn/common/rpc/stub.py", line 84, in <lambda>
    return resilient_call(lambda: self._stub.transaction(request_iterator))
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/venv-grakn-3.8/lib/python3.8/site-packages/grpc/_channel.py", line 1085, in __call__
    call = self._managed_call(
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/venv-grakn-3.8/lib/python3.8/site-packages/grpc/_channel.py", line 1186, in create
    _run_channel_spin_thread(state)
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/venv-grakn-3.8/lib/python3.8/site-packages/grpc/_channel.py", line 1150, in _run_channel_spin_thread
    channel_spin_thread.start()
  File "src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi", line 117, in grpc._cython.cygrpc.ForkManagedThread.start
  File "/usr/lib/python3.8/threading.py", line 852, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/main.py", line 12, in <module>
    start_ingestion(data_path=path)
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/src/ingestion.py", line 337, in start_ingestion
    grakn_handler.write(
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/src/handlers/grakn_handler.py", line 30, in write
    grakn_client.insert(query=compiled_query)
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/src/connectors/grakn.py", line 82, in insert
    with self._get_transaction(
  File "/usr/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/src/connectors/grakn.py", line 67, in _get_transaction
    with self._create_transaction(
  File "/usr/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/home/nicolamassarenti/Documents/Noovle/puma/graph-ontology-icam/package/src/connectors/grakn.py", line 57, in _create_transaction
    transaction.close()
AttributeError: 'NoneType' object has no attribute 'close'

Looking forward to the bugfix!

alexjpwalker commented 3 years ago

@nicolamassarenti, are you creating thousands of Sessions? This issue causes a new Thread to be spawned for each Session, but AFAIK those threads are also cleaned up when the Session is closed.
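How promptly that cleanup happens depends on how the pulse loop waits. A minimal sketch, assuming a hypothetical `PulsingSession` with an injected `send_pulse` callback (not the driver's real session class): waiting on a `threading.Event` with a timeout, instead of calling `time.sleep()`, lets `close()` stop the pulse thread immediately rather than leaving it alive for up to one full interval.

```python
import threading
from typing import Callable


class PulsingSession:
    """Hypothetical stand-in for a session that keeps itself alive."""

    def __init__(self, send_pulse: Callable[[], None], interval: float = 5.0) -> None:
        self._send_pulse = send_pulse
        self._interval = interval
        self._closed = threading.Event()
        self._pulse_thread = threading.Thread(
            target=self._pulse_loop, name="session-pulse", daemon=True
        )
        self._pulse_thread.start()

    def _pulse_loop(self) -> None:
        # Event.wait() returns True as soon as close() sets the event,
        # and False when the interval simply times out.
        while not self._closed.wait(self._interval):
            self._send_pulse()

    def close(self) -> None:
        # Wake the pulse loop and wait for the thread to finish.
        self._closed.set()
        self._pulse_thread.join()

    def is_pulsing(self) -> bool:
        return self._pulse_thread.is_alive()

    def __enter__(self) -> "PulsingSession":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()
```

With this pattern, closing a session with a 5-second interval returns almost immediately, which would also avoid the 5-second hang at exit described in the original report.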

nicolamassarenti commented 3 years ago

@alexjpwalker yes, I refactored the code you provided in the docs. Here is the code that manages my sessions:


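    @contextmanager
    def _create_session(
        self, client: GraknClient, session_type: SessionType = SessionType.DATA
    ) -> GraknSession:
        # Open the session before entering try/finally: if client.session()
        # raises, there is nothing to close, and the original error is not
        # masked by "AttributeError: 'NoneType' object has no attribute 'close'".
        session = client.session(Database.NAME.value, session_type)
        try:
            yield session
        finally:
            session.close()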

The wrappers for transaction and client management are very similar.

Then, everything is wrapped by:


    @contextmanager
    def _get_transaction(self, transaction_type: TransactionType) -> GraknTransaction:
        # Creating client
        with self._create_client() as client:
            # Creating session
            with self._create_session(client=client) as session:
                # Creating transaction
                with self._create_transaction(
                    session=session, transaction_type=transaction_type
                ) as transaction:
                    yield transaction

                    if transaction_type == TransactionType.WRITE:
                        # Committing write insert
                        transaction.commit()
                    if transaction_type == TransactionType.READ:
                        # Closing read transaction
                        transaction.close()

This is called for read and write operations, for example:


    def insert(self, query: str):
        with self._get_transaction(
            transaction_type=TransactionType.WRITE
        ) as transaction:
            transaction.query().insert(query=query)

The only difference between this code and the code in the docs is that mine uses @contextmanager. I don't see any flaws; do you? Or do you think the problem is caused by an issue in the package?

Nicola
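The `AttributeError` at the end of the traceback above most likely comes from the `session = None` / `finally: x.close()` pattern applied in `_create_transaction`: when creating the transaction fails (here with `RuntimeError: can't start new thread`), the variable is still `None` when the `finally` block calls `.close()`, and the new `AttributeError` hides the original error. A self-contained sketch with hypothetical `FakeSession` and `open_session` stand-ins (not driver APIs):

```python
from contextlib import contextmanager
from typing import Iterator


class FakeSession:
    """Hypothetical stand-in for a driver session."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def open_session(fail: bool) -> FakeSession:
    # Simulates client.session(...) failing when no new thread can start.
    if fail:
        raise RuntimeError("can't start new thread")
    return FakeSession()


@contextmanager
def unguarded_session(fail: bool) -> Iterator[FakeSession]:
    session = None
    try:
        session = open_session(fail)
        yield session
    finally:
        session.close()  # AttributeError here if open_session() raised


@contextmanager
def guarded_session(fail: bool) -> Iterator[FakeSession]:
    # Acquire first; try/finally only guards a session that actually exists.
    session = open_session(fail)
    try:
        yield session
    finally:
        session.close()
```

With the guarded version, the caller sees the real `RuntimeError` instead of a misleading `AttributeError`.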

alexjpwalker commented 3 years ago

@nicolamassarenti : The code as written should work fine.

Note that it's inefficient to create a Session for every single Transaction - Sessions are designed to be long-lived, and you can run many Transactions in one Session, as long as it's of the right type (schema or data).

I can imagine that, if you created 30,000 sessions in less than 5 seconds, the client might end up with 30,000 threads. Each session creates a Pulse thread that lasts 5 seconds (or longer, if the session is not closed).
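The estimate above is an instance of Little's law: the number of threads alive at once is roughly the rate at which they are created times how long each one lives. A back-of-the-envelope helper, assuming a steady opening rate and a fixed per-thread lifetime (both simplifications; `peak_pulse_threads` is a hypothetical name, not a driver function):

```python
import math


def peak_pulse_threads(sessions_opened: int, open_window_s: float, thread_lifetime_s: float) -> int:
    """Rough peak number of concurrently alive pulse threads.

    Assumes sessions are opened at a steady rate over open_window_s seconds
    and each pulse thread lives for thread_lifetime_s seconds.
    """
    if sessions_opened <= 0:
        return 0
    if open_window_s <= thread_lifetime_s:
        # No thread has exited yet when the last session is opened.
        return sessions_opened
    rate = sessions_opened / open_window_s  # sessions per second
    return min(sessions_opened, math.ceil(rate * thread_lifetime_s))
```

For example, `peak_pulse_threads(30_000, 4.0, 5.0)` gives 30,000, while spreading the same sessions over 600 seconds gives 250. Reusing one long-lived session keeps it at a single pulse thread, however many transactions run in it.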

nicolamassarenti commented 3 years ago

@alexjpwalker I followed your suggestion and the number of threads remained constant at ~1250.

Thanks!

alexjpwalker commented 2 years ago

A user recently observed the following error on trying to open a transaction, after leaving a client running for a month:

  [...]
  File "/home/ubuntu/adam-action-graph-service/action_selection/venv/lib/python3.8/site-packages/typedb/common/rpc/stub.py", line 76, in transaction
    return resilient_call(lambda: self._stub.transaction(request_iterator))
  File "/home/ubuntu/adam-action-graph-service/action_selection/venv/lib/python3.8/site-packages/typedb/common/rpc/stub.py", line 41, in resilient_call
    return function()
  File "/home/ubuntu/adam-action-graph-service/action_selection/venv/lib/python3.8/site-packages/typedb/common/rpc/stub.py", line 76, in <lambda>
    return resilient_call(lambda: self._stub.transaction(request_iterator))
  File "/home/ubuntu/adam-action-graph-service/action_selection/venv/lib/python3.8/site-packages/grpc/_channel.py", line 1210, in __call__
    _consume_request_iterator(request_iterator, state, call,
  File "/home/ubuntu/adam-action-graph-service/action_selection/venv/lib/python3.8/site-packages/grpc/_channel.py", line 266, in _consume_request_iterator
    consumption_thread.start()
  File "src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi", line 117, in grpc._cython.cygrpc.ForkManagedThread.start
  File "/usr/lib/python3.8/threading.py", line 852, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

It's possible that this issue is related.
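One way to check for a slow leak like this, without waiting a month, is to sample the live threads periodically and see which group keeps growing. A minimal diagnostic sketch using only `threading.enumerate()`; grouping threads by the part of the name before the first `-` is an assumption, and the driver's and gRPC's threads may be named differently:

```python
import threading
from collections import Counter
from typing import Dict


def thread_census() -> Dict[str, int]:
    """Count live threads grouped by name prefix (text before the first '-')."""
    return dict(Counter(t.name.split("-")[0] for t in threading.enumerate()))


def thread_growth(before: Dict[str, int], after: Dict[str, int]) -> Dict[str, int]:
    """Return the name groups whose live-thread count increased, with the increase."""
    return {
        name: count - before.get(name, 0)
        for name, count in after.items()
        if count > before.get(name, 0)
    }
```

Taking a census, running a batch of sessions and transactions, and taking another census should show a group that keeps growing if threads are not being released.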