tomasfarias closed this issue 1 year ago.
We can't implement all methods of the event loop, only deterministic ones. You should never call Kafka from inside a workflow, only from inside an activity. A workflow is meant to be deterministic code only. There is never a case where you should need `is_running`, and we can't really give an accurate value for that or for `is_closed` like a non-distributed event loop can. I guess technically we could just always return `True` from `is_running` and always return `False` from `is_closed`, but that's not really accurate, as the loop is never really running/closed in the background like a normal event loop.
I'm actually kind of glad this raised a not-implemented so it catches illegal things happening to the workflow's event loop, such as code based on whether it's "running" or not as if it was a local event loop.
> You should never call Kafka from inside a workflow, only from inside an activity. A workflow is meant to be deterministic code only.
A call to Kafka (or any other external service) can be made deterministic: It's a matter of handling possible errors in a deterministic fashion. In my case, I ignore them and move on.
> I'm actually kind of glad this raised a not-implemented so it catches illegal things happening to the workflow's event loop, such as code based on whether it's "running" or not as if it was a local event loop.
Not much else to say from my end, since this is the direction you're glad to go with. We will deal with the issue on our side. Thank you for taking the time to answer this request. I'll go ahead and close it.
Parts of the Python standard library assume that `is_running` and `is_closed` are implemented. For example, consider this line in aiohttp: https://github.com/aio-libs/aiohttp/blob/3f2f4a705f588d79a8649bfb17018cb30431df89/aiohttp/connector.py#L960
```python
if sys.version_info >= (3, 12):
    # Optimization for Python 3.12, try to send immediately
    resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
```
Looking at the stdlib asyncio code, we can see that this calls:
```python
if eager_start and self._loop.is_running():
    self.__eager_start()
```
But `is_running()` raises `NotImplementedError`, because the workflow event loop does not implement it and falls back to the `AbstractEventLoop` base method, which just raises.
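The failure mode can be reproduced without Temporal or aiohttp. In this sketch, `WorkflowLikeLoop` is a hypothetical stand-in for the workflow event loop (an `AbstractEventLoop` subclass that does not override `is_running()`), and `would_start_eagerly` is a hypothetical helper that mirrors the stdlib condition quoted above.

```python
import asyncio


class WorkflowLikeLoop(asyncio.AbstractEventLoop):
    """Hypothetical stand-in for a loop that, like the workflow event loop in
    this report, inherits is_running() from AbstractEventLoop unchanged."""


def would_start_eagerly(loop: asyncio.AbstractEventLoop, eager_start: bool) -> bool:
    # Mirrors the stdlib condition quoted above (asyncio.Task, Python 3.12+):
    #     if eager_start and self._loop.is_running(): ...
    # With eager_start=False the `and` short-circuits, so is_running() is never called.
    return eager_start and loop.is_running()


loop = WorkflowLikeLoop()
print(would_start_eagerly(loop, eager_start=False))  # False

try:
    would_start_eagerly(loop, eager_start=True)
except NotImplementedError:
    # The base method raises a bare NotImplementedError, with no hint
    # that the caller is running inside a workflow.
    print("is_running() raised NotImplementedError")
```

The bare exception is all the caller sees, which is why it is hard to trace back to "this code is running inside a workflow".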
If you don't support parts of the standard `AbstractEventLoop` interface that the core Python standard library assumes exist, you need to override them and raise an error message that makes it clear what is being done wrong. This is particularly hard to debug because the event loop is only this `WorkflowInstance` in certain circumstances (when the code runs inside a workflow).
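To see which parts of the interface a loop class leaves unimplemented, one option is to compare its methods against `asyncio.AbstractEventLoop`. The helper below (`unimplemented_loop_methods` is a hypothetical name, not part of asyncio or Temporal) lists the public methods a class inherits unchanged from the base class; in CPython those base methods raise `NotImplementedError`.

```python
import asyncio
import inspect


def unimplemented_loop_methods(loop_cls: type) -> list[str]:
    """Return public AbstractEventLoop methods that loop_cls does not override."""
    missing = []
    # getmembers also returns `async def` methods, since they are plain functions too.
    for name, base_func in inspect.getmembers(asyncio.AbstractEventLoop, inspect.isfunction):
        if name.startswith("_"):
            continue
        # Accessing a method on a class returns the underlying function, so an
        # identity check tells us whether loop_cls still uses the base version.
        if getattr(loop_cls, name) is base_func:
            missing.append(name)
    return missing


class OnlyRunning(asyncio.AbstractEventLoop):
    """Hypothetical loop that overrides is_running() and nothing else."""

    def is_running(self) -> bool:
        return True


missing = unimplemented_loop_methods(OnlyRunning)
print("is_running" in missing, "is_closed" in missing)  # False True
```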
From the discussion above, I think it's a mistake to create an aiohttp `ClientSession` outside of an activity, and moving it into an activity is how we fixed it. Still, more helpful errors explaining why the usage was wrong would be really appreciated here.
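A minimal sketch of what a more helpful error could look like, assuming a loop class that simply overrides the unsupported methods. `UnsupportedInWorkflowError` and `DescriptiveLoop` are hypothetical names for illustration, not Temporal's API. Subclassing `NotImplementedError` keeps any existing `except NotImplementedError` handlers working.

```python
import asyncio

# Hypothetical hint text; the wording is illustrative only.
_HINT = (
    "is not supported on the workflow event loop. Workflow code must be "
    "deterministic; code that needs a real event loop (for example an aiohttp "
    "ClientSession or an aiokafka producer) should run inside an activity."
)


class UnsupportedInWorkflowError(NotImplementedError):
    """Hypothetical error type; still caught by `except NotImplementedError`."""


class DescriptiveLoop(asyncio.AbstractEventLoop):
    """Hypothetical loop that explains why a method is unavailable."""

    def is_running(self) -> bool:
        raise UnsupportedInWorkflowError(f"is_running() {_HINT}")

    def is_closed(self) -> bool:
        raise UnsupportedInWorkflowError(f"is_closed() {_HINT}")


try:
    DescriptiveLoop().is_running()
except NotImplementedError as err:
    print(err)  # explains what went wrong and where the code should live instead
```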
Is your feature request related to a problem? Please describe.
Trying to produce messages to Kafka using `aiokafka` from a Temporal Workflow fails when trying to check whether the current event loop is running. This is caused by the Temporal event loop not fully implementing the `AbstractEventLoop` interface, which includes `is_running`. Although this is more understandable for thread-related methods (`to_thread`, etc.), simpler methods like `is_running` should be easy to implement.
Describe the solution you'd like
Add an `is_running` method to `_WorkflowInstanceImpl` to better conform to the `AbstractEventLoop` interface, perhaps by checking `_deleting`. Although perhaps it should always return `True` as long as there is a `Runtime` defined?
Additional context
The Python docs state that custom implementations of `AbstractEventLoop` should have all of the listed event loop methods defined. Although the docs are not strict about it ("should have defined"), the closer Temporal's event loop gets to the interface, the better it can interoperate with other Python packages, like `aiokafka`.
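The `is_running` proposal above could look roughly like the sketch below. `SketchWorkflowLoop` is a hypothetical stand-in for `_WorkflowInstanceImpl`; only the attribute name `_deleting` comes from the issue, and the assumption that it becomes `True` when the instance is torn down is a guess, not confirmed SDK behavior.

```python
import asyncio


class SketchWorkflowLoop(asyncio.AbstractEventLoop):
    """Hypothetical stand-in for _WorkflowInstanceImpl (not the SDK's code)."""

    def __init__(self) -> None:
        # Assumed semantics: False while the workflow instance is live,
        # True once it is being torn down.
        self._deleting = False

    def is_running(self) -> bool:
        # Proposed in the issue: report "running" until deletion starts.
        return not self._deleting

    def is_closed(self) -> bool:
        return self._deleting


loop = SketchWorkflowLoop()
print(loop.is_running(), loop.is_closed())  # True False
loop._deleting = True
print(loop.is_running(), loop.is_closed())  # False True
```

As the maintainer notes in the thread, these values would not be fully accurate, since the workflow loop is never really running or closed in the background like a local event loop; returning `True` could also let code such as the aiohttp eager-start path proceed inside a workflow instead of failing fast.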