**Closed** — yangb8 closed this issue 5 years ago
I can definitely try the first one; but regarding 2 and 3, I am pretty sure an InputFormat (new MapReduce API) doesn't work with Hive, which only supports the old MapRed API. I'd prefer to open a separate issue to track a new feature: implement a connector supporting the old MapRed API if it's necessary, and then use Hive to verify it.
Thanks, I agree with tracking Hive as a separate issue, and I don't know how important it would be. Just exploring how InputFormats are typically constructed.
@EronWright, actually, I've documented in the README how to use hadoop-connectors to set up MR jobs. It matches the first one (via JobConf), and I believe it's the only way to use this connector so far. BTW, issue #29 is open for implementing a connector with the old MapRed API.
```java
Configuration conf = new Configuration();

// Optionally set start and end positions.
// Typically the start positions are the end positions of the previous job,
// so that only newly generated events are processed. If start positions are
// not set, processing begins from the very beginning of the stream.
conf.setStrings(PravegaInputFormat.START_POSITIONS, startPos);

// Fetch the latest positions to use as end positions.
String endPos = PravegaInputFormat.fetchLatestPositionsJson("tcp://127.0.0.1:9090", "myScope", "myStream");
conf.setStrings(PravegaInputFormat.END_POSITIONS, endPos);

conf.setStrings(PravegaInputFormat.SCOPE_NAME, "myScope");
conf.setStrings(PravegaInputFormat.STREAM_NAME, "myStream");
conf.setStrings(PravegaInputFormat.URI_STRING, "tcp://127.0.0.1:9090");
conf.setStrings(PravegaInputFormat.DESERIALIZER, io.pravega.client.stream.impl.JavaSerializer.class.getName());

Job job = new Job(conf);
job.setInputFormatClass(PravegaInputFormat.class);
```
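To make the start/end position handoff concrete, here is a minimal sketch of persisting one job's end positions so the next job can use them as its start positions. The `PositionCheckpoint` class, the file location, and the JSON content are all hypothetical; the connector itself only sees the opaque position strings.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch only: store the end-positions JSON emitted by one job
// so the next run can pass it as START_POSITIONS.
public class PositionCheckpoint {

    // Save the end positions after a successful job.
    public static void save(Path file, String endPositionsJson) throws IOException {
        Files.writeString(file, endPositionsJson);
    }

    // Load the previous end positions, or null if no checkpoint exists yet.
    // A null return means the next job should start from the very beginning.
    public static String loadOrNull(Path file) throws IOException {
        return Files.exists(file) ? Files.readString(file) : null;
    }
}
```

In this sketch, a job would call `loadOrNull` before building its `Configuration` and `save` after completing, giving exactly-once-per-run coverage of newly generated events.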
Like I mentioned in the sync-up meeting, I think a builder style might not be necessary right now, because 'via JobConf' is the only way to use this connector, so end users never get the opportunity to create a PravegaInputFormat directly.
@yangb8 thanks for the info, it agrees with my understanding that the job's `Configuration` is the vehicle for passing runtime parameters from user code to the `InputFormat`.

One possibility would be to provide a builder-style API that helps with setting parameters on the `Configuration`. For illustration:
```java
Configuration conf = PravegaInputFormat.builder()
    .forStream("scope-1/stream-1")
    .withDeserializer(...)
    .build();
Job job = new Job(conf);
```
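For illustration of what such a builder could look like internally, here is a self-contained sketch. A plain `Map` stands in for Hadoop's `Configuration`, and all key names and method names are hypothetical, not the connector's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a fluent builder that collects Pravega parameters
// and emits plain key/value pairs for the job configuration.
public class PravegaInputFormatBuilder {
    private final Map<String, String> settings = new HashMap<>();

    // Accepts a "scope/stream" qualified name and splits it into two keys.
    public PravegaInputFormatBuilder forStream(String scopedStream) {
        String[] parts = scopedStream.split("/", 2);
        settings.put("pravega.scope", parts[0]);
        settings.put("pravega.stream", parts[1]);
        return this;
    }

    public PravegaInputFormatBuilder withURI(String uri) {
        settings.put("pravega.uri", uri);
        return this;
    }

    public PravegaInputFormatBuilder withDeserializer(String className) {
        settings.put("pravega.deserializer", className);
        return this;
    }

    // Returns a copy so later builder mutations don't leak into the job.
    public Map<String, String> build() {
        return new HashMap<>(settings);
    }
}
```

The main benefit over raw `conf.setStrings(...)` calls is that the builder can validate required parameters in `build()` and keep the configuration key names out of user code.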
Wow, that's cool stuff, let me try it. Thanks @EronWright
On one hand, it is nice to use similar terminology across connectors, but on the other hand it is important to integrate in a natural way.
@yangb8 would you mind elaborating on how Hadoop input formats are routinely constructed and parameterized? This will help in understanding whether a builder makes sense. I see a few common approaches:
- JobConf - see `TeraValidate::main`
- StorageHandler - (example)