amqp-node / amqplib

AMQP 0-9-1 library and client for Node.JS
https://amqp-node.github.io/amqplib/

Consuming a large message: how do I resolve an "Invalid frame" error caused by the content not being read completely? #764

Closed: YANGLEDUO1 closed this issue 5 months ago

YANGLEDUO1 commented 5 months ago

[Screenshot]

cressie176 commented 5 months ago

Hi @YANGLEDUO1,

Have you read and ruled out these?

YANGLEDUO1 commented 5 months ago

Yes, I set frameMax to 10M, and I still have this problem.


YANGLEDUO1 commented 5 months ago

I printed what was read, but the output was incomplete.

cressie176 commented 5 months ago

Could it be that the message headers are too big?

YANGLEDUO1 commented 5 months ago

Could it be that the message headers are too big?

Yes. Is there a solution?

YANGLEDUO1 commented 5 months ago

Could it be that the message headers are too big?

The message content is relatively large.

cressie176 commented 5 months ago

I believe RabbitMQ fragments the message and sends multiple frames, so content size shouldn't be a problem. There is some information about the process here.
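As a sketch of that fragmentation, assuming standard AMQP 0-9-1 framing (a 7-byte frame header plus a 1-byte frame-end marker, so at most frameMax - 8 bytes of body per frame), the body frame sizes for a given content length can be computed like this. `bodyFrameSizes` is a hypothetical helper for illustration, not part of amqplib.

```javascript
// Each AMQP 0-9-1 frame = 7-byte header (type, channel, size) + payload + 1 frame-end byte (0xCE),
// so a single content-body frame can carry at most frameMax - 8 bytes of the message body.
const FRAME_OVERHEAD = 8;

// Hypothetical helper: list the payload size of every content-body frame for one message.
function bodyFrameSizes(contentLength, frameMax) {
  const maxPayload = frameMax - FRAME_OVERHEAD;
  const sizes = [];
  for (let remaining = contentLength; remaining > 0; remaining -= maxPayload) {
    sizes.push(Math.min(remaining, maxPayload));
  }
  return sizes;
}

// A 58308-byte body consumed with frameMax=0x1000 (4096)
const sizes = bodyFrameSizes(58308, 0x1000);
console.log(sizes.length, sizes[0], sizes[sizes.length - 1]); // 15 4088 1076
```

With frameMax 0x1000 this gives fourteen 4088-byte frames followed by one 1076-byte frame, which is the same pattern that shows up in the Wireshark capture later in this thread.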

My guess is that the header size may be too big (this has been reported in previous issues). I don't know this area of amqplib well enough to know whether it is a problem with the client, though. Can you try using the RabbitMQ command line tools / management app to check the header size?

cressie176 commented 5 months ago

My mistake: it doesn't seem to be the frame size. That would have produced the error on line 55 instead (I tested to make sure).

cressie176 commented 5 months ago

It looks like the issue is that the message is genuinely invalid. The last byte of the frame must equal the frame end token, and it appears that it does not.
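The check being described can be sketched as follows, assuming the standard 0-9-1 frame layout (type: 1 byte, channel: 2 bytes, size: 4 bytes, then `size` payload bytes, then 0xCE). `parseFrameSketch` is a hypothetical name. It is not amqplib's own `parseFrame`, which uses bitsyntax and also takes a max size argument.

```javascript
// Frame layout: type (1) | channel (2) | size (4) | payload (size) | frame-end 0xCE (1)
const FRAME_END = 0xce; // 206 in decimal

function parseFrameSketch(buf) {
  const HEADER = 7;
  if (buf.length < HEADER) return null; // header not fully received yet
  const type = buf.readUInt8(0);
  const channel = buf.readUInt16BE(1);
  const size = buf.readUInt32BE(3);
  if (buf.length < HEADER + size + 1) return null; // payload or frame-end not received yet
  // The byte straight after the declared payload must be the frame-end marker
  if (buf[HEADER + size] !== FRAME_END) {
    throw new Error('Invalid frame');
  }
  return {
    type,
    channel,
    size,
    payload: buf.subarray(HEADER, HEADER + size),
    rest: buf.subarray(HEADER + size + 1), // bytes belonging to the next frame
  };
}

// A body frame (type 3) on channel 1 carrying "hi"
const good = Buffer.from([3, 0, 1, 0, 0, 0, 2, 0x68, 0x69, FRAME_END]);
console.log(parseFrameSketch(good).payload.toString()); // hi

// Same frame, but the byte where 0xCE belongs is 0x34 (ASCII '4')
const bad = Buffer.from([3, 0, 1, 0, 0, 0, 2, 0x68, 0x69, 0x34]);
try {
  parseFrameSketch(bad);
} catch (e) {
  console.log(e.message); // Invalid frame
}
```

If the declared size is right but the payload bytes do not line up with it, the byte at `HEADER + size` is ordinary data rather than 0xCE, and the check fails exactly as it does in this issue.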

cressie176 commented 5 months ago

FYI...

const amqplib = require('amqplib');
// 100,000,000-byte string of 'x', far larger than either frameMax below
const test = new Array(100000000).fill('x').join('');

(async () => {
  await publish();
  await get();
})();

async function publish() {
  // Publish with a 1 MiB frame max (0x100000)
  const connection = await amqplib.connect('amqp://localhost:5672?frameMax=0x100000');
  const channel = await connection.createChannel();
  await channel.assertQueue('q764');
  await channel.purgeQueue('q764');

  channel.sendToQueue('q764', Buffer.from(test));

  // Awaiting the closes lets the publish flush before the process moves on
  await channel.close();
  await connection.close();
}

async function get() {
  // Fetch with a 4 KiB frame max (0x1000) so the body arrives in many small frames
  const connection = await amqplib.connect('amqp://localhost:5672?frameMax=0x1000');
  const channel = await connection.createChannel();
  await channel.assertQueue('q764');

  const message = await channel.get('q764');
  if (!message) throw new Error('queue q764 is empty'); // channel.get resolves to false when empty
  channel.ack(message);
  console.log('ok', message.content.length);

  await channel.close();
  await connection.close();
}

node index.js
ok 100000000
cressie176 commented 5 months ago

This does not appear to be an issue with amqplib. It also seems unlikely that RabbitMQ would send an invalid frame. Is there anything between the client and RabbitMQ which could be corrupting the message, for example a TCP load balancer?

I suggest you:

  1. Try consuming the message with the RabbitMQ CLI tools and see if you get a similar error, i.e.
    rabbitmqadmin get queue=<queue_name> requeue=false
  2. Use something like Wireshark to inspect the frames.
YANGLEDUO1 commented 5 months ago

I have no problem consuming it with Java. Maybe it's because my message is in Chinese?
YANGLEDUO1 commented 5 months ago

It has nothing to do with Chinese. It seems that when the message size approaches frameMax and the last character is a newline or a semicolon, an error occurs.

YANGLEDUO1 commented 5 months ago

Were you able to reproduce it?

cressie176 commented 5 months ago

Hi @YANGLEDUO1, I will give it a try. Can you please paste the above as a buffer converted to hex, wrapped in double quotes, so I can be sure I am using exactly the same content?

e.g.

console.log(`"` + Buffer.from('the message content').toString('hex')+`"`)
"746865206d65737361676520636f6e74656e74"
YANGLEDUO1 commented 5 months ago

Sample message here: https://gist.githubusercontent.com/cressie176/091694a4285ac572bbb5c2548a87d5f4/raw/38ccfb2acb6d617a24101e430d5fdc75916d5423/amqplib-764.txt

cressie176 commented 5 months ago
const amqplib = require('amqplib');
const test = Buffer.from("7b0a2...220a7d", 'hex'); // the sample message as hex (elided here)

(async () => {
  await publish();
  await get();
})();

async function publish() {
  const connection = await amqplib.connect('amqp://localhost:5672')
  const channel = await connection.createChannel();
  await channel.assertQueue('q764');
  await channel.purgeQueue('q764');

  channel.sendToQueue('q764', test);

  await channel.close();
  await connection.close();
}

async function get() {
  const connection = await amqplib.connect('amqp://localhost:5672')
  const channel = await connection.createChannel();
  await channel.assertQueue('q764');

  const message = await channel.get('q764');
  channel.ack(message);
  console.log('ok', message.content.length);
  console.log(message.content.toString('utf8'));
  await channel.close();
  await connection.close();
}

Works fine:

ok 60023
{
"body":"陨眫龂...
"body":"陨眫龂...
}
YANGLEDUO1 commented 5 months ago

Can you reproduce it with the one above?

YANGLEDUO1 commented 5 months ago

[Screenshot]

cressie176 commented 5 months ago

ok 58308 { "body":"陨眫龂 "body":"陨眫龂 " }

YANGLEDUO1 commented 5 months ago

[Screenshots]

YANGLEDUO1 commented 5 months ago

Is it my code?

cressie176 commented 5 months ago

You don't need to put channel.close and connection.close in a setTimeout. Instead, await them:

await channel.close();
await connection.close();

This could be truncating the message that is published to RabbitMQ, although I am surprised the broker accepts it.

It would also be interesting to see what size bitsyntax thinks the frame is. Can you log it and compare it to rest.length?

cressie176 commented 5 months ago

I will check later to see how you are getting on. Ultimately, though, this isn't a problem with amqplib, as all it is doing is checking that the last byte of the frame equals the frame end constant (206).

It is possibly a problem with bitsyntax, the library that parses the frame, but I doubt it. I think it is most likely that either the frame data has been corrupted or truncated, or the frame size has been set incorrectly. That could have happened when the message was published (possibly because you haven't awaited channel.close and connection.close), or been caused by something else, such as a library you are using to add instrumentation, or a TCP load balancer.

You can rule these out by using Wireshark to look at the AMQP frames. You should be able to see the frame size and frame payload.

cressie176 commented 5 months ago

For reference, this is what I see when I use Wireshark.

The first frame in the highlighted set is the GET-OK response. After that there are the message headers, and three body frames. Each body frame has a length of 4088.

[Screenshot 2024-06-01 at 15:15:29]

This is the last set of body frames before the message is completely delivered and an Ack is sent back. The last body frame has a length of 1076, meaning there are 1076 bytes of content. The 1077th byte is ce, which is 206 when converted from hexadecimal to decimal. This is the FRAME END token.

[Screenshot 2024-06-01 at 15:16:01]

If I update the following code in node_modules/amqplib/lib/frame.js:

    else if (rest.length > size) {
      console.log(`size`, size, rest[size]); // debug: payload size and the byte that follows the payload
      if (rest[size] !== FRAME_END)
        throw new Error('Invalid frame');

the output from my test script above confirms the frame sizes and FRAME END tokens:

...
size 23 206   <--- Basic.Get-Ok
size 18 206   <--- Content-Header
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 4088 206 <--- Content-Body
size 1076 206 <--- Content-Body
size 4 206    <--- Channel.Close-Ok
size 4 206    <--- Connection.Close-Ok
YANGLEDUO1 commented 5 months ago

[Screenshots]

My print output:

[Screenshot]

YANGLEDUO1 commented 5 months ago

I don't think I captured anything like what you did.


cressie176 commented 5 months ago

So amqplib is reporting the same size as Wireshark (116618), but finding that the byte in that position is 52, not 206, hence throwing the error.

You can use Wireshark to check the actual payload by selecting it as you have done and looking in the panel to the right. The data should be highlighted (in blue on my screen), and the first unhighlighted byte should be the FRAME END "ce". If it is not "ce" but "34" (the hex value for 52), then the frame is definitely invalid.

[Screenshot 2024-06-01 at 17:48:51]
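To make the two byte values concrete: the expected frame end 0xCE is 206 in decimal and is not a printable ASCII character, while the 52 that amqplib found is 0x34, the ASCII digit '4'. That is consistent with ordinary message content (for example a digit inside a JSON value) sitting where the frame-end marker should be. A tiny helper, hypothetical and for illustration only:

```javascript
// Show one byte in the notations used in this thread: hex, decimal and ASCII.
function describeByte(byte) {
  return {
    hex: byte.toString(16).padStart(2, '0'),
    decimal: byte,
    // Only printable ASCII (0x20-0x7E) is shown as a character; anything else is null
    ascii: byte >= 0x20 && byte < 0x7f ? String.fromCharCode(byte) : null,
  };
}

console.log(describeByte(0xce)); // { hex: 'ce', decimal: 206, ascii: null }
console.log(describeByte(52)); // { hex: '34', decimal: 52, ascii: '4' }
```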

You can also copy the payload as a hex stream by right-clicking it, pasting it into a text editor and counting the length of the string. It should be 233236 (2 × 116618).

[Screenshot 2024-06-01 at 18:02:55]
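Because a hex stream uses exactly two characters per byte, its length must be even, and half of it is the byte count. An odd length cannot be a clean hex stream: at least one extra or missing character has crept in (a trailing newline picked up when pasting is one possible, unconfirmed, cause). A minimal check, using a hypothetical helper name:

```javascript
// Return the number of bytes encoded by a hex stream, rejecting malformed input.
function hexStreamByteLength(hex) {
  const clean = hex.trim(); // ignore leading/trailing whitespace such as a pasted newline
  if (clean.length % 2 !== 0) {
    throw new Error(`odd hex length ${clean.length}: stream is truncated or has a stray character`);
  }
  if (!/^[0-9a-fA-F]*$/.test(clean)) {
    throw new Error('non-hex character in stream');
  }
  return clean.length / 2;
}

console.log(hexStreamByteLength('746865206d65737361676520636f6e74656e74')); // 19
```

For the frame above, a correct copy would give `hexStreamByteLength(stream) === 116618`.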
YANGLEDUO1 commented 5 months ago

[Screenshot]

In the text editor, the length is 233237.

YANGLEDUO1 commented 5 months ago

Why does this happen?


YANGLEDUO1 commented 5 months ago

Is it my code? Can you tell me how to fix this?

cressie176 commented 5 months ago
  1. Did that message fail with the Invalid Frame error?
  2. Can you copy the payload for that frame as a Hex Stream using Wireshark and paste it into a new comment, without any other text, please?
cressie176 commented 5 months ago

Hi @YANGLEDUO1. I don't think what you pasted was copied using Wireshark's "as a Hex Stream" option: there aren't any letters in it.

[Screenshot]

cressie176 commented 5 months ago

Sorry - I see that is the copy from the frame now. What is strange is that you were previously sending a mixture of English (Body) and Chinese characters. When translated to hex, these would appear as a mixture of the letters A-F and numbers 0-9. The content in the above message is exclusively numeric. Do you have any explanation for this?
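A quick way to confirm this reasoning: UTF-8 encodes Chinese (CJK) characters as 3-byte sequences whose first byte is in the range 0xE0-0xEF, so their hex form always contains at least one letter. Only runs of plain ASCII digits and a few punctuation characters produce all-numeric hex. A small illustration (the helper name is hypothetical):

```javascript
// Encode text as UTF-8 and return its hex representation.
function utf8Hex(text) {
  return Buffer.from(text, 'utf8').toString('hex');
}

for (const s of ['2024', '中文']) {
  const hex = utf8Hex(s);
  console.log(JSON.stringify(s), hex, /[a-f]/.test(hex) ? 'contains a-f' : 'digits only');
}
// "2024" 32303234 digits only
// "中文" e4b8ade69687 contains a-f
```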

YANGLEDUO1 commented 5 months ago

console.log(`"` + Buffer.from('origin message').toString('hex') + `"`)

YANGLEDUO1 commented 5 months ago

[Screenshot]

I can see from this that it's not complete.

YANGLEDUO1 commented 5 months ago

My message is published directly from the RabbitMQ management UI. If you publish it using your own code, you won't have this problem.

YANGLEDUO1 commented 5 months ago

Try consuming a message that you did not publish yourself, one sent directly through the RabbitMQ management console.

cressie176 commented 5 months ago

Works fine with the message you shared previously...

[Screenshot]

size 508 206
size 12 206
size 5 206
size 8 206
size 17 206
size 8 206
size 4 206
size 4 206
size 508 206
size 12 206
size 5 206
size 8 206
size 17 206
size 36 206
size 51 206
size 19 206
size 60026 206
ok 60026

YANGLEDUO1 commented 5 months ago

But even without going through my code, I have the same problem with the package.

[Screenshot]

cressie176 commented 5 months ago

If you are publishing using the RabbitMQ Management console, then amqplib is throwing an error before the message reaches your code.

YANGLEDUO1 commented 5 months ago

Yes. It hasn't reached my code yet.

cressie176 commented 5 months ago

For a message that fails with the Invalid Frame error, can you provide

  1. A copy of the whole last Content-Body frame (not just the payload) as a Hex Stream from Wireshark.

[Screenshot]

  2. The buffer that the parseFrame function receives
function parseFrame(bin, max) {
  console.log(bin.toString('hex')); // debug: dump the raw buffer as hex
  var fh = frameHeaderPattern(bin);
  // ... rest of parseFrame unchanged

Please provide them in two separate gists to make this issue easier to navigate

cressie176 commented 5 months ago

Also, the details of the Tune and Tune-OK methods would be useful to confirm the frame max.

[Screenshot]
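The negotiated frame max can be read straight out of those two methods. The sketch below assumes the standard 0-9-1 layout of a method-frame payload: class id (16-bit) and method id (16-bit), followed for Connection.Tune (10/30) and Tune-Ok (10/31) by channel-max (16-bit), frame-max (32-bit) and heartbeat (16-bit). `decodeTune` is a hypothetical helper, and the hex strings in the example are made-up values, not taken from this issue.

```javascript
// Decode a Connection.Tune (10/30) or Connection.Tune-Ok (10/31) method payload given as hex,
// i.e. the bytes after the 7-byte frame header and before the 0xCE frame end.
function decodeTune(payloadHex) {
  const p = Buffer.from(payloadHex, 'hex');
  const classId = p.readUInt16BE(0);
  const methodId = p.readUInt16BE(2);
  if (classId !== 10 || (methodId !== 30 && methodId !== 31)) {
    throw new Error(`not Tune/Tune-Ok: ${classId}/${methodId}`);
  }
  return {
    method: methodId === 30 ? 'Tune' : 'Tune-Ok',
    channelMax: p.readUInt16BE(4),
    frameMax: p.readUInt32BE(6),
    heartbeat: p.readUInt16BE(10),
  };
}

// Example values only: server proposes frameMax 131072, client answers 4096
console.log(decodeTune('000a001e07ff00020000003c'));
console.log(decodeTune('000a001f07ff00001000003c'));
```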

YANGLEDUO1 commented 5 months ago

image copy Wireshark: 030001000005ff223a66616c73652c226f72674964223a22383832363031393535313730343531343536222c226f7267436f6465223a224731323439222c226f7267436f646573223a2247313234372c4731323439222c2274656e616e744964223a22383832353937393136353735373933313532222c226d656d6265724964223a22383838383633333235333237313936313630222c226d656d6265724e616d65223a22e8b0a2e6b281e88aaf222c226d656d6265724a6f624e756d626572223a22333331303232323030383036323330303433222c226d656d6265724964656e7469747943617465676f7279223a2273747564656e74222c2265717569706d656e744964223a224d303134323031383232333131303030313233222c2265717569706d656e744e616d65223a22e6ada3e997a8e587bae58fa3e4babae884b8e8af86e588abe69cba3031222c226f70657261746554797065223a225245564f4b45222c2270726f636573735374617465223a66616c73652c2263617264446f776e6c696e6b223a66616c73652c2266616365446f776e6c696e6b223a66616c73657d2c7b226964223a223839383233383633353836343735323132384d303134323031383232333131303030313233222c2264656c65746564223a66616c73652c226f72674964223a22383838383630343037323738363030313932222c226f7267436f6465223a2247393635393337222c226f7267436f646573223a2247313234372c47313234392c47393635393337222c2274656e616e744964223a22383832353937393136353735373933313532222c226d656d6265724964223a22383938323338363335383634373532313238222c226d656d6265724e616d65223a22e69cb1e6b5b7e6be9c222c226d656d6265724a6f624e756d626572223a22333331303832323030373131323030303430222c226d656d6265724964656e7469747943617465676f7279223a2273747564656e74222c2265717569706d656e744964223a224d303134323031383232333131303030313233222c2265717569706d656e744e616d65223a22e6ada3e997a8e587bae58fa3e4babae884b8e8af86e588abe69cba3031222c226f70657261746554797065223a225245564f4b45222c2270726f636573735374617465223a66616c73652c226e61747572616c4661636555726c223a222f66732f6174746163686d656e74732f646f776e6c6f61642f31373138383733363130313037363231333736222c2263617264446f776e6c696e6b223a66616c73652c2266616365446f776e6c696e6b223a66616c73657d2c7b226964223a223930313035343039323938383634353337364d
303134323031383232333131303030313233222c2264656c65746564223a66616c73652c226f72674964223a22383838383630343037323738363030313932222c226f7267436f6465223a2247393635393337222c226f7267436f646573223a2247313234372c47313234392c47393635393337222c2274656e616e744964223a22383832353937393136353735373933313532222c226d656d6265724964223a22393031303534303932393838363435333736222c226d656d6265724e616d65223a22e590b4e5b09ae79c9f222c226d656d6265724a6f624e756d626572223a22333331303232323030373131313530393634222c226d656d6265724964656e7469747943617465676f7279223a2273747564656e74222c2265717569706d656e744964223a224d303134323031383232333131303030313233222c2265717569706d656e744e616d65223a22e6ada3e997a8e587bae58fa3e4babae884b8e8af86e588abe69cba3031222c226f70657261746554797065223a225245564f4b45222c2270726f636573735374617465223a66616c73652c226e61747572616c4661636555726c223a222f66732f6174746163686d656e74732f646f776e6c6f61642f31373036313434303939333733303237333238222c2263617264446f776e6c696e6b223a66616c73652c2266616365446f776e6c696e6b223a66616c73657d5d2c2265717569706d656e744964223a224d303134323031383232333131303030313233227dce

YANGLEDUO1 commented 5 months ago

Attachments: code, console content, res.txt

cressie176 commented 5 months ago

These are not from the same message.

The Wireshark one starts with 030001000005ff223a6661. The console one starts with 01000100000054003c003c1f.

For a single message that fails, I need the output from both Wireshark and the debug line added to the parseFrame function.
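Decoding the 7-byte frame header at the start of each dump shows why they cannot be the same frame. Assuming standard 0-9-1 framing (frame types 1 = method, 2 = header, 3 = body, 8 = heartbeat; a method frame payload starts with a 16-bit class id and a 16-bit method id), the Wireshark dump is a content-body frame on channel 1 with 1535 bytes of payload, while the console dump is a method frame with class/method 60/60, which if I'm reading the class tables correctly is Basic.Deliver. `describeFrameStart` is a hypothetical helper:

```javascript
const FRAME_TYPES = { 1: 'method', 2: 'header', 3: 'body', 8: 'heartbeat' };

// Decode the frame header at the start of a hex dump, plus class/method ids for method frames.
function describeFrameStart(hex) {
  const b = Buffer.from(hex, 'hex');
  const info = {
    type: FRAME_TYPES[b[0]] || `unknown(${b[0]})`,
    channel: b.readUInt16BE(1),
    size: b.readUInt32BE(3),
  };
  if (b[0] === 1 && b.length >= 11) {
    info.classId = b.readUInt16BE(7);
    info.methodId = b.readUInt16BE(9);
  }
  return info;
}

console.log(describeFrameStart('030001000005ff223a6661'));
// { type: 'body', channel: 1, size: 1535 }
console.log(describeFrameStart('01000100000054003c003c1f'));
// { type: 'method', channel: 1, size: 84, classId: 60, methodId: 60 }
```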

YANGLEDUO1 commented 5 months ago

rest.txt


This is the complete output printed when the message was received, and the file contains 030001000005ff223a6661; you can search for it. It looks like the frame is split across multiple packets.
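A frame arriving split across several TCP packets is normal and would not, by itself, explain the error. TCP delivers a byte stream, so a client has to buffer bytes until the whole frame (7-byte header, payload and frame end) is present before it checks the last byte. The sketch below shows that buffering under standard 0-9-1 framing. `FrameAccumulator` is a hypothetical name, and this is not amqplib's actual implementation.

```javascript
const FRAME_END = 0xce;

// Collects socket chunks and emits complete frames; one frame may span several chunks,
// and one chunk may contain several frames.
class FrameAccumulator {
  constructor() {
    this.pending = Buffer.alloc(0);
  }

  // Feed one chunk; returns every frame that is now complete.
  push(chunk) {
    this.pending = Buffer.concat([this.pending, chunk]);
    const frames = [];
    while (this.pending.length >= 7) {
      const size = this.pending.readUInt32BE(3);
      const total = 7 + size + 1; // header + payload + frame end
      if (this.pending.length < total) break; // wait for more bytes
      if (this.pending[total - 1] !== FRAME_END) throw new Error('Invalid frame');
      frames.push({ type: this.pending[0], payload: this.pending.subarray(7, total - 1) });
      this.pending = this.pending.subarray(total);
    }
    return frames;
  }
}

// One body frame carrying "abc", delivered in three pieces
const frame = Buffer.from([3, 0, 1, 0, 0, 0, 3, 0x61, 0x62, 0x63, FRAME_END]);
const acc = new FrameAccumulator();
console.log(acc.push(frame.subarray(0, 4)).length); // 0 (header incomplete)
console.log(acc.push(frame.subarray(4, 9)).length); // 0 (payload incomplete)
const [f] = acc.push(frame.subarray(9));
console.log(f.payload.toString()); // abc
```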

YANGLEDUO1 commented 5 months ago

[Screenshot]

This is the length of each buffer printed in parseFrame.