taskcluster / cloud-mirror

cloud mirror
Mozilla Public License 2.0

multiple-consumers in each process #19

Closed jhford closed 7 years ago

jhford commented 8 years ago

This is a change to cloud-mirror so that we can run more than a single consumer in each process. The sqs-consumer option of batchSize is far from ideal for our use case. That option lets the SQS library request up to X items from the queue in a single poll. This would be great if SQS overhead were our bottleneck, but when a work item is copying a 4 GB file, we don't want it to keep the rest of that worker from doing anything else.

This change lets us have as many copiers in a single process as we wish. This should have less overhead than running a complete container and node process for each copier itself.
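As a rough illustration of the idea (not the code in this PR), several independent copiers can share one Node process, each taking a single message at a time. `FakeQueue`, `runCopier` and `startCopiers` are hypothetical names, and the in-memory queue is only a stand-in for SQS:

```javascript
// Hypothetical in-memory stand-in for an SQS queue; not the sqs-consumer API.
class FakeQueue {
  constructor(messages) {
    this.messages = messages.slice();
  }

  // Resolves with one message, or undefined once the queue is empty.
  async receiveOne() {
    return this.messages.shift();
  }
}

// One copier: take a single message, finish it, then ask for the next one.
async function runCopier(id, queue, handle, log) {
  for (;;) {
    const msg = await queue.receiveOne();
    if (msg === undefined) {
      return;
    }
    await handle(msg);
    log.push({copier: id, msg});
  }
}

// Start `count` copiers in this process. They share the event loop, but a
// slow message in one copier never delays the polling of another.
function startCopiers(count, queue, handle) {
  const log = [];
  const copiers = [];
  for (let id = 0; id < count; id++) {
    copiers.push(runCopier(id, queue, handle, log));
  }
  return Promise.all(copiers).then(() => log);
}
```

A real copier would replace `receiveOne()` with a poll against SQS and `handle` with the streaming copy.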

jhford commented 8 years ago

I was hoping that @imbstack might have a chance to take a look at this PR. I know that @jonasfj is probably pretty busy with reviews, but I wanted to make sure you had the chance to look at this if you'd like.

jonasfj commented 8 years ago

I guess this is better than nothing... But to me the real problem is your SQS consumer... It always polls in batches of batchSize... So it waits for all messages to be processed before polling again...

Just re-implement this logic in your own wrapper around SQS using aws-sdk... Basically, you want to: 1) Have at most maxPoll concurrent polls to SQS 2) Always poll with maxParallelism - messagesBeingProcessed

So when a message is done, then if maxPoll > currentPolls, then you send an SQS poll for maxParallelism - messagesBeingProcessed while setting messagesBeingProcessed = maxParallelism and doing currentPolls++. Otherwise, you set messagesBeingProcessed--.

When a poll request returns you set currentPolls--, and if you polled for K messages but only got N messages you set messagesBeingProcessed += N - K.

This way you won't be doing more than maxPoll concurrent polling calls. And you will always seek to be processing maxParallelism messages at the same time.

One further optimization in case a poll returns zero messages would be to set currentPolls += maxPoll, sleep 250ms and then set currentPolls -= maxPoll before doing another poll request again. This way you block needless poll requests for 250ms... Maybe 250ms is too much, maybe it needs to be 100ms... And after a successful poll, you would probably have to immediately poll again if maxParallelism - messagesBeingProcessed > 0, to ensure that the number of parallel polls ramps up again...

Note: if using SQS long-polling and maxPoll isn't huge, you probably don't need to sleep between poll calls that return zero messages (as such calls would take 20 seconds to time out).
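The bookkeeping described above could be sketched roughly as follows. This is one reading of the scheme, not tested against SQS: `PollBudget` and its method names are hypothetical, only the counters come from the comment, and the sketch assumes the counter drops by one whenever a message finishes. The actual receive call and the optional sleep after an empty poll are left out.

```javascript
// Hypothetical helper that only tracks the counters; it makes no network calls.
class PollBudget {
  constructor({maxPoll, maxParallelism}) {
    this.maxPoll = maxPoll;
    this.maxParallelism = maxParallelism;
    this.currentPolls = 0;
    // Counts running messages plus slots reserved by in-flight polls.
    this.messagesBeingProcessed = 0;
  }

  // Returns how many messages to request now, or 0 if no poll should start.
  nextPollSize() {
    const free = this.maxParallelism - this.messagesBeingProcessed;
    if (this.currentPolls >= this.maxPoll || free <= 0) {
      return 0;
    }
    // Reserve every free slot for the poll that is about to be sent.
    this.messagesBeingProcessed = this.maxParallelism;
    this.currentPolls++;
    return free;
  }

  // Call when a poll that asked for `requested` messages returned `received`.
  pollReturned(requested, received) {
    this.currentPolls--;
    // Give back the reserved slots that the poll did not fill (N - K <= 0).
    this.messagesBeingProcessed += received - requested;
  }

  // Call when a message finishes; returns the size of the follow-up poll.
  messageDone() {
    this.messagesBeingProcessed--;
    return this.nextPollSize();
  }
}
```

With `maxPoll: 2` and `maxParallelism: 5`, the first `nextPollSize()` asks for 5 messages; if that poll returns only 2, `pollReturned(5, 2)` frees 3 slots and the next poll asks for 3. A single SQS receive request returns at most 10 messages, so a real wrapper would also need to clamp the requested size.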


I've kind of wanted to do this for how the queue processes messages, like deadline expiration, claim resolver, etc... But haven't gotten around to doing it yet.

jonasfj commented 8 years ago

r+, as I suspect this will work... but feels like duplicating a lot of logic... Well, that might be okay.

jhford commented 8 years ago

@jonasfj i still need to fix a small unit test issue before landing related to rejected handlers, but sounds like that shouldn't cause an r-...

Regarding batch sizes, I actually set the batchsize to 1. Without queues being pooled by size category, we cannot run a batchsize greater than 1. If we set batchsize to 2, we could have a 4KB and a 4GB file in the same batch. That would mean that once the 4KB file is done, the copier process would wait for the entire 4GB file before taking on the next file.

Since the main overhead of cloud-mirror is the streaming, not the polling of sqs, I'd rather just have batchsize of 1 and a lot more polling processes.
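To make the batching argument concrete, here is a small virtual-time comparison. It is a sketch under simplifying assumptions: durations are arbitrary time units standing in for copy time, every job is already queued at time 0, and both function names are hypothetical.

```javascript
// Finish time of each job when one consumer polls `batchSize` jobs at a time
// and only polls again once the whole batch has finished.
function batchedFinishTimes(durations, batchSize) {
  const finish = [];
  let now = 0;
  for (let i = 0; i < durations.length; i += batchSize) {
    const batch = durations.slice(i, i + batchSize);
    for (const d of batch) {
      finish.push(now + d);
    }
    // The next poll waits for the slowest job in this batch.
    now += Math.max(...batch);
  }
  return finish;
}

// Finish time of each job when `copiers` independent consumers each take one
// job at a time, in queue order, as soon as they are free.
function independentFinishTimes(durations, copiers) {
  const freeAt = new Array(copiers).fill(0);
  return durations.map((d) => {
    const idx = freeAt.indexOf(Math.min(...freeAt));
    freeAt[idx] += d;
    return freeAt[idx];
  });
}

// One large copy followed by four small ones.
const jobs = [4000, 1, 1, 1, 1];
const batched = batchedFinishTimes(jobs, 2);         // [4000, 1, 4001, 4001, 4002]
const independent = independentFinishTimes(jobs, 2); // [4000, 1, 2, 3, 4]
```

With a batch of 2, the small jobs behind the large one finish only after it completes, while two copiers with a batch size of 1 let one copier drain the small jobs in the meantime.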

jonasfj commented 8 years ago

> Since the main overhead of cloud-mirror is the streaming, not the polling of sqs, I'd rather just have batchsize of 1 and a lot more polling processes.

That makes sense... Perhaps we should have two queues: one for small files (<32 MB) and one for big files (>32 MB). This way one big file won't block all the small ones from being copied over.

jhford commented 8 years ago

I like the idea of a small file queue, but sadly don't have time right now to implement that. Roughly the steps that I see are:

  1. create two sets of queues
  2. read the content-length header to get the file size and pass into the CacheManager.requestPut() method
  3. have the requestPut() method decide which of the queues to insert into.
  4. include the file size as a field in the msg sent so that the workers can do smart things like changing the visibility timeout depending on the file size and which region they're coming from and to.
  5. ?
  6. profit.
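
Steps 2 to 4 might look roughly like the sketch below. Everything here is hypothetical: the queue names, the function names, the assumed throughput and the choice to send a file of exactly 32 MB or of unknown size to the big-file queue. The real `requestPut()` signature is not shown in this thread, so it is not reproduced. The only external fact relied on is that SQS caps the visibility timeout at 12 hours.

```javascript
const SMALL_FILE_LIMIT = 32 * 1024 * 1024; // 32 MB threshold suggested above
const MAX_VISIBILITY_SECONDS = 12 * 60 * 60; // SQS upper bound (12 hours)

// Decide which queue a copy request belongs to, given the raw Content-Length
// header value (a string, or undefined if the header is missing).
function chooseQueue(contentLength) {
  const size = Number.parseInt(contentLength, 10);
  if (!Number.isFinite(size) || size < 0) {
    // Unknown size: assume big so it cannot stall the small-file queue.
    return {queue: 'big-files', size: null};
  }
  return {queue: size < SMALL_FILE_LIMIT ? 'small-files' : 'big-files', size};
}

// Message body that carries the size along for the workers (step 4).
function buildCopyMessage(url, contentLength) {
  const {queue, size} = chooseQueue(contentLength);
  return {queue, body: JSON.stringify({url, size})};
}

// Visibility timeout scaled to file size: twice the expected copy time at an
// assumed throughput, never below `floorSeconds` or above the SQS maximum.
function visibilityTimeoutSeconds(size, bytesPerSecond = 10 * 1024 * 1024, floorSeconds = 60) {
  if (size === null) {
    return MAX_VISIBILITY_SECONDS;
  }
  const estimate = Math.ceil((2 * size) / bytesPerSecond);
  return Math.min(MAX_VISIBILITY_SECONDS, Math.max(floorSeconds, estimate));
}
```

For example, `chooseQueue('4096')` picks the small-file queue, and a 4 GiB file at the assumed 10 MiB/s gets a timeout of 820 seconds. A per-region throughput table could replace the single `bytesPerSecond` default.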

jhford commented 8 years ago

@imbstack hey, as we talked about yesterday, could you review this? I'm happy to answer any questions about how it works if something's confusing.

jhford commented 8 years ago

@imbstack @jonasfj I'd like to get this ready to deploy this week sometime, any chance you could take a look at it today? It's probably a lot easier to review if you look at the overall changes instead of the intermediate commits

jonasfj commented 8 years ago

you need to update shrinkwrap..