atviriduomenys / spinta

Spinta is a framework to describe, extract and publish data (a DEP Framework).
MIT License

Paginated push #366

Open sirex opened 1 year ago

sirex commented 1 year ago

Currently, the push command reads the whole table at once. This can lock the table for a long time and possibly interrupt other services. To avoid table locking, the push command should read the source table in smaller fragments, or chunks.

Fragmentation and pagination should be compatible with LDES: https://semiceu.github.io/LinkedDataEventStreams/#tree

d | r | b | m | property | type     | ref   | source  | prepare       | access
$DATASET                 |          |       |         |               |
  | db                   | sql      |       | sqlite:///$BASEDIR/db.sqlite | |
  |   |   | City         |          | id    | cities  | page(created) |
  |   |   |   | id       | integer  |       | id      |               | open
  |   |   |   | name     | string   |       | name    |               | open
  |   |   |   | created  | datetime |       | created |               | open

In the example above, pagination is specified in model.prepare: page(created) reads data from the source in pages rather than all at once.

On the first run, the lowest possible value for the type of created is used as the starting point; for datetime, that is 0000-00-00 00:00:00. Then a fixed number of rows is retrieved from the data source using LIMIT. The default could be 1000 rows, but the limit can also be set via page(created, size: 1000) or via the push_page_size configuration option.

When reading data, rows must be ordered by created (or by whichever property is specified in page).
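A minimal sketch of the page-by-page read described above, using keyset pagination on created. It assumes an SQLite source with the cities table from the manifest and ISO-formatted text timestamps; read_pages, DEFAULT_PAGE_SIZE and LOWEST_DATETIME are hypothetical names, not Spinta's API. It also assumes created values are unique: if several rows share the last value of a page, the strict > filter would skip the remaining ones.

```python
import sqlite3
from typing import Iterator, List, Tuple

# Assumed defaults, taken from the proposal above.
DEFAULT_PAGE_SIZE = 1000
LOWEST_DATETIME = "0000-00-00 00:00:00"  # sorts before any ISO timestamp string


def read_pages(
    conn: sqlite3.Connection,
    start: str = LOWEST_DATETIME,
    size: int = DEFAULT_PAGE_SIZE,
) -> Iterator[List[Tuple]]:
    """Yield the cities table one page at a time, ordered by created."""
    last = start
    while True:
        page = conn.execute(
            "SELECT id, name, created FROM cities"
            " WHERE created > ? ORDER BY created LIMIT ?",
            (last, size),
        ).fetchall()
        if not page:
            return
        yield page
        # The last created value is the lower bound for the next page; this is
        # the value that would be stored in the push state.
        last = page[-1][2]
```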

After reading the first chunk of data, the last created value must be stored in the push state database, in the _page table. The contents of the _page table could look like this:

model         | property | value
$DATASET/City | created  | 2023-02-22 13:44

Next time, the next chunk is read starting from the value saved in the _page table for that model.

We also need a spinta push --full flag to reset the page value and start from the beginning.
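A sketch of how the _page table could be read, written and reset, using SQLite for the push state database. The column layout mirrors the example table above, but the schema and the function names (init_page_table, load_page, save_page, reset_page) are assumptions for illustration; reset_page shows what a --full run might do.

```python
import sqlite3
from typing import Optional


def init_page_table(state: sqlite3.Connection) -> None:
    # Hypothetical layout mirroring the example: one row per model and page property.
    state.execute(
        "CREATE TABLE IF NOT EXISTS _page ("
        " model TEXT NOT NULL,"
        " property TEXT NOT NULL,"
        " value TEXT,"
        " PRIMARY KEY (model, property))"
    )


def load_page(state: sqlite3.Connection, model: str, prop: str) -> Optional[str]:
    """Return the last stored page value, or None on a first run."""
    row = state.execute(
        "SELECT value FROM _page WHERE model = ? AND property = ?",
        (model, prop),
    ).fetchone()
    return row[0] if row else None


def save_page(state: sqlite3.Connection, model: str, prop: str, value: str) -> None:
    """Store the last value of a chunk, replacing any previous value."""
    state.execute(
        "INSERT OR REPLACE INTO _page (model, property, value) VALUES (?, ?, ?)",
        (model, prop, value),
    )
    state.commit()


def reset_page(state: sqlite3.Connection, model: str) -> None:
    """What a --full run could do: forget the stored position for a model."""
    state.execute("DELETE FROM _page WHERE model = ?", (model,))
    state.commit()
```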

The page function should be interpreted by SqlQueryBuilder, and the pagination parameters should be added to the builder env. Then this line:

https://github.com/atviriduomenys/spinta/blob/acb9f1dd2ff99a9c7e13aca851b847453a114712/spinta/datasets/backends/sql/commands/read.py#L98

should be replaced with a paginate helper function. This function should interpret the pagination parameters stored in the builder env and run conn.execute multiple times with a pagination filter, starting from the last page value from the push state, which is passed to getall as the page argument.

Pagination should be used only if the page argument is given to the getall command method.

Pagination should use the page size passed via the page() function, but it must also respect the limit argument passed to the getall command method: pagination should stop once the getall limit is reached.
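A sketch of what the paginate helper could look like, written against a plain callback instead of SqlQueryBuilder so it stays self-contained. The execute(last_value, size) callback, assumed to run the query with the pagination filter and LIMIT size, and all parameter names are hypothetical. The helper stops on a short page or once the getall-style limit is reached, shrinking the last request so no extra rows are fetched.

```python
from typing import Any, Callable, Iterator, List, Optional, Sequence

Row = Sequence[Any]


def paginate(
    execute: Callable[[Optional[Any], int], List[Row]],
    key_index: int,
    page_size: int,
    last_value: Optional[Any] = None,
    limit: Optional[int] = None,
) -> Iterator[Row]:
    """Yield rows page by page until the source or `limit` is exhausted.

    `execute(last_value, size)` is expected to return at most `size` rows
    ordered by the page key, all with a key greater than `last_value`
    (or from the start when `last_value` is None).
    """
    yielded = 0
    while limit is None or yielded < limit:
        # Never ask for more rows than the overall limit still allows.
        size = page_size if limit is None else min(page_size, limit - yielded)
        rows = execute(last_value, size)
        yield from rows
        yielded += len(rows)
        if len(rows) < size:
            return  # a short (or empty) page means the source is exhausted
        # The key of the last row becomes the filter value for the next page.
        last_value = rows[-1][key_index]
```

In real code, execute would wrap conn.execute with a WHERE key > :last_value ... ORDER BY key LIMIT :size query.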

The _page table in the push state should be updated by the push command, after each chunk has been sent to the remote target.
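The ordering requirement (send first, then save the position) can be sketched like this. The send and save_position callbacks are hypothetical stand-ins for the remote upload and the _page update. If sending fails, the stored position still points at the previous chunk, so the failed chunk is read and sent again on the next run: rows may be sent twice, but none are skipped.

```python
from typing import Any, Callable, Iterable, List, Sequence


def push_pages(
    pages: Iterable[List[Sequence[Any]]],
    send: Callable[[List[Sequence[Any]]], None],
    save_position: Callable[[Any], None],
    key_index: int,
) -> int:
    """Send each page, then record its last key; return the number of pages pushed."""
    pushed = 0
    for page in pages:
        if not page:
            continue
        send(page)  # if this raises, the position below is not saved
        save_position(page[-1][key_index])
        pushed += 1
    return pushed
```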

We also need to update the part where we read data from the push state. Currently we read the whole table into memory; I think we should read it row by row.

https://github.com/atviriduomenys/spinta/blob/0f4e232ae40dc8e69f051f7f2487b7e2685194cc/spinta/cli/push.py#L767-L774
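Reading the push state row by row could look roughly like this with Python's sqlite3 module. The state_rows table, its columns and iter_state_rows are hypothetical; the point is that iterating the cursor fetches rows on demand, whereas fetchall() loads the whole result into memory.

```python
import sqlite3
from typing import Iterator, Tuple


def iter_state_rows(state: sqlite3.Connection, model: str) -> Iterator[Tuple]:
    """Yield push state rows for one model, one row at a time."""
    cursor = state.execute(
        "SELECT id, checksum FROM state_rows WHERE model = ? ORDER BY id",
        (model,),
    )
    # Iterating the cursor steps through the result on demand; calling
    # cursor.fetchall() here would build a list of every row in memory.
    yield from cursor
```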

Depends on

sirex commented 1 year ago

Schema of how to zip the resource and push state data tables. Both tables must be sorted by model.ref.

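Because both tables are sorted by model.ref, they can be zipped in a single pass, like a merge join. A minimal sketch follows; zip_sorted is a hypothetical helper, and it assumes both sides order keys identically (for text keys, the two databases' collations must agree, otherwise rows will be mismatched) and that no row is None.

```python
from typing import Any, Callable, Iterable, Iterator, Optional, Tuple


def zip_sorted(
    source: Iterable[Any],
    state: Iterable[Any],
    key: Callable[[Any], Any],
) -> Iterator[Tuple[Optional[Any], Optional[Any]]]:
    """Merge two iterables sorted by the same key.

    Yields (source_row, state_row) pairs; one side is None when the key
    exists only in the other stream.
    """
    src, st = iter(source), iter(state)
    a, b = next(src, None), next(st, None)
    while a is not None or b is not None:
        if b is None or (a is not None and key(a) < key(b)):
            yield a, None  # only in source: a new row
            a = next(src, None)
        elif a is None or key(b) < key(a):
            yield None, b  # only in push state: row no longer in source
            b = next(st, None)
        else:
            yield a, b  # in both: compare them to detect changes
            a, b = next(src, None), next(st, None)
```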
sirex commented 1 year ago

I tested pagination on https://get-test.data.gov.lt/datasets/gov/rc/ar/adresai/Adresas and it seems that pagination is not enabled.

When I run a query on a large table, all data is read with a single query and no pagination happens.

I tested this on:

In both cases, I get no response for some time, and then the download finally starts. So I assume that pagination is not used.

adp-atea commented 9 months ago

I created a dummy database with this exact manifest and generated 10 million entries.

I tried to fetch CSV data with pagination disabled using ?page(disable:true). After 5 minutes, the download still had not started.

Then I tried the default pagination settings (page size 1000). The download started immediately, but it was slow: it took around 20 minutes to download just 150 MB of data.

After that, I increased the page size in the config to 100000. It took around 1 minute to download 100 MB of data.

It seems to be a page size issue, though there may also be optimizations that could reduce the pagination overhead.
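A rough back-of-the-envelope comparison of the numbers above. The throughput figures are the approximate ones reported in this comment, and the query counts assume the full 10 million rows are read; they suggest, but do not prove, that a fixed per-query overhead dominates with small pages.

```python
# Figures reported above (10 million rows in the test table).
rows_total = 10_000_000

# Observed throughput in MB per minute for each page size.
throughput_default = 150 / 20  # page size 1000: ~150 MB in ~20 minutes
throughput_large = 100 / 1     # page size 100000: ~100 MB in ~1 minute

# Paginated queries needed to read the whole table at each page size.
queries_default = rows_total // 1_000
queries_large = rows_total // 100_000

print(f"{throughput_default} MB/min vs {throughput_large} MB/min")
print(f"{queries_default} queries vs {queries_large} queries")
```

That is roughly a 13x difference in throughput, alongside 100x fewer queries.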

Currently, the page size can only be set via the push_page_size variable in config.py or in the manifest file. There is also a ?page(size:x) option, but it is currently bugged; the fix is tracked in the https://github.com/orgs/atviriduomenys/projects/9?pane=issue&itemId=44626801 task.

sirex commented 8 months ago

When pushing with

spinta push sdsa.xlsx -o put.data.gov.lt

the following error is raised:

Traceback (most recent call last):
  File "spinta/cli/push.py", line 619, in _read_model_data_by_page
    stream = peek(stream)
  File "spinta/utils/itertools.py", line 40, in peek
    peek = list(islice(it, 1))
  File "spinta/commands/read.py", line 237, in get_page
    yield from get_paginated_values(model_page, page_meta, rows)
  File "spinta/commands/read.py", line 271, in get_paginated_values
    for i, row in enumerate(rows):
  File "spinta/datasets/backends/sql/commands/read.py", line 99, in getall
    for row in conn.execute(qry):
  File "sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "src/pymssql/_pymssql.pyx", line 476, in pymssql._pymssql.Cursor.execute
sqlalchemy.exc.ProgrammingError: (pymssql._pymssql.ProgrammingError) (102, """
    Incorrect syntax near 'NULLS'.DB-Lib error message 20018, severity 15:
    General SQL Server error: Check messages from the SQL Server
""")

SQL: """
 SELECT TOP 1001
     [T].[ID],
     [T].[X],
     [T].[Y],
     [T].[Z],
 FROM [T]
 ORDER BY [T].[ID] ASC NULLS LAST
"""

The error is raised on every page.

Finally, the command ends with:

Traceback (most recent call last):
  File "spinta/cli/push.py", line 460, in _push_rows
    next(rows)
  File "spinta/cli/push.py", line 1339, in _save_push_state
    for row in rows:
  File "spinta/cli/push.py", line 905, in _push_to_remote_spinta
    for row in rows:
  File "spinta/cli/push.py", line 867, in _prepare_rows_for_push
    for row in rows:
  File "spinta/cli/push.py", line 1292, in _check_push_state
    for model_type, group in itertools.groupby(rows, key=_get_model_type):
  File "spinta/cli/push.py", line 408, in _read_rows
    yield from _get_model_rows(
  File "spinta/cli/push.py", line 526, in _get_model_rows
    for row in rows:
  File "spinta/cli/push.py", line 564, in _iter_model_rows
    for row in rows:
  File "spinta/cli/push.py", line 750, in _read_rows_by_pages
    state_row = next(state_rows, None)
  File "spinta/cli/push.py", line 863, in _get_state_rows
    yield from get_paginated_values(model_page, page_meta, rows)
  File "spinta/commands/read.py", line 306, in get_paginated_values
    raise InfiniteLoopWithPagination(
spinta.exceptions.InfiniteLoopWithPagination:
    Pagination values has cause infinite loop while fetching data.
    Page of size: 1000, first value is the same as previous page's last value, which is:
    (
        '23d5911f-fe34-4e99-8417-a366fe3f0ca7',
        '93ef8eea666afc84cb7732607895c47d4e99f182',
        '02ef17f8-77f8-4711-9f5b-e6c123584ac2',
        None,
        False,
        None,
        None
    )

  Context:
    component: spinta.components.Page
    manifest: default
    schema: Sheet1:308
    dataset: datasets/gov/...
    resource: resource1
    model: datasets/gov/...
    entity: ...
    resource.backend: datasets/gov/.../resource1
    page: {
        'key': b'W251bGxd',
        'key_values': {
            'id': None,
        },
        'size': 1000
    }
    page_size: 1000
    page_values: (
        '23d5911f-fe34-4e99-8417-a366fe3f0ca7',
        '93ef8eea666afc84cb7732607895c47d4e99f182',
        '02ef17f8-77f8-4711-9f5b-e6c123584ac2',
        None,
        False,
        None,
        None,
    )

The database is MSSQL, and it seems that MSSQL does not support the NULLS keyword.
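One common way to get NULLS LAST ordering on SQL Server is to sort on a CASE expression first. A minimal sketch that builds such a clause as a string; order_by_nulls_last is a hypothetical helper, not part of Spinta, and it assumes column is an already-quoted, trusted identifier. The demo runs on SQLite, which, like SQL Server, sorts NULLs first in ascending order by default.

```python
import sqlite3


def order_by_nulls_last(column: str, descending: bool = False) -> str:
    """Build an ORDER BY clause that puts NULLs last without the NULLS keyword."""
    direction = "DESC" if descending else "ASC"
    # NULL rows get 1 and all others 0, so NULLs sort after every real value.
    return (
        f"ORDER BY CASE WHEN {column} IS NULL THEN 1 ELSE 0 END, "
        f"{column} {direction}"
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(None,), (2,), (1,), (None,), (3,)])
ids = [r[0] for r in conn.execute(f"SELECT id FROM t {order_by_nulls_last('id')}")]
print(ids)  # [1, 2, 3, None, None]
```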

It also looks like pagination is done on id, because the SDSA specifies model.ref: id, but id has NULL values.

Basically, the error is related to paginating over NULL values.
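A hypothetical pre-flight check that could catch this before paginating. Keyset pagination advances with a filter like key > last_value, which needs a non-NULL, unique key. The helper name and the idea of checking up front are assumptions, not existing Spinta behavior; table and column names are interpolated directly into SQL, so they must be trusted.

```python
import sqlite3
from typing import List, Sequence


def page_key_problems(
    conn: sqlite3.Connection, table: str, columns: Sequence[str]
) -> List[str]:
    """Return reasons why `columns` cannot safely be used as a keyset page key."""
    problems = []
    for col in columns:
        # A NULL never satisfies `key > last_value`, so pagination cannot move past it.
        (nulls,) = conn.execute(
            f"SELECT COUNT(*) FROM {table} WHERE {col} IS NULL"
        ).fetchone()
        if nulls:
            problems.append(f"{col} has {nulls} NULL value(s)")
    cols = ", ".join(columns)
    # Repeated keys (GROUP BY puts all NULLs in one group) are a problem: if more
    # rows share one value than fit in a page, the next page starts where the
    # previous one ended, which is the loop reported above.
    (dupes,) = conn.execute(
        f"SELECT COUNT(*) FROM (SELECT {cols} FROM {table}"
        f" GROUP BY {cols} HAVING COUNT(*) > 1) AS d"
    ).fetchone()
    if dupes:
        problems.append(f"({cols}) has {dupes} repeated key value(s)")
    return problems
```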

Another question: why do we see multiple page values? One looks like this:

page: {
    'key': b'W251bGxd',
    'key_values': {
        'id': None,
    },
    'size': 1000
}

And the other:

    page_values: (
        '23d5911f-fe34-4e99-8417-a366fe3f0ca7',
        '93ef8eea666afc84cb7732607895c47d4e99f182',
        '02ef17f8-77f8-4711-9f5b-e6c123584ac2',
        None,
        False,
        None,
        None,
    )

What are the second page values, and where do they come from, if we use only one key, id, for pagination?
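One observation that may help: the key in the Page context is base64 data that decodes to the JSON list [null], matching key_values: {'id': None}. So page appears to carry only the single id value. The second traceback raises from _get_state_rows, which suggests (not verified) that page_values, with its seven elements, comes from the push state rows rather than from the source table.

```python
import base64
import json

raw_key = b"W251bGxd"  # 'key' from the Page context in the traceback above
decoded = base64.b64decode(raw_key)
print(decoded)              # b'[null]'
print(json.loads(decoded))  # [None]
```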

JuliusLADP commented 1 month ago

Can we check whether this has already been done?