Closed rafaelsousa closed 1 year ago
I'd recommend working on a MicroProfile Reactive Messaging connector for Pulsar. Whether it should be alongside the native APIs or in lieu of them, I don't know.
Any ideas on when this will be started?
So far no one has shown interest in starting the implementation. Could be you though :D
I would like to work on it
Neat! Just go ahead. The best approach is to copy the design of https://github.com/quarkusio/quarkus/tree/master/extensions/smallrye-reactive-messaging-kafka. But before that, you will likely need to implement a Reactive Messaging connector for Pulsar in SmallRye. Again, looking at the Kafka one will give you the right hints.
Email Quarkus-Dev and join Zulip when you have questions or want to discuss the design more.
@rafaelsousa are you working on this?
@ekarlso, I am starting to take a look, but I confess I am still a little lost.
So I'm currently building a PoC using Quarkus and Pulsar and am interested in this ticket.
I have a rudimentary implementation that enables publishing, but I'm not sure how to integrate it with SmallRye Reactive Messaging. Granted, I haven't even tried yet, but any guidance helps.
PulsarBean.java
import io.quarkus.runtime.StartupEvent;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;

@ApplicationScoped
public class PulsarBean {

    private final Logger log = LoggerFactory.getLogger(PulsarBean.class);

    @Inject
    @ConfigProperty(name = "pulsar.service-url", defaultValue = "pulsar://localhost:6650")
    String serviceUrl;

    @Inject
    @ConfigProperty(name = "pulsar.use-tls", defaultValue = "false")
    boolean useTls;

    PulsarClient pulsarClient;

    void startup(@Observes StartupEvent event) {
        log.debug("Starting PulsarMessageBean...");
        try {
            this.pulsarClient = PulsarClient.builder()
                    .serviceUrl(this.serviceUrl)
                    .allowTlsInsecureConnection(this.useTls)
                    .build();
        } catch (PulsarClientException pcl) {
            log.error("Error starting pulsar client", pcl);
        }
    }

    public PulsarClient getPulsarClient() {
        return this.pulsarClient;
    }
}
And I'm just injecting this bean into my service:
Service.java
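import java.util.UUID;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class ActionService {

    private final Logger log = LoggerFactory.getLogger(ActionService.class);

    @Inject
    PulsarBean pulsar;

    public Long createAction(Long actionId, String actionName) {
        // try-with-resources closes the producer, so the fixed producer name is free for the next call
        try (Producer<ActionMessage> producer = this.pulsar.getPulsarClient()
                .newProducer(Schema.JSON(ActionMessage.class))
                .topic("actions-sync")
                .producerName("create-action")
                .create()) {
            String messageKey = UUID.randomUUID().toString();

            ActionMessage message = new ActionMessage();
            message.setActionId(actionId);
            message.setActionName(actionName);

            producer.newMessage()
                    .key(messageKey)
                    .value(message)
                    .send();
        } catch (PulsarClientException pse) {
            // The placeholder takes the action name; the trailing exception supplies the stack trace
            log.error("Error sending message with name {} to pulsar", actionName, pse);
        }
        return actionId;
    }
}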
Publishing seems straightforward in general, but I want to know how to integrate this with SmallRye.
Suggestions on any docs or code references?
Hi @anthonyikeda,
I opened a pull request last year, https://github.com/smallrye/smallrye-reactive-messaging/pull/488, as a work in progress, but I didn't have any application for it at the time. This week I do, so I have resumed working on it. I hope to find the time to finish it soon.
@rafaelsousa I saw that it has been stalled for a few months now. I'm interested in using Pulsar with Quarkus and I may be able to help.
Hi @vipseixas, we could work together on this. The easiest step would be for me to invite you to my repo, or you could fork the current SmallRye repo and I'll merge my code into yours. What do you say?
@rafaelsousa I think it's easier if you invite me to your repo, but first I forked your repo and I'll browse through the code a little this weekend.
Camel has Pulsar support, which is available in Quarkus as camel-quarkus-pulsar.
Hello, I see that this is the top requested extension. CXF seems pretty mature, and JNA too, so I am thinking of taking on another one now. I can help, and I think pushing it to Quarkiverse would be better.
FYI, @geoand and @ozangunalp are working on this.
FYI, the Pulsar community has begun working on a reactive Pulsar client: https://github.com/apache/pulsar-client-reactive
@ozangunalp we should probably try and get our work over the finish line :)
FYI, the Pulsar community has begun working on a reactive Pulsar client: https://github.com/apache/pulsar-client-reactive
FWIW, their regular Pulsar client already supports non-blocking IO. We definitely won't be using any client that is based on Reactor.
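As a rough illustration of the non-blocking point: the regular Pulsar Java client offers `sendAsync()`, which returns a `CompletableFuture`, so a caller can compose on the result without Reactor and without blocking a thread. The sketch below uses only the JDK. `AsyncSender` and `publish` are hypothetical stand-ins invented for this example and are not part of the Pulsar API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class NonBlockingSendSketch {

    /** Hypothetical stand-in for a client whose send returns a future instead of blocking. */
    interface AsyncSender<T> {
        CompletableFuture<String> sendAsync(T payload); // completes with a message id
    }

    /** Sends without blocking the caller and maps success or failure to a status string. */
    static <T> CompletableFuture<String> publish(AsyncSender<T> sender, T payload) {
        return sender.sendAsync(payload).handle((id, err) -> {
            if (err == null) {
                return "sent:" + id;
            }
            // Async stages may wrap the original failure in a CompletionException
            Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                    ? err.getCause() : err;
            return "failed:" + cause.getMessage();
        });
    }

    public static void main(String[] args) {
        AsyncSender<String> ok = payload ->
                CompletableFuture.supplyAsync(() -> "id-" + payload.length());
        AsyncSender<String> broken = payload ->
                CompletableFuture.failedFuture(new IllegalStateException("broker down"));

        // join() is used here only so the demo prints before the JVM exits
        System.out.println(publish(ok, "create-action").join());
        System.out.println(publish(broken, "create-action").join());
    }
}
```

In application code the returned future would normally be handed on to the framework rather than joined.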
Apache Pulsar really looks like a rewarding candidate for an extension, since it seems to be growing pretty fast: https://pulsar.apache.org/blog/2022/05/11/apache-pulsar-community-welcomes-500th-contributor/
And now, in July 2022, it also seems to have the highest number of pull requests in the field of messaging/streaming tech (even more than Kafka): https://www.youtube.com/watch?v=ckfw68-cn2o
-> It will be really exciting to see what we can build by combining these two :-D
IIUC, @ozangunalp is putting the finishing touches on it, right?
@geoand yes.
Excellent. Let me know if you need anything :)
It is on the right track, I need to iron out some issues for an experimental release.
Do you have any updates here?
There is a draft PR but I haven't advanced since my last comment. We've been focused mainly on some Kafka-related features. I hope next week I'll have some time to work on that.
I think we can close this issue as it will be tracked in
FWIW, their regular Pulsar client already supports non-blocking IO. We definitely won't be using any client that is based on Reactor.
Hi, I'm one of the maintainers of the Apache Pulsar Reactive client. Do you mean you won't use a client that uses Reactor internally, or is it only about the API? The Reactive client APIs are based only on r.s.Publisher and can be adapted by any reactive library.
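To make the "can be adapted by any reactive library" point concrete, here is a minimal sketch that consumes a publisher using nothing but the Reactive Streams contract. It is an assumption-laden stand-in: it uses the JDK's `java.util.concurrent.Flow` interfaces, which mirror the `org.reactivestreams` ones, and `SubmissionPublisher` plays the role of a messaging client. The `collect` helper is hypothetical and does not come from the Pulsar Reactive client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class PublisherAdapterSketch {

    /** Drains any Flow.Publisher into a future list, requesting one item at a time. */
    static <T> CompletableFuture<List<T>> collect(Flow.Publisher<T> source) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private final List<T> items = new ArrayList<>();
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // backpressure: ask for a single item
            }

            @Override
            public void onNext(T item) {
                items.add(item);
                subscription.request(1); // ready for the next one
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                result.complete(items);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> received;
        // SubmissionPublisher stands in for a client API that returns a Publisher
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            received = collect(publisher);
            publisher.submit("action-1");
            publisher.submit("action-2");
        } // close() signals onComplete once buffered items are delivered
        System.out.println(received.join());
    }
}
```

Because the subscriber depends only on the four Reactive Streams signals, the same shape applies whichever library sits on either side of the publisher.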
Forgot to close it.
Extension to support Apache Pulsar publish / subscribe model
Apache Pulsar is growing in adoption and according to this article
Interested in this extension? Please +1 via the emoji/reaction feature of GitHub (top right).