rsocket / rsocket-kotlin

RSocket Kotlin multi-platform implementation
http://rsocket.io
Apache License 2.0

Limit rate operator for streams #109

Closed whyoleg closed 3 years ago

whyoleg commented 4 years ago

We need to implement a rate-limiting operator for streams and channels to make the streams API more flexible. For example, it could support an operator similar to Reactor's Flux.limitRate, or Reactor-like prefetch logic: request new elements once 75% of the requested elements have been collected.
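A minimal sketch of the prefetch logic described above, in plain Kotlin with no dependencies. It is only an illustration, not an rsocket-kotlin API: `PrefetchLimiter`, `onElement` and `replenishSchedule` are hypothetical names, and rounding the 75% threshold as `prefetch - prefetch / 4` is an assumption. The idea is to request `prefetch` elements up front, then ask for another batch each time 75% of a batch has been collected.

```kotlin
// Hypothetical helper (not part of rsocket-kotlin): tracks demand for a single stream.
class PrefetchLimiter(private val prefetch: Int) {
    init {
        require(prefetch > 0) { "prefetch must be positive" }
    }

    // Replenish threshold: roughly 75% of prefetch (assumed integer rounding, always >= 1).
    val limit: Int = prefetch - (prefetch shr 2)

    // Elements collected since the last replenishing request.
    private var consumed = 0

    // Demand to put into the initial request of the stream.
    fun initialRequest(): Int = prefetch

    // Call once per collected element.
    // Returns the amount to request next, or 0 if no request is due yet.
    fun onElement(): Int {
        consumed++
        if (consumed < limit) return 0
        consumed = 0
        return limit
    }
}

// Lists every request amount issued while collecting `elements` items,
// starting with the initial request.
fun replenishSchedule(prefetch: Int, elements: Int): List<Int> {
    val limiter = PrefetchLimiter(prefetch)
    val requests = mutableListOf(limiter.initialRequest())
    repeat(elements) {
        val n = limiter.onElement()
        if (n > 0) requests += n
    }
    return requests
}
```

With these assumptions, `replenishSchedule(prefetch = 8, elements = 12)` yields `[8, 6, 6]`: 8 requested up front, then 6 more after the 6th and the 12th element. A consumer that stops after 3 elements with `prefetch = 4` would have requested only 4 elements before cancelling, followed by at most one batch of 3.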

yschimke commented 4 years ago

Yep, I just noticed that it defaults to requesting X and then cancelling, even for a take(3).

$ ./rsocket-cli --debug -r 3 wss://rsocket-demo.herokuapp.com/rsocket
11:34:30.215    NativePRNGNonBlocking is not found, fallback to SHA1PRNG
11:34:31.144    Send: 
Setup frame -> Stream Id: 0 Length: 52
Flags: 0b000000000 (M0R0L0)
Version: 1.0 Honor lease: false
Keep alive: interval=20.0s, max lifetime=90.0s
Data mime type: application/json
Metadata mime type: application/json
Data: Empty
11:34:31.171    Send: 
RequestStream frame -> Stream Id: 1 Length: 13
Flags: 0b100000000 (M1F0C0N0)
Initial request: 64
Metadata: Empty
Data: Empty
11:34:31.263    Receive: 
Payload frame -> Stream Id: 1 Length: 19
Flags: 0b000100000 (M0F0C0N1)
Data(length=13):
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 36 30 33 30 31 37 32 37 31 35 38 39          |1603017271589   |
+--------+-------------------------------------------------+----------------+
1603017271589
11:34:31.312    Receive: 
Payload frame -> Stream Id: 1 Length: 19
Flags: 0b000100000 (M0F0C0N1)
Data(length=13):
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 36 30 33 30 31 37 32 37 31 36 33 39          |1603017271639   |
+--------+-------------------------------------------------+----------------+
1603017271639
11:34:31.362    Receive: 
Payload frame -> Stream Id: 1 Length: 19
Flags: 0b000100000 (M0F0C0N1)
Data(length=13):
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 36 30 33 30 31 37 32 37 31 36 38 39          |1603017271689   |
+--------+-------------------------------------------------+----------------+
11:34:31.364    Send: 
Cancel frame -> Stream Id: 1 Length: 6
Flags: 0b000000000 ()
1603017271689