njaard / sonnerie

A simple timeseries database
Other
266 stars 19 forks source link

Out of memory error when compacting #24

Closed ronniec95 closed 1 year ago

ronniec95 commented 1 year ago

I sent ~250 different keys, with 1500 entries each, spread across multiple HTTP PUT requests. I then tried to compact:

ronnie@Casa:/mnt/d/muCapital$ sonnerie -d database compact
thread 'main' panicked at 'compacting: IOError(Os { code: 12, kind: OutOfMemory, message: "Cannot allocate memory" })', /home/ronnie/.cargo/registry/src/index.crates.io-6f17d22bba15001f/sonnerie-0.8.4/src/main.rs:148:10
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

If I send through a significantly smaller number of keys, compact works, but it's severely rate-limiting. The same crash occurs whether the compaction is minor or major.

njaard commented 1 year ago

Could it be that you ran out of file descriptors?

On Linux, `ulimit -n` outputs "1024" for me, which may be fewer than the number of files you need to compact; the number of files you need to compact is the number of files in your database directory.

Try increasing the limit?

ronniec95 commented 1 year ago

I think a better way to fix it is to limit the DatabaseReader to max ~8000 files at a time. You've disabled branching and pull requests so the fix is posted below for you to incorporate as you wish.

I've just wrapped the compaction method in a loop, which is safe because compaction is transactional.

diff --git a/src/database_reader.rs b/src/database_reader.rs
index 68417d4..4ff7357 100644
--- a/src/database_reader.rs
+++ b/src/database_reader.rs
@@ -18,6 +18,10 @@ use regex::Regex;
 #[cfg(feature = "by-key")]
 use crate::bykey::DatabaseKeyReader;

+/// To prevent file handles running out on certain systems (eg. WSL on windows)
+/// set a maximum number of files to have open at once
+const MAX_FILES_TO_COMPACT: usize = 8192;
+
 /// Read a database in key-timestamp sorted format.
 ///
 /// Open a database with [`new`](#method.new) and then [`get`](#method.get),
@@ -93,7 +97,7 @@ impl DatabaseReader {
            // we add 1 because we'd reserve the 0 for the main database,
            // regardless of whether `include_main_db` or not
            .map(|(txid, p)| (txid + 1, p));
-       for (txid, p) in iter {
+       for (txid, p) in iter.take(MAX_FILES_TO_COMPACT) {
            let mut f = File::open(&p)?;
            let len = f.seek(std::io::SeekFrom::End(0))? as usize;
            if len == 0 {
@@ -116,6 +120,12 @@ impl DatabaseReader {
        })
    }

+   /// Number of tx files found in this iteration
+   /// This reduces the likelihood of file ulimit errors
+   pub fn num_txes(&self) -> usize {
+       self.txes.len()
+   }
+
    /// Get the filenames of each transaction.
    ///
    /// This is useful for compacting, because after
diff --git a/src/main.rs b/src/main.rs
index f59e704..6644c1b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -375,83 +375,93 @@ fn compact(
    let lock = File::create(dir.join(".compact"))?;
    lock.lock_exclusive()?;

-   let db = if major {
-       DatabaseReader::new(dir)?
-   } else {
-       DatabaseReader::without_main_db(dir)?
-   };
-   let db = std::sync::Arc::new(db);
-
-   let mut compacted = CreateTx::new(dir)?;
-
-   if let Some(gegnum) = gegnum {
-       let mut child = std::process::Command::new("/bin/sh")
-           .arg("-c")
-           .arg(gegnum)
-           .stdin(std::process::Stdio::piped())
-           .stdout(std::process::Stdio::piped())
-           .spawn()
-           .expect("unable to run --gegnum process");
-
-       let childinput = child.stdin.take().expect("process had no stdin");
-       let mut childinput = std::io::BufWriter::new(childinput);
-
-       let ts_format_cloned = ts_format.map(|m| m.to_owned());
-
-       // a thread that reads from "db" and writes to the child
-       let reader_db = db.clone();
-       let reader_thread = std::thread::spawn(move || -> std::io::Result<()> {
-           let timestamp_format = if let Some(ts_format) = &ts_format_cloned {
-               formatted::PrintTimestamp::FormatString(ts_format)
-           } else {
-               formatted::PrintTimestamp::Nanos
-           };
+   // We loop to ensure we've processed all the .tx files
+   // As we know the compaction is atomic there is no downside
+   // to processing a (somewhat) large group at a time
+   loop {
+       let db = if major {
+           DatabaseReader::new(dir)?
+       } else {
+           DatabaseReader::without_main_db(dir)?
+       };

-           let reader = reader_db.get_range(..);
-           for record in reader {
-               formatted::print_record(
-                   &record,
-                   &mut childinput,
-                   timestamp_format,
-                   formatted::PrintRecordFormat::Yes,
-               )?;
-               writeln!(&mut childinput)?;
-           }
-           Ok(())
-       });
-
-       let childoutput = child.stdout.take().expect("process had no stdout");
-       let mut childoutput = std::io::BufReader::new(childoutput);
-       formatted::add_from_stream_with_fmt(&mut compacted, &mut childoutput, ts_format)?;
-
-       reader_thread
-           .join()
-           .expect("failed to join subprocess writing thread")
-           .expect("child writer failed");
-       let result = child.wait()?;
-       if !result.success() {
-           panic!("child process failed: cancelling compact");
+       eprintln!("processing {} .txes", db.num_txes());
+
+       if db.num_txes() == 0 {
+           break;
        }
-   } else {
-       {
-           let ps = db.transaction_paths();
-           if ps.len() == 1 && ps[0].file_name().expect("filename") == "main" {
-               eprintln!("nothing to do");
-               return Ok(());
+       let db = std::sync::Arc::new(db);
+
+       let mut compacted = CreateTx::new(dir)?;
+
+       if let Some(gegnum) = gegnum {
+           let mut child = std::process::Command::new("/bin/sh")
+               .arg("-c")
+               .arg(gegnum)
+               .stdin(std::process::Stdio::piped())
+               .stdout(std::process::Stdio::piped())
+               .spawn()
+               .expect("unable to run --gegnum process");
+
+           let childinput = child.stdin.take().expect("process had no stdin");
+           let mut childinput = std::io::BufWriter::new(childinput);
+
+           let ts_format_cloned = ts_format.map(|m| m.to_owned());
+
+           // a thread that reads from "db" and writes to the child
+           let reader_db = db.clone();
+           let reader_thread = std::thread::spawn(move || -> std::io::Result<()> {
+               let timestamp_format = if let Some(ts_format) = &ts_format_cloned {
+                   formatted::PrintTimestamp::FormatString(ts_format)
+               } else {
+                   formatted::PrintTimestamp::Nanos
+               };
+
+               let reader = reader_db.get_range(..);
+               for record in reader {
+                   formatted::print_record(
+                       &record,
+                       &mut childinput,
+                       timestamp_format,
+                       formatted::PrintRecordFormat::Yes,
+                   )?;
+                   writeln!(&mut childinput)?;
+               }
+               Ok(())
+           });
+
+           let childoutput = child.stdout.take().expect("process had no stdout");
+           let mut childoutput = std::io::BufReader::new(childoutput);
+           formatted::add_from_stream_with_fmt(&mut compacted, &mut childoutput, ts_format)?;
+
+           reader_thread
+               .join()
+               .expect("failed to join subprocess writing thread")
+               .expect("child writer failed");
+           let result = child.wait()?;
+           if !result.success() {
+               panic!("child process failed: cancelling compact");
            }
+       } else {
+           {
+               let ps = db.transaction_paths();
+               if ps.len() == 1 && ps[0].file_name().expect("filename") == "main" {
+                   eprintln!("nothing to do");
+                   return Ok(());
+               }
+           }
+           // create the new transaction after opening the database reader
+           let reader = db.get_range(..);
+           let mut n = 0u64;
+           for record in reader {
+               compacted.add_record_raw(record.key(), record.format(), record.raw())?;
+               n += 1;
+           }
+           eprintln!("compacted {} records", n);
        }
-       // create the new transaction after opening the database reader
-       let reader = db.get_range(..);
-       let mut n = 0u64;
-       for record in reader {
-           compacted.add_record_raw(record.key(), record.format(), record.raw())?;
-           n += 1;
-       }
-       eprintln!("compacted {} records", n);
-   }
-
-   sonnerie::_purge_compacted_files(compacted, dir, &db, major).expect("failure compacting");

+       sonnerie::_purge_compacted_files(compacted, dir, &db, major).expect("failure compacting");
+   }
    Ok(())

}

njaard commented 1 year ago

I have applied your code with some changes. Thank you!

By the way, normally, you would fork a repository and then you can create pull requests.