apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Support directly reading and writing Druid data from Spark #9780

Open JulianJaffePinterest opened 4 years ago

JulianJaffePinterest commented 4 years ago

Motivation

To paraphrase the design doc, Druid is an excellent OLAP tool that aims to serve real-time analytics. Apache Spark, on the other hand, is a de-facto standard in the industry for big data processing, which is mostly batch-oriented by nature. While Druid has a few ways to query and ingest data, most of them are not suitable to be used from within ETL workflows, and specifically - via Spark (a few use-cases are detailed in this slack thread).

For ingesting data, the primary concern is the inefficiency of needing to write data produced in a Spark application to an intermediate location, using additional resources to read the data back into memory and prepare it for ingestion, and then finally ingesting the data into Druid via a Druid indexing task. If a Spark application can write files directly to deep storage in the format needed by Druid, these wasteful intermediate steps can be avoided.

On the reading side, some users want to use Spark to perform more complicated or arbitrary operations on data stored in Druid than Druid supports, or to join Druid data against other data stores, or simply to create very large reports. While this could be done via the existing Druid query interfaces, most of these use cases are batch-oriented, and so are both computationally expensive to produce and unlikely to be repeated with any frequency. To avoid degrading the performance of a Druid cluster for interactive workloads or triggering thrash as segments page in and out of memory, Spark applications could instead read the necessary data directly from the backing segment files.

Proposed changes

There are 3 primary ways to read and write data in Spark: the original RDD APIs, the DataSource V1 API, and the DataSource V2 API. A brief overview of some of the pros and cons of each is below:

RDD APIs

DataSource V1

DataSource V2

This presentation goes into more detail if desired.

Because DataSource V2 is more powerful and full-featured and is the current focus of Spark development efforts, we should target this API for our development efforts. Moreover, since Spark 2.4 has been released for a year and a half and Spark traditionally sees slow uptake across major versions, Spark 2.4 will likely continue to be the dominant deployed version for some time, lessening the argument for designing against the upcoming 3.x modifications to the API.

Direct Reader

The direct Spark reader will have two modes of operation:

  1. In the first and most common case, users will provide a data source name, one or more intervals (via Spark filters), and optionally various connection and configuration parameters. We will issue segment metadata queries to determine the schema of the data source, query the metadata store to obtain the load specs for the active segments covering the desired intervals for the given data source, and then read each segment into a partition.
    1. Ideally, we could construct the schema purely from queries to the INFORMATION_SCHEMA.COLUMNS and not have to issue potentially expensive segment metadata queries over large intervals. However, the INFORMATION_SCHEMA tables don't contain information on the specific complex serde used to encode a metric or on whether or not a particular column has entries with multiple values (see #9707).
    2. Similarly, the sys tables exposed through the SQL query interface do not contain the load specs for the given segments, so segment locations will need to be queried through the coordinator API or the metadata server. Since we need to interact with the metadata server for the writer anyway and querying the metadata server is much more performant than querying the coordinator API for segment loadspecs, we will read segment locations on deep storage via SQL queries to the metadata server.
  2. In the second case, we will allow users to provide a list of segments directly and bypass querying the metadata store. This will support cases where users have alternative ways of determining segment locations.
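The segment-selection step of the first mode can be sketched in plain Scala. This is only an illustration under stated assumptions: `SegmentRow` is a hypothetical, simplified stand-in for a row read from the metadata store, intervals are half-open millisecond ranges, and overshadowed segments are not handled. It is not the connector's actual code.

```scala
// Hypothetical, simplified stand-in for a segment row read from the metadata store.
final case class SegmentRow(
    dataSource: String,
    startMillis: Long,   // inclusive start of the segment interval
    endMillis: Long,     // exclusive end of the segment interval
    version: String,
    used: Boolean,       // only "used" segments are considered active
    loadSpecJson: String // where the segment lives on deep storage
)

object SegmentSelection {
  /** True if the half-open intervals [aStart, aEnd) and [bStart, bEnd) overlap. */
  def overlaps(aStart: Long, aEnd: Long, bStart: Long, bEnd: Long): Boolean =
    aStart < bEnd && bStart < aEnd

  /** Keep the active segments of `dataSource` that overlap [queryStart, queryEnd). */
  def select(
      rows: Seq[SegmentRow],
      dataSource: String,
      queryStart: Long,
      queryEnd: Long
  ): Seq[SegmentRow] =
    rows.filter { r =>
      r.used &&
      r.dataSource == dataSource &&
      overlaps(r.startMillis, r.endMillis, queryStart, queryEnd)
    }
}
```

Each selected row would then map to one Spark partition, with its load spec telling the executor where to fetch the segment file.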

Direct Writer

Perhaps the biggest disadvantage of the two DataSource APIs vis-à-vis the older RDD APIs is the writer's lack of control over the partitioning of the input data. Partitioning is left entirely to users, which gives them much greater control over how data is partitioned but also allows them to easily make suboptimal choices without meaning to. Druid requires each segment to have extensive information about broader partitioning choices in order to take advantage of most of its features (e.g. a segment must know how many other segments there are in an interval to support atomic updates, and must know which segment of n it is to support contiguity checks and minor compaction, etc.). Without this information, Druid loses some flexibility and consistency guarantees.

Frustratingly, Spark shares very little information with its writers (only a partition's id, as well as task and epoch ids). This information is insufficient on its own to construct most Druid shard specs. To work around this, we can allow users to optionally provide additional partition information as options to the writers while falling back to the knowledge-less case if they don't. This will allow users who require certain Druid features to achieve them without unduly burdening users who don't.

At a high level, the Druid writer will work by constructing one or more incremental indices per partition (one per segment interval with rows in the partition unless a user provided more information) and then merging these together into segment files, publishing them to deep storage, and reporting the resulting data segments to the driver. If all partitions successfully write their segments, Spark will update the metadata store directly and the Druid coordinator will begin the loading process the next time it runs. If one or more partition writes fail, Spark will delete all temporary files associated with the write attempt on its executors and delete the already written segments from deep storage. In this way, the writer should avoid polluting deep storage with segments from failed writes, and users won't be left with partially updated data sources.
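The all-or-nothing behaviour described above can be illustrated with a small in-memory model. Everything here is an assumption made for the sketch: `WriteResult`, `FakeCluster`, and `finish` are hypothetical names, and deep storage and the metadata store are modelled as plain sets of segment ids.

```scala
import scala.collection.mutable

object AllOrNothingCommit {
  // Hypothetical result each Spark partition reports back to the driver.
  sealed trait WriteResult
  final case class Written(segmentIds: Seq[String]) extends WriteResult
  final case class Failed(reason: String) extends WriteResult

  // In-memory stand-ins for deep storage and the metadata store.
  final class FakeCluster {
    val deepStorage: mutable.Set[String] = mutable.Set.empty
    val metadataStore: mutable.Set[String] = mutable.Set.empty
  }

  /** Publish all segments if every partition succeeded; otherwise delete what was
    * already written. Returns true when the write was committed. */
  def finish(results: Seq[WriteResult], cluster: FakeCluster): Boolean = {
    val written = results.collect { case Written(ids) => ids }.flatten
    val allSucceeded = results.forall(_.isInstanceOf[Written])
    if (allSucceeded) {
      cluster.metadataStore ++= written // commit: make segments visible to the coordinator
      true
    } else {
      cluster.deepStorage --= written   // abort: remove segments from the failed attempt
      false
    }
  }
}
```

Because the metadata store is only touched after every partition has reported success, a failed job leaves neither published metadata nor orphaned files in this model.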

Registries to shadow extensions

Because reading and writing data will happen on Spark clusters instead of Druid clusters, we can't take full advantage of the extensibility and dependency injection of Druid (e.g. we can't support custom metadata stores or complex metrics we don't know about the same way Druid does). To support custom extensions, we can use a registry pattern instead. By default, we can register the core implementations of various extension points and expose public APIs that allow users to register their own as well. Where we need to use certain features in code, such as when we interact with deep storage or construct shard specs, we can pattern match against the registered functions for the given task. As with passing partitioning information to the writer, this pattern allows users who need more complex functionality to take advantage of it while "just working" for users who don't.
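A minimal sketch of such a registry, assuming a hypothetical `PusherFn` signature that maps a segment id to a deep storage location. The object name, the function type, and the default paths are all illustrative and are not the connector's real API.

```scala
import scala.collection.mutable

object SegmentWriterRegistry {
  // Hypothetical signature: given a segment id, return where it would be pushed.
  type PusherFn = String => String

  private val registered: mutable.Map[String, PusherFn] = mutable.Map.empty

  /** Register (or override) the function used for a deep storage type. */
  def register(deepStorageType: String, fn: PusherFn): Unit =
    registered(deepStorageType) = fn

  /** Look up the function for a deep storage type, failing loudly if none is known. */
  def pusherFor(deepStorageType: String): PusherFn =
    registered.getOrElse(
      deepStorageType,
      throw new IllegalArgumentException(
        s"No segment writer registered for '$deepStorageType'")
    )

  // Core implementations registered by default (paths are placeholders).
  register("local", id => s"file:///tmp/druid/segments/$id")
  register("hdfs", id => s"hdfs:///druid/segments/$id")
}
```

A user with a custom deep storage would call `SegmentWriterRegistry.register("custom", ...)` once before writing, while users of the defaults would not need to do anything.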

Rationale

The basic motivation for direct readers and writers in Spark is discussed above, as are the pros and cons of the various Spark APIs for reading data into and writing data out of Spark applications. Fuller discussions of the benefits and drawbacks of various technical implementations in Spark as well as existing third-party alternatives can be found in the design doc.

Operational impact

There will be no operational impacts on existing Druid clusters. This proposal would add readers and writers that can be called from within a Spark job.

Test plan

Because the reader and writer are symmetric, we can add integration tests that write segments to ephemeral local storage and read them back and verify correctness. Beyond these integration tests and unit tests, the biggest challenge will be testing all the various deep storage and metadata server possibilities. Here we will have to rely to some degree on the existing test support for the underlying extensions.

Future work

The primary goal of this proposal is to add readers and writers with all core features supported. The Spark DataSource V2 API allows us to envision support for many future enhancements, even if they're not part of the initial scope. Some highlights include:

Annex

Spark read/write to Druid - design doc

JulianJaffePinterest commented 4 years ago

I haven't had the time I'd hoped to have over the past month to finish this up, so the code stands at about the same place: it is complete and working as expected, but the pieces necessary to make it actually useful (documentation) are still in progress. The outstanding work is to sanitize the writer configs, document everything, and figure out how to test this with other deep storage and metadata server technologies (we run this with segments stored on S3 and use a MySQL instance for our metadata server). I'm posting a pointer to the code here to save people having to dig through the design doc and to hopefully spur myself to carve out more time to resolve these issues.

mangrrua commented 4 years ago

Hello @JulianJaffePinterest

Do you still work on this connector? We may improve this connector, but for the first version your source code seems good.

averma111 commented 4 years ago

@mangrrua and @JulianJaffePinterest I am very thankful to you guys for the work on the Spark connector for Druid. Can I package this jar and make it work with the current Druid 0.18.1 version?

averma111 commented 4 years ago

Also, as per the design document, https://github.com/metamx/druid-spark-batch is an option for writing into Druid, but the build is failing. Can I get some help?

soumyajose0784 commented 4 years ago

@JulianJaffePinterest, we were able to start Druid services with the Spark extension built from the GitHub code. Could you please share other configurations and a sample ingestion spec for data ingestion from Spark?

xsqian commented 4 years ago

@mangrrua and @JulianJaffePinterest I am very thankful to you guys for the work on the Spark connector for Druid. Can I package this jar and make it work with the current Druid 0.18.1 version?

I'd like to get a response on this question as well. Thanks!

averma111 commented 3 years ago

@xsqian We may get some response on this.

bowenli86 commented 3 years ago

Hi folks, any update on this?

JulianJaffePinterest commented 3 years ago

No update from my side on this. I haven't had time to work on this and it appears that there isn't community appetite to support direct Spark readers and writers (there is certainly community desire for such a feature though!). I haven't left this in a great state - the code works, and I suppose if you wanted to work backwards from the DruidDataSourceOptionKeys you could even make it work for you, but the documentation is mostly in code comments instead of a useful readme. Even the usual first step of just working backwards from the tests doesn't help too much here, since the whole point of these connectors is to hide the various APIs behind the Spark DataSourceV2 API - everything interesting happens in the options passed along in the .options() call (e.g. spark.read.format("druid").options(Map(DruidDataSourceOptionKeys.brokerHostKey -> "<my broker hostname>", ...))), so the key piece for usability is the documentation. The best pointers I can give you there are that the reader is easier to get working, and its configuration can mostly be read off from the DruidDataSourceOptionKeys. The writer is trickier, as discussed in the proposal and in the code. Basically, in order to effectively use the output in Druid you'll need to use a custom partitioner in Spark and pass along a map to the writer to work around the limited information Spark passes across. This pattern is pretty anti-user and so one of the aims of this proposal was to start a discussion on ways to improve the situation, but that hasn't happened yet.

@averma111, @mangrrua, and any others interested in working on this proposal/packaging the code/etc., my code is licensed under the same license as the Druid project, as confirmed in the license headers on each file. I would love to see the community pick up where I left off.

JulianJaffePinterest commented 3 years ago

I had a little free time so I pulled in the most recent changes to mainline Druid and started a readme. There are a few minor signature changes that I blindly migrated to, but down the road we should actually adopt the new approaches - they will be more performant. I also added a simple readme with tables of the various config options and some sample code.

JulianJaffePinterest commented 3 years ago

Had some more time, so I added a round-trip example to the test suites that should help demonstrate how to use these connectors. The major outstanding piece is porting the partitioner from the druid-spark-batch repo, updating it, and adding some scala magic to make it easier to glue the partitioning and the writing together.

JulianJaffePinterest commented 3 years ago

A little more time 😃. I've implemented a solution for the partitioning problem mentioned above - the writer now "rationalizes" shard specs just before committing them, resulting in contiguous and complete time chunks. This means that linear and numbered shard specs need no special information from users. I've also added scaffolding and a sample partitioner to illustrate how providing partition mapping metadata can be done for more complex shard specs such as hashed or single dimension. I also added more documentation around partitioning. Once the deep storage config serialization is fixed up, this should be drop-in ready (it's pretty clean and easy to use at the moment, modulo pushing segments to deep storage which still needs some custom work).
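One way to picture the "rationalizing" step for numbered shard specs is the sketch below. It is an interpretation rather than the connector's implementation: `Seg` is a hypothetical simplification of a written segment, and the assumption is that within each time chunk the partition numbers only need to become contiguous from 0 and agree on the total count.

```scala
object ShardSpecRationalizer {
  // Hypothetical simplification of a written segment carrying a numbered shard spec.
  final case class Seg(intervalStart: Long, partitionNum: Int, numPartitions: Int)

  /** Within each time chunk, renumber partitions 0 until n and set the total to n. */
  def rationalize(segments: Seq[Seg]): Seq[Seg] =
    segments
      .groupBy(_.intervalStart)      // one group per time chunk
      .toSeq
      .sortBy(_._1)                  // deterministic output order
      .flatMap { case (_, chunk) =>
        val ordered = chunk.sortBy(_.partitionNum)
        ordered.zipWithIndex.map { case (seg, i) =>
          seg.copy(partitionNum = i, numPartitions = ordered.size)
        }
      }
}
```

For example, two segments in the same chunk that arrived with Spark partition ids 7 and 3 would leave this function as partitions 1 and 0 of 2.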

JulianJaffePinterest commented 3 years ago

I cleaned up the docs a little more and added a few more tests. I don't have access to Azure or GCS blob storage to actually implement default segment writer registration functions and not every class the constructors require is serializable, so I haven't implemented working versions. While I do have access to S3, I use custom SegmentPushers and SegmentKillers and so the same caveats apply to the shipped S3 segment writer functions. Local and HDFS deep storage segment writers work out of the box, and I'm hopeful that people who use the various cloud providers' blob storages for deep storage will contribute actual working default implementations as they build them 🤞.

With the admittedly big exception of some of the deep storage writers, this is pretty much done. I think this process has pretty conclusively demonstrated that there isn't sufficient community support for maintaining Spark connectors as part of mainline Druid, so I'll leave this issue open but the next step will likely be to move these connectors to a standalone repository and ask for a pointer to be added to the Druid docs.

JulianJaffePinterest commented 3 years ago

Fixed up the deep storage writers for S3 and GCS, added support for password providers in metadata server connections, and improved the documentation.

b-slim commented 3 years ago

@JulianJaffePinterest Let me start by saying this is great work and would love to share with you some of the experience we had when building a similar connector between Hive and Druid. The main point I want to share with you is why we picked the Hive Repo as the best place for such a connector.

Those are the 2 cents that I want to share and should be considered as opinion and not review

JulianJaffePinterest commented 3 years ago

@b-slim Your points are well taken. There are a few differences between how the Spark and Hive projects run their codebases, and in how this extension is implemented, that I think change the calculus a little bit.

You're definitely right that there are some awkward edges in trying to fit this extension into the main Druid codebase, and it may not make sense to do so at the end of the day.

vivek-balakrishnan-rovio commented 3 years ago

Hello, I am a Data Engineer working for Rovio. We have been using Druid for close to 1.5 years to power our analytics dashboards. We had a similar requirement to export our data from Hive tables to Druid.

Initially, we had patched the DruidStorageHandler of Hive to work with EMR and S3. Later, we decided to write an Apache Spark write-only data source inspired by Hive's DruidStorageHandler. At that point we were not aware of any similar effort going on. We notice that this PR has a richer feature set than our library and also supports both read and write, even for complex metrics (Sketches & HLL). Our library has been driven mostly by our internal needs: it only supports writing and only supports basic metric aggregations at the moment.

Regarding writing, we took a similar approach to the one in this PR for partitioning the dataset before writing. We also wrote a Scala extension and a Python wrapper to abstract the partitioning logic. Unlike this PR, our library expects the data to be partitioned by the __time column and does not do any segment rationalizing in the commit phase.

If you would like to try our library, the modules are available in maven central and PyPI (as documented in README).

We are excited that there is an official library for reading and writing Druid segments with Spark dataframes.

Regarding the feature gap between your extension and rovio-ingest, how do you envision the future? Would you be open to, for example, adding a PySpark wrapper?

By the way, rovio-ingest was already added to the list of Community and Third Party Software on the Druid website: https://github.com/apache/druid-website-src/pull/231

Thank you.

JulianJaffePinterest commented 3 years ago

Hey Vivek, thanks for your comment. I’m happy to hear of your experiences working with Spark and Druid! The PR for this issue (#10920) is currently in review. Once it’s merged, it won’t matter what my plans are, it will matter what the community wants 😄. I don’t have much experience with pySpark directly, but I would definitely welcome a Python wrapper if one is necessary! (From looking at yours, it seems that the wrapper is just for conveniently handling repartitioning, not just to allow interoperation, right?)

mangrrua commented 3 years ago

Hi @JulianJaffePinterest

Thanks for this connector! This is in the PR now, but can we use it in production?

JulianJaffePinterest commented 3 years ago

@mangrrua I'm aware of at least a few different companies and organizations using this in production. There may be others I don't know about as well.

520lailai commented 3 years ago

If you are running two or more tasks which generate segments for the same data source and the same time chunk, the generated segments could potentially overshadow each other, which could lead to incorrect query results. Druid ingestion tasks acquire locks beforehand to avoid this problem. So why not acquire the task lock before creating the segments in the Spark job?

Besides, inserting metadata directly into MySQL in production may cause security issues.

JulianJaffePinterest commented 3 years ago

@520lailai The calling Spark applications set the version of the segments they write themselves. If a user runs multiple jobs that generate segments for the same data source and time chunk, the application the user assigns the highest version to will produce the ultimately available segments, once all applications have finished. If you have a specific use case in mind where you will be running concurrent Spark jobs that will target the same data sources and intervals, I'd be happy to give you more tailored suggestions.
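The "highest version wins" rule can be illustrated as follows. The sketch assumes that versions are compared as plain strings and that competing segments cover exactly the same interval; real overshadowing also has to deal with partially overlapping intervals, which is ignored here. `Seg` and `visible` are hypothetical names.

```scala
object Overshadowing {
  // Hypothetical simplification: an interval, a version string, and the job that wrote it.
  final case class Seg(interval: (Long, Long), version: String, writtenBy: String)

  /** For each interval, keep only the segments carrying the greatest version string. */
  def visible(segments: Seq[Seg]): Seq[Seg] =
    segments
      .groupBy(_.interval)
      .values
      .flatMap { sameInterval =>
        val winner = sameInterval.map(_.version).max // lexicographic string comparison
        sameInterval.filter(_.version == winner)
      }
      .toSeq
      .sortBy(_.interval._1)
}
```

If two Spark jobs write the same interval with versions `2021-01-01T00:00:00.000Z` and `2021-01-02T00:00:00.000Z`, only the segments from the second job remain visible in this model, regardless of which job finished first.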

As for Druid task locks, a Spark application calling this writer is not a Druid task. The application is not triggered by a Druid cluster, and the Druid cluster is unaware of the application. If we instead view Druid task locks as segment locks, I could imagine using the internal API to acquire a lock on write, but it would only be useful in limited circumstances. I don't see value in delaying a Spark job from writing (the job will write segments with the specified version regardless; timeshifting the write doesn't change anything). I can see where delaying a real-time ingestion task may be useful. If there's community demand for integrating with Druid locks it could be done.

Finally, to your point about security concerns, the writer must provide its own metadata server credentials. The metadata client will only ever attempt to read and insert data into an existing table, and so ideally the associated user should have only those permissions. If you're planning to provide credentials independently of users (for example, via environment variables or a credential store running on the Spark cluster nodes themselves), you should not allow anyone to submit Spark applications you would not allow to send post requests to the overlord.

520lailai commented 3 years ago

If this application conflicts with a real-time task, what happens if the real-time task and this Spark application generate the same version of the segment?

JulianJaffePinterest commented 3 years ago

If a Spark application produces segments with the exact same version as a real-time task, the precise behavior depends on the shard spec but will likely result in neither set of segments being loaded by a Druid cluster. A simple way to avoid this is to use a version prefix for the segments written via Spark (to ensure they always overshadow real-time segments). If you want Spark-produced and real-time segments to live alongside each other, you could follow the major/minor version approach that Druid internal compaction jobs do. And if you want real-time and Spark-produced segments to "compete" and have the latest task win, you can use time stamp versions and slice off a digit of precision from the time stamp in Spark to ensure that the two different methods can't produce identical versions. Down the road, if there's community demand/a specific use case where it makes sense to integrate this writer with Druid task locks, it shouldn't be too difficult to do so, but I haven't come across one so far. Maybe as Druid's internal tasks continue to grow in power and complexity some will arise.
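Two of the strategies above, a version prefix and a timestamp with one digit of precision sliced off, might look like the sketch below. It assumes real-time tasks use ISO-8601 UTC timestamps with millisecond precision as versions and that versions are compared as strings; the object and method names are hypothetical.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object SparkSegmentVersions {
  private val millis =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC)
  private val centis =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SS'Z'").withZone(ZoneOffset.UTC)

  /** The shape assumed here for versions produced by real-time tasks. */
  def timestampVersion(ts: Instant): String = millis.format(ts)

  /** Prefix strategy: a prefix that sorts after digits beats any timestamp-only version. */
  def prefixedVersion(prefix: String, ts: Instant): String = prefix + millis.format(ts)

  /** Truncation strategy: two fractional digits instead of three, so the string can
    * never be identical to a millisecond-precision version. */
  def truncatedVersion(ts: Instant): String = centis.format(ts)
}
```

With a prefix such as `spark-`, the Spark-written version compares greater than any version starting with a digit, while the truncated form still orders by time against other timestamps but cannot collide with a millisecond-precision string.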

mangrrua commented 3 years ago

@JulianJaffePinterest could you run any job in your spark cluster using this connector?

When I import this dependency and then run the job, it throws the exception below: java.lang.NoSuchMethodError: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;

Druid uses guava 16.0.1 and guice 4.0.1, but spark uses guava 14 and guice 3. Can we use guava 16 and guice 4 in spark?

JulianJaffePinterest commented 3 years ago

Yes, I can run jobs in a Spark cluster. How are you depending on this extension/submitting jars to your Spark cluster? Also, what version of Spark are you using?

520lailai commented 3 years ago

I pulled the new code from your branch (spark_druid_connector) and have some questions about it.

Class: DruidDataWriterFactory, function: generateDimensions()

1  // No dimensions provided, assume all non-excluded non-metric columns are dimensions.
2  if (dimensionsStr == DruidConfigurationKeys.dimensionsDefaultKey._2 || dimensionsStr.isEmpty) {
3    val nonDimColumns = metrics ++ excludedDimensions :+ tsCol
4    val dimensions = schema.fieldNames.filterNot(nonDimColumns.contains(_))
5    val excessDimensions = dimensions.filterNot(schema.fieldNames.contains(_))
6    if (excessDimensions.nonEmpty) {
7      logWarn(s"Dimensions contains columns not in source df! Excess columns: ${excessDimensions.mkString(", ")}")
     }

Line 5: what is the point of doing this? The 'dimensions' all come from 'schema.fieldNames', so line 5 will always return an empty array.
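The observation can be checked with a small standalone reproduction. `DimensionCheck` and its parameters are hypothetical stand-ins for the quoted logic, with the schema's field names passed in as a plain array.

```scala
object DimensionCheck {
  /** Mirrors the two filters from the quoted snippet and returns both results. */
  def dimensionsAndExcess(
      fieldNames: Array[String],
      nonDimColumns: Seq[String]
  ): (Seq[String], Seq[String]) = {
    // Dimensions are derived from fieldNames, so each one is a field name by construction.
    val dimensions = fieldNames.filterNot(nonDimColumns.contains(_)).toSeq
    // Filtering out everything that is a field name therefore leaves nothing.
    val excess = dimensions.filterNot(fieldNames.contains(_))
    (dimensions, excess)
  }
}
```

Whatever inputs are supplied, the second element of the result is empty, so the warning guarded by it can never fire in this branch.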

mangrrua commented 3 years ago

Yes, I can run jobs in a Spark cluster. How are you depending on this extension/submitting jars to your Spark cluster? Also, what version of Spark are you using?

Spark version = 2.4.3. I publish your code to the local cache using Maven, then import it in another project.

I pass the assemblyPackageDependency jar of my project via the --jars parameter, and use the package jar to run the Spark job.

JulianJaffePinterest commented 3 years ago

@mangrrua Sorry, try setting userClassPathFirst. This issue was introduced when I added the additional explicit dependencies requested by the dependency analyzer, and since I submit my jobs with user class path first I didn't notice the regression. I've pushed a commit shading guava that should solve your issue.
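For readers unsure which settings this refers to, Spark has two experimental configuration properties that give user-supplied jars precedence over Spark's own classpath. The sketch below assumes these are the properties meant and simply renders them as `spark-submit` arguments; the object name is hypothetical.

```scala
object UserClassPathFirst {
  // Spark's experimental settings for preferring user jars on the driver and executors.
  val settings: Seq[(String, String)] = Seq(
    "spark.driver.userClassPathFirst" -> "true",
    "spark.executor.userClassPathFirst" -> "true"
  )

  /** Render the settings as arguments for spark-submit. */
  def submitArgs: Seq[String] =
    settings.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```

The same key/value pairs could instead be set on the application's Spark configuration before the session is created.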

@520lailai you're right, that line's unnecessary after the refactor from a couple months ago. I've removed it.

For both of you, let's move these comments to the PR (#10920) instead of the issue.

Law101 commented 2 years ago

Thanks @JulianJaffePinterest & co. for your great contribution! I am now trying to wrap my head around how to integrate this into my structured streaming application. The module documentation is down, and I am wondering if anyone can point me to resources to get started with. I intend to use this from Python specifically. Thank you once again.

JulianJaffePinterest commented 2 years ago

@Law101 for a structured streaming application you probably want to instead emit to Kafka or Kinesis from your spark app and then use Druid's real time ingestion to read your data into Druid. This is because Druid won't know about the segments you create with this writer until you finalize them whereas Druid can query the in-progress real time segments it creates immediately.

Law101 commented 2 years ago

Thank you @JulianJaffePinterest, this is exactly the approach I am currently using, and I now understand it better with your explanation.