sei-protocol / sei-chain

Apache License 2.0
2.72k stars 815 forks source link

Websocket eth filter system not reporting events correctly #1850

Open CRossel87a opened 2 months ago

CRossel87a commented 2 months ago

Seid version name: sei server_name: version: v5.7.5 commit: 7e7a9ce6eb069da0f1b31d9f8d23eadf80c30916 build_tags: netgo ledger, go: go version go1.21.6 linux/amd64 build_deps:

Chain ID Mainnet, pacific-1, 1329

Describe the bug Almost no eth events are emitted through the websocket filter subscription

To Reproduce Steps to reproduce the behavior:

  1. wscat -c ws://127.0.0.1:8546 # Connect to the websocket server
  2. Subscribe to Uniswap3 swaps {"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params": ["logs", {"topics": ["0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"]}]}
  3. Wait & see or swap a small amount at Oku trade and observe

OR

  1. Subscribe to Uniswap2 sync events: {"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params": ["logs", {"topics": ["0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1"]}]}
  2. Wait & see or swap a small amount at Dragonswap and observe

Expected behavior All events emitted

lordshisho commented 2 months ago

For anyone looking to confirm:

const ethers = require('ethers');

const pollingProvider = new ethers.providers.JsonRpcProvider("https://evm-rpc.sei-apis.com");
const websocketProvider = new ethers.providers.WebSocketProvider("wss://evm-ws.sei-apis.com");

const eventTracker = new Map();
const CONFIRMATION_TIMEOUT = 10000; // 10 seconds

async function listenForSwapEvents() {
    console.log("Listening for Swap events...");
    const swapTopic = ethers.utils.id("Swap(address,uint256,uint256,uint256,uint256,address)");

    const filter = {
        topics: [swapTopic],
        fromBlock: 'latest'
    };

    function handleEvent(providerType, log) {
        const eventKey = `${log.transactionHash}-${log.logIndex}`;

        if (!eventTracker.has(eventKey)) {
            eventTracker.set(eventKey, { providers: new Set(), timeout: null });
        }

        const eventInfo = eventTracker.get(eventKey);
        eventInfo.providers.add(providerType);

        if (eventInfo.providers.size === 1) {
            // First provider to detect this event, start the confirmation timeout
            eventInfo.timeout = setTimeout(() => {
                if (eventInfo.providers.size < 2) {
                    console.error(`ERROR: Event not confirmed by both providers within ${CONFIRMATION_TIMEOUT}ms`);
                    console.error(`Transaction Hash: ${log.transactionHash}`);
                    console.error(`Contract Address: ${log.address}`);
                    console.error(`Detected by: ${Array.from(eventInfo.providers).join(', ')}`);
                }
                eventTracker.delete(eventKey);
            }, CONFIRMATION_TIMEOUT);
        } else if (eventInfo.providers.size === 2) {
            // Both providers have detected the event
            clearTimeout(eventInfo.timeout);
            console.log(`\nSwap event confirmed by both providers:`);
            console.log(`Transaction Hash: ${log.transactionHash}`);
            console.log(`Contract Address: ${log.address}`);
            eventTracker.delete(eventKey);
        }
    }

    pollingProvider.on(filter, (log) => handleEvent('Polling', log));
    websocketProvider.on(filter, (log) => handleEvent('WebSocket', log));

    // Handle WebSocket disconnections
    websocketProvider._websocket.on("close", (code) => {
        console.log(`WebSocket disconnected with code ${code}. Attempting to reconnect...`);
        websocketProvider._websocket.terminate();
        listenForSwapEvents();
    });
}

listenForSwapEvents().catch(console.error);

// Keep the script running
process.stdin.resume();

It does in fact miss the majority of events on the Websocket Provider for some reason.

Swap event confirmed by both providers:
Transaction Hash: 0xd111657bb13d4c9fb595d6176bfdbf7fdc1c65b968ebd61685f690d8a72a7fd7
Contract Address: 0x70BCA5F4D188D93eD202dFdA5Db9eEdA0760d2b0
ERROR: Event not confirmed by both providers within 10000ms
Transaction Hash: 0x1938d34356afa36070e9a4da3a0643be326b118b8fdbafc918f72b1c0411fa01
Contract Address: 0x8D5261cFF8d63E71C772574EbA63E64E6726EE06
Detected by: Polling
ERROR: Event not confirmed by both providers within 10000ms
Transaction Hash: 0x56235c22f15e522c4c63dc6fa5ed5e519afdbec39e7c201ccfc8eb460d43ae8d
Contract Address: 0xC75C669a62A7eCe0C8d37904b747970467432ad3
Detected by: Polling
ERROR: Event not confirmed by both providers within 10000ms
Transaction Hash: 0x7b42c52c5b11248794eb39496000eabdf9a25f6e5d8ddc74f2d9951e5d6763ff
Contract Address: 0xDfe611c5a718a0525c6261Cf1f100e8db776b4b4
Detected by: Polling
CRossel87a commented 2 months ago

For anyone looking to confirm:

const ethers = require('ethers');

const pollingProvider = new ethers.providers.JsonRpcProvider("https://evm-rpc.sei-apis.com");
const websocketProvider = new ethers.providers.WebSocketProvider("wss://evm-ws.sei-apis.com");

const eventTracker = new Map();
const CONFIRMATION_TIMEOUT = 10000; // 10 seconds

async function listenForSwapEvents() {
    console.log("Listening for Swap events...");
    const swapTopic = ethers.utils.id("Swap(address,uint256,uint256,uint256,uint256,address)");

    const filter = {
        topics: [swapTopic],
        fromBlock: 'latest'
    };

    function handleEvent(providerType, log) {
        const eventKey = `${log.transactionHash}-${log.logIndex}`;

        if (!eventTracker.has(eventKey)) {
            eventTracker.set(eventKey, { providers: new Set(), timeout: null });
        }

        const eventInfo = eventTracker.get(eventKey);
        eventInfo.providers.add(providerType);

        if (eventInfo.providers.size === 1) {
            // First provider to detect this event, start the confirmation timeout
            eventInfo.timeout = setTimeout(() => {
                if (eventInfo.providers.size < 2) {
                    console.error(`ERROR: Event not confirmed by both providers within ${CONFIRMATION_TIMEOUT}ms`);
                    console.error(`Transaction Hash: ${log.transactionHash}`);
                    console.error(`Contract Address: ${log.address}`);
                    console.error(`Detected by: ${Array.from(eventInfo.providers).join(', ')}`);
                }
                eventTracker.delete(eventKey);
            }, CONFIRMATION_TIMEOUT);
        } else if (eventInfo.providers.size === 2) {
            // Both providers have detected the event
            clearTimeout(eventInfo.timeout);
            console.log(`\nSwap event confirmed by both providers:`);
            console.log(`Transaction Hash: ${log.transactionHash}`);
            console.log(`Contract Address: ${log.address}`);
            eventTracker.delete(eventKey);
        }
    }

    pollingProvider.on(filter, (log) => handleEvent('Polling', log));
    websocketProvider.on(filter, (log) => handleEvent('WebSocket', log));

    // Handle WebSocket disconnections
    websocketProvider._websocket.on("close", (code) => {
        console.log(`WebSocket disconnected with code ${code}. Attempting to reconnect...`);
        websocketProvider._websocket.terminate();
        listenForSwapEvents();
    });
}

listenForSwapEvents().catch(console.error);

// Keep the script running
process.stdin.resume();

It does in fact miss the majority of events on the Websocket Provider for some reason.

Swap event confirmed by both providers:
Transaction Hash: 0xd111657bb13d4c9fb595d6176bfdbf7fdc1c65b968ebd61685f690d8a72a7fd7
Contract Address: 0x70BCA5F4D188D93eD202dFdA5Db9eEdA0760d2b0
ERROR: Event not confirmed by both providers within 10000ms
Transaction Hash: 0x1938d34356afa36070e9a4da3a0643be326b118b8fdbafc918f72b1c0411fa01
Contract Address: 0x8D5261cFF8d63E71C772574EbA63E64E6726EE06
Detected by: Polling
ERROR: Event not confirmed by both providers within 10000ms
Transaction Hash: 0x56235c22f15e522c4c63dc6fa5ed5e519afdbec39e7c201ccfc8eb460d43ae8d
Contract Address: 0xC75C669a62A7eCe0C8d37904b747970467432ad3
Detected by: Polling
ERROR: Event not confirmed by both providers within 10000ms
Transaction Hash: 0x7b42c52c5b11248794eb39496000eabdf9a25f6e5d8ddc74f2d9951e5d6763ff
Contract Address: 0xDfe611c5a718a0525c6261Cf1f100e8db776b4b4
Detected by: Polling

Looking at the code in evmrpc\subscribe.go, we can see the logs functionality has not been properly implemented. My good friend Claude the AI made this, and it seems to work - but not vetted:

`

func (a SubscriptionAPI) Logs(ctx context.Context, filter filters.FilterCriteria) (s *rpc.Subscription, err error) { defer recordMetrics("eth_logs", a.connectionType, time.Now(), err == nil)

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
    return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

if filter == nil {
    filter = &filters.FilterCriteria{}
}

rpcSub := notifier.CreateSubscription()

// Create a channel for new head notifications
newHeadCh := make(chan map[string]interface{}, NewHeadsListenerBuffer)

// Subscribe to new heads
a.newHeadListenersMtx.Lock()
if uint64(len(a.newHeadListeners)) >= a.subscriptonConfig.newHeadLimit {
    a.newHeadListenersMtx.Unlock()
    return nil, errors.New("no new subscription can be created")
}
a.newHeadListeners[rpcSub.ID] = newHeadCh
a.newHeadListenersMtx.Unlock()

go func() {
    defer func() {
        a.newHeadListenersMtx.Lock()
        delete(a.newHeadListeners, rpcSub.ID)
        close(newHeadCh)
        a.newHeadListenersMtx.Unlock()
    }()

    for {
        select {
        case newHead, ok := <-newHeadCh:
            if !ok {
                return
            }

            blockNumber, ok := newHead["number"].(*hexutil.Big)
            if !ok {
                continue
            }

            currentBlock := blockNumber.ToInt().Int64()

            // Check if we should process this block based on FromBlock filter
            if filter.FromBlock != nil && currentBlock < filter.FromBlock.Int64() {
                continue
            }

            // Check if we've exceeded ToBlock filter
            if filter.ToBlock != nil && currentBlock > filter.ToBlock.Int64() {
                return
            }

            // Fetch logs for the current block
            logs, _, err := a.logFetcher.GetLogsByFilters(ctx, *filter, currentBlock)
            if err != nil {
                _ = notifier.Notify(rpcSub.ID, err)
                return
            }

            for _, log := range logs {
                if err := notifier.Notify(rpcSub.ID, log); err != nil {
                    return
                }
            }

        case <-rpcSub.Err():
            return
        case <-notifier.Closed():
            return
        }
    }
}()

return rpcSub, nil

}`

CRossel87a commented 2 months ago

1853 does not complete this issue, not even close

cordt-sei commented 2 months ago

Hey @CRossel87a we tried using your fix there and it seemed to partially work, it would start to show the 'synthetic' events for any cross-environment tx, but not all of them. Did it work fully for you?

CRossel87a commented 2 months ago

Hey @CRossel87a we tried using your fix there and it seemed to partially work, it would start to show the 'synthetic' events for any cross-environment tx, but not all of them. Did it work fully for you?

Hi,

I have not done deep testing on this

cordt-sei commented 2 months ago

I think we may have found the issue. You were also chatting with me on discord before it got hacked and deleted right?

On Thu, Oct 3, 2024, 3:58 PM CRossel87a @.***> wrote:

Hey @CRossel87a https://github.com/CRossel87a we tried using your fix there and it seemed to partially work, it would start to show the 'synthetic' events for any cross-environment tx, but not all of them. Did it work fully for you?

Hi,

I have not done deep testing on this

— Reply to this email directly, view it on GitHub https://github.com/sei-protocol/sei-chain/issues/1850#issuecomment-2392412328, or unsubscribe https://github.com/notifications/unsubscribe-auth/BHR645VP4UWWRCFD2TIRM2TZZW4YPAVCNFSM6AAAAABN4C25WGVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGOJSGQYTEMZSHA . You are receiving this because you modified the open/close state.Message ID: @.***>