embulk / embulk-base-restclient

Base class library for Embulk plugins to access RESTful services
https://www.embulk.org/
Apache License 2.0

Pass schema along with the plugin task to build ServiceRequestMapper instance #94

Closed: instcode closed this PR 7 years ago

instcode commented 7 years ago

There are scenarios in which an output plugin needs to know the schema in order to construct a proper ServiceRequestMapper instance. For example, a plugin may want to build a record exporter that reads all the columns in the source schema and processes the data. With the current implementation, if we build the RecordExporter from predefined column names, e.g. XXXX, it fails with "Column 'XXXX' is not found" when the source schema doesn't actually contain those columns.

Here is a real example.

This is how I construct a ServiceRequestMapper:

    @Override
    public JacksonServiceRequestMapper buildServiceRequestMapper(PluginTask task)
    {
        JacksonServiceRequestMapper.Builder builder = JacksonServiceRequestMapper.builder();
        // Predefined columns; none of them is checked against the source schema
        Column[] columns = new Column[] {COOKIE, LIST_NAME, TIMESTAMP, DELETE};
        for (Column column : columns) {
            builder = builder.add(new JacksonDirectStringScope(column.alias),
                    new JacksonTopLevelValueLocator(column.name));
        }
        return builder.build();
    }

This is the source schema (as received by OutputPlugin#open(TaskSource taskSource, Schema schema, int taskIndex)):

    schema = Schema.builder()
            .add("cookie", Types.STRING)
            .add("list_name", Types.STRING)
            .build();

(The "timestamp" and "delete" columns are missing.)

When RestClientPageOutput then receives a Page of data:

    @Override
    public void add(Page page)
    {
        final PageReader pageReader = new PageReader(this.embulkSchema);
        pageReader.setPage(page);
        while (pageReader.nextRecord()) {
            final SinglePageRecordReader singlePageRecordReader = new SinglePageRecordReader(pageReader);
            // Throws SchemaConfigException if a mapped column is missing from the schema
            ServiceRecord record = recordExporter.exportRecord(singlePageRecordReader);
            this.recordBuffer.bufferRecord(record);
        }
    }

The following exception is thrown:

    Caused by: org.embulk.spi.SchemaConfigException: Column 'timestamp' is not found
        at org.embulk.spi.Schema.lookupColumn(Schema.java:97)
        at org.embulk.base.restclient.record.EmbulkValueScope.cacheSingleColumn(EmbulkValueScope.java:70)
        at org.embulk.base.restclient.jackson.scope.JacksonDirectStringScope.scopeString(JacksonDirectStringScope.java:20)
        at org.embulk.base.restclient.jackson.scope.JacksonStringScopeBase.scopeEmbulkValues(JacksonStringScopeBase.java:17)
        at org.embulk.base.restclient.jackson.scope.JacksonStringScopeBase.scopeEmbulkValues(JacksonStringScopeBase.java:9)
        at org.embulk.base.restclient.record.ValueExporter.exportValueToBuildRecord(ValueExporter.java:14)
        at org.embulk.base.restclient.record.RecordExporter.exportRecord(RecordExporter.java:18)
        at org.embulk.base.restclient.RestClientPageOutput.add(RestClientPageOutput.java:43)
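The failure comes from a by-name lookup: when a record is exported, each scope resolves its column in the source schema, and the lookup throws if the name is absent. The sketch below is a minimal, self-contained model of that behavior, not Embulk's actual Schema class. SimpleSchema and its lookupColumn method are hypothetical stand-ins, and IllegalArgumentException stands in for SchemaConfigException.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for the source schema; not org.embulk.spi.Schema.
    final class SimpleSchema {
        private final List<String> columnNames;

        SimpleSchema(List<String> columnNames) {
            this.columnNames = new ArrayList<>(columnNames);
        }

        // Resolves a column by name and fails when it is absent, mirroring
        // the "Column 'timestamp' is not found" error in the trace above.
        int lookupColumn(String name) {
            int index = columnNames.indexOf(name);
            if (index < 0) {
                // IllegalArgumentException stands in for SchemaConfigException.
                throw new IllegalArgumentException("Column '" + name + "' is not found");
            }
            return index;
        }
    }

With the source schema above (cookie, list_name), lookupColumn("list_name") returns 1, while lookupColumn("timestamp") throws. Because the lookup happens lazily per record, the problem only surfaces once data starts flowing.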

This PR adds that support. If the plugin knows the source schema when building the ServiceRequestMapper, it can check whether each column is available to read and avoid building a mapper that references missing columns:

    @Override
    public JacksonServiceRequestMapper buildServiceRequestMapper(PluginTask task, Schema schema)
    {
        JacksonServiceRequestMapper.Builder builder = JacksonServiceRequestMapper.builder();
        Column[] columns = new Column[] {COOKIE, LIST_NAME, TIMESTAMP, DELETE};
        for (Column column : columns) {
            // Check whether the schema contains the column above and build the service request mapper
            ....
        }
        return builder.build();
    }
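One way to perform the check elided above is to keep only the predefined columns whose names appear in the source schema. The sketch below models that filtering without Embulk dependencies. OutputColumn is a hypothetical enum standing in for the plugin's own Column constants, each carrying the source column name its scope would read, and the source schema is reduced to a plain list of names. With Embulk's Schema, those names could be collected from schema.getColumns() via Column#getName(), and the kept columns would then be passed to builder.add(...).

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for the plugin's predefined columns.
    enum OutputColumn {
        COOKIE("cookie"), LIST_NAME("list_name"), TIMESTAMP("timestamp"), DELETE("delete");

        // Name of the column in the source schema that this constant reads.
        final String columnName;

        OutputColumn(String columnName) {
            this.columnName = columnName;
        }
    }

    final class MapperColumns {
        // Returns the predefined columns that the source schema actually
        // contains, so the request mapper never references a missing column.
        static List<OutputColumn> presentIn(List<String> sourceColumnNames) {
            List<OutputColumn> present = new ArrayList<>();
            for (OutputColumn column : OutputColumn.values()) {
                if (sourceColumnNames.contains(column.columnName)) {
                    present.add(column);
                }
            }
            return present;
        }
    }

For the source schema above, presentIn returns [COOKIE, LIST_NAME]; TIMESTAMP and DELETE are skipped instead of failing at export time. The trade-off is that silently dropping columns can hide a misconfigured input.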

@dmikurube @muga Please have a look. Thank you.

muga commented 7 years ago

@instcode My understanding is that this PR adds a second parameter to the buildServiceRequestMapper method in order to validate an unexpected schema passed from the input plugin. Please correct me if I'm wrong. If so, it's better to run such validation before the Embulk transaction starts. Is there any reason we cannot do that in the validateOutputTask method?
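The alternative suggested here is to reject an incompatible schema up front rather than discover it record by record. A minimal sketch of such a check, assuming the plugin can see the input schema's column names at validation time (for example from validateOutputTask). RequiredColumns and requireAll are hypothetical names, and IllegalArgumentException stands in for whatever configuration exception the plugin would actually raise.

    import java.util.ArrayList;
    import java.util.List;

    final class RequiredColumns {
        // Fails fast, before any page is processed, if the source schema
        // lacks columns that the request mapper will read.
        static void requireAll(List<String> sourceColumnNames, List<String> required) {
            List<String> missing = new ArrayList<>();
            for (String name : required) {
                if (!sourceColumnNames.contains(name)) {
                    missing.add(name);
                }
            }
            if (!missing.isEmpty()) {
                throw new IllegalArgumentException("Missing required columns: " + missing);
            }
        }
    }

With the source schema from the example (cookie, list_name) and all four predefined columns required, the check fails with "Missing required columns: [timestamp, delete]" before any record is exported, and it reports every missing column at once instead of only the first one looked up.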

instcode commented 7 years ago

Thanks @muga. Yes, we can use the following code to handle that:

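    import org.embulk.config.Task;
    import org.embulk.spi.Schema;

    // Carries the input schema inside the task so later stages can read it
    interface YourPluginTask extends Task {
        void setSchema(Schema schema);
        Schema getSchema();
    }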

Let me close this PR.