vert-x3 / vertx-lang-kotlin

Vert.x for Kotlin
Apache License 2.0
296 stars 68 forks source link

Enable Receive Channels to work with Pipe #192

Closed arminzavada closed 11 months ago

arminzavada commented 3 years ago

Describe the feature

Let us do the following:

val channel = createSomeReceiveChannel()
val writeStream = someWriteStream

Pump.pump(channel, writeStream).start()

Or the same thing, but reversed.

Use cases

Currently, we have to use a hot-loop to prevent buffer-overflows:

while (writeStream.writeQueueFull()) {
     delay(1)
}

for (data in dataChannel) {
     writeStream.write(data)
}

Contribution

I would implement this happily, however, I do not know Vertx well enough. If someone gave me some pointers, I would happily fix this.

vietj commented 3 years ago

are you using Vert.x 4 ?

if yes instead you should do something like

for (data in dataChannel) {
     writeStream.write(data).await()
}

otherwise I think the best would be to convert a channel to a ReadStream and then use ReadStream#pipe to a write stream that does the same internally.

arminzavada commented 3 years ago

otherwise I think the best would be to convert a channel to a ReadStream and then use ReadStream#pipe to a write stream that does the same internally.

How does one convert a channel into a ReadStream? I didn't find any way to do it at the moment, this could also be a viable feature.

if yes instead you should do something like

for (data in dataChannel) { writeStream.write(data).await() }

Would this be optimal in therms of performance?

tsegismont commented 1 year ago

Would this be optimal in therms of performance?

Perhaps something like this would be better:

for (data in dataChannel) {
  val fut = writeStream.write(data)
  if (writeStream.writeQueueFull()) {
    fut.await()
  }
}

How does one convert a channel into a ReadStream? I didn't find any way to do it at the moment, this could also be a viable feature.

That's something we can have in a future release.

tsegismont commented 11 months ago

In fact it's possible to convert the write stream to a channel and use a loop:

val channel = createSomeReceiveChannel()
val writeStream = someWriteStream

val sendChannel = writeStream.toSendChannel(vertx)
for (data in channel) {
  sendChannel.send(data)
}