Hi @heldner, sorry for the delay. Can you share more details on how you are collecting the data you used to draw the graph? It would also be interesting if you sent me all the values used in the module configuration and an explanation of how your application works: whether you proactively delete channels, for instance, and any other detail that might help. If it is a bug, it is maybe related to a specific use case, since the basic usage is covered by automated tests.
Metrics are collected at a one-minute interval and are drawn by Zabbix. The application fetches a pack of messages from RabbitMQ and posts it to an nginx Lua script. The Lua script then pushes the messages one by one to pushstream.
Full config related to pushstream:
location /message-post {
    push_stream_publisher;
    push_stream_channels_path $arg_id;
    allow 127.0.0.1;
}
location /message-post-multi {
    default_type 'text/plain';
    content_by_lua '
        ngx.req.read_body()
        local reqs = {}
        local post_data = ngx.req.get_body_data()
        for word in string.gmatch(ngx.var.arg_ids, "[%w_]+") do
            table.insert(reqs, { "/message-post?id="..word, { method = ngx.HTTP_POST, body = post_data }})
        end
        local resps = { ngx.location.capture_multi(reqs) }
        ngx.say("OK")
    ';
    allow 127.0.0.1;
}
location /message-post-batch {
    client_max_body_size 0;
    client_body_buffer_size 128k;
    default_type 'text/plain';
    content_by_lua_file /etc/nginx/lua/pushstream_batch.lua;
    allow 127.0.0.1;
}
location /channels-stats {
    push_stream_channels_statistics;
    push_stream_channels_path $arg_id;
    allow 127.0.0.1; # allow stats only from localhost
}
# subscribe
location ~ /message-pipe/(.*) {
    push_stream_subscriber long-polling;
    push_stream_channels_path $1;
    push_stream_message_template "{\"id\":~id~,\"channel\":\"~channel~\",\"text\":~text~}";
    push_stream_longpolling_connection_ttl 30s;
    add_header Access-Control-Allow-Origin *;
}
location ~ /message-pipe-ws/(.*) {
    push_stream_subscriber websocket;
    push_stream_channels_path $1;
    push_stream_message_template "{\"id\":~id~,\"channel\":\"~channel~\",\"text\":~text~}";
    push_stream_websocket_allow_publish on;
    push_stream_ping_message_interval 10s;
    add_header Access-Control-Allow-Origin *;
}
location ~ /message-pipe-es/(.*) {
    push_stream_subscriber eventsource;
    push_stream_channels_path $1;
    push_stream_message_template "{\"id\":~id~,\"channel\":\"~channel~\",\"text\":~text~}";
    push_stream_ping_message_interval 10s;
    add_header Access-Control-Allow-Origin *;
}
location ~ /message-pipe-stream/(.*) {
    default_type "text/html; charset=utf-8";
    push_stream_subscriber;
    push_stream_channels_path $1;
    push_stream_header_template "<html>\r\n\t<head>\r\n\t\t<meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\">\r\n\t\t<meta http-equiv=\"Cache-Control\" content=\"no-store\">\r\n\t\t<meta http-equiv=\"Cache-Control\" content=\"no-cache\">\r\n\t\t<meta http-equiv=\"Pragma\" content=\"no-cache\">\r\n\t\t<meta http-equiv=\"Expires\" content=\"Thu, 1 Jan 1970 00:00:00 GMT\">\r\n\t\t<script type=\"text/javascript\">\r\n\t\t\twindow.onError = null;\r\n\t\t\ttry{\r\n\t\t\t\tdocument.domain = (window.location.hostname.match(/^(\d{1,3}\.){3}\d{1,3}$/))\r\n\t\t\t\t\t? window.location.hostname\r\n\t\t\t\t\t: window.location.hostname.split('.').slice(\r\n\t\t\t\t\t\t-1 * Math.max(\r\n\t\t\t\t\t\t\twindow.location.hostname.split('.').length - 1,\r\n\t\t\t\t\t\t\t(window.location.hostname.match(/(\w{4,}\.\w{2}|\.\w{3,})$/) ? 2 : 3)\r\n\t\t\t\t\t\t)\r\n\t\t\t\t\t).join('.')\r\n\t\t\t\t;\r\n\t\t\t}catch(e) {};\r\n\t\t\tparent.PushStream.register(this);\r\n\t\t</script>\r\n\t</head>\r\n\t<body>\r\n";
    push_stream_message_template "<script>\r\n\tp(~id~,'~channel~',(function(){ return ~text~ })() || '');\r\n</script>\r\n";
    push_stream_footer_template "\t</body>\r\n</html>\r\n";
    push_stream_ping_message_interval 10s;
    add_header Access-Control-Allow-Origin *;
    add_header Content-Type text/html;
}
pushstream_batch.lua
if ngx.req.get_method() ~= "POST" then
    ngx.status = ngx.HTTP_BAD_REQUEST
    ngx.say("This API works only with method POST")
    return
end

local cjson = require "cjson" -- https://github.com/mpx/lua-cjson/releases/#2.1.0

local function get_body_str()
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if body == nil then
        -- Looks like Nginx decided to store the POST data in a file
        local tmp_file_name = ngx.req.get_body_file()
        if tmp_file_name == nil then
            return nil
        end
        local tmp_file_hdl = io.open(tmp_file_name, 'rb')
        body = tmp_file_hdl:read("*all")
        tmp_file_hdl:close()
    end
    return body
end

local ok, arr = pcall(cjson.decode, get_body_str())
if not ok then
    ngx.log(ngx.ERR, arr)
    ngx.status = ngx.HTTP_BAD_REQUEST
    ngx.say('Invalid JSON')
    return
end

local queue = {}
local blockings = {}
local MAX_SUBREQUESTS = 192 -- actually 200 is hard-coded in NGX_HTTP_MAX_SUBREQUESTS
local stat_count = 0

local function process_item(item)
    local pushstream_channels = item.pushstream_channel
    if type(pushstream_channels) ~= "table" then
        pushstream_channels = {pushstream_channels}
    end
    local data = cjson.encode({ -- 'local' added here: the original leaked 'data' as a global
        channel = item.channel,
        content = item.content
    })
    for _, pushstream_channel in ipairs(pushstream_channels) do
        -- Hash which defines which messages we can send in parallel.
        -- Two messages with the same hash cannot be sent in one batch.
        local criterion = pushstream_channel .. ":" .. item.channel
        -- Take the queue order of the last item with the same hash and add one
        local queue_order = (blockings[criterion] or 0) + 1
        -- Make sure subrequest batches contain no more than MAX_SUBREQUESTS
        while queue[queue_order] ~= nil and queue[queue_order][MAX_SUBREQUESTS] do
            queue_order = queue_order + 1
        end
        -- Sync back
        blockings[criterion] = queue_order
        -- Create the batch if needed
        if queue[queue_order] == nil then
            queue[queue_order] = {}
        end
        -- Append to the batch
        table.insert(queue[queue_order], {
            "/message-post?id=" .. pushstream_channel, {
                method = ngx.HTTP_POST,
                body = data
            }
        })
        stat_count = stat_count + 1
    end
end

for _, item in ipairs(arr) do
    local ok, err = pcall(process_item, item)
    if not ok then
        ngx.log(ngx.ERR, err)
    end
end

local stat_queues = 0
for _, reqs in ipairs(queue) do
    -- Fire the subrequests. If this fails it's bad, but at least nginx won't return a 500
    local ok, err = pcall(ngx.location.capture_multi, reqs)
    if not ok then
        ngx.log(ngx.ERR, err)
    end
    stat_queues = stat_queues + 1
end

ngx.say('OK: Sent ' .. stat_count .. ' requests in ' .. stat_queues .. ' queues.')
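The ordering logic of the script above (the blockings/queue pair) can be modelled outside nginx. Here is a sketch in Python (partition_batches is a hypothetical name, not part of the module): each (target channel, message key) pair goes into the earliest batch that both comes after the last batch already containing that pair and has not yet reached the subrequest cap.

```python
def partition_batches(items, max_batch=192):
    """Model of the Lua batching: items is a list of (target_channel, msg_key)
    pairs. Two items with the same pair never share a batch (they would race
    inside capture_multi), and no batch exceeds max_batch subrequests."""
    queue = []      # queue[i] is the i-th batch of subrequests
    blockings = {}  # key -> index of the last batch holding that key
    for target, msg_key in items:
        key = (target, msg_key)
        # first candidate: the batch right after the last one with this key
        i = blockings.get(key, -1) + 1
        # skip over batches that already reached the subrequest cap
        while i < len(queue) and len(queue[i]) >= max_batch:
            i += 1
        blockings[key] = i
        if i == len(queue):
            queue.append([])
        queue[i].append(key)
    return queue
```

For example, two messages for the same channel land in consecutive batches, so they are never sent concurrently, while messages for distinct channels share a batch.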
From your configuration, I can see that you are not storing the messages, which means that right after you publish a message it is put in the trash queue, where it waits until all workers have delivered it before it is finally deleted. This leads me to new questions:
One side note: if I understood correctly, in your /message-post-multi location you are publishing the same message to more than one channel, but one at a time, correct?
If this is the case, you might want to publish to all channels at once by setting the channels path like 'channel1/channel2/channel3/...'.
Sorry for the delay. No, nginx is not reloaded and the workers are not dying. Could you give me an example of publishing a message to multiple channels, please?
If you have a publisher location like
location /message-post {
    push_stream_publisher;
    push_stream_channels_path $arg_id;
    allow 127.0.0.1;
}
You can post a "Hello" message to channels A, B and C doing
curl -s -v -X POST 'http://localhost/message-post?id=A/C/B' -d 'Hello'
I mean, just concatenate the channel IDs using a slash. The module will add the message to all the specified channels.
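So, assuming the /message-post publisher location above, building the multi-channel publish URL on the client side is just a join over the channel IDs (a minimal sketch; the names are illustrative):

```python
channels = ["A", "C", "B"]
# Concatenate the channel IDs with '/' so one POST reaches all of them
url = "http://localhost/message-post?id=" + "/".join(channels)
print(url)  # http://localhost/message-post?id=A/C/B
```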
About the error you are having, can you recompile your Nginx using the version on the clean_memory branch? It has a small change in the check of the trash queue.
You should see a message like "DEBUG version 1" in the logs when you start the server, and when pending messages that should already have been deleted start accumulating in the trash, you will see messages like "Message expired on trash but still with reference (id: SOME NUMBER, ref count: ANOTHER NUMBER)".
If this message appears more than once for the same id, please revert to your previous version of Nginx (so as not to fill your disk with these log messages) and send me the logs.
I tried the version from the clean_memory branch and nothing happens, no logs. It must be my mistake, I'll try again later and update this issue.
Hi @heldner, were you able to compile the Nginx again with the branch code?
Hi @wandenberg! Sorry for the delay. At this moment I have nginx with the module from the clean_memory branch working for 4 days, and it looks like the trash queue is not growing. But! I didn't see the debug messages, neither "DEBUG version 1" nor "Message expired on ..". :thinking: pushstream-2018-12-11.jpg
It is good that the trash is not growing anymore. The "Message expired on .." message was really not expected to happen, so its absence is a good thing. But I expected you to see the "DEBUG version 1" message right after you started the server. Do you mind sending me the Nginx binary that you are using? Then I can double-check that you are really using the new code, and prepare a cleaned-up version of the changes to merge into master, to be sure that it really fixed the issue.
Here is the nginx binary. I'm sure your fix is working. This is the last month's graph: pushstream-1711-1712.jpg. The clean_memory version was deployed on 06-12-2018.
Hi @heldner, I am preparing to merge this fix into master. Do you mind testing the latest commit on the clean_memory branch? (https://github.com/wandenberg/nginx-push-stream-module/commit/a150091aab224c1e71e3d5f27e39d4fd9bec066f) You should see the message 'ngx_http_push_stream_module DEBUG version 2.' when the server starts. Basically it is the same as before, but with a check to not waste CPU time.
Hello, @wandenberg! Yes, I tested it: I have multiple instances of nginx with pushstream built from (https://github.com/wandenberg/nginx-push-stream-module/commit/a150091aab224c1e71e3d5f27e39d4fd9bec066f). I saw the debug message at nginx start.
Hello @heldner, what was the result with the latest release?
@wandenberg sorry for the delay. I have not tested the a150091 commit yet. The test is planned for next week.
I have bad news. Nginx with the pushstream module built from commit https://github.com/wandenberg/nginx-push-stream-module/commit/a150091aab224c1e71e3d5f27e39d4fd9bec066f fails with errors:
[Wed Mar 20 14:58:23 2019] traps: nginx[49671] general protection ip:56a5c6 sp:7ffca87a5e40 error:0 in nginx[400000+3f3000]
[Mon Mar 25 16:00:36 2019] traps: nginx[80644] general protection ip:56a5c6 sp:7ffca87a5e90 error:0 in nginx[400000+3f3000]
I went back to
Hello!
I'm having a problem with a large trash queue. It grows slowly. Version of push-stream-module: 0.5.4.
These options are default:
In the error logs I have nothing corresponding to the problem. zabbix graph