streamnative / pulsar-rs

Rust Client library for Apache Pulsar
Other
369 stars 121 forks source link

Support HTTP service discovery #163

Open SilverBut opened 3 years ago

SilverBut commented 3 years ago

Like the Golang client apache/pulsar-client-go#504, maybe we can considering add a HTTP service discovery.

It's not hard, I have a quick prototype running on my local server based on pulsar-rs. Should I open a PR? ;)

Geal commented 3 years ago

sure, good idea :)

18601673727 commented 2 years ago

Like the Golang client apache/pulsar-client-go#504, maybe we can considering add a HTTP service discovery.

It's not hard, I have a quick prototype running on my local server based on pulsar-rs. Should I open a PR? ;)

Big brother, where's your PR? Online waiting, need it very urgently!

Geal commented 2 years ago

Well you can do it instead if you need it that urgently, it should not take too much time to get something working

18601673727 commented 2 years ago

Well you can do it instead if you need it that urgently, it should not take too much time to get something working

Thanks for the quick reply, i forked and modified, but got this error somehow, any hints?

   Compiling test_pulsar v0.1.0 (C:\workspace\repos\test_pulsar)
    Finished dev [unoptimized + debuginfo] target(s) in 5.16s
     Running `target\debug\test_pulsar.exe`
2021-11-20 03:53:20 TRACE mio::poll:registering event source with poller: token=Token(0), interests=READABLE | WRITABLE    
2021-11-20 03:53:20 DEBUG pulsar::connection_manager:ConnectionManager::connect(BrokerAddress { url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("mysecretpulsar.com")), port: Some(8080), path: "/", query: None, fragment: None }, broker_url: "mysecretpulsar.com:8080", proxy: false })
2021-11-20 03:53:20 DEBUG pulsar::connection:Connecting to http://mysecretpulsar.com:8080/: 123.123.123.123:8080
2021-11-20 03:53:20 TRACE mio::poll:registering event source with poller: token=Token(1), interests=READABLE | WRITABLE
2021-11-20 03:53:20 TRACE pulsar::connection:connection message: Message { command: BaseCommand { r#type: Connect, connect: Some(CommandConnect { client_version: "2.0.1-incubating", 
auth_method: None, auth_method_name: Some("token"), auth_data: Some([100, 121, 74, 114, 90, 88, 108, 74, 90, 67, 73, 54, 73, 110, 66, 49, 98, 72, 78, 104, 99, 105, 49, 54, 99, 72, 70, 107, 99, 72, 70, 108, 89, 109, 56, 53, 90, 84, 77, 105, 76, 67, 74, 104, 98, 71, 99, 105, 79, 105, 74, 73, 85, 122, 73, 49, 78, 105, 74, 57, 46, 101, 121, 74, 122, 100, 87, 73, 105, 79, 105, 74, 119, 100, 87, 120, 122, 89, 88, 73, 116, 101, 110, 66, 120, 90, 72, 66, 120, 90, 87, 74, 118, 79, 87, 85, 122, 88, 51, 66, 115, 89, 88, 108, 108, 99, 105, 74, 57, 46, 
49, 53, 68, 101, 57, 70, 66, 88, 95, 14, 78, 88, 72, 105, 72, 79, 45, 49, 36, 89, 86, 71, 87, 113, 68, 57, 79, 109, 118, 77, 67, 85, 112, 83, 51, 99, 97, 100, 45, 82, 99, 97, 56]), protocol_version: Some(12), proxy_to_broker_url: None, original_principal: None, original_auth_data: None, original_auth_method: None, feature_flags: None }), connected: None, subscribe: None, producer: None, send: None, send_receipt: None, send_error: None, message: None, ack: None, flow: None, unsubscribe: None, success: None, error: None, close_producer: None, close_consumer: None, producer_success: None, ping: None, pong: None, redeliver_unacknowledged_messages: None, partition_metadata: None, partition_metadata_response: None, lookup_topic: None, lookup_topic_response: None, consumer_stats: None, consumer_stats_response: None, reached_end_of_topic: None, seek: None, get_last_message_id: None, get_last_message_id_response: None, active_consumer_change: None, get_topics_of_namespace: None, get_topics_of_namespace_response: None, get_schema: None, get_schema_response: None, auth_challenge: None, auth_response: None, ack_response: None, get_or_create_schema: None, get_or_create_schema_response: None, new_txn: None, new_txn_response: None, add_partition_to_txn: None, add_partition_to_txn_response: None, add_subscription_to_txn: None, add_subscription_to_txn_response: None, end_txn: None, end_txn_response: None, end_txn_on_partition: None, end_txn_on_partition_response: None, end_txn_on_subscription: None, end_txn_on_subscription_response: None }, payload: None }
2021-11-20 03:53:20 TRACE pulsar::message:Encoder sending 196 bytes
2021-11-20 03:53:20 TRACE tokio_util::codec::framed_impl:flushing framed transport
2021-11-20 03:53:20 TRACE tokio_util::codec::framed_impl:writing; remaining=196
2021-11-20 03:53:20 TRACE tokio_util::codec::framed_impl:framed transport flushed
2021-11-20 03:53:20 TRACE tokio_util::codec::framed_impl:attempting to decode a frame    
2021-11-20 03:53:20 TRACE pulsar::message:Decoder received 321 bytes
2021-11-20 03:53:20 TRACE pulsar::message:Decoder received 321 bytes
2021-11-20 03:53:20 TRACE tokio_util::codec::framed_impl:Got an error, going to errored state
2021-11-20 03:53:20 TRACE mio::poll:deregistering event source from poller
thread 'main' panicked at 'Unable to build pulsar client!: Connection(Io(Custom { kind: Other, error: "bytes remaining on stream" }))', src\main.rs:119:96
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
2021-11-20 03:53:20 TRACE mio::poll:deregistering event source from poller
error: process didn't exit successfully: `target\debug\test_pulsar.exe` (exit code: 101)

Testing side code:

let addr = "http://mysecretpulsar.com:8080";
let auth = Authentication { 
    name: "token".to_string(), 
    data: std::env::var("PULSAR_TOKEN").expect("Unable to get PULSAR_TOKEN!").into_bytes()
};
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).with_auth(auth).build().await.expect("Unable to build pulsar client!");
let mut producer = pulsar
    .producer()
    .with_topic("non-persistent://pulsar-zpqdpqebo333/default/test")
    .with_name("player")
    .with_options(producer::ProducerOptions {
        schema: Some(proto::Schema {
            r#type: proto::schema::Type::String as i32,
            ..Default::default()
        }),
        ..Default::default()
    })
    .build()
    .await
    .expect("Unable to build pulsar producer!");
caibirdme commented 2 years ago

Any update for this feature?

yswtrue commented 2 years ago

any progress here?