ralscha / sse-eventbus

EventBus library for sending events from a Spring appliction to the web browser with SSE
Apache License 2.0
83 stars 28 forks source link

Customize output format for different connected clients? #28

Open JogoShugh opened 2 months ago

JogoShugh commented 2 months ago

Hello,

I was wondering if it's possible to dispatch a single event, but then format it different based on the client. I couldn't see a way to do this that is already built in, as the convert logic is global, not client-specific. So, I forked it and have spiked out an initial approach to doing this.

Let me know if you'd like to support this, I would be happy to clean up what I have done and do it in a way you'd prefer to see it. I imagine rather than subclassing, we could use a bean injection approach that passes a custom instance into SseEventBus.

In any case, here is what I've had to change in your source so far:

https://github.com/JogoShugh/sse-eventbus/pull/1/files

I wouldn't really call what I've done true "mediatype" support, since it's still streaming out text/event-stream in the "event: data: " format, but for now it works for me.

Then, in my own project code:

package org.starbornag.api

import ch.rasc.sse.eventbus.*
import ch.rasc.sse.eventbus.config.SseEventBusConfigurer
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.http.MediaType
import org.starbornag.api.domain.bed.BedCellPlanted
import org.starbornag.api.domain.bed.BedEvent

class BedSseEventBus(configurer: SseEventBusConfigurer?, subscriptionRegistry: SubscriptionRegistry?) :
    SseEventBus(configurer, subscriptionRegistry) {

    private val iconMap = mapOf(
        "BedCellWatered" to "💧",
        "BedFertilized" to "🌿",
        "BedMulched" to "🪵"
    )

    private fun plantTypeToIcon(plantType: String) =
        when (plantType.lowercase()) {
            "tomato" -> "🍅"
            "eggplant" -> "🍆"
            "potato" -> "🥔"
            "carrot" -> "🥕"
            "hot pepper" -> "🌶️"
           // etc.....
            else -> "" // Default case (for unknown plant types)
        }

    override fun convertObjectForClient(event: SseEvent, client: Client): String {
        val mediaType = client.mediaType()
        return when (mediaType) {
            MediaType.APPLICATION_JSON -> super.convertObjectForClient(event, client)
            else -> {
                when (val data = event.data()) {
                    is BedCellPlanted -> plantTypeToIcon(data.plantType)
                    is BedEvent -> iconMap[data.javaClass.simpleName] ?: ""
                    else -> super.convertObjectForClient(event, client)
                }
            }
        }
    }
}

@Configuration
class BedSseEventBusConfiguration {
    @Autowired(required = false)
    protected var objectMapper: ObjectMapper? = null

    @Autowired(required = false)
    protected var dataObjectConverters: List<DataObjectConverter>? = null

    @Bean
    fun eventBus(): SseEventBus {
        val config = object : SseEventBusConfigurer {} // Use object expression

        val sseEventBus = BedSseEventBus(config, DefaultSubscriptionRegistry()) // Your derived class

        val converters = dataObjectConverters?.toMutableList() ?: mutableListOf()

        if (objectMapper != null) {
            converters.add(JacksonDataObjectConverter(objectMapper!!))
        } else {
            converters.add(DefaultDataObjectConverter())
        }

        sseEventBus.dataObjectConverters = converters

        return sseEventBus
    }
}

@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.CLASS)
@MustBeDocumented
@Import(
    BedSseEventBusConfiguration::class
)
annotation class EnableBedSseEventBus

Then, the only other change is adding the media type during creation of the client-specific emitter:

@GetMapping("/api/beds/{bedId}/events", produces =
    [MediaType.TEXT_EVENT_STREAM_VALUE, MediaType.APPLICATION_JSON_VALUE])
fun events(@PathVariable bedId: UUID,
           @RequestParam clientId: UUID,
           @RequestHeader("Accept") acceptHeader: MediaType?
): ResponseEntity<SseEmitter> {
            val bed = BedRepository.getBed(bedId)
    val eventNames = getEventNamesFromBedCells(bed)
    val mediaType = acceptHeader ?: MediaType.TEXT_PLAIN
    return ResponseEntity.ok(sseEventBus.createSseEmitter(
            clientId.toString(),
        120_000L,
            mediaType,
            *eventNames.toTypedArray()
        )
    )
}

private fun getEventNamesFromBedCells(bed: BedAggregate?) =
    bed!!.rows.flatMap { row ->
        row.cells.flatMap { cell ->
            listOf("events-$cell", "plants-$cell")
        }
    }
}

Test case

Here I have one browser client and two CLIs. The browser client actually sends text/event-stream (maybe that should be the default instead of text/plain) and one of the CLIs sends application/json:

The UI is using HTMX, so all I really need to do is stream fragments of HTML (so far just emojis lol) to populate the cells with current events:

image

But, I also want to be able to listen out for JSON events of the same origin and do other things with that.

So, this is working for me, but if you think there is some value here let me know, I'll be happy to clean it up and submit a PR.

JogoShugh commented 2 months ago

Well, I made another commit that uses a bean injection approach. It's much better already:

https://github.com/JogoShugh/sse-eventbus/commit/9617522d85f52da6fece0bca50839cec44f693f5

So, in my code, I no longer subclass, I just now have:

class BedEventConverter : MediaTypeAwareDataObjectConverter {
    private val iconMap = mapOf(
        "BedCellWatered" to "💧",
        "BedFertilized" to "🌿",
        "BedMulched" to "🪵"
    )

    private fun plantTypeToIcon(plantType: String) =
        when (plantType.lowercase()) {
            "tomato" -> "🍅"
            "eggplant" -> "🍆"
            "potato" -> "🥔"
            "carrot" -> "🥕"
            "corn" -> "🌽" // or "ear of corn"
            "hot pepper" -> "🌶️"
            "bell pepper" -> "🫑"
            "cucumber" -> "🥒"
            "broccoli" -> "🥦"
            "garlic" -> "🧄"
            "onion" -> "🧅"
            "lettuce" -> "🥬"
            "sweet potato" -> "🍠"
            "chili pepper" -> "🌶"
            "mushroom" -> "🍄"
            "peanuts" -> "🥜"
            "beans" -> "🫘"
            "chestnut" -> "🌰"
            "ginger root" -> "🫚"
            "shallot" -> "🫛"
            "herb" -> "🌿" // If you want to include herbs
            else -> "" // Default case (for unknown plant types)
        }

    override fun supports(event: SseEvent, mediaType: MediaType): Boolean {
        return when (event.data()) {
            is BedEvent -> true
            else -> false
        }
    }

    override fun convert(event: SseEvent, mediaType: MediaType): String? {
        return when (mediaType) {
            MediaType.APPLICATION_JSON -> null
            else -> {
                when (val data = event.data()) {
                    is BedCellPlanted -> plantTypeToIcon(data.plantType)
                    is BedEvent -> iconMap[data.javaClass.simpleName] ?: ""
                    else -> null
                }
            }
        }
    }
}

@Configuration
class BedEventConverterConfiguration {
    @Bean
    fun bedEventFormatter() : MediaTypeAwareDataObjectConverter = BedEventConverter()
}
ralscha commented 2 months ago

Hi

That looks great. I'll be happy to integrate it when you create a pull request.

Ralph

JogoShugh commented 2 months ago

Great! I'll work on it this weekend, will add tests and such

JogoShugh commented 1 month ago

Sitll hacking on this...

I wanted to dig deeper into SseEmitter and, in particular, understand DataWithMediatype..

As far as when using WebFlux, I am able to do it like this:

package org.starbornag.api

import org.springframework.http.HttpInputMessage
import org.springframework.http.HttpOutputMessage
import org.springframework.http.MediaType
import org.springframework.http.converter.HttpMessageConverter
import org.springframework.stereotype.Component
import org.starbornag.api.domain.bed.BedEvent

@Component
class BedEventToHtmlConverter : HttpMessageConverter<BedEvent> {

    override fun canRead(clazz: Class<*>, mediaType: MediaType?): Boolean = false

    override fun canWrite(clazz: Class<*>, mediaType: MediaType?): Boolean {
        return BedEvent::class.java.isAssignableFrom(clazz) &&
                mediaType?.isCompatibleWith(MediaType.TEXT_HTML) == true
    }

    override fun getSupportedMediaTypes(): List<MediaType> = listOf(MediaType.TEXT_HTML)

    override fun read(clazz: Class<out BedEvent>, inputMessage: HttpInputMessage): BedEvent {
        throw UnsupportedOperationException("Reading BedEvent from HTML is not supported")
    }

    override fun write(bedCellPlanted: BedEvent, contentType: MediaType?, outputMessage: HttpOutputMessage) {
        val html = BedEventHtmlFormatter.convert(bedCellPlanted)
        outputMessage.headers.contentType = MediaType.TEXT_HTML
        outputMessage.body.write(html.toByteArray())
    }
}

This then allows us to use the sseBuilder.data(Object, MediaType) overload which produces an instance of this:

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.DataWithMediaType.html

And, the SseEmitter itself knows how to deal with that and call this appropriate implementation of HttpMessageConverter as show above.

So, this approach would remove the need entirely for the "MediaTypeAware" converter in my first pass.

After I realized the built in jackson converter was taking precedence, so I added a configurer option to bypass the DataObjectConverters, defaulting to false.

Then, in the my project I just did:

@Configuration
class SseEventBusConfiguration : SseEventBusConfigurer {
    override fun bypassDataObjectConverters(): Boolean = true
}

And thus, I still get exactly what I'm hoping for:

Two listeners, one wanting application/json and the other wanting text/html:

image

And, firing off the behavior that triggers the eventsL:

image

ralscha commented 1 month ago

That looks very good

JogoShugh commented 1 month ago

OK cool, I'll go with this approach then for final PR. Since the bypass option will be additive / opt-in it shouldn't break any existing behaviors.

JogoShugh commented 1 month ago

Note: As a tangent to this, I'm working on using this library Actson to do streaming JSON object "emission" (Kotlin Flow terminilogy) since openAI streams JSON to you in chunks like:

{ "comm

ands": [

{ "plan

tType": "tom

ato"

etc etc..

So, when combined with your library, I'm now able to turn the streamed JSON commands translated from openAI into commands as soon just enough of the fragment comes in, process it, and fire off the SSE notifications as quickly as possible.

I fiddled around with the Function Calling / "toolCalls" support that they provide, but it doesn't seem any more effective for what I'm doing than carefully constructing the prompt and giving it examples.

Example of that (still working bugs out) -> https://github.com/michel-kraemer/actson/issues/91

Most critically:

Even just calling openAi directly with cUrl and the same prompting takes 4 to 6 seconds when the embedded phrase results in N distinct commands. So, will be looking into fine-tuning a base model, but that's all very sub-tangent of course.