graze / queue

:postbox: Flexible abstraction for working with queues in PHP.
MIT License
49 stars 10 forks source link

Fix SQS receive limit function #9

Closed joemeehan closed 9 years ago

joemeehan commented 9 years ago

@h-bragg @sjparkinson

Prior to this change, if you specify a limit of more than 10 when polling an SQS queue, it would process at most 10 items from the queue (and probably fewer). This change now means that the function will process precisely the limit specified, and will wait indefinitely if there are fewer items on the queue than the limit. Personally I think this is how this function should work. What do you think?

h-bragg commented 9 years ago

The first bit is good :), but blocking until the number has been reached seems dangerous.

I would expect it to stop if the queue returns no items, which is what I assume the other adapters do too.

joemeehan commented 9 years ago

Yeah, I agree, though not sure the best way to handle this. The polling $limit = null option will also keep on polling forever. I'll have a look into how this works.

h-bragg commented 9 years ago

Need a test for the retries block of code.

shouldRecieve('getPath')->times(n) where n = number of retries.

joemeehan commented 9 years ago

@sjparkinson - Harry (and I, but mainly Harry) can't work out why these test cases are passing. It looks likes once() and times() aren't being checked properly. Can you have a look?

sjparkinson commented 9 years ago

I wish there was more time in the day. Will have a look this afternoon!

sjparkinson commented 9 years ago

Sorry it took so long to get to this. I've created a test that should cover the case where SQS doesn't always return MaxNumberOfMessages. It currently fails on master.

It also fails on your branch because of the changes to the value of MaxNumberOfMessages. I'm not sure why we need to bother changing that value though, surely leaving it at 10 is acceptable?

(Setting the value back to batch size makes the test pass, so the changes are :+1:).

public function testReceiveWithReceiveMessageReturningLessThanMaxNumberOfMessages()
{
    $url = $this->stubCreateQueue();
    $timeout = $this->stubQueueVisibilityTimeout($url);

    $receiveModel = m::mock('Guzzle\Service\Resource\Model');
    $receiveModel->shouldReceive('getPath')->with('Messages')->andReturn(
        [
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
        ],
        [
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
            ['Body'=>'foo', 'Attributes'=>[], 'MessageAttributes'=>[], 'MessageId'=>0, 'ReceiptHandle'=>'a'],
        ],
        null
    );

    $this->sqsClient->shouldReceive('receiveMessage')->with([
        'QueueUrl' => $url,
        'AttributeNames' => ['All'],
        'MaxNumberOfMessages' => SqsAdapter::BATCHSIZE_RECEIVE,
        'VisibilityTimeout' => $timeout
    ])->andReturn($receiveModel);

    $deleteModel = m::mock('Guzzle\Service\Resource\Model');
    $deleteModel->shouldReceive('getPath')->once()->with('Failed')->andReturn([]);
    $this->sqsClient->shouldReceive('deleteMessageBatch')->with(m::type('array'))->andReturn($deleteModel);

    $msgs = [];
    $this->client->receive(function ($msg) use (&$msgs) {
        $msgs[] = $msg;
    }, 11);

    $this->assertCount(11, $msgs);
}
sjparkinson commented 9 years ago

@h-bragg the reason why the expectation validation wasn't working is because there were no calls to Mockery::close(): http://docs.mockery.io/en/latest/reference/phpunit_integration.html

I'm working on adding it in, there's a few more failures it's highlighted.

See #10.

sjparkinson commented 9 years ago

@h-bragg another way around the issue of SQS not returning results is long-polling.

Passing ReceiveMessageWaitTimeSeconds as an option to the SqsAdapter with a decent value might help.

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html

(TIL)

h-bragg commented 9 years ago

Hmm, the long polling seems like a better approach that retrying if there are no results returned (basically doing the same thing).

Can then remove the RETRY code and keep it simpler.

h-bragg commented 9 years ago

Long polling can be done on ReceiveMessage attributes with 'WaitTimeSeconds' => 1 too...

opinions on change?

sjparkinson commented 9 years ago

Long polling can be done on ReceiveMessage attributes with 'WaitTimeSeconds' => 1 too...

opinions on change?

Yea I don't get why Jake did this: https://github.com/graze/queue/blob/master/src/Adapter/SqsAdapter.php#L132

I think it'd be better to use the actual option name.

sjparkinson commented 9 years ago

Hmm, the long polling seems like a better approach that retrying if there are no results returned (basically doing the same thing).

Can then remove the RETRY code and keep it simpler.

If that works for you, then I'm for that. I'd add documentation to the class doc-block recommending setting of the option.

h-bragg commented 9 years ago

Yea I don't get why Jake did this: https://github.com/graze/queue/blob/master/src/Adapter/SqsAdapter.php#L132

I think it'd be better to use the actual option name.

I guess that is because thats the name of the configuration on the queue (but not the message)

h-bragg commented 9 years ago

If that works for you, then I'm for that. I'd add documentation to the class doc-block recommending setting of the option.

Yeah, go for it. This was for an extra safety case that is rarely seen.

h-bragg commented 9 years ago

Is the WaitTimeSeconds setting tested anywhere?

Or at least tested that the option gets passed through.

sjparkinson commented 9 years ago

Nope but it should. I'll look at adding a test.

sjparkinson commented 9 years ago

Ok @h-bragg, ready for a final :eyes:.

h-bragg commented 9 years ago

Looks good :+1: