Open dragondgold opened 1 year ago
cc: @KKcorps
The error seems to occur at segment commit according to the logs. That also makes sense because increasing the flush threshold fixes it; the larger threshold is just delaying the error for you instead of being an actual fix.
Will take a look
I think it's actually fixing it, because after increasing the flush threshold to 200k I ingested around 100 million rows without any issues; that's the weird thing.
Tried with both 1.0.0 and master but I'm unable to repro this issue on my end.
I noticed that the number of values in the MV column is around 3000 per row, so I took that into account as well while publishing the data.
Here's the code I am using for data gen:
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
// JsonUtils is assumed to be Pinot's org.apache.pinot.spi.utils.JsonUtils
import org.apache.pinot.spi.utils.JsonUtils;

public class DeviceEventPublisher {
  private static final Random random = new Random();
  private static final String KAFKA_TOPIC = "mv_ingestion_repro";
  private static final String BOOTSTRAP_SERVERS = "localhost:19092"; // Update with your Kafka server details
  public static final int NUM_EVENTS_TO_PUBLISH = 30000;
  // Values per MV column, roughly matching the ~3000 values per row mentioned above
  public static final int MV_ARRAY_LENGTH = 3203;

  private KafkaProducer<String, String> producer;

  public DeviceEventPublisher() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    this.producer = new KafkaProducer<>(properties);
  }

  public void publishEvent(DeviceEvent event) {
    try {
      // Serialize the event to JSON and publish it to the Kafka topic
      String jsonData = JsonUtils.objectToString(event);
      producer.send(new ProducerRecord<>(KAFKA_TOPIC, jsonData));
    } catch (Exception e) {
      System.out.println("Exception publishing event: " + e);
    }
  }

  public static void main(String[] args) {
    DeviceEventPublisher publisher = new DeviceEventPublisher();
    // Generate and publish NUM_EVENTS_TO_PUBLISH random events
    for (int i = 0; i < NUM_EVENTS_TO_PUBLISH; i++) {
      DeviceEvent event = generateRandomEvent();
      publisher.publishEvent(event);
      if (i % 1000 == 0) {
        System.out.println("Published " + i + " events");
      }
    }
    System.out.println("Published " + NUM_EVENTS_TO_PUBLISH + " events to Kafka topic " + KAFKA_TOPIC);
    publisher.producer.close();
  }

  private static DeviceEvent generateRandomEvent() {
    DeviceEvent event = new DeviceEvent();
    // Random values for each field
    event.setCountry("Country" + random.nextInt(100));
    event.setDeviceId(UUID.randomUUID().toString());
    event.setDeviceType("Type" + random.nextInt(5));
    event.setSegments(generateRandomIntList());
    event.setOptions(generateRandomIntList());
    event.setRelationId(UUID.randomUUID().toString());
    event.setClient(random.nextInt(1000));
    event.setTimestamp(System.currentTimeMillis() / 1000L);
    return event;
  }

  // Serialized with snake_case field names (country, device_id, device_type, segments, options, relation_id, client, timestamp)
  @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
  private static class DeviceEvent {
    private String country;
    private String deviceId;
    private String deviceType;
    private List<Integer> segments;
    private List<Integer> options;
    private String relationId;
    private int client;
    private long timestamp;

    public String getCountry() {
      return country;
    }
    public void setCountry(String country) {
      this.country = country;
    }
    public String getDeviceId() {
      return deviceId;
    }
    public void setDeviceId(String deviceId) {
      this.deviceId = deviceId;
    }
    public String getDeviceType() {
      return deviceType;
    }
    public void setDeviceType(String deviceType) {
      this.deviceType = deviceType;
    }
    public List<Integer> getSegments() {
      return segments;
    }
    public void setSegments(List<Integer> segments) {
      this.segments = segments;
    }
    public List<Integer> getOptions() {
      return options;
    }
    public void setOptions(List<Integer> options) {
      this.options = options;
    }
    public String getRelationId() {
      return relationId;
    }
    public void setRelationId(String relationId) {
      this.relationId = relationId;
    }
    public int getClient() {
      return client;
    }
    public void setClient(int client) {
      this.client = client;
    }
    public long getTimestamp() {
      return timestamp;
    }
    public void setTimestamp(long timestamp) {
      this.timestamp = timestamp;
    }
  }

  private static List<Integer> generateRandomIntList() {
    // Each MV column gets MV_ARRAY_LENGTH random values in [0, 10000)
    List<Integer> list = new ArrayList<>();
    int size = MV_ARRAY_LENGTH;
    for (int i = 0; i < size; i++) {
      list.add(random.nextInt(10000));
    }
    return list;
  }
}
I feel like it could be some corrupt row causing this, although I'm not sure.
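For reference, a rough sketch of the kind of Pinot schema these events map to, with segments and options as multi-value INT dimensions and the snake_case field names produced by the JSON serializer. This is just an assumed shape used for the repro, not necessarily the schema from the issue; names and types may differ.

{
  "schemaName": "devices",
  "dimensionFieldSpecs": [
    {"name": "country", "dataType": "STRING"},
    {"name": "device_id", "dataType": "STRING"},
    {"name": "device_type", "dataType": "STRING"},
    {"name": "segments", "dataType": "INT", "singleValueField": false},
    {"name": "options", "dataType": "INT", "singleValueField": false},
    {"name": "relation_id", "dataType": "STRING"},
    {"name": "client", "dataType": "INT"}
  ],
  "dateTimeFieldSpecs": [
    {"name": "timestamp", "dataType": "LONG", "format": "1:SECONDS:EPOCH", "granularity": "1:SECONDS"}
  ]
}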
I'm using Apache Pinot 1.0.0 and I'm having some trouble when creating an inverted index on a multi-value column. This is my table config:
And this is my schema:
As soon as Pinot starts consuming events from Kafka I get these errors:
The data is ingested into the table anyway, but when running a query like
select count(*) from devices where segments = 560
I get this error:
If I change the flush threshold size from:
"realtime.segment.flush.threshold.size": "20000"
To:
"realtime.segment.flush.threshold.size": "200000"
The errors are gone and everything works as expected. Here is some sample data in case it's useful: sample_data.csv.zip
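For context, the inverted index and the flush threshold settings discussed above typically sit together under tableIndexConfig in the realtime table config. A minimal sketch, assuming the inverted index is on segments and using the topic/broker values from the repro code; required sections such as segmentsConfig and tenants are omitted and all other values are illustrative:

{
  "tableName": "devices",
  "tableType": "REALTIME",
  "tableIndexConfig": {
    "invertedIndexColumns": ["segments"],
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "mv_ingestion_repro",
      "stream.kafka.broker.list": "localhost:19092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.size": "20000"
    }
  }
}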