numaproj / numaflow-python

Numaflow Python SDK
Apache License 2.0

Need to finish implementing Sourcer async_server support #119

Closed tolmanam closed 11 months ago

tolmanam commented 11 months ago

I've run into issues trying to start up an async source.

First there was #118, and when I hack up my environment to get past that typo, I get the following error when attempting to run:

 future: <Task finished name='Task-1' coro=<run.<locals>.new_coro() done, defined at /usr/local/lib/python3.9/site-packages/aiorun.py:209> exception=TypeError("__init__() missing 2 required positional arguments: 'ack_handler' and 'pending_handler'")>
 Traceback (most recent call last):
   File "/usr/local/lib/python3.9/site-packages/aiorun.py", line 219, in new_coro
     await coro
   File "/usr/local/lib/python3.9/site-packages/pynumaflow/sourcer/async_server.py", line 213, in start
     await self.__serve_async(server)
   File "/usr/local/lib/python3.9/site-packages/pynumaflow/sourcer/async_server.py", line 185, in __serve_async
     AsyncSourcer(read_handler=self.__source_read_handler),
 TypeError: __init__() missing 2 required positional arguments: 'ack_handler' and 'pending_handler'

Message from the maintainers:

Impacted by this bug? Give it a 👍. We often sort issues this way to know what to prioritize.

tolmanam commented 11 months ago

Oddly enough, the unit tests aren't complaining, so I'm trying to understand how they are "working".

kohlisid commented 11 months ago

Hello @tolmanam, just to confirm your use case here: you are implementing a user-defined source, right?

tolmanam commented 11 months ago

Hi @kohlisid Yes. I am building a user-defined source.

I've cobbled together the code from several of your examples.

kohlisid commented 11 months ago

Hey @tolmanam, for a UDS (user-defined source) we need to provide the Sourcer with the three functions required for the implementation: read_handler, ack_handler, and pending_handler.

TypeError: __init__() missing 2 required positional arguments: 'ack_handler' and 'pending_handler'

I see from the error that two of them are missing. Can you try providing those implementations as well?

tolmanam commented 11 months ago

Agreed, but in this case the error appears to be coming from async_server.py itself.

It looks to me as if it's trying to create another instance of itself.
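
To illustrate the pattern described here, the following is a minimal sketch using a hypothetical stand-in class (ToyAsyncSourcer, with made-up start_buggy and start_fixed methods), not the actual pynumaflow code. It assumes only what the traceback shows: the constructor requires three handlers, and the server's start path builds a second instance passing read_handler alone. The "fixed" variant is one possible remedy for the sketch; the actual change made in #120 is not shown in this thread.

```python
import asyncio


class ToyAsyncSourcer:
    """Hypothetical stand-in for a source server; NOT the pynumaflow class."""

    def __init__(self, read_handler, ack_handler, pending_handler):
        # All three handlers are required, as in the TypeError above.
        self.read_handler = read_handler
        self.ack_handler = ack_handler
        self.pending_handler = pending_handler

    async def start_buggy(self):
        # Mirrors the failing pattern: a second instance is built with only
        # read_handler, so __init__ raises TypeError even though the user
        # supplied all three handlers to the first instance.
        return ToyAsyncSourcer(read_handler=self.read_handler)

    async def start_fixed(self):
        # One possible remedy (an assumption): serve the instance that is
        # already fully configured instead of constructing a new one.
        return self


# Placeholder handlers; real handler signatures are not given in the thread.
async def read():
    return ["msg"]


async def ack():
    return None


async def pending():
    return 0


def try_start(method_name):
    """Run the named start method and report 'ok' or the TypeError text."""
    sourcer = ToyAsyncSourcer(read, ack, pending)
    try:
        asyncio.run(getattr(sourcer, method_name)())
        return "ok"
    except TypeError as exc:
        return str(exc)
```

Calling try_start("start_buggy") returns the "missing 2 required positional arguments" message even though all three handlers were passed by the caller, which matches the symptom reported above; try_start("start_fixed") returns "ok".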

kohlisid commented 11 months ago

Sure thanks! Let me check and provide a fix for you!

kohlisid commented 11 months ago

Will be fixed in #120

kohlisid commented 11 months ago

@tolmanam https://github.com/numaproj/numaflow-python/releases/tag/v0.5.4 The fix is merged. Can you please try it at your end?

Thanks for pointing this out! Let us know if you face any other issues. Looking forward to seeing your Numaflow use case :D