brave-intl / challenge-bypass-server

https://privacypass.github.io
BSD 3-Clause "New" or "Revised" License

fix: handle unknown topic etc. #708

Closed ibukanov closed 1 month ago

ibukanov commented 1 month ago

Refactor processMessagesIntoBatchPipeline to ensure that it always closes the message channel to signal that message processing is complete, including the case when the message topic does not match any of the configured topics. To do that, move all processing of the message, including the search for topic processors, into the runMessageProcessor() goroutine and use defer to close the channel. This required changing the message channel to be of struct{} type and storing the error in the message itself, so that a simple defer close(msg.done) covers both the normal and error cases.

Provide unit tests for processMessagesIntoBatchPipeline that cover success and error paths.

Bound batchPipeline capacity by the number of CPU cores to avoid running too many CPU-intensive tasks in parallel.

Closes #702 Closes #703 Closes #704

github-actions[bot] commented 1 month ago

[puLL-Merge] - brave-intl/challenge-bypass-server@708

Description

This PR makes several changes to the Kafka package to improve performance, simplify error handling, and make the code more testable.

The main changes are:

Possible Issues

Changes

kafka/main.go

- Simplify `Processor` function signature
- Add `messageReader` interface
- Refactor `processMessagesIntoBatchPipeline` to limit concurrency and simplify message passing
- Update `readAndCommitBatchPipelineResults` to match simplified `MessageContext`
- Pass context to `Emit`

kafka/main_test.go

- Add unit tests for `processMessagesIntoBatchPipeline`

kafka/signed_blinded_token_issuer_handler.go

- Update handler to match new `Processor` signature
- Refactor error handling to return errors
- Pass context to error handling function
- Return errors from `handlePermanentIssuanceError`

kafka/signed_token_redeem_handler.go

- Update handler to match new `Processor` signature
- Refactor error handling to return errors
- Pass context to error handling function
- Return errors from `handlePermanentRedemptionError`