dmlc / xgboost

Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow
https://xgboost.readthedocs.io/en/stable/
Apache License 2.0

XGBoost4J failed with SIGSEGV in multiple threads environment #7167

Open CatTail opened 3 years ago

CatTail commented 3 years ago

Hi Team, I'm using XGBoost4J for online inference; however, when the predict method is executed from multiple threads, the native code fails with SIGSEGV.

Runtime Environment: Ubuntu 20.04 with JDK 11

JVM Fatal Error Log Example 1: hs_err_730147.log JVM Fatal Error Log Example 2: hs_err_730976.log

This is the command I use to reproduce the issue. Note that you need to install ammonite to run the script:

$ java -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary -XX:HeapDumpPath=/tmp -XX:ErrorFile=/tmp/hs_err_%p.log -jar $(which amm) test.sc --modelPath xgboost.model

You can download the repro.zip file, uncompress it and run the above command to reproduce the issue.

For readability, I have also pasted the Scala code that reproduces the issue below.

import java.io.{DataInputStream, FileInputStream}
import java.util.concurrent.{ExecutorService, ForkJoinPool, ForkJoinWorkerThread}

import scala.concurrent.{ExecutionContext, Future}

import $ivy.`ml.dmlc::xgboost4j:1.3.1`
import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost}
import ml.dmlc.xgboost4j.LabeledPoint

val Iteration = 100
val RowSize = 881
val FeatureSize = 109

implicit val ec =
  ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(Runtime.getRuntime.availableProcessors() * 2))

@main
def main(modelPath: String): Unit = {
  val xgboost4jPredictor = XGBoost.loadModel(new java.io.FileInputStream(modelPath))
  val fis = new FileInputStream("features")
  val dis = new DataInputStream(fis)
  val arr = new Array[Float](RowSize * FeatureSize)
  var index = 0
  while (dis.available() > 0) {
    val feature = dis.readFloat()
    arr(index) = feature
    index = index + 1
  }
  val data = arr.grouped(FeatureSize).toSeq

  (1 to Iteration).foreach { _ =>
    timingFuture {
      val f1 = Future {
        val matrix = new DMatrix(data.map(new LabeledPoint(0.0f, FeatureSize, null, _)).toIterator)
        xgboost4jPredictor.setParam("nthread", Integer.valueOf(2))
        xgboost4jPredictor.predict(matrix)
        println("f1 done")
      }

      val f2 = Future {
        val matrix = new DMatrix(data.map(new LabeledPoint(0.0f, FeatureSize, null, _)).toIterator)
        xgboost4jPredictor.setParam("nthread", Integer.valueOf(2))
        xgboost4jPredictor.predict(matrix)
        println("f2 done")
      }
      val f3 = Future {
        val matrix = new DMatrix(data.map(new LabeledPoint(0.0f, FeatureSize, null, _)).toIterator)
        xgboost4jPredictor.setParam("nthread", Integer.valueOf(2))
        xgboost4jPredictor.predict(matrix)
        println("f3 done")
      }
      val f4 = Future {
        val matrix = new DMatrix(data.map(new LabeledPoint(0.0f, FeatureSize, null, _)).toIterator)
        xgboost4jPredictor.setParam("nthread", Integer.valueOf(2))
        xgboost4jPredictor.predict(matrix)
        println("f4 done")
      }
      Future.sequence(Seq(f1, f2, f3, f4))
    }

    Thread.sleep(1000)
  }
}

def timingFuture[T](block: => Future[T])(implicit executionContext: ExecutionContext): Future[T] = {
  val startTime = System.nanoTime()
  block.andThen {
    case ret =>
      val endTime = System.nanoTime()
      println(s"duration: ${((endTime - startTime) / 1e6d).toLong}")
      ret
  }
}

def forkJoinPool(parallelism: Int, threadPrefix: String): ExecutorService = {
  import ForkJoinPool.ForkJoinWorkerThreadFactory

  val factory = new ForkJoinWorkerThreadFactory {
    override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
      val worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool)
      worker.setName(s"$threadPrefix-${worker.getPoolIndex}")
      worker
    }
  }

  new ForkJoinPool(parallelism, factory, null, true) // scalastyle:ignore null
}
trivialfis commented 3 years ago

@wbo4958 Is JDK 11 with scala being tested?

CatTail commented 3 years ago

@trivialfis Actually, I just ran JDK 8 and can reproduce the same issue. One more thing worth mentioning: I'm using Scala 2.12, but I suspect neither the Java nor the Scala version matters here. If you run the above script and look at the Linux core dump (not the Java heap dump), the native code appears to have some sort of thread-safety issue. Since I'm not familiar with the C++ codebase I'm not sure, but below is a snapshot of the gdb core dump output:

Core was generated by `java -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:+UnlockDiagnosticVMOption'.
Program terminated with signal SIGABRT, Aborted.
#0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:50
50      ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
[Current thread is 1 (Thread 0x7f7bca53c700 (LWP 733731))]
(gdb) where
#0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:50
#1  0x00007f7c28ebb859 in __GI_abort () at abort.c:79
#2  0x00007f7c28f263ee in __libc_message (action=action@entry=do_abort, fmt=fmt@entry=0x7f7c29050285 "%s\n")
    at ../sysdeps/posix/libc_fatal.c:155
#3  0x00007f7c28f2e47c in malloc_printerr (str=str@entry=0x7f7c290525d0 "free(): double free detected in tcache 2")
    at malloc.c:5347
#4  0x00007f7c28f300ed in _int_free (av=0x7f7bdc000020, p=0x7f7bdc003dd0, have_lock=0) at malloc.c:4201
#5  0x00007f7bca91c3b8 in std::vector<std::pair<std::string, std::string>, std::allocator<std::pair<std::string, std::string> > >::~vector() () from /tmp/libxgboost4j7120880720884674347.so
#6  0x00007f7bca971087 in xgboost::LearnerConfiguration::Configure() () from /tmp/libxgboost4j7120880720884674347.so
#7  0x00007f7bca957958 in xgboost::LearnerImpl::Predict(std::shared_ptr<xgboost::DMatrix>, bool, xgboost::HostDeviceVector<float>*, unsigned int, bool, bool, bool, bool, bool) () from /tmp/libxgboost4j7120880720884674347.so
#8  0x00007f7bca825235 in XGBoosterPredict () from /tmp/libxgboost4j7120880720884674347.so
#9  0x00007f7bca81868c in Java_ml_dmlc_xgboost4j_java_XGBoostJNI_XGBoosterPredict ()
   from /tmp/libxgboost4j7120880720884674347.so
trivialfis commented 3 years ago

Thanks for sharing, which XGBoost version are you using?

CatTail commented 3 years ago

I'm using XGBoost4J 1.3.1; the reproduction script above pins that version in the ivy import.

trivialfis commented 3 years ago

AFAIK JVM package still requires a sync during prediction: https://github.com/dmlc/xgboost/pull/7027

CatTail commented 3 years ago

But the safety is only guaranteed with prediction. If one tries to train a model in one thread and provide prediction at the other using the same model the behaviour is undefined

The doc says model training and prediction should be in the same thread, but in my case I'm loading the model from disk and doing prediction in another thread (no training involved). Does this have a thread-safety issue? (Note I'm using xgboost4j, which has the synchronized keyword on predict.)

set_params is not thread safe.

Thanks, I'm also aware of this. Do you know how I can fix it? Will wrapping the set_params calls with the synchronized keyword fix the SIGSEGV issue? BTW, I'm also aware there is an environment variable, OMP_NUM_THREADS, that can control the thread count; however, when I export that variable it seems xgboost does not pick it up properly. Is there anything I'm doing wrong?
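To make sure I'm describing the same thing, here is a minimal, self-contained sketch of the synchronized pattern I have in mind. The StubBooster class is a stand-in I made up for the demo (it is not the real xgboost4j API); it simply throws if two threads touch it at once, so the lock is what keeps it safe:

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class SyncPredictDemo {

    // Stand-in for a shared Booster; made up for this demo, NOT the
    // real xgboost4j API. It throws if two threads enter at once.
    static class StubBooster {
        private final AtomicBoolean inUse = new AtomicBoolean(false);

        void setParam(String key, Object value) { enter(); exit(); }

        float[] predict(float[] row) {
            enter();
            try { Thread.sleep(1); } catch (InterruptedException ignored) { }
            exit();
            return new float[] { 0f };
        }

        private void enter() {
            if (!inUse.compareAndSet(false, true))
                throw new IllegalStateException("concurrent access detected");
        }

        private void exit() { inUse.set(false); }
    }

    public static void main(String[] args) throws Exception {
        StubBooster booster = new StubBooster();
        Object lock = new Object();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<float[]> tasks = new ExecutorCompletionService<>(pool);

        for (int i = 0; i < 4; i++) {
            tasks.submit(() -> {
                // Serialize setParam + predict: only one thread touches
                // the shared booster at a time.
                synchronized (lock) {
                    booster.setParam("nthread", 2);
                    return booster.predict(new float[] { 1f });
                }
            });
        }

        for (int i = 0; i < 4; i++) {
            tasks.take().get(); // rethrows if any task saw concurrent access
        }
        pool.shutdown();
        System.out.println("all predictions completed without concurrent access");
    }
}
```

The obvious downside is that predictions no longer run in parallel, which is why I'm asking whether this is the intended fix.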

trivialfis commented 3 years ago

, but in my case, I'm loading the model from disk and doing prediction in another thread

Yes, but your example code calls setParam, which is mentioned in the document.

if I use synchronized keyword to wrap the set_params calls?

Can you set the parameter before passing the model into the threads? I think that would be easier and leave fewer things to worry about.

CatTail commented 3 years ago

Can you set the parameter before passing the model into threads?

Yes, I can do that, but it doesn't work: if I invoke setParam in one thread and execute predict in another, xgboost still creates more threads during prediction than the number I configured. I guess this is also caused by the thread-safety issue?

Maybe I should give more background on the issue I'm encountering. I'm running xgboost in k8s to do online inference. The pod is limited to 2 CPUs while the host has 70 CPUs, but it seems OpenMP chooses 70 instead of 2 as the max thread count, resulting in a lot of threads being created and increasing xgboost prediction latency. I was able to detect the issue by drawing a flamegraph during prediction.

[flamegraph screenshot]

I tried using setParam("nthread") to limit the number of threads OpenMP creates. If I invoke setParam in a separate thread from prediction, it doesn't work. It does work if I invoke setParam in the same thread as prediction, but then it eventually fails with SIGSEGV.
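For what it's worth, on the JVM side this is roughly how I could derive an nthread value once at startup instead of relying on OMP_NUM_THREADS. This is only a sketch: the hard-coded cap of 2 mirrors my pod's CPU limit and is my assumption, and the setParam call is commented out so the snippet stays self-contained:

```java
public class NthreadFromJvm {
    public static void main(String[] args) {
        // On JDK 10+ (or 8u191+, where -XX:+UseContainerSupport is on by
        // default) this respects cgroup CPU limits, so inside the pod it
        // should report 2 rather than the host's 70.
        int cpus = Runtime.getRuntime().availableProcessors();

        // Cap at the pod's CPU limit (2 in my case) and never go below 1.
        int nthread = Math.max(1, Math.min(cpus, 2));
        System.out.println("nthread=" + nthread);

        // Then, once, before any worker thread touches the model:
        // booster.setParam("nthread", Integer.valueOf(nthread));
    }
}
```

But as described above, setting the parameter in a different thread from the one doing prediction does not seem to take effect.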

trivialfis commented 3 years ago

Thanks for sharing. I'm not familiar with k8s or the JVM package, but I will try my best to answer the questions.

Could you please set the parameter right after loading the model and before creating any of the Futures?

  val xgboost4jPredictor = XGBoost.loadModel(new java.io.FileInputStream(modelPath))
  xgboost4jPredictor.setParam("nthread", Integer.valueOf(2))

  ...

  val f1 = Future {
    val matrix = new DMatrix(data.map(new LabeledPoint(0.0f, FeatureSize, null, _)).toIterator)
    xgboost4jPredictor.predict(matrix)
    println("f1 done")
  }