Closed RekGRpth closed 4 years ago
It is not possible. But you can make your backend application connect to the events channel and filter the events that it wants to handle. That will be more efficient than handling multiple POST requests/responses.
I understood your suggestion/question. But as I said, the module does not have this support, and for performance reasons it does not make sense to add it, since the backend application can connect to Nginx and receive the events, with the option of using a persistent streaming connection via either WebSocket or EventSource.
how about this
diff --git a/include/ngx_http_push_stream_rbtree_util.h b/include/ngx_http_push_stream_rbtree_util.h
index 661cbde..2a941de 100644
--- a/include/ngx_http_push_stream_rbtree_util.h
+++ b/include/ngx_http_push_stream_rbtree_util.h
@@ -34,7 +34,7 @@
#ifndef NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_
#define NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_
-static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
+static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf, ngx_http_request_t *r);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
diff --git a/src/ngx_http_push_stream_module_publisher.c b/src/ngx_http_push_stream_module_publisher.c
index ee2b691..247bf5f 100644
--- a/src/ngx_http_push_stream_module_publisher.c
+++ b/src/ngx_http_push_stream_module_publisher.c
@@ -97,7 +97,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
// create the channel if doesn't exist
- requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf);
+ requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf, r);
if (requested_channel->channel == NULL) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
}
diff --git a/src/ngx_http_push_stream_module_setup.c b/src/ngx_http_push_stream_module_setup.c
index 8c63c8f..3475829 100644
--- a/src/ngx_http_push_stream_module_setup.c
+++ b/src/ngx_http_push_stream_module_setup.c
@@ -1131,7 +1131,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->mutex_round_robin = 0;
if (mcf->events_channel_id.len > 0) {
- if ((d->events_channel = ngx_http_push_stream_get_channel(&mcf->events_channel_id, ngx_cycle->log, mcf)) == NULL) {
+ if ((d->events_channel = ngx_http_push_stream_get_channel(&mcf->events_channel_id, ngx_cycle->log, mcf, NULL)) == NULL) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: unable to create events channel");
return NGX_ERROR;
}
diff --git a/src/ngx_http_push_stream_module_subscriber.c b/src/ngx_http_push_stream_module_subscriber.c
index d09966b..49f1600 100644
--- a/src/ngx_http_push_stream_module_subscriber.c
+++ b/src/ngx_http_push_stream_module_subscriber.c
@@ -355,7 +355,7 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
continue;
}
- requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf);
+ requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf, r);
if (requested_channel->channel == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel");
*status_code = NGX_HTTP_INTERNAL_SERVER_ERROR;
@@ -639,6 +639,12 @@ ngx_http_push_stream_assing_subscription_to_channel(ngx_slab_pool_t *shpool, ngx
ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED, NULL);
+ ngx_http_request_t *sr;
+ ngx_str_t uri = ngx_string("/client_subscribed");
+ if (ngx_http_subrequest(subscription->subscriber->request, &uri, &subscription->subscriber->request->args, &sr, NULL, NGX_HTTP_SUBREQUEST_BACKGROUND) != NGX_OK) {
+ ngx_log_error(NGX_LOG_ERR, subscription->subscriber->request->connection->log, 0, "ngx_http_subrequest != NGX_OK");
+ }
+
return NGX_OK;
}
diff --git a/src/ngx_http_push_stream_module_utils.c b/src/ngx_http_push_stream_module_utils.c
index a87c76b..2b279f2 100644
--- a/src/ngx_http_push_stream_module_utils.c
+++ b/src/ngx_http_push_stream_module_utils.c
@@ -1548,6 +1548,9 @@ ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t
ngx_slab_pool_t *shpool = mcf->shpool;
ngx_queue_t *cur;
+ ngx_http_request_t *sr;
+ ngx_str_t uri = ngx_string("/client_unsubscribed");
+
while (!ngx_queue_empty(&worker_subscriber->subscriptions)) {
cur = ngx_queue_head(&worker_subscriber->subscriptions);
ngx_http_push_stream_subscription_t *subscription = ngx_queue_data(cur, ngx_http_push_stream_subscription_t, queue);
@@ -1559,6 +1562,10 @@ ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t
ngx_shmtx_unlock(subscription->channel->mutex);
ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, worker_subscriber->request->pool);
+
+ if (ngx_http_subrequest(worker_subscriber->request, &uri, &worker_subscriber->request->args, &sr, NULL, NGX_HTTP_SUBREQUEST_BACKGROUND) != NGX_OK) {
+ ngx_log_error(NGX_LOG_ERR, worker_subscriber->request->connection->log, 0, "ngx_http_subrequest != NGX_OK");
+ }
}
ngx_shmtx_lock(&shpool->mutex);
diff --git a/src/ngx_http_push_stream_rbtree_util.c b/src/ngx_http_push_stream_rbtree_util.c
index 531d025..6e1179f 100644
--- a/src/ngx_http_push_stream_rbtree_util.c
+++ b/src/ngx_http_push_stream_rbtree_util.c
@@ -94,7 +94,7 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_s
// find a channel by id. if channel not found, make one, insert it, and return that.
static ngx_http_push_stream_channel_t *
-ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
+ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf, ngx_http_request_t *r)
{
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_channel_t *channel;
@@ -168,6 +168,14 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED, NULL);
+ if (r != NULL) {
+ ngx_http_request_t *sr;
+ ngx_str_t uri = ngx_string("/channel_created");
+ if (ngx_http_subrequest(r, &uri, &r->args, &sr, NULL, NGX_HTTP_SUBREQUEST_BACKGROUND) != NGX_OK) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_subrequest != NGX_OK");
+ }
+ }
+
return channel;
}
or more generally
diff --git a/include/ngx_http_push_stream_module_utils.h b/include/ngx_http_push_stream_module_utils.h
index f0ab8e6..8c99cf1 100644
--- a/include/ngx_http_push_stream_module_utils.h
+++ b/include/ngx_http_push_stream_module_utils.h
@@ -225,10 +225,10 @@ static const ngx_str_t NGX_HTTP_PUSH_STREAM_CALLBACK_CONTENT_TYPE = ngx_string(
static const ngx_str_t NGX_HTTP_PUSH_STREAM_PADDING_BY_USER_AGENT_PATTERN = ngx_string("([^:]+),(\\d+),(\\d+)");
#define NGX_HTTP_PUSH_STREAM_EVENT_TEMPLATE "{\"type\": \"%V\", \"channel\": \"%V\"}%Z"
-static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED = ngx_string("channel_created");
-static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED = ngx_string("channel_destroyed");
-static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED = ngx_string("client_subscribed");
-static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED = ngx_string("client_unsubscribed");
+static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED = ngx_string("/channel_created");
+static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED = ngx_string("/channel_destroyed");
+static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED = ngx_string("/client_subscribed");
+static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED = ngx_string("/client_unsubscribed");
ngx_event_t ngx_http_push_stream_memory_cleanup_event;
@@ -263,7 +263,7 @@ static void ngx_http_push_stream_complex_value(ngx_http_request_
ngx_int_t ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool);
-ngx_int_t ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_id, ngx_pool_t *temp_pool);
+ngx_int_t ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_id, ngx_pool_t *temp_pool, ngx_http_request_t *r);
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev);
diff --git a/include/ngx_http_push_stream_rbtree_util.h b/include/ngx_http_push_stream_rbtree_util.h
index 661cbde..2a941de 100644
--- a/include/ngx_http_push_stream_rbtree_util.h
+++ b/include/ngx_http_push_stream_rbtree_util.h
@@ -34,7 +34,7 @@
#ifndef NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_
#define NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_
-static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
+static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf, ngx_http_request_t *r);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
diff --git a/src/ngx_http_push_stream_module_publisher.c b/src/ngx_http_push_stream_module_publisher.c
index ee2b691..247bf5f 100644
--- a/src/ngx_http_push_stream_module_publisher.c
+++ b/src/ngx_http_push_stream_module_publisher.c
@@ -97,7 +97,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) {
// create the channel if doesn't exist
- requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf);
+ requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf, r);
if (requested_channel->channel == NULL) {
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL);
}
diff --git a/src/ngx_http_push_stream_module_setup.c b/src/ngx_http_push_stream_module_setup.c
index 8c63c8f..3475829 100644
--- a/src/ngx_http_push_stream_module_setup.c
+++ b/src/ngx_http_push_stream_module_setup.c
@@ -1131,7 +1131,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
d->mutex_round_robin = 0;
if (mcf->events_channel_id.len > 0) {
- if ((d->events_channel = ngx_http_push_stream_get_channel(&mcf->events_channel_id, ngx_cycle->log, mcf)) == NULL) {
+ if ((d->events_channel = ngx_http_push_stream_get_channel(&mcf->events_channel_id, ngx_cycle->log, mcf, NULL)) == NULL) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: unable to create events channel");
return NGX_ERROR;
}
diff --git a/src/ngx_http_push_stream_module_subscriber.c b/src/ngx_http_push_stream_module_subscriber.c
index d09966b..3af5113 100644
--- a/src/ngx_http_push_stream_module_subscriber.c
+++ b/src/ngx_http_push_stream_module_subscriber.c
@@ -355,7 +355,7 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
continue;
}
- requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf);
+ requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf, r);
if (requested_channel->channel == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel");
*status_code = NGX_HTTP_INTERNAL_SERVER_ERROR;
@@ -637,7 +637,7 @@ ngx_http_push_stream_assing_subscription_to_channel(ngx_slab_pool_t *shpool, ngx
subscription->channel_worker_sentinel = worker_subscribers_sentinel;
ngx_shmtx_unlock(channel->mutex);
- ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED, NULL);
+ ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED, NULL, subscription->subscriber->request);
return NGX_OK;
}
diff --git a/src/ngx_http_push_stream_module_utils.c b/src/ngx_http_push_stream_module_utils.c
index a87c76b..6f35a59 100644
--- a/src/ngx_http_push_stream_module_utils.c
+++ b/src/ngx_http_push_stream_module_utils.c
@@ -114,7 +114,7 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
// remove the subscription for the channel from worker
ngx_queue_remove(&subscription->channel_worker_queue);
- ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, subscriber->request->pool);
+ ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, subscriber->request->pool, subscriber->request);
if (subscriber->longpolling) {
ngx_http_push_stream_add_polling_headers(subscriber->request, ngx_time(), 0, subscriber->request->pool);
@@ -177,7 +177,7 @@ ngx_http_push_stream_collect_deleted_channels_data(ngx_http_push_stream_shm_data
data->channels_in_trash++;
ngx_shmtx_unlock(&data->channels_trash_mutex);
- ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool);
+ ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool, NULL);
}
}
ngx_shmtx_unlock(&data->channels_to_delete_mutex);
@@ -445,7 +445,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, n
ngx_int_t
-ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_type, ngx_pool_t *received_temp_pool)
+ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_type, ngx_pool_t *received_temp_pool, ngx_http_request_t *r)
{
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_pool_t *temp_pool = received_temp_pool;
@@ -467,6 +467,13 @@ ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t
}
}
+ if (r != NULL) {
+ ngx_http_request_t *sr;
+ if (ngx_http_subrequest(r, event_type, &r->args, &sr, NULL, NGX_HTTP_SUBREQUEST_BACKGROUND) != NGX_OK) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_subrequest != NGX_OK");
+ }
+ }
+
return NGX_OK;
}
@@ -1033,7 +1040,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_p
data->channels_in_trash++;
ngx_shmtx_unlock(&data->channels_trash_mutex);
- ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool);
+ ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool, NULL);
}
}
ngx_shmtx_unlock(&data->channels_queue_mutex);
@@ -1558,7 +1565,7 @@ ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t
ngx_queue_remove(&subscription->queue);
ngx_shmtx_unlock(subscription->channel->mutex);
- ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, worker_subscriber->request->pool);
+ ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, worker_subscriber->request->pool, worker_subscriber->request);
}
ngx_shmtx_lock(&shpool->mutex);
diff --git a/src/ngx_http_push_stream_rbtree_util.c b/src/ngx_http_push_stream_rbtree_util.c
index 531d025..0567326 100644
--- a/src/ngx_http_push_stream_rbtree_util.c
+++ b/src/ngx_http_push_stream_rbtree_util.c
@@ -94,7 +94,7 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_s
// find a channel by id. if channel not found, make one, insert it, and return that.
static ngx_http_push_stream_channel_t *
-ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
+ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf, ngx_http_request_t *r)
{
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
ngx_http_push_stream_channel_t *channel;
@@ -166,7 +166,7 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_shmtx_unlock(&data->channels_queue_mutex);
- ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED, NULL);
+ ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED, NULL, r);
return channel;
}
Hi @RekGRpth, maybe I was not clear. This change will introduce a large number of subrequests to do something that can already be achieved with the current implementation, simply by having the "backend" application connect to the "events channel" and consume the events using any of the current delivery methods (polling, long polling, WebSocket or EventSource). I would not add this implementation to the core of the module, but of course you can maintain your own version with it. I just strongly advise you to run a benchmark and a load test. Maybe the performance is acceptable for your application, but not for other applications.
I use it like this:
location = /ws {
push_stream_subscriber websocket;
push_stream_channels_path $arg_id;
push_stream_message_template "{\"id\":~id~,\"channel\":\"~channel~\",\"text\":~text~}";
}
location = /pub {
push_stream_publisher;
push_stream_channels_path $arg_id;
}
location = /client_unsubscribed {
internal;
postgres_pass ngx;
postgres_query "unlisten $arg_id";
}
location = /client_subscribed {
internal;
postgres_pass ngx;
postgres_query "listen $arg_id";
}
location = /channel_created {
internal;
echo "$arg_id";
}
I.e., on client_subscribed I send the command "listen channel" to Postgres; Postgres can then send asynchronous notifications, which I translate to WebSocket messages. On client_unsubscribed I send the command "unlisten channel" to Postgres.
like mirror, i.e.