dipdup-io / dipdup

Modular framework for creating selective indexers and featureful backends for dapps
https://dipdup.io
MIT License

await functions in handler files is not awaited #318

Closed fash-ns closed 2 years ago

fash-ns commented 2 years ago

Hi, I've recently updated DipDup to v5, and now my async functions don't seem to be awaited!
There is a mint at level 2085726 of Tezos mainnet and a transfer of the minted token at level 2085732. It seems that saving the minted token to the database is not awaited: I keep getting an "Object does not exist" error, which means the minted token has not been saved to the database.

Steps to reproduce:

Index the KT1JsJUo4PQARJB3AkstQJ8mHskoNJb29a7Z contract for minted tokens and transfer history. The issue can be observed at the levels mentioned above.

What did you expect to happen:

The on_mint handler should be awaited until the save completes, and only then should data from subsequent levels be processed.

What actually happened:

The on_mint and on_transfer handlers are executed almost simultaneously (with a very small lag).

Environment

droserasprout commented 2 years ago

Hi @fash-ns! Broken atomicity is a pretty serious issue! Are you sure it's not a #236 bug? Do you use get_or_create, update_or_create or similar "complex" Tortoise methods in your handlers? Please, provide some logs.

fash-ns commented 2 years ago

Hey @droserasprout! Thanks for your reply. Yes, I've used update_or_create in many places. Should I replace them with plain create and update methods? Is there a better solution? Could this be caused by having more than one key under indexes in the .yml config file? As you can see in the logs below, it seems the next block level is processed before processing of the previous block has finished. This leads to an "Object does not exist" error. The requested logs can be accessed using this link (updated).

Config file

spec_version: 1.2
package: indexer

database:
  kind: postgres
  host: his-db
  port: 5432
  user: ${POSTGRES_USER:-changeme}
  password: ${POSTGRES_PASSWORD:-changeme}
  database: ${POSTGRES_DB:-changeme}
  # schema_name: demo

hasura:
  url: http://his-hasura:8080
  admin_secret: ${ADMIN_SECRET:-changeme}
  allow_aggregations: true
  camel_case: false
  select_limit: 100

contracts:
  xarb_mint:
    address: KT1JsJUo4PQARJB3AkstQJ8mHskoNJb29a7Z
    typename: xarb_fa2
  xarb_swap:
    address: KT1GBErEfN2bULxkAWucmvptNPXQsj7qPfnU
    typename: xarb_swap
  hen_mint:
    address: KT1RJ6PbjHpwc3M5rw5s2Nbmefwbuwbdxton
    typename: hen_fa2
  hen_minter:
    address: KT1Hkg5qeNhfwpKW4fXvq7HGZB9z2EnmCCA9
    typename: hen_minter
  hen_swap:
    address: KT1HbQepzV1nVGg8QVznG7z4RcHseD5kwqBn
    typename: hen_swap
  hen_users:
    address: KT1My1wDZHDGweCrJnQJi3wcFaS67iksirvj
    typename: hen_users
  teia_swap:
    address: KT1PHubm9HtyQEJ4BBpMTVomq6mhbfNZ9z5w
    typename: teia_swap
  objkt_swap:
    address: KT1FvqJwEDWb1Gwc55Jd1jjTHRVWbYKUUpyq
    typename: objkt_swap

# hooks:
#   fix_metadata:
#     callback: fix_metadata
#     atomic: True

datasources:
  # tzkt_hangzhounet:
  #   kind: tzkt
  #   url: https://api.hangzhou2net.tzkt.io
  tzkt_mainnet:
    kind: tzkt
    url: https://api.tzkt.io
    http:
      cache: false
      retry_count:  5
      retry_sleep: 1
      retry_multiplier: 2
      connection_timeout: 120
  xarb_ipfs:
    kind: ipfs
    url: https://************/ipfs
    http:
      cache: true
      retry_count: 5
      retry_sleep: 1
      retry_multiplier: 1.5
      connection_timeout: 120
  cloudflare_ipfs:
    kind: ipfs
    url: https://cloudflare-ipfs.com/ipfs
    http:
      cache: true
      retry_count: 5
      retry_sleep: 1
      retry_multiplier: 1.5
      connection_timeout: 120

# jobs:
#   fix_metadata:
#     hook: fix_metadata
#     interval: 300

indexes:
  xarb_mainnet:
    kind: operation
    contracts:
      - xarb_mint
      - xarb_swap
    datasource: tzkt_mainnet
    handlers:
      - callback: on_mint
        pattern:
          - type: transaction
            destination: xarb_mint
            entrypoint: mint
      - callback: on_transfer
        pattern:
          - type: transaction
            destination: xarb_mint
            entrypoint: transfer
      - callback: on_operator_update
        pattern:
          - type: transaction
            destination: xarb_mint
            entrypoint: update_operators
      - callback: on_swap
        pattern:
          - type: transaction
            destination: xarb_swap
            entrypoint: swap
      - callback: on_cancel_swap
        pattern:
          - type: transaction
            destination: xarb_swap
            entrypoint: cancel_swap
      - callback: on_collect
        pattern:
          - type: transaction
            destination: xarb_swap
            entrypoint: collect
  hen_mainnet:
    kind: operation
    contracts:
      - hen_mint
      - hen_minter
      - hen_swap
      - hen_users
      - teia_swap
      - objkt_swap
    datasource: tzkt_mainnet
    types:
      - transaction
      - origination
    handlers:
      - callback: on_hen_mint
        pattern:
          - type: transaction
            destination: hen_minter
            entrypoint: mint_OBJKT
          - type: transaction
            destination: hen_mint
            entrypoint: mint
      - callback: on_hen_transfer
        pattern:
          - type: transaction
            destination: hen_mint
            entrypoint: transfer
      - callback: on_hen_operator_update
        pattern:
          - type: transaction
            destination: hen_mint
            entrypoint: update_operators
      - callback: on_hen_swap
        pattern:
          - type: transaction
            destination: hen_swap
            entrypoint: swap
      - callback: on_hen_cancel_swap
        pattern:
          - type: transaction
            destination: hen_swap
            entrypoint: cancel_swap
      - callback: on_hen_collect
        pattern:
          - type: transaction
            destination: hen_swap
            entrypoint: collect
      # hen swap v1
      - callback: on_hen_swap_minter
        pattern:
          - type: transaction
            destination: hen_minter
            entrypoint: swap
      - callback: on_hen_cancel_swap
        pattern:
          - type: transaction
            destination: hen_minter
            entrypoint: cancel_swap
      - callback: on_hen_collect_minter
        pattern:
          - type: transaction
            destination: hen_minter
            entrypoint: collect
      - callback: on_hen_teia_swap
        pattern:
          - type: transaction
            destination: teia_swap
            entrypoint: swap
      - callback: on_hen_teia_cancel_swap
        pattern:
          - type: transaction
            destination: teia_swap
            entrypoint: cancel_swap
      - callback: on_hen_teia_collect
        pattern:
          - type: transaction
            destination: teia_swap
            entrypoint: collect
      - callback: on_hen_objkt_swap
        pattern:
          - type: transaction
            destination: objkt_swap
            entrypoint: ask
      - callback: on_hen_objkt_cancel_swap
        pattern:
          - type: transaction
            destination: objkt_swap
            entrypoint: retract_ask
      - callback: on_hen_objkt_collect
        pattern:
          - type: transaction
            destination: objkt_swap
            entrypoint: fulfill_ask
      # hen users
      - callback: on_hen_register
        pattern:
          - type: transaction
            destination: hen_users
            entrypoint: registry

advanced:
  reindex:
    manual: wipe
    migration: exception
    rollback: ignore
    config_modified: exception
    schema_modified: exception

droserasprout commented 2 years ago

I don't see anything suspicious in your logs or config. Failing .get() call on on_transfer:17 means you have no such entry in the database at the time of execution.

Make sure that your indexes are independent of each other. Synchronization is performed simultaneously, so you can never tell which index will reach a specific level first.
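To illustrate the point about independent indexes, here is a minimal sketch in plain asyncio. It is not DipDup's code: `run_index` is a hypothetical helper, and the index names and levels are simply borrowed from the logs in this thread. Each index walks its own levels strictly in order, but the order in which the two indexes interleave is not predictable.

```python
import asyncio
import random


async def run_index(name: str, levels: list, log: list) -> None:
    """Process one index's levels strictly in order (hypothetical helper)."""
    for level in levels:
        # Simulated I/O wait; the event loop may switch to the other index here.
        await asyncio.sleep(random.uniform(0, 0.01))
        log.append((name, level))


async def main() -> list:
    log = []
    # Both indexes are synchronized concurrently, as with gather() on their tasks.
    await asyncio.gather(
        run_index("xarb_mainnet", [1967517, 1967518, 1967528], log),
        run_index("hen_mainnet", [1365936, 1365939, 1365940], log),
    )
    return log


log = asyncio.run(main())
for name, level in log:
    print(f"{name}: Processing operations of level {level}")
```

The per-index order is always preserved, while the combined order varies from run to run. A handler in one index therefore cannot rely on rows written by the other index.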

fash-ns commented 2 years ago

"Synchronization is performed simultaneously, so you can never tell which index will reach a specific level first."

That's fine. The problem is that the token whose absence caused the error, no. 19 (you can check it via the transfer tx_hash on TzKT), was minted earlier, at level 1967322, and should already have been stored in the database (see line 239 of the log file). It seems indexing of a specific index is not asynchronous!

droserasprout commented 2 years ago

I don't understand what you mean by "not asynchronous indexing". All indexes are atomic by level - matched handlers are executed within a single database transaction. Try running your project with dipdup -l debug.yml ... run to check actual SQL queries (insert into token, select from token) that execute on each step.
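What "atomic by level" means can be sketched with the standard-library sqlite3 module. This is only an illustration under assumptions, not DipDup's implementation: `process_level`, `make_mint`, and `make_transfer` are hypothetical names, and the table merely mimics a token model with `token_id` and `source` columns. All handlers of a level share one transaction, so a row inserted by an earlier handler is visible to a later one, and a failure rolls back the whole level.

```python
import sqlite3


def process_level(conn, handlers):
    """Run all handlers matched for one level inside a single transaction."""
    try:
        for handler in handlers:
            handler(conn)
        conn.commit()
    except Exception:
        # Any failing handler discards every write made for this level.
        conn.rollback()
        raise


def make_mint(token_id):
    def on_mint(conn):
        conn.execute(
            "INSERT INTO token (token_id, source) VALUES (?, 'xarb')", (token_id,)
        )
    return on_mint


def make_transfer(token_id):
    def on_transfer(conn):
        row = conn.execute(
            "SELECT token_id FROM token WHERE token_id = ? AND source = 'xarb'",
            (token_id,),
        ).fetchone()
        if row is None:
            raise LookupError("Object does not exist")
    return on_transfer


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE token (token_id INTEGER, source TEXT, UNIQUE (token_id, source))")

# Mint and transfer in the same level: the insert is visible to the later select.
process_level(conn, [make_mint(27), make_transfer(27)])
rows_after_success = conn.execute("SELECT COUNT(*) FROM token").fetchone()[0]

# A failing handler rolls back the mint of token 28 made in the same level.
try:
    process_level(conn, [make_mint(28), make_transfer(99)])
except LookupError:
    pass
rows_after_failure = conn.execute("SELECT COUNT(*) FROM token").fetchone()[0]

print(rows_after_success, rows_after_failure)
```

In this model a lookup can only fail if the row was never written by an earlier handler or level, which is why checking the actual SQL queries is the next diagnostic step.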

fash-ns commented 2 years ago

Let me put it this way:

Please take a look at the logs below:

INFO     dipdup.index         xarb_mainnet: Processing operations of level 1967518
INFO     dipdup.index         xarb_mainnet: oniLLHs4AAEpGFW6yeFuGrJJAqPYuL5G9y4XeiGPmr2f9jHwsqc: `on_mint` handler matched!
INFO     dipdup.index         xarb_mainnet: ooxVzHPKk3EB9nhkdzizTQtk5NPwV3KNDp4PD1YX2dYLt5CeKAD: `on_operator_update` handler matched!
INFO     dipdup.index         xarb_mainnet: ooxVzHPKk3EB9nhkdzizTQtk5NPwV3KNDp4PD1YX2dYLt5CeKAD: `on_transfer` handler matched!
INFO     dipdup.index         xarb_mainnet: ooxVzHPKk3EB9nhkdzizTQtk5NPwV3KNDp4PD1YX2dYLt5CeKAD: `on_swap` handler matched!
INFO     dipdup.handlers.on_hen_mint ooWnLkTebhW8QoCncVc8vukrYBH2ksaypy9HuUGTuhQtY3iKae6: Mint of token 170 of hen saved successfully
adding metadata for token 170 of hen
WARNING  dipdup.callback      `on_hen_mint` handler callback executed in 1.0458146940000006 seconds
INFO     dipdup.index         hen_mainnet: Processing operations of level 1365936
INFO     dipdup.index         hen_mainnet: op4RBDAYu2UhwwYLiHPm91SQDwBPXBsHY1ZtUWNptH6xdYcrYfj: `on_hen_transfer` handler matched!
INFO     dipdup.index         hen_mainnet: op4RBDAYu2UhwwYLiHPm91SQDwBPXBsHY1ZtUWNptH6xdYcrYfj: `on_hen_collect_minter` handler matched!
INFO     dipdup.handlers.on_mint oniLLHs4AAEpGFW6yeFuGrJJAqPYuL5G9y4XeiGPmr2f9jHwsqc: Mint of token 27 of xarb saved successfully, <Token>
adding metadata for token 27 of xarb
INFO     dipdup.handlers.on_transfer ooxVzHPKk3EB9nhkdzizTQtk5NPwV3KNDp4PD1YX2dYLt5CeKAD: transferring token 26
INFO     dipdup.index         xarb_mainnet: Processing operations of level 1967528
INFO     dipdup.index         xarb_mainnet: oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: `on_operator_update` handler matched!
INFO     dipdup.index         xarb_mainnet: oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: `on_transfer` handler matched!
INFO     dipdup.index         xarb_mainnet: oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: `on_swap` handler matched!
INFO     dipdup.index         hen_mainnet: Processing operations of level 1365939
INFO     dipdup.index         hen_mainnet: ooQE3asVLN4DXvLGU6KQ2YCJe1ci2rh3HTfz42FdCfFwKNwZmb5: `on_hen_transfer` handler matched!
INFO     dipdup.index         hen_mainnet: ooQE3asVLN4DXvLGU6KQ2YCJe1ci2rh3HTfz42FdCfFwKNwZmb5: `on_hen_collect_minter` handler matched!
INFO     dipdup.handlers.on_transfer oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: transferring token 27
INFO     apscheduler.scheduler Scheduler has been shut down
INFO     dipdup.index         hen_mainnet: Processing operations of level 1365940
INFO     dipdup.index         hen_mainnet: opSL8BQsmLdyR9x5CqmZv9TdprgSXjKQWxMyMkvBMoDMiPboG7N: `on_hen_mint` handler matched!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/dipdup/context.py", line 393, in _callback_wrapper
    yield
  File "/usr/local/lib/python3.9/site-packages/dipdup/context.py", line 338, in fire_handler
    await handler_config.callback_fn(new_ctx, *args, **kwargs)
  File "/indexer/indexer/handlers/on_transfer.py", line 18, in on_transfer
    token = await models.Token.filter(token_id=int(tx.token_id), source="xarb").get()
  File "/usr/local/lib/python3.9/site-packages/tortoise/queryset.py", line 978, in _execute
    raise DoesNotExist("Object does not exist")
tortoise.exceptions.DoesNotExist: Object does not exist

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/dipdup", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.9/site-packages/asyncclick/core.py", line 1150, in __call__
    return anyio.run(self._main, main, args, kwargs, **({"backend":_anyio_backend} if _anyio_backend is not None else {}))
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.9/site-packages/asyncclick/core.py", line 1153, in _main
    return await main(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/asyncclick/core.py", line 1074, in main
    rv = await self.invoke(ctx)
  File "/usr/local/lib/python3.9/site-packages/asyncclick/core.py", line 1684, in invoke
    return await _process_result(await sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/asyncclick/core.py", line 1420, in invoke
    return await ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.9/site-packages/asyncclick/core.py", line 774, in invoke
    rv = await rv
  File "/usr/local/lib/python3.9/site-packages/dipdup/cli.py", line 94, in wrapper
    await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dipdup/cli.py", line 215, in run
    await dipdup.run()
  File "/usr/local/lib/python3.9/site-packages/dipdup/dipdup.py", line 370, in run
    await gather(*tasks)
  File "/usr/local/lib/python3.9/site-packages/dipdup/dipdup.py", line 122, in run
    await gather(*tasks)
  File "/usr/local/lib/python3.9/site-packages/dipdup/index.py", line 206, in process
    await self._synchronize(sync_level)
  File "/usr/local/lib/python3.9/site-packages/dipdup/index.py", line 327, in _synchronize
    await self._process_level_operations(operation_subgroups)
  File "/usr/local/lib/python3.9/site-packages/dipdup/index.py", line 378, in _process_level_operations
    await self._call_matched_handler(handler_config, operation_subgroup, args)
  File "/usr/local/lib/python3.9/site-packages/dipdup/index.py", line 517, in _call_matched_handler
    await self._ctx.fire_handler(
  File "/usr/local/lib/python3.9/site-packages/dipdup/context.py", line 106, in fire_handler
    await self.callbacks.fire_handler(self, name, index, datasource, fmt, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dipdup/context.py", line 338, in fire_handler
    await handler_config.callback_fn(new_ctx, *args, **kwargs)
  File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/dipdup/context.py", line 400, in _callback_wrapper
    raise CallbackError(kind, name) from e
dipdup.exceptions.CallbackError: ('handler', 'on_transfer')
________________________________________________________________________________

`on_transfer` handler callback execution failed.

As you can see on the 12th line of the logs, the 27th xarb token has been minted and inserted into the database using Tortoise's await token.save() method. The Token object is also printed at the end of the line (below is the 12th line of the logs above):

INFO     dipdup.handlers.on_mint oniLLHs4AAEpGFW6yeFuGrJJAqPYuL5G9y4XeiGPmr2f9jHwsqc: Mint of token 27 of xarb saved successfully, <Token>
adding metadata for token 27 of xarb

So the 27th token should exist in the database. But before the insertion of this token has finished, the next level is indexed! (See the logs below, which are lines 15-22 of the logs above):

INFO     dipdup.index         xarb_mainnet: Processing operations of level 1967528
INFO     dipdup.index         xarb_mainnet: oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: `on_operator_update` handler matched!
INFO     dipdup.index         xarb_mainnet: oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: `on_transfer` handler matched!
INFO     dipdup.index         xarb_mainnet: oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: `on_swap` handler matched!
INFO     dipdup.index         hen_mainnet: Processing operations of level 1365939
INFO     dipdup.index         hen_mainnet: ooQE3asVLN4DXvLGU6KQ2YCJe1ci2rh3HTfz42FdCfFwKNwZmb5: `on_hen_transfer` handler matched!
INFO     dipdup.index         hen_mainnet: ooQE3asVLN4DXvLGU6KQ2YCJe1ci2rh3HTfz42FdCfFwKNwZmb5: `on_hen_collect_minter` handler matched!
INFO     dipdup.handlers.on_transfer oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: transferring token 27

So DipDup tries to transfer a token that has not been inserted into the database yet (no. 27 of xarb). This is why I think the indexing processes of a specific level are not awaited.

I would appreciate it if you could help me find out what the issue is.

Thanks

droserasprout commented 2 years ago

@fash-ns, the logic of wrapping level handlers within a single SQL transaction hasn't been changed from the very first DipDup releases. However, storage parsing and matching operations with handlers are performed asynchronously, so the logs may be confusing.
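A rough model of why the log order can mislead, written as a plain asyncio sketch. It assumes a simplified two-phase flow (match all operations of a level first, then await the matched handlers one by one) and is not DipDup's actual code; `process_level` and the log messages are illustrative stand-ins.

```python
import asyncio

log = []


async def on_mint(token_id):
    await asyncio.sleep(0)  # stands in for the awaited database write
    log.append(f"on_mint: token {token_id} saved")


async def on_transfer(token_id):
    await asyncio.sleep(0)  # stands in for the awaited database read
    log.append(f"on_transfer: transferring token {token_id}")


async def process_level(level, operations):
    log.append(f"Processing operations of level {level}")
    matched = []
    # Phase 1: match every operation of the level against handler patterns.
    for handler, token_id in operations:
        log.append(f"`{handler.__name__}` handler matched!")
        matched.append((handler, token_id))
    # Phase 2: await the matched handlers sequentially, in order.
    for handler, token_id in matched:
        await handler(token_id)


async def main():
    await process_level(1967518, [(on_mint, 27), (on_transfer, 26)])
    await process_level(1967528, [(on_transfer, 27)])


asyncio.run(main())
print("\n".join(log))
```

Here every "handler matched!" line of a level is printed before any handler of that level runs, yet the handlers themselves still execute strictly one after another, and level 1967528 starts only after level 1967518 has finished.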

In the previous reply, I suggested you enable debug logs to ensure that the token is inserted and then selected in the correct order and with the correct queries.

I can't tell the reason for this issue without knowing the exact code you run. I've tried to create a minimal example:

models.py

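from tortoise import fields
from tortoise.models import Model


class Token(Model):
    token_id = fields.IntField()
    source = fields.TextField()

    class Meta:
        # A token is identified by the (token_id, source) pair
        unique_together = ('token_id', 'source')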

on_mint.py

async def on_mint(
    ctx: HandlerContext,
    mint: Transaction[MintParameter, XarbFa2Storage],
) -> None:
    await Token(
        token_id=list(mint.storage.token_metadata.keys())[0],
        source='xarb',
    ).save()

on_transfer.py

async def on_transfer(
    ctx: HandlerContext,
    transfer: Transaction[TransferParameter, XarbFa2Storage],
) -> None:
    tx = transfer.parameter.__root__[0].txs[0]
    token = await Token.filter(
        token_id=int(tx.token_id),
        source='xarb',
    ).get()

    if token.token_id == 27:
        print('🤔')
        quit()

Got the following output:

...
INFO     dipdup.index         xarb_mainnet: Processing operations of level 1967517
INFO     dipdup.index         xarb_mainnet: ooNCgzjCgpagRViS1bCQ9BRwje5WmNouKL5s9xaaNRbzxgUpuVJ: `on_transfer` handler matched!
INFO     dipdup.index         xarb_mainnet: ooNCgzjCgpagRViS1bCQ9BRwje5WmNouKL5s9xaaNRbzxgUpuVJ: `on_collect` handler matched!
INFO     dipdup.index         xarb_mainnet: Processing operations of level 1967518
INFO     dipdup.index         xarb_mainnet: oniLLHs4AAEpGFW6yeFuGrJJAqPYuL5G9y4XeiGPmr2f9jHwsqc: `on_mint` handler matched!
INFO     dipdup.index         xarb_mainnet: ooxVzHPKk3EB9nhkdzizTQtk5NPwV3KNDp4PD1YX2dYLt5CeKAD: `on_operator_update` handler matched!
INFO     dipdup.index         xarb_mainnet: ooxVzHPKk3EB9nhkdzizTQtk5NPwV3KNDp4PD1YX2dYLt5CeKAD: `on_transfer` handler matched!
INFO     dipdup.index         xarb_mainnet: ooxVzHPKk3EB9nhkdzizTQtk5NPwV3KNDp4PD1YX2dYLt5CeKAD: `on_swap` handler matched!
INFO     dipdup.index         xarb_mainnet: Processing operations of level 1967528
INFO     dipdup.index         xarb_mainnet: oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: `on_operator_update` handler matched!
INFO     dipdup.index         xarb_mainnet: oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: `on_transfer` handler matched!
INFO     dipdup.index         xarb_mainnet: oo8zzPVJCkCEkJ6TusgrofviAx4FQdMysfqoCXMihZJC8he2TBz: `on_swap` handler matched!
🤔

If you want to share the whole project with me for further investigation but would rather not do it publicly, you can create a private GitHub repo or gist and add me as a collaborator.

droserasprout commented 2 years ago

Closing this issue since I can't reproduce it. Feel free to reopen if the problem persists.

fash-ns commented 2 years ago

Hey droserasprout, I've added you to our indexer repository. I'd really appreciate your help! Thanks