GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

Caused by: java.io.NotSerializableException: com.beam.example.config.ElasticsearchRestClientConfig #651

Closed nithinsridhar closed 4 years ago

nithinsridhar commented 4 years ago

I'm reading data from Elasticsearch with the REST high-level client in Apache Beam, and I'm getting the error below: Caused by: java.io.NotSerializableException: com.beam.example.config.ElasticsearchRestClientConfig

====================Elastic Search Config=====================
@Configuration
public class ElasticsearchRestClientConfig {

private String elasticsearchHost;

@Bean(destroyMethod = "close")
public RestHighLevelClient client() {

    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("http://127.0.0.1:9200")));
    System.out.println("SSS");
    System.out.println(client);
    return client;

}

}

=====================Transformer==============================
@Component
public class ElasticsearchReadTransform extends DoFn<String,String> implements Serializable {

private ElasticsearchRestClientConfig elasticsearchRestClientConfig = new ElasticsearchRestClientConfig();

@ProcessElement
public void processElement(ProcessContext pc) {
    String msg =  pc.element();
    String response = searchByProductId("ItemA26","INV360_Org_A27");
    pc.output(response);
}

public String searchByProductId(String productId, String orgId) {
    SearchRequest searchRequest = new SearchRequest();
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

    QueryBuilder queryBuilder = QueryBuilders
            .boolQuery()
            .must(QueryBuilders
                    .matchQuery("inventory_audit.PRODUCT_ID", productId)
            );

    searchSourceBuilder.query(queryBuilder);

    searchRequest.source(searchSourceBuilder);
    SearchResponse response = null;
    try{
        response = elasticsearchRestClientConfig.client().search(searchRequest, RequestOptions.DEFAULT);
    }catch (Exception e){

    }

    return response.toString();
}

}

lukecwik commented 4 years ago

You should make your ElasticsearchRestClientConfig implement Serializable.

Beam has to make your DoFns available on every worker that processes the pipeline, and in Beam Java it does this by relying on Java serialization. So everything in your DoFn's closure, including every non-transient field, needs to be serializable.
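
To illustrate the serialization rule above without pulling in Beam, here is a minimal sketch that uses only plain Java serialization. All class names (`PlainConfig`, `SerializableConfig`, `BrokenFn`, `FixedFn`, `TransientFn`) are hypothetical stand-ins, not Beam or Elasticsearch types. It shows that a serializable object holding a non-serializable field fails to serialize, and two ways around it: make the field's class implement Serializable, or mark the field transient and create it lazily.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationSketch {

    // Hypothetical stand-in for a config class that does NOT implement Serializable.
    static class PlainConfig {
        String host = "127.0.0.1:9200";
    }

    // The same config, but marked Serializable.
    static class SerializableConfig implements Serializable {
        String host = "127.0.0.1:9200";
    }

    // Stand-in for a DoFn: Serializable itself, but it holds a non-serializable field.
    static class BrokenFn implements Serializable {
        final PlainConfig config = new PlainConfig();
    }

    // Option 1: the field's class implements Serializable.
    static class FixedFn implements Serializable {
        final SerializableConfig config = new SerializableConfig();
    }

    // Option 2: the field is transient (skipped by serialization) and created on first use.
    static class TransientFn implements Serializable {
        private transient PlainConfig config;

        PlainConfig config() {
            if (config == null) {
                config = new PlainConfig();
            }
            return config;
        }
    }

    // Returns true if Java serialization accepts the object, false on NotSerializableException.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("BrokenFn serializable:    " + canSerialize(new BrokenFn()));
        System.out.println("FixedFn serializable:     " + canSerialize(new FixedFn()));
        TransientFn fn = new TransientFn();
        fn.config(); // populate the transient field; it is still skipped on write
        System.out.println("TransientFn serializable: " + canSerialize(fn));
    }
}
```

In a real DoFn, the transient approach is usually the better fit for objects such as network clients, which generally cannot be serialized at all. Beam's `@Setup` method is the usual place to create them on each worker.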

(Sent by email in reply to nithinsridhar's post of Wed, Apr 22, 2020, 10:04 PM.)

lukecwik commented 4 years ago

Please try to use StackOverflow or user@beam.apache.org in the future for questions related to usage.

nithinsridhar commented 4 years ago

Can you please give us an example project that uses Elasticsearch in Apache Beam with read, write, and search functionality? I tried what you suggested, but I'm getting the error below: Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryRequest

lukecwik commented 4 years ago

Have you looked at: https://github.com/apache/beam/blob/24361d1b5981ef7d18e586a8e5deaf683f4329f1/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java#L1

That sounds like you're missing a dependency and/or hitting a linkage error due to a diamond dependency problem.
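
One way to narrow down a missing dependency versus a version conflict is to ask the JVM whether the class from the error can be loaded, and from which jar. The following is a rough diagnostic sketch, not part of Beam or Elasticsearch. `ClasspathProbe` and `locate` are hypothetical names, and it would need to run with the same classpath as the pipeline to be meaningful.

```java
import java.security.CodeSource;

public class ClasspathProbe {

    // Reports where a class was loaded from, or that it could not be loaded.
    static String locate(String className) {
        try {
            // initialize=false: only load the class, do not run its static initializers.
            Class<?> c = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader (the JDK itself) have no code source.
            return src == null ? "bootstrap/JDK" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT FOUND (" + e + ")";
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the NoClassDefFoundError above.
        String name = args.length > 0
                ? args[0]
                : "org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest";
        System.out.println(name + " -> " + locate(name));
    }
}
```

If the class is reported as not found, the jar that provides it is probably absent or too old. If it resolves to an unexpected jar, two dependencies are likely pulling in different versions of the same library.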

nitinsridar commented 4 years ago

@nithinsridhar and @nitinsridar are the same user.

I'm using the versions below, which are not working: Apache Beam = 2.19.0, Elasticsearch = 7.5.1

I guess this version combination might be the reason it's not working.

I tried the versions below, and they work: Apache Beam = 2.13.0, Elasticsearch = 6.0.0