keith-turner opened 5 months ago
I think long polling here should work. Ideally, being able to do pub/sub and push notifications to a listener is the best way, so there isn't any polling, but long polling is also viable, and most messaging APIs support it. Both Kafka and JMS brokers (just as two examples) support long polling in their APIs, and it works pretty well, with behavior similar to what is described here: the calls wait for messages/records to arrive, and if the timeout passes they return empty/null and you can try again.
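For comparison, here is a minimal sketch of the Kafka flavor of this pattern (the topic name and config are made up for illustration): poll() blocks until records arrive or the timeout elapses, and an empty result just means it is time to poll again.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LongPollExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "compactors");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("compaction-jobs")); // hypothetical topic
      while (true) {
        // blocks for up to 30s; returns an empty batch if the timeout passes
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(30));
        records.forEach(r -> System.out.println("got: " + r.value()));
        // empty result: just loop and poll again
      }
    }
  }
}
```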
Currently in the manager the same thread pool services fate, compaction coordinator, etc. RPCs. So this could cause a problem if it's a fixed-size thread pool and all of the threads are waiting on an empty compaction queue. This would prevent fate RPCs from running, or it could even starve a compaction queue that has work (e.g. all threads are waiting on queue A, which has no work, while compaction queue B has work but nothing can get to it).
One simple way to solve this would be an unbounded thread pool to service manager RPCs. However, there is some limit (not sure what it is) where too many threads in a JVM start to cause problems.

Another possible solution is to make the server-side processing for these messages async. Thrift supports async server processing, but there is no documentation for it. With async processing, an RPC request that is waiting for a compaction job would have no thread associated with it, which would be ideal. Since there are no docs for thrift async servers, I have not been able to determine if it's possible to mix sync and async processing for RPCs in thrift, and we would probably not want to make everything in the manager async, for code complexity reasons. If they can not be mixed, maybe we could create a thrift service with its own port that only services requests for compaction jobs. Maybe that could even be switched to gRPC as a trial run, since gRPC also seems to support async. If we wait long enough we would probably not need async processing for this at all, as Java virtual threads could be used instead.

Not sure what the best course of action is here, just posting some notes from researching this a bit.
Some of the things I found while looking into async thrift:

https://issues.apache.org/jira/browse/THRIFT-1972
https://issues.apache.org/jira/browse/THRIFT-2427
I'm researching this now, and so far async thrift may not be a great option. Besides the extremely poor (non-existent) documentation, I have found some blog posts suggesting that it's not truly async and that a thread is still used per RPC call. I won't be sure whether that's true until I dive more into the code, as there is essentially no information anywhere about async thrift. The lack of information also makes me a bit skeptical about using it, because it probably means no one else is, and there could be bugs that haven't been worked out.
I have a few ideas that I am thinking through and can list here when I finish researching. This may end up being a situation where we need a different solution than Thrift for this use case, depending on how the async code looks.
I looked into this quite a bit today to see if async thrift may work. We have a lot of custom code for Thrift, and we currently disallow async thrift processors. The processors are generated, but we are not using them anywhere. It would take some work to try things out, and I think we would need to stand up another server, as there is an open issue that mentions multiplexed processors not working. I don't think async thrift is the best approach to try, because of the lack of documentation and support, among other issues.
I talked offline to @keith-turner a bit about this, and it seems like gRPC might be a good alternative to try as a trial, as noted in a previous comment. gRPC supports async and streaming RPC, so it looks like it would work. A compactor could send in the request, and the server could offload the incoming StreamObserver and wait to complete it until a new job is available. CompletableFuture could likely be used here to make the code easier to manage (I will let @keith-turner comment on this). By doing this we could handle a lot of requests, and they would not be taking up threads while waiting.
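To make that concrete, here is a rough sketch of what the server side might look like (the proto service, the JobRequest/JobResponse messages, and the toJobResponse converter are all hypothetical, not existing Accumulo code; getAsync is the queue method proposed below):

```java
import io.grpc.stub.StreamObserver;

// assumes a hypothetical proto definition roughly like:
//   service CompactionCoordinatorService {
//     rpc GetCompactionJob(JobRequest) returns (JobResponse);
//   }
public class CompactionJobService
    extends CompactionCoordinatorServiceGrpc.CompactionCoordinatorServiceImplBase {

  private final CompactionJobQueues jobQueues;

  public CompactionJobService(CompactionJobQueues jobQueues) {
    this.jobQueues = jobQueues;
  }

  @Override
  public void getCompactionJob(JobRequest request,
      StreamObserver<JobResponse> responseObserver) {
    // returns immediately; no thread is parked while the queue is empty
    jobQueues.getAsync(CompactorGroupId.of(request.getGroupName()))
        .thenAccept(metaJob -> {
          responseObserver.onNext(toJobResponse(metaJob)); // hypothetical converter
          responseObserver.onCompleted();
        });
  }
}
```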
I also had a few other alternative ideas:
1) We could create an HTTP REST endpoint using Jetty for this and use the async servlet API (see the first sketch after this list). It would be easy to have a simple endpoint for the compactors to hit, and the server can offload the async request until a job is available and then respond. The main drawback is having to stand up a new HTTP service separate from everything else, similar to gRPC. However, gRPC could potentially be re-used in the future for other services, whereas we would not want to use REST for some things (like scans).
2) We could redesign things entirely to use an embedded broker in the coordinator (such as Apache ActiveMQ or Artemis); see the second sketch after this list. We could create one or more queues for compaction jobs; the compactors would subscribe to a queue, and the coordinator would publish jobs to it. The compactor consumers would need a prefetch setting of 1 so that only one job is sent at a time and a consumer won't receive another until it is ready. This would work well, but the drawbacks are having to configure a broker and a lot of refactoring of the current architecture. It would be quite a big overhaul of how things work today.
3) We could invert things so the coordinator keeps track of free compactors and just sends a message to a free server with a job, instead of the compactors querying. This seems like it may not work well, because things would get out of sync and it would add complexity to the coordinator.
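Here is a rough sketch of what idea 1 could look like using the standard async servlet API (the servlet, the group parameter, and the writeJson helper are made up; getAsync is the queue method proposed below):

```java
import javax.servlet.AsyncContext;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(urlPatterns = "/compaction-job", asyncSupported = true)
public class GetCompactionJobServlet extends HttpServlet {

  private final CompactionJobQueues jobQueues;

  // embedded Jetty allows registering a servlet instance, so the queues can
  // be passed in when the service is stood up
  public GetCompactionJobServlet(CompactionJobQueues jobQueues) {
    this.jobQueues = jobQueues;
  }

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
    // detach the request from the servicing thread; Jetty holds the
    // connection open until complete() is called or the timeout fires
    AsyncContext ctx = req.startAsync();
    ctx.setTimeout(90_000); // give up after 90s and let the compactor retry

    jobQueues.getAsync(CompactorGroupId.of(req.getParameter("group")))
        .thenAccept(metaJob -> {
          writeJson(ctx.getResponse(), metaJob); // hypothetical serializer
          ctx.complete();
        });
  }
}
```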
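And a rough sketch of the consumer side of idea 2 using the plain JMS API against an embedded ActiveMQ broker, with prefetch set to 1 via a destination option (the broker URL and queue name are made up):

```java
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class CompactorConsumer {
  public static void main(String[] args) throws Exception {
    var factory = new ActiveMQConnectionFactory("tcp://coordinator:61616");
    Connection conn = factory.createConnection();
    conn.start();
    Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    // prefetchSize=1: the broker sends at most one unacknowledged job
    Queue queue = session.createQueue("compaction.jobs?consumer.prefetchSize=1");
    MessageConsumer consumer = session.createConsumer(queue);
    while (true) {
      Message job = consumer.receive(30_000); // long poll; null on timeout
      if (job != null) {
        // run the compaction, then ack so the broker sends the next job
        job.acknowledge();
      }
    }
  }
}
```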
> CompletableFuture could likely be used here to make the code easier to manage (I will let @keith-turner comment on this).
If we add the following to CompactionJobQueues
```java
public CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) {
  // TODO implement
  throw new UnsupportedOperationException();
}
```
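As a thought experiment, here is one rough way getAsync could be implemented inside CompactionJobQueues, assuming it keeps a deque of outstanding futures per group and the job-adding code hands new jobs to waiting futures first (waitingFutures, pollJob, and enqueue are made-up names, not existing code):

```java
private final Map<CompactorGroupId,Deque<CompletableFuture<MetaJob>>> waitingFutures =
    new HashMap<>();

public synchronized CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) {
  MetaJob job = pollJob(groupId); // hypothetical non-blocking poll of the priority queue
  if (job != null) {
    return CompletableFuture.completedFuture(job);
  }
  // nothing queued; park a future that will be completed when a job arrives
  var future = new CompletableFuture<MetaJob>();
  waitingFutures.computeIfAbsent(groupId, g -> new ArrayDeque<>()).add(future);
  return future;
}

// called by whatever code adds jobs for a group
private synchronized void jobAdded(CompactorGroupId groupId, MetaJob job) {
  var waiting = waitingFutures.get(groupId);
  if (waiting != null && !waiting.isEmpty()) {
    waiting.poll().complete(job); // hand the job straight to a waiting request
  } else {
    enqueue(groupId, job); // hypothetical normal queueing path
  }
}
```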
then, hopefully, if we can get an async way to handle the job request RPC, the following could be done in CompactionCoordinator
```java
/**
 * Process an RPC from a compactor to get a compaction job asynchronously.
 *
 * @param callback a RPC callback that will send the compaction job back to the compactor
 */
public void getCompactionJob(String groupName, String compactorAddress,
    String externalCompactionId, Consumer<TExternalCompactionJob> callback) {
  CompactorGroupId groupId = CompactorGroupId.of(groupName);
  CompletableFuture<CompactionJobQueues.MetaJob> future = jobQueues.getAsync(groupId);
  // TODO if nothing is done then the reservation and RPC callback will run in the thread that
  // completes the future, which would probably be a TGW thread. This is easy to remedy w/
  // completable future
  // TODO need to time out and return nothing periodically; completable future has mechanisms
  // for this that need investigation to see if they are workable
  future.thenApply(metaJob -> reserve(compactorAddress, externalCompactionId, metaJob))
      .thenAccept(callback);
}

// this method was created by copying existing code in CompactionCoordinator into a function
private TExternalCompactionJob reserve(String compactorAddress, String externalCompactionId,
    CompactionJobQueues.MetaJob metaJob) {

  // reserve the compaction job
  Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob);

  // this method may reread the metadata, do not use the metadata in metaJob for anything after
  // this method
  CompactionMetadata ecm = null;

  var kind = metaJob.getJob().getKind();

  // Only reserve user compactions when the config is present. When compactions are canceled the
  // config is deleted.
  var cid = ExternalCompactionId.from(externalCompactionId);
  if (kind == CompactionKind.SYSTEM
      || (kind == CompactionKind.USER && compactionConfig.isPresent())) {
    ecm = reserveCompaction(metaJob, compactorAddress, cid);
  }

  if (ecm != null) {
    return createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig);
  } else {
    return new TExternalCompactionJob();
  }
}
```
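For the two TODOs above, CompletableFuture does seem to have the needed mechanisms: thenApplyAsync can shift the reservation and callback onto a dedicated executor instead of the thread that completes the future, and completeOnTimeout (Java 9+) can turn a long-idle wait into an empty response so the compactor retries. A rough sketch, where rpcExecutor is an assumed executor and a null MetaJob signals a timeout:

```java
future.completeOnTimeout(null, 60, TimeUnit.SECONDS)
    .thenApplyAsync(metaJob -> metaJob == null
        // held the request for 60s with no job; return an empty job so the
        // compactor knows to ask again
        ? new TExternalCompactionJob()
        : reserve(compactorAddress, externalCompactionId, metaJob), rpcExecutor)
    .thenAccept(callback);
```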
I'm working on trying out gRPC now to see how it goes. I figure it would be good to get something simple working to at least see how it could look and whether we want to go that direction. Besides a hello-world RPC to try things out, my plan is to implement just one RPC for now, the getCompactionJob() RPC that exists in CompactionCoordinatorService, and see if it would work.
I have not had a chance to open a new issue yet to cover my findings from experimenting with async RPC using async thrift and gRPC, as mentioned in this comment, but I was also trying to think of other options to help solve this, assuming the metrics added in #4945 demonstrate an issue.
Something else I thought of that we could try is to create an embedded Jetty REST service in the Manager for the getCompactionJob() RPC call and switch the Compactors to use that service to get the next compaction job.

I think this could work because:

- Jetty's async servlet API supports long polling, so when a job is ready it can be sent immediately to a free compactor without blocking I/O threads.

The drawback would of course be that it would be a one-off service used only for this case, but otherwise, as long as the authentication worked, it would check the same boxes in terms of efficiently reducing compaction job latency.
Compactors poll for work w/ exponential backoff. When compactors have all been idle for a while and there is a surge of jobs to do, it can take them a bit to all start working.
One possible way to improve this is to modify how polling works. The coordinator could hold requests from compactors for a period of time when nothing is currently queued. When something is queued, it could immediately be given to a held compactor RPC request. We would not want to hold an RPC request for too long, because it could be related to a dead compactor. We could hold a request for some period, like 60 to 90 seconds, and return nothing if the queue is still empty. If the compactor is still alive, it can make another request for work, which will be held again if the queue is currently empty.
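Under that scheme the compactor side reduces to a simple loop with no backoff, roughly like the following (the coordinator client, the group/address variables, and the helpers are made up, and checking the thrift isSet method for an empty job is an assumption):

```java
while (!shutdown) {
  // the coordinator holds this call until a job is queued or the 60-90s
  // hold window expires
  TExternalCompactionJob job =
      coordinator.getCompactionJob(groupName, compactorAddress, nextCompactionId());
  if (job.isSetExternalCompactionId()) {
    runCompaction(job); // hypothetical: execute the job
  }
  // an empty job means the hold timed out; immediately ask for work again
}
```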
Decreasing this latency is good for a system that has lots of small files constantly arriving at tablets. With a model like this for polling, combined with #4618, very low latency could be achieved for compaction of newly bulk-imported files. For minor compacted files there would be no signal like the one #4618 provides for bulk imports to queue compaction jobs for a tablet.