adjust / rmq

Message queue system written in Go and backed by Redis
MIT License

Add support for header transferring #136

Closed vearutop closed 1 year ago

vearutop commented 1 year ago

The Redis protocol does not define a way to pass additional data such as headers. However, there is often a need to pass them (for example, for trace propagation).

This implementation injects optional header values, marked with a signature, into the payload body during publishing. When a message is consumed and the signature is present, the header and the original payload are extracted from the augmented payload.

The header is defined as http.Header for better interoperability with existing libraries, for example with propagation.HeaderCarrier.

 // ....

 h := make(http.Header)
 h.Set("X-Baz", "quux")

 // You can add header to your payload during publish.
 _ = pub.Publish(rmq.PayloadWithHeader(`{"foo":"bar"}`, h))

 // ....

 _, _ = con.AddConsumerFunc("tag", func(delivery rmq.Delivery) {
     // And receive header back in consumer.
     delivery.(rmq.WithHeader).Header().Get("X-Baz") // "quux"

     // ....
 })

This change is intended to be backwards compatible.

Header parsing is only done if the payload starts with "\xFF\x00\xBE\xBEJ". This signature is added, along with the header values, only when the caller explicitly requests it during publishing (using the PayloadWithHeader helper).

github-actions[bot] commented 1 year ago

Lines Of Code

| Language | Files | Lines | Code | Comments | Blanks | Complexity | Bytes |
|----------|-------|-------|------|----------|--------|------------|-------|
| Go | 29 (+1) | 2930 (+103) | 2148 (+65) | 280 (+15) | 502 (+23) | 392 (+13) | 76.4K (+2.7K) |
| Go (test) | 7 (+1) | 1499 (+165) | 1204 (+127) | 46 (+5) | 249 (+33) | 84 (+3) | 42.3K (+4.1K) |
| Markdown | 1 | 645 (+39) | 477 (+28) | 0 | 168 (+11) | 0 | 24.2K (+1.5K) |

github-actions[bot] commented 1 year ago

Go API Changes

# github.com/adjust/rmq/v5
## compatible changes
(*TestBatchConsumer).Consumed: added
(*TestBatchConsumer).Last: added
(*TestConsumer).Deliveries: added
(*TestConsumer).Last: added
ExtractHeaderAndPayload: added
PayloadBytesWithHeader: added
PayloadWithHeader: added
WithHeader: added

# summary
Inferred base version: v5.0.2
Suggested version: v5.1.0
github-actions[bot] commented 1 year ago

Unit Test Coverage

total: (statements) 71.6%
changed lines: (statements) 94.3%

Coverage of changed lines

| File | Function | Coverage |
|----------------|-------------------------|----------|
| Total | | 94.3% |
| delivery.go | | 83.3% |
| delivery.go:34 | Header | 100.0% |
| delivery.go:38 | newDelivery | 92.6% |
| header.go | | 100.0% |
| header.go:22 | PayloadWithHeader | 100.0% |
| header.go:33 | PayloadBytesWithHeader | 100.0% |
| header.go:50 | ExtractHeaderAndPayload | 100.0% |
| queue.go | | 80.0% |
| queue.go:230 | newDelivery | 100.0% |
| queue.go:185 | consumeBatch | 77.8% |

Coverage diff with base branch

| File | Function | Base Coverage | Current Coverage |
|-------------------------------------------------|-------------------------|---------------|------------------|
| Total | | 70.0% | 71.6% (+1.6%) |
| github.com/adjust/rmq/v5/deliveries.go | Ack | 0.0% | 100.0% (+100.0%) |
| github.com/adjust/rmq/v5/deliveries.go | each | 0.0% | 50.0% (+50.0%) |
| github.com/adjust/rmq/v5/delivery.go | Header | no function | 100.0% |
| github.com/adjust/rmq/v5/delivery.go | newDelivery | 100.0% | 80.0% (-20.0%) |
| github.com/adjust/rmq/v5/header.go | ExtractHeaderAndPayload | no function | 100.0% |
| github.com/adjust/rmq/v5/header.go | PayloadBytesWithHeader | no function | 100.0% |
| github.com/adjust/rmq/v5/header.go | PayloadWithHeader | no function | 100.0% |
| github.com/adjust/rmq/v5/queue.go | consumeBatch | 88.9% | 85.7% (-3.2%) |
| github.com/adjust/rmq/v5/test_batch_consumer.go | Consume | 85.7% | 100.0% (+14.3%) |

github-actions[bot] commented 1 year ago

Benchmark Result

Benchmark diff with base branch

```
name     old time/op    new time/op    delta
Queue-2     177µs ± 8%     173µs ± 6%   ~     (p=0.240 n=6+6)

name     old alloc/op   new alloc/op   delta
Queue-2    46.6kB ±11%    44.2kB ± 7%   ~     (p=0.180 n=6+6)

name     old allocs/op  new allocs/op  delta
Queue-2      88.0 ± 0%      88.0 ± 0%   ~     (all equal)
```

Benchmark result

```
name     time/op
Queue-2  173µs ± 6%

name     alloc/op
Queue-2  44.2kB ± 7%

name     allocs/op
Queue-2  88.0 ± 0%
```

wellle commented 1 year ago

The approach and the implementation seem very good. 👍

vearutop commented 1 year ago

@wellle I've added notes on compatibility to the README; please check whether they are clear enough (if you have better wording, please share).