actix / actix-web

Actix Web is a powerful, pragmatic, and extremely fast web framework for Rust.
https://actix.rs
Apache License 2.0
21.6k stars 1.67k forks source link

Memory leakage #1198

Closed kkonevets closed 4 years ago

kkonevets commented 4 years ago

I have a simple web service, which starts with 2Gb RAM usage. Over a day it consumes about 7Gb. Traffic is about 1 request/minute. The handle that proceed a request does not keep any State between calls. I do not use unsafe code. Htop lists that all memory is due to actix-web generated threads. Is it possible that this issue will not happen in actix-web 2.0?

I use actix-web = "1.0.9" actix-http = "0.2.4"

extern crate embed;

use actix_web::middleware::Logger;
use actix_web::{web, App, FromRequest, HttpResponse, HttpServer};
use embed::get_environment;
use embed::messages::{Info, InfoGet};
use lmdb::{Database, Result, RwTransaction, Transaction, WriteFlags};
use std::convert::TryInto;
use std::path::Path;

// TODO: write tests

static DB_PATH: &'static str = "../data/graph_db";

fn is_sorted_strict<T>(data: &[T]) -> bool
where
    T: Ord,
{
    data.windows(2).all(|w| w[0] < w[1])
}

/// Merge/Delete js with/from is and return the result, avoid self loop with ix.
/// Asuming is and js are sorted ascending without duplicates.
fn sorted_merge<I, J>(ix: u32, iit: &mut I, jit: &mut J, delete: bool) -> Vec<u8>
where
    I: Iterator<Item = u32>,
    J: Iterator<Item = u32>,
{
    let mut result = vec![];
    let mut i = iit.next();
    let mut j = jit.next();

    loop {
        match (i, j) {
            (Some(i_val), Some(j_val)) => {
                if i_val < j_val {
                    result.extend_from_slice(&i_val.to_ne_bytes());
                    i = iit.next();
                } else if i_val > j_val {
                    if !delete && ix != j_val {
                        result.extend_from_slice(&j_val.to_ne_bytes());
                    }
                    j = jit.next();
                } else {
                    if !delete {
                        result.extend_from_slice(&j_val.to_ne_bytes());
                    }
                    i = iit.next();
                    j = jit.next();
                }
            }
            (Some(i_val), None) => {
                result.extend_from_slice(&i_val.to_ne_bytes());
                i = iit.next();
            }
            (None, Some(j_val)) => {
                if delete {
                    break;
                }
                if ix != j_val {
                    result.extend_from_slice(&j_val.to_ne_bytes());
                }
                j = jit.next();
            }
            (None, None) => break,
        }
    }

    result
}

fn merge(
    db: Database,
    txn: &mut RwTransaction,
    w_flags: WriteFlags,
    i: u32,
    js: &[u32],
    delete: bool,
) -> Result<()> {
    let i_key = i.to_ne_bytes();

    let blob: Vec<_> = match &txn.get(db, &i_key) {
        Ok(blob) => {
            let iit = &mut blob
                .chunks_exact(std::mem::size_of::<u32>())
                .map(|chunk| u32::from_ne_bytes(chunk.try_into().unwrap()));
            let jit = &mut js.iter().map(|&j| j);
            sorted_merge(i, iit, jit, delete)
        }
        Err(_) => vec![],
    };

    // put back the blob
    txn.put(db, &i_key, &blob, w_flags)
}

fn index_merge((info, delete): (web::Json<Info>, bool)) -> HttpResponse {
    // TODO: reduce subgraph in (info.keys, info.values) to smallest possible
    let env = match get_environment(Path::new(DB_PATH), false, true) {
        Ok(env) => env,
        Err(e) => return HttpResponse::InternalServerError().body(format!("{}", e)),
    };
    let db = match env.open_db(Some(info.db_name.as_str())) {
        Ok(db) => db,
        Err(e) => return HttpResponse::BadRequest().body(format!("{}: {}", info.db_name, e)),
    };
    let mut txn = match env.begin_rw_txn() {
        Ok(txn) => txn,
        Err(e) => return HttpResponse::InternalServerError().body(format!("{}", e)),
    };
    let w_flags = WriteFlags::empty();

    if info.keys.len() != info.values.len() {
        return HttpResponse::BadRequest().body(format!("keys length != values length\n"));
    }

    for (&i, js) in info.keys.iter().zip(&info.values) {
        if !is_sorted_strict(js) {
            return HttpResponse::BadRequest().body(format!(
                "values should be sorted ascending and have no duplicates"
            ));
        }

        // merging js to i
        if let Err(e) = merge(db, &mut txn, w_flags, i, js, delete) {
            return HttpResponse::InternalServerError().body(format!("key {}: {}", i, e));
        }

        // going the other way - merging i to js
        for &j in js {
            if let Err(e) = merge(db, &mut txn, w_flags, j, &[i], delete) {
                return HttpResponse::InternalServerError().body(format!("key {}: {}", j, e));
            }
        }
    }

    txn.commit().unwrap();
    HttpResponse::Ok().body(format!("Success\n"))
}

fn index_put(info: web::Json<Info>) -> HttpResponse {
    index_merge((info, false))
}

fn index_delete(info: web::Json<Info>) -> HttpResponse {
    index_merge((info, true))
}

pub fn main() {
    // std::env::set_var("RUST_LOG", "actix_web=info");
    env_logger::init();

    HttpServer::new(move || {
        App::new()
            .route("/put", web::post().to(index_put))
            .data(web::Json::<Info>::configure(|cfg| {
                cfg.limit(2usize.pow(30))
            }))
            .route("/delete", web::post().to(index_delete))
            .data(web::Json::<Info>::configure(|cfg| {
                cfg.limit(2usize.pow(30))
            }))
            .wrap(Logger::default())
    })
    .bind("0.0.0.0:8088")
    .unwrap()
    .run()
    .unwrap();
}
fafhrd91 commented 4 years ago

could you publish your code

kkonevets commented 4 years ago

@fafhrd91 https://github.com/kkonevets/play_rs

fafhrd91 commented 4 years ago

how to reproduce the problem?

kkonevets commented 4 years ago

First of all you need a big traffic coming in, not sure how to provide that for you. Than you need:

  1. create lmdb database with bin/to_embeded_db()
  2. and make lots of post requests like these, possibly with different json content. Values field could have 40K -80K integer values
curl -sd '{
    "db_name":"vk",
    "keys": [2344, 45356],
    "values": [[449, 6225], [2354245,568,3224,566]]
}' -H "Content-Type: application/json" -X POST http://localhost:8088/put

curl -sd '{
    "db_name":"vk",
    "keys": [2344],
    "values": [[449, 6225]]
}' -H "Content-Type: application/json" -X POST http://localhost:8088/delete
kkonevets commented 4 years ago

https://github.com/kkonevets/play_rs By the way, GET query is not needed to reproduce the behaviour, pasted in case debugging needed.

kkonevets commented 4 years ago

the problem was with lmdb C API not with actix-web