RevolutionAnalytics / dplyrXdf

dplyr backend for Revolution Analytics xdf files
Other
39 stars 16 forks source link

Exact class comparisons cause tbl_xdf inputs to fail in Spark CC #36

Closed hongooi73 closed 5 years ago

hongooi73 commented 7 years ago

Specifically else if (class(data) %in% c("RxTextData", "RxXdfData", "RxParquetData", "RxOrcData"))

Full function source:

> RevoScaleR:::rxCheckSupportForDataSource
function (computeContext, jobType, data, isOutData = FALSE) 
{
    if (is.null(data)) {
        return(TRUE)
    }
    if (isOutData) {
        inout.datasource <- "output data source"
    }
    else {
        inout.datasource <- "input data source"
    }
    if (class(computeContext) == "RxHadoopMR") {
        if (inherits(data, "RxSparkData")) {
            msg <- paste(class(data), "is supported in RxSpark compute context only")
            stop(msg, call. = FALSE)
        }
    }
    else if (class(computeContext) == "RxSpark") {
        if (class(data) == "character") {
            stop(paste(data, "is considered as local file. Data source used in RxSpark compute context must be in hdfs file system."), 
                call. = FALSE)
        }
        else if (class(data) %in% c("RxTextData", "RxXdfData", 
            "RxParquetData", "RxOrcData")) {
            if (data@fileSystem$fileSystemType != "hdfs") {
                if (grepl("://", data@file)) {
                  message(class(data), "specifying a hdfs fileSystem is recommended")
                }
                else {
                  msg <- paste(class(data), "as", inout.datasource, 
                    "in RxSpark compute context must be in hdfs file system")
                  stop(msg, call. = FALSE)
                }
            }
            if (is(data, "RxXdfData") && isTRUE(isOutData) && 
                tolower(rxFileExtension(data@file)) == "xdf") {
                msg <- paste(data@file, "has extension '.xdf', which is considered as single XDF and not supported in RxHadoopMR and RxSpark compute context")
                stop(msg, call. = FALSE)
            }
            if (is(data, "RxXdfData") && isTRUE(isOutData) && 
                is.logical(data@createCompositeSet) && data@createCompositeSet == 
                FALSE) {
                stop("The `createCompositeSet` argument cannot be set to FALSE in RxHadoopMR and RxSpark compute context.", 
                  call. = FALSE)
            }
            if (!is(data, "RxParquetData") && !is(data, "RxOrcData")) {
                cc.nameNode <- computeContext@nameNode
                data.hostName <- data@fileSystem$hostName
                if (grepl("://", cc.nameNode) || grepl("://", 
                  data.hostName)) {
                  if (cc.nameNode != data.hostName) {
                    msg <- paste(class(data), "data source and RxSpark compute context have different hdfs (default/azure blob/azure data lake). data source:", 
                      cc.nameNode, ", compute context:", data.hostName)
                    stop(msg, call. = FALSE)
                  }
                }
            }
        }
        else if (class(data) == "RxHiveData") {
            if (isOutData && data@dfType == "hive") {
                msg <- paste("Cannot use RxHiveData with query as", 
                  inout.datasource, ". Please use RxHiveData with table.")
                stop(msg, call. = FALSE)
            }
        }
        else {
            msg <- paste(class(data), "as", inout.datasource, 
                "in RxSpark compute context is not supported")
            stop(msg, call. = FALSE)
        }
        if (inherits(data, "RxSparkData") && identical(jobType, 
            "hpc")) {
            stop("RxSparkData is not supported in HPC (rxExec) mode", 
                call. = FALSE)
        }
    }
    else {
        if (inherits(data, "RxSparkData")) {
            msg <- paste(class(data), "is supported in RxSpark compute context only")
            stop(msg, call. = FALSE)
        }
        return(TRUE)
    }
    TRUE
}
<environment: namespace:RevoScaleR>
hongooi73 commented 7 years ago

Directly modifying RevoScaleR:::rxCheckSupportForDataSource does not fix this:

# in Spark compute context

hd <- RxHdfsFileSystem()

# mtcars composite Xdf
mthc <- RxXdfData("/user/sshuser/mtcarsc", fileSystem=hd, createCompositeSet=TRUE)

# RxXdfData output works
mth2 <- RxXdfData("/user/sshuser/mtcarsc2", fileSystem=hd, createCompositeSet=TRUE)
rxDataStep(mthc, mth2)

rxHadoopRemoveDir("/user/sshuser/mtcarsc2")

# tbl_xdf output fails
mt_tbl <- as(mth2, "tbl_xdf")
rxDataStep(mthc, mt_tbl)  # fails

#Error in rxCompleteClusterJob(hpcServerJob, consoleOutput, autoCleanup) : 
  #No job results retrieved.
#In addition: 
#Warning message:
#In rxCompleteClusterJob(hpcServerJob, consoleOutput, autoCleanup) :

 #Unable to retrieve output object(s).
#One of the following may have occured:
   #Output may have already been retrieved and deleted
   #The cluster operating system has not flushed the output files to disk yet
   #Job may have failed.
   #Job may have not generated output objects.
hongooi73 commented 7 years ago

TFS item 78064

hongooi73 commented 7 years ago

Use as_xdf to wrap any tbl_xdf data sources