[Open] robsonhermes opened this issue 1 year ago
I see that an unbounded sleep was introduced in #1348; I wonder whether that has made the change in #677 ineffective.
Hello @gharris1727
Thanks for the reply. IMHO, #1348 did not cause the "Graceful stop of task failed". I did some tracing today and could clearly see the task logging every 100ms that it was waiting for the next poll() interval (even when a DELETE was requested for the connector via the API). So the stop() caused by the DELETE only got processed when the current poll() iteration completed, as that was the only chance stop() had to finally be processed.
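To make the traced behavior concrete, here is a minimal sketch of the pattern (a paraphrase, not the actual JdbcSourceTask code; `nextUpdate` and `queryAndBuildRecords()` are illustrative stand-ins):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Simplified paraphrase of the traced behavior -- NOT the actual JdbcSourceTask.
public class SleepyPollTask extends SourceTask {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private volatile long nextUpdate;

    @Override
    public void start(Map<String, String> props) {
        running.set(true);
        nextUpdate = System.currentTimeMillis();
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        while (running.get()) {
            long untilNext = nextUpdate - System.currentTimeMillis();
            if (untilNext > 0) {
                // Nap in ~100ms slices until the next poll interval. Since
                // KAFKA-15090, stop() is dispatched on this same thread, so
                // it cannot run until poll() returns: the task keeps logging
                // and sleeping even after a DELETE was requested.
                Thread.sleep(Math.min(100, untilNext));
                continue;
            }
            return queryAndBuildRecords();
        }
        return null;
    }

    @Override
    public void stop() {
        running.set(false); // only ever observed between poll() iterations
    }

    @Override
    public String version() {
        return "sketch";
    }

    // Hypothetical helper standing in for the actual JDBC querying logic.
    private List<SourceRecord> queryAndBuildRecords() {
        throw new UnsupportedOperationException("illustrative only");
    }
}
```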
On a side note, all the merit goes to you, as you pointed out the issue on the Kafka mailing list when I submitted this same question.
Finally, what is puzzling me is that there is a test which, apparently, checks whether a task can be started/stopped by the same thread. But I'm still trying to understand in detail whether the test is missing something.
The testStartStopSameThread test doesn't catch the scenario of this issue because it only tests starting and stopping. It doesn't cover the scenario where poll() is invoked almost immediately after start(), which is essentially what happens in a real scenario. See here.
I think a more complete test would be: start, poll, stop. That would re-create the issue (stop will only be processed after the current poll returns; see the sketch after the quote below). However, the Kafka source code says the expected behavior is to stop any in-progress poll:
Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop trying to poll for new data and interrupt any outstanding poll() requests.
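A sketch of that start → poll → stop shape (just a sketch, not a drop-in test; `minimalConfig()` and `POLL_INTERVAL_MS` are hypothetical stand-ins for the real config map and the configured interval):

```java
// Sketch of the proposed start -> poll -> stop test.
@Test
public void testStartPollStopSameThread() throws Exception {
    JdbcSourceTask task = new JdbcSourceTask();
    task.start(minimalConfig());
    task.poll(); // first poll returns a batch, as in a real run

    // Without a fix, a second poll() blocks for up to poll.interval.ms, so a
    // stop() issued on this same thread has no chance to run any sooner.
    long begin = System.currentTimeMillis();
    task.poll();
    task.stop();
    assertTrue("stop() could not be processed before the interval elapsed",
            System.currentTimeMillis() - begin < POLL_INTERVAL_MS);
}
```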
@C0urante's original suggestion in #677 would do the trick. However, I don't know how to implement this other than by spawning a separate thread in JdbcSourceTask and making poll() run in that separate thread, freeing up the dedicated task thread to be able to receive the stop() call.
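As a sketch only, the separate-thread variant could look something like this (a fragment of a hypothetical SourceTask subclass; `runQuery()` stands in for the actual querying logic, and the java.util.concurrent imports are elided):

```java
// Fragment of a hypothetical SourceTask: the blocking query runs on its own
// executor so the dedicated task thread stays free to process stop().
private final ExecutorService queryExecutor = Executors.newSingleThreadExecutor();
private final BlockingQueue<SourceRecord> buffered = new LinkedBlockingQueue<>();
private volatile Future<?> inFlight;

@Override
public List<SourceRecord> poll() throws InterruptedException {
    if (inFlight == null || inFlight.isDone()) {
        inFlight = queryExecutor.submit(() -> buffered.addAll(runQuery()));
    }
    // Wait only briefly, then hand control back so a pending stop() can run.
    SourceRecord first = buffered.poll(100, TimeUnit.MILLISECONDS);
    if (first == null) {
        return Collections.emptyList();
    }
    List<SourceRecord> batch = new ArrayList<>();
    batch.add(first);
    buffered.drainTo(batch);
    return batch;
}

@Override
public void stop() {
    if (inFlight != null) {
        inFlight.cancel(true); // interrupt any outstanding query
    }
    queryExecutor.shutdownNow();
}
```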
Maybe it would be possible to simply replace this if with:
```java
if (sleepMs > 0) {
    log.trace("{}ms to poll {}. Will return control.", nextUpdate - now, querier);
    return Collections.emptyList();
}
```
The idea is: if this task needs to sleep, we instead return an empty list (not null, since I guess null is meant to signal a pause?). Checking here, which is where I believe the connector task is called from, I believe returning the empty list would be a no-op, and then poll() would be called again until the time for the next execution is finally reached. But this way, by returning control, a pending stop() would be executed before the next poll().
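For illustration, my mental model of the runtime side is roughly this (a simplified paraphrase of the Connect task loop, not actual Kafka code; `send()` is a stand-in for dispatching records to the producer):

```java
// Simplified paraphrase of the Connect runtime's task loop (not actual Kafka
// code): an empty batch is effectively a no-op, and between iterations the
// runtime can observe a requested stop() and invoke task.stop().
while (!stopping) {
    List<SourceRecord> batch = task.poll(); // returns promptly with the change
    if (batch != null && !batch.isEmpty()) {
        send(batch);
    }
}
```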
Thoughts?
@robsonhermes : was that resolved since? if not, how did you workaround it?
Hello @papaya-daniel
Thanks for reaching out. Fortunately, the issue did not cause a real impact on my use case.
I ended up filtering the undesirable error log Graceful stop of task failed out of our logging.
Hi, I've opened a PR with the proposed solution for this problem.
Can someone have a look at my PR ? Thanks in advance :)
@jakubmalek, will take a look next week, thanks!
This is a new issue and not exactly the same as #677
I'm facing this error whenever stopping my JDBC connectors. I got in touch with Kafka Connect users and they explained it is most likely caused by: https://issues.apache.org/jira/browse/KAFKA-15090
Since this change, the stop signal is invoked on the same thread as the dedicated task thread. This results in the JdbcSourceTask only processing the stop() signal when it returns from the current poll(). Basically, this change in Kafka Connect made the change from #677 stop achieving its goal.
In my case, we are using Kafka Connect and the JDBC source/sink connectors to move large amounts of data around, once per day. Waiting for the next poll() to return in order to cleanly stop tasks is not really an option. I'd guess the required change would be to implement poll() in some sort of non-blocking way, but I admit I'm not quite sure how to implement it.
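One possible non-blocking shape, echoing the early-return idea discussed in the comments (a sketch only; `nextUpdate` and `queryAndBuildRecords()` are illustrative):

```java
// Sketch: instead of sleeping until the next interval, return an empty batch
// so the runtime regains control and can process a pending stop().
@Override
public List<SourceRecord> poll() {
    long sleepMs = nextUpdate - System.currentTimeMillis();
    if (sleepMs > 0) {
        return Collections.emptyList(); // hand control back; poll() is retried
    }
    return queryAndBuildRecords(); // hypothetical helper doing the JDBC work
}
```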