Closed fertel closed 6 years ago
The code you mention is the part that actually communicates with the server, so the actual work does not happen in the calling process.
You may also call produce_async if you are concerned about blocking. By definition it does not wait.
If you must wait, perhaps spawn a new process that sends the message to Kafka and then dies.
All of the produce functions (produce_async, produce_async_batch, produce and produce_batch) suffer from the same bug. The pick function itself is flawed. I will gladly put together a test project as an example if that would help.
I have confirmed the bug in my own project and have temporarily worked around it by wrapping ekaf functions inside a spawn function.
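The spawn workaround mentioned above could look roughly like the sketch below. The module name isolated_call and its run/1 and run/2 functions are hypothetical and not part of ekaf. The idea is only that any bare receive inside the wrapped fun reads the helper's mailbox instead of the caller's.

```erlang
-module(isolated_call).
-export([run/1, run/2]).

%% Hypothetical helper, not part of ekaf.
%% Runs Fun in a short-lived process so that any bare `receive` inside Fun
%% reads the helper's (initially empty) mailbox rather than the caller's.
run(Fun) ->
    run(Fun, 5000).

run(Fun, Timeout) when is_function(Fun, 0) ->
    Caller = self(),
    Ref = make_ref(),
    {Pid, MRef} = spawn_monitor(fun() -> Caller ! {Ref, Fun()} end),
    %% Selective receive: only the tagged result or the monitor's DOWN
    %% message match, so unrelated messages stay in the caller's mailbox.
    receive
        {Ref, Result} ->
            erlang:demonitor(MRef, [flush]),
            {ok, Result};
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, Reason}
    after Timeout ->
        erlang:demonitor(MRef, [flush]),
        exit(Pid, kill),
        {error, timeout}
    end.
```

Assuming ekaf is running, a call might then be wrapped as `isolated_call:run(fun() -> ekaf:produce_sync(<<"test_topic">>, {<<"key">>, <<"value">>}) end)`. The cost is one extra process per call, which is why this is only a stopgap.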
In the meantime - note that ekaf_lib is being called directly from ekaf which would be initiated from a calling process (whatever that may be). https://github.com/helpshift/ekaf/blob/master/src/ekaf.erl#L56-L66
Note https://github.com/helpshift/ekaf/blob/master/src/ekaf_lib.erl#L71-L93 - the first step is that it finds the server process for a given topic. It then sends the message {pick, {Key,Data}, self()} to that server process to get the worker process. After that message is sent, the receive expression is used, and at this point it will wait for whatever the next message is. This is all being done from whatever calling process initially called ekaf.
By your theory, no process that sends a message should also receive, and a receive should never happen in a process that sends a message. But that's not the case. This is standard message-passing behaviour, and it leaves the choice of spawning a process, and hence knowing its pid, to you. If I abstracted that out and always returned a pid that you did not control, it would not be in your best interest.
I will send an example. Also, maybe this would be easier to see as a code sample. Essentially you are using the receive expression in the calling process, not in a process created by the ekaf library.
A typical OTP process that may be receiving gen_cast-type messages is going to block until it receives any message. So when the receive expression is used from a calling OTP process, behavior will be unexpected, because the receive expression blocks and takes whatever message is sent to the calling process next.
Calling ekaf from a gen server that receives messages risks both dropping messages (meant for the gen server) and sending to an incorrect topic. I will post my Elixir code; if that's not sufficient, I will be happy to port it to Erlang.
Actually can recreate in a few lines:
self() ! wrong_message,
ekaf:produce_sync(<<"test_topic">>, {<<"key">>, <<"value">>}),
receive
    {ok, Pid} -> io:fwrite("~n sender pid: ~p, ekaf fsm pid: ~p~n", [self(), Pid])
end.
It seems you are calling ekaf:pick when just sending a single binary not in a tuple. All of the functions that use receive essentially break on the last message if the process is receiving messages from other sources. They also drop every message received in the interim, both messages meant for the calling process and the current piece of data meant to be sent to Kafka. Note:
_E ->
common_sync(Event, Topic, Rest, Timeout, [_E|Results])
In this case _E is any message that's not {ok,Pid}. Because Rest is the remainder, when Rest == [] the receive would go to the calling process.
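To see the effect of a catch-all clause without a running Kafka, here is a small self-contained sketch. Everything in it is hypothetical (the mailbox_demo module, the stand-in server, and both pick variants); it does not reproduce ekaf's code, only the pattern being discussed. greedy_pick/1 uses a catch-all receive, while tagged_pick/1 matches only on a unique reference.

```erlang
-module(mailbox_demo).
-export([start_server/0, greedy_pick/1, tagged_pick/1]).

%% Hypothetical stand-in for the topic server; it replies with its own pid
%% in either the untagged or the ref-tagged protocol.
start_server() ->
    spawn(fun server_loop/0).

server_loop() ->
    receive
        {pick, From} ->
            From ! {ok, self()},
            server_loop();
        {pick, Ref, From} ->
            From ! {Ref, {ok, self()}},
            server_loop()
    end.

%% Catch-all receive: takes whatever is first in the caller's mailbox,
%% so an unrelated message is consumed instead of the reply.
greedy_pick(Server) ->
    Server ! {pick, self()},
    receive
        {ok, Pid} -> {ok, Pid};
        Other -> {unexpected, Other}
    end.

%% Selective receive on a unique reference: unrelated messages are left
%% untouched in the mailbox.
tagged_pick(Server) ->
    Ref = make_ref(),
    Server ! {pick, Ref, self()},
    receive
        {Ref, Reply} -> Reply
    after 5000 ->
        {error, timeout}
    end.
```

If the caller already has wrong_message queued, greedy_pick/1 returns {unexpected, wrong_message} and the real reply is left behind in the mailbox, whereas tagged_pick/1 returns {ok, Pid} and wrong_message stays queued for the caller.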
@bosky101 any update on this? I haven't been able to really dive into the code, but would using ekaf:pick(Topic) instead of the message sending and receive block fix this, or was there another reason you wrote it that way?
Sorry missed your last comment.
I wanted produce_sync to block, and therefore it uses receive in the process calling ekaf.
That said if I understand correctly - your issue is when calling ekaf from a gen server that receives messages. Do you want a produce_sync that does not block, and your gen_server gets notified with the response whenever it happens? Is that why you're considering using pick directly?
If so, yes, you can take parts of ekaf:produce_sync (pick and common_sync) and write your own produce_sync_cast. Once it is tested and looks good, do submit a pull request.
Thanks ~B
@bosky101 not at all what I'm looking for - it sounds like that is the way in which produce_async should work. I am looking for the worker selection to be properly encapsulated and not toss out messages meant for the calling process - the four line snippet from a few lines back recreates that.
A way to fix this would be to use typical OTP call functionality, which would allow you to block the calling process without accidentally flushing or receiving the wrong messages. Something like poolboy to check out workers would suffice for this.
It seems ekaf:pick would be fine, as it appears to just call pg2, which is a negligibly fast call but won't do anything intelligent to spread load around.
In the current implementation calling gen_fsm:sync_send_event would also suffice to ensure that there wouldn't be this problem.
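As an illustration of the "typical OTP call" approach, here is a minimal sketch using gen_server:call, which tags each request with a unique reference and only receives the matching reply. The pick_server module, its round-robin state, and the worker terms are assumptions made for the example; ekaf's workers are gen_fsm processes, and this is not their actual code.

```erlang
-module(pick_server).
-behaviour(gen_server).

-export([start_link/1, pick/2]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Hypothetical picker: Workers is a non-empty list of worker terms (e.g. pids).
start_link(Workers) when is_list(Workers), Workers =/= [] ->
    gen_server:start_link(?MODULE, Workers, []).

%% Blocks the caller until the reply arrives, but only consumes the reply
%% to this particular call; other messages stay in the caller's mailbox.
pick(Server, Topic) ->
    gen_server:call(Server, {pick, Topic}, 5000).

init(Workers) ->
    {ok, Workers}.

%% Simple round-robin: hand out the head and move it to the back.
handle_call({pick, _Topic}, _From, [Worker | Rest]) ->
    {reply, {ok, Worker}, Rest ++ [Worker]}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

With this shape, a caller that has unrelated messages queued still gets {ok, Worker} back from pick/2, and those messages remain available to its own handle_info or receive.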
I would be happy to create the pull request to fix this - based on your suggestions.
produce_async does what you need. Based on the partition picking strategy you set in ekaf, it is non-blocking and distributes load among workers.
But, yes...
If you want sync, that is, to block without consuming other messages sent to the calling process, then the current receive block in the sync call should be converted into gen_fsm:sync_send_event.
Will be happy to review.
~B
This is done on every call to the Kafka server processes; it should probably be moved inside the server process itself.
https://github.com/helpshift/ekaf/blob/master/src/ekaf_lib.erl#L137-L151
The receive blocks the calling process until a message is received. If the calling process is actively receiving messages, unexpected behavior will occur (dropped messages, sending data to the wrong topic, etc.).
This would affect any producer function.