ralscha / sse-eventbus

EventBus library for sending events from a Spring application to the web browser with SSE
Apache License 2.0

Example of using sse-eventbus with Kotlin and CoroutineScope #26

Open JogoShugh opened 3 months ago

JogoShugh commented 3 months ago

Hello, I'm excited to have found this library. Thanks for doing this!

This is not really an issue, but more of an example of getting this to work using Kotlin and a CoroutineScope.

Background

Code

First, read this issue about sending to specific clients only

This issue was very informative and worked exactly as I needed. Thanks!

Controller

This is a WIP and I am by no means an expert on Coroutines yet, so any feedback is welcome!

Command handler function

package org.starbornag.api.rest.bed

import ch.rasc.sse.eventbus.SseEventBus
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import org.starbornag.api.domain.bed.BedRepository
import java.util.UUID

@RestController
class BedCommandHandler(
    private val bedCommandMapper: BedCommandMapper,
    private val sseEventBus: SseEventBus
) {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    @PostMapping("/api/beds/{bedId}/{action}", consumes = [MediaType.APPLICATION_JSON_VALUE])
    suspend fun handle(
        @PathVariable bedId: UUID,
        @PathVariable action: String,
        @RequestBody commandPayload: Any
    ): ResponseEntity<BedResourceWithCurrentState>  {
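        try {
            // Assumes getBed returns null for an unknown bed: answer 404 instead of failing on bed!!
            val bed = BedRepository.getBed(bedId)
                ?: return ResponseEntity.notFound().build()
            // Fire and forget: the command runs in the background while the
            // bed's current state is returned immediately.
            scope.launch {
                val command = bedCommandMapper.convertCommand(action, commandPayload)
                bed.execute(command, sseEventBus) // Execute the command directly
            }
            return ResponseEntity.ok(BedResourceWithCurrentState.from(bed))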
        } catch(e: Exception) {
            println("Here is the exception: " + e.stackTraceToString())
            throw e
        }
    }

    @GetMapping("/api/beds/{bedId}/events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun events(@PathVariable bedId: UUID,
               @RequestParam clientId: UUID
               ): SseEmitter =
        sseEventBus.createSseEmitter(clientId.toString(), 60_000L, bedId.toString())
}
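One thing worth noting about the handler above: its try/catch only covers code that runs before the function returns. An exception thrown inside scope.launch happens later, on another thread, and is not caught there. Below is a minimal sketch of one way to observe such failures, assuming a CoroutineExceptionHandler added to the scope is acceptable. The runCommands function and the failures list are hypothetical stand-ins for real commands and real logging, not part of the code in this post.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

// Launches one failing and one healthy "command" in a controller-style scope
// and returns the failure messages seen by the exception handler.
fun runCommands(): List<String> {
    val failures = CopyOnWriteArrayList<String>()
    // Hypothetical stand-in for real logging: record the failure message.
    val handler = CoroutineExceptionHandler { _, e -> failures.add(e.message ?: "unknown") }
    // Same shape as the controller's scope, plus the handler.
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO + handler)
    runBlocking {
        val failing = scope.launch { throw IllegalStateException("command failed") }
        val healthy = scope.launch { delay(10) }
        joinAll(failing, healthy)
        // SupervisorJob keeps a failing child from cancelling its siblings.
        check(!healthy.isCancelled)
    }
    scope.cancel() // release the scope once it is no longer needed
    return failures.toList()
}

fun main() {
    println(runCommands())
}
```

In a Spring controller, the scope would typically also be cancelled when the bean is destroyed, for example from a method annotated with @PreDestroy.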

events fun for streaming events

Bed.execute to dispatch to each Cell

Here, we provide the stream to interested clients. The clientId, as suggested in the linked issue above, lets us register several distinct clients for a single bed. The bed's UUID is passed as the event name, so each client only receives events published for that bed.

    @GetMapping("/api/beds/{bedId}/events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun events(@PathVariable bedId: UUID,
               @RequestParam clientId: UUID
               ): SseEmitter =
        sseEventBus.createSseEmitter(clientId.toString(), 60_000L, bedId.toString())

BedAggregate

execute function

Within this class, most commands simply get dispatched to the associated BedCellAggregate instances. That is the case for the WaterCommand we will see soon.

Note this is also declared as suspend:

    // Generic command handler dispatcher
    suspend fun <T : BedCommand> execute(command: T, sseEventBus: SseEventBus) {
        when (command) {
            is PlantSeedlingCommand -> execute(command, sseEventBus)
            else ->  {
                dispatchCommandToAllCells(command, sseEventBus)
            }
        }
    }

dispatchCommandToAllCells

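Again, this is a suspend function. It opens a coroutineScope and launches one child coroutine per cell; each child fetches its cell and calls the cell's own execute, again passing the bus. coroutineScope returns only once all of those children have completed: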

    private suspend fun dispatchCommandToAllCells(command: BedCommand, sseEventBus: SseEventBus) {
        coroutineScope {
            rows.forEach { row ->
                row.cells.forEach { cellId ->
                    launch {
                        val cell = BedCellRepository.getBedCell(cellId)
                        cell.execute(command, sseEventBus)
                    }
                }
            }
        }
    }
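The waiting behaviour of coroutineScope can be checked in isolation. The following is a small sketch, assuming a bed is just rows of integer cell ids; rows, dispatchToAllCells and the handled collection are hypothetical names that mirror the shape of the function above without using the real repository or event bus.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for a bed: rows of cell ids.
val rows = listOf(listOf(1, 2, 3), listOf(4, 5, 6))

// Mirrors the shape of dispatchCommandToAllCells: one child coroutine per cell.
suspend fun dispatchToAllCells(handled: MutableCollection<Int>) {
    coroutineScope {
        rows.forEach { row ->
            row.forEach { cellId ->
                launch {
                    delay(10) // simulated per-cell latency
                    handled.add(cellId)
                }
            }
        }
    }
    // Reaching this point means every child launched above has completed.
}

fun main() = runBlocking {
    val handled = ConcurrentLinkedQueue<Int>()
    dispatchToAllCells(handled)
    println(handled.size) // prints 6
}
```

Because the children are launched rather than called one after another, their delays overlap instead of adding up.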

BedCellAggregate

Here, the generic execute fun is also suspend, but the overload that handles the WaterCommand specifically is not. That overload is where we actually call sseEventBus.handleEvent().

    // Generic command handler dispatcher
    suspend fun <T : BedCommand> execute(command: T, sseEventBus: SseEventBus) {
        // Simulate latency
        delay(10.milliseconds)
        when (command) {
            is PlantSeedlingCommand -> execute(command)
            is WaterCommand -> execute(command, sseEventBus)
            is FertilizeCommand -> execute(command)
            is HarvestCommand -> execute(command)
            else -> throw IllegalArgumentException("Unsupported command type")
        }
    }

    private fun execute(command: WaterCommand, sseEventBus: SseEventBus) {
        val wateredEvent = BedWatered(command.started, command.volume)
        events.add(wateredEvent)
        sseEventBus.handleEvent(SseEvent.of(command.bedId.toString(), wateredEvent))
    }
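A possible refinement for the dispatcher above, sketched under the assumption that BedCommand could be declared as a sealed interface (its real declaration is not shown in this post): with a sealed hierarchy, a when used as an expression must cover every subtype, so the compiler flags a missing case and the else branch that throws is no longer needed. The command classes and the describe function below are hypothetical.

```kotlin
// Hypothetical command hierarchy; the real BedCommand types are not shown in this post.
sealed interface BedCommand
data class PlantSeedlingCommand(val plantType: String) : BedCommand
data class WaterCommand(val volume: Double) : BedCommand
object FertilizeCommand : BedCommand
object HarvestCommand : BedCommand

// `when` is used as an expression over a sealed type, so it must be exhaustive:
// adding a new BedCommand subtype turns a missing branch into a compile error.
fun describe(command: BedCommand): String = when (command) {
    is PlantSeedlingCommand -> "planted ${command.plantType}"
    is WaterCommand -> "watered with ${command.volume}"
    is FertilizeCommand -> "fertilized"
    is HarvestCommand -> "harvested"
}

fun main() {
    println(describe(WaterCommand(1.5))) // prints "watered with 1.5"
}
```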

CLI Demo

Service is up and running

image

Start two listeners

#!/bin/sh
clientId=0AF7867B-CFC0-48C4-A7A9-0BAFBCDC5569
curl -N "http://localhost:8080/api/beds/2fbda883-d49d-4067-8e16-2b04cc523111/events?clientId=$clientId"

image

#!/bin/sh
clientId=B59D7469-6989-49EF-AA1D-2D646EF8B06B
curl -N "http://localhost:8080/api/beds/2fbda883-d49d-4067-8e16-2b04cc523111/events?clientId=$clientId"

image

Run the commands

image

Note that the "lastWatered" value for the bed initially comes back as null. This is expected: the response is built before the launched coroutine has finished, because of the simulated latency from delay(10.milliseconds) in the code above, so there is not yet any historical data:

image
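The null in the first response can be reproduced with a few lines. This is a sketch only: the Bed class and waterInBackground function are hypothetical, and the coroutine is launched in runBlocking's single-threaded scope to keep the demo deterministic. In the controller the scope runs on Dispatchers.IO, where the same ordering is likely but not guaranteed.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the bed's state; the real aggregate is not shown here.
class Bed {
    var lastWatered: String? = null
}

// Returns the value read right after launching and the value read after the job finished.
fun waterInBackground(): Pair<String?, String?> = runBlocking {
    val bed = Bed()
    val job = launch {
        delay(10) // simulated latency, as in BedCellAggregate.execute
        bed.lastWatered = "watered"
    }
    val inResponse = bed.lastWatered // read before the background work has run
    job.join()                       // wait for the background work
    inResponse to bed.lastWatered
}

fun main() {
    val (inResponse, afterwards) = waterInBackground()
    println("in response: $inResponse, afterwards: $afterwards")
    // prints "in response: null, afterwards: watered"
}
```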

Observe events in the first listener

image

Observe events in the second listener

image

I can continue sending the water bed commands and continue seeing the events flow in as well!

I hope this example proves helpful to someone. If you know more about coroutines than I do, please feel free to give me some tips.