lucidworks / spark-solr

Tools for reading data from Solr as a Spark RDD and indexing objects from Spark into Solr using SolrJ.
Apache License 2.0

= Lucidworks Spark/Solr Integration
:toc:
:toclevels: 4
:toc-placement!:
:toc-title:

This project includes tools for reading data from Solr as a Spark DataFrame/RDD and indexing objects from Spark into Solr using SolrJ.

toc::[]

//tag::version-compatibility[]
== Version Compatibility

The spark-solr project has several releases, each of which supports different versions of Spark and Solr. The compatibility chart below shows the versions supported across past releases. 'Connector' refers to the 'spark-solr' library.

[width="40%",frame="topbot",options="header,footer"]
|======================
|Connector | Spark | Solr
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/4.0.2/jar[4.0.2] | 3.1.2 | 8.11.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/4.0.1/jar[4.0.1] | 3.1.2 | 8.11.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/4.0.0/jar[4.0.0] | 3.1.2 | 8.9.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.10.0/jar[3.10.0] | 2.4.5 | 8.9.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.9.0/jar[3.9.0] | 2.4.5 | 8.3.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.8.0/jar[3.8.1] | 2.4.5 | 8.3.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.8.0/jar[3.8.0] | 2.4.5 | 8.3.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.7.0/jar[3.7.0] | 2.4.4 | 8.3.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.6.6/jar[3.6.6] | 2.4.3 | 8.2.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.6.0/jar[3.6.0] | 2.4.0 | 7.5.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.5.8/jar[3.5.19] | 2.3.2 | 7.6.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.4.5/jar[3.4.5] | 2.2.1 | 7.2.1
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.3.4/jar[3.3.4] | 2.2.0 | 7.1.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.2.2/jar[3.2.2] | 2.2.0 | 6.6.1
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.1.1/jar[3.1.1] | 2.1.1 | 6.6.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/3.0.4/jar[3.0.4] | 2.1.1 | 6.5.1
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/2.4.0/jar[2.4.0] | 1.6.3 | 6.4.2
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/2.3.4/jar[2.3.4] | 1.6.3 | 6.3.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/2.2.3/jar[2.2.3] | 1.6.2 | 6.1.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/2.1.0/jar[2.1.0] | 1.6.2 | 6.1.0
|https://search.maven.org/artifact/com.lucidworks.spark/spark-solr/2.0.4/jar[2.0.4] | 1.6.1 | 5.5.2
|======================

//tag::getting-started[]
== Getting started

=== Import jar File via spark-shell

[source]
----
cd $SPARK_HOME
./bin/spark-shell --jars spark-solr-{version}-shaded.jar
----

The shaded jar can be downloaded from http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22com.lucidworks.spark%22%20AND%20a%3A%22spark-solr%22[Maven Central] or built from the respective branch.

=== Connect to your SolrCloud Instance

==== via DataFrame

[source,scala]
----
val options = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}"
)
val df = spark.read.format("solr")
  .options(options)
  .load
----

==== via RDD

[source,scala]
----
import com.lucidworks.spark.rdd.SelectSolrRDD
val solrRDD = new SelectSolrRDD(zkHost, collectionName, sc)
----

SelectSolrRDD is an RDD of https://lucene.apache.org/solr/api/solr-solrj/org/apache/solr/common/SolrDocument.html[SolrDocument].

==== via RDD (Java)

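[source,java]
----
import com.lucidworks.spark.rdd.SolrJavaRDD;
import org.apache.solr.common.SolrDocument;
import org.apache.spark.api.java.JavaRDD;

// zkHost, collection, jsc (a JavaSparkContext) and solrQuery are defined by the caller.
SolrJavaRDD solrRDD = SolrJavaRDD.get(zkHost, collection, jsc.sc());
JavaRDD<SolrDocument> resultsRDD = solrRDD.queryShards(solrQuery);
----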

//end::getting-started[]

//tag::build[]
== Download/Build the jar Files

=== Maven Central

The released jar files (1.1.2, 2.0.0, etc.) can be downloaded from the http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22com.lucidworks.spark%22%20AND%20a%3A%22spark-solr%22[Maven Central repository]. Maven Central also holds the shaded, sources, and javadoc jars for each release.

[source,xml]
----
<dependency>
  <groupId>com.lucidworks.spark</groupId>
  <artifactId>spark-solr</artifactId>
  <version>{latestVersion}</version>
</dependency>
----

==== Snapshots

Snapshots of spark-solr are built for every commit on the master branch. The snapshots are available from https://oss.sonatype.org/content/repositories/snapshots/com/lucidworks/spark/spark-solr/[OSS Sonatype].

//tag::build-source[]
=== Build from Source

[source]
----
mvn clean package -DskipTests
----

This will build 2 jars in the target directory:

${VERSION} will be something like 3.5.6-SNAPSHOT, for development builds.

The first .jar is what you'd want to use if you were using spark-solr in your own project. The second is what you'd use to submit one of the included example apps to Spark.
//end::build-source[]
//end::build[]

//tag::features[]
== Features

//end::features[]

//tag::querying[]
== Querying

=== Cursors

https://lucene.apache.org/solr/guide/pagination-of-results.html[Cursors] are used by default to pull documents out of Solr. By default, the number of tasks allocated will be the number of shards available for the collection.

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub ranges using a split field. A good candidate for the split field is the \_version_ field that is attached to every document by the shard leader during indexing. See the Increase Read Parallelism using Intra-Shard Splits section to enable and configure intra-shard splitting.

Cursors won't work if the index changes while the query is running. Constrain your query to a static index by passing additional Solr parameters via <<solr-params,solr.params>>.

=== Streaming API (/export)

If the fields that are being queried have https://lucene.apache.org/solr/guide/docvalues.html[docValues] enabled, then the Streaming API can be used to pull documents from Solr in a true streaming fashion. This method is 8-10x faster than Cursors. The request_handler option allows you to enable the Streaming API via DataFrame.
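As a minimal sketch of an /export read through the DataFrame API, the options below combine request_handler and fields as described under Configuration and Tuning. The ZooKeeper connect string, collection name, and field names are placeholders (assumptions), and the listed fields are assumed to have docValues enabled.

```scala
// Hypothetical connection values and field names; replace with your own.
val exportOptions: Map[String, String] = Map(
  "zkhost" -> "localhost:9983",
  "collection" -> "my_collection",
  "request_handler" -> "/export", // stream results through the /export handler
  "fields" -> "id,author_s"       // /export needs explicitly listed fields
)

// In spark-shell, where `spark` is the active SparkSession:
// val df = spark.read.format("solr").options(exportOptions).load()
```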

//end::querying[]

//tag::indexing[]
== Indexing

Objects can be sent to Solr via Spark Streaming or DataFrames. The schema is inferred from the DataFrame, and any fields that do not exist in the Solr schema will be added via the Schema API. See https://lucene.apache.org/solr/guide/schema-factory-definition-in-solrconfig.html[ManagedIndexSchemaFactory].

See the Index parameters section for configuration and tuning.

//end::indexing[]

//tag::spark-examples[]
== Examples

==== link:docs/examples/twitter.adoc[Indexing and Querying Twitter data]

==== link:docs/examples/csv.adoc[Indexing and Querying NYC yellow taxi CSV data]

//end::spark-examples[]

//tag::spark-devdocs[]
//tag::tuning[]
== Configuration and Tuning

The Solr DataSource supports a number of optional parameters that allow you to optimize performance when reading data from Solr. The only required parameters for the DataSource are zkhost and collection.

=== Query Parameters

==== query

Probably the most obvious option is to specify a Solr query that limits the rows you want to load into Spark. For instance, if we only wanted to load documents that mention "solr", we would do:

Usage: option("query","body_t:solr")

Default: \*:*

If you don't specify the "query" option, then all rows are read using the "match all documents" query (\*:*).

==== fields

You can use the fields option to specify a subset of fields to retrieve for each document in your results:

Usage: option("fields","id,author_s,favorited_b,...")

By default, all stored fields for each document are pulled back from Solr.

You can also specify an alias for a field using Solr's field alias syntax, e.g. author:author_s. If you want to invoke a function query, such as rord(), then you'll need to provide an alias, e.g. ord_user:ord(user_id). If the return type of the function query is something other than int or long, then you'll need to specify the return type after the function query, such as: foo:div(sum(x,100),max(y,1)):double

TIP: If you request Solr function queries, then the library must use the /select handler to make the request as exporting function queries through /export is not supported by Solr.

==== filters

You can use the filters option to set filter queries on the Solr query:

Usage: option("filters","firstName:Sam,lastName:Powell")

==== rows

You can use the rows option to specify the number of rows to retrieve from Solr per request; do not confuse this with max_rows (see below). Behind the scenes, the implementation uses either deep paging cursors or Streaming API and response streaming, so it is usually safe to specify a large number of rows.

To be clear, this is not the maximum number of rows to read from Solr. All matching rows on the backend are read. The rows parameter is the page size.

By default, the implementation uses 1000 rows but if your documents are smaller, you can increase this to 10000. Using too large a value can put pressure on the Solr JVM's garbage collector.

Usage: option("rows","10000")

Default: 1000

==== max_rows

Limits the result set to a maximum number of rows; only applies when using the /select handler. The library will issue the query from a single task and let Solr do the distributed query processing. In addition, no paging is performed, i.e. the rows param is set to max_rows when querying. Consequently, this option should not be used for large max_rows values; instead, retrieve all rows using multiple Spark tasks and then re-sort with Spark if needed.

Usage: option("max_rows", "100")

Default: None

==== request_handler

Set the Solr request handler for queries. This option can be used to export results from Solr via /export handler which streams data out of Solr. See https://lucene.apache.org/solr/guide/exporting-result-sets.html[Exporting Result Sets] for more information.

The /export handler needs fields to be explicitly specified. Please use the fields option or specify the fields in the query.

Usage: option("request_handler", "/export")

Default: /select

==== Increase Read Parallelism using Intra-Shard Splits

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub ranges using a split field. The sub range splitting enables faster fetching from Solr by increasing the number of tasks in Solr. This should only be used if there are enough computing resources in the Spark cluster.

Shard splitting is disabled by default.

===== splits

Enable shard splitting on default field \_version_.

Usage: option("splits", "true")

Default: false

The above option is equivalent to option("split_field", "\_version_")

===== split_field

The field to split on can be changed using split_field option.

Usage: option("split_field", "id")

Default: \_version_

===== splits_per_shard

Behind the scenes, the DataSource implementation tries to split the shard into evenly sized splits using filter queries. You can also split on a string-based keyword field but it should have sufficient variance in the values to allow for creating enough splits to be useful. In other words, if your Spark cluster can handle 10 splits per shard, but there are only 3 unique values in a keyword field, then you will only get 3 splits.

Keep in mind that this is only a hint to the split calculator and you may end up with a slightly different number of splits than what was requested.

Usage: option("splits_per_shard", "30")

Default: 1
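A rough way to reason about read parallelism: with one task per shard by default, splitting multiplies the task count by roughly the requested splits per shard. The sketch below is an illustration only. The helper name and the shard count are assumptions, and the actual number of tasks can differ because splits_per_shard is only a hint.

```scala
// Rough estimate of read tasks: one per shard, multiplied by the requested splits.
// `estimatedReadTasks` is a hypothetical helper, not part of spark-solr.
def estimatedReadTasks(numShards: Int, splitsPerShard: Int = 1): Int =
  numShards * math.max(1, splitsPerShard)

// Options enabling splits on the default split field, with a hint of 30 splits per shard.
val splitOptions: Map[String, String] = Map(
  "splits" -> "true",
  "splits_per_shard" -> "30"
)

// For a hypothetical 4-shard collection: 4 shards x 30 splits.
val tasks = estimatedReadTasks(numShards = 4, splitsPerShard = 30)
```

Comparing this estimate with the number of executor slots in the Spark cluster gives a starting point for choosing splits_per_shard.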

==== flatten_multivalued

This option is enabled by default and flattens multi valued fields from Solr.

Usage: option("flatten_multivalued", "false")

Default: true

==== dv

The dv option will fetch the docValues that are indexed but not stored by using function queries. Should be used for Solr versions lower than 5.5.0.

Usage: option("dv", "true")

Default: false

==== skip_non_dv

The skip_non_dv option instructs the Solr DataSource to skip all fields that do not have docValues enabled.

Usage: option("skip_non_dv", "true")

Default: false

==== sample_seed

The sample_seed option allows you to read a random sample of documents from Solr using the specified seed. This option can be useful if you just need to explore the data before performing operations on the full result set. By default, if this option is provided, a 10% sample size is read from Solr, but you can use the sample_pct option to control the sample size.

Usage: option("sample_seed", "5150")

Default: None

==== sample_pct

The sample_pct option allows you to set the size of a random sample of documents from Solr; use a value between 0 and 1.

Usage: option("sample_pct", "0.05")

Default: 0.1
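The two sampling options are typically set together. The following sketch builds them with a small validation step; the helper name is hypothetical, and treating 1.0 as an allowed upper bound is an assumption based on the "between 0 and 1" wording above.

```scala
// Hypothetical helper that validates sample_pct before building the options.
// Assumption: values in (0, 1] are acceptable.
def sampleOptions(seed: Long, pct: Double): Map[String, String] = {
  require(pct > 0.0 && pct <= 1.0, s"sample_pct must be in (0, 1], got $pct")
  Map("sample_seed" -> seed.toString, "sample_pct" -> pct.toString)
}

// A 5% sample using the seed from the usage example above.
val opts = sampleOptions(5150L, 0.05)

// In spark-shell, combined with the connection options:
// spark.read.format("solr").options(connectionOptions ++ opts).load()
```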

==== solr.params

The solr.params option can be used to specify any arbitrary Solr parameters in the form of a Solr query.

TIP: Don't use this to pass parameters that are covered by other options, such as fl (use the fields option) or sort. This option is strictly intended for parameters that are NOT covered by other options.

Usage: option("solr.params", "fq=userId:[10 TO 1000]")

=== Index parameters

==== soft_commit_secs

If specified, the soft_commit_secs option will be set via the SolrConfig API during indexing.

Usage: option("soft_commit_secs", "10")

Default: None

==== commit_within

The commit_within param sets commitWithin on the indexing requests processed by SolrClient. This value should be in milliseconds. See https://lucene.apache.org/solr/guide/6_6/updatehandlers-in-solrconfig.html#UpdateHandlersinSolrConfig-commitWithin[commitWithin].

Usage: option("commit_within", "5000")

Default: None

==== batch_size

The batch_size option determines the number of documents that are sent to Solr in a single HTTP call during indexing. Set this option higher if the docs are small and memory is available.

Usage: option("batch_size", "10000")

Default: 500

==== gen_uniq_key

If the documents are missing the unique key (derived from the Solr schema), then the gen_uniq_key option will generate a unique value for each document before indexing to Solr. As an alternative to this option, the http://lucene.apache.org/solr/api/solr-core/org/apache/solr/update/processor/UUIDUpdateProcessorFactory.html[UUIDUpdateProcessorFactory] can be used to generate UUID values for documents that are missing the unique key field.

Usage: option("gen_uniq_key", "true")

Default: false

==== solr_field_types

This option can be used to specify the field type for fields written to Solr. It only works if the field names are not already defined in the Solr schema.

Usage: option("solr_field_types", "rating:string,title:text_en")
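To show how the index parameters fit together, here is a minimal sketch that renders a solr_field_types value from field/type pairs and combines it with other indexing options. The helper name, connection values, and chosen option values are assumptions for illustration.

```scala
// Hypothetical helper: render (field -> Solr field type) pairs in the
// comma-separated `name:type` form expected by solr_field_types.
def solrFieldTypes(types: Seq[(String, String)]): String =
  types.map { case (field, fieldType) => s"$field:$fieldType" }.mkString(",")

val fieldTypes = solrFieldTypes(Seq("rating" -> "string", "title" -> "text_en"))

val indexOptions: Map[String, String] = Map(
  "zkhost" -> "localhost:9983",    // placeholder
  "collection" -> "my_collection", // placeholder
  "gen_uniq_key" -> "true",
  "batch_size" -> "10000",
  "solr_field_types" -> fieldTypes
)

// In spark-shell, with an existing DataFrame `df`:
// df.write.format("solr").options(indexOptions).mode("overwrite").save()
```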

=== Querying Time Series Data

==== partition_by

Set this option to time in order to query multiple time series collections that are partitioned by some time period.

Usage: option("partition_by", "time")

Default: none

==== time_period

This is of the form X DAYS/HOURS/MINUTES. It should match the time period with which the partitions are created.

Usage: option("time_period", "1MINUTES")

Default: 1DAYS

==== datetime_pattern

This pattern can be inferred from time_period, but you can use this option to specify it explicitly.

Usage: option("datetime_pattern", "yyyy_MM_dd_HH_mm")

Default: yyyy_MM_dd
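To see what a datetime_pattern value produces, the sketch below formats a sample timestamp with the default pattern and with the per-minute pattern from the usage example. It assumes the patterns follow Java date-format conventions; the helper name and timestamp are illustrative, and how the connector maps the resulting string to a partition collection is not covered here.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: format a timestamp with a given datetime_pattern value.
def formatWith(pattern: String, ts: LocalDateTime): String =
  ts.format(DateTimeFormatter.ofPattern(pattern))

// 2016-01-02 03:04, an arbitrary sample timestamp.
val ts = LocalDateTime.of(2016, 1, 2, 3, 4)

val daily = formatWith("yyyy_MM_dd", ts)           // day-level granularity
val perMinute = formatWith("yyyy_MM_dd_HH_mm", ts) // minute-level granularity
```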

==== timestamp_field_name

This option is used to specify the field name in the indexed documents where time stamp is found.

Usage: option("timestamp_field_name", "ts")

Default: timestamp_tdt

==== timezone_id

Used to specify the timezone.

Usage: option("timezone_id", "IST")

Default: UTC

==== max_active_partitions

This option is used to specify the maximum number of partitions that are allowed at a time.

Usage: option("max_active_partitions", "100")

Default: null

//end::tuning[]

//tag::spark-troubleshooting[]
== Troubleshooting Tips

=== Why is dataFrame.count so slow?

Solr can provide the number of matching documents nearly instantly, so why is calling count on a DataFrame backed by a Solr query so slow? The reason is that Spark likes to read all rows before performing any operations on a DataFrame. So when you ask SparkSQL to count the rows in a DataFrame, spark-solr has to read all matching documents from Solr and then count the rows in the RDD.

If you're just exploring a Solr collection from Spark and need to know the number of matching rows for a query, you can use SolrQuerySupport.getNumDocsFromSolr utility function.

=== I set rows to 10 and now my job takes forever to read 10 rows from Solr!

The rows option sets the page size, but all matching rows are read from Solr for every query. So if your query matches many documents in Solr, then Spark reads all of them, 10 docs per request.

Use the sample_seed option to limit the size of the results returned from Solr.
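The cost of a small page size can be estimated with simple arithmetic. The sketch below is a rough illustration: the helper is hypothetical, the match count is made up, and it ignores how the matches are spread across shards and tasks.

```scala
// Hypothetical helper: number of page requests needed to read all matches
// at a given page size (ceiling division).
def pagesNeeded(matchingDocs: Long, rows: Int): Long = {
  require(rows > 0, "rows must be positive")
  (matchingDocs + rows - 1) / rows
}

// One million matching documents read with rows=10 versus the default rows=1000.
val withTinyPages = pagesNeeded(1000000L, 10)
val withDefaultPages = pagesNeeded(1000000L, 1000)
```

With these numbers, rows=10 needs 100 times as many round trips to Solr as the default page size.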

//end::spark-troubleshooting[]

//tag::spark-app[]
== Developing a Spark Application

The com.lucidworks.spark.SparkApp provides a simple framework for implementing Spark applications in Java. The class saves you from having to duplicate boilerplate code needed to run a Spark application, giving you more time to focus on the business logic of your application.

To leverage this framework, you need to develop a concrete class that either implements RDDProcessor or extends StreamProcessor depending on the type of application you're developing.

=== RDDProcessor

Implement the com.lucidworks.spark.SparkApp$RDDProcessor interface for building a Spark application that operates on a JavaRDD, such as one pulled from a Solr query (see SolrQueryProcessor as an example).

=== StreamProcessor

Extend the com.lucidworks.spark.SparkApp$StreamProcessor abstract class to build a Spark streaming application.

See com.lucidworks.spark.example.streaming.oneusagov.OneUsaGovStreamProcessor or com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor for examples of how to write a StreamProcessor.

//end::spark-app[]

//tag::spark-auth[]
== Authenticating with Solr

For background on Solr security, see: https://lucene.apache.org/solr/guide/6_6/securing-solr.html[Securing Solr].

=== Kerberos

The Kerberos config should be set via system param java.security.auth.login.config on extraJavaOptions for both executor and driver.

==== SparkApp

The SparkApp framework (in spark-solr) allows you to pass the path to a JAAS authentication configuration file using the -solrJaasAuthConfig option.

For example, if you need to authenticate using the "solr" Kerberos principal, you need to create a JAAS configuration file named jaas-client.conf that sets the location of your Kerberos keytab file, such as:

[source]
----
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/keytabs/solr.keytab"
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr";
};
----

To use this configuration to authenticate to Solr, you simply need to pass the path to jaas-client.conf created above using the -solrJaasAuthConfig option, such as:

[source]
----
spark-submit --master yarn-server \
  --class com.lucidworks.spark.SparkApp \
  $SPARK_SOLR_PROJECT/target/spark-solr-${VERSION}-shaded.jar \
  hdfs-to-solr -zkHost $ZK -collection spark-hdfs \
  -hdfsPath /user/spark/testdata/syn_sample_50k \
  -solrJaasAuthConfig=/path/to/jaas-client.conf
----

=== Basic Auth

Basic auth can be configured via the system properties basicauth or solr.httpclient.config. These system properties have to be set on both the driver and executor JVMs.

Examples:

Using basicauth

[source]
----
./bin/spark-shell --master local[*] --jars ~/Git/spark-solr/target/spark-solr-3.0.1-SNAPSHOT-shaded.jar --conf 'spark.driver.extraJavaOptions=-Dbasicauth=solr:SolrRocks'
----

Using solr.httpclient.config

[source]
----
./bin/spark-shell --master local[*] --jars ~/Git/spark-solr/target/spark-solr-3.0.1-SNAPSHOT-shaded.jar --conf 'spark.driver.extraJavaOptions=-Dsolr.httpclient.config=/Users/kiran/spark/spark-2.1.0-bin-hadoop2.7/auth.txt'
----

Contents of config file

[source]
----
httpBasicAuthUser=solr
httpBasicAuthPassword=SolrRocks
----

//end::spark-auth[]
//end::spark-devdocs[]