opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices to remote data stores.
Apache License 2.0

[BUG] REPL handleSessionError does not update error when session exist #380

Open noCharger opened 2 weeks ago

noCharger commented 2 weeks ago

What is the bug? REPL handleSessionError does not update the error when the session already exists. Only the last update time gets refreshed.

https://github.com/opensearch-project/opensearch-spark/blob/main/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala#L434-L454

  def handleSessionError(
      e: Exception,
      applicationId: String,
      jobId: String,
      sessionId: String,
      jobStartTime: Long,
      flintSessionIndexUpdater: OpenSearchUpdater,
      osClient: OSClient,
      sessionIndex: String,
      sessionTimerContext: Timer.Context): Unit = {
    val error = s"Session error: ${e.getMessage}"
    CustomLogging.logError(error, e)

    val flintInstance = getExistingFlintInstance(osClient, sessionIndex, sessionId)
      .getOrElse(createFailedFlintInstance(applicationId, jobId, sessionId, jobStartTime, error))

    updateFlintInstance(flintInstance, flintSessionIndexUpdater, sessionId)
    if (flintInstance.state.equals("fail")) {
      recordSessionFailed(sessionTimerContext)
    }
  }

https://github.com/opensearch-project/opensearch-spark/blob/main/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala#L486-L495

  private def updateFlintInstance(
      flintInstance: FlintInstance,
      flintSessionIndexUpdater: OpenSearchUpdater,
      sessionId: String): Unit = {
    val currentTime = currentTimeProvider.currentEpochMillis()
    flintSessionIndexUpdater.upsert(
      sessionId,
      FlintInstance.serializeWithoutJobId(flintInstance, currentTime))
  }
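
A likely reading of the two snippets above: `error` is only consumed by the `getOrElse` fallback, so when `getExistingFlintInstance` returns `Some(...)`, the stored instance is upserted with its old (empty) error and only a new timestamp. The sketch below isolates that behavior. `Session` and `GetOrElseDemo` are hypothetical stand-ins for illustration, not the project's `FlintInstance` or its API.

```scala
// Hypothetical, simplified stand-in for FlintInstance (field names are assumptions).
final case class Session(sessionId: String, state: String, error: String)

object GetOrElseDemo {
  // Same shape as handleSessionError: the new error only reaches the fallback branch.
  def resolve(existing: Option[Session], sessionId: String, error: String): Session =
    existing.getOrElse(Session(sessionId, "fail", error))

  def main(args: Array[String]): Unit = {
    val error = "Session error: boom"
    val stored = Session("testSessionId", "running", "")

    // Session exists: the stored instance comes back unchanged, so `error` is dropped.
    println(resolve(Some(stored), "testSessionId", error))
    // Session missing: the fallback instance carries the error.
    println(resolve(None, "testSessionId", error))
  }
}
```

In the first call the returned instance still has an empty `error`, which matches the `"error":""` seen in the failing test output.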

How can one reproduce the bug? Create a unit test case and verify that the expected exception message is passed to upsert:

    verify(mockFlintSessionIndexUpdater).upsert(
      eqTo(sessionId),
      argThat((session: String) =>
        session.contains("running") && session.contains(sessionException.getMessage)))

Test failed

Actual invocations have different arguments:
openSearchUpdater.upsert(
    "testSessionId",
    "{"state":"running","lastUpdateTime":1718151741998,"applicationId":"testAppId","error":"","sessionId":"testSessionId","excludeJobIds":"","jobStartTime":0,"type":"session"}"
);

What is the expected behavior?

The error message should be updated in the session document, even when the session already exists.
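
One possible shape for a fix, sketched under assumptions rather than taken from the project: copy the new error onto the existing instance before upserting, and keep the "create a failed instance" fallback for the missing-session case. `SessionDoc` and `HandleSessionErrorSketch` are hypothetical names, and the real `FlintInstance` may not be a case class with a `copy` method. Whether the state should also move to `fail` is left open here; the sketch keeps the stored state, which is what the test in the reproduction expects (`running` plus the exception message).

```scala
// Hypothetical, simplified stand-in for FlintInstance (field names are assumptions).
final case class SessionDoc(sessionId: String, state: String, error: String)

object HandleSessionErrorSketch {
  // Returns the instance that would be passed on to the upsert.
  def resolve(existing: Option[SessionDoc], sessionId: String, error: String): SessionDoc =
    existing
      .map(_.copy(error = error)) // session exists: keep its state, overwrite the error
      .getOrElse(SessionDoc(sessionId, "fail", error)) // no session: create a failed one
}
```

With this shape, an existing `running` session would be serialized with the new error message instead of an empty one.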