mateusjunges / laravel-kafka

Use Kafka Producers and Consumers in your laravel app with ease!
https://laravelkafka.com/docs/v2.0/introduction
MIT License

Upstash Support - Producer not sending messages. #274

Closed nathandaly closed 6 months ago

nathandaly commented 7 months ago

Package Version v2.0.1

Laravel Version 11

PHP Version 8.3

Problem description

Using the documented examples, I have tried to produce messages that should show up in my Upstash Kafka instance. I am new to Kafka and the topic-based messaging strategy, so it may be user error.

Expected behaviour

When I run the code below, I expect to see messages appear in my Upstash Kafka instance. When I run the curl command, I do see the message, so I know the instance is able to receive them.

curl "https://direct-yak-8714-eu1-rest-kafka.upstash.io/webhook?topic=enrolments&user=ZGlyZWN0LXlhay04NzE0JKgyDZiuvCQpWmrEhellSXVgdQNnOE4P97TQwKmIIwI&pass=****" \
  -d "this is the message"

I have tried with and without SASL, and neither attempt produces messages in Upstash.

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Serializers\JsonSerializer;
use Junges\Kafka\Message\Message;

$producer = Kafka::publish(broker: "direct-yak-8714-eu1-kafka.upstash.io:9092")
  ->withSasl(
    username: "ZGlyZWN0LXlhay04NzE0JKgyDZiuvCQpWmrEhellSXVgdQNnOE4P97TQwKmIIwI",
    password: "****",
    mechanisms: "SCRAM-SHA-256",
    securityProtocol: "SASL_SSL"
  )
  ->withLogCb(fn($log) => dd($log))
  ->onTopic("enrolments")
  ->usingSerializer(new JsonSerializer())
  ->withDebugEnabled()
  ->withMessage(new Message(body: ["key" => "value"]));
$producer->send();

❓Is it because it's appending /bootstrap to the URL?

Relevant log output

Producer send() returns true, with the log callback output below.

RdKafka\Producer {
  out_q_len: 1
  orig_broker_id: -1
  orig_broker_name: "sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap"
  brokers: RdKafka\Metadata\Collection {...}
  topics: RdKafka\Metadata\Collection {...}
}
%7|1712833876.424|SASL|rdkafka#producer-1| [thrd:app]: Selected provider SCRAM (builtin) for SASL mechanism SCRAM-SHA-256
%7|1712833876.424|OPENSSL|rdkafka#producer-1| [thrd:app]: Using OpenSSL version OpenSSL 3.0.2 15 Mar 2022 (0x30000020, librdkafka built with 0x30000000)
%7|1712833876.425|WAKEUPFD|rdkafka#producer-1| [thrd:app]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1712833876.425|BROKER|rdkafka#producer-1| [thrd:app]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Added new broker with NodeId -1
%7|1712833876.425|CONNECT|rdkafka#producer-1| [thrd:app]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1712833876.425|BRKMAIN|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Enter main broker thread
%7|1712833876.425|CONNECT|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Received CONNECT op
%7|1712833876.425|STATE|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1712833876.425|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1712833876.425|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: Broadcasting state change
%7|1712833876.425|CONNECT|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1712833876.425|STATE|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1712833876.425|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: Broadcasting state change
%7|1712833876.425|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.8.0 (0x10800ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, GCC GXX INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM LZ4_EXT SYSLOG RAPIDJSON SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER, debug 0xfffff)
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]: Client configuration:
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   client.software.version = 1.8.0
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   metadata.broker.list = direct-yak-8714-eu1-kafka.upstash.io:9092
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   log_level = 7
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   opaque = 0xffff8a14d890
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   security.protocol = sasl_ssl
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   ssl_key = [redacted]
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   sasl.mechanisms = SCRAM-SHA-256
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   sasl.username = [redacted]
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   sasl.password = [redacted]
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   compression.codec = snappy
%7|1712833876.425|CONF|rdkafka#producer-1| [thrd:app]:   dr_msg_cb = 0xffff8ab9e634
%7|1712833876.426|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: enrolments
%7|1712833876.426|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW enrolments [-1] 0xaaaacc927060 refcnt 0xaaaacc9270f0 (at rd_kafka_topic_new0:465)
%7|1712833876.426|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: leader query
%7|1712833876.426|METADATA|rdkafka#producer-1| [thrd:app]: Hinted cache of 1/1 topic(s) being queried
%7|1712833876.426|METADATA|rdkafka#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): leader query: no usable brokers
%7|1712833876.427|PURGE|rdkafka#producer-1| [thrd:app]: enrolments [-1]: purging queues (purge_flags 0x3, exclude xmit_msgq)
%7|1712833876.456|CONNECT|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Connecting to ipv4#63.34.151.162:9092 (sasl_ssl) with socket 13
%7|1712833876.456|PURGE|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Purging queues with flags queue,inflight
%7|1712833876.456|PURGEQ|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Purged 0 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%7|1712833876.456|PURGEQ|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Purged 0 message(s) from 0 partition(s)
%7|1712833876.456|PURGEQ|rdkafka#producer-1| [thrd:app]: Purged 0 message(s) from 0 UA-partition(s)
%7|1712833876.456|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:app]: Interrupting timers
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:app]: Sending TERMINATE to internal main thread
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:app]: Joining internal main thread
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:main]: Internal main thread terminating
%7|1712833876.456|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1712833876.456|BROADCAST|rdkafka#producer-1| [thrd:main]: Broadcasting state change
%7|1712833876.456|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
%7|1712833876.456|TOPPARREMOVE|rdkafka#producer-1| [thrd:main]: Removing toppar enrolments [-1] 0xaaaacc927060
%7|1712833876.456|DESTROY|rdkafka#producer-1| [thrd:main]: enrolments [-1]: 0xaaaacc927060 DESTROY_FINAL
%7|1712833876.456|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:main]: Purging reply queue
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:main]: Decommissioning internal broker
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:main]: Join 2 broker thread(s)
%7|1712833876.456|TERM|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Received TERMINATE op in state CONNECT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1712833876.456|TERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1712833876.456|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Client is terminating (after 30ms in state CONNECT) (_DESTROY)
%7|1712833876.456|STATE|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1712833876.456|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: Broadcasting state change
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Purging bufq with 0 buffers
%7|1712833876.456|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Client is terminating (after 30ms in state INIT) (_DESTROY)
%7|1712833876.456|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1712833876.456|BROADCAST|rdkafka#producer-1| [thrd::0/internal]: Broadcasting state change
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1712833876.456|BRKTERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0xaaaacca3bcb0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Purging bufq with 0 buffers
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Updating 0 buffers on connection reset
%7|1712833876.456|BRKTERM|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0xaaaacca3c900), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1712833876.456|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Purging bufq with 0 buffers
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Purging bufq with 0 buffers
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd:sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap]: sasl_ssl://direct-yak-8714-eu1-kafka.upstash.io:9092/bootstrap: Updating 0 buffers on connection reset
%7|1712833876.456|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1712833876.456|BUFQ|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:main]: Internal main thread termination done
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:app]: Destroying op queues
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:app]: Destroying SSL CTX
%7|1712833876.456|TERMINATE|rdkafka#producer-1| [thrd:app]: Termination done: freeing resources

mateusjunges commented 7 months ago

Hey 👋

I can see my messages on the Upstash dashboard when I produce with the shouldFlush argument set to true.

I'm still not sure why, though.
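
For reference, a minimal sketch of that call, reusing the producer from the original report. It assumes `shouldFlush` can be passed to `send()` as a named argument, which this comment suggests but does not show; the username is a placeholder. One possible reading of the log above is that the client was destroyed about 30ms after start-up while the broker was still in the CONNECT state, so a flush before teardown would give the queued message time to go out.

```php
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;

// Same producer as in the report, trimmed to the essentials.
// "<username>" is a placeholder, not a real credential.
$producer = Kafka::publish(broker: "direct-yak-8714-eu1-kafka.upstash.io:9092")
    ->withSasl(
        username: "<username>",
        password: "****",
        mechanisms: "SCRAM-SHA-256",
        securityProtocol: "SASL_SSL"
    )
    ->onTopic("enrolments")
    ->withMessage(new Message(body: ["key" => "value"]));

// Assumption: shouldFlush is the argument name referred to above.
$producer->send(shouldFlush: true);
```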

mateusjunges commented 6 months ago

Hey @nathandaly, can you try setting the linger.ms producer configuration to 0? This should fix the issue:

->withConfigOption('linger.ms', 0)
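
In context, the option would sit in the producer chain from the original report. This is only a sketch: it assumes `withConfigOption` can be chained anywhere before `send()`, and the username is a placeholder. `linger.ms` is the librdkafka setting for how long the producer waits to batch messages before sending them.

```php
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;

$producer = Kafka::publish(broker: "direct-yak-8714-eu1-kafka.upstash.io:9092")
    ->withSasl(
        username: "<username>", // placeholder, not a real credential
        password: "****",
        mechanisms: "SCRAM-SHA-256",
        securityProtocol: "SASL_SSL"
    )
    // Ask librdkafka not to wait for a batch to accumulate before sending.
    ->withConfigOption('linger.ms', 0)
    ->onTopic("enrolments")
    ->withMessage(new Message(body: ["key" => "value"]));

$producer->send();
```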

mateusjunges commented 6 months ago

Closing due to lack of response.