tronprotocol / tips

TRON Improvement Proposals
229 stars 203 forks source link

Issue: one node disconnect its peer because of processing FETCH_INV_DATA message failed in p2p #426

Closed 317787106 closed 1 year ago

317787106 commented 2 years ago

Simple Summary

We introduce a solution to fix the error: message from /195.122.17.97 process failed, type: FETCH_INV_DATA, ... , type: 4, bad message, detail: Num:16694801,ID:0000000000febe115cca6a31bc73c24fd377b928f63e9c35a771b743a225862d is exist.

Abstract

The peer disconnects a node because process FETCH_INV_DATA message from this node failed as this node ask its peer same block. We try to find the reason why this peer asks for the same block from our fullnode twice. We find that multiply threads access variable requestBlockIds, sometimes the method Channel.disconnect may be not running as we excpet. We optimze the code to avoid this error.

Motivation

Sometimes, I find following log in one fullnode associates with the peer 3.137.162.36:60160:

12:26:01.855 INFO  [nioEventLoopGroup-6-21] [net](MessageQueue.java:157) Receive from /3.137.162.36:60160, type: SYNC_BLOCK_CHAIN
size: 5, start block: Num:39415280,ID:0000000002596df090ac2f49ff88e27d243753b0ec7c8121a59e373fc065ecf7, end block Num:39415299,ID:0000000002596e03ee21489852d50e39283d438d5c2fc9e2a2f6210db72f6346
......
12:26:01.864 INFO  [nioEventLoopGroup-6-21] [net](MessageQueue.java:138) Send to /3.137.162.36:60160, type: BLOCK_CHAIN_INVENTORY
size: 2001, first blockId: Num:39415299,ID:0000000002596e03ee21489852d50e39283d438d5c2fc9e2a2f6210db72f6346, end blockId: Num:39417299,ID:00000000025975d35fc3a8c2a515804607ef320815ea116ecabc4e92c4288701
......
12:26:01.914 INFO  [nioEventLoopGroup-6-21] [net](MessageQueue.java:157) Receive from /3.137.162.36:60160, type: SYNC_BLOCK_CHAIN
size: 11, start block: Num:39415280,ID:0000000002596df090ac2f49ff88e27d243753b0ec7c8121a59e373fc065ecf7, end block Num:39417299,ID:00000000025975d35fc3a8c2a515804607ef320815ea116ecabc4e92c4288701
......
12:26:01.921 INFO  [nioEventLoopGroup-6-21] [net](MessageQueue.java:138) Send to /3.137.162.36:60160, type: BLOCK_CHAIN_INVENTORY
size: 2001, first blockId: Num:39417299,ID:00000000025975d35fc3a8c2a515804607ef320815ea116ecabc4e92c4288701, end blockId: Num:39419299,ID:0000000002597da33e7f02678f1716f1961ff7769e94f7795657160bb7744c38
......
12:26:02.348 INFO  [nioEventLoopGroup-6-21] [net](MessageQueue.java:157) Receive from /3.137.162.36:60160, type: FETCH_INV_DATA
invType: BLOCK, size: 100, First hash: 0000000002596e04d4a1871ee86784bbc269c48099a9e8d97f3402e26c558daa, End hash: 0000000002596e679adb2bb334f8aeed5db88697f28662f215cdd4fbd9684e52
......
12:26:02.349 INFO  [nioEventLoopGroup-6-21] [net](MessageQueue.java:138) Send to /3.137.162.36:60160, type: BLOCK
Num:39415300,ID:0000000002596e04d4a1871ee86784bbc269c48099a9e8d97f3402e26c558daa, trx size: 155
......
12:26:02.392 INFO  [nioEventLoopGroup-6-21] [net](MessageQueue.java:138) Send to /3.137.162.36:60160, type: BLOCK
Num:39415399,ID:0000000002596e679adb2bb334f8aeed5db88697f28662f215cdd4fbd9684e52, trx size: 126
......
12:26:03.351 ERROR [nioEventLoopGroup-6-21] [net](TronNetService.java:156) Message from /3.137.162.36 process failed, type: FETCH_INV_DATA
invType: BLOCK, size: 100, First hash: 0000000002596e494bdf6f1165c05d2ba7c42132efe7b84b9076df41a77d8ce8, End hash: 0000000002596eeeaa6e3c9764bf66d65bb8e7a489bd9828a51f073497d43d11
type: 4, bad message, detail: Num:39415369,ID:0000000002596e494bdf6f1165c05d2ba7c42132efe7b84b9076df41a77d8ce8 is exist.
12:26:03.351 INFO  [nioEventLoopGroup-6-21] [net](Channel.java:140) Send to /3.137.162.36:60160 online-time 1s, type: P2P_DISCONNECT reason: BAD_PROTOCOL
12:26:03.351 INFO  [nioEventLoopGroup-6-21] [net](TronChannelInitializer.java:48) Close channel:/3.137.162.36:60160 | fb55f476c6d5030052ee1c3d5acdd10547eaa448419b7b964ce1ec87ad49bbc9845775722a2a76193a4d2f8952e5f68332c37d06dac086defd25bc6c18271421

This log tells us the peer 3.137.162.36:60160 send two same SYNC_BLOCK_CHAIN messages to local fullnode among 100 mills, the local fullnode find this peer ask the same block Num:39415369,ID:0000000002596e494bdf6f1165c05d2ba7c42132efe7b84b9076df41a77d8ce8 among short time when deal with FETCH_INV_DATA message, so we disconnect with the peer.

How does it happen in multi-threads ?

Let's suppose following steps from thress thread:

SyncService.startFetchSyncBlock:

stpe 1. Peer A send FetchInvDataMessage to local fullnode to ask for blocks by SyncService.startFetchSyncBlock, including Block(Num=100) for example. Local fulnode send back Block(Num=100) to this peer, and the peer receive Block(Num=100) successfully. But Peer A has not pop Block(Num=100) from local FullNode's syncBlockToFetch yet by handleSyncBlock.

step 2. Peer A executes method tronNetDelegate.getActivePeer().stream(), but the left has not executed.

private void startFetchSyncBlock() {
  HashMap<PeerConnection, List<BlockId>> send = new HashMap<>();

  tronNetDelegate.getActivePeer().stream()
      .filter(peer -> peer.isNeedSyncFromPeer() && peer.isIdle())
      //when execute here, peer A disconnect with local FullNode
      .forEach(peer -> {
        if (!send.containsKey(peer)) {
          send.put(peer, new LinkedList<>());
        }
        for (BlockId blockId : peer.getSyncBlockToFetch()) {
          if (requestBlockIds.getIfPresent(blockId) == null) {
            requestBlockIds.put(blockId, System.currentTimeMillis());
            peer.getSyncBlockRequested().put(blockId, System.currentTimeMillis());
            send.get(peer).add(blockId);
            if (send.get(peer).size() >= MAX_BLOCK_FETCH_PER_PEER) {
              break;
            }
          }
        }
      });

  send.forEach((peer, blockIds) -> {
    if (!blockIds.isEmpty()) {
      peer.sendMessage(new FetchInvDataMessage(new LinkedList<>(blockIds), InventoryType.BLOCK));
    }
  });
}

step 5. For Peer A, Block(Num=100) is not in global requestBlockIds, so it assigns Block(Num=100) to local FullNode's send list, and then send FetchInvDataMessage to local FullNode to ask for Block(Num=100) again. because the channel has not closed, it can send any TronMessage.

SyncService.handleSyncBlock:

step 4. peer A checks local fullnode is disconnected so it remove Block(Num=100) from its global requestBlockIds.

if (peerConnection.isDisconnect()) {
  blockWaitToProcess.remove(msg);
  invalid(msg.getBlockId()); //remove Block from requestBlockIds
  return;
}

PeerStatusCheck.statusCheck:

step 3. Peer A disconnects with Local FullNode as some reason, only set the tag Channel.isDisconnect = ture, but actually channle has not closed.

public void disconnect(ReasonCode reason) {
  this.isDisconnect = true;
  this.disconnectTime = System.currentTimeMillis();
  channelManager.processDisconnect(this, reason);
  DisconnectMessage msg = new DisconnectMessage(reason);
  logger.info("Send to {} online-time {}s, {}",
      ctx.channel().remoteAddress(),
      (System.currentTimeMillis() - startTime) / 1000,
      msg);
  getNodeStatistics().nodeDisconnectedLocal(reason);
  // close() has not been callback yet.
  ctx.writeAndFlush(msg.getSendData()).addListener(future -> close());
}

Implementation

So, how to prevent some peer from requesting same block twice or more? Acutally, it's simple that check if the tag Channel.isDisconnect is ture before sending any message through ctx.writeAndFlush in MessageQueue.java.

    sendMsgThread = new Thread(() -> {
      while (sendMsgFlag) {
        try {
          if (msgQueue.isEmpty()) {
            Thread.sleep(10);
            continue;
          }
          Message msg = msgQueue.take();
          // check begin
          if (channel.isDisconnect()) {
            logger.warn("Failed to send to {} as channel has closed, {}",
                ctx.channel().remoteAddress(), msg);
            return;
          }
          // check end
          ctx.writeAndFlush(msg.getSendData()).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess() && !channel.isDisconnect()) {
              logger.warn("Failed to send to {}, {}", ctx.channel().remoteAddress(), msg);
            }
          });
        } catch (InterruptedException e) {
          logger.warn("Send message server interrupted.");
          Thread.currentThread().interrupt();
        } catch (Exception e) {
          logger.error("Failed to send to {}, error info: {}", ctx.channel().remoteAddress(),
              e.getMessage());
        }
      }
    });
  public void fastSend(Message msg) {
    // check begin
    if (channel.isDisconnect()) {
      logger.warn("Fast send to {} failed as channel has closed, {} ",
          ctx.channel().remoteAddress(), msg);
      msgQueue.clear();
      return;
    }
    // check end
    logger.info("Fast send to {}, {} ", ctx.channel().remoteAddress(), msg);
    ctx.writeAndFlush(msg.getSendData()).addListener((ChannelFutureListener) future -> {
      if (!future.isSuccess() && !channel.isDisconnect()) {
        logger.error("Fast send to {} failed, {}", ctx.channel().remoteAddress(), msg);
      }
    });
  }