Open muhufuk opened 6 years ago
I am seeing the same behavior: maxRecords is being ignored completely. In fact, the number of records returned is often larger than the default of 10,000. Could really use a workaround.
Same here. This is an annoying issue when we want to slow down processing because our downstream system has slowed.
Based on [1], MaxRecords is the maximum number of Kinesis records per request. If KPL aggregation is used on the producer side, each Kinesis record contains multiple user records, so the record processor receives more records per batch.
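To make the counting concrete, here is a minimal sketch in plain Java (no AWS SDK involved; the class and method names are hypothetical and not part of the KCL or KPL). It models a fetch limit that applies to Kinesis records while each Kinesis record carries several user records, which is the effect described above. It illustrates the arithmetic only, not the KCL's actual internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a "Kinesis record" is represented as a list of user records.
final class AggregationSketch {

    // Simulates one fetch: returns at most maxRecords Kinesis records, each
    // packed with userRecordsPerKinesisRecord user records (as aggregation would do).
    static List<List<String>> fetch(int maxRecords, int availableKinesisRecords,
                                    int userRecordsPerKinesisRecord) {
        List<List<String>> batch = new ArrayList<>();
        int count = Math.min(maxRecords, availableKinesisRecords);
        for (int i = 0; i < count; i++) {
            List<String> userRecords = new ArrayList<>();
            for (int j = 0; j < userRecordsPerKinesisRecord; j++) {
                userRecords.add("kinesis-" + i + "-user-" + j);
            }
            batch.add(userRecords);
        }
        return batch;
    }

    // Flattens the fetched Kinesis records into the user records a processor would see.
    static List<String> deaggregate(List<List<String>> batch) {
        List<String> userRecords = new ArrayList<>();
        for (List<String> kinesisRecord : batch) {
            userRecords.addAll(kinesisRecord);
        }
        return userRecords;
    }
}
```

With a limit of 10 and three user records per Kinesis record, the processor in this model would see 30 user records; with one user record per Kinesis record (aggregation off), it would see 10.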
@shenavaa Yes, we also suspected that, but in some cases we got 3k+ records even though maxRecords is 1500. Besides, as you mentioned, we use KPL, but aggregation is set to false.
@muhufuk Can you also share the KPL configuration properties and the number of shards in the stream for reproduction?
@shenavaa We have only changed the region and the aggregation flag; the rest is left at the defaults. We have 60 shards.
Hello, I am having the same issue! I am running version 1.9.0 and I have a similar setup for my KinesisClientLibConfiguration. However, setting maxRecords does not limit the batch size. Worth mentioning: I have a custom record processor class.
I am using KCL 1.9.1 and have the same issue. I set MaxRecords to around 300-500, but it keeps delivering around 1000 records at a time.
So how do you limit the KPL so we can get a consistent number of records?
My understanding from reading the KCL source code is that maxRecords is only used by the polling fetcher, not the fanout one. I don't see a way to limit the max records when using fanout, which is unfortunate because it can lead to extremely large per-shard caches; cf. https://github.com/awslabs/aws-eventstream-java/pull/4
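If maxRecords really is not honored for fanout, one consumer-side option is to split whatever batch arrives into bounded chunks inside the record processor before handing it downstream. Below is a minimal sketch in plain Java, assuming a hypothetical `BatchChunker` helper that is not part of the KCL. It bounds the work per downstream call; it does not shrink the per-shard cache mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a delivered batch into chunks of bounded size.
final class BatchChunker {

    // Returns consecutive chunks of at most maxChunkSize elements, preserving order.
    static <T> List<List<T>> chunk(List<T> records, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < records.size(); start += maxChunkSize) {
            int end = Math.min(start + maxChunkSize, records.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(records.subList(start, end)));
        }
        return chunks;
    }
}
```

Inside a record processor, one could iterate over `BatchChunker.chunk(records, 500)` and pause between chunks when the downstream system is slow, checkpointing only after the whole batch has been handled.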
I'm using KCL v2. I used this and it worked:
final var polling =
    new PollingConfig(getKinesisStreamName(), kinesisClient)
        .maxRecords(maximumRecordsPerCall)
        .idleTimeBetweenReadsInMillis(getKinesisTimeIntervalBetweenCalls());
var retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(polling);
this.scheduler =
    new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        retrievalConfig);
@wellingtonmacena PollingConfig isn't EFO mode though, and as @fommil pointed out, maxRecords seems to be polling-only.
When starting the worker I set maxRecords to 10 in KinesisClientLibConfiguration, but I still get more records than that (around 13 to 15) in a single call to processRecords.
public KinesisClientLibConfiguration kinesisClientLibConfiguration(
        final ClientConfiguration clientConfiguration) {
    return new KinesisClientLibConfiguration(
            applicationName + dataStreamName,
            dataStreamName,
            new DefaultAWSCredentialsProviderChain(),
            "Worker-" + UUID.randomUUID().toString())
        .withRegionName(region)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
        .withCommonClientConfig(clientConfiguration)
        .withInitialLeaseTableReadCapacity(leaseTableReadCapacity)
        .withInitialLeaseTableWriteCapacity(leaseTableWriteCapacity)
        .withFailoverTimeMillis(failOverTime)
        .withMaxRecords(10);
}