milvus-io / milvus

A cloud-native vector database, storage for next generation AI applications
https://milvus.io
Apache License 2.0

[Feature]: Support RocketMQ Option After Streaming Service Finished #33962

Status: Open. CocaineCong opened this issue 3 months ago.

CocaineCong commented 3 months ago

Is there an existing issue for this?

Is your feature request related to a problem? Please describe.

Milvus supports only a limited set of MQ backends; supporting more types of MQ would give users more choice.

Describe the solution you'd like.

Describe an alternate solution.

No response

Anything else? (Additional Context)


title: support rocketmq in milvus

authors: @CocaineCong

creation-date: 2024-06-19


Support RocketMQ Option & RocketMQ API Controller

Milvus currently supports message queue (MQ) options for tasks such as asynchronous data processing, real-time data synchronization, and event-driven architectures. However, the existing MQ options are limited, and users may require additional flexibility and compatibility with different messaging systems.


Summary

This proposal suggests extending the message queue options in Milvus to include support for RocketMQ, a distributed messaging and streaming platform maintained as a top-level Apache Software Foundation project.

RocketMQ provides features like high throughput, fault tolerance, scalability, and strong consistency, making it a popular choice for real-time messaging and event-driven applications.

Motivation

Goals

Preliminary Design

In total, the following mqwrapper interfaces need to be implemented for RocketMQ.

import "context"

// ProducerOptions, ConsumerOptions and ProducerMessage are defined elsewhere in the same package.

// Client is the interface that provides operations of message queues
type Client interface {
    // CreateProducer creates a producer instance
    CreateProducer(options ProducerOptions) (Producer, error)
    // Subscribe creates a consumer instance and subscribes it to a topic
    Subscribe(options ConsumerOptions) (Consumer, error)
    // EarliestMessageID returns the earliest MessageID
    EarliestMessageID() MessageID
    // StringToMsgID converts a string to a MessageID
    StringToMsgID(string) (MessageID, error)
    // BytesToMsgID deserializes a MessageID from a byte array
    BytesToMsgID([]byte) (MessageID, error)
    // Close closes the client and frees associated resources
    Close()
}

type Consumer interface {
    // Subscription returns the subscription of the consumer
    Subscription() string
    // Chan returns the message channel; once Chan has been called, Seek can no longer be used
    Chan() <-chan Message
    // Seek moves to the given MessageID; the bool parameter indicates whether the message at that position is included
    Seek(MessageID, bool) error //nolint:govet
    // Ack acknowledges that the message has been received
    Ack(Message)
    // Close closes the consumer
    Close()
    // GetLatestMsgID returns the latest message ID
    GetLatestMsgID() (MessageID, error)
    // CheckTopicValid checks whether the created topic is valid
    CheckTopicValid(channel string) error
}

type MessageID interface {
    // Serialize the message ID into a sequence of bytes that can be stored somewhere else
    Serialize() []byte
    AtEarliestPosition() bool
    LessOrEqualThan(msgID []byte) (bool, error)
    Equal(msgID []byte) (bool, error)
}

type Message interface {
    // Topic returns the topic this message originated from
    Topic() string
    // Properties returns the application-defined key/value pairs attached to the message
    Properties() map[string]string
    // Payload returns the payload of the message
    Payload() []byte
    // ID returns the unique message ID associated with this message.
    // The message ID can be used to refer to a message unambiguously without keeping the entire payload in memory.
    ID() MessageID
}

type Producer interface {
    // Topic returns the topic the producer is publishing to (currently disabled)
    // Topic() string
    // Send publishes a message
    Send(ctx context.Context, message *ProducerMessage) (MessageID, error)
    // Close closes the producer
    Close()
}
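To make the MessageID contract above concrete, here is a minimal sketch of how a RocketMQ-backed ID might satisfy it. It assumes each Milvus channel maps to a single RocketMQ message queue, so the queue offset alone orders messages. The rmqID type and decode helper are hypothetical names, not part of Milvus or the RocketMQ Go client.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// MessageID mirrors the mqwrapper interface quoted above.
type MessageID interface {
	Serialize() []byte
	AtEarliestPosition() bool
	LessOrEqualThan(msgID []byte) (bool, error)
	Equal(msgID []byte) (bool, error)
}

// rmqID is a hypothetical RocketMQ message ID. Assumption: one Milvus channel
// maps to exactly one RocketMQ message queue, so the queue offset is enough.
type rmqID struct {
	offset int64
}

// Compile-time check that rmqID implements MessageID.
var _ MessageID = rmqID{}

// Serialize encodes the offset as 8 big-endian bytes.
func (id rmqID) Serialize() []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(id.offset))
	return b
}

// AtEarliestPosition assumes offset 0 is the earliest; a real queue may have a
// higher minimum offset after old messages are cleaned up.
func (id rmqID) AtEarliestPosition() bool { return id.offset == 0 }

// decode parses bytes produced by Serialize.
func decode(b []byte) (rmqID, error) {
	if len(b) != 8 {
		return rmqID{}, errors.New("rmqID: expected 8 bytes")
	}
	return rmqID{offset: int64(binary.BigEndian.Uint64(b))}, nil
}

// LessOrEqualThan compares offsets after decoding the other ID.
func (id rmqID) LessOrEqualThan(other []byte) (bool, error) {
	o, err := decode(other)
	if err != nil {
		return false, err
	}
	return id.offset <= o.offset, nil
}

// Equal reports whether both IDs point at the same offset.
func (id rmqID) Equal(other []byte) (bool, error) {
	o, err := decode(other)
	if err != nil {
		return false, err
	}
	return id.offset == o.offset, nil
}

func main() {
	a, b := rmqID{offset: 3}, rmqID{offset: 7}
	le, _ := a.LessOrEqualThan(b.Serialize())
	eq, _ := a.Equal(a.Serialize())
	fmt.Println(le, eq, a.AtEarliestPosition()) // prints: true true false
}
```

If a channel were spread over several RocketMQ queues, the ID would also need to carry the broker and queue identity, and ordering across queues would no longer follow from the offset alone.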

Design Details

To be added later.

CocaineCong commented 3 months ago

If this proposal passes, please assign it to me.🫡

xiaofan-luan commented 2 months ago

@chyezh could you help review it?

chyezh commented 2 months ago

@CocaineCong In Milvus 2.4.5, we will introduce a WAL service to replace the original mqwrapper interface. The new design will be given in #33285.

We also want the WAL interface to be simpler than mqwrapper: Read + Append instead of Produce and Consume.

The interface definition PR #33745 will be merged soon. You can see and review the definition in the README: https://github.com/milvus-io/milvus/pull/33745#issuecomment-2159721055. After that, I will open a new PR that reimplements the Pulsar MQ as a WAL, as an example.

Could you update the RocketMQ proposal after those PRs are merged? RocketMQ can be implemented directly against the WAL interface and released in v2.4.5.

chyezh commented 2 months ago

We need to implement the following interface:

CocaineCong commented 2 months ago


OK, got it. If you need any help, just let me know.

chyezh commented 2 months ago

The related PR is now on the master branch: #34046

You can now design and implement a WAL service based on RocketMQ. The Pulsar implementation, for reference, is in pkg/streaming/walimpls/impls/pulsar/.

Once the PR passes the walimpls.NewWALImplsTestFramework unit test, we can start reviewing it. Please also link your PR to issue #33285.

In addition, the ScannerImpls interface should be implemented as a scanner that keeps no server-side persistent state (like Pulsar's Reader interface), to avoid resource leaks. If RocketMQ does not support this, please let me know, and I will design a garbage-collection plan for scanners in the future.

/assign @CocaineCong
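A scanner without server-side persistent state keeps its read position on the client and only issues offset-based reads, so an abandoned scanner leaves nothing for the broker to clean up. The sketch below models this with hypothetical logStore and scanner types standing in for the broker and a ScannerImpls implementation; it does not use the RocketMQ client API.

```go
package main

import (
	"fmt"
	"sync"
)

// logStore stands in for the broker (hypothetical): it serves reads by offset
// and keeps no per-reader bookkeeping at all.
type logStore struct {
	mu   sync.RWMutex
	msgs []string
}

func (s *logStore) append(m string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.msgs = append(s.msgs, m)
}

// fetch returns up to limit messages starting at offset; it records nothing about the caller.
func (s *logStore) fetch(offset, limit int) []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if offset >= len(s.msgs) {
		return nil
	}
	end := offset + limit
	if end > len(s.msgs) {
		end = len(s.msgs)
	}
	return append([]string(nil), s.msgs[offset:end]...)
}

// scanner keeps its cursor entirely on the client side, similar in spirit to
// Pulsar's Reader: dropping it requires no server-side cleanup.
type scanner struct {
	store  *logStore
	next   int      // offset of the next message to return
	buffer []string // prefetched messages not yet returned
}

// Next returns the next message, or ok=false when the scanner has caught up.
func (sc *scanner) Next() (string, bool) {
	if len(sc.buffer) == 0 {
		sc.buffer = sc.store.fetch(sc.next, 2)
		if len(sc.buffer) == 0 {
			return "", false
		}
	}
	m := sc.buffer[0]
	sc.buffer = sc.buffer[1:]
	sc.next++
	return m, true
}

func main() {
	s := &logStore{}
	for _, m := range []string{"m0", "m1", "m2"} {
		s.append(m)
	}
	sc := &scanner{store: s, next: 1} // start reading at offset 1
	for m, ok := sc.Next(); ok; m, ok = sc.Next() {
		fmt.Println(m) // prints "m1", then "m2"
	}
}
```

Because the cursor lives in the scanner, a scanner that has caught up can simply call Next again later to pick up newly appended messages, and one that is discarded costs the broker nothing.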

CocaineCong commented 2 months ago


got it.