jonhoo / flurry

A port of Java's ConcurrentHashMap to Rust
Apache License 2.0

Memory usage in Flurry Hashmap #115

Open ss025 opened 1 year ago

ss025 commented 1 year ago

Hi, I am trying to use the flurry hashmap in one of my projects and want to understand its memory usage and garbage collection.

I am testing a simple actix-web application with a flurry hashmap, inserting the same keys/values again and again. A few observations:

  1. On the first add, memory usage is ~15 GB, while it is 6.7 GB with a std HashMap behind a lock.
  2. Calling add -> clear -> add repeatedly on the same keys/values eventually leads to an OOM error and the application is killed.

Output of program

curl --location --request GET '127.0.0.1:8080/add'
count 50331648! from thread ThreadId(18) - stats Length: 50331648, Capacity: 50331648, Memory: 15.7 GiB, Virtual: 17.3 GiB

curl --location --request GET '127.0.0.1:8080/clear'
count 0! from thread ThreadId(19) - stats Length: 0, Capacity: 0, Memory: 15.7 GiB, Virtual: 17.3 GiB

curl --location --request GET '127.0.0.1:8080/add'
count 50331648! from thread ThreadId(20) - stats Length: 50331648, Capacity: 50331648, Memory: 29.1 GiB, Virtual: 30.6 GiB

curl --location --request GET '127.0.0.1:8080/clear'
count 0! from thread ThreadId(21) - stats Length: 0, Capacity: 0, Memory: 29.1 GiB, Virtual: 30.6 GiB

curl --location --request GET '127.0.0.1:8080/add'
curl: (52) Empty reply from server

main.rs

use actix_web::web::Data;
use actix_web::{get, web, App, HttpServer};
use std::sync::Mutex;

use bytesize::ByteSize;
use sysinfo::{get_current_pid, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let data = web::Data::new(AppState {
        data: flurry::HashMap::<String, String>::new(),
        sys: Mutex::new(System::new_with_specifics(
            RefreshKind::new().with_processes(ProcessRefreshKind::new()),
        )),
    });

    HttpServer::new(move || {
        App::new()
            .app_data(data.clone())
            .service(add)
            .service(clear)
            .service(stats)
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

struct AppState {
    data: flurry::HashMap<String, String>,
    sys: Mutex<System>,
}

#[get("/stats")]
async fn stats(data: web::Data<AppState>) -> String {
    stats_2(data)
}

fn stats_2(data: Data<AppState>) -> String {
    let pid = get_current_pid().unwrap();
    let mut sys = data.sys.lock().unwrap();
    sys.refresh_process(pid);

    let proc = sys.process(pid).unwrap();
    let map = &data.data;
    // note: len() is shown for both Length and Capacity, since flurry
    // doesn't appear to expose the underlying table capacity
    let string = format!(
        "Length: {}, Capacity: {}, Memory: {}, Virtual: {}\n",
        map.len(),
        map.len(),
        ByteSize::b(proc.memory()).to_string_as(true),
        ByteSize::b(proc.virtual_memory()).to_string_as(true)
    );

    string
}

#[get("/add")]
async fn add(data: web::Data<AppState>) -> String {
    let size;
    {
        let max_entries: u64 = 100_663_296; // max_entries / 2 = 50_331_648 keys per call
        let m = &data.data;
        for i in 0..max_entries / 2 {
            // note: pin() acquires and drops a fresh guard on every iteration
            m.pin().insert(format!("str-{i}"), format!("str-{i}-{i}"));
        }

        size = m.len();
    }
    let stats1 = stats_2(data);
    format!(
        "count {size}! from thread {:?} - stats {stats1}\n",
        std::thread::current().id()
    )
}

#[get("/clear")]
async fn clear(data: web::Data<AppState>) -> String {
    let size;
    {
        let m = &data.data;
        m.pin().clear();
        // unsafe { malloc_trim(0) };
        size = m.len();
    }

    let stats1 = stats_2(data);
    format!(
        "count {size}! from thread {:?} - stats {stats1}\n",
        std::thread::current().id()
    )
}

Cargo.toml

[package]
name = "skiptest"
version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4"

flurry = "0.4.0"

sysinfo = "0.28.4"
bytesize = "1.2.0"

jonhoo commented 1 year ago

Huh, that's really interesting indeed, and suggests that the memory reclamation isn't kicking in at some point here. @JackThomson2 if you're looking for a project to get you back into more depths of flurry, this may be a good candidate!

JackThomson2 commented 1 year ago

Hi Jon, sorry I've been away for a while. This looks like an interesting problem, and it does seem to be something to do with the reclamation not running for some reason. I'll definitely take a look and update this issue with what I find. Thanks @ss025 for the example, it was really helpful for getting going.

So I had a little bit of time yesterday, here's what I've found so far:

  1. Manually pinning seize back to the original 0.2.1 didn't change the behaviour at all; same issue.
  2. Managing the guards and calling flush() manually didn't help either (interestingly, it actually increased memory usage); see the sketch after this list.
  3. We can actually observe memory growth if we call the /add endpoint twice, even though the second call inserts the exact same keys, so I believe usage should stay the same.
  4. Calling .remove() in a loop gives the same behaviour as .clear().
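
For reference, a minimal sketch of the pattern in point 2 (hypothetical code; it assumes the flurry/seize versions from this thread, where the guard exposes the flush() mentioned above):

let map = flurry::HashMap::<String, String>::new();
let guard = map.guard();
for i in 0..1_000_000u64 {
    map.insert(format!("str-{i}"), format!("str-{i}-{i}"), &guard);
}
map.clear(&guard);
guard.flush(); // assumption: push locally batched retirements to the collector
drop(guard); // retired nodes can only be freed once no guard protects them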

Been doing some more investigating: this doesn't appear to happen in a simple console app, so I believe it's something to do with the way actix is using the map. I'll keep digging.

I'll keep you posted on what I find next

Update:

Sorry for the lack of progress here, struggling to get much time.

Been investigating the possibility of heap fragmentation. Using Jemallocator does seem to resolve the issue with calling /add repeatedly; however, the /clear then /add cycle does still seem to cause the memory to grow. I'll keep you guys posted.

One thing I'm investigating at the moment is how it performs in a serial console app rather than in Actix; in the console app it appears to behave properly, without the memory increases (sketched below). So I'm looking into whether it's how actix interacts with Flurry.
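
A minimal sketch of that serial comparison (the same add -> clear cycle as the repro above, with no web framework or async runtime involved):

fn main() {
    let map = flurry::HashMap::<String, String>::new();
    for cycle in 0..4 {
        {
            let m = map.pin();
            for i in 0..50_331_648u64 {
                m.insert(format!("str-{i}"), format!("str-{i}-{i}"));
            }
        }
        println!("cycle {cycle}: len after add = {}", map.len());
        map.pin().clear();
        println!("cycle {cycle}: len after clear = {}", map.len());
    }
}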

More notes from my investigation

I'm going to try another async webserver and see if it's async related

ss025 commented 1 year ago

@JackThomson2 were you able to find anything on the above issue?

JackThomson2 commented 1 year ago

Hey sorry I didn't update here, been pretty busy at work.

So a couple of new findings:

My suspicion is that it's something to do with how actix uses multiple single-threaded runtimes, but I haven't been able to investigate further yet; one way to test this is sketched below. I'll keep you posted as I find more
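
A quick way to test that suspicion would be to pin actix to a single worker (a hypothetical change to the repro's main(), reusing its AppState and handlers; each actix worker runs its own single-threaded runtime):

HttpServer::new(move || {
    App::new()
        .app_data(data.clone())
        .service(add)
        .service(clear)
        .service(stats)
})
.workers(1) // default is one worker per core, each with its own runtime
.bind(("127.0.0.1", 8080))?
.run()
.await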

ss025 commented 1 year ago

Thanks @JackThomson2 for the findings. My original problem started with hyper; I am not sure whether there is an issue with hyper or not. Let me try that

JackThomson2 commented 1 year ago

Thanks for taking a look. When you test with hyper, can you use a custom allocator such as jemalloc, so we can try to eliminate heap fragmentation as a potential cause?

ss025 commented 1 year ago

Sure @JackThomson2. Will you be able to share the axum example code you tried, so I can compare it with hyper on the same machine and settings?

JackThomson2 commented 1 year ago

Sure this is the code I was using

use lazy_static::lazy_static;

use axum::{response::Html, routing::get, Router};
use std::net::SocketAddr;
use std::sync::Mutex;
use bytesize::ByteSize;
use sysinfo::{get_current_pid, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt};
use tikv_jemallocator::Jemalloc;

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

lazy_static! {
    static ref APP_DATA: AppState = AppState::new();
}

struct AppState {
    data: flurry::HashMap<String, String>,
    sys: Mutex<System>,
}

impl AppState {
    pub fn new() -> Self {
        AppState {
            data: flurry::HashMap::<String, String>::with_capacity(100663296),
            sys: Mutex::new(System::new_with_specifics(
                RefreshKind::new().with_processes(ProcessRefreshKind::new()),
            )),
        }
    }
}

fn stats_2(data: &AppState) -> String {
    let pid = get_current_pid().unwrap();
    let mut sys = data.sys.lock().unwrap();
    sys.refresh_process(pid);

    let proc = sys.process(pid).unwrap();

    let string = {
        let map = &data.data.pin();
        // as above, len() stands in for Capacity, which flurry does not
        // appear to expose
        format!(
            "Length: {}, Capacity: {}, Memory: {}, Virtual: {}\n",
            map.len(),
            map.len(),
            ByteSize::b(proc.memory()).to_string_as(true),
            ByteSize::b(proc.virtual_memory()).to_string_as(true)
        )
    };

    string
}

#[tokio::main]
async fn main() {
    // build our application with a route
    let app = Router::new()
        .route("/add", get(add_handler))
        .route("/clear", get(clear_handler))
        .route("/", get(handler));

    // run it
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    println!("listening on {}", addr);
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn handler() -> Html<&'static str> {
    Html("<h1>Hello, World!</h1>")
}

async fn add_handler() -> String {
    let size;
    let init_size;
    {
        let max_entries: u64 = 100_663_296; // max_entries / 2 = 50_331_648 keys per call
        let m = &APP_DATA.data;
        init_size = m.len();
        let adding = m.pin();
        for i in 0..max_entries / 2 {
            adding.insert(format!("str-{i}"), format!("str-{i}-{i}"));
        }
        size = m.len();
    }
    let stats1 = stats_2(&APP_DATA);
    format!(
        "initial count {init_size}, count {size}! from thread {:?} - stats {stats1}\n",
        std::thread::current().id()
    )
}

async fn clear_handler() -> String {
    let size;
    let init_size;
    {
        let m = &APP_DATA.data;
        init_size = m.len();
        m.pin().clear();
        // unsafe { malloc_trim(0) };
        size = m.len();
    }

    let stats1 = stats_2(&APP_DATA);
    format!(
        "initial count {init_size}, count {size}! from thread {:?} - stats {stats1}\n",
        std::thread::current().id()
    )
}

ss025 commented 1 year ago

@JackThomson2 what output do you get for add -> clear -> add -> clear with the above snippet?

JackThomson2 commented 1 year ago

I don't have exact numbers, but what I found was that memory increased for the first two cycles, then eventually dropped and stayed consistent

JackThomson2 commented 1 year ago

So some exciting news. I've made a little breakthrough and figured out the exact location of our issue.

It appears to be the delayed deallocations in the shield / waiting for it to drop. I was able to change the line here: https://github.com/jonhoo/flurry/blob/main/src/reclaim.rs#L152 to: self.collector().unwrap().retire(shared.ptr, seize::reclaim::boxed::<T>); Calling retire directly against the collector bypasses the TLS implementation and drops immediately when possible. With this change we see consistent memory usage throughout write/clear cycles!

I will now dig further into the implementation of Seize to figure out how we're seeing this issue. But I feel we're close to a fix for this! Thanks again @ss025 for flagging this up.

ibraheemdev commented 1 year ago

Just saw this issue. If you need any help or guidance regarding seize please let me know!

JackThomson2 commented 1 year ago

Hey @ibraheemdev, thank you; I may send you an email shortly, as I feel like I'm going in circles here. I think the self.collector().unwrap().retire(shared.ptr, seize::reclaim::boxed::<T>); change was a red herring; I now think the issue is the .clear() implementation, since when I do a .remove() in a loop, memory is fine. Later on today I'll fire an email over with everything I've found. Thanks

JackThomson2 commented 1 year ago

So I've come full circle here. I think the issue is actually just memory fragmentation. I switched to Jemalloc and hooked into its API to retrieve the actual bytes allocated, and the number is consistent between clear() and remove(), so it seems that with the huge amounts allocated the allocator can't keep up. I tried manually calling tikv_jemalloc_ctl::epoch::advance().unwrap(); to try to force a defragmentation, with no luck.
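
For anyone reproducing this, reading jemalloc's own counters looks roughly like this (a sketch using the tikv-jemalloc-ctl crate, assuming jemalloc is already installed as the global allocator as in the axum example above):

use tikv_jemalloc_ctl::{epoch, stats};

fn print_heap_stats() {
    // jemalloc caches its statistics; advancing the epoch refreshes them
    epoch::advance().unwrap();
    let allocated = stats::allocated::read().unwrap(); // bytes in live allocations
    let resident = stats::resident::read().unwrap(); // bytes physically mapped
    println!("allocated: {allocated} B, resident: {resident} B");
}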

The question is why the remove() loop is friendlier on the allocator (a sketch of that variant is below). I tried re-writing the clear function to look more like the remove function; I managed to get pretty close to the same deallocation order, but still ended up with the heap fragmentation.
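
A minimal sketch of the remove()-in-a-loop variant being compared (hypothetical code; the keys are snapshotted first so the map isn't mutated mid-iteration):

fn clear_via_remove(map: &flurry::HashMap<String, String>) {
    let m = map.pin();
    // snapshot the keys, then retire entries one at a time via remove()
    let keys: Vec<String> = m.keys().cloned().collect();
    for key in &keys {
        m.remove(key);
    }
}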

I think the real solution may be to implement the preloading of nodes and reusing them, as suggested by @ibraheemdev here: https://github.com/jonhoo/flurry/issues/80#issuecomment-1082309088

I've been having a play with implementing this to see how it affects the memory profile (at the moment it's very hacky and a lot slower, but I'm only looking at memory usage for now).

ibraheemdev commented 1 year ago

So I had some time to work on this, and I implemented some memory optimizations on an experimental branch. It cuts the node memory overhead in half by moving batches to a separate allocation instead of into the linked list. crossbeam-epoch implements batches this way as well, and I think it's a good tradeoff in general because the allocation is still amortized to once per batch. If either of you wants to run your tests against the branch and report any findings here, that would be helpful.
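
(For testing, one way to point an existing project at such a branch is a Cargo patch; the branch name below is a placeholder, not the actual branch:)

[patch.crates-io]
seize = { git = "https://github.com/ibraheemdev/seize", branch = "experimental" }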

ibraheemdev commented 4 months ago

The seize improvements are released in 0.3.3 (https://github.com/jonhoo/flurry/pull/123). The memory overhead per object is now just one word (vs. four). However, I'm not sure if this will fix the issue because the additional memory is still incurred after retiring and it's unclear what the actual issue is here.