intel-analytics / ipex-llm

Accelerate local LLM inference and finetuning (LLaMA, Mistral, ChatGLM, Qwen, Mixtral, Gemma, Phi, MiniCPM, Qwen-VL, MiniCPM-V, etc.) on Intel XPU (e.g., local PC with iGPU and NPU, discrete GPU such as Arc, Flex and Max); seamlessly integrate with llama.cpp, Ollama, HuggingFace, LangChain, LlamaIndex, vLLM, GraphRAG, DeepSpeed, Axolotl, etc
Apache License 2.0

Predictions not working in yarn-cluster, ok in yarn-client #2747

Open vvbrzeski opened 5 years ago

vvbrzeski commented 5 years ago

Everything works fine with my model when running in yarn-client mode. However, when I run the exact same code in yarn-cluster mode, every prediction comes back as the same identical Float value.

Here is a snippet of my code.

    val predictor = Predictor(model, batchPerPartition = 1000)
    val scores = predictor.predict(samples)

What am I missing?

vvbrzeski commented 5 years ago

FYI: this only seems to happen when batch_size (in the optimizer) == num_executors * executor_cores. I have tried:

a) num_exec = 50, num_cores = 4: batch_size = 200 ==> FAIL; batch_size = 400 or 600 ==> OK.
b) num_exec = 40, num_cores = 6: batch_size = 240 ==> FAIL; batch_size = 480 ==> OK.

I train and score in the same job. Spark 2.3.1. BigDL 0.70.0.
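For concreteness, the failing configurations above are exactly the ones where each core receives a single sample per mini-batch (this is my arithmetic on the runs I tried, not a confirmed BigDL constraint):

```scala
// Per-core sample count for each (numExecutors, coresPerExecutor, batchSize) tried.
val runs = Seq(
  (50, 4, 200),  // FAIL
  (50, 4, 400),  // OK
  (50, 4, 600),  // OK
  (40, 6, 240),  // FAIL
  (40, 6, 480)   // OK
)
for ((nExec, nCores, batch) <- runs) {
  val totalCores = nExec * nCores
  val perCore    = batch.toDouble / totalCores
  println(f"totalCores=$totalCores%d batchSize=$batch%d samplesPerCore=$perCore%.1f")
}
// The FAIL cases are precisely those where samplesPerCore == 1.0.
```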

    def score_model(sqlCtxt: SQLContext, model: Model, keys: RDD[(Long, Long, Double)], samples: RDD[Sample[Float]]): DataFrame = {

      val scores = model.
        predict(samples).
        map(t => t.toTensor[Float]).
        // softmax computation; can remove if using Softmax in the last layer
        map(t => {
          val z = t.exp().sum()
          t / z
        }).
        map(probs => probs.toArray()).
        // prob(Y=1), prob(Y=2); converted to Double for classifier metrics calculation
        map(arr => (arr(0).toDouble, arr(1).toDouble))

      import sqlCtxt.implicits._
      keys.
        zip(scores).
        map(x => (x._1._1, x._1._2, x._1._3, x._2._1, x._2._2)).
        toDF()
    }
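The manual softmax in score_model is equivalent to this plain-Scala computation on a single output vector (the logit values here are made up for illustration):

```scala
// Hypothetical 2-class logits, as produced by the last Linear layer.
val logits = Array(0.5f, 1.5f)
val exps   = logits.map(x => math.exp(x.toDouble))
val z      = exps.sum
val probs  = exps.map(_ / z)  // probs sums to 1.0
// probs(0) = P(Y=1), probs(1) = P(Y=2), matching the comment in score_model.
```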
    ....
    ....

    val (train_keys, train_samples) = data_to_samples(train_raw, 2, 3, N_FEATURES)
    val (validation_keys, validation_samples) = data_to_samples(validation_raw, 2, 3, N_FEATURES)

    train_keys.persist(StorageLevel.MEMORY_AND_DISK)
    train_samples.persist(StorageLevel.MEMORY_AND_DISK)
    validation_keys.persist(StorageLevel.MEMORY_AND_DISK)
    validation_samples.persist(StorageLevel.MEMORY_AND_DISK)

    // construct model graph
    val N_HIDDENS = Array(50, 30, 20)
    val N_CLASSES = 2
    val model = build_model(N_FEATURES, N_HIDDENS, N_CLASSES)

    // train the model
    val trained_model = train_model(model, train_samples, batch_size, learning_rate, n_epochs, output_path)

    // save model
    trained_model.saveModule(HDFS_NN + "/" + output_path + "/model")

    // make predictions
    val pred_train_df = score_model(sqlContext, trained_model, train_keys, train_samples).
      persist(StorageLevel.MEMORY_AND_DISK)
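To rule out interference between training and scoring in the same job, one option is to reload the saved module in a separate run and score again (a sketch; `Module.loadModule` is the BigDL 0.7-era loading API, and the path is assumed to match the saveModule call above):

```scala
import com.intel.analytics.bigdl.nn.Module

// Reload the module persisted by saveModule and re-score the validation set.
val reloaded = Module.loadModule[Float](HDFS_NN + "/" + output_path + "/model")
val pred_validation_df = score_model(sqlContext, reloaded, validation_keys, validation_samples)
```

If the reloaded model produces correct predictions in yarn-client but the same constant value in yarn-cluster, that would isolate the problem to the predict path rather than training state.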