faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

table_route decorator doesn't appear to give results as expected #187

Closed fonty422 closed 2 years ago

fonty422 commented 3 years ago

I'm trying to get a web view of a table where several workers contribute to the overall table. The documentation seems to suggest that the @app.table_route decorator reroutes the request to the worker that holds the key, but my requests to a different worker come back with a zero, which suggests the key isn't found in that worker's table.

If I make the request directly to the worker that is writing that key/value pair to the table, then I get the expected value.
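For context, the page is set up along the lines of the documented pattern. This is a minimal sketch rather than my exact code (app, topic, and table names here are illustrative):

```python
import faust

app = faust.App('word-count', broker='kafka://localhost:9092')

# Messages on this topic are assumed to be keyed by word so that the
# table's changelog partitions line up with the key.
words_topic = app.topic('words', key_type=str, value_type=str)

word_counts = app.Table('word_counts', default=int)

@app.agent(words_topic)
async def count_words(words):
    async for word in words:
        word_counts[word] += 1

# table_route should redirect the request to the worker that owns the
# partition for the given key.
@app.page('/count/{word}/')
@app.table_route(table=word_counts, match_info='word')
async def get_count(web, request, word: str):
    return web.json({word: word_counts[word]})
```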

A silly workaround would be to write the data to a topic that is then materialized into a table held by a single worker used for API GET requests, but that seems like:

  1. Too much replication of exactly the same data.
  2. There's no point to having workers if the work they do can't be seen outside the worker.

Please correct me if I'm wrong and piping the incoming data through multiple (albeit duplicated) streams to arrive at a single desired result really is the recommended approach.

Expected behavior

A GET request to the page view address should return the current table value for the given key.

Actual behavior

Returns a zero value rather than the true current count value

ondrejchmelar commented 3 years ago

We've encountered this issue as well. It works fine in Robinhood's original version.

fonty422 commented 3 years ago

So that leads to the question: what changed to break this feature?

ondrejchmelar commented 3 years ago

I didn't find any relevant change in the faust code. faust-streaming uses a different aiokafka library, so maybe the difference is there?

Any chance it was fixed by https://github.com/faust-streaming/faust/commit/c1a6b0e216f62fe01f898f7dca7054bb7622161f ?

ran-ka commented 3 years ago

This might be a networking issue; can you please check the following:

The worker will attempt to route the request to the canonical_url URL(f"http://{web_host}:{web_port}"), so make sure it is reachable...
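Concretely, the pieces that feed into that canonical URL are the web_host and web_port settings each worker is started with; every worker must advertise a host/port the other workers can actually reach. A sketch (values illustrative):

```python
import faust

# Each worker must advertise a host/port that the *other* workers can
# reach, because table_route redirects the request to that canonical URL.
app = faust.App(
    'word-count',
    broker='kafka://localhost:9092',
    web_host='localhost',  # or the machine/container hostname
    web_port=6066,         # unique per worker on the same machine
)
```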

fonty422 commented 3 years ago

Thanks @ran-ka. The entire cluster is running in dev on a single machine (zookeeper, kafka, and workers), so they really should all be able to see each other. I'm using the default 6066 for the first worker and incrementing to 60xx as required for the others. I'll check socket.gethostname(), but I can't imagine it will return anything unexpected.
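The check I have in mind is trivial, but for completeness:

```python
import socket

# Presumably what the worker advertises as its web host when --web-host
# isn't given explicitly.
print(socket.gethostname())
print(socket.gethostbyname(socket.gethostname()))
```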

ran-ka commented 3 years ago

I see. Just curious, how do you set the individual worker ports: using --web-port from the CLI, or via the app configuration? The reason I am asking is that I believe there is a bug in this code (https://github.com/faust-streaming/faust/pull/196). Anyway, the way I eventually found this bug was by adding a log at this line, https://github.com/faust-streaming/faust/blob/master/faust/app/router.py#L73, logging routed_url. Maybe that will help you find the issue you are facing.
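If patching the library is awkward, a rough alternative is a small debug page that reports where the router thinks a key lives, without redirecting. This is just a sketch, assuming the app and word_counts table from the example above:

```python
# Sketch of a debug page: return the URL the router would route this key
# to. Assumes `app` and `word_counts` are defined as in the earlier sketch.
@app.page('/debug/route/{word}/')
async def debug_route(web, request, word: str):
    return web.json({
        'key': word,
        'routed_url': str(app.router.key_store(word_counts.name, word)),
    })
```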

fonty422 commented 3 years ago

We do it via the CLI with the --web-port=XXXX option. If we don't pass that, we get the web-port-in-use error; if we do, it all works just fine. I'll try logging the output and see what's going on there. Thanks for the advice.

ran-ka commented 3 years ago

Try to apply the commit in this PR and see if it helps: https://github.com/faust-streaming/faust/pull/196

ondrejchmelar commented 3 years ago

Or try setting the URL via the CK_WD_WEB_HOST env var instead of the --web-host parameter. That seems to work properly.

fonty422 commented 3 years ago

Thanks to you both, @ran-ka and @ondrejchmelar. I assume I simply pass CK_WD_WEB_HOST=xxxx instead of --web-port=xxxx when I start the app from the command line?

I also seem to have another issue that feels related, but might not be. I have a global table that all workers can see and edit/update based on the information they receive, but I'm finding that sometimes one of the workers doesn't get updates for some of the keys in the table. The problem is that when this worker dies and a rebalance occurs, the other workers seem to pick up the outdated data rather than the up-to-date data they just had themselves. Any idea why that might happen and what the fix might be? Would using rocksdb instead of memory resolve this, do you suppose?
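For context, the table is currently declared roughly like this with the default in-memory store (names are illustrative); the store swap to RocksDB is the change I'm wondering about:

```python
import faust

app = faust.App('myapp', broker='kafka://localhost:9092')

# Roughly how the global table is declared today, with the default
# in-memory store:
status = app.GlobalTable('status', default=int)

# The variant I'm wondering about: persist changelog state locally so a
# restart/rebalance can recover from disk (requires the rocksdb extra):
# status = app.GlobalTable('status', default=int, store='rocksdb://')
```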

ran-ka commented 3 years ago

Thanks, @fonty422. I am glad that you managed to resolve your issues. By the way, when running multiple workers locally on my Mac, I had to set --web-host to localhost for this to work. I guess if you are running inside a container it should be the container name.

Regarding GlobalTable, I don't have a lot of experience with them, but I seem to remember that "global" means all workers consume all the partitions; still, I believe that when you update the table, you need to do it from the right worker. Maybe share more details on when and how you update the table.

ondrejchmelar commented 3 years ago

@fonty422 yes, just use CK_WD_WEB_HOST=xxx cmd instead of cmd --web-host xxx (or CK_WD_WEB_PORT instead of --web-port).