mhart / kinesalite

An implementation of Amazon's Kinesis built on LevelDB
MIT License

Reading from kinesalite stream works, but writing to it fails with security token invalid error message #119

Open KissBalazs opened 1 year ago

KissBalazs commented 1 year ago

I have the following Flink application, running in a docker container, connected to a Kinesalite server:

import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.SDKGlobalConfiguration;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class DataStreamJob {
    private static final Logger LOG = LoggerFactory.getLogger(org.agco.oeedataprocessor.DataStreamJob.class);
    private static final String inputStreamName = "kinesalite-local-input-stream";
    private static final String outputStreamName = "kinesalite-local-output-stream";
    private static final String region = "eu-west-1";
    private static final String aws_access_key = "fake_access_key";
    private static final String aws_secret_access_key = "fake_secret_access_key";
    private static final String aws_endpoint = "https://kinesis:4567"; 

    private static FlinkKinesisProducer<String> createSinkFromStaticConfigFOR_KINESALITE() {
        Properties outputProperties = new Properties();

        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        outputProperties.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, aws_access_key);
        outputProperties.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, aws_secret_access_key);
        outputProperties.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, aws_endpoint);

        outputProperties.setProperty("AggregationEnabled", "false");

        FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
        sink.setDefaultStream(outputStreamName);
        sink.setDefaultPartition("0");
        return sink;
    }

    private static FlinkKinesisConsumer<String> createSourceFromStaticConfigFOR_KINESALITE() {
        Properties kinesisProps = new Properties();
        kinesisProps.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        kinesisProps.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, aws_access_key);
        kinesisProps.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, aws_secret_access_key);
        kinesisProps.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, aws_endpoint);

        kinesisProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

        return new FlinkKinesisConsumer<String>(inputStreamName, new SimpleStringSchema(), kinesisProps);
    }

    public static void main(String[] args) throws Exception {

        // REQUIRED SETTINGS FOR LOCAL KINESIS
        System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
        System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");

        // set up the streaming execution environment
        Configuration flinkConfig = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConfig);
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.setParallelism(6);

        DataStreamSource<String> machineDataSource = env.addSource(createSourceFromStaticConfigFOR_KINESALITE());

        /**
         * Test Processor: just printing out the data received 
         */
        machineDataSource.map(x -> {
            LOG.info("------------------------------------------------------");
            LOG.info("Received new data: " + x);
            return x;
        });

        /**
         *         1st. Processor
         *  Parse the JSON, then add the asset processing in a keyed state
         */
        DataStream<MachineCycleData> Stream1_dataPackages = machineDataSource
                .map(MachineData::new)
                .filter(x -> x.getAssetNumber() != null)
                .keyBy(MachineData::getAssetNumber)
                .process(new P1MachineEventAggregator())
                .name("Asset-processing");

        /**
         * Output Sink.
         */
        Stream1_dataPackages
                .map(MachineCycleData::toJson)
                .addSink(createSinkFromStaticConfigFOR_KINESALITE())
                .name("Production-stream-output-sink");

        // execute program
        env.execute("Data Processor Application v.1.0");
    }

}

My problem is that this code executes properly and sets up the connections. I am able to read from the input stream, but writing to the output stream fails: upon reaching the sink, the application gets stuck with the following error:

08:12:10,405 WARN  org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader [] - [2022-09-20 08:12:10.404771] [0x0000007d][0x00007fdc0c5fe700] [warning] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 'UnrecognizedClientException': The security token included in the request is invalid.
2022-09-20T08:12:10.405582072Z 08:12:10,405 ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader [] - [2022-09-20 08:12:10.404823] [0x0000007d][0x00007fdc0c5fe700] [error] [AWS Log: ERROR](AWSClient)HTTP response code: 400
2022-09-20T08:12:10.405617491Z Exception name: UnrecognizedClientException
2022-09-20T08:12:10.405622374Z Error message: The security token included in the request is invalid.
2022-09-20T08:12:10.405625426Z 6 response headers:
2022-09-20T08:12:10.405627745Z connection : close
2022-09-20T08:12:10.405630340Z content-length : 107
2022-09-20T08:12:10.405632727Z content-type : application/x-amz-json-1.1
2022-09-20T08:12:10.405635750Z date : Tue, 20 Sep 2022 08:12:10 GMT
2022-09-20T08:12:10.405638119Z x-amz-id-2 : fbHpmqnwFAP/pOGnn9Qq759pd+mnq/aEB+S/Ee/ESPW1ueiCwLkfItn9x7SiWw1u/LCEUAoJoxsJI/CiMpmQc+jZiM2pR7xj
2022-09-20T08:12:10.405640503Z x-amzn-requestid : e4873f08-900e-b9fc-bf24-471285095483
2022-09-20T08:12:10.405645052Z 

What can be the problem here? As you can see, the two configurations are nearly identical. :\

KissBalazs commented 1 year ago

I have discovered that the problem is that the sink is actually trying to send data to AWS instead of the local Kinesalite instance! (I provided my real credentials, and it sent data to a stream I had created on AWS...) So it seems the AWS endpoint configuration is not being applied for some reason.

Bunky commented 1 year ago

I've just encountered this exact scenario - Did you get anywhere with this?

KissBalazs commented 1 year ago

@Bunky after looking further into the docs, I realized that the current AWS Flink Kinesis connector version is buggy and won't send data to the expected output streams. When I upgraded the connector version it worked as expected, but then I was unable to build and deploy in "per-job" mode on AWS, so I scrapped this approach; I now write the results to STDOUT instead of sending them back to an output stream. My whole aim was to create a test environment for our AWS Kinesis app, and this approach suffices for me. See also my question on the same topic here: https://stackoverflow.com/questions/73801996/apache-flink-kineisstreamsink-pkix-path-building-failed

Bunky commented 1 year ago

I've sort of found a fix for this, though I haven't had the chance to try it specifically with Kinesalite. I ended up switching to kinesis-mock for use with Docker, although I'd imagine similar steps would work with Kinesalite.

Instead of using

outputProperties.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, "http://localhost:4568");

in the producer, I swapped it out for

outputProperties.put("KinesisEndpoint", "localhost");
outputProperties.put("KinesisPort", "4567");

This then pointed correctly at my local Kinesis, leaving only an SSL connect error, which I resolved by pointing the producer at an SSL-enabled port, which kinesis-mock provides out of the box (that's why the port differs between the examples above; there may be a way to disable the SSL requirement, but I didn't look into it).
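For reference, here's roughly how I'd collect those producer-side overrides into one helper. This is a sketch based on my setup, assuming the FlinkKinesisProducer forwards unrecognized keys to the KPL's KinesisProducerConfiguration (which understands KinesisEndpoint, KinesisPort, and VerifyCertificate); the literal aws.* keys are the string values behind the ConsumerConfigConstants used earlier, and the region/credentials are dummy values:

```java
import java.util.Properties;

public class LocalKinesisProducerProps {
    // Builds producer properties targeting a local Kinesis endpoint.
    // Instead of ConsumerConfigConstants.AWS_ENDPOINT (which the buggy
    // connector version ignores on the producer side), we pass the
    // KPL-native KinesisEndpoint/KinesisPort settings through.
    public static Properties forLocalEndpoint(String host, int port) {
        Properties props = new Properties();
        // ConsumerConfigConstants.AWS_REGION / credentials keys as literals
        props.setProperty("aws.region", "eu-west-1");
        props.setProperty("aws.credentials.provider.basic.accesskeyid", "fake_access_key");
        props.setProperty("aws.credentials.provider.basic.secretkey", "fake_secret_access_key");
        // KPL-native endpoint override
        props.put("KinesisEndpoint", host);
        props.put("KinesisPort", Integer.toString(port));
        // Local mocks usually serve self-signed certs; skip verification
        props.put("VerifyCertificate", "false");
        props.setProperty("AggregationEnabled", "false");
        return props;
    }
}
```

The rest of the sink setup (new FlinkKinesisProducer<>(new SimpleStringSchema(), props), setDefaultStream, setDefaultPartition) stays exactly as in the original job; only the endpoint wiring differs.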

And for the sake of clarity: I have this working with Apache Flink 1.15.2.