Closed buraksezer closed 1 month ago
I'm a bot and I 👍 this PR title. 🤖
Here are some key observations to aid the review process:
⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review
Concurrency Concerns: The use of sync.Map and atomic operations suggests that concurrency is a concern. However, it's essential to ensure that all operations on shared resources are safe and that there are no race conditions or deadlocks.
Error Handling: The error handling in the stream removal and garbage collection processes should be reviewed to ensure that errors are handled appropriately and do not lead to inconsistent states or resource leaks.
Resource Management: The implementation of garbage collection for stream managers should be carefully reviewed to ensure that it effectively frees up resources without prematurely removing active streams or causing interruptions.
API Changes
--- prev.txt 2024-10-09 09:15:23.890104629 +0000
+++ current.txt 2024-10-09 09:15:18.070072734 +0000
@@ -7686,6 +7686,11 @@
ErrOAuthClientDeleted = "oauth.client_deleted"
)
const (
+ // ExtensionTykStreaming is the oas extension for tyk streaming
+ ExtensionTykStreaming = "x-tyk-streaming"
+ StreamGCInterval = 1 * time.Minute
+)
+const (
ResetQuota string = "resetQuota"
CertificateRemoved string = "CertificateRemoved"
CertificateAdded string = "CertificateAdded"
@@ -7737,10 +7742,6 @@
The name for event handlers as defined in the API Definition JSON/BSON
format
-const (
- // ExtensionTykStreaming is the oas extension for tyk streaming
- ExtensionTykStreaming = "x-tyk-streaming"
-)
const ListDetailed = "detailed"
const LoopScheme = "tyk"
const OIDPREFIX = "openid"
Explore these optional code suggestions:
Category | Suggestion | Score |
Security |
Improve security by using a more secure hash function for generating cache keys
**Replace the MD5 hash function with a more secure hash function like SHA-256 for generating cache keys, as MD5 is vulnerable to collision attacks.**
[gateway/mw_streaming.go [231]](https://github.com/TykTechnologies/tyk/pull/6601/files#diff-6f565750150d990575c808f1ca8f38483160dc6edf05f1534cd0bedb27c2e6c8R231-R231)
```diff
-cacheKey := fmt.Sprintf("%x", md5.Sum(configJSON))
+cacheKey := fmt.Sprintf("%x", sha256.Sum256(configJSON))
```
Suggestion importance[1-10]: 9
Why: The suggestion to replace MD5 with SHA-256 for generating cache keys enhances security by mitigating collision vulnerabilities associated with MD5. This is a significant improvement in terms of security best practices. | 9 |
Resource management |
Ensure proper resource cleanup by calling the cancel function for each stream
**Ensure that the cancel function from the context is called for each stream in removeStreamManager to properly release resources and avoid potential memory leaks.**
[gateway/mw_streaming.go [143]](https://github.com/TykTechnologies/tyk/pull/6601/files#diff-6f565750150d990575c808f1ca8f38483160dc6edf05f1534cd0bedb27c2e6c8R143-R143)
```diff
+sm.cancel()
 s.streamManagerCache.Delete(cacheKey)
```
Suggestion importance[1-10]: 8
Why: Adding a call to the cancel function ensures proper resource cleanup and prevents potential memory leaks, which is crucial for maintaining application stability and performance. | 8 |
Error handling |
Add error handling for JSON marshaling to enhance robustness
**Add error handling for the json.Marshal operation when creating the cache key to handle potential serialization issues gracefully.**
[gateway/mw_streaming.go [230]](https://github.com/TykTechnologies/tyk/pull/6601/files#diff-6f565750150d990575c808f1ca8f38483160dc6edf05f1534cd0bedb27c2e6c8R230-R230)
```diff
-configJSON, _ := json.Marshal(streamsConfig)
+configJSON, err := json.Marshal(streamsConfig)
+if err != nil {
+    s.Logger().Errorf("Failed to marshal streams config: %v", err)
+    return nil
+}
```
Suggestion importance[1-10]: 7
Why: Introducing error handling for the JSON marshaling process improves the robustness of the code by ensuring that serialization issues are caught and logged, preventing potential runtime errors. | 7 |
Concurrency management |
Improve thread safety and performance by reviewing locking mechanisms around shared resources
**Consider using a more precise locking mechanism or review the necessity of locking around lastActivity.Store(time.Now()) to avoid potential race conditions or performance bottlenecks.**
[gateway/mw_streaming.go [497-499]](https://github.com/TykTechnologies/tyk/pull/6601/files#diff-6f565750150d990575c808f1ca8f38483160dc6edf05f1534cd0bedb27c2e6c8R497-R499)
```diff
+h.sm.routeLock.Lock()
 h.sm.lastActivity.Store(time.Now())
+h.sm.routeLock.Unlock()
```
Suggestion importance[1-10]: 6
Why: The suggestion to add locking around the `lastActivity.Store` operation addresses potential race conditions, enhancing thread safety. However, it may introduce performance bottlenecks, so the impact is moderate. | 6 |
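For reference, the two cache-key suggestions above (checking the json.Marshal error and hashing with SHA-256) can be combined in one place. This is only a minimal sketch: cacheKeyFor is a hypothetical helper name, not a function from the PR, and the config is passed as a plain interface{} value rather than the real streams config type.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// cacheKeyFor is a hypothetical helper (not part of the PR). It serializes
// the config to JSON and returns the hex-encoded SHA-256 digest, surfacing
// the marshal error instead of discarding it.
func cacheKeyFor(streamsConfig interface{}) (string, error) {
	configJSON, err := json.Marshal(streamsConfig)
	if err != nil {
		return "", fmt.Errorf("marshal streams config: %w", err)
	}
	sum := sha256.Sum256(configJSON)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	key, err := cacheKeyFor(map[string]interface{}{"input": "http", "output": "http"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(key)
}
```

Because encoding/json writes map keys in sorted order, two maps with the same entries produce the same key regardless of insertion order.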
I do not think lastActivity is doing its job now. The actual activity happens inside f(w, r), and if there is a long WebSocket connection that lasts more than 10 minutes, it will time out even if it is still active.
If there is an "f(w, r)" currently running for a given consumer group, it should be counted as active.
h.sm.lastActivity.Store(time.Now())
f(w, r)
h.sm.lastActivity.Store(time.Now())
@buger I think wrapping http.ResponseWriter and updating lastActivity in the Write method could be an option.
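A rough sketch of that wrapping idea, for illustration only. The names activityWriter, trackActivity and runDemo are hypothetical, and lastActivity is assumed to be an atomic.Value holding a time.Time, which is consistent with the Store(time.Now()) calls quoted above but not confirmed here.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"time"
)

// activityWriter (hypothetical) wraps a ResponseWriter and records the
// time of every Write call.
type activityWriter struct {
	http.ResponseWriter
	lastActivity *atomic.Value // assumed to hold a time.Time
}

func (w *activityWriter) Write(p []byte) (int, error) {
	w.lastActivity.Store(time.Now())
	return w.ResponseWriter.Write(p)
}

// trackActivity (hypothetical) runs next with the wrapped writer.
func trackActivity(last *atomic.Value, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		next(&activityWriter{ResponseWriter: w, lastActivity: last}, r)
	}
}

// runDemo sends one request through the wrapper and reports whether a
// timestamp was stored, plus the response body.
func runDemo() (stored bool, body string) {
	var last atomic.Value
	h := trackActivity(&last, func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "chunk")
	})
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(http.MethodGet, "/stream", nil))
	_, stored = last.Load().(time.Time)
	return stored, rec.Body.String()
}

func main() {
	fmt.Println(runDemo())
}
```

Note that embedding http.ResponseWriter only promotes the ResponseWriter methods, so the wrapper does not expose http.Hijacker or http.Flusher, and any bytes written directly to a hijacked connection would not pass through Write.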
@buraksezer
As an alternative, you can just maintain a counter of active connections for each stream manager. Literally:
h.sm.Inc()
f(w, r)
h.sm.Dec()
And in the GC, just check whether the counter is non-zero.
This might be the only option because the underlying TCP connection is hijacked for websocket traffic and this invalidates my solution.
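The counter approach could look roughly like the sketch below. Everything here is an assumption made for illustration: streamManager is a stand-in type, Inc, Dec and Active follow the naming in the comment above, and collectIdle is a hypothetical GC pass over a sync.Map cache. It needs Go 1.19 or later for atomic.Int64.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// streamManager is a stand-in for the real type; only the counter matters.
type streamManager struct {
	activeConns atomic.Int64
}

func (sm *streamManager) Inc()         { sm.activeConns.Add(1) }
func (sm *streamManager) Dec()         { sm.activeConns.Add(-1) }
func (sm *streamManager) Active() bool { return sm.activeConns.Load() > 0 }

// serve brackets the handler call so that a long-running f counts as
// activity for its whole duration, even if f panics.
func (sm *streamManager) serve(f func()) {
	sm.Inc()
	defer sm.Dec()
	f()
}

// collectIdle (hypothetical) removes managers with no active connections
// and returns how many were removed.
func collectIdle(cache *sync.Map) int {
	removed := 0
	cache.Range(func(key, value interface{}) bool {
		sm, ok := value.(*streamManager)
		if ok && !sm.Active() {
			cache.Delete(key)
			removed++
		}
		return true
	})
	return removed
}

// demo stores one idle and one busy manager and runs a single GC pass.
func demo() (removed int, busyKept bool) {
	var cache sync.Map
	idle, busy := &streamManager{}, &streamManager{}
	cache.Store("idle", idle)
	cache.Store("busy", busy)
	busy.Inc() // simulate an in-flight f(w, r)
	defer busy.Dec()
	removed = collectIdle(&cache)
	_, busyKept = cache.Load("busy")
	return removed, busyKept
}

func main() {
	idle := &streamManager{}
	idle.serve(func() {}) // counter returns to zero afterwards
	fmt.Println(demo())
}
```

One caveat with this sketch: a new connection can arrive between the Active check and the Delete, so a real implementation would probably still want an idle timeout or a lock around removal.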
// Unload closes and remove active streams
func (s *StreamingMiddleware) Unload() {
	s.Logger().Debugf("Unloading streaming middleware %s", s.Spec.Name)

	totalStreams := 0
	s.streamManagers.Range(func(_, value interface{}) bool {
		manager, ok := value.(*StreamManager)
		if !ok {
			return true
		}
		manager.streams.Range(func(_, _ interface{}) bool {
			totalStreams++
			return true
		})
		return true
	})
	globalStreamCounter.Add(-int64(totalStreams))
Do we really need a separate streamManagers sync.Map here, since we are already doing something similar with streamManagersCache?
I think we do not need the streamManagers map. It's only used to track globalStreamCounter. We can use streamManagersCache for this.
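To make the single-map idea concrete, here is a hedged sketch of an Unload that counts streams from the cache alone. The types are simplified stand-ins, the field is spelled streamManagerCache as in the diff suggestion earlier on this page, and deleting each entry during Unload is an assumption, since the quoted function is cut off before that point.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Simplified stand-ins for the real types; only the fields used here.
var globalStreamCounter atomic.Int64

type StreamManager struct {
	streams sync.Map
}

type StreamingMiddleware struct {
	streamManagerCache sync.Map
}

// Unload counts streams by ranging over the cache only, then adjusts the
// global counter. Deleting the entries is an assumption of this sketch.
func (s *StreamingMiddleware) Unload() int {
	totalStreams := 0
	s.streamManagerCache.Range(func(key, value interface{}) bool {
		if manager, ok := value.(*StreamManager); ok {
			manager.streams.Range(func(_, _ interface{}) bool {
				totalStreams++
				return true
			})
		}
		s.streamManagerCache.Delete(key)
		return true
	})
	globalStreamCounter.Add(-int64(totalStreams))
	return totalStreams
}

// demo registers two managers holding three streams in total.
func demo() (unloaded int, remaining int64) {
	s := &StreamingMiddleware{}
	a, b := &StreamManager{}, &StreamManager{}
	a.streams.Store("s1", struct{}{})
	a.streams.Store("s2", struct{}{})
	b.streams.Store("s3", struct{}{})
	s.streamManagerCache.Store("a", a)
	s.streamManagerCache.Store("b", b)
	globalStreamCounter.Store(3)
	unloaded = s.Unload()
	return unloaded, globalStreamCounter.Load()
}

func main() {
	fmt.Println(demo())
}
```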
Failed conditions
0.0% Coverage on New Code (required ≥ 80%)
C Reliability Rating on New Code (required ≥ A)
User description
TT-13139
Cherry-picked stream caching feature from this branch: https://github.com/TykTechnologies/tyk/pull/6538
Two new integration tests have been added to test the input http -> output http scenario. See this issue for the details: https://tyktech.atlassian.net/browse/TT-13139
Closing the previous one: https://github.com/TykTechnologies/tyk/pull/6592
PR Type
Enhancement, Tests
Description
StreamingMiddleware to manage inactive streams and improve performance.
Changes walkthrough 📝
mw_streaming.go
Implement stream caching and garbage collection in StreamingMiddleware
gateway/mw_streaming.go
mw_streaming_test.go
Add integration tests for HTTP server streaming scenarios
gateway/mw_streaming_test.go