matthewfranglen / postgres-elasticsearch-fdw

Postgres to Elastic Search Foreign Data Wrapper

[Feature proposal] Add sort option in the query to do delta export #7

Closed ahocquard closed 4 years ago

ahocquard commented 4 years ago

Hi, it's me again :)

I think the main goal of this extension is to synchronize an ES index into a Postgres database. The only filtering option for now is the URI search in ES.

If you synchronize your ES index with the database regularly, you only want to load the documents that are new since the last import (assuming the ES documents are immutable, such as logs). Personally, I do a delta import like this:

```sql
-- Loads every new document since the last import.
-- It uses the Query String syntax. Note that the date filter is ">" (as expected)
-- and not ">=" due to the "{" character in the query string.
--
-- @see https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html
-- @see https://lucene.apache.org/solr/guide/6_6/working-with-dates.html
--
-- Do not try to rewrite it with a CTE or move the subquery from the WHERE clause
-- into the FROM clause. Otherwise, the optimizer loads all data from ES and then
-- applies the WHERE clause on the timestamp on the Postgres side.
insert into access_log
select
    *
from es_access_log
where query = (
    select
        CONCAT(
            'log_timestamp:{',
            to_char(MAX(log_timestamp), 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
            ' TO *}'
        )
    from access_log
);
```

The problem is that logs within the same one-second interval may not be loaded. Generally, when you do such an export, you export with a deterministic order, as for seek pagination.

In pseudo SQL, the filter would be:

```sql
WHERE log_timestamp >= max_log_timestamp AND id > id_for_max_log_timestamp
ORDER BY id
```

Adding a sort capability (possible in the URI search from ES) would make it possible to export your data accurately for such a use case.
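
For illustration, a URI search with the sort parameter could look something like this (the index and field names are placeholders, and the range would need URL-encoding in practice):

```
GET /access_log/_search?q=log_timestamp:{2020-01-01T00:00:00Z TO *}&sort=id:asc
```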

matthewfranglen commented 4 years ago

Hi,

Thanks for making another suggestion. I would like to clarify what you are asking for here as the desired change is not immediately clear to me.

You want to export logs from an elasticsearch instance (something like logstash) and you want to be able to perform this export periodically. The logs have timestamps and increasing ids. You correctly perform a range query over the log_timestamp to do so; however, this does not meet your needs.

To fully meet your needs you need to combine this with a range query over the id.

So I wonder why you cannot do that? The lucene query syntax supports combining filters. Have you tried:

```
log_timestamp:[TIMESTAMP TO *] AND id:{LAST_ID TO *]
```

With regard to the requirement of sorting for pagination, that is not strictly required by elasticsearch. The underlying query that the fdw performs uses a cursor (a scroll in elasticsearch terminology), which returns the records from a snapshot taken at the point the query was started.

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html#request-body-search-scroll

> The results that are returned from a scroll request reflect the state of the index at the time that the initial search request was made, like a snapshot in time. Subsequent changes to documents (index, update or delete) will only affect later search requests.
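
For reference, a minimal sketch of that scroll flow (the index name, size, and scroll timeout here are just placeholders):

```
POST /access_log/_search?scroll=10m
{"size": 1000, "query": {"match_all": {}}}

POST /_search/scroll
{"scroll": "10m", "scroll_id": "<scroll id from the previous response>"}
```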

If you load the records into a CTE, you should be able to update your tracking table with the max id from that?
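
Something like this minimal sketch, assuming a hypothetical import_state tracking table with a last_id column:

```sql
-- Hypothetical sketch: load the new rows and record the max id in one statement,
-- so the next delta import can start from last_id.
with imported as (
    insert into access_log
    select * from es_access_log
    where query = 'log_timestamp:{2020-01-01T00:00:00Z TO *}'
    returning id
)
update import_state
set last_id = (select max(id) from imported);
```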

I am probably mistaken in some part of this so please let me know what works or doesn't work for you.

matthewfranglen commented 4 years ago

It would be possible to add an optional sort column that would then populate the sort query parameter. I wonder if it would be better to support the full json style query syntax when the column type of the query column is json.

matthewfranglen commented 4 years ago

I like that you view the extension as a way to read from elasticsearch into postgres as that is the opposite of what I have used it for. In my experience postgres is quite slow at handling full text queries and indices, which is an area where lucene (and thus elasticsearch) shine. So I have used elasticsearch as an index over data stored in postgres. It's great that it is useful for many things.

ahocquard commented 4 years ago

Architecture

Let me explain my architecture; it will help to better understand one of the use cases of the wrapper:

  1. Collect Nginx access log data through logstash and put it in ES (legacy)
  2. Analyse the data with Kibana (legacy)

It's pretty nice for a lot of needs, but the data is sometimes very redundant. A lot of nested GROUP BYs are needed, and Kibana does not allow that.

At some point, we realized that we wanted to cross-reference the data with other data sources, and here a relational database is far better.

  3. Import data into Postgres from a lot of sources to manipulate the data easily and create my projections

In the end, I will maybe re-index the projections in ES if Postgres is too slow. But I think the projections will be just a few thousand rows compared to the millions of ES events... Also, it's for analytics, so I don't need real-time queries.

Btw, did you know this extension exists to automatically do that? I discovered it recently.

Sorting

```
log_timestamp:[TIMESTAMP TO *] AND id:{LAST_ID TO *]
```

It's not possible to do that if your ids are not sequential, and they are not with ES. My pseudo-code was wrong. It should be:

```sql
WHERE
    (log_timestamp >= max_log_timestamp AND id > id_for_max_log_timestamp)
    OR log_timestamp > max_log_timestamp
ORDER BY id
```

matthewfranglen commented 4 years ago

Thanks for this additional context. This helps a lot. I do think that supporting the json query dsl would address your desired feature. It has a sort term which can be coupled with the appropriate query.
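
For illustration, the value of such a json query column could look something like this (the exact shape is an assumption about the eventual feature; it is just the standard elasticsearch query DSL with a sort clause, and the field names are placeholders):

```json
{
  "query": {"range": {"log_timestamp": {"gt": "2020-01-02T00:00:00.000Z"}}},
  "sort": [{"id": "asc"}, {"log_timestamp": "asc"}]
}
```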

For your current deployment perhaps you should try:

```
(log_timestamp:TIMESTAMP AND id:{LAST_ID TO *]) OR log_timestamp:{TIMESTAMP TO *]
```

As that seems to replicate what you desire. I'll let you know when I've evaluated/implemented the change to the query column.

Turns out I have that extension starred heh. It looks great!

ahocquard commented 4 years ago

Even this query will not work.

Actually, I tried to explain the logic with an example. But I realized that even my solution with sort is not correct. Imagine this example:

ES time "t":

{"_id"=>"0001", "log_timestamp": "2020-01-01T00:00:00.000Z", "user_id": 10}
{"_id"=>"9999", "log_timestamp": "2020-01-02T00:00:00.000Z", "user_id": 11}

You export these lines in Postgres. Easy.

ES time "t+1", ids "0002" and "8888* are added:

{"_id"=>"0001", "log_timestamp": "2020-01-01T00:00:00.000Z", "user_id": 10}
{"_id"=>"9999", "log_timestamp": "2020-01-02T00:00:00.000Z", "user_id": 11}
{"_id"=>"0002", "log_timestamp": "2020-01-02T00:00:00.000Z", "user_id": 12}
{"_id"=>"8888", "log_timestamp": "2020-01-03T00:00:00.000Z", "user_id": 11}

You only want to export ids 0002 and 8888, as 0001 and 9999 are already in Postgres. You can't request:

```sql
(log_timestamp >= "2020-01-02T00:00:00.000" AND id > "9999") OR log_timestamp > "2020-01-02T00:00:00.000"
```

This query does not export the id "0002" (since "0002" sorts before "9999"), but it should. Anyway, my solution with the sort only works when all the rows are already present in the database.

I'm closing this issue; I don't actually have a solution to it :thinking:

matthewfranglen commented 4 years ago

So the query column cannot be json, as you can't perform equality tests over it, so my trick of using it to convey query terms fails. I may have some luck using the operators over the exposed fields in multicorn to construct a query; however, that will have the same expressive power as the lucene URI syntax.

matthewfranglen commented 4 years ago

For your example, have you considered exporting from the max timestamp inclusive and using an upsert to ignore the rows that are already present (the ON CONFLICT clause of INSERT)?
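
A minimal sketch of that approach, assuming id is the primary key of access_log and reusing the query column pattern from the first comment:

```sql
-- Re-export from the max timestamp inclusive ("[" instead of "{") and let the
-- primary key discard the rows that were already imported.
insert into access_log
select *
from es_access_log
where query = (
    select CONCAT(
        'log_timestamp:[',
        to_char(MAX(log_timestamp), 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
        ' TO *]'
    )
    from access_log
)
on conflict (id) do nothing;
```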

ahocquard commented 4 years ago

I liked the idea that my event store (my access log is a stream of events) stays append-only, but what you propose is better :+1:

Thanks