leyou240 opened 3 months ago
This would be add-on business logic on top of the existing framework. Internally we have systems on top of Pingora working that way.
Do you have a reference to a standard way of doing this, or are you looking for guidance on how to implement such custom logic?
Thanks, I'm looking for guidance on how to implement such custom logic.
Take a look at this: https://gist.github.com/Object905/6cafd5e8e56dd60670149296411a407f
Since publishing this gist I've updated the code to be more self-contained, removed the dependency on crossbeam, and made it work with kube-rs > 0.92.0, because that release changed the store internals and the old version was deadlocking / not discovering endpoints right away. It should now be copy-pastable, and I've just updated the gist accordingly.
It works in my production setup and also achieves zero-downtime upgrades of services. As a bonus, I've included the DNS discovery I wrote as well.
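For readers who don't want to click through, the general shape is a ServiceDiscovery implementation that turns the current contents of a kube-rs reflector Store of EndpointSlices into Pingora backends. Below is a minimal sketch of that idea, not the gist's code: it assumes the ServiceDiscovery trait and Backend type from pingora-load-balancing (exact signatures and module paths may vary by version) and a separate watcher task, not shown, that keeps the store populated.

use std::collections::{BTreeSet, HashMap};

use async_trait::async_trait;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::runtime::reflector::Store;
use pingora_error::Result;
use pingora_load_balancing::{discovery::ServiceDiscovery, Backend};

// Hypothetical discovery that converts the reflector's current EndpointSlice
// view into Pingora backends; a separate task (not shown) runs the watcher
// that keeps `store` populated.
pub struct EndpointSliceDiscovery {
    store: Store<EndpointSlice>,
}

#[async_trait]
impl ServiceDiscovery for EndpointSliceDiscovery {
    async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        let mut backends = BTreeSet::new();
        for slice in self.store.state() {
            // Use the first declared port of the slice, if any.
            let Some(port) = slice
                .ports
                .as_deref()
                .and_then(|ports| ports.first())
                .and_then(|p| p.port)
            else {
                continue;
            };
            for ep in &slice.endpoints {
                // Skip endpoints that are not reported as ready.
                if !ep.conditions.as_ref().and_then(|c| c.ready).unwrap_or(false) {
                    continue;
                }
                for addr in &ep.addresses {
                    if let Ok(backend) = Backend::new(&format!("{addr}:{port}")) {
                        backends.insert(backend);
                    }
                }
            }
        }
        // The second element can carry per-backend enablement overrides; none here.
        Ok((backends, HashMap::new()))
    }
}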
Interesting! One question: does the DNS discovery not need the updater background service?
Yes, my DNS discovery use case doesn't account for short-lived DNS entries (like CoreDNS inside Kubernetes). Setting LoadBalancer.update_frequency should be enough for most use cases when resolving "real" domains; the hickory_resolver client does its own DNS caching based on the actual TTL of the records, so upstream DNS is not actually re-queried every time the backends are updated.
And it would be hard to achieve zero downtime with DNS anyway. That could be remedied by retrying when handling upstream errors, but that seems a bit flaky.
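For anyone wiring this up, here is a minimal sketch of how update_frequency slots into a LoadBalancer driven by such a discovery. It assumes the Backends / LoadBalancer / background_service APIs as used in the Pingora examples (crate and module paths may differ slightly depending on whether you use the pingora facade crate or the split crates); the discovery itself is whatever implementation you pass in, e.g. the gist's DNS or EndpointSlice discovery.

use std::sync::Arc;
use std::time::Duration;

use pingora_core::server::Server;
use pingora_core::services::background::background_service;
use pingora_load_balancing::{
    discovery::ServiceDiscovery, selection::RoundRobin, Backends, LoadBalancer,
};

// Hook a dynamic ServiceDiscovery (DNS, EndpointSlice reflector, ...) into a
// periodically refreshed LoadBalancer and register the refresh loop with the
// server as a background service.
fn add_dynamic_upstreams(
    server: &mut Server,
    discovery: Box<dyn ServiceDiscovery + Send + Sync>,
) -> Arc<LoadBalancer<RoundRobin>> {
    let mut lb = LoadBalancer::<RoundRobin>::from_backends(Backends::new(discovery));
    // Re-run discover() every 30s; for DNS, hickory caches per-record TTLs
    // internally, so "real" domains are not re-queried on every refresh.
    lb.update_frequency = Some(Duration::from_secs(30));

    let bg = background_service("upstream discovery", lb);
    // Keep this Arc in your ProxyHttp implementation and call select() on it
    // in upstream_peer(); the background service keeps it up to date.
    let upstreams = bg.task();
    server.add_service(bg);
    upstreams
}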
@Object905 Thanks, this gist helped me a lot!
Interesting approach. I'm working on using the -u upgrade feature to change the config (or the binary), rather than trying to keep a lot of dynamic config around in Pingora.
From k8s, Docker, or whatever, I generate a config file that Pingora reads once, at startup; the config file contains the resolved DNS names (for example).
Any drawbacks to this approach?
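A minimal sketch of that static variant, assuming a plain newline-delimited list of resolved "ip:port" upstreams written by an external job before Pingora starts (the file format and function name here are made up for illustration):

use std::fs;

use pingora_load_balancing::{selection::RoundRobin, LoadBalancer};

// Read upstreams that an external job (k8s, docker, cron, ...) resolved and
// wrote to disk before (re)starting Pingora: one "ip:port" per line.
fn load_static_upstreams(path: &str) -> LoadBalancer<RoundRobin> {
    let contents = fs::read_to_string(path).expect("upstream list missing");
    let addrs: Vec<&str> = contents
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .collect();
    // Resolved once at startup; changes are picked up only by a `-u` upgrade
    // with a regenerated file.
    LoadBalancer::try_from_iter(addrs).expect("invalid upstream address")
}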
Are you referring to pod IPs or service IPs? Pod IPs are not stable.
If the Pingora upgrade is seamless, then unstable pod IPs are not a problem: just upgrade every few minutes.
But the upgrade may not be as seamless as one would hope (e.g. the HTTP cache doesn't survive the upgrade, I think, so effectively a cache flush at every pod IP change). That's why I asked.
@Object905 Following this gist, how do I get a kube_client in a non-async function?
You can use block_on() (see the example). As an example, I have the following code at the start of main() to instantiate a client:
// Assumes: use kube::Client; use tokio::runtime::Runtime; use tracing::info;
let runtime = Runtime::new().expect("Could not start runtime");
let client = runtime.block_on(async {
    let client = Client::try_default()
        .await
        .expect("Could not create client");
    // Query the API server version once, both as a connectivity check and for logging.
    let version = client
        .apiserver_version()
        .await
        .expect("Could not get version")
        .git_version;
    info!("K8S: {version}");
    client
});
@simonhammes The following code reports this error:
watcher error: failed to perform initial object list: ServiceError: buffer's worker closed unexpectedly
fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().init();
    info!("Starting...");
    // Note: the client is created on a temporary runtime that is dropped right
    // after block_on() returns, which aborts the client's background worker and
    // later causes "buffer's worker closed unexpectedly".
    let client = tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async { Client::try_default().await })
        .expect("Failed to create kube client");
    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async move {
            let api = Api::<EndpointSlice>::namespaced(client, "default");
            let (_reader, writer) = reflector::store();
            let filter = Config::default().labels("app=nginx");
            let watcher = runtime::watcher(api, filter);
            let mut watcher_stream = watcher.default_backoff().reflect(writer).boxed();
            loop {
                info!("Waiting for next event...");
                match watcher_stream.try_next().await {
                    Ok(data) => match data {
                        Some(e) => match e {
                            Event::Apply(k) => info!("Applied {}", k.name_any()),
                            Event::Delete(k) => info!("Deleted {}", k.name_any()),
                            Event::Init => info!("Init"),
                            Event::InitDone => info!("InitDone"),
                            Event::InitApply(k) => info!("InitApply {}", k.name_any()),
                        },
                        None => info!("no data"),
                    },
                    Err(e) => {
                        warn!("watcher error: {e}");
                        break;
                    }
                }
            }
        });
    Ok(())
}
It works when everything runs in the same runtime (the kube client's background worker is spawned on the runtime the client is created in, so that runtime has to stay alive while the client is used):
fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().init();
    info!("Starting...");
    // Create the client and drive the watcher on the same runtime, so the
    // client's background worker stays alive.
    let rt = tokio::runtime::Runtime::new().unwrap();
    let client = rt
        .block_on(async { Client::try_default().await })
        .expect("Failed to create kube client");
    rt.block_on(async move {
        let api = Api::<EndpointSlice>::namespaced(client, "default");
        let (_reader, writer) = reflector::store();
        let filter = Config::default().labels("app=nginx");
        let watcher = runtime::watcher(api, filter);
        let mut watcher_stream = watcher.default_backoff().reflect(writer).boxed();
        loop {
            info!("Waiting for next event...");
            match watcher_stream.try_next().await {
                Ok(data) => match data {
                    Some(e) => match e {
                        Event::Apply(k) => info!("Applied {}", k.name_any()),
                        Event::Delete(k) => info!("Deleted {}", k.name_any()),
                        Event::Init => info!("Init"),
                        Event::InitDone => info!("InitDone"),
                        Event::InitApply(k) => info!("InitApply {}", k.name_any()),
                    },
                    None => info!("no data"),
                },
                Err(e) => {
                    warn!("watcher error: {e}");
                    break;
                }
            }
        }
    });
    Ok(())
}
@shenshouer Sorry, I can't help you with that. I'm using the code from https://gist.github.com/Object905/6cafd5e8e56dd60670149296411a407f#file-register-rs inside main() and it works without issues.
@simonhammes The issue has been resolved. Thanks to @Object905 for providing the sample code at https://gist.github.com/Object905/6cafd5e8e56dd60670149296411a407f. Here is the modified example based on their code:
use std::sync::LazyLock;

use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
    runtime::{
        self,
        reflector::{self},
        watcher::{Config, Event},
        WatchStreamExt,
    },
    Api, Client, ResourceExt,
};
use log::info;
use tracing::warn;

// A process-wide runtime that lives for the whole program, so the kube
// client's background worker is never dropped.
pub static SHARED_RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create tokio shared runtime")
});

// Lazily constructed client; built inside the shared runtime's context so its
// background tasks are spawned there.
pub static KUBE: LazyLock<Option<Client>> = LazyLock::new(|| {
    let config = kube::Config::incluster()
        .ok()
        .or(SHARED_RUNTIME.block_on(kube::Config::infer()).ok())?;
    let _guard = SHARED_RUNTIME.enter();
    let client = Client::try_from(config).ok()?;
    Some(client)
});

fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().init();
    info!("Starting...");
    let rt = tokio::runtime::Runtime::new().unwrap();
    let client = KUBE.clone().expect("Failed to get kube client");
    let api = Api::<EndpointSlice>::namespaced(client, "default");
    let (_reader, writer) = reflector::store();
    let filter = Config::default().labels("app=nginx");
    let watcher = runtime::watcher(api, filter);
    let mut watcher_stream = watcher.default_backoff().reflect(writer).boxed();
    rt.block_on(async move {
        loop {
            info!("Waiting for next event...");
            match watcher_stream.try_next().await {
                Ok(data) => match data {
                    Some(e) => match e {
                        Event::Apply(k) => info!("Applied {}", k.name_any()),
                        Event::Delete(k) => info!("Deleted {}", k.name_any()),
                        Event::Init => info!("Init"),
                        Event::InitDone => info!("InitDone"),
                        Event::InitApply(k) => info!("InitApply {}", k.name_any()),
                    },
                    None => info!("no data"),
                },
                Err(e) => {
                    warn!("watcher error: {e}");
                    break;
                }
            }
        }
    });
    Ok(())
}
What is the problem your feature solves, or the need it fulfills?
Our microservices run on k8s and are dynamically scaled up and down during daily operation and maintenance. Static upstream configuration is very inconvenient for this, so we hope Pingora can support k8s or Consul service discovery.