[![Users Documentation](https://img.shields.io/badge/-Users_Docs-lightgreen?style=for-the-badge&logo=readthedocs)](./docs)
[![Developers Documentation](https://img.shields.io/badge/_-Developer's_docs_(docs.qbeast.io)-ff7?style=for-the-badge&logo=readthedocs)](https://docs.qbeast.io/)
[![API](https://img.shields.io/badge/-Check_the_API-orange?style=for-the-badge)](./docs/QbeastTable.md)
[![Notebook](https://img.shields.io/badge/_-Jupyter_Notebook_example-0053B3?style=for-the-badge&logo=jupyter)](./docs/sample_pushdown_demo.ipynb)
[![Slack](https://img.shields.io/badge/_-Slack-blue?style=for-the-badge&logo=slack)](https://join.slack.com/t/qbeast-users/shared_invite/zt-w0zy8qrm-tJ2di1kZpXhjDq_hAl1LHw)
[![Academy](https://img.shields.io/badge/_-Medium-yellowgreen?style=for-the-badge&logo=medium)](https://qbeast.io/academy-courses-index/)
[![Website](https://img.shields.io/badge/_-Website-dc005f?style=for-the-badge&logo=)](https://qbeast.io)
---
**Qbeast Spark** is an Apache Spark extension that enhances data processing in [**Data Lakehouses**](http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf). It provides advanced **multi-dimensional filtering** and **efficient data sampling**, enabling faster and more accurate queries. The extension also maintains ACID properties for data integrity and reliability, making it ideal for handling large-scale data efficiently.
[![apache-spark](https://img.shields.io/badge/apache--spark-3.5.x-blue)](https://spark.apache.org/releases/spark-release-3-5-0.html)
[![apache-hadoop](https://img.shields.io/badge/apache--hadoop-3.3.x-blue)](https://hadoop.apache.org/release/3.3.1.html)
[![delta-core](https://img.shields.io/badge/delta--core-3.1.0-blue)](https://github.com/delta-io/delta/releases/tag/v3.1.0)
[![codecov](https://codecov.io/gh/Qbeast-io/qbeast-spark/branch/main/graph/badge.svg?token=8WO7HGZ4MW)](https://codecov.io/gh/Qbeast-io/qbeast-spark)
Features
- Data Lakehouse: Data lake with ACID properties, thanks to the underlying Delta Lake architecture.
- Multi-column indexing: Filter your data with multiple columns using the Qbeast Format.
- Improved Sampling operator: Read statistically significant subsets of files.
- Table Tolerance: Model for the trade-off between sampling fraction and query accuracy.
Query example with Qbeast
The Qbeast Spark extension allows faster queries with statistically accurate sampling.
| Format | Execution Time | Result |
|--------|----------------|--------|
| Delta | ~ 151.3 sec. | 37.869383 |
| Qbeast | ~ 6.6 sec. | 37.856333 |
In this example, 1% sampling returns the result about 22 times faster than the Delta format, with an error of 0.034%.
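As a quick check of those two figures, the arithmetic can be reproduced in any Scala REPL (including spark-shell). This is only a sketch: the four numbers are copied from the table above, and the variable names are made up for illustration.

```scala
// Figures copied from the comparison table above.
val deltaSeconds = 151.3
val qbeastSeconds = 6.6
val deltaResult = 37.869383
val qbeastResult = 37.856333

// Speedup: how many times faster the sampled Qbeast query ran.
val speedup = deltaSeconds / qbeastSeconds

// Relative error of the sampled result, as a percentage of the Delta result.
val relativeErrorPct = math.abs(deltaResult - qbeastResult) / deltaResult * 100

println(f"speedup: $speedup%.1fx, relative error: $relativeErrorPct%.3f%%")
```

The speedup comes out slightly under 23, and the relative error rounds to the 0.034% quoted above.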
Documentation
Explore the documentation for more details.
Quickstart
You can run qbeast-spark locally on your computer, or use the Docker image we have prepared with the dependencies.
You can find the image in the Packages section.
Pre: Install Spark
Download Spark 3.5.0 with Hadoop 3.3.4, unzip it, and create the SPARK_HOME environment variable:
:information_source: Note: You can use Hadoop 2.7 if desired, but you may run into trouble with some cloud providers' storage; read more about it here.
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzvf spark-3.5.0-bin-hadoop3.tgz
export SPARK_HOME=$PWD/spark-3.5.0-bin-hadoop3
1. Launch a spark-shell
Inside the project folder, launch a spark shell with the required dependencies:
$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
2. Indexing a dataset
Read the CSV source file placed inside the project.
val csvDF = spark.read.format("csv").
option("header", "true").
option("inferSchema", "true").
load("./src/test/resources/ecommerce100K_2019_Oct.csv")
Index the dataset by writing it in the qbeast format, specifying the columns to index.
val tmpDir = "/tmp/qbeast-spark"
csvDF.write.
mode("overwrite").
format("qbeast").
option("columnsToIndex", "user_id,product_id").
save(tmpDir)
SQL Syntax.
You can create a Qbeast table with the help of QbeastCatalog.
spark.sql(
"CREATE TABLE student (id INT, name STRING, age INT) " +
"USING qbeast OPTIONS ('columnsToIndex'='id')")
Use INSERT INTO to add records to the new table. The index is updated dynamically as new data is inserted.
val studentsDF = Seq((1, "Alice", 34), (2, "Bob", 36)).toDF("id", "name", "age")
studentsDF.write.mode("overwrite").saveAsTable("visitor_students")
// AS SELECT FROM
spark.sql("INSERT INTO table student SELECT * FROM visitor_students")
// VALUES
spark.sql("INSERT INTO table student VALUES (3, 'Charlie', 37)")
// SHOW
spark.sql("SELECT * FROM student").show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 34|
|  2|    Bob| 36|
|  3|Charlie| 37|
+---+-------+---+
3. Load the dataset
Load the newly indexed dataset.
val qbeastDF =
spark.
read.
format("qbeast").
load(tmpDir)
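Because the table was indexed on user_id and product_id, a filter that touches both columns is a natural first query to try. The following is a minimal sketch, assuming the spark-shell session from step 1 and the table saved to /tmp/qbeast-spark in step 2; the bounds are hypothetical values chosen only for illustration.

```scala
import org.apache.spark.sql.functions.col

// Assumes the table written in step 2 still lives at this path.
val indexedDF = spark.read.format("qbeast").load("/tmp/qbeast-spark")

// Hypothetical bounds on the two indexed columns, for illustration only.
val filteredDF = indexedDF.filter(
  col("user_id") >= 500000000L && col("product_id") < 5000000L)

// Print the query plans to see how the filters reach the data source.
filteredDF.explain(true)
println(s"matching rows: ${filteredDF.count()}")
```

The number of matching rows depends entirely on the bounds you pick, so adjust them to your data before drawing conclusions from the plan.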
4. Examine the Query plan for sampling
Sample the data and notice how the sampler is converted into filters and pushed down to the source!
qbeastDF.sample(0.1).explain(true)
Go to the Quickstart or notebook for more details.
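To see the effect of sampling on the data itself rather than on the plan, one option is to compare row counts. This is a sketch under the same assumptions as the steps above (the spark-shell session from step 1 and the table saved to /tmp/qbeast-spark); the variable names are illustrative.

```scala
// Assumes the table written in step 2 still lives at this path.
val indexedDF = spark.read.format("qbeast").load("/tmp/qbeast-spark")

val totalRows = indexedDF.count()
val sampledRows = indexedDF.sample(0.1).count()

// Fraction of rows actually returned; expect a value near 0.1, not exactly 0.1.
val observedFraction = sampledRows.toDouble / totalRows
println(f"sampled $sampledRows of $totalRows rows (fraction $observedFraction%.3f)")
```

Sampling is approximate, so the observed fraction will generally differ a little from the requested one.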
5. Interact with the format
Get insights into the data using the QbeastTable interface!
import io.qbeast.spark.QbeastTable
val qbeastTable = QbeastTable.forPath(spark, tmpDir)
qbeastTable.getIndexMetrics()
6. Optimize the table
Optimize is an expensive operation that consists of rewriting part of the files to achieve a better layout and improve query performance.
To minimize the write amplification of this command, we execute it on subsets of the table, such as Revision IDs or specific files.
Read more about Revision and find an example here.
Optimize API
These are the 3 ways of executing the optimize operation:
qbeastTable.optimize() // Optimizes the last Revision Available.
// This does NOT include previous Revision's optimizations.
qbeastTable.optimize(2L) // Optimizes the Revision number 2.
qbeastTable.optimize(Seq("file1", "file2")) // Optimizes the specific files
If you want to optimize the full table, you must loop through the revisions:
val revisions = qbeastTable.revisionsIDs() // Get all the Revision IDs available in the table.
revisions.foreach(revision =>
qbeastTable.optimize(revision)
)
Go to QbeastTable documentation for more detailed information.
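One way to judge whether a full-table optimize was worthwhile is to capture the index metrics before and after it. The sketch below combines only the calls already shown in steps 5 and 6, and assumes the spark-shell session from step 1 and the table saved to /tmp/qbeast-spark; the variable names are illustrative.

```scala
import io.qbeast.spark.QbeastTable

// Assumes the table written in step 2 still lives at this path.
val table = QbeastTable.forPath(spark, "/tmp/qbeast-spark")

// Snapshot the metrics, optimize every revision, then snapshot again.
val metricsBefore = table.getIndexMetrics()
val revisionIds = table.revisionsIDs()
revisionIds.foreach(id => table.optimize(id))
val metricsAfter = table.getIndexMetrics()

println(s"Before optimize:\n$metricsBefore")
println(s"After optimize:\n$metricsAfter")
```

Since optimize rewrites files, try this on a small table first.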
7. Visualize index
Use the Python index visualizer on your indexed table to examine the index structure visually and gather sampling metrics.
Dependencies and Version Compatibility
| Version | Spark | Hadoop | Delta Lake |
|---------|-------|--------|------------|
| 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
| 0.3.x | 3.2.x | 3.3.x | 1.2.x |
| 0.4.x | 3.3.x | 3.3.x | 2.1.x |
| 0.5.x | 3.4.x | 3.3.x | 2.4.x |
| 0.6.x | 3.5.x | 3.3.x | 3.1.x |
| 0.7.x | 3.5.x | 3.3.x | 3.1.x |
Check here for Delta Lake and Apache Spark version compatibility.
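If you build an application with sbt instead of using spark-shell, the 0.7.x row of the table might translate into a build definition like the one below. Treat it as a sketch: the qbeast-spark and delta-spark coordinates are taken from the `--packages` line in the Quickstart, while the exact Scala patch version and the `Provided` scope for Spark are assumptions you may need to adapt.

```scala
// build.sbt (sketch)

// Assumption: any Scala 2.12.x release supported by Spark 3.5 should work.
scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  // Same coordinates as the --packages option used in the Quickstart.
  "io.qbeast" %% "qbeast-spark" % "0.7.0",
  "io.delta" %% "delta-spark" % "3.1.0",
  // Assumption: Spark is supplied by the cluster or a local installation at runtime.
  "org.apache.spark" %% "spark-sql" % "3.5.0" % Provided
)
```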
Contribution Guide
See Contribution Guide for more information.
License
See LICENSE.
Code of conduct
See Code of conduct.