confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Test harness test failed: Expected <N> records but it was <1> #5663

Open panasenco opened 4 years ago

panasenco commented 4 years ago

Describe the bug: In the test harness, records produced by a circular INSERT INTO on one stream are not picked up by a second stream that reads from it.

To Reproduce: ksqlDB version 0.9.0

SQL Statements:

CREATE STREAM countdown (i INT) WITH (kafka_topic='countdown', value_format='json', partitions=1);
INSERT INTO countdown SELECT i-1 AS i FROM countdown WHERE i > 0 EMIT CHANGES;
CREATE STREAM countdown2 (i INT) WITH (kafka_topic='countdown2', value_format='json', partitions=1);
INSERT INTO countdown2 SELECT i FROM countdown EMIT CHANGES;

Input JSON:

{
    "inputs": [
        {"topic": "countdown", "timestamp": 0, "value": {"I": 2} }
    ]
}

Output JSON:

{
    "outputs": [
        {"topic": "countdown", "timestamp": 0, "value": {"I": 2}},
        {"topic": "countdown", "timestamp": 0, "value": {"I": 1}},
        {"topic": "countdown", "timestamp": 0, "value": {"I": 0}},
        {"topic": "countdown2", "timestamp": 0, "value": {"I": 2}},
        {"topic": "countdown2", "timestamp": 0, "value": {"I": 1}},
        {"topic": "countdown2", "timestamp": 0, "value": {"I": 0}}
    ]
}

Expected behavior: The test passes.

Actual behavior:

>>>>> Test failed: Topic countdown2. Expected <3> records but it was <1>

Additional context: Removing the "countdown2" records from the expected outputs makes the test pass.

Running the statements manually in ksqlDB results in the same rows in both streams.
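
For reference, the rows expected in both streams can be derived with a small model of the two INSERT INTO queries. This is only a sketch of the intended semantics in Python, not of how ksqlDB or the test harness is implemented; the `simulate` function and the in-memory lists standing in for topics are made up for illustration.

```python
from collections import deque


def simulate(initial_values):
    """Model the two persistent queries, using plain lists in place of Kafka topics."""
    countdown = []    # records that end up on topic 'countdown'
    countdown2 = []   # records that end up on topic 'countdown2'
    pending = deque(initial_values)  # records produced but not yet processed

    while pending:
        i = pending.popleft()
        countdown.append(i)
        # INSERT INTO countdown2 SELECT i FROM countdown EMIT CHANGES;
        countdown2.append(i)
        # INSERT INTO countdown SELECT i-1 AS i FROM countdown WHERE i > 0 EMIT CHANGES;
        if i > 0:
            pending.append(i - 1)

    return countdown, countdown2


countdown, countdown2 = simulate([2])
print(countdown)   # [2, 1, 0]
print(countdown2)  # [2, 1, 0]
```

Under this model a single input of 2 yields three records on each topic, which matches the output JSON above and what I see when running the statements manually.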

Full output:

OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
[2020-06-20 04:04:31,954] INFO Adding function AS_VALUE for method public java.lang.Object io.confluent.ksql.function.udf.AsValue.asValue(java.lang.Object) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:31,996] INFO Adding function ARRAY_LENGTH for method public java.lang.Integer io.confluent.ksql.function.udf.array.ArrayLength.calcArrayLength(java.util.List) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,016] INFO Adding function ENTRIES for method public java.util.List io.confluent.ksql.function.udf.array.Entries.entriesString(java.util.Map,boolean) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,057] INFO Adding function ENTRIES for method public java.util.List io.confluent.ksql.function.udf.array.Entries.entriesInt(java.util.Map,boolean) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,066] INFO Adding function ENTRIES for method public java.util.List io.confluent.ksql.function.udf.array.Entries.entriesBigInt(java.util.Map,boolean) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,075] INFO Adding function ENTRIES for method public java.util.List io.confluent.ksql.function.udf.array.Entries.entriesDouble(java.util.Map,boolean) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,080] INFO Adding function ENTRIES for method public java.util.List io.confluent.ksql.function.udf.array.Entries.entriesBoolean(java.util.Map,boolean) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,085] INFO Adding function GENERATE_SERIES for method public java.util.List io.confluent.ksql.function.udf.array.GenerateSeries.generateSeriesInt(int,int,int) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,087] INFO Adding function GENERATE_SERIES for method public java.util.List io.confluent.ksql.function.udf.array.GenerateSeries.generateSeriesInt(int,int) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,088] INFO Adding function GENERATE_SERIES for method public java.util.List io.confluent.ksql.function.udf.array.GenerateSeries.generateSeriesLong(long,long,int) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,090] INFO Adding function GENERATE_SERIES for method public java.util.List io.confluent.ksql.function.udf.array.GenerateSeries.generateSeriesLong(long,long) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,126] INFO Adding function datetostring for method public java.lang.String io.confluent.ksql.function.udf.datetime.DateToString.dateToString(int,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,136] INFO Adding function stringtodate for method public int io.confluent.ksql.function.udf.datetime.StringToDate.stringToDate(java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,171] INFO Adding function stringtotimestamp for method public long io.confluent.ksql.function.udf.datetime.StringToTimestamp.stringToTimestamp(java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,173] INFO Adding function stringtotimestamp for method public long io.confluent.ksql.function.udf.datetime.StringToTimestamp.stringToTimestamp(java.lang.String,java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,181] INFO Adding function timestamptostring for method public java.lang.String io.confluent.ksql.function.udf.datetime.TimestampToString.timestampToString(long,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,185] INFO Adding function timestamptostring for method public java.lang.String io.confluent.ksql.function.udf.datetime.TimestampToString.timestampToString(long,java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,187] INFO Adding function unix_date for method public int io.confluent.ksql.function.udf.datetime.UnixDate.unixDate() (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,188] INFO Adding function unix_timestamp for method public long io.confluent.ksql.function.udf.datetime.UnixTimestamp.unixTimestamp() (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,193] INFO Adding function geo_distance for method public java.lang.Double io.confluent.ksql.function.udf.geo.GeoDistance.geoDistance(double,double,double,double) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,203] INFO Adding function geo_distance for method public java.lang.Double io.confluent.ksql.function.udf.geo.GeoDistance.geoDistance(double,double,double,double,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,208] INFO Adding function JSON_ARRAY_CONTAINS for method public java.lang.Boolean io.confluent.ksql.function.udf.json.JsonArrayContains.contains(java.lang.String,java.lang.Object) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,210] INFO Adding function ARRAY_CONTAINS for method public boolean io.confluent.ksql.function.udf.list.ArrayContains.contains(java.util.List,java.lang.Object) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,215] INFO Adding function slice for method public java.util.List io.confluent.ksql.function.udf.list.Slice.slice(java.util.List,java.lang.Integer,java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,218] INFO Adding function AS_MAP for method public final java.util.Map io.confluent.ksql.function.udf.map.AsMap.asMap(java.util.List,java.util.List) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,223] INFO Adding function Abs for method public java.lang.Double io.confluent.ksql.function.udf.math.Abs.abs(java.lang.Double) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,224] INFO Adding function Abs for method public java.math.BigDecimal io.confluent.ksql.function.udf.math.Abs.abs(java.math.BigDecimal) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,231] INFO Adding function Abs for method public java.lang.Integer io.confluent.ksql.function.udf.math.Abs.abs(java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,232] INFO Adding function Abs for method public java.lang.Long io.confluent.ksql.function.udf.math.Abs.abs(java.lang.Long) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,234] INFO Adding function Ceil for method public java.lang.Double io.confluent.ksql.function.udf.math.Ceil.ceil(java.lang.Double) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,235] INFO Adding function Ceil for method public java.math.BigDecimal io.confluent.ksql.function.udf.math.Ceil.ceil(java.math.BigDecimal) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,243] INFO Adding function Ceil for method public java.lang.Integer io.confluent.ksql.function.udf.math.Ceil.ceil(java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,245] INFO Adding function Ceil for method public java.lang.Long io.confluent.ksql.function.udf.math.Ceil.ceil(java.lang.Long) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,247] INFO Adding function exp for method public java.lang.Double io.confluent.ksql.function.udf.math.Exp.exp(java.lang.Double) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,247] INFO Adding function exp for method public java.lang.Double io.confluent.ksql.function.udf.math.Exp.exp(java.lang.Long) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,250] INFO Adding function exp for method public java.lang.Double io.confluent.ksql.function.udf.math.Exp.exp(java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,257] INFO Adding function Floor for method public java.lang.Double io.confluent.ksql.function.udf.math.Floor.floor(java.lang.Double) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,260] INFO Adding function Floor for method public java.math.BigDecimal io.confluent.ksql.function.udf.math.Floor.floor(java.math.BigDecimal) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,268] INFO Adding function Floor for method public java.lang.Integer io.confluent.ksql.function.udf.math.Floor.floor(java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,270] INFO Adding function Floor for method public java.lang.Long io.confluent.ksql.function.udf.math.Floor.floor(java.lang.Long) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,272] INFO Adding function ln for method public java.lang.Double io.confluent.ksql.function.udf.math.Ln.ln(java.lang.Double) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,274] INFO Adding function ln for method public java.lang.Double io.confluent.ksql.function.udf.math.Ln.ln(java.lang.Long) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,276] INFO Adding function ln for method public java.lang.Double io.confluent.ksql.function.udf.math.Ln.ln(java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,279] INFO Adding function Round for method public java.math.BigDecimal io.confluent.ksql.function.udf.math.Round.round(java.math.BigDecimal) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,283] INFO Adding function Round for method public java.lang.Long io.confluent.ksql.function.udf.math.Round.round(long) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,283] INFO Adding function Round for method public java.lang.Long io.confluent.ksql.function.udf.math.Round.round(int) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,284] INFO Adding function Round for method public java.math.BigDecimal io.confluent.ksql.function.udf.math.Round.round(java.math.BigDecimal,java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,285] INFO Adding function Round for method public java.lang.Long io.confluent.ksql.function.udf.math.Round.round(java.lang.Double) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,289] INFO Adding function Round for method public java.lang.Double io.confluent.ksql.function.udf.math.Round.round(java.lang.Double,java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,291] INFO Adding function sign for method public java.lang.Integer io.confluent.ksql.function.udf.math.Sign.sign(java.lang.Double) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,296] INFO Adding function sign for method public java.lang.Integer io.confluent.ksql.function.udf.math.Sign.sign(java.lang.Long) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,300] INFO Adding function sign for method public java.lang.Integer io.confluent.ksql.function.udf.math.Sign.sign(java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,302] INFO Adding function sqrt for method public java.lang.Double io.confluent.ksql.function.udf.math.Sqrt.sqrt(java.lang.Double) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,303] INFO Adding function sqrt for method public java.lang.Double io.confluent.ksql.function.udf.math.Sqrt.sqrt(java.lang.Long) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,304] INFO Adding function sqrt for method public java.lang.Double io.confluent.ksql.function.udf.math.Sqrt.sqrt(java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,311] INFO Adding function COALESCE for method public final java.lang.Object io.confluent.ksql.function.udf.nulls.Coalesce.coalesce(java.lang.Object,java.lang.Object[]) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,314] INFO Adding function IFNULL for method public final java.lang.Object io.confluent.ksql.function.udf.nulls.IfNull.ifNull(java.lang.Object,java.lang.Object) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,316] INFO Adding function elt for method public java.lang.String io.confluent.ksql.function.udf.string.Elt.elt(int,java.lang.String[]) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,318] INFO Adding function field for method public int io.confluent.ksql.function.udf.string.Field.field(java.lang.String,java.lang.String[]) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,320] INFO Adding function initcap for method public java.lang.String io.confluent.ksql.function.udf.string.InitCap.initcap(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,323] INFO Adding function mask_keep_left for method public java.lang.String io.confluent.ksql.function.udf.string.MaskKeepLeftKudf.mask(java.lang.String,int) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,324] INFO Adding function mask_keep_left for method public java.lang.String io.confluent.ksql.function.udf.string.MaskKeepLeftKudf.mask(java.lang.String,int,java.lang.String,java.lang.String,java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,327] INFO Adding function mask_keep_right for method public java.lang.String io.confluent.ksql.function.udf.string.MaskKeepRightKudf.mask(java.lang.String,int) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,329] INFO Adding function mask_keep_right for method public java.lang.String io.confluent.ksql.function.udf.string.MaskKeepRightKudf.mask(java.lang.String,int,java.lang.String,java.lang.String,java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,334] INFO Adding function mask for method public java.lang.String io.confluent.ksql.function.udf.string.MaskKudf.mask(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,335] INFO Adding function mask for method public java.lang.String io.confluent.ksql.function.udf.string.MaskKudf.mask(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,339] INFO Adding function mask_left for method public java.lang.String io.confluent.ksql.function.udf.string.MaskLeftKudf.mask(java.lang.String,int) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,340] INFO Adding function mask_left for method public java.lang.String io.confluent.ksql.function.udf.string.MaskLeftKudf.mask(java.lang.String,int,java.lang.String,java.lang.String,java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,345] INFO Adding function mask_right for method public java.lang.String io.confluent.ksql.function.udf.string.MaskRightKudf.mask(java.lang.String,int) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,349] INFO Adding function mask_right for method public java.lang.String io.confluent.ksql.function.udf.string.MaskRightKudf.mask(java.lang.String,int,java.lang.String,java.lang.String,java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,354] INFO Adding function regexp_extract for method public java.lang.String io.confluent.ksql.function.udf.string.RegexpExtract.regexpExtract(java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,355] INFO Adding function regexp_extract for method public java.lang.String io.confluent.ksql.function.udf.string.RegexpExtract.regexpExtract(java.lang.String,java.lang.String,java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,359] INFO Adding function replace for method public java.lang.String io.confluent.ksql.function.udf.string.Replace.replace(java.lang.String,java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,362] INFO Adding function split for method public java.util.List io.confluent.ksql.function.udf.string.SplitKudf.split(java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,366] INFO Adding function substring for method public java.lang.String io.confluent.ksql.function.udf.string.Substring.substring(java.lang.String,java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,367] INFO Adding function substring for method public java.lang.String io.confluent.ksql.function.udf.string.Substring.substring(java.lang.String,java.lang.Integer,java.lang.Integer) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,369] INFO Adding function url_decode_param for method public java.lang.String io.confluent.ksql.function.udf.url.UrlDecodeParamKudf.decodeParam(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,376] INFO Adding function url_encode_param for method public java.lang.String io.confluent.ksql.function.udf.url.UrlEncodeParamKudf.encodeParam(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,378] INFO Adding function url_extract_fragment for method public java.lang.String io.confluent.ksql.function.udf.url.UrlExtractFragmentKudf.extractFragment(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,379] INFO Adding function url_extract_host for method public java.lang.String io.confluent.ksql.function.udf.url.UrlExtractHostKudf.extractHost(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,398] INFO Adding function url_extract_parameter for method public java.lang.String io.confluent.ksql.function.udf.url.UrlExtractParameterKudf.extractParam(java.lang.String,java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,400] INFO Adding function url_extract_path for method public java.lang.String io.confluent.ksql.function.udf.url.UrlExtractPathKudf.extractPath(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,402] INFO Adding function url_extract_port for method public java.lang.Integer io.confluent.ksql.function.udf.url.UrlExtractPortKudf.extractPort(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,406] INFO Adding function url_extract_protocol for method public java.lang.String io.confluent.ksql.function.udf.url.UrlExtractProtocolKudf.extractProtocol(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,410] INFO Adding function url_extract_query for method public java.lang.String io.confluent.ksql.function.udf.url.UrlExtractQueryKudf.extractQuery(java.lang.String) (io.confluent.ksql.function.UdfLoader:142)
[2020-06-20 04:04:32,419] INFO Adding UDAF name=collect_list from path=internal class=class io.confluent.ksql.function.udaf.array.CollectListUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,424] INFO Adding UDAF name=collect_list from path=internal class=class io.confluent.ksql.function.udaf.array.CollectListUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,428] INFO Adding UDAF name=collect_list from path=internal class=class io.confluent.ksql.function.udaf.array.CollectListUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,429] INFO Adding UDAF name=collect_list from path=internal class=class io.confluent.ksql.function.udaf.array.CollectListUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,431] INFO Adding UDAF name=collect_list from path=internal class=class io.confluent.ksql.function.udaf.array.CollectListUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,434] INFO Adding UDAF name=collect_set from path=internal class=class io.confluent.ksql.function.udaf.array.CollectSetUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,434] INFO Adding UDAF name=collect_set from path=internal class=class io.confluent.ksql.function.udaf.array.CollectSetUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,435] INFO Adding UDAF name=collect_set from path=internal class=class io.confluent.ksql.function.udaf.array.CollectSetUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,436] INFO Adding UDAF name=collect_set from path=internal class=class io.confluent.ksql.function.udaf.array.CollectSetUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,437] INFO Adding UDAF name=collect_set from path=internal class=class io.confluent.ksql.function.udaf.array.CollectSetUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,439] INFO Adding UDAF name=avg from path=internal class=class io.confluent.ksql.function.udaf.average.AverageUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,446] INFO Adding UDAF name=avg from path=internal class=class io.confluent.ksql.function.udaf.average.AverageUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,448] INFO Adding UDAF name=avg from path=internal class=class io.confluent.ksql.function.udaf.average.AverageUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,452] INFO Adding UDAF name=COUNT_DISTINCT from path=internal class=class io.confluent.ksql.function.udaf.count.CountDistinct (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,454] INFO Adding UDAF name=LATEST_BY_OFFSET from path=internal class=class io.confluent.ksql.function.udaf.latest.LatestByOffset (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,456] INFO Adding UDAF name=LATEST_BY_OFFSET from path=internal class=class io.confluent.ksql.function.udaf.latest.LatestByOffset (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,458] INFO Adding UDAF name=LATEST_BY_OFFSET from path=internal class=class io.confluent.ksql.function.udaf.latest.LatestByOffset (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,461] INFO Adding UDAF name=LATEST_BY_OFFSET from path=internal class=class io.confluent.ksql.function.udaf.latest.LatestByOffset (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,471] INFO Adding UDAF name=LATEST_BY_OFFSET from path=internal class=class io.confluent.ksql.function.udaf.latest.LatestByOffset (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,473] INFO Adding UDAF name=histogram from path=internal class=class io.confluent.ksql.function.udaf.map.HistogramUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,475] INFO Adding UDAF name=sum_list from path=internal class=class io.confluent.ksql.function.udaf.sum.ListSumUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,475] INFO Adding UDAF name=sum_list from path=internal class=class io.confluent.ksql.function.udaf.sum.ListSumUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,476] INFO Adding UDAF name=sum_list from path=internal class=class io.confluent.ksql.function.udaf.sum.ListSumUdaf (io.confluent.ksql.function.UdafLoader:71)
[2020-06-20 04:04:32,488] INFO UDFs can't be loaded as as dir src/test/resources/udf-example.jar doesn't exist or is not a directory (io.confluent.ksql.function.UserFunctionLoader:88)
[2020-06-20 04:04:32,495] INFO ProcessingLogConfig values: 
    ksql.logging.processing.rows.include = false
    ksql.logging.processing.stream.auto.create = false
    ksql.logging.processing.stream.name = KSQL_PROCESSING_LOG
    ksql.logging.processing.topic.auto.create = false
    ksql.logging.processing.topic.name = 
    ksql.logging.processing.topic.partitions = 1
    ksql.logging.processing.topic.replication.factor = 1
 (io.confluent.ksql.logging.processing.ProcessingLogConfig:347)
[2020-06-20 04:04:32,611] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.any.key.name.enabled = false
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = false
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = 
    ksql.security.extension.class = null
    ksql.service.id = some.ksql.service.id
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-20 04:04:32,807] INFO AvroDataConfig values: 
    connect.meta.data = true
    enhanced.avro.schema.support = false
    schemas.cache.config = 1000
 (io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-20 04:04:32,835] INFO JsonSchemaDataConfig values: 
    decimal.format = BASE64
    schemas.cache.size = 1000
 (io.confluent.connect.json.JsonSchemaDataConfig:179)
[2020-06-20 04:04:32,836] INFO JsonSchemaDataConfig values: 
    decimal.format = BASE64
    schemas.cache.size = 1000
 (io.confluent.connect.json.JsonSchemaDataConfig:179)
[2020-06-20 04:04:32,845] INFO ProtobufDataConfig values: 
    schemas.cache.config = 1000
 (io.confluent.connect.protobuf.ProtobufDataConfig:179)
[2020-06-20 04:04:32,949] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.any.key.name.enabled = false
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = false
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = 
    ksql.security.extension.class = null
    ksql.service.id = some.ksql.service.id
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-20 04:04:32,973] INFO ProcessingLogConfig values: 
    ksql.logging.processing.rows.include = false
    ksql.logging.processing.stream.auto.create = false
    ksql.logging.processing.stream.name = KSQL_PROCESSING_LOG
    ksql.logging.processing.topic.auto.create = false
    ksql.logging.processing.topic.name = 
    ksql.logging.processing.topic.partitions = 1
    ksql.logging.processing.topic.replication.factor = 1
 (io.confluent.ksql.logging.processing.ProcessingLogConfig:347)
[2020-06-20 04:04:33,598] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.any.key.name.enabled = false
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = false
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = 
    ksql.security.extension.class = null
    ksql.service.id = some.ksql.service.id
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-20 04:04:33,746] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.any.key.name.enabled = false
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = false
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = 
    ksql.security.extension.class = null
    ksql.service.id = some.ksql.service.id
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-20 04:04:34,533] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.any.key.name.enabled = false
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = false
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = 
    ksql.security.extension.class = null
    ksql.service.id = some.ksql.service.id
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-20 04:04:34,732] INFO StreamsConfig values: 
    application.id = _confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0
    application.server = 
    bootstrap.servers = [localhost:0]
    buffered.records.per.partition = 1000
    built.in.metrics.version = latest
    cache.max.bytes.buffering = 0
    client.id = 
    commit.interval.ms = 2000
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    default.production.exception.handler = class io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    max.task.idle.ms = 0
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    num.standby.replicas = 0
    num.stream.threads = 4
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
    poll.ms = 100
    processing.guarantee = at_least_once
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    replication.factor = 1
    request.timeout.ms = 40000
    retries = 0
    retry.backoff.ms = 100
    rocksdb.config.setter = null
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    state.cleanup.delay.ms = 600000
    state.dir = /tmp/confluent2493202554803238540
    topology.optimization = all
    upgrade.from = null
    windowstore.changelog.additional.retention.ms = 86400000
 (org.apache.kafka.streams.StreamsConfig:347)
[2020-06-20 04:04:34,764] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33] Kafka Streams version: 5.5.0-ccs (org.apache.kafka.streams.KafkaStreams:696)
[2020-06-20 04:04:34,764] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33] Kafka Streams commit ID: 785a156634af5f7e (org.apache.kafka.streams.KafkaStreams:697)
[2020-06-20 04:04:34,804] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-1] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:492)
[2020-06-20 04:04:34,825] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-1] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:502)
[2020-06-20 04:04:34,846] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-1] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:541)
[2020-06-20 04:04:34,867] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-2] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:492)
[2020-06-20 04:04:34,868] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-2] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:502)
[2020-06-20 04:04:34,873] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-2] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:541)
[2020-06-20 04:04:34,883] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-3] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:492)
[2020-06-20 04:04:34,884] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-3] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:502)
[2020-06-20 04:04:34,886] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-3] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:541)
[2020-06-20 04:04:34,891] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-4] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:492)
[2020-06-20 04:04:34,893] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-4] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:502)
[2020-06-20 04:04:34,894] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-4] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:541)
[2020-06-20 04:04:34,931] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.any.key.name.enabled = false
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = false
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = 
    ksql.security.extension.class = null
    ksql.service.id = some.ksql.service.id
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-20 04:04:34,964] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.any.key.name.enabled = false
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = false
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = 
    ksql.security.extension.class = null
    ksql.service.id = some.ksql.service.id
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-20 04:04:34,973] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.any.key.name.enabled = false
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = false
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = 
    ksql.security.extension.class = null
    ksql.service.id = some.ksql.service.id
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-20 04:04:34,997] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.any.key.name.enabled = false
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.query.pull.max.qps = 2147483647
    ksql.query.pull.metrics.enabled = false
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = 
    ksql.security.extension.class = null
    ksql.service.id = some.ksql.service.id
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.timestamp.throw.on.invalid = false
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-20 04:04:35,019] INFO StreamsConfig values: 
    application.id = _confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1
    application.server = 
    bootstrap.servers = [localhost:0]
    buffered.records.per.partition = 1000
    built.in.metrics.version = latest
    cache.max.bytes.buffering = 0
    client.id = 
    commit.interval.ms = 2000
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    default.production.exception.handler = class io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    max.task.idle.ms = 0
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    num.standby.replicas = 0
    num.stream.threads = 4
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
    poll.ms = 100
    processing.guarantee = at_least_once
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    replication.factor = 1
    request.timeout.ms = 40000
    retries = 0
    retry.backoff.ms = 100
    rocksdb.config.setter = null
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    state.cleanup.delay.ms = 600000
    state.dir = /tmp/confluent2493202554803238540
    topology.optimization = all
    upgrade.from = null
    windowstore.changelog.additional.retention.ms = 86400000
 (org.apache.kafka.streams.StreamsConfig:347)
[2020-06-20 04:04:35,032] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4] Kafka Streams version: 5.5.0-ccs (org.apache.kafka.streams.KafkaStreams:696)
[2020-06-20 04:04:35,032] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4] Kafka Streams commit ID: 785a156634af5f7e (org.apache.kafka.streams.KafkaStreams:697)
[2020-06-20 04:04:35,033] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-1] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:492)
[2020-06-20 04:04:35,034] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-1] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:502)
[2020-06-20 04:04:35,038] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-1] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:541)
[2020-06-20 04:04:35,040] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-2] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:492)
[2020-06-20 04:04:35,042] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-2] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:502)
[2020-06-20 04:04:35,044] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-2] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:541)
[2020-06-20 04:04:35,052] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-3] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:492)
[2020-06-20 04:04:35,053] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-3] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:502)
[2020-06-20 04:04:35,055] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-3] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:541)
[2020-06-20 04:04:35,063] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-4] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:492)
[2020-06-20 04:04:35,063] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-4] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:502)
[2020-06-20 04:04:35,065] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-4] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:541)
    >>>>> Test failed: Topic countdown2. Expected <3> records but it was <1>
Actual records: 
<, {I=2}> with timestamp=0
[2020-06-20 04:04:35,262] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:285)
[2020-06-20 04:04:35,265] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-1] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-06-20 04:04:35,265] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-1] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,266] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1130)
[2020-06-20 04:04:35,272] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,273] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-06-20 04:04:35,273] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-2] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-06-20 04:04:35,277] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-2] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,278] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1130)
[2020-06-20 04:04:35,284] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,287] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-2] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-06-20 04:04:35,288] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-3] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-06-20 04:04:35,289] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-3] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,289] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-3] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1130)
[2020-06-20 04:04:35,292] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-3] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,293] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-3] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-06-20 04:04:35,293] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-4] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-06-20 04:04:35,295] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-4] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,297] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-4] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1130)
[2020-06-20 04:04:35,299] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-4] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,304] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33-StreamThread-4] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-06-20 04:04:35,305] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33] State transition from PENDING_SHUTDOWN to NOT_RUNNING (org.apache.kafka.streams.KafkaStreams:285)
[2020-06-20 04:04:35,305] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-560759a9-c8b2-4837-bcfa-274430a0da33] Streams client stopped completely (org.apache.kafka.streams.KafkaStreams:970)
[2020-06-20 04:04:35,307] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:285)
[2020-06-20 04:04:35,312] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-1] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-06-20 04:04:35,312] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-1] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,313] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1130)
[2020-06-20 04:04:35,317] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,317] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-06-20 04:04:35,317] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-2] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-06-20 04:04:35,318] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-2] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,318] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1130)
[2020-06-20 04:04:35,321] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,322] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-2] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-06-20 04:04:35,323] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-3] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-06-20 04:04:35,324] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-3] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,324] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-3] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1130)
[2020-06-20 04:04:35,329] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-3] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,329] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-3] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-06-20 04:04:35,330] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-4] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-06-20 04:04:35,331] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-4] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,331] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-4] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1130)
[2020-06-20 04:04:35,338] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-4] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:221)
[2020-06-20 04:04:35,338] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4-StreamThread-4] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-06-20 04:04:35,340] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4] State transition from PENDING_SHUTDOWN to NOT_RUNNING (org.apache.kafka.streams.KafkaStreams:285)
[2020-06-20 04:04:35,341] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_1-306e55db-7c1b-4210-8613-1dde0df25af4] Streams client stopped completely (org.apache.kafka.streams.KafkaStreams:970)

panasenco commented 4 years ago

As a workaround, I can avoid this pattern: either keep the circular insert as the last step in the topology, or create a new intermediate stream:

CREATE STREAM countdown (i INT) WITH (kafka_topic='countdown', value_format='json', partitions=1);
CREATE STREAM countdown2 (i INT) WITH (kafka_topic='countdown2', value_format='json', partitions=1);
INSERT INTO countdown2 SELECT i FROM countdown EMIT CHANGES;
INSERT INTO countdown2 SELECT i-1 AS i FROM countdown2 WHERE i > 0 EMIT CHANGES;
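
To make the expected records explicit, here is a minimal sketch in Python that models each topic as an append-only list and each `INSERT INTO ... SELECT` as a (source, sink, transform) triple. This is a toy model of the intended semantics only; it is not how ksqlDB or the test harness processes records, and the names `simulate`, `decrement` and `identity` are hypothetical.

```python
from collections import deque


def simulate(initial, queries):
    """Toy model: every record produced to a topic is offered once to each
    query reading from that topic; a query may append one record to its sink."""
    topics = {}
    pending = deque()

    def produce(topic, value):
        topics.setdefault(topic, []).append(value)
        pending.append((topic, value))

    for topic, value in initial:
        produce(topic, value)

    while pending:
        topic, value = pending.popleft()
        for source, sink, transform in queries:
            if source == topic:
                out = transform(value)
                if out is not None:  # None models a row filtered out by WHERE
                    produce(sink, out)
    return topics


def decrement(i):
    # SELECT i-1 AS i ... WHERE i > 0
    return i - 1 if i > 0 else None


def identity(i):
    # SELECT i ...
    return i


# Statements from the original report: circular insert on countdown.
original = simulate(
    [("countdown", 2)],
    [("countdown", "countdown", decrement), ("countdown", "countdown2", identity)],
)

# Workaround: circular insert moved to countdown2, the last stream.
workaround = simulate(
    [("countdown", 2)],
    [("countdown", "countdown2", identity), ("countdown2", "countdown2", decrement)],
)

print(original)
print(workaround)
```

Under this model both variants leave 2, 1, 0 in `countdown2`; the workaround differs only in that `countdown` holds just the single input record.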

big-andy-coates commented 4 years ago

I'm not sure we even want to allow an INSERT INTO that selects from its own target stream. It just creates a circular workload that would overwhelm ksqlDB. I think the actual bug here is that ksqlDB doesn't fail on that statement.
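
As an illustration of what rejecting such a statement could involve, the sketch below treats each insert query as a source-to-sink edge and reports streams that can feed back into themselves. This is a hypothetical check written in Python for illustration, not ksqlDB code; the function name `find_self_feeding` and the (source, sink) representation are assumptions.

```python
def find_self_feeding(queries):
    """Return the streams that are reachable from themselves through
    the given (source, sink) query edges, in sorted order."""
    edges = {}
    for source, sink in queries:
        edges.setdefault(source, set()).add(sink)

    def reaches(start, target, seen):
        # Depth-first search along query edges, looking for target.
        for nxt in edges.get(start, ()):
            if nxt == target:
                return True
            if nxt not in seen:
                seen.add(nxt)
                if reaches(nxt, target, seen):
                    return True
        return False

    return sorted(s for s in edges if reaches(s, s, set()))


# The two INSERT INTO statements from the report.
print(find_self_feeding([("countdown", "countdown"), ("countdown", "countdown2")]))
```

A check along these lines would flag `countdown` for the statements in the report, and would also flag the workaround's `countdown2`, since that variant is still a circular insert.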