masesgroup / KNet

KNet is a comprehensive .NET suite for Apache Kafka™ providing all features: Producer, Consumer, Admin, Streams, Connect, backends (ZooKeeper and Kafka).
https://knet.masesgroup.com/
Apache License 2.0

Classes implementing Java listeners do not override the corresponding Listener class in .NET #329

Closed masesdevelopers closed 8 months ago

masesdevelopers commented 8 months ago

Describe the bug Classes implementing Java listeners, such as FailOnInvalidTimestamp, LogAndSkipOnInvalidTimestamp, UsePartitionTimeOnInvalidTimestamp and WallclockTimestampExtractor, do not override the corresponding Listener class in .NET.

To Reproduce Steps to reproduce the behavior:

  1. Go to https://github.com/masesgroup/KNet/blob/fac406516064a80f72d42fadb19fb9a8d45f5154/src/net/KNet/Generated/Org/Apache/Kafka/Streams/Processor/AllPackageClasses.cs

  2. Scroll down to

  3. See that the parent class is MASES.JCOBridge.C2JBridge.JVMBridgeBase<>

Expected behavior All listed classes shall inherit from https://github.com/masesgroup/KNet/blob/fac406516064a80f72d42fadb19fb9a8d45f5154/src/net/KNet/Generated/Org/Apache/Kafka/Streams/Processor/AllPackageClasses.cs#L660 so the following invocation can be done with pre-made Java classes: https://github.com/masesgroup/KNet/blob/fac406516064a80f72d42fadb19fb9a8d45f5154/src/net/KNet/Generated/Org/Apache/Kafka/Streams/Topology.cs#L229

Additional context An issue should be opened on JNet, because the classes are generated using JNetReflector.

masesdevelopers commented 8 months ago

The classes reported in this issue have the following pattern:

public class UsePartitionTimeOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
    /**
     * Returns the current stream-time as new timestamp for the record.
     *
     * @param record a data record
     * @param recordTimestamp the timestamp extractor from the record
     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
     * @return the provided highest extracted valid timestamp as new timestamp for the record
     * @throws StreamsException if highest extracted valid timestamp is unknown
     */
    @Override
    public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                   final long recordTimestamp,
                                   final long partitionTime)
            throws StreamsException {
        if (partitionTime < 0) {
            throw new StreamsException("Could not infer new timestamp for input record " + record
                    + " because partition time is unknown.");
        }
        return partitionTime;
    }
}

and

abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {

    /**
     * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
     *
     * @param record a data record
     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
     * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
     */
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        final long timestamp = record.timestamp();

        if (timestamp < 0) {
            return onInvalidTimestamp(record, timestamp, partitionTime);
        }

        return timestamp;
    }

    /**
     * Called if no valid timestamp is embedded in the record meta data.
     *
     * @param record a data record
     * @param recordTimestamp the timestamp extractor from the record
     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
     * @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
     */
    public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                            final long recordTimestamp,
                                            final long partitionTime);
}

The inheritance chain in the Java code goes through a non-public class (ExtractRecordMetadataTimestamp, declared without the public modifier) that is not managed by JNetReflector and is not visible in the documentation either. Furthermore, JNetReflector uses a single level of indirection, so it is not able to identify the TimestampExtractor interface implemented by ExtractRecordMetadataTimestamp.
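To make the gap concrete, here is a minimal Java reflection sketch. It is not JNetReflector code: the names InterfaceWalk, Extractor, HiddenBase and UsePartitionTime are hypothetical stand-ins that only mirror the shape of TimestampExtractor, ExtractRecordMetadataTimestamp and UsePartitionTimeOnInvalidTimestamp. It assumes that a "single level" lookup corresponds to calling Class.getInterfaces() on the concrete class only, and contrasts that with a walk over the whole superclass chain.

```java
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class InterfaceWalk {
    /** Hypothetical stand-in for TimestampExtractor. */
    public interface Extractor {
        long extract(long recordTimestamp, long partitionTime);
    }

    /** Hypothetical stand-in for ExtractRecordMetadataTimestamp: not public. */
    abstract static class HiddenBase implements Extractor {
        @Override
        public long extract(long recordTimestamp, long partitionTime) {
            return recordTimestamp < 0 ? onInvalid(recordTimestamp, partitionTime) : recordTimestamp;
        }

        public abstract long onInvalid(long recordTimestamp, long partitionTime);
    }

    /** Hypothetical stand-in for UsePartitionTimeOnInvalidTimestamp. */
    public static class UsePartitionTime extends HiddenBase {
        @Override
        public long onInvalid(long recordTimestamp, long partitionTime) {
            return partitionTime;
        }
    }

    /** One level only: interfaces declared directly on the class itself. */
    static Set<Class<?>> directInterfaces(Class<?> type) {
        return new LinkedHashSet<>(Arrays.asList(type.getInterfaces()));
    }

    /** Full walk: every superclass (public or not) and every super-interface. */
    static Set<Class<?>> allInterfaces(Class<?> type) {
        Set<Class<?>> found = new LinkedHashSet<>();
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            collect(c, found);
        }
        return found;
    }

    /** Adds the interfaces of one type and, recursively, the interfaces they extend. */
    private static void collect(Class<?> type, Set<Class<?>> found) {
        for (Class<?> iface : type.getInterfaces()) {
            if (found.add(iface)) {
                collect(iface, found);
            }
        }
    }

    public static void main(String[] args) {
        Class<?> type = UsePartitionTime.class;
        System.out.println("superclass public: "
                + Modifier.isPublic(type.getSuperclass().getModifiers()));
        System.out.println("direct: " + directInterfaces(type));
        System.out.println("all: " + allInterfaces(type));
    }
}
```

In this sketch the direct lookup finds no interface on the concrete class, while the full walk reaches Extractor through the non-public superclass. A generator that applied the second approach could, under these assumptions, attach the listener base class even when the intermediate class is skipped.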

Here is an example where JNetReflector has done a good job: https://github.com/masesgroup/KNet/blob/fac406516064a80f72d42fadb19fb9a8d45f5154/src/net/KNet/Generated/Org/Apache/Kafka/Clients/Producer/RoundRobinPartitioner.cs#L39

In these specific cases JNetReflector needs help, so something shall be implemented to handle them.