Issue 1: Transaction
Description
Creating a view table takes four queries in AsterixDB:
Drop the view table if it exists.
Create a schema.
Create a new view table with the schema just created.
Insert the records selected by a query on twitter.ds_tweets into the view table.
Expected behavior
Send a single request to Elasticsearch that completes all four queries above in order.
Problem
According to an Elastic blog post, Elasticsearch has no native support for transactions (a consecutive sequence of SQL statements).
In our initial implementation, Elasticsearch sometimes did not handle the requests in the order we posted them. For example, Elasticsearch might handle them in the following order:
Create an index named twitter
Drop an index named twitter
Insert selected records into the twitter index
An error occurs when Elasticsearch handles the third request because the twitter index has already been dropped.
In addition, our initial implementation first fetched the selected records from Elasticsearch into Cloudberry and then wrote them back to the destination index in Elasticsearch. This took two requests for a single insert query, and processing a large amount of data in Cloudberry added significant overhead.
Solution
Loop through a query list. The query list consists of three queries sent in three requests:
Request 1: Drop the index. Elasticsearch responds with status code 404 if the index to be dropped does not exist; Cloudberry handles this error code.
Request 2: Create the index with the generated mapping.
Request 3: Use the Reindex API to insert the documents selected by a query into the destination index just created, in a single request.
Note: In the loop, we call Await.ready() after posting each request to Elasticsearch, so the next request is not posted until the response to the previous one has arrived. This keeps the three requests in order.
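The three-request loop can be sketched in Python as follows. This is only an illustration of the ordering and the 404 handling, not the adapter's actual code (which uses Await.ready()). The send callable, the function names, and the index names in the usage note are hypothetical; send is assumed to block until Elasticsearch responds and to return the status code and parsed body.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical transport: (method, path, body) -> (status_code, response_body).
# It is assumed to block until the response has been received.
Request = Tuple[str, str, Optional[Dict[str, Any]]]
Send = Callable[[str, str, Optional[Dict[str, Any]]], Tuple[int, Dict[str, Any]]]


def build_view_requests(source: str, dest: str, mapping: Dict[str, Any],
                        query: Dict[str, Any]) -> List[Request]:
    """Return the three requests in the order they must be executed."""
    return [
        # Request 1: drop the destination index.
        ("DELETE", f"/{dest}", None),
        # Request 2: create the destination index with the generated mapping.
        ("PUT", f"/{dest}", {"mappings": mapping}),
        # Request 3: copy the documents selected by the query via the Reindex API.
        ("POST", "/_reindex", {
            "source": {"index": source, "query": query},
            "dest": {"index": dest},
        }),
    ]


def create_view(send: Send, source: str, dest: str,
                mapping: Dict[str, Any], query: Dict[str, Any]) -> None:
    """Send the requests one at a time, waiting for each response."""
    for method, path, body in build_view_requests(source, dest, mapping, query):
        status, response = send(method, path, body)
        if method == "DELETE" and status == 404:
            continue  # the index did not exist, so there was nothing to drop
        if status >= 400:
            raise RuntimeError(f"{method} {path} failed with {status}: {response}")
```

For example, create_view(send, "twitter.ds_tweet", "twitter", mapping, {"match_all": {}}) would issue DELETE /twitter, PUT /twitter, and POST /_reindex in that order, and stop at the first failure other than a 404 on the drop.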
Issue 2: Join (More details will be added later)
Description
When a user searches for a keyword on twittermap, a join query is sent to join the tweets with other datasets: state population, county population, and city population. The query results are used by the Normalization button on twittermap.
Problem
Performing full SQL-style joins in a distributed system like Elasticsearch is prohibitively expensive. Instead, Elasticsearch offers two forms of join that are designed to scale horizontally. According to the Elasticsearch documentation, joins are not recommended unless absolutely necessary, for example when the data contains a one-to-many relationship.
AsterixDB behavior
The JOIN query as translated for AsterixDB is shown below. A subquery performs the aggregation first, and its result is then joined with the population table. This greatly reduces the cost of the join because the relationship is one-to-one.
select tt.`state` as `state`, tt.`count` as `count`, ll0.`population` as `population`
from (
    select `state` as `state`, coll_count(g) as `count`
    from twitter.ds_tweet t
    where t.`create_at` >= datetime('2018-01-02T00:00:00.000-0800')
      and t.`create_at` < datetime('2018-01-04T00:00:00.000-0800')
      and ftcontains(t.`text`, ['wang'], {'mode':'all'})
      and t.`geo_tag`.`stateID` in [ 37,51,24,11,10,34,42,9,44,48,35,4,40,6,20,32,8,49,12,22,28,1,13,45,5,47,21,29,54,17,18,39,19,55,26,27,31,56,41,46,16,30,53,38,25,36,50,33,23,2 ]
    group by t.geo_tag.stateID as `state` group as g
) tt
left outer join twitter.dsStatePopulation ll0 on ll0.`stateID` = tt.`state`
Possible solutions
Multi-search: Send two queries in one request using the Multi Search API of Elasticsearch, then merge the responses of the two queries.
Query 1: Perform the subquery above.
Query 2: Query the population index to get the population data.
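As a rough sketch of the multi-search option, the Python below builds a newline-delimited Multi Search body containing both queries and merges the two responses on the client, mimicking the left outer join. The function names, index names, and field layout (text, create_at, geo_tag.stateID in the tweet index; stateID and population in the population index) are assumptions made for illustration, and the list of state IDs is assumed to be non-empty.

```python
import json
from typing import Any, Dict, List


def build_msearch_body(tweet_index: str, population_index: str, keyword: str,
                       state_ids: List[int], start: str, end: str) -> str:
    """Build the NDJSON body for POST /_msearch (header line, then query line)."""
    # Query 1: count matching tweets per state (the aggregation subquery).
    count_query = {
        "size": 0,  # only the aggregation buckets are needed, not the hits
        "query": {"bool": {"filter": [
            {"range": {"create_at": {"gte": start, "lt": end}}},
            {"match": {"text": {"query": keyword, "operator": "and"}}},
            {"terms": {"geo_tag.stateID": state_ids}},
        ]}},
        "aggs": {"state": {"terms": {"field": "geo_tag.stateID",
                                     "size": len(state_ids)}}},
    }
    # Query 2: fetch the population records for the same states.
    population_query = {
        "size": len(state_ids),
        "query": {"terms": {"stateID": state_ids}},
    }
    lines = [
        {"index": tweet_index}, count_query,
        {"index": population_index}, population_query,
    ]
    # The Multi Search API expects every line, including the last, to end with \n.
    return "\n".join(json.dumps(line) for line in lines) + "\n"


def merge_responses(msearch_response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Join per-state counts with populations; missing populations become None."""
    count_resp, population_resp = msearch_response["responses"]
    population = {hit["_source"]["stateID"]: hit["_source"]["population"]
                  for hit in population_resp["hits"]["hits"]}
    return [
        {"state": bucket["key"], "count": bucket["doc_count"],
         "population": population.get(bucket["key"])}
        for bucket in count_resp["aggregations"]["state"]["buckets"]
    ]
```

Because the aggregation produces one bucket per state, the client-side merge is a one-to-one lookup, which matches the reasoning behind the AsterixDB query above.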
Data model denormalization: Add population data to each tweet. For example:
Current data model:
{"geo_tag": {"stateID": 48, "stateName": "Texas", "countyID": 48029, "countyName": "Bexar", "cityID": 4865000, "cityName": "San Antonio"}}
Denormalized data model:
{"geo_tag": {"stateID": 48, "stateName": "Texas", "statePopulation": 1000, "countyID": 48029, "countyName": "Bexar", "countyPopulation": 100, "cityID": 4865000, "cityName": "San Antonio", "cityPopulation": 1}}
Overview
Develop an adapter to support Elasticsearch as the data search engine of Cloudberry.
Related Evaluation
Issue #568
Codebase
es-adpater
Limitation
Names of indices stored in Elasticsearch must be lowercase.
Declare the time format in the mapping when creating an index. Currently, our adapter only supports the strict_date_time format.
Weak support for nested aggregation (at most two levels of nesting are allowed).
Only a single JOIN is allowed (no snowflake joins).
Responses from Elasticsearch need parsing.
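The first two limitations can be illustrated with a small Python sketch that rejects non-lowercase index names and declares strict_date_time for date fields in the mapping. The function name and the returned request layout are hypothetical, and the typeless mappings body assumes Elasticsearch 7 or later; older versions expect a type name inside mappings.

```python
from typing import Any, Dict, Iterable


def build_create_index_request(index_name: str,
                               date_fields: Iterable[str]) -> Dict[str, Any]:
    """Describe a PUT /<index> request that declares the time format up front."""
    # Elasticsearch rejects index names that contain uppercase characters.
    if index_name != index_name.lower():
        raise ValueError(f"index name must be lowercase: {index_name!r}")
    # strict_date_time accepts values such as 2018-01-02T08:00:00.000Z.
    properties = {
        field: {"type": "date", "format": "strict_date_time"}
        for field in date_fields
    }
    return {
        "method": "PUT",
        "path": f"/{index_name}",
        "body": {"mappings": {"properties": properties}},
    }
```

For instance, build_create_index_request("twitter", ["create_at"]) describes a request that creates the twitter index with create_at mapped as a strict_date_time date, while passing "Twitter" raises a ValueError before anything is sent.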
TODO
Finish by Winter 2019 Week 9:
[x] Translate queries related to creating a view table (cache). (Done in Week 6)
[x] Translate join query used by "Normalization" button.
[x] Do an experiment to see if DRUM works in Elasticsearch.
[x] Translate queries for heat map.
[x] Translate queries for pin map.
[x] Translate queries for sidebar.
Finish by Fall 2019:
[x] Write unit tests for Elasticsearch Adapter (Translator & Connector).
[x] Implement aggregation functions: avg, min, max.
[x] Prepare Ingestion PR for ingesting data into Elasticsearch.
[x] Update ES-adapter Quick Start in Wiki.
[ ] Tune the parameters of Cloudberry and Elasticsearch for DRUM. (Working on it)
[ ] Performance Tuning for multiple nodes. (Working on it)