influxdata / influxdb-java

Java client for InfluxDB
MIT License
1.18k stars 476 forks source link

maximum connections reached (?) with max-connection-limit=0 #422

Closed davide195 closed 6 years ago

davide195 commented 6 years ago

Hi all, i've this issue with a function that delete from the DB sequentially a lot of measure. Error log is:

Exception while pinging database: org.influxdb.InfluxDBIOException: java.net.SocketException: No buffer space available (maximum connections reached?): connect at org.influxdb.impl.InfluxDBImpl.ping(InfluxDBImpl.java:254) at com.valtellina.eee.database.ItemLogDatabaseImpl.pingServer(ItemLogDatabaseImpl.java:86) at com.valtellina.eee.database.ItemLogDatabaseImpl.connect(ItemLogDatabaseImpl.java:79) at com.valtellina.eee.database.ItemLogDatabaseImpl.itemsToAggregate(ItemLogDatabaseImpl.java:262) at com.valtellina.eee.dataLogger.DataLoggerAggregationTask$Task.run(DataLoggerAggregationTask.java:111) at java.util.TimerThread.mainLoop(Timer.java:555) at java.util.TimerThread.run(Timer.java:505) Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): connect at java.net.DualStackPlainSocketImpl.connect0(Native Method) at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:83) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) at okhttp3.internal.platform.Platform.connectSocket(Platform.java:125) at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:238) at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:158) at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:256) at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:134) at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:113) at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:125) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) at org.influxdb.impl.GzipRequestInterceptor.intercept(GzipRequestInterceptor.java:42) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:143) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200) at okhttp3.RealCall.execute(RealCall.java:77) at retrofit2.OkHttpCall.execute(OkHttpCall.java:180) at org.influxdb.impl.InfluxDBImpl.ping(InfluxDBImpl.java:240)

My conf:

### Welcome to the InfluxDB configuration file.

# The values in this file override the default values used by the system if
# a config option is not specified. The commented out lines are the configuration
# field and the default value used. Uncommenting a line and changing the value
# will change the value used at runtime when the process is restarted.

# Once every 24 hours InfluxDB will report usage data to usage.influxdata.com
# The data includes a random ID, os, arch, version, the number of series and other
# usage data. No data from user databases is ever transmitted.
# Change this option to true to disable reporting.
# reporting-disabled = false

# Bind address to use for the RPC service for backup and restore.
# bind-address = "127.0.0.1:8088"

###
### [meta]
###
### Controls the parameters for the Raft consensus group that stores metadata
### about the InfluxDB cluster.
###

[meta]
  # Where the metadata/raft database is stored
  dir = "C:/Users/Admin2/InfluxDB/meta"

  # Automatically create a default retention policy when creating a database.
  # retention-autocreate = true

  # If log messages are printed for the meta service
  logging-enabled = true

###
### [data]
###
### Controls where the actual shard data for InfluxDB lives and how it is
### flushed from the WAL. "dir" may need to be changed to a suitable place
### for your system, but the WAL settings are an advanced configuration. The
### defaults should work for most systems.
###

[data]
  # The directory where the TSM storage engine stores TSM files.
  dir = "C:/Users/Admin2/InfluxDB/data"

  # The directory where the TSM storage engine stores WAL files.
  wal-dir = "C:/Users/Admin2/InfluxDB/wal"

  # The amount of time that a write will wait before fsyncing.  A duration
  # greater than 0 can be used to batch up multiple fsync calls.  This is useful for slower
  # disks or when WAL write contention is seen.  A value of 0s fsyncs every write to the WAL.
  # Values in the range of 0-100ms are recommended for non-SSD disks.
  # wal-fsync-delay = "0s"

  # The type of shard index to use for new shards.  The default is an in-memory index that is
  # recreated at startup.  A value of "tsi1" will use a disk based index that supports higher
  # cardinality datasets.
  # index-version = "inmem"

  # Trace logging provides more verbose output around the tsm engine. Turning
  # this on can provide more useful output for debugging tsm engine issues.
  # trace-logging-enabled = false

  # Whether queries should be logged before execution. Very useful for troubleshooting, but will
  # log any sensitive data contained within a query.
  # query-log-enabled = true

  # Settings for the TSM engine

  # CacheMaxMemorySize is the maximum size a shard's cache can
  # reach before it starts rejecting writes.
  # Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
  # Vaues without a size suffix are in bytes.
  # cache-max-memory-size = "1g"

  # CacheSnapshotMemorySize is the size at which the engine will
  # snapshot the cache and write it to a TSM file, freeing up memory
  # Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
  # Values without a size suffix are in bytes.
  # cache-snapshot-memory-size = "25m"

  # CacheSnapshotWriteColdDuration is the length of time at
  # which the engine will snapshot the cache and write it to
  # a new TSM file if the shard hasn't received writes or deletes
  # cache-snapshot-write-cold-duration = "10m"

  # CompactFullWriteColdDuration is the duration at which the engine
  # will compact all TSM files in a shard if it hasn't received a
  # write or delete
  # compact-full-write-cold-duration = "4h"

  # The maximum number of concurrent full and level compactions that can run at one time.  A
  # value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime.  Any number greater
  # than 0 limits compactions to that value.  This setting does not apply
  # to cache snapshotting.
  # max-concurrent-compactions = 0

  # The maximum series allowed per database before writes are dropped.  This limit can prevent
  # high cardinality issues at the database level.  This limit can be disabled by setting it to
  # 0.
  # max-series-per-database = 1000000

  # The maximum number of tag values per tag that are allowed before writes are dropped.  This limit
  # can prevent high cardinality tag values from being written to a measurement.  This limit can be
  # disabled by setting it to 0.
  # max-values-per-tag = 100000

###
### [coordinator]
###
### Controls the clustering service configuration.
###

[coordinator]
  # The default time a write request will wait until a "timeout" error is returned to the caller.
  # write-timeout = "10s"

  # The maximum number of concurrent queries allowed to be executing at one time.  If a query is
  # executed and exceeds this limit, an error is returned to the caller.  This limit can be disabled
  # by setting it to 0.
  # max-concurrent-queries = 0

  # The maximum time a query will is allowed to execute before being killed by the system.  This limit
  # can help prevent run away queries.  Setting the value to 0 disables the limit.
  # query-timeout = "0s"

  # The time threshold when a query will be logged as a slow query.  This limit can be set to help
  # discover slow or resource intensive queries.  Setting the value to 0 disables the slow query logging.
  # log-queries-after = "0s"

  # The maximum number of points a SELECT can process.  A value of 0 will make
  # the maximum point count unlimited.  This will only be checked every second so queries will not
  # be aborted immediately when hitting the limit.
  # max-select-point = 0

  # The maximum number of series a SELECT can run.  A value of 0 will make the maximum series
  # count unlimited.
  # max-select-series = 0

  # The maxium number of group by time bucket a SELECT can create.  A value of zero will max the maximum
  # number of buckets unlimited.
  # max-select-buckets = 0

###
### [retention]
###
### Controls the enforcement of retention policies for evicting old data.
###

[retention]
  # Determines whether retention policy enforcement enabled.
  # enabled = true

  # The interval of time when retention policy enforcement checks run.
  # check-interval = "30m"

###
### [shard-precreation]
###
### Controls the precreation of shards, so they are available before data arrives.
### Only shards that, after creation, will have both a start- and end-time in the
### future, will ever be created. Shards are never precreated that would be wholly
### or partially in the past.

[shard-precreation]
  # Determines whether shard pre-creation service is enabled.
  # enabled = true

  # The interval of time when the check to pre-create new shards runs.
  # check-interval = "10m"

  # The default period ahead of the endtime of a shard group that its successor
  # group is created.
  # advance-period = "30m"

###
### Controls the system self-monitoring, statistics and diagnostics.
###
### The internal database for monitoring data is created automatically if
### if it does not already exist. The target retention within this database
### is called 'monitor' and is also created with a retention period of 7 days
### and a replication factor of 1, if it does not exist. In all cases the
### this retention policy is configured as the default for the database.

[monitor]
  # Whether to record statistics internally.
  # store-enabled = true

  # The destination database for recorded statistics
  # store-database = "_internal"

  # The interval at which to record statistics
  # store-interval = "10s"

###
### [http]
###
### Controls how the HTTP endpoints are configured. These are the primary
### mechanism for getting data into and out of InfluxDB.
###

[http]
  # Determines whether HTTP endpoint is enabled.
  # enabled = true

  # The bind address used by the HTTP service.
  # bind-address = ":8086"

  # Determines whether user authentication is enabled over HTTP/HTTPS.
  # auth-enabled = false

  # The default realm sent back when issuing a basic auth challenge.
  # realm = "InfluxDB"

  # Determines whether HTTP request logging is enabled.
  # log-enabled = true

  # Determines whether detailed write logging is enabled.
  # write-tracing = false

  # Determines whether the pprof endpoint is enabled.  This endpoint is used for
  # troubleshooting and monitoring.
  # pprof-enabled = true

  # Determines whether HTTPS is enabled.
  # https-enabled = false

  # The SSL certificate to use when HTTPS is enabled.
  # https-certificate = "/etc/ssl/influxdb.pem"

  # Use a separate private key location.
  # https-private-key = ""

  # The JWT auth shared secret to validate requests using JSON web tokens.
  # shared-secret = ""

  # The default chunk size for result sets that should be chunked.
  # max-row-limit = 0

  # The maximum number of HTTP connections that may be open at once.  New connections that
  # would exceed this limit are dropped.  Setting this value to 0 disables the limit.
  max-connection-limit = 0

  # Enable http service over unix domain socket
  # unix-socket-enabled = false

  # The path of the unix domain socket.
  # bind-socket = "/var/run/influxdb.sock"

  # The maximum size of a client request body, in bytes. Setting this value to 0 disables the limit.
  # max-body-size = 25000000

###
### [ifql]
###
### Configures the ifql RPC API.
###

[ifql]
  # Determines whether the RPC service is enabled.
  # enabled = true

  # Determines whether additional logging is enabled.
  # log-enabled = true

  # The bind address used by the ifql RPC service.
  # bind-address = ":8082"

###
### [subscriber]
###
### Controls the subscriptions, which can be used to fork a copy of all data
### received by the InfluxDB host.
###

[subscriber]
  # Determines whether the subscriber service is enabled.
  # enabled = true

  # The default timeout for HTTP writes to subscribers.
  # http-timeout = "30s"

  # Allows insecure HTTPS connections to subscribers.  This is useful when testing with self-
  # signed certificates.
  # insecure-skip-verify = false

  # The path to the PEM encoded CA certs file. If the empty string, the default system certs will be used
  # ca-certs = ""

  # The number of writer goroutines processing the write channel.
  # write-concurrency = 40

  # The number of in-flight writes buffered in the write channel.
  # write-buffer-size = 1000

###
### [[graphite]]
###
### Controls one or many listeners for Graphite data.
###

[[graphite]]
  # Determines whether the graphite endpoint is enabled.
  # enabled = false
  # database = "graphite"
  # retention-policy = ""
  # bind-address = ":2003"
  # protocol = "tcp"
  # consistency-level = "one"

  # These next lines control how batching works. You should have this enabled
  # otherwise you could get dropped metrics or poor performance. Batching
  # will buffer points in memory if you have many coming in.

  # Flush if this many points get buffered
  # batch-size = 5000

  # number of batches that may be pending in memory
  # batch-pending = 10

  # Flush at least this often even if we haven't hit buffer limit
  # batch-timeout = "1s"

  # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
  # udp-read-buffer = 0

  ### This string joins multiple matching 'measurement' values providing more control over the final measurement name.
  # separator = "."

  ### Default tags that will be added to all metrics.  These can be overridden at the template level
  ### or by tags extracted from metric
  # tags = ["region=us-east", "zone=1c"]

  ### Each template line requires a template pattern.  It can have an optional
  ### filter before the template and separated by spaces.  It can also have optional extra
  ### tags following the template.  Multiple tags should be separated by commas and no spaces
  ### similar to the line protocol format.  There can be only one default template.
  # templates = [
  #   "*.app env.service.resource.measurement",
  #   # Default template
  #   "server.*",
  # ]

###
### [collectd]
###
### Controls one or many listeners for collectd data.
###

[[collectd]]
  # enabled = false
  # bind-address = ":25826"
  # database = "collectd"
  # retention-policy = ""
  #
  # The collectd service supports either scanning a directory for multiple types
  # db files, or specifying a single db file.
  # typesdb = "/usr/local/share/collectd"
  #
  # security-level = "none"
  # auth-file = "/etc/collectd/auth_file"

  # These next lines control how batching works. You should have this enabled
  # otherwise you could get dropped metrics or poor performance. Batching
  # will buffer points in memory if you have many coming in.

  # Flush if this many points get buffered
  # batch-size = 5000

  # Number of batches that may be pending in memory
  # batch-pending = 10

  # Flush at least this often even if we haven't hit buffer limit
  # batch-timeout = "10s"

  # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
  # read-buffer = 0

  # Multi-value plugins can be handled two ways.
  # "split" will parse and store the multi-value plugin data into separate measurements
  # "join" will parse and store the multi-value plugin as a single multi-value measurement.
  # "split" is the default behavior for backward compatability with previous versions of influxdb.
  # parse-multivalue-plugin = "split"
###
### [opentsdb]
###
### Controls one or many listeners for OpenTSDB data.
###

[[opentsdb]]
  # enabled = false
  # bind-address = ":4242"
  # database = "opentsdb"
  # retention-policy = ""
  # consistency-level = "one"
  # tls-enabled = false
  # certificate= "/etc/ssl/influxdb.pem"

  # Log an error for every malformed point.
  # log-point-errors = true

  # These next lines control how batching works. You should have this enabled
  # otherwise you could get dropped metrics or poor performance. Only points
  # metrics received over the telnet protocol undergo batching.

  # Flush if this many points get buffered
  # batch-size = 1000

  # Number of batches that may be pending in memory
  # batch-pending = 5

  # Flush at least this often even if we haven't hit buffer limit
  # batch-timeout = "1s"

###
### [[udp]]
###
### Controls the listeners for InfluxDB line protocol data via UDP.
###

[[udp]]
  # enabled = false
  # bind-address = ":8089"
  # database = "udp"
  # retention-policy = ""

  # These next lines control how batching works. You should have this enabled
  # otherwise you could get dropped metrics or poor performance. Batching
  # will buffer points in memory if you have many coming in.

  # Flush if this many points get buffered
  # batch-size = 5000

  # Number of batches that may be pending in memory
  # batch-pending = 10

  # Will flush at least this often even if we haven't hit buffer limit
  # batch-timeout = "1s"

  # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
  # read-buffer = 0

###
### [continuous_queries]
###
### Controls how continuous queries are run within InfluxDB.
###

[continuous_queries]
  # Determines whether the continuous query service is enabled.
  # enabled = true

  # Controls whether queries are logged when executed by the CQ service.
  # log-enabled = true

  # Controls whether queries are logged to the self-monitoring data store.
  # query-stats-enabled = false

  # interval for how often continuous queries will be checked if they need to run
  # run-interval = "1s"

The function:

public void delete(String sn, String reg_name, Long last_ts) {
         InfluxDB influx = connect();

        Query queryObject = new Query("DELETE FROM \"modbus_measure\" WHERE sn = '"+ sn +"' and reg_name = '"+ reg_name +"' AND time < "+ last_ts, db);
        QueryResult queryResult = influx.query(queryObject);

        influx.close();
}

public InfluxDB connect() {
        InfluxDB dbc = InfluxDBFactory.connect(url, username, password);
        dbc.setDatabase(db);
        log.info("Connected to the database!"+ pingServer(dbc));
        return dbc;
 }

Why i get this error?

Thanks.

fmachado commented 6 years ago

@davide195 one of the InfluxDB key features[1] is the capability of "efficiently auto-expire stale data" using "Retention Policy"[2].

For example, you could create a RP to keep 1 year, 6 months or just 1 day of data. This will automatically remove stale data and I strongly advise you to adopt a RP (or multiple RP's) instead of building your own cleanup solution in Java.

Regarding your code: a) If you have only one InfluxDB database with a single user, declare InfluxDB dbc as a class scoped object and create it only once instead of creating every time you call a method. InfluxDB dbc object can be shared and used concurrently.

b) If you have multiple InfluxDB databases with multiple users and don't want to adapt the approach mentioned before, call InfluxDBFactory.connect(String url, String username, String password, OkHttpClient.Builder client)[3] passing a shared OkHttpClient.Builder object. This will make all InfluxDB client objects share the same HTTP connection pool[4].

Take a look on the links that I am sharing with you. If you need more help or something is not clear, ask again. ;)

Fernando

[1] https://docs.influxdata.com/influxdb/v1.4/#key-features [2] https://docs.influxdata.com/influxdb/v1.4/concepts/glossary/#retention-policy-rp [3] https://github.com/influxdata/influxdb-java/blob/master/src/main/java/org/influxdb/InfluxDBFactory.java#L79 [4] https://github.com/square/okhttp/wiki/Concurrency#connection-pool

davide195 commented 6 years ago

Hello @fmachado , thanks for the reply.

I'm passed to the "one shared instance" of dbc.

i know the possibility to use the RP but before the deleting of the data i need to aggregate it. What i need is an aggregation from the raw 10 sec to 1h only for the data older then 28 days. Using the RP there is the possibility to delete before the aggregation, loosing the data. I know that there is the CQ but it aggregate the data between now() and now()-1h and its not what i need. Am i wrong?

Thanks

fmachado commented 6 years ago

One last comment before closing this issue: googling "SocketException: No buffer space available" I found posts from users executing the Java code on Windows platform and getting the mentioned exception because they were hitting the max # of TCP ports allowed to be active/open on Windows (5000). Check this link[1] for more information.

IMO does not matter what OS you are using because "SocketException: No buffer space available" means that you are probably doing something wrong.

If you are still facing this issue, please reopen it with more details about your code. Would be great if you could provide a test case that I can reproduce.

[1] http://mashijie.blogspot.de/2009/05/change-default-setting-of-tcp-ports.html