
bug: pulsar source consumption unexpectedly stopped #10738

Open · neverchanje opened this issue 1 year ago

neverchanje commented 1 year ago

docker-compose.yml

---
version: "3"
services:
  risingwave:
    image: ghcr.io/risingwavelabs/risingwave:latest
    ports:
      - 4566:4566
      - 5691:5691
    command:
      - playground
    container_name: risingwave
  message_queue:
    image: "apachepulsar/pulsar:2.11.1"
    command: bin/pulsar standalone
    ports:
      - 8080:8080
      - 6650:6650
    hostname: message_queue
    container_name: message_queue
    stop_grace_period: 2s
name: risingwave-compose

The golang program:

package main

import (
    "context"
    // "crypto/rand"

    "flag"
    "fmt"
    "log"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

var (
    topic string
    count int
)

func init() {
    const (
        defaultTopic = "public/default/log"
        usage        = "the topic of pulsar"
    )
    flag.StringVar(&topic, "topic", defaultTopic, usage)
    flag.IntVar(&count, "count", 10, "how many messages to send")
}

func main() {
    flag.Parse()
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               "pulsar://localhost:6650",
        OperationTimeout:  30 * time.Second,
        ConnectionTimeout: 30 * time.Second,
    })
    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }
    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: topic,
    })
    if err != nil {
        panic(err)
    }
    defer producer.Close()

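    // Publish in chunks of at most 1000 messages, flushing after each chunk.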
    for i := 0; i < count; i += 1000 {
        batch := 1000
        if count-i < 1000 {
            batch = count - i
        }
        produceBatchMessagesWithCount(producer, topic, batch)
        fmt.Printf("%v messages published\n", i+batch)
    }

    // Per-message send errors are handled in the SendAsync callback above.
    fmt.Println("All messages published")
}

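// produceBatchMessagesWithCount publishes `count` JSON messages asynchronously and
// blocks on Flush until the client has drained its send buffer.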
func produceBatchMessagesWithCount(producer pulsar.Producer, topic string, count int) {
    for i := 0; i < count; i++ {
        m := `{"app": "test", "data": "hi"}`

        producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
            Payload: []byte(m),
        }, func(id pulsar.MessageID, producerMessage *pulsar.ProducerMessage, e error) {
            if e != nil {
                panic(e)
            }
        })
    }

    if err := producer.Flush(); err != nil {
        log.Panicf("Failed to Flush, error %v\n", err)
    }
}

Run the golang datagen via:

go mod init main
go mod tidy
go build
./main --count 1000000

After the data preparation, you can check whether Pulsar has indeed ingested the expected number of records via:

./bin/pulsar-admin topics stats public/default/log

Normally, it will show "msgInCounter" : 1000000 as below:

{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 39052803,
  "msgInCounter" : 1000000,

However, the RisingWave table that I created only contained a small number of rows, and the count varied every time I ran the test. It never ingested all 1000000 records.

psql -h localhost -p 4566 -d dev -U root
psql (14.8 (Ubuntu 14.8-0ubuntu0.22.04.1), server 8.3.0)
Type "help" for help.

dev=> select count(*) from twitter;
 count
-------
 60859
(1 row)
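
For completeness, the exact DDL is not shown in this issue. Below is a minimal sketch of the kind of Pulsar-backed table presumably behind the twitter query above, assuming the table name from that query, columns matching the producer payload, and the in-network broker address from the compose file (inside the compose network RisingWave must reach Pulsar as message_queue, not localhost). Depending on the RisingWave version, the trailing clause may need to be ROW FORMAT JSON instead.

-- Hypothetical reproduction DDL; table name, columns, and format clause are assumptions.
CREATE TABLE twitter (
    app  VARCHAR,
    data VARCHAR
) WITH (
    connector = 'pulsar',
    topic = 'public/default/log',
    -- use the compose service hostname, not localhost, from inside the cluster
    service.url = 'pulsar://message_queue:6650'
) FORMAT PLAIN ENCODE JSON;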

NiuBlibing commented 10 months ago

I'm running into a similar problem. I checked Pulsar Manager: the message in and out rates are balanced, but the backlog keeps growing and there are many unacked messages.

github-actions[bot] commented 2 months ago

This issue has been open for 60 days with no activity.

If you think it is still relevant today, and needs to be done in the near future, you can comment to update the status, or just manually remove the no-issue-activity label.

You can also confidently close this issue as not planned to keep our backlog clean. Don't worry if you think the issue is still valuable to continue in the future. It's searchable and can be reopened when it's time. 😄