streamingfast / substreams-sink-sql

Apache License 2.0
12 stars 15 forks source link

Sink fails trying to insert multiple rows with the same PK #10

Closed fschoell closed 1 year ago

fschoell commented 1 year ago

Testing a Substream that retrieves deltas from a StoreMaxInt64 and translates them into database operations. See here for details. Table schema can be found here.

The sink fails however for some reason as it tries to insert the data with the same PK twice (instead of doing a INSERT INTO first and then an UPDATE transaction subsequently).

Logs from the Postgres sink:

2023-01-26T16:18:15.878+0100 DEBG (cli) re-binding flags {"cmd": "substreams-sink-postgres", "prefix": ""}
2023-01-26T16:18:15.879+0100 DEBG (cli) binding persistent flag {"actual": "delay-before-start", "rebind": "global-delay-before-start"}
2023-01-26T16:18:15.879+0100 DEBG (cli) binding persistent flag {"actual": "metrics-listen-addr", "rebind": "global-metrics-listen-addr"}
2023-01-26T16:18:15.879+0100 DEBG (cli) binding persistent flag {"actual": "pprof-listen-addr", "rebind": "global-pprof-listen-addr"}
2023-01-26T16:18:15.879+0100 DEBG (cli) re-binding flags {"cmd": "run", "prefix": "run-"}
2023-01-26T16:18:15.879+0100 DEBG (cli) binding flag {"actual": "development-mode", "rebind": "run-development-mode"}
2023-01-26T16:18:15.879+0100 DEBG (cli) binding flag {"actual": "flush-interval", "rebind": "run-flush-interval"}
2023-01-26T16:18:15.879+0100 DEBG (cli) binding flag {"actual": "insecure", "rebind": "run-insecure"}
2023-01-26T16:18:15.879+0100 DEBG (cli) binding flag {"actual": "irreversible-only", "rebind": "run-irreversible-only"}
2023-01-26T16:18:15.879+0100 DEBG (cli) binding flag {"actual": "live-block-time-delta", "rebind": "run-live-block-time-delta"}
2023-01-26T16:18:15.880+0100 DEBG (cli) binding flag {"actual": "plaintext", "rebind": "run-plaintext"}
2023-01-26T16:18:15.880+0100 DEBG (cli) binding flag {"actual": "undo-buffer-size", "rebind": "run-undo-buffer-size"}
2023-01-26T16:18:15.880+0100 DEBG (cli) reboung flags terminated {"cmd": "run"}
2023-01-26T16:18:15.880+0100 DEBG (cli) re-binding flags {"cmd": "setup", "prefix": "setup-"}
2023-01-26T16:18:15.880+0100 DEBG (cli) reboung flags terminated {"cmd": "setup"}
2023-01-26T16:18:15.880+0100 DEBG (cli) reboung flags terminated {"cmd": "substreams-sink-postgres"}
2023-01-26T16:18:15.883+0100 INFO (sink-postgres) starting prometheus metrics server {"listen_addr": "localhost:9102"}
2023-01-26T16:18:15.883+0100 INFO (sink-postgres) sink from psql {"dsn": "psql://app_user:password@127.0.0.1:5432/app_db?sslmode=disable", "endpoint": "eos.firehose.eosnation.io:9001", "manifest_path": "substreams.yaml", "output_module_name": "db_out", "block_range": ""}
2023-01-26T16:18:15.884+0100 INFO (sink-postgres) starting pprof server {"listen_addr": "localhost:6060"}
2023-01-26T16:18:15.950+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "hdb_catalog", "table_name": "hdb_schema_notifications"}
2023-01-26T16:18:15.950+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "public", "table_name": "last_block"}
2023-01-26T16:18:15.959+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "public", "table_name": "max_action_block"}
2023-01-26T16:18:15.963+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "public", "table_name": "max_trx_block"}
2023-01-26T16:18:15.966+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "public", "table_name": "cursors"}
2023-01-26T16:18:15.973+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "hdb_catalog", "table_name": "hdb_action_log"}
2023-01-26T16:18:15.973+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "hdb_catalog", "table_name": "hdb_cron_event_invocation_logs"}
2023-01-26T16:18:15.973+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "hdb_catalog", "table_name": "hdb_cron_events"}
2023-01-26T16:18:15.973+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "hdb_catalog", "table_name": "hdb_metadata"}
2023-01-26T16:18:15.973+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "hdb_catalog", "table_name": "hdb_scheduled_event_invocation_logs"}
2023-01-26T16:18:15.973+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "hdb_catalog", "table_name": "hdb_scheduled_events"}
2023-01-26T16:18:15.973+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "hdb_catalog", "table_name": "hdb_version"}
2023-01-26T16:18:15.973+0100 DEBG (sink-postgres) processing schema's table {"schema_name": "public", "table_name": "hourly_stats"}
2023-01-26T16:18:15.977+0100 INFO (sink-postgres) reading substreams manifest {"manifest_path": "substreams.yaml"}
2023-01-26T16:18:16.685+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "google/protobuf/timestamp.proto"}
2023-01-26T16:18:16.685+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/clock.proto"}
2023-01-26T16:18:16.685+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/modules.proto"}
2023-01-26T16:18:16.685+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "google/protobuf/descriptor.proto"}
2023-01-26T16:18:16.685+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/package.proto"}
2023-01-26T16:18:16.686+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "google/protobuf/any.proto"}
2023-01-26T16:18:16.686+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/substreams.proto"}
2023-01-26T16:18:17.003+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "google/protobuf/timestamp.proto"}
2023-01-26T16:18:17.003+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/clock.proto"}
2023-01-26T16:18:17.003+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/modules.proto"}
2023-01-26T16:18:17.003+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "google/protobuf/descriptor.proto"}
2023-01-26T16:18:17.003+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/package.proto"}
2023-01-26T16:18:17.003+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "google/protobuf/any.proto"}
2023-01-26T16:18:17.003+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/substreams.proto"}
2023-01-26T16:18:17.360+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "google/protobuf/timestamp.proto"}
2023-01-26T16:18:17.360+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/clock.proto"}
2023-01-26T16:18:17.360+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/modules.proto"}
2023-01-26T16:18:17.360+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "google/protobuf/descriptor.proto"}
2023-01-26T16:18:17.360+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/package.proto"}
2023-01-26T16:18:17.360+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "google/protobuf/any.proto"}
2023-01-26T16:18:17.360+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/substreams.proto"}
2023-01-26T16:18:17.360+0100 DEBG (pipeline) skipping protofile already seen {"proto_file": "sf/substreams/v1/test/test.proto"}
2023-01-26T16:18:17.360+0100 INFO (pipeline) computed start block {"module_name": "store_trx_count", "start_block": 2}
2023-01-26T16:18:17.360+0100 INFO (pipeline) computed start block {"module_name": "store_act_count", "start_block": 2}
2023-01-26T16:18:17.360+0100 INFO (pipeline) computed start block {"module_name": "store_max_trx_count", "start_block": 2}
2023-01-26T16:18:17.361+0100 INFO (pipeline) computed start block {"module_name": "store_max_action_count", "start_block": 2}
2023-01-26T16:18:17.361+0100 INFO (pipeline) computed start block {"module_name": "map_hourly_stats", "start_block": 2}
2023-01-26T16:18:17.361+0100 INFO (pipeline) computed start block {"module_name": "db_out", "start_block": 2}
2023-01-26T16:18:17.361+0100 INFO (sink-postgres) validating output store {"output_store": "db_out"}
2023-01-26T16:18:17.364+0100 INFO (sink-postgres) resolved block range {"start_block": 2, "stop_block": 0}
2023-01-26T16:18:17.366+0100 INFO (sink-postgres) ready, waiting for signal to quit
2023-01-26T16:18:17.371+0100 INFO (sink-postgres) starting stats service {"runs_each": "2s"}
2023-01-26T16:18:17.372+0100 INFO (sink-postgres) starting stats service {"runs_each": "2s"}
2023-01-26T16:18:17.372+0100 DEBG (substreams-clients) getting connection {"endpoint": "eos.firehose.eosnation.io:9001"}
2023-01-26T16:18:17.377+0100 DEBG (substreams-clients) creating new client {"endpoint": "eos.firehose.eosnation.io:9001"}
2023-01-26T16:18:17.377+0100 DEBG (substreams-clients) client created
2023-01-26T16:18:17.377+0100 DEBG (sink-postgres) launching substreams request {"start_block": 2, "cursor": ""}
2023-01-26T16:18:18.018+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": ""}
2023-01-26T16:18:18.931+0100 INFO (sink-postgres) session init {"trace_id": "c519ae6d9b6387f7c2a6f5a7b369d784"}
2023-01-26T16:18:18.931+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": ""}
2023-01-26T16:18:19.371+0100 DEBG (sink-postgres) received response progress {"progress": {"Progress":{"modules":[{"name":"store_max_trx_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":25000,"end_block":34000},{"start_block":50000,"end_block":59000},{"start_block":75000,"end_block":83000},{"start_block":100000,"end_block":107000},{"start_block":125000,"end_block":133000},{"start_block":155000,"end_block":157000}]}}},{"name":"store_act_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":25000,"end_block":33000},{"start_block":50000,"end_block":58000},{"start_block":75000,"end_block":84000},{"start_block":100000,"end_block":108000},{"start_block":125000,"end_block":134000}]}}},{"name":"store_trx_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":25000,"end_block":34000},{"start_block":50000,"end_block":58000},{"start_block":75000,"end_block":82000},{"start_block":100000,"end_block":108000},{"start_block":125000,"end_block":133000}]}}},{"name":"store_max_action_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":25000,"end_block":33000},{"start_block":50000,"end_block":57000},{"start_block":75000,"end_block":82000},{"start_block":100000,"end_block":108000},{"start_block":125000,"end_block":132000}]}}}]}}}
2023-01-26T16:18:19.371+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": ""}
2023-01-26T16:18:19.371+0100 INFO (sink-postgres) substreams sink stats {"db_flush_rate": "0.000 flush/s (0 total)", "data_msg_rate": "0.000 msg/s (0 total)", "progress_msg_rate": "0.000 msg/s (0 total)", "block_rate": "0.000 blocks/s (0 total)", "flushed_entries": 0, "last_block": "None"}
2023-01-26T16:18:19.372+0100 INFO (sink-postgres) substreams sink stats {"progress_msg_rate": "2.000 msg/s (4 total)", "block_rate": "0.000 blocks/s (0 total)", "last_block": "None"}
2023-01-26T16:18:20.237+0100 DEBG (sink-postgres) processing insert operation {"table_name": "max_trx_block", "primary_key": "EOS", "field_count": 2}
2023-01-26T16:18:20.237+0100 DEBG (sink-postgres) adding tracking of table never seen before {"table_name": "max_trx_block"}
2023-01-26T16:18:20.237+0100 DEBG (sink-postgres) primary key entry never existed for table, adding insert operation {"primary_key": "EOS", "table_name": "max_trx_block"}
2023-01-26T16:18:20.237+0100 DEBG (sink-postgres) processing insert operation {"table_name": "max_action_block", "primary_key": "EOS", "field_count": 2}
2023-01-26T16:18:20.237+0100 DEBG (sink-postgres) adding tracking of table never seen before {"table_name": "max_action_block"}
2023-01-26T16:18:20.237+0100 DEBG (sink-postgres) primary key entry never existed for table, adding insert operation {"primary_key": "EOS", "table_name": "max_action_block"}
2023-01-26T16:18:20.237+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": "AxhcstDz76VtanOZXj-ly6WwLpc_DFhlVQvlKhFBhYnw9yTGiJ-kCGUnbBXSxqqhi0bpHl2u2o2YFSp9oZIDvdnqx71n73RrQHl4l9zpquLrf6DxOQwScrFgVeqOYtjbWzzVZA2teeQC4t3jaPfRZRQxM5F1LjO0jTsC94QAdvBD6nI9wzj4cM2Hg_2UpYYU-LNzQOPyliulDjY="}
2023-01-26T16:18:20.237+0100 DEBG (sink-postgres) processing insert operation {"table_name": "max_trx_block", "primary_key": "EOS", "field_count": 2}
2023-01-26T16:18:20.237+0100 ERRO (sink-postgres) substreams encountered an error {"error": "handle block scope data: apply database changes: database insert: attempting to insert in table \"max_trx_block\" a primary key \"EOS\", that is already scheduled for insertion, insert should only be called once for a given primary key"}
2023-01-26T16:18:20.237+0100 INFO (sink-postgres) sleeping before re-connecting {"sleep": "662.16376ms"}
2023-01-26T16:18:20.900+0100 DEBG (sink-postgres) launching substreams request {"start_block": 2, "cursor": "SxTEEe7L2qNhwBDC5cChzaWwLpc_DFllVQvlKhFBhYii-XHB2Z_zUWRwPB3Xwvz0iRDqTl2s3orPRnd88ccDvYfuy70x5XA5RS5-wYrr_rS8eaL6a1gfdbFhVeqOYtjbWzzUNgP4frUCtYTiP6fZYBBnZpMjLWO0jz8FoNddd6AW6nJjxzT4JseD0fjDo9BC-uclF-XwnXnxAzE="}
2023-01-26T16:18:20.900+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": "SxTEEe7L2qNhwBDC5cChzaWwLpc_DFllVQvlKhFBhYii-XHB2Z_zUWRwPB3Xwvz0iRDqTl2s3orPRnd88ccDvYfuy70x5XA5RS5-wYrr_rS8eaL6a1gfdbFhVeqOYtjbWzzUNgP4frUCtYTiP6fZYBBnZpMjLWO0jz8FoNddd6AW6nJjxzT4JseD0fjDo9BC-uclF-XwnXnxAzE="}
2023-01-26T16:18:21.371+0100 INFO (sink-postgres) substreams sink stats {"db_flush_rate": "0.000 flush/s (0 total)", "data_msg_rate": "0.000 msg/s (0 total)", "progress_msg_rate": "0.000 msg/s (0 total)", "block_rate": "0.000 blocks/s (0 total)", "flushed_entries": 0, "last_block": "None"}
2023-01-26T16:18:21.372+0100 INFO (sink-postgres) substreams sink stats {"progress_msg_rate": "1.000 msg/s (4 total)", "block_rate": "0.250 blocks/s (1 total)", "last_block": "#2 (0000000267f3e2284b482f3afc2e724be1d6cbc1804532ec62d4e7af47c30693)"}
2023-01-26T16:18:21.851+0100 INFO (sink-postgres) session init {"trace_id": "80e3fb475d2bf8b264288169b1c52a39"}
2023-01-26T16:18:21.851+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": "SxTEEe7L2qNhwBDC5cChzaWwLpc_DFllVQvlKhFBhYii-XHB2Z_zUWRwPB3Xwvz0iRDqTl2s3orPRnd88ccDvYfuy70x5XA5RS5-wYrr_rS8eaL6a1gfdbFhVeqOYtjbWzzUNgP4frUCtYTiP6fZYBBnZpMjLWO0jz8FoNddd6AW6nJjxzT4JseD0fjDo9BC-uclF-XwnXnxAzE="}
2023-01-26T16:18:23.300+0100 DEBG (sink-postgres) received response progress {"progress": {"Progress":{"modules":[{"name":"store_act_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":25000,"end_block":33000},{"start_block":50000,"end_block":58000},{"start_block":75000,"end_block":84000},{"start_block":100000,"end_block":108000},{"start_block":125000,"end_block":134000}]}}},{"name":"store_trx_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":25000,"end_block":34000},{"start_block":50000,"end_block":58000},{"start_block":75000,"end_block":82000},{"start_block":100000,"end_block":108000},{"start_block":125000,"end_block":133000}]}}},{"name":"store_max_action_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":25000,"end_block":33000},{"start_block":50000,"end_block":57000},{"start_block":75000,"end_block":82000},{"start_block":100000,"end_block":108000},{"start_block":125000,"end_block":132000}]}}},{"name":"store_max_trx_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":25000,"end_block":34000},{"start_block":50000,"end_block":59000},{"start_block":75000,"end_block":83000},{"start_block":100000,"end_block":107000},{"start_block":125000,"end_block":133000},{"start_block":155000,"end_block":157000}]}}}]}}}
2023-01-26T16:18:23.300+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": "SxTEEe7L2qNhwBDC5cChzaWwLpc_DFllVQvlKhFBhYii-XHB2Z_zUWRwPB3Xwvz0iRDqTl2s3orPRnd88ccDvYfuy70x5XA5RS5-wYrr_rS8eaL6a1gfdbFhVeqOYtjbWzzUNgP4frUCtYTiP6fZYBBnZpMjLWO0jz8FoNddd6AW6nJjxzT4JseD0fjDo9BC-uclF-XwnXnxAzE="}
2023-01-26T16:18:23.300+0100 DEBG (sink-postgres) processing insert operation {"table_name": "max_trx_block", "primary_key": "EOS", "field_count": 2}
2023-01-26T16:18:23.301+0100 ERRO (sink-postgres) substreams encountered an error {"error": "handle block scope data: apply database changes: database insert: attempting to insert in table \"max_trx_block\" a primary key \"EOS\", that is already scheduled for insertion, insert should only be called once for a given primary key"}
2023-01-26T16:18:23.301+0100 INFO (sink-postgres) sleeping before re-connecting {"sleep": "393.470638ms"}
2023-01-26T16:18:23.371+0100 INFO (sink-postgres) substreams sink stats {"db_flush_rate": "0.000 flush/s (0 total)", "data_msg_rate": "0.000 msg/s (0 total)", "progress_msg_rate": "0.000 msg/s (0 total)", "block_rate": "0.000 blocks/s (0 total)", "flushed_entries": 0, "last_block": "None"}
2023-01-26T16:18:23.372+0100 INFO (sink-postgres) substreams sink stats {"progress_msg_rate": "1.333 msg/s (8 total)", "block_rate": "0.167 blocks/s (1 total)", "last_block": "#2 (0000000267f3e2284b482f3afc2e724be1d6cbc1804532ec62d4e7af47c30693)"}
2023-01-26T16:18:23.694+0100 DEBG (sink-postgres) launching substreams request {"start_block": 2, "cursor": "kdxsENULnJPfmPZ1NPpnfqWwLpc_DF5lVQvlKhFBhY-goSPF1MukUmcjbRTVkazx3xTsTFis3d2bFHZ7ocFSvdPql78x6XM8RCgvwYDr_b25KvGjMFseebFmVeqOYtjbWzzTNFuqerhW4ofhbPbQYkM3Y8UnK2GxjzxS9IVccPAQu3I3w2j6JsuA1PnF8tBI-uQsErajxCLyAj0="}
2023-01-26T16:18:23.696+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": "kdxsENULnJPfmPZ1NPpnfqWwLpc_DF5lVQvlKhFBhY-goSPF1MukUmcjbRTVkazx3xTsTFis3d2bFHZ7ocFSvdPql78x6XM8RCgvwYDr_b25KvGjMFseebFmVeqOYtjbWzzTNFuqerhW4ofhbPbQYkM3Y8UnK2GxjzxS9IVccPAQu3I3w2j6JsuA1PnF8tBI-uQsErajxCLyAj0="}
2023-01-26T16:18:25.145+0100 INFO (sink-postgres) session init {"trace_id": "3b5c27019157162e6836caaba1bf1961"}
2023-01-26T16:18:25.145+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": "kdxsENULnJPfmPZ1NPpnfqWwLpc_DF5lVQvlKhFBhY-goSPF1MukUmcjbRTVkazx3xTsTFis3d2bFHZ7ocFSvdPql78x6XM8RCgvwYDr_b25KvGjMFseebFmVeqOYtjbWzzTNFuqerhW4ofhbPbQYkM3Y8UnK2GxjzxS9IVccPAQu3I3w2j6JsuA1PnF8tBI-uQsErajxCLyAj0="}
2023-01-26T16:18:25.371+0100 INFO (sink-postgres) substreams sink stats {"db_flush_rate": "0.000 flush/s (0 total)", "data_msg_rate": "0.000 msg/s (0 total)", "progress_msg_rate": "0.000 msg/s (0 total)", "block_rate": "0.000 blocks/s (0 total)", "flushed_entries": 0, "last_block": "None"}
2023-01-26T16:18:25.372+0100 INFO (sink-postgres) substreams sink stats {"progress_msg_rate": "1.000 msg/s (8 total)", "block_rate": "0.125 blocks/s (1 total)", "last_block": "#2 (0000000267f3e2284b482f3afc2e724be1d6cbc1804532ec62d4e7af47c30693)"}
2023-01-26T16:18:27.372+0100 INFO (sink-postgres) substreams sink stats {"db_flush_rate": "0.000 flush/s (0 total)", "data_msg_rate": "0.000 msg/s (0 total)", "progress_msg_rate": "0.000 msg/s (0 total)", "block_rate": "0.000 blocks/s (0 total)", "flushed_entries": 0, "last_block": "None"}
2023-01-26T16:18:27.373+0100 INFO (sink-postgres) substreams sink stats {"progress_msg_rate": "0.800 msg/s (8 total)", "block_rate": "0.100 blocks/s (1 total)", "last_block": "#2 (0000000267f3e2284b482f3afc2e724be1d6cbc1804532ec62d4e7af47c30693)"}
2023-01-26T16:18:29.371+0100 INFO (sink-postgres) substreams sink stats {"db_flush_rate": "0.000 flush/s (0 total)", "data_msg_rate": "0.000 msg/s (0 total)", "progress_msg_rate": "0.000 msg/s (0 total)", "block_rate": "0.000 blocks/s (0 total)", "flushed_entries": 0, "last_block": "None"}
2023-01-26T16:18:29.373+0100 INFO (sink-postgres) substreams sink stats {"progress_msg_rate": "0.667 msg/s (8 total)", "block_rate": "0.083 blocks/s (1 total)", "last_block": "#2 (0000000267f3e2284b482f3afc2e724be1d6cbc1804532ec62d4e7af47c30693)"}
2023-01-26T16:18:30.483+0100 DEBG (sink-postgres) received response progress {"progress": {"Progress":{"modules":[{"name":"store_act_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":4000,"end_block":5000},{"start_block":25000,"end_block":34000},{"start_block":50000,"end_block":59000},{"start_block":75000,"end_block":85000},{"start_block":100000,"end_block":109000},{"start_block":125000,"end_block":135000}]}}},{"name":"store_trx_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":4000,"end_block":5000},{"start_block":25000,"end_block":36000},{"start_block":50000,"end_block":59000},{"start_block":75000,"end_block":84000},{"start_block":100000,"end_block":110000},{"start_block":125000,"end_block":134000}]}}},{"name":"store_max_action_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":25000,"end_block":35000},{"start_block":50000,"end_block":59000},{"start_block":75000,"end_block":83000},{"start_block":100000,"end_block":109000},{"start_block":125000,"end_block":133000}]}}},{"name":"store_max_trx_count","Type":{"ProcessedRanges":{"processed_ranges":[{"start_block":2,"end_block":4000},{"start_block":4000,"end_block":5000},{"start_block":25000,"end_block":35000},{"start_block":50000,"end_block":60000},{"start_block":75000,"end_block":85000},{"start_block":100000,"end_block":108000},{"start_block":125000,"end_block":135000},{"start_block":155000,"end_block":157000}]}}}]}}}
2023-01-26T16:18:30.483+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": "kdxsENULnJPfmPZ1NPpnfqWwLpc_DF5lVQvlKhFBhY-goSPF1MukUmcjbRTVkazx3xTsTFis3d2bFHZ7ocFSvdPql78x6XM8RCgvwYDr_b25KvGjMFseebFmVeqOYtjbWzzTNFuqerhW4ofhbPbQYkM3Y8UnK2GxjzxS9IVccPAQu3I3w2j6JsuA1PnF8tBI-uQsErajxCLyAj0="}
2023-01-26T16:18:30.580+0100 DEBG (sink-postgres) processing insert operation {"table_name": "max_trx_block", "primary_key": "EOS", "field_count": 2}
2023-01-26T16:18:30.580+0100 ERRO (sink-postgres) substreams encountered an error {"error": "handle block scope data: apply database changes: database insert: attempting to insert in table \"max_trx_block\" a primary key \"EOS\", that is already scheduled for insertion, insert should only be called once for a given primary key"}
2023-01-26T16:18:30.580+0100 INFO (sink-postgres) sleeping before re-connecting {"sleep": "542.090085ms"}
2023-01-26T16:18:31.123+0100 DEBG (sink-postgres) launching substreams request {"start_block": 2, "cursor": "ERp3WSL2l9L8BmL8lBioOKWwLpc_DF9lVQvlKhFBhY6l93bH2cijBzchbEuEmPj2jEPoGFyt3I3MFXp6ocEE7YLowLxs6XIxRH4kxo67qOK5K6egbQ9Kd7FnVeqOYtjbWzzSMQ3_eLVV5dKxbvePM0pjZJZwLzW1jj0Co4RQcfAQ7SJmwT_5e8uB2fmT-ddGqrFzErf1x3-mVjM="}
2023-01-26T16:18:31.123+0100 DEBG (sink-postgres) substreams waiting to receive message {"cursor": "ERp3WSL2l9L8BmL8lBioOKWwLpc_DF9lVQvlKhFBhY6l93bH2cijBzchbEuEmPj2jEPoGFyt3I3MFXp6ocEE7YLowLxs6XIxRH4kxo67qOK5K6egbQ9Kd7FnVeqOYtjbWzzSMQ3_eLVV5dKxbvePM0pjZJZwLzW1jj0Co4RQcfAQ7SJmwT_5e8uB2fmT-ddGqrFzErf1x3-mVjM="}
^C2023-01-26T16:18:31.349+0100 INFO (derr) Received termination signal... Ctrl+C multiple times to force kill {"signal": "interrupt"}
2023-01-26T16:18:31.349+0100 INFO (sink-postgres) received termination signal, quitting application
2023-01-26T16:18:31.349+0100 INFO (sink-postgres) waiting for app termination
2023-01-26T16:18:31.349+0100 INFO (sink-postgres) application terminating shutting down sinker
2023-01-26T16:18:31.349+0100 INFO (sink-postgres) app terminated
colindickson commented 1 year ago

@fschoell The PK in the sink is the primary key, so it is definitely intended behaviour that it should fail if you try to an insert multiple times with the same primary key.

I see that the code always uses &blk_stats.chain.clone() as the primary key. This value should instead be unique to the row itself (usually the value given as the key in the delta).

fschoell commented 1 year ago

@fschoell The PK in the sink is the primary key, so it is definitely intended behaviour that it should fail if you try to an insert multiple times with the same primary key.

I see that the code always uses &blk_stats.chain.clone() as the primary key. This value should instead be unique to the row itself (usually the value given as the key in the delta).

Yes that's on purpose. Those tables should only hold one row that contains the max transaction/action count found within all blocks so far. So the PK is always the same. On the first write to the StoreMaxInt64 the db_out function should generate a Create (INSERT INTO) database operation and whenever a new max value is found, it should generate an Update (UPDATE) database operation.

fschoell commented 1 year ago

ah ok the key is different so there will be multiple creates I guess (one per key, not one per entry). Makes sense