yaauie opened this issue 6 years ago
+1 for the issue. It seems that several connections are opened and kept in the pool, but only one of them is used at any time.
Any updates please?
Is the problem here that one filter is serially accessed by each worker?
We have discussed batch event processing in enhancement filters before. No conclusive approach was decided upon. One strategy that I am interested in exploring is distinct value processing, which assumes that batches have clusters of events that will look up the same value, e.g. an IP address or a product id. This could also be called cache-by-batch (the cache is cleared after each batch).
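A rough sketch of that cache-by-batch idea in plain Ruby (the helper names here are hypothetical, not the plugin's actual API): collect each distinct lookup key in the batch, query it once, and discard the cache when the batch is done.

```ruby
# Sketch of "distinct value processing" / cache-by-batch.
# `lookup` stands in for the real JDBC statement execution; here it
# just counts how many times it is called so the saving is visible.
def process_batch(events, &lookup)
  cache = {}  # per-batch cache, discarded when the method returns
  events.each do |event|
    key = event[:ip]
    event[:geo] = (cache[key] ||= lookup.call(key))
  end
  events
end

calls = 0
batch = [{ ip: "10.0.0.1" }, { ip: "10.0.0.2" }, { ip: "10.0.0.1" }]
process_batch(batch) { |ip| calls += 1; "geo-for-#{ip}" }
puts calls  # 2 lookups for 3 events: the duplicate IP hits the cache
```

The bigger the clusters of repeated values inside a batch, the fewer statements hit the database.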
@guyboertje In our particular case the problem was an unexpected number of connections to the database and no ability to control them.
We had 10 pipelines with several filters each, and this setup sometimes gave bursts of 100 open connections, which were too many for our DB.
The best solution for this case would be the ability to configure a connection pool shared by the filters across all pipelines, but that may lead to starvation and I am not sure it is technically possible to implement. So the bare minimum would be to simply allow setting the number of connections in each filter's pool. Right now it uses 4 connections per filter by default and cannot be capped at 1 connection per filter.
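The arithmetic behind those bursts is straightforward; a minimal sketch, assuming the default pool of 4 connections per filter instance and (hypothetically) 3 such filters per pipeline, since pools are not shared:

```ruby
# Worst-case open connections if every pool fills up:
# one pool per filter instance, one filter instance per pipeline.
default_pool_size = 4     # Sequel's threaded-pool default, per the discussion
pipelines = 10
filters_per_pipeline = 3  # assumption standing in for "several filters each"

total = pipelines * filters_per_pipeline * default_pool_size
puts total  # => 120 potential connections against the same DB
```

Even modest per-filter defaults multiply quickly across pipelines, which is why a configurable cap matters.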
@guyboertje Thank you for your response.
Is the problem here that one filter is serially accessed by each worker?
Yes. In our case we have one pipeline with 5 workers and a batch size of 5000.
It is a simple setup, I guess; however, we noticed that the process freezes for records > 50. We have close to 2.5 million rows from the input SQL query, and if we don't include the JDBC streaming filters, the process completes in a jiffy. We suspect that the JDBC streaming is causing some bottleneck (sequential executions?), though we are not sure whether the contention is on the Oracle connection or something else.
We have discussed batch event processing in enhancement filters before. No conclusive approach was decided upon.
Would this approach help with the JDBC streaming filter?
One strategy that I am interested in exploring is distinct value processing, which assumes that batches have clusters of events that will look up the same value, e.g. an IP address or a product id. This could also be called cache-by-batch (the cache is cleared after each batch).
Sorry, but I'm not too familiar with the architecture of Logstash, i.e. how the workers are distributed among the batches, etc.
Any help or insight into our situation would be appreciated.
I'm including a trimmed-down snippet of our config:
input {
  jdbc {
    clean_run => false
    jdbc_driver_library => "C:\logstash\ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:hostserver.abc.xyz/service"
    jdbc_user => "xxxx"
    jdbc_password => "xxxxx"
    # Connection pool configuration.
    # Validate connection before use.
    jdbc_validate_connection => true
    # How often to validate a connection (in seconds)
    jdbc_validation_timeout => 120
    #jdbc_validation_query => "SELECT 1 FROM DUAL"
    # Maximum number of times to try connecting to database
    connection_retry_attempts => 5
    # Number of seconds to sleep between connection attempts
    connection_retry_attempts_wait_time => 2
    lowercase_column_names => true
    # General/Vendor-specific Sequel configuration options.
    # https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc
    sequel_opts => {
      login_timeout => "60"
      prefetch_rows => "5000"
      jdbc_properties => {
        "defaultRowPrefetch" => "5000"
        "loginTimeout" => "60"
        "inactiveConnectionTimeout" => "120"
        "timeoutCheckInterval" => "120"
        "tcpKeepAlive" => "true"
        "oracle.net.READ_TIMEOUT" => "5000"
        "validationQuery" => "SELECT 1 FROM DUAL"
      }
    }
    # SQL Query
    statement => "SELECT id, abc, def, xyz FROM table"
  }
}
filter {
  jdbc_streaming {
    ...
    jdbc_connection_string => "jdbc:oracle:thin:hostserver.abc.xyz/service"
    ...
    target => "target_field_1"
    use_cache => "false"
    cache_expiration => "5"
    cache_size => "0"
    statement => "SELECT blah FROM TABLE_1 WHERE ID = :id"
    parameters => { "id" => "id" }
  }
  jdbc_streaming {
    ...
    jdbc_connection_string => "jdbc:oracle:thin:hostserver.abc.xyz/service"
    ...
    target => "target_field_2"
    use_cache => "false"
    cache_expiration => "5"
    cache_size => "0"
    statement => "SELECT blah, blah, blah FROM TABLE_2 WHERE ID = :id"
    parameters => { "id" => "id" }
  }
  # Add field first to handle if there is any null code
  mutate { add_field => { "[xyz][code]" => "" } }
  if [code] { mutate { replace => { "[xyz][code]" => "%{code}" } } }
}
output {
  elasticsearch {
    hosts => xxxx
    index => "indexname"
    http_compression => true
  }
}
Ahh OK. So you mean global connection pooling. I don't think this can be done in Logstash, as each filter is seen as an autonomous transformation engine working on a single event at a time.
I have seen similar questions some time back; at that time I searched for any kind of JDBC DB proxy. I searched again today and found:
Both are actively developed and open source. I have not tested either of them. Please report back any conclusions if you decide to evaluate them and/or use one in production.
In either the original direct setup or a proxied (pooled) indirect setup, we still have a job to do in understanding how the various JDBC plugins will react to waiting for their turn at execution, and to timing out while they wait. I must admit that this is a lesser-understood facet of the JDBC plugin behaviour.
@guyboertje
Ok. That leads to a follow-up question: do (or can) the workers process filters in parallel? I.e. let's suppose I have 5 workers, each processing an input event (so 5 in all) in parallel; for each of those events, will an autonomous instance of the filter be created to process the event?
Thanks for the pointers to the JDBC connection pooling. Depending on the answer to my question above, we can narrow down where the main bottleneck is, i.e. if the filters can't process multiple events in parallel, then connection pooling becomes secondary (but a bottleneck nonetheless).
@stanuku
From @dmitrymurashenkov above...
We had 10 pipelines with several filters each, and this setup sometimes gave bursts of 100 open connections, which were too many for our DB.
Looking at the Logstash and Sequel code, each pipeline is autonomous, and each filter plugin instance (as seen in the config) declares itself (per the plugin's authors) threadsafe or not. Threadsafe filters are reused across workers (but not pipelines) and are assumed to be callable in parallel by each worker thread. On my MacBook, the jdbc_streaming filter uses a threaded connection pool, which has 4 connections by default.
Each worker takes a batch of events from the queue (the inputs feed newly minted events into the queue) and feeds the events from the batch through each filter sequentially, based on the conditional logic in the config (if any). This means that if you have two jdbc_streaming filters one after the other, then only one will be executing a statement at any one time per worker thread.
Simultaneous execution of a statement by multiple worker threads is probable (up to the 4-connection limit, the default pool size), but the degree of simultaneous statement execution is determined by how synchronised the worker loops become, since each worker loop is subject to variable delays as it executes the filters and output(s).
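To see why simultaneous statement execution is capped by the pool size, here is a toy simulation in plain Ruby (no Sequel, no JDBC): 8 worker threads share a pool of 4 "connections" modelled as a SizedQueue, and we record the peak number checked out at once.

```ruby
POOL_SIZE = 4
pool = SizedQueue.new(POOL_SIZE)
POOL_SIZE.times { |i| pool << "conn-#{i}" }  # pre-fill with connection handles

mutex = Mutex.new
in_use = 0
peak = 0

workers = 8.times.map do
  Thread.new do
    5.times do
      conn = pool.pop  # blocks when all 4 connections are checked out
      mutex.synchronize { in_use += 1; peak = [peak, in_use].max }
      sleep 0.001      # stand-in for statement execution time
      mutex.synchronize { in_use -= 1 }
      pool << conn     # return the connection to the pool
    end
  end
end
workers.each(&:join)
puts peak  # never exceeds POOL_SIZE, however many workers there are
```

With more workers than connections, the surplus workers simply queue on `pool.pop`, which is the same shape of contention the filter's threaded pool exhibits.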
Thinking about a worst-case scenario, imagine that a jdbc_streaming filter is used to look up user details from a user id held in an event, and that a stream of 2500 events all refer to the same user id. The exact same statement will be executed 2500 times, which is clearly very wasteful, but the jdbc_streaming filter is designed with volatility in mind, i.e. lookup source tables that change often and unpredictably. The jdbc_static filter is designed with non-volatile lookup sources in mind (though a refresh schedule allows for periodic refills with the latest changes). However, jdbc_static is less useful when the lookup sources are very large. Some mix-and-match options can mitigate downloading millions of lookup records into jdbc_static: use a jdbc_static for the "hot" 20000 ids and a conditional jdbc_streaming filter for the "cold" rest.
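A sketch of that hot/cold split as a filter section (the field names and statements here are hypothetical, and the jdbc_static loader details are omitted for brevity):

```
filter {
  # Hot path: the ~20000 frequently-hit ids are pre-loaded into a local
  # jdbc_static lookup table (loaders / local_db_objects / local_lookups
  # configuration omitted here).
  jdbc_static {
    ...
  }
  # Cold path: fall through to a per-event jdbc_streaming lookup only
  # when the static table produced no match.
  if ![user][name] {
    jdbc_streaming {
      jdbc_connection_string => "jdbc:oracle:thin:hostserver.abc.xyz/service"
      statement => "SELECT name FROM users WHERE id = :id"
      parameters => { "id" => "userid" }
      target => "[user][name]"
    }
  }
}
```

The conditional keeps the expensive per-event statement off the hot path entirely.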
To test whether a bigger pool size will improve throughput, or whether a smaller pool size will put less load on the DB, you can modify the jdbc_streaming filter source code (in place). Replace line 53 of the file vendor/bundle/jruby/2.3.0/gems/logstash-filter-jdbc_streaming-1.0.4/lib/logstash/plugin_mixins/jdbc_streaming.rb with these 3 lines:
opts = {:user=> @jdbc_user, :password=> @jdbc_password.nil? ? nil : @jdbc_password.value}
opts[:max_connections] = 9 # number of workers plus 1
@database = Sequel.connect(@jdbc_connection_string, opts)
If you do this test, please report your findings back here.
@guyboertje Much appreciated! Will keep you posted.
Since Sequel supports connection pooling by default, exposing the ability to control aspects of pooling to the pipeline configuration should be pretty straightforward.
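For example, if the filter passed pool settings through to Sequel the way the jdbc input already forwards sequel_opts, the original request in this thread (capping each filter at one connection) might look like this. This is a hypothetical config sketch, not an implemented option:

```
filter {
  jdbc_streaming {
    jdbc_connection_string => "jdbc:oracle:thin:hostserver.abc.xyz/service"
    statement => "SELECT blah FROM TABLE_1 WHERE ID = :id"
    parameters => { "id" => "id" }
    target => "target_field_1"
    # Hypothetical: forward pool settings straight to Sequel.connect,
    # which already understands these option names.
    sequel_opts => {
      "max_connections" => 1   # cap this filter at a single connection
      "pool_timeout" => 10     # seconds to wait for a free connection
    }
  }
}
```

Sequel's max_connections and pool_timeout options exist today; only the sequel_opts pass-through in the jdbc_streaming filter is the new part.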