miguno / kafka-storm-starter

[PROJECT IS NO LONGER MAINTAINED] Code examples that show how to integrate Apache Kafka 0.8+ with Apache Storm 0.9+ and Apache Spark Streaming 1.1+, while using Apache Avro as the data serialization format.
http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/

KafkaSparkStreamingSpec.scala: NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool #6

Closed: uladzimir-shelhunou closed this issue 8 years ago

uladzimir-shelhunou commented 9 years ago

Hello

I tried to build the same pool of producers in my Spark application, but I got this error when Spark tried to broadcast the pool.

Exception in thread "Driver" java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.DefaultExecutionContext.broadcast(DefaultExecutionContext.scala:80)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:961)
     ....
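
Roughly, the failing pattern looks like this (a hypothetical Java sketch; sc, KafkaProducer, and pooledProducerFactory are placeholders, not code from this project):

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// A commons-pool2 pool of Kafka producers, built on the driver:
GenericObjectPool<KafkaProducer> producerPool =
    new GenericObjectPool<KafkaProducer>(pooledProducerFactory);

// Spark serializes broadcast values with Java serialization, but
// GenericObjectPool does not implement java.io.Serializable:
Broadcast<GenericObjectPool<KafkaProducer>> broadcastPool =
    sc.broadcast(producerPool); // -> java.io.NotSerializableException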

Could you please comment on this issue?

miguno commented 9 years ago

This is a known issue because Apache Commons' pool implementation is not serializable.

Here's my comment from the link above:

This serialization issue is similar to the issue described at https://github.com/dbpedia/distributed-extraction-framework/issues/9. It looks as if one would need to use a different object pool implementation than the one from Apache Commons Pool, as the latter may be hard to serialize (see "sparkContext broadcast JedisPool not work").

Furthermore, I noticed that the serialization issue is also triggered locally (e.g. when running ./sbt test in kafka-storm-starter) when using Spark 1.2+.

To fix this issue you need to implement your own serializable pool.
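
A minimal sketch of that idea (names are illustrative, not code from this project): keep the real pool in a transient field and re-create it lazily after deserialization, so only the lightweight wrapper is ever serialized.

import java.io.Serializable;

import org.apache.commons.pool2.ObjectPool;

public abstract class SerializablePool<T> implements Serializable {

  // Never serialized: each JVM that deserializes this object rebuilds
  // its own pool on first use.
  private transient ObjectPool<T> delegate;

  // Subclasses create the actual (non-serializable) pool.
  protected abstract ObjectPool<T> createDelegate();

  private synchronized ObjectPool<T> delegate() {
    if (delegate == null) {
      delegate = createDelegate();
    }
    return delegate;
  }

  public T borrowObject() throws Exception {
    return delegate().borrowObject();
  }

  public void returnObject(T obj) throws Exception {
    delegate().returnObject(obj);
  }
}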

uladzimir-shelhunou commented 9 years ago

I understand why it happens, but I didn't know that it is a known issue. I want to implement my own pool, but I have some concerns about partitions and offsets.

btiernay commented 9 years ago

I did something like the following in Java:

import static lombok.AccessLevel.PRIVATE;

import java.io.Serializable;
import java.util.NoSuchElementException;

import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;

import org.apache.commons.pool2.ObjectPool;

@RequiredArgsConstructor
public abstract class LazySerializableObjectPool<T> implements ObjectPool<T>, Serializable {

  // The delegate is marked transient and initialized lazily via Lombok's
  // @Getter(lazy = true), so it is never serialized; each deserialized copy
  // rebuilds its own underlying pool on first access.
  @NonNull
  @Getter(lazy = true, value = PRIVATE)
  @Accessors(fluent = true)
  private final transient ObjectPool<T> delegate = createDelegate();

  protected abstract ObjectPool<T> createDelegate();

  @Override
  public T borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
    return delegate().borrowObject();
  }

  @Override
  public void returnObject(T obj) throws Exception {
    delegate().returnObject(obj);
  }

  @Override
  public void invalidateObject(T obj) throws Exception {
    delegate().invalidateObject(obj);
  }

  @Override
  public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
    delegate().addObject();
  }

  @Override
  public int getNumIdle() {
    return delegate().getNumIdle();
  }

  @Override
  public int getNumActive() {
    return delegate().getNumActive();
  }

  @Override
  public void clear() throws Exception, UnsupportedOperationException {
    delegate().clear();
  }

  @Override
  public void close() {
    delegate().close();
  }

}

import java.util.Map;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.val;

import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.kafka.clients.producer.KafkaProducer;

@RequiredArgsConstructor
public class KafkaProducerObjectPool extends LazySerializableObjectPool<KafkaProducer> {

  /**
   * Configuration.
   */
  @NonNull
  private final Map<String, String> producerProperties;

  @Override
  protected ObjectPool<KafkaProducer> createDelegate() {
    val pooledObjectFactory = ...; // a PooledObjectFactory<KafkaProducer>, e.g. built from producerProperties
    val maxNumProducers = 10;
    val poolConfig = new GenericObjectPoolConfig();
    poolConfig.setMaxTotal(maxNumProducers);
    poolConfig.setMaxIdle(maxNumProducers);

    return new GenericObjectPool<KafkaProducer>(pooledObjectFactory, poolConfig);
  }

}

See https://projectlombok.org/features/GetterLazy.html for details.
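
For completeness, here is a rough sketch of how such a pool might be used from Spark Streaming in Java; stream, producerProperties, and the send logic are placeholders, and the exact foreachRDD/foreachPartition signatures vary across Spark versions:

KafkaProducerObjectPool pool = new KafkaProducerObjectPool(producerProperties);

stream.foreachRDD(rdd -> {
  rdd.foreachPartition(records -> {
    // The wrapper is serialized into the closure; its transient delegate
    // is only created here, on the executor, on the first borrow.
    KafkaProducer producer = pool.borrowObject();
    try {
      while (records.hasNext()) {
        // producer.send(...) with records.next()
      }
    } finally {
      pool.returnObject(producer);
    }
  });
});

Because only the serializable wrapper crosses the wire, each executor ends up building and reusing its own producer pool.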

nickshoe commented 2 years ago

@btiernay thanks for sharing your solution, it really helped me out.