pinax-network / substreams-sink

Substreams Sink library
MIT License

PRODUCTION_MODE with INACTIVITY_SECONDS #21

Closed: sdevkc closed this issue 9 months ago

sdevkc commented 10 months ago

When PRODUCTION_MODE=true and INACTIVITY_SECONDS is set, I think there is a difference between actually being inactive and merely appearing inactive. Right now it just exits after INACTIVITY_SECONDS even if it's still processing in the background.

I'm not sure how inactivity should be defined here, but at least I don't think it should be considered inactive while response.message.value.processedBytes.totalBytesRead is increasing over time.

Just a quick thought: maybe in onInactivitySeconds we can do something like:

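let lastTotalBytesRead: bigint = 0n;    // value seen at the previous inactivity check
let currentTotalBytesRead: bigint = 0n; // latest value reported by a progress event

// ...

emitter.on("progress", (progress) => {
    // Record the most recent totalBytesRead so the inactivity check can compare it
    if (progress.processedBytes) {
        currentTotalBytesRead = progress.processedBytes.totalBytesRead;
    }
});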
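A minimal TypeScript sketch of this idea, assuming only a Node.js EventEmitter that emits "progress" events shaped like the ones in this thread (progress.processedBytes.totalBytesRead as a bigint). The names createBytesReadTracker and watchInactivity are hypothetical helpers for illustration, not part of the substreams-sink API.

```typescript
import { EventEmitter } from "node:events";

// Assumed shape of a "progress" event, modeled on the
// progress.processedBytes.totalBytesRead field seen in this issue.
interface ProgressLike {
  processedBytes?: { totalBytesRead: bigint };
}

// Hypothetical helper: tracks totalBytesRead and returns a function that
// reports whether it has grown since the previous call.
export function createBytesReadTracker(emitter: EventEmitter): () => boolean {
  let lastTotalBytesRead = 0n;
  let currentTotalBytesRead = 0n;

  emitter.on("progress", (progress: ProgressLike) => {
    if (progress.processedBytes) {
      currentTotalBytesRead = progress.processedBytes.totalBytesRead;
    }
  });

  return () => {
    const progressed = currentTotalBytesRead > lastTotalBytesRead;
    lastTotalBytesRead = currentTotalBytesRead;
    return progressed;
  };
}

// Hypothetical watchdog: every `seconds`, checks whether new bytes were read;
// if none were, it stops checking and calls onInactive once.
export function watchInactivity(
  emitter: EventEmitter,
  seconds: number,
  onInactive: () => void,
): ReturnType<typeof setInterval> {
  const hasProgressed = createBytesReadTracker(emitter);
  const timer = setInterval(() => {
    if (!hasProgressed()) {
      clearInterval(timer);
      onInactive();
    }
  }, seconds * 1000);
  return timer;
}
```

Polling at the INACTIVITY_SECONDS interval means a stream that only reports ProcessedBytes, as in the logs in this issue, stays alive as long as totalBytesRead grows between two checks.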
chamorin commented 10 months ago

The process is considered "inactive" when the sink stops receiving clock events; that is when the countdown starts. If you give a stop block using -t, --stop-block <int> and that stop block is reached, the process will stop.
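The clock-based behavior described above amounts to a resettable countdown. Below is a minimal TypeScript sketch of that pattern, assuming a plain Node.js EventEmitter and a hypothetical "clock" event name; startClockCountdown is an illustrative name, and this is not the library's actual inactivitySeconds.js code.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical sketch: restart a countdown on every "clock" event and call
// onInactive if `seconds` pass without one. Returns a function that cancels
// the countdown and removes the listener.
export function startClockCountdown(
  emitter: EventEmitter,
  seconds: number,
  onInactive: () => void,
): () => void {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const reset = () => {
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(onInactive, seconds * 1000);
  };

  emitter.on("clock", reset);
  reset(); // start counting down immediately, before the first clock event

  return () => {
    if (timer !== undefined) clearTimeout(timer);
    emitter.off("clock", reset);
  };
}
```

Under this definition, a stream that reports only progress, and no clock events, never restarts the countdown.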

sdevkc commented 10 months ago

However, in production mode it doesn't write data immediately, so there are no clock events even though it is still reading in the background, for example:

...

INFO    /dist/index.js:71       sink-js ProcessedBytes {
  totalBytesRead: 420915677n,
  totalBytesWritten: 0n
}
INFO    /dist/index.js:71       sink-js ProcessedBytes {
  totalBytesRead: 444417248n,
  totalBytesWritten: 0n
}
INFO    /dist/index.js:71       sink-js ProcessedBytes {
  totalBytesRead: 466314327n,
  totalBytesWritten: 0n
}
INFO    /dist/index.js:71       sink-js ProcessedBytes {
  totalBytesRead: 490045787n,
  totalBytesWritten: 0n
}
INFO    /dist/index.js:71       sink-js ProcessedBytes {
  totalBytesRead: 507357031n,
  totalBytesWritten: 0n
}
ERROR   /node_modules/substreams-sink/dist/src/inactivitySeconds.js:9   sink-js Process will exit due to inactivity for 10 seconds

So my question is, should this be considered "inactive"?

chamorin commented 10 months ago

I'm getting clock events with PRODUCTION_MODE=true. I also set INACTIVITY_SECONDS=60 to check whether it would stop, but it didn't 🤔 Could you please share the command and the command-line options you are running?

chamorin commented 10 months ago

But to answer your question: while bytes are still being read, it shouldn't be considered "inactive".

sdevkc commented 10 months ago

I ran npm start against the Solana endpoint with an empty start cursor. Here are more details:

  "scripts": {
    "start": "tsc && node ./dist/bin/cli.js run",
    "build": "tsc",
    "rollup": "rollup --config rollup.config.mjs",
    "linux": "npm run build && npm run rollup && ./scripts/build.linux.sh",
    "macos": "npm run build && npm run rollup && ./scripts/build.macos.sh",
    "prepublishOnly": "npm run build"
  },
  "dependencies": {
    "substreams-sink": "0.13.4"
  },

Env:

SUBSTREAMS_API_TOKEN=...
SUBSTREAMS_ENDPOINT=https://mainnet.sol.streamingfast.io:443
AUTH_ISSUE_URL=https://auth.streamingfast.io/v1/auth/issue

MANIFEST=/home/sol.spkg
MODULE_NAME=map_transactions
START_BLOCK=236965261

VERBOSE=true
PRODUCTION_MODE=true
INACTIVITY_SECONDS=60

npm start

> sink-js@0.0.1 start
> tsc && node ./dist/bin/cli.js run

(node:5969) ExperimentalWarning: Importing JSON modules is an experimental feature and might change at any time
(Use `node --trace-warnings ...` to show where the warning was created)
INFO    /node_modules/substreams-sink/dist/src/prometheus.js:93 sink-js manifest {
  moduleHash: 'ebe8a4a3fd48f74bbf76ab1826ab5ae52f9d6b21',
  manifest: '/home/sol.spkg',
  substreamsEndpoint: 'https://mainnet.sol.streamingfast.io:443',
  finalBlocksOnly: 'false',
  productionMode: 'true'
}
INFO    /dist/index.js:50       sink-js
Start cursor:  
INFO    /node_modules/substreams-sink/dist/src/http.js:20       sink-js prometheus server {
  hostname: 'localhost',
  port: 9102
}
INFO    /node_modules/substreams-sink/dist/src/prometheus.js:82 sink-js session {
  traceId: '6f54552d8fbe31cd8e2f20e84e8bcdd6',
  resolvedStartBlock: '236965261',
  linearHandoffBlock: '237287400',
  maxParallelWorkers: '70'
}

...

INFO    /dist/index.js:55       sink-js ProcessedBytes {
  totalBytesRead: 5743185216n,
  totalBytesWritten: 0n
}
INFO    /dist/index.js:55       sink-js ProcessedBytes {
  totalBytesRead: 5788490535n,
  totalBytesWritten: 0n
}
INFO    /dist/index.js:55       sink-js ProcessedBytes {
  totalBytesRead: 5833945547n,
  totalBytesWritten: 0n
}
ERROR   /node_modules/substreams-sink/dist/src/inactivitySeconds.js:9   sink-js Process will exit due to inactivity for 60 seconds
chamorin commented 10 months ago

Sorry for the late response, I'll try it on my side 👍 Happy new year! 🎉

chamorin commented 10 months ago

OK, I tried using https://mainnet.sol.streamingfast.io:443 as the SUBSTREAMS_ENDPOINT and I'm getting:

Process will exit due to inactivity for 60 seconds

both when production mode is true and when it's false.

sdevkc commented 10 months ago

Happy new year!

> Ok I tried using https://mainnet.sol.streamingfast.io:443 as the SUBSTREAMS_ENDPOINT and I'm getting:

Sorry, it's not clear what your start block is. Do you mean it exited due to no clock events while bytes were still being read, or that it's working as expected?

chamorin commented 10 months ago

No clock events are received from the Substreams endpoint, so the inactivity timer triggers and stops the process.

sdevkc commented 10 months ago

Is this an endpoint issue? I've never tried with ETH. Do you intend to adjust this, maybe using totalBytesRead to track progress, or is this an issue on StreamingFast's end?

sdevkc commented 9 months ago

@chamorin Hey, any news on this? Is it a work in progress or still a TODO?

chamorin commented 9 months ago

Hey @sdevkc, I got it to work with this env; I had to wait a bit for it to start processing:

MANIFEST=/home/substreams-solana-quickstart-v1.0.0.spkg
SUBSTREAMS_ENDPOINT=https://mainnet.sol.streamingfast.io:443
MODULE_NAME=map_block

VERBOSE=true
START_BLOCK=236965261
PRODUCTION_MODE=true
INACTIVITY_SECONDS=500

SUBSTREAMS_API_TOKEN=server_...
AUTH_ISSUE_URL=https://auth.streamingfast.io/v1/auth/issue

Here's the output from the Prometheus /metrics endpoint after letting it run for some time:

# HELP substreams_sink_data_message The number of data message received
# TYPE substreams_sink_data_message counter
substreams_sink_data_message 137890

# HELP substreams_sink_data_message_size_bytes The total size of in bytes of all data message received
# TYPE substreams_sink_data_message_size_bytes counter
substreams_sink_data_message_size_bytes 59100659

# HELP substreams_sink_undo_message The number of block undo message received
# TYPE substreams_sink_undo_message counter
substreams_sink_undo_message 0

# HELP head_block_number Last processed block number
# TYPE head_block_number gauge
head_block_number 237156373

# HELP head_block_time_drift Head block time drift in seconds
# TYPE head_block_time_drift gauge
head_block_time_drift 2788158

# HELP head_block_timestamp Head block timestamp
# TYPE head_block_timestamp gauge
head_block_timestamp 1703155488

# HELP substreams_sink_backprocessing_completion Determines if backprocessing is completed, which is if we receive a first data message
# TYPE substreams_sink_backprocessing_completion gauge
substreams_sink_backprocessing_completion 1

# HELP custom_counter help
# TYPE custom_counter counter
custom_counter 137890

# HELP manifest Substreams manifest and sha256 hash of map module
# TYPE manifest gauge
manifest{module_hash="5cd98b936223a96b71dc8670f1c4786ae696e4fd",manifest="/home/substreams-solana-quickstart-v1.0.0.spkg",output_module="map_block",substreams_endpoint="https://mainnet.sol.streamingfast.io:443",start_block_num="236965261",stop_block_num="0",production_mode="true",final_blocks_only="false"} 1

# HELP session Substreams Session
# TYPE session gauge
session{trace_id="a772d39ff63d6d5cded126151b12df70",resolved_start_block="237005051",linear_handoff_block="243436900",max_parallel_workers="70"} 1
chamorin commented 9 months ago

I changed it so that it now listens to any message instead of clock events to determine whether it's inactive.
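The change described here can be sketched as an event-driven countdown that any message restarts. Below is a minimal TypeScript sketch, assuming a Node.js EventEmitter that emits "anyMessage" and "progress" events as in this thread; watchActivity is a hypothetical name and this is not the released code. It additionally treats a "progress" event as activity when totalBytesRead has grown, the extra condition a production-mode stream that only reports progress would need.

```typescript
import { EventEmitter } from "node:events";

// Assumed shape of a "progress" event, as seen in this issue.
interface ProgressLike {
  processedBytes?: { totalBytesRead: bigint };
}

// Hypothetical watchdog: every "anyMessage" event restarts the countdown, and a
// "progress" event restarts it only if totalBytesRead grew. Calls onInactive if
// `seconds` pass with neither. Returns a function that cancels the watchdog.
export function watchActivity(
  emitter: EventEmitter,
  seconds: number,
  onInactive: () => void,
): () => void {
  let timer: ReturnType<typeof setTimeout> | undefined;
  let lastTotalBytesRead = 0n;

  const reset = () => {
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(onInactive, seconds * 1000);
  };

  const onProgress = (progress: ProgressLike) => {
    const bytesRead = progress.processedBytes?.totalBytesRead;
    if (bytesRead !== undefined && bytesRead > lastTotalBytesRead) {
      lastTotalBytesRead = bytesRead;
      reset();
    }
  };

  emitter.on("anyMessage", reset);
  emitter.on("progress", onProgress);
  reset(); // start counting down before the first event arrives

  return () => {
    if (timer !== undefined) clearTimeout(timer);
    emitter.off("anyMessage", reset);
    emitter.off("progress", onProgress);
  };
}
```

Resetting from events rather than polling means the deadline is always measured from the most recent sign of activity.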

chamorin commented 9 months ago

I created a new release; let me know if everything works.

sdevkc commented 9 months ago

Thanks, will check it out and let you know.

sdevkc commented 9 months ago

@chamorin

Hey, I'm still getting the same problem. I'm not receiving any anyMessage events, but I am receiving progress events.

npm list substreams-sink 
...
└── substreams-sink@0.13.8

Env:

SUBSTREAMS_API_TOKEN=...
SUBSTREAMS_ENDPOINT=https://mainnet.sol.streamingfast.io:443
AUTH_ISSUE_URL=https://auth.streamingfast.io/v1/auth/issue

MANIFEST=/home/sol.spkg
MODULE_NAME=map_transactions
START_BLOCK=243632376

VERBOSE=true
PRODUCTION_MODE=true
INACTIVITY_SECONDS=15

Code:

import fs from "fs";

import { type ActionOptions } from "./bin/cli.js";
import { commander, setup, prometheus, http, logger } from "substreams-sink";

// Custom Prometheus Counters
const customCounter = prometheus.registerCounter("custom_counter");

export async function action(options: ActionOptions) {
    // Setup sink for Block Emitter
    const { emitter, startCursor } = await setup(options);
    logger.info("\nStart cursor: ", startCursor);

    emitter.on("progress", (progress) => {
        if (progress.processedBytes) {
            logger.info(
                "totalBytesRead: ",
                progress.processedBytes.totalBytesRead
            );
        }
    });

    // Stream Blocks
    emitter.on("anyMessage", async (message, cursor, clock) => {
        logger.info(JSON.stringify({ clock }));
        customCounter?.inc(1);
        let text = JSON.stringify({ clock });
        fs.appendFileSync("local.json", `${text}\n`);
    });

    emitter.on("progress", (progress) => {
        logger.info(progress.processedBytes);
    });

    // Setup HTTP server & Prometheus metrics
    http.listen(options);

    // Start streaming
    await emitter.start();
    logger.info(`Start emitting`);

    http.server.close();
    logger.info(`Http server closed`);
}

Output:

INFO    /dist/index.js:47       sink-js
Start cursor:  
INFO    /dist/index.js:74       sink-js Start emitting
INFO    /dist/index.js:76       sink-js Http server closed
INFO    /node_modules/substreams-sink/dist/src/http.js:20       sink-js prometheus server {
  hostname: 'localhost',
  port: 9102
}
INFO    /node_modules/substreams-sink/dist/src/prometheus.js:82 sink-js session {
  traceId: 'a7bb1ff824ae52993e511696fb9bf654',
  resolvedStartBlock: '243632376',
  linearHandoffBlock: '244319700',
  maxParallelWorkers: '70'
}
INFO    /dist/index.js:50       sink-js totalBytesRead:  88842320n
INFO    /dist/index.js:68       sink-js ProcessedBytes {
  totalBytesRead: 88842320n,
  totalBytesWritten: 0n
}

...

INFO    /dist/index.js:50       sink-js totalBytesRead:  1968979095n
INFO    /dist/index.js:68       sink-js ProcessedBytes {
  totalBytesRead: 1968979095n,
  totalBytesWritten: 0n
}
ERROR   /node_modules/substreams-sink/dist/src/inactivitySeconds.js:9   sink-js Process will exit due to inactivity for 15 seconds
sdevkc commented 9 months ago

Hey @chamorin, will this be fixed, or can I make a PR?

chamorin commented 9 months ago

Hey @sdevkc, always welcomed to open a PR 👌 Thanks

DenisCarriere commented 9 months ago

@sdevkc this should be fixed. However, if you are using a StreamingFast endpoint, you'll need to wait for the newest release, since authentication uses an API key instead of a JWT API token.

sdevkc commented 9 months ago

Doesn't it work if I obtain the StreamingFast JWT token from https://auth.streamingfast.io/v1/auth/issue?