Open wuzzeb opened 8 years ago
Here is what I have now. For batchGet, I only read from a single table in my app, so I made my life easier and take a single table name, but an approach like the one in batchWrite could also be used. Each chunk (100 keys for get and 25 requests for write, the maximum number of items allowed by AWS) starts a new exponential backoff. Warning: only lightly tested.

expBackoff :: MonadIO m => RetryPolicyM m
expBackoff = limitRetries 10 ++ fullJitterBackoff (50*1000) -- 50 milliseconds on first

batchGetSrc :: MonadAWS m => Text -> KeysAndAttributes -> Source m [HashMap Text AttributeValue]
batchGetSrc table initialKeys =
    case NonEmpty.splitAt 100 (initialKeys^.kaaKeys) of
        (first100, rest) -> do
            batchGetSrcChunk table (initialKeys & kaaKeys .~ NonEmpty.fromList first100)
            unless (null rest) $
                batchGetSrc table (initialKeys & kaaKeys .~ NonEmpty.fromList rest)

batchGetSrcChunk :: MonadAWS m => Text -> KeysAndAttributes -> Source m [HashMap Text AttributeValue]
batchGetSrcChunk table initialKeys = loop (singletonMap table initialKeys) defaultRetryStatus
  where
    loop unprocessedKeys retry = do
        resp <- lift $ send $ batchGetItem & bgiRequestItems .~ unprocessedKeys
        yieldMany $ resp^..bgirsResponses . ix table
        unless (null $ resp^.bgirsUnprocessedKeys) $ do
            retry' <- applyAndDelay expBackoff retry
            case retry' of
                Nothing -> throwM $ DatabaseError "Hit maximum number of retries in batchGetSrcChunk"
                Just r -> loop (resp^.bgirsUnprocessedKeys) r

batchWrite :: MonadAWS m => HashMap Text (NonEmpty WriteRequest) -> m ()
batchWrite m | null m = return ()
             | foldl' (\x y -> x + length y) 0 m <= 25 = batchWriteChunk m -- short circuit when at most 25 total
             | otherwise = go 0 mempty $ mapToList m
  where
    go _ reqs [] = unless (null reqs) $ batchWriteChunk reqs
    go size reqs ((tbl,x):xs) = case NonEmpty.splitAt (25 - size) x of
        (firstChunk, rest) -> do
            let newSize = size + length firstChunk
            let newReqs = insertMap tbl (NonEmpty.fromList firstChunk) reqs
            let remainingReqs = if null rest then xs else insertMap tbl (NonEmpty.fromList rest) xs
            if newSize == 25
                then batchWriteChunk newReqs >> go 0 mempty remainingReqs
                else go newSize newReqs remainingReqs

batchWriteChunk :: MonadAWS m => HashMap Text (NonEmpty WriteRequest) -> m ()
batchWriteChunk initial = loop initial defaultRetryStatus
  where
    loop unprocessedWrites retry = do
        resp <- send $ batchWriteItem & bwiRequestItems .~ unprocessedWrites
        unless (null $ resp^.bwirsUnprocessedItems) $ do
            retry' <- applyAndDelay expBackoff retry
            case retry' of
                Nothing -> throwM $ DatabaseError "Hit maximum number of retries in batchWriteChunk"
                Just r -> loop (resp^.bwirsUnprocessedItems) r

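The cross-table packing in batchWrite is the fiddly part, so it may help to look at it as a pure function that can be checked without touching AWS. The sketch below is not the code from this thread: `chunkByTotal` is a hypothetical name, it works on plain lists rather than HashMap and NonEmpty, and it only assumes that the caller passes a limit of at least 1 (25 for BatchWriteItem).

```haskell
-- | Pack per-table request lists into batches that hold at most
-- @limit@ requests in total. Hypothetical helper, base only.
chunkByTotal :: Int -> [(table, [req])] -> [[(table, [req])]]
chunkByTotal limit items
  | limit < 1 = []                      -- guard: a non-positive limit would never make progress
  | otherwise = go limit [] items
  where
    -- room: free slots left in the current batch
    -- acc:  the current batch, built in reverse
    go _ acc [] = [reverse acc | not (null acc)]
    go room acc ((tbl, reqs) : rest)
      | null reqs = go room acc rest    -- skip tables with nothing to send
      | otherwise =
          let (now, later) = splitAt room reqs
              acc'  = (tbl, now) : acc
              room' = room - length now
              -- whatever did not fit goes to the front of the remaining work
              rest' = if null later then rest else (tbl, later) : rest
          in if room' == 0
               then reverse acc' : go limit [] rest'   -- batch is full, start a new one
               else go room' acc' rest'

-- Example with a limit of 3:
--   chunkByTotal 3 [("a",[1,2]),("b",[3,4,5,6]),("c",[7])]
--     == [[("a",[1,2]),("b",[3])],[("b",[4,5,6])],[("c",[7])]]
```

Each resulting batch could then be handed to something like batchWriteChunk, so that every batch gets its own retry status, as in the code above.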
Hmm. This is a tough one and the exact use case (changing of the request per retry action) hasn't come up before.
I'm going to write a small test project at the end of the week and see if I can explore some simple way to support this.
@brendanhay I know this issue is old, but since then have you found an easier way to have an exponential backoff with Amazonka? Preferably for other services too; I am mostly interested in S3.
For something like S3, exponential backoff should be fine with the tools in the Retry type.
The problem with DynamoDB here is that you get a successful response that indicates only partial processing of the input, at which point the caller is expected to craft a new request after a delay. Is S3 asking you to do that sort of rewriting (genuinely curious - I'm not 100% familiar with the guts of the S3 API)?
BatchGetItem and BatchWriteItem can both succeed while leaving part of the input unprocessed; the leftovers are returned as UnprocessedKeys and UnprocessedItems respectively (see http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html). The documentation states that if DynamoDB returns any unprocessed items, the request should be retried with just the unprocessed items using exponential backoff.
retrying from Control.Retry also does not allow changing the action. At the moment, I am looking at using applyAndDelay from Control.Retry in a loop which repeatedly calls send. Any ideas?
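A minimal sketch of that loop, written against the retry package only so it can be tried without AWS credentials. `retryLeftovers`, `drainTwo` and `demoPolicy` are hypothetical names introduced for illustration; `drainTwo` stands in for a call to send that reports unprocessed items, and the step function is assumed to return Nothing once nothing is left over.

```haskell
import Control.Monad.IO.Class (MonadIO)
import Control.Retry
  ( RetryPolicyM
  , applyAndDelay
  , constantDelay
  , defaultRetryStatus
  , limitRetries
  )

-- | Run a step that may hand back leftover work. Leftovers are resent
-- after the delay chosen by the policy. Returns True when everything was
-- processed and False when the policy gave up first.
retryLeftovers
  :: MonadIO m
  => RetryPolicyM m
  -> (req -> m (Maybe req))   -- ^ Nothing = done, Just r = resend r
  -> req
  -> m Bool
retryLeftovers policy step = go defaultRetryStatus
  where
    go status req = do
      leftover <- step req
      case leftover of
        Nothing   -> pure True
        Just req' -> do
          -- applyAndDelay sleeps for the policy's delay and returns the
          -- updated status, or Nothing once the policy is exhausted.
          next <- applyAndDelay policy status
          case next of
            Nothing      -> pure False
            Just status' -> go status' req'

-- | Stand-in for a batch call: "processes" two items per request and
-- reports the rest as unprocessed.
drainTwo :: [Int] -> IO (Maybe [Int])
drainTwo xs = pure $ case drop 2 xs of
  []   -> Nothing
  rest -> Just rest

-- | 1 ms between attempts, at most 5 retries (delays are in microseconds).
demoPolicy :: RetryPolicyM IO
demoPolicy = constantDelay 1000 <> limitRetries 5
```

With these definitions, `retryLeftovers demoPolicy drainTwo [1 .. 5]` finishes on the third call to the step. For DynamoDB the step would wrap send and return the unprocessed map when it is non-empty; whether to signal exhaustion with a Bool or an exception is a design choice left open here.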