Stratio / Spark-MongoDB

Spark library for easy MongoDB access
http://www.stratio.com
Apache License 2.0

Cannot supply samplingRatio when creating a DataFrame #111

Open · glytching opened this issue 8 years ago

glytching commented 8 years ago

Thanks very much for the spark-mongodb connector, much appreciated.

I'm having an issue when creating a DataFrame from a MongoDB collection.

The elapsed time for creating the DataFrame is 2-3 seconds, in this scenario:

- the collection is tiny (250 documents, average document size 71 bytes)
- no schema is supplied, so the connector has to infer one
- I pass schema_samplingRatio in the DataFrameReader options, but it appears to be ignored

Walking the code I can see that:

MongodbSchema(
  new MongodbRDD(sqlContext, config, rddPartitioner),
  config.get[Any](MongodbConfig.SamplingRatio)
    .fold(MongodbConfig.DefaultSamplingRatio)(_.toString.toDouble)
).schema()
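
If I read that right, config.get returns an Option and the fold substitutes the default whenever the key is absent, so unless the DataFrameReader path actually copies schema_samplingRatio into the config, the caller's ratio can never take effect. A tiny illustration of that fold behaviour (hypothetical values; MongodbConfig.DefaultSamplingRatio is assumed to be 1.0):

val config = Map.empty[String, Any]            // the reader path never stored the ratio
val ratio = config.get("schema_samplingRatio") // None
  .fold(1.0)(_.toString.toDouble)              // so fold falls back to the assumed default
// ratio == 1.0 even though the caller passed 0.1 to the DataFrameReader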

Am I correct in the above diagnosis? If so, what can be done about it? If not, how can I reliably provide my own sampling ratio?

Any help gratefully accepted.

Version details etc.: spark-mongodb 0.8.7.

Here's a test case showing the behaviour in action:

  @Test
  public void canReproduceCallerSuppliedSamplingRatioIssue() {
    SparkConf sparkConf = new SparkConf()
        .setAppName("ProvidingASamplingRatio")
        .setMaster("local[*]");

    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    SQLContext sqlContext = new SQLContext(sc);

    Double samplingRatio = 0.1;
    long start = System.currentTimeMillis();
    DataFrame countriesSlow = sqlContext
        .read()
        .format("com.stratio.datasource.mongodb")
        .options(getOptions("Scratch", "countries", samplingRatio))
        .load();
    // the preceding call takes 2 - 3 *seconds*
    System.out.println(String.format("Loaded countries without a schema in: %sms using caller supplied sampling ratio: %s",
        (System.currentTimeMillis() - start), samplingRatio));
    countriesSlow.show(5);

    StructType countriesSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("code", DataTypes.StringType, true),
        DataTypes.createStructField("region", DataTypes.StringType, true),
        DataTypes.createStructField("name", DataTypes.StringType, true)
    ));

    start = System.currentTimeMillis();
    DataFrame countriesFast = sqlContext
        .read()
        .format("com.stratio.datasource.mongodb")
        .options(getOptions("Scratch", "countries", null))
        .schema(countriesSchema)
        .load();
    // the preceding call takes 2 - 3 *millis*
    System.out.println(String.format("Loaded countries with a schema in: %sms using default sampling ratio",
        (System.currentTimeMillis() - start)));
    countriesFast.show(5);
  }

  private Map<String, String> getOptions(String databaseName, String collectionName, Double samplingRatio) {
    Map<String, String> options = new HashMap<>();
    // see MongodbConfig for the full list of option keys
    options.put("host", "...");
    options.put("credentials", "...");
    options.put("database", databaseName);
    options.put("collection", collectionName);
    if (samplingRatio != null) {
      // DataFrameReader option values are strings, so pass the ratio as one
      options.put("schema_samplingRatio", String.valueOf(samplingRatio));
    }
    return options;
  }
pmadrigal commented 8 years ago

Hi @colmmcdonnell,

You are right, there is a problem with this, but only when going through the DataFrameReader API. PR #112 solves it.

You can use the fromMongoDB method instead of the DataFrameReader. See our First Steps guide.

Anyway, you are using an old version (0.8.7); in the next few weeks we are going to release version 0.11.2.
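
For reference, upgrading should only be a dependency bump. A sketch, assuming the artifact coordinates this project publishes (please verify them against the actual release):

// build.sbt: the coordinates and version here are assumptions, check the published release
libraryDependencies += "com.stratio.datasource" %% "spark-mongodb" % "0.11.2"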

Thanks for your feedback!

glytching commented 8 years ago

Hi @pmadrigal

Thanks for your response. A few replies:

Rgds

pmadrigal commented 8 years ago

@colmmcdonnell

import org.apache.spark.sql.types.StructType;
import scala.*;
import scala.collection.JavaConverters;
import com.stratio.datasource.mongodb.MongodbContext;
import com.stratio.datasource.mongodb.config.MongodbConfigBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class TestJavaAPI {

    public static void main(String[] args){

        JavaSparkContext sc = new JavaSparkContext("local[2]", "test spark-mongodb java");
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

        List<String> javaHost = new ArrayList<String>();
        javaHost.add("localhost:27017");

        scala.collection.immutable.List<String> host = JavaConverters.asScalaBufferConverter(javaHost)
                .asScala().toList();

        HashMap<String, Object> javaOptions = new HashMap<String, Object>();
        javaOptions.put("host", host);
        javaOptions.put("database", "highschool");
        javaOptions.put("collection", "students");

        // an empty Option means no schema: the connector will infer one
        Option<StructType> schema = scala.Option.apply(null);

        scala.collection.immutable.Map<String, Object> options = JavaConverters.mapAsScalaMapConverter(javaOptions)
                .asScala().toMap(Predef.<Tuple2<String, Object>>conforms());

        MongodbConfigBuilder builder = new MongodbConfigBuilder(options);
        MongodbContext mc = new MongodbContext(sqlContext);
        DataFrame df = mc.fromMongoDB(builder.build(), schema);
        df.registerTempTable("students");
        sqlContext.sql("SELECT * FROM students"); df.show();

    }
}

To use this library from Java, some Scala types are needed and, as you can see, the code is less clean. We highly recommend Scala!
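
For comparison, here is a rough sketch of the equivalent program in Scala (hypothetical code; it assumes the same MongodbContext.fromMongoDB API used in the Java example above):

import com.stratio.datasource.mongodb.MongodbContext
import com.stratio.datasource.mongodb.config.MongodbConfigBuilder
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object TestScalaAPI extends App {

  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test spark-mongodb scala"))
  val sqlContext = new SQLContext(sc)

  // plain Scala collections: none of the JavaConverters boilerplate is needed
  val builder = new MongodbConfigBuilder(Map(
    "host" -> List("localhost:27017"),
    "database" -> "highschool",
    "collection" -> "students"))

  // None for the schema means the connector infers it, as in the Java version
  val df = new MongodbContext(sqlContext).fromMongoDB(builder.build(), None)
  df.registerTempTable("students")
  sqlContext.sql("SELECT * FROM students").show()
}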

Hope this helps!

glytching commented 8 years ago

Hi @pmadrigal

Thanks for your response. I can now invoke fromMongoDB from Java (and I am using v0.11.1), but the original issue remains: the call sqlContext.read().format("com.stratio.datasource.mongodb").options(...).load() was taking 2-3 seconds on a tiny collection. I had thought that this was because I was unable to supply a samplingRatio. However, using the code you provided I can now create the DataFrame using fromMongoDB and supply a samplingRatio (I walked through the code and confirmed that the samplingRatio was used), yet the elapsed time for the fromMongoDB call is still 2-3 seconds for this tiny collection.

I am seeing the same unexpectedly long duration whether I use fromMongoDB or sqlContext.read().format("com.stratio.datasource.mongodb").options(...).load(); providing a samplingRatio seems to make no difference. The only thing which reduces the elapsed time to a manageable number is supplying a schema.

Have I misunderstood the meaning/purpose of samplingRatio? Is there any way I can load a DataFrame quickly without supplying a schema? I know that "quickly" is subjective, so to give a concrete example: in my test I am reading from a collection with 250 small documents (average document size is 71 bytes). I would hope to be able to create a DataFrame for this collection (whether using the DataFrame API or fromMongoDB) in < 5ms; this is easily achieved if I supply a schema. Is there some way to achieve the same performance (or at least sub-second performance) without providing a schema?

Thanks again for your help, much appreciated.

Rgds

Here's a test case showing that the fromMongoDB call shows no improvement even with a small sampling ratio:

  @Test
  public void canReproduceCallerSuppliedSamplingRatioIssue() {
    SparkConf sparkConf = new SparkConf()
        .setAppName("ProvidingASamplingRatio")
        .setMaster("local[*]");

    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    SQLContext sqlContext = new SQLContext(sc);

    List<String> javaHost = new ArrayList<>();
    javaHost.add("...");

    List<MongodbCredentials> javaCredentials = new ArrayList<>();
    javaCredentials.add(new MongodbCredentials(...));

    scala.collection.immutable.List<String> host = JavaConverters.asScalaBufferConverter(javaHost)
        .asScala().toList();
    scala.collection.immutable.List<MongodbCredentials> credentials = JavaConverters.asScalaBufferConverter(javaCredentials)
        .asScala().toList();

    HashMap<String, Object> javaOptions = new HashMap<>();
    javaOptions.put("host", host);
    javaOptions.put("credentials", credentials);
    javaOptions.put("database", "Scratch");
    javaOptions.put("collection", "countries");
    double samplingRatio = 0.1;
    javaOptions.put("schema_samplingRatio", samplingRatio);

    Option<StructType> schema = scala.Option.apply(null);
    scala.collection.immutable.Map<String, Object> options = JavaConverters.mapAsScalaMapConverter(javaOptions)
        .asScala().toMap(Predef.<Tuple2<String, Object>>conforms());

    MongodbConfigBuilder builder = new MongodbConfigBuilder(options);
    MongodbContext mc = new MongodbContext(sqlContext);

    long start = System.currentTimeMillis();
    DataFrame countriesSlow = mc.fromMongoDB(builder.build(), schema);
    // the preceding call takes >3 *seconds*
    System.out.println(String.format("Created dataframe for countries without a schema in: %sms using caller supplied sampling ratio: %s",
        (System.currentTimeMillis() - start), samplingRatio));
    countriesSlow.show(5);

    schema = scala.Option.apply(DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("code", DataTypes.StringType, true),
        DataTypes.createStructField("region", DataTypes.StringType, true),
        DataTypes.createStructField("name", DataTypes.StringType, true)
    )));

    start = System.currentTimeMillis();
    DataFrame countriesFast = mc.fromMongoDB(builder.build(), schema);
    // the preceding call takes 1 - 2 *millis*
    System.out.println(String.format("Created dataframe for countries with a schema in: %sms",
        (System.currentTimeMillis() - start)));
    countriesFast.show(5);
  }
pmadrigal commented 8 years ago

Hi @colmmcdonnell,

As you know, by supplying a schema we avoid inferring it and the time that inference takes.

To create a DataFrame, a schema is needed. To infer it, we need to fetch data from Mongo, create an RDD, iterate over each record deriving its partial schema, and then choose a final schema that is valid for all the records. All of this is what takes the time you see in your example.
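
A minimal sketch of that idea (hypothetical code, not the connector's actual implementation), with documents represented as plain field-to-value maps and conflicting field types widened to StringType:

import org.apache.spark.sql.types._

// map a sample value to a Spark SQL type (only a few primitives, for illustration)
def typeOf(v: Any): DataType = v match {
  case _: Int     => IntegerType
  case _: Double  => DoubleType
  case _: Boolean => BooleanType
  case _          => StringType
}

// fold over the sampled documents, merging each record's partial schema
def inferSchema(sampledDocs: Seq[Map[String, Any]]): StructType = {
  val merged = sampledDocs.foldLeft(Map.empty[String, DataType]) { (acc, doc) =>
    doc.foldLeft(acc) { case (fields, (name, value)) =>
      val inferred = typeOf(value)
      val resolved = fields.get(name) match {
        case Some(t) if t == inferred => t          // records agree
        case Some(_)                  => StringType // records disagree: widen
        case None                     => inferred   // first time we see this field
      }
      fields.updated(name, resolved)
    }
  }
  StructType(merged.map { case (name, t) => StructField(name, t, nullable = true) }.toSeq)
}

Scanning every record makes the result exact but costs a full pass over the collection; the sampling ratio trades that exactness for speed by folding over only a fraction of the documents.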

On the other hand, SamplingRatio is a config property that allows us to scan only part of the collection when inferring its schema. If the collection is small, as in your case, reducing the ratio won't make much difference, but with a big collection the time is considerably reduced; for example, with a ratio of 0.1 on a 1,000,000-document collection, only about 100,000 documents are scanned during inference.

Note that with SamplingRatio set to 1.0 you ensure that the schema is correct, because the whole collection has been scanned.

I hope this clarifies the question.

Thanks for your feedback!

glytching commented 8 years ago

Hi @pmadrigal

Thanks for your help, I think I have enough detail to take it from here.

Rgds