Apache Ignite
https://ignite.apache.org/
Apache License 2.0

Class object used by StreamReceiver is not sent to remote node #11392

Open iswarezwp opened 2 months ago

iswarezwp commented 2 months ago

issue

When I use IgniteDataStreamer with a user-defined class, like this:

cfg.setPeerClassLoadingEnabled(true);
Ignite ignite = Ignition.start(cfg);

IgniteDataStreamer<String, MyDouble> mktStmr = ignite.dataStreamer(mktCache.getName());

I get the following error. It seems that the MyDouble class is not sent to the remote node:

SEVERE: DataStreamer operation failed.
class org.apache.ignite.IgniteCheckedException: Failed to finish operation (too many remaps): 32
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:977)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:942)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:474)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:350)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:338)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:586)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:565)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:553)
        at org.apache.ignite.internal.util.future.GridCompoundFuture.apply(GridCompoundFuture.java:132)
        at org.apache.ignite.internal.util.future.GridCompoundFuture.apply(GridCompoundFuture.java:46)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:474)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:350)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:338)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:586)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:565)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:2121)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$3.onMessage(DataStreamerImpl.java:368)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1906)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1527)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$5300(GridIoManager.java:242)
        at org.apache.ignite.internal.managers.communication.GridIoManager$9.execute(GridIoManager.java:1420)
        at org.apache.ignite.internal.managers.communication.TraceRunnable.run(TraceRunnable.java:55)
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.body(StripedExecutor.java:637)
        at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125)
        at java.lang.Thread.run(Thread.java:750)
Caused by: class org.apache.ignite.IgniteCheckedException: DataStreamer request failed [node=4167fc2a-b0c4-45a7-bf8e-52a97457ad34]
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:2112)
        ... 9 more
Caused by: class org.apache.ignite.binary.BinaryInvalidTypeException: com.nimblex.StreamVisitorExample$MyDouble
        at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:741)
        at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1772)
        at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1731)
        at org.apache.ignite.internal.binary.BinaryObjectImpl.deserializeValue(BinaryObjectImpl.java:866)
        at org.apache.ignite.internal.binary.BinaryObjectImpl.value(BinaryObjectImpl.java:198)
        at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinary(CacheObjectUtils.java:199)
        at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinaryIfNeeded(CacheObjectUtils.java:78)
        at org.apache.ignite.internal.processors.cache.CacheObjectContext.unwrapBinaryIfNeeded(CacheObjectContext.java:138)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry$1.getValue(DataStreamerEntry.java:96)
        at com.nimblex.StreamVisitorExample$DeviceStreamReceiver.receive(StreamVisitorExample.java:143)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:141)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:394)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:299)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
        ... 8 more
Caused by: java.lang.ClassNotFoundException: com.nimblex.StreamVisitorExample$MyDouble
        at java.net.URLClassLoader.findClass(URLClassLoader.java:407)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:9373)
        at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:9311)
        at org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:384)
        at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:717)
        ... 22 more

code

The code is modified from https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nimblex;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.stream.StreamVisitor;

public class StreamVisitorExample {
    /** Random number generator. */
    private static final Random RAND = new Random();

    /** The list of instruments. */
    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"};

    /** The list of initial instrument prices. */
    private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};

    public static void main(String[] args) throws Exception {

        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        // create a new instance of tcp discovery multicast ip finder
        TcpDiscoveryMulticastIpFinder tcMp = new TcpDiscoveryMulticastIpFinder();
        tcMp.setAddresses(Arrays.asList("192.168.8.63")); // change your IP address here
        // set the multi cast ip finder for spi
        spi.setIpFinder(tcMp);
        // create new ignite configuration
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(true);
        // set the discovery spi to ignite configuration
        cfg.setDiscoverySpi(spi);

        // fixbug: Remote node has deployment mode different from local
        cfg.setPeerClassLoadingEnabled(true);
        // cfg.setDeploymentMode(DeploymentMode.CONTINUOUS);

        try (Ignite ignite = Ignition.start(cfg)) {

            // Market data cache with default configuration.
            CacheConfiguration<String, MyDouble> mktDataCfg = new CacheConfiguration<>("marketTicks");

            // Financial instrument cache configuration.
            CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache");

            // Index key and value for querying financial instruments.
            // Note that Instrument class has @QuerySqlField annotation for secondary field indexing.
            instCfg.setIndexedTypes(String.class, Instrument.class);

            // Auto-close caches at the end of the example.
            try (
                IgniteCache<String, MyDouble> mktCache = ignite.getOrCreateCache(mktDataCfg);
                IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg)
            ) {
                try (IgniteDataStreamer<String, MyDouble> mktStmr = ignite.dataStreamer(mktCache.getName())) {
                    // Note that we receive market data, but do not populate 'mktCache' (it remains empty).
                    // Instead we update the instruments in the 'instCache'.
                    // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated.
                    System.out.println("register receiver: ");

                    mktStmr.receiver(new DeviceStreamReceiver(instCache));

                    // Stream market data ticks into the system (a single tick here; the original example streams 10 million).
                    System.out.println("begin write data: ");
                    for (int i = 1; i <= 1; i++) {
                        int idx = RAND.nextInt(INSTRUMENTS.length);

                        // Use gaussian distribution to ensure that
                        // numbers closer to 0 have higher probability.
                        double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian());

                        mktStmr.addData(INSTRUMENTS[idx], new MyDouble(price));

                        if (i % 500_000 == 0)
                            System.out.println("Number of tuples streamed into Ignite: " + i);
                    }
                }

                System.out.println("try to query data: ");

                // Select top 3 best performing instruments.
                SqlFieldsQuery top3qry = new SqlFieldsQuery(
                    "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3");

                // Execute queries.
                List<List<?>> top3 = instCache.query(top3qry).getAll();

                System.out.println("Top performing financial instruments: " + top3.toString());
            }
            finally {
                // Distributed cache could be removed from cluster only by #destroyCache() call.
                ignite.destroyCache(mktDataCfg.getName());
                ignite.destroyCache(instCfg.getName());
            }
        }
    }

    public static class DeviceStreamReceiver implements StreamReceiver<String, MyDouble> {
        private IgniteCache<String, Instrument> instCache;
        public DeviceStreamReceiver(IgniteCache<String, Instrument> cache) {
            instCache = cache;
        }

        @Override
        public void receive(IgniteCache<String, MyDouble> cache, Collection<Map.Entry<String, MyDouble>> entries) {
            //CommunicationTrackKafkaProducer producer = CommunicationTrackKafkaProducer.getInstance();

            for (Map.Entry<String, MyDouble> entry : entries) {
                String symbol = entry.getKey();
                MyDouble tick = entry.getValue();

                Instrument inst = instCache.get(symbol);

                if (inst == null)
                    inst = new Instrument(symbol);

                // Don't populate market cache, as we don't use it for querying.
                // Update cached instrument based on the latest market tick.
                inst.update(tick.getValue());

                instCache.put(symbol, inst);
            }
        }
    }

    /**
     * Rounds a double value to two decimal places.
     *
     * @param val value to be rounded.
     * @return rounded double value.
     */
    private static double round2(double val) {
        return Math.floor(100 * val + 0.5) / 100;
    }

    /**
     * Financial instrument.
     */
    public static class Instrument implements Serializable {
        /** Instrument symbol. */
        @QuerySqlField(index = true)
        private final String symbol;

        /** Open price. */
        @QuerySqlField(index = true)
        private double open;

        /** Latest price. */
        @QuerySqlField(index = true)
        private double latest;

        /**
         * @param symbol Symbol.
         */
        public Instrument(String symbol) {
            this.symbol = symbol;
        }

        /**
         * Updates this instrument based on the latest market tick price.
         *
         * @param price Latest price.
         */
        public void update(double price) {
            if (open == 0)
                open = price;

            this.latest = price;
        }
    }

    public static class MyDouble implements Serializable {
        private Double value;

        public MyDouble(Double value) {
            this.value = value;
        }

        public Double getValue() {
            return value;
        }

        public void setValue(Double value) {
            this.value = value;
        }
    }
}

ptupitsyn commented 2 months ago

My guess is that DeviceStreamReceiver is deployed, but Ignite has no way of knowing that it depends on MyDouble (because of generic type erasure).

And peer class loading does not work for cache objects.

Try adding the following method to the receiver to retrieve the value from the entry and see if it helps:

private static MyDouble getValue(Map.Entry entry) {
  return (MyDouble)entry.getValue();
}
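
For readers unfamiliar with type erasure, here is a minimal plain-Java sketch of what the guess above refers to. It does not use Ignite at all: `ErasureSketch` and its methods are hypothetical names, and `MyDouble` is only a stand-in for the class from the issue. It shows that the compiled signature of a method taking `Collection<Map.Entry<String, MyDouble>>` mentions only `Collection`, and that the raw-entry helper is what introduces an explicit cast to `MyDouble`. Whether that cast is enough for Ignite to deploy the class is exactly what the suggestion is meant to test.

```java
import java.lang.reflect.Method;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

public class ErasureSketch {
    /** Stand-in for the user-defined value class from the issue (not the real one). */
    public static class MyDouble {
        private final Double value;

        public MyDouble(Double value) {
            this.value = value;
        }

        public Double getValue() {
            return value;
        }
    }

    /** Same parameter shape as the 'entries' argument of receive() in the issue. */
    public static void receive(Collection<Map.Entry<String, MyDouble>> entries) {
        for (Map.Entry<String, MyDouble> entry : entries)
            entry.getValue();
    }

    /** Returns the erased parameter types that the compiled method descriptor carries. */
    public static Class<?>[] erasedParameterTypes() throws NoSuchMethodException {
        Method m = ErasureSketch.class.getMethod("receive", Collection.class);

        return m.getParameterTypes();
    }

    /** Reads the value through a raw entry, so the method body contains an explicit cast to MyDouble. */
    @SuppressWarnings("rawtypes")
    public static MyDouble getValue(Map.Entry entry) {
        return (MyDouble)entry.getValue();
    }

    public static void main(String[] args) throws Exception {
        // Only java.util.Collection appears here; the type arguments are erased.
        System.out.println(Arrays.toString(erasedParameterTypes()));

        Map.Entry<String, MyDouble> e = new AbstractMap.SimpleEntry<>("IBM", new MyDouble(194.9));

        System.out.println(getValue(e).getValue());
    }
}
```

The sketch only demonstrates the language behavior; it says nothing about how Ignite decides which classes to send to a remote node.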

iswarezwp commented 2 months ago

@ptupitsyn Thank you very much. I modified DeviceStreamReceiver as follows:

public static class DeviceStreamReceiver implements StreamReceiver<String, MyDouble> {
        private IgniteCache<String, Instrument> instCache;
        public DeviceStreamReceiver(IgniteCache<String, Instrument> cache) {
            instCache = cache;
        }

        @Override
        public void receive(IgniteCache<String, MyDouble> cache, Collection<Map.Entry<String, MyDouble>> entries) {
            //CommunicationTrackKafkaProducer producer = CommunicationTrackKafkaProducer.getInstance();

            for (Map.Entry<String, MyDouble> entry : entries) {
                String symbol = entry.getKey();
                MyDouble tick = getValue(entry);

                Instrument inst = instCache.get(symbol);

                if (inst == null)
                    inst = new Instrument(symbol);

                // Don't populate market cache, as we don't use it for querying.
                // Update cached instrument based on the latest market tick.
                inst.update(tick.getValue());

                instCache.put(symbol, inst);
            }
        }

        private static MyDouble getValue(Map.Entry<String, MyDouble> entry) {
            return (MyDouble)entry.getValue();
        }
    }

But I still get the same error:

Caused by: class org.apache.ignite.binary.BinaryInvalidTypeException: com.nimblex.StreamVisitorExample$MyDouble
        at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:741)
        at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1772)
        at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1731)
        at org.apache.ignite.internal.binary.BinaryObjectImpl.deserializeValue(BinaryObjectImpl.java:866)
        at org.apache.ignite.internal.binary.BinaryObjectImpl.value(BinaryObjectImpl.java:198)
        at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinary(CacheObjectUtils.java:199)
        at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinaryIfNeeded(CacheObjectUtils.java:78)
        at org.apache.ignite.internal.processors.cache.CacheObjectContext.unwrapBinaryIfNeeded(CacheObjectContext.java:138)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry$1.getValue(DataStreamerEntry.java:96)
        at com.nimblex.StreamVisitorExample$DeviceStreamReceiver.getValue(StreamVisitorExample.java:159)
        at com.nimblex.StreamVisitorExample$DeviceStreamReceiver.receive(StreamVisitorExample.java:143)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:141)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:394)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:299)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
        ... 8 more
Caused by: java.lang.ClassNotFoundException: com.nimblex.StreamVisitorExample$MyDouble
        at java.net.URLClassLoader.findClass(URLClassLoader.java:407)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:9373)
        at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:9311)
        at org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:384)
        at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:717)

iswarezwp commented 2 months ago

I added two lines of code at the beginning of the receive method, and MyDouble can be accessed there. The problem seems to be in datastreamer.DataStreamerEntry$1.getValue:

        @Override
        public void receive(IgniteCache<String, MyDouble> cache, Collection<Map.Entry<String, MyDouble>> entries) {
            //CommunicationTrackKafkaProducer producer = CommunicationTrackKafkaProducer.getInstance();
            MyDouble value = new MyDouble(null);
            System.out.println("Just for test: "+value.toString());

            for (Map.Entry<String, MyDouble> entry : entries) {
                String symbol = entry.getKey();
                MyDouble tick = getValue(entry);

                Instrument inst = instCache.get(symbol);

                if (inst == null)
                    inst = new Instrument(symbol);

                // Don't populate market cache, as we don't use it for querying.
                // Update cached instrument based on the latest market tick.
                inst.update(tick.getValue());

                instCache.put(symbol, inst);
            }
        }

Error message:

Caused by: class org.apache.ignite.binary.BinaryInvalidTypeException: com.nimblex.StreamVisitorExample$MyDouble
        at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:741)
        at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1772)
        at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1731)
        at org.apache.ignite.internal.binary.BinaryObjectImpl.deserializeValue(BinaryObjectImpl.java:866)
        at org.apache.ignite.internal.binary.BinaryObjectImpl.value(BinaryObjectImpl.java:198)
        at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinary(CacheObjectUtils.java:199)
        at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinaryIfNeeded(CacheObjectUtils.java:78)
        at org.apache.ignite.internal.processors.cache.CacheObjectContext.unwrapBinaryIfNeeded(CacheObjectContext.java:138)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry$1.getValue(DataStreamerEntry.java:96)
        at com.nimblex.StreamVisitorExample$DeviceStreamReceiver.getValue(StreamVisitorExample.java:161)
        at com.nimblex.StreamVisitorExample$DeviceStreamReceiver.receive(StreamVisitorExample.java:145)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:141)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:394)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:299)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
        ... 8 more
Caused by: java.lang.ClassNotFoundException: com.nimblex.StreamVisitorExample$MyDouble
        at java.net.URLClassLoader.findClass(URLClassLoader.java:407)
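
The observation above (the class is usable inside the receiver, yet `Class.forName` fails with `ClassNotFoundException` during deserialization) is what one would expect if the two lookups go through different class loaders. The following plain-Java sketch illustrates only that general JVM behavior. It is not Ignite code and does not confirm the cause of this issue; `LoaderVisibilitySketch`, `canLoad`, and `isolatedLoader` are hypothetical names chosen for illustration.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class LoaderVisibilitySketch {
    /** Stand-in for the user-defined value class from the issue (not the real one). */
    public static class MyDouble {
    }

    /** Returns true if the given loader can resolve the class name, false on ClassNotFoundException. */
    public static boolean canLoad(String clsName, ClassLoader ldr) {
        try {
            Class.forName(clsName, false, ldr);

            return true;
        }
        catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** A loader with no URLs and a null parent: it delegates to the bootstrap loader only. */
    public static ClassLoader isolatedLoader() {
        return new URLClassLoader(new URL[0], null);
    }

    public static void main(String[] args) {
        String name = MyDouble.class.getName();

        // The loader that defined MyDouble can resolve it by name.
        System.out.println(canLoad(name, MyDouble.class.getClassLoader()));

        // A loader that never saw the application classes cannot.
        System.out.println(canLoad(name, isolatedLoader()));
    }
}
```

The same class name resolves through one loader and fails through another, which matches the shape of the trace: the failing lookup goes through `sun.misc.Launcher$AppClassLoader` on the remote node, where the class is not on the classpath.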