It would be great if you could share a gist that simulates your problem. I am pretty sure we can fix it once it is better understood.
I am afraid you are experiencing an issue due to a hard limit in the wire format of the message: https://github.com/fogfish/esq/blob/master/src/esq_writer.erl#L125
It seems that any chunk larger than 64KB is corrupted because the length field is only 16 bits wide.
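For illustration, a 16-bit length prefix can only represent sizes up to 65535 bytes, so the recorded length of a larger payload wraps around and the reader later hits a truncated or garbled record. A minimal shell sketch of the arithmetic (illustration only, not the exact framing used in esq_writer.erl):

    1> Payload = binary:copy(<<"x">>, 70000).    %% a 70000-byte payload
    2> <<Len:16>> = <<(byte_size(Payload)):16>>. %% length squeezed into 16 bits
    3> Len.                                      %% 70000 rem 65536, not 70000
    4464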
Hi,
Please find below a sample test case that simulates the problem.
11> {ok, Input} = file:read_file("C:/Users/bbharti/Downloads/FunctionalWebDevelopmentwit-LanceHalvorsen_157.pdf").
{ok,<<"%PDF-1.4\n%Γπ╧╙\n1 0 obj<</BaseFont/Helvetica/Type/Font/Encoding/WinAnsiEncoding/Subtype/Type1>>\nendobj\n2 0 ob"...>>}
12> {ok, Q} = esq:new("c:/Projects/clientServer/esq/myqueue1").
{ok,<0.137.0>}
13> size(Input).
4201586
14> esq:deq(Q).         %% just to see no data exists
[]
15> esq:enq(Input, Q).  %% data is inserted
ok
16> esq:deq(Q).
=ERROR REPORT==== 30-Jan-2020::17:26:29 ===
Generic server <0.137.0> terminating
Last message in was {'$pipe',{<0.133.0>,#Ref<0.0.5.1365>},{deq,1}}
When Server state == {machine,esq_queue,handle,
    {q,
        {queue,0,[],[]},
        {file,
            {writer,
                {file_descriptor,prim_file,{#Port<0.21863>,504}},
                "c:/Projects/clientServer/esq/myqueue1",
                "c:/Projects/clientServer/esq/myqueue1/20200130/q.spool",
                4201638},
            {reader,undefined,
                "c:/Projects/clientServer/esq/myqueue1",undefined,
                <<>>},
            1},
        undefined,1,undefined,undefined,1000,
        {1580,385386,395000}},
    undefined,undefined,undefined,undefined,undefined}
Reason for termination ==
    {badarg,[{erlang,binary_to_term,[<<>>],[]},
             {esq_reader,decode,1,
                 [{file,"c:/Projects/clientServer/esq/src/esq_reader.erl"},{line,132}]},
             {esq_reader,deq,1,
                 [{file,"c:/Projects/clientServer/esq/src/esq_reader.erl"},{line,58}]},
             {esq_file,deq,3,
                 [{file,"c:/Projects/clientServer/esq/src/esq_file.erl"},{line,71}]},
             {esq_queue,shift_on_disk_tail,2,
                 [{file,"c:/Projects/clientServer/esq/src/esq_queue.erl"},{line,198}]},
             {esq_queue,deq,2,
                 [{file,"c:/Projects/clientServer/esq/src/esq_queue.erl"},{line,125}]},
             {esq_queue,handle,3,
                 [{file,"c:/Projects/clientServer/esq/src/esq_queue.erl"},{line,74}]},
             {pipe_process,run,3,
                 [{file,"c:/Projects/clientServer/esq/_build/default/lib/pipe/src/pipe_process.erl"},{line,224}]}]}

exception exit: badarg
    in function  binary_to_term/1
       called as binary_to_term(<<>>)
    in call from esq_reader:decode/1 (c:/Projects/clientServer/esq/src/esq_reader.erl, line 132)
    in call from esq_reader:deq/1 (c:/Projects/clientServer/esq/src/esq_reader.erl, line 58)
The root cause is a hard limit on the message size; the queue operates with 16KB messages only. Technically the fix is trivial (see #18), but there are a few things to consider:
A message queue is a signalling channel. It is bad practice to pass large files through it; instead, it should be used to pass a reference to the file. You will find that practically any MQ solution imposes a very small limit on the message size.
Compatibility. Changing the message format will break compatibility. Fortunately, I've built in a version schema so that the reader can adapt transparently (see the sketch below).
Alignment of queue segments. The default segment size is tuned for 16KB messages; I would need to find a new ratio if the message size grows.
It does not feel right to push 4GB messages into the queue. However, if 16KB is too small, we can increase the limit, maybe to 1MB. What are your use-cases?
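To make the compatibility point concrete, here is a minimal sketch of how a version-tagged frame could let the old and new formats coexist. It is purely illustrative: the module name and field layout are assumptions, not the actual esq wire format or the change proposed in #18. An old-style frame keeps the 16-bit length, a new-style frame uses a 32-bit length, and a leading version byte tells the reader which decoder to apply, so existing spool files stay readable.

    %% Hypothetical sketch; not the real esq_writer/esq_reader format.
    -module(frame_sketch).
    -export([encode/1, decode/1]).

    %% Version 1: 16-bit length, caps a message at 65535 bytes.
    %% Version 2: 32-bit length, allows larger messages.
    encode(Msg) when is_binary(Msg), byte_size(Msg) =< 16#FFFF ->
        <<1:8, (byte_size(Msg)):16, Msg/binary>>;
    encode(Msg) when is_binary(Msg) ->
        <<2:8, (byte_size(Msg)):32, Msg/binary>>.

    %% The reader dispatches on the version byte, so spool files written
    %% with the old format remain readable after an upgrade.
    decode(<<1:8, Len:16, Msg:Len/binary, Rest/binary>>) ->
        {Msg, Rest};
    decode(<<2:8, Len:32, Msg:Len/binary, Rest/binary>>) ->
        {Msg, Rest}.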
Hi,
Our use-case is similar to the PostgresBDR solution: we are using esq as a queuing application.
Our messages are up to 4MB, not 4GB. I think it would be good if the queue handled messages of this size better on the in-memory side as well.
Regarding "Compatibility. Changing the message format will break compatibility. Fortunately, I've built in a version schema so that the reader can adapt transparently": can you please elaborate on this? How can compatibility be an issue?
Alignment of queue segments: I assume this refers to the on-disk segment size, right? In that case we could adjust it to a suitable size (can this be made configurable?).
Tested esq with large data packets. It is working without any errors.
Hi,
I was trying to enqueue and dequeue large data packets using esq. I was able to enqueue a 4MB file, but when trying to dequeue it, an empty list [] is returned and the data is lost from the disk.
In the README it is mentioned as below:
The head is kept in memory using dequeue data structure. It's capacity is limited to C messages.
which is limited to 64KB (tested). Is there any way we can increase the in-memory capacity so it can hold larger packets in flight?
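Until the on-disk format supports larger messages, one possible workaround is to split a large binary into chunks that stay under the 64KB wire-format limit and enqueue the chunks individually, reassembling them on the consumer side. The sketch below is hypothetical: the module, the 60000-byte chunk size, and the {Id, Index, Total, Chunk} tagging are assumptions, not part of the esq API.

    %% Hypothetical workaround sketch; not part of esq itself.
    -module(esq_chunks).
    -export([enq_chunked/2]).

    -define(CHUNK, 60000).  %% safely below the 64KB wire-format limit

    %% Split a large binary into chunks and enqueue them one by one,
    %% tagged with an id, index and total so the consumer can reassemble them.
    enq_chunked(Bin, Q) when is_binary(Bin) ->
        Id = erlang:unique_integer([positive]),
        Chunks = split(Bin),
        Total = length(Chunks),
        lists:foreach(
            fun({I, C}) -> ok = esq:enq({Id, I, Total, C}, Q) end,
            lists:zip(lists:seq(1, Total), Chunks)).

    split(Bin) when byte_size(Bin) =< ?CHUNK -> [Bin];
    split(<<Head:?CHUNK/binary, Rest/binary>>) -> [Head | split(Rest)].

Alternatively, as suggested earlier in this thread, the queue can carry only a reference (for example a file path) and the payload itself can live outside the queue.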