Open rdettai opened 2 weeks ago
Average search latency is 1.01x that of the reference (lower is better).
Ref run id: 2337, ref commit: 4ade7b5ec59685fc508c1834f2e23b5ca7b5afbe
Average search latency is 0.981x that of the reference (lower is better).
Ref run id: 2339, ref commit: 4ade7b5ec59685fc508c1834f2e23b5ca7b5afbe
We need different handling for transient vs non-transient errors. E.g. an error in the message parsing is non-transient, a disconnection while streaming the file is transient, and gzip corruption is non-transient.
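One way such a classification could be sketched (the `Transience` enum and the error variants here are hypothetical illustrations, not the actual Quickwit types):

```rust
// Hypothetical sketch: classify processing errors so the source can decide
// whether to retry the message (transient) or give up on it (non-transient).
#[derive(Debug, PartialEq)]
enum Transience {
    Transient,
    NonTransient,
}

#[derive(Debug)]
enum ProcessingError {
    // The queue message itself could not be parsed: retrying won't help.
    MessageParsing(String),
    // The connection dropped while streaming the file: worth retrying.
    Disconnection(String),
    // The payload is corrupt (e.g. broken gzip stream): retrying won't help.
    GzipCorruption(String),
}

impl ProcessingError {
    fn transience(&self) -> Transience {
        match self {
            ProcessingError::MessageParsing(_) => Transience::NonTransient,
            ProcessingError::Disconnection(_) => Transience::Transient,
            ProcessingError::GzipCorruption(_) => Transience::NonTransient,
        }
    }
}

fn main() {
    let err = ProcessingError::Disconnection("reset by peer".to_string());
    assert_eq!(err.transience(), Transience::Transient);
    let err = ProcessingError::GzipCorruption("bad gzip header".to_string());
    assert_eq!(err.transience(), Transience::NonTransient);
}
```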
Note: I just stumbled upon https://github.com/quickwit-oss/quickwit/issues/1065, which is addressed as part of the reorganization of the FileSource that is happening here.
Description
Description
This PR proposes a generic implementation of a "queue" source. For now, only an implementation for AWS SQS with its data backed by AWS S3 is exposed to users. Google Pub/Sub as the queue implementation, or inlined data (i.e. messages containing the data itself rather than a link to the object store), will come next.
We use the shard API to provide deduplication of messages. For the current implementation, where the source data is stored on S3, deduplication is performed on the object URI.
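As a simplified illustration of URI-based deduplication (the `SharedState` type and its methods here are placeholders, not the actual shard-API types): the object URI serves as the partition id, so a message referencing an already-acquired URI can be skipped.

```rust
use std::collections::HashSet;

// Illustrative sketch: deduplicate queue messages by the S3 object URI they
// point to. In the actual implementation the shared state lives behind the
// shard API; a HashSet stands in for it here.
struct SharedState {
    acquired_uris: HashSet<String>,
}

impl SharedState {
    fn new() -> Self {
        SharedState { acquired_uris: HashSet::new() }
    }

    // Returns true if the URI was newly acquired, false if it is a duplicate.
    fn try_acquire(&mut self, object_uri: &str) -> bool {
        self.acquired_uris.insert(object_uri.to_string())
    }
}

fn main() {
    let mut state = SharedState::new();
    assert!(state.try_acquire("s3://bucket/logs/2024-07-01.json.gz"));
    // The same object delivered twice (SQS is at-least-once) is skipped.
    assert!(!state.try_acquire("s3://bucket/logs/2024-07-01.json.gz"));
}
```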
High level summary of the abstractions that are part of the generic implementation:
- `Processor`: exposes the exact same methods as the `Source` trait but does not implement it directly. Instead, the concrete queue sources (e.g. `SqsSource`) wrap the `Processor`.
- `RawMessage`: the message as received from the queue
- `PreProcessedPayload`: the message went through the minimal transformation to discover its partition id
- `CheckpointedMessage`: the message was checked against the shared state (shard API); it is now ready to be processed
- `InProgressMessage`: the message that is actively being read
- `QueueSharedState`: an abstraction over the shard API. By calling `open_shard` upon reception of the messages, we avoid costly redundant processing when receiving a duplicate message.
- `QueueLocalState`: represents the state machine of the messages as they are processed by the indexing pipeline
- `VisibilityTaskHandle`: a task that extends the message visibility when required (needs to be reworked)
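The message lifecycle above can be sketched in a typestate style, where each stage is a distinct type and each transition consumes the previous one. The fields and transition logic below are simplified placeholders, not the actual implementation:

```rust
// Illustrative typestate sketch of the message lifecycle: each processing
// stage is its own type, so a message cannot skip a stage at compile time.
struct RawMessage {
    body: String,
}

struct PreProcessedPayload {
    object_uri: String, // doubles as the partition id
}

struct CheckpointedMessage {
    object_uri: String,
}

impl RawMessage {
    // Minimal transformation: discover the partition id (here, the object URI).
    fn pre_process(self) -> PreProcessedPayload {
        PreProcessedPayload { object_uri: self.body.trim().to_string() }
    }
}

impl PreProcessedPayload {
    // Check against the shared state (shard API); None means duplicate.
    fn checkpoint(self, already_acquired: bool) -> Option<CheckpointedMessage> {
        if already_acquired {
            None
        } else {
            Some(CheckpointedMessage { object_uri: self.object_uri })
        }
    }
}

fn main() {
    let raw = RawMessage { body: " s3://bucket/key.json ".to_string() };
    let checkpointed = raw.pre_process().checkpoint(false);
    assert_eq!(checkpointed.unwrap().object_uri, "s3://bucket/key.json");
}
```

The design choice this illustrates: by making each stage a separate type, "process a message that was never checkpointed" becomes unrepresentable rather than a runtime bug.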
TODO:
- `Processor` abstraction
- `Queue` trait
- `open_shard` API to accept `publish_token` as a field. This gives upsert semantics to the API, which makes it possible to acquire the shard upon creation (`SourceConfig.use_shard_api()`)
- `Processor` abstraction
- `ShardState` abstraction
- `SqsSource` (with some small refactoring to reuse the `setup_index` helper from the `KafkaSource`)
- `Publisher` actor

TODO in subsequent PRs:
How was this PR tested?
This PR contains unit tests and higher-level tests that use LocalStack.