aws-samples / pyflink-getting-started


Implement 'print' connector with sliding window sink #1

Closed jeff1evesque closed 2 years ago

jeff1evesque commented 2 years ago

I'm trying to combine the SlidingWindows/sliding-windows.py example with the GettingStarted/getting-started.py example. Specifically, I've created my own variant of sliding_window.py, with an associated datagen/stock.py. However, when I run my variation, nothing appears in the PyCharm output:

[screenshot: PyCharm run console with no output]

However, if I comment out the first sliding_window_table assignment and uncomment the second one:

    #sliding_window_table = (
    #    input_table.window(
    #        Slide.over(sliding_window_over)
    #        .every(sliding_window_every)
    #        .on(sliding_window_on)
    #        .alias(sliding_window_alias)
    #    )
    #    .group_by('ticker, {}'.format(sliding_window_alias))
    #    .select('ticker, price.min as p, {}.end as t'.format(
    #        sliding_window_alias,
    #        sliding_window_on
    #    ))
    #)
    sliding_window_table = input_table.select('ticker, price, utc')

Then, the console is able to capture output:

[screenshot: PyCharm console showing the query output]

However, my goal is to combine the AWS-provided sliding window example with the print connector example.

jeremyber-aws commented 2 years ago

Hi @jeff1evesque, I just tested this code sample (attached) and it works with the stock.py datagen module.

Can you try the same? Note that on line 161 I am creating the print table, and line 169 should execute since this is a local execution.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
# -*- coding: utf-8 -*-

"""
sliding-windows.py
~~~~~~~~~~~~~~~~~~~
This module:
    1. Creates a table environment
    2. Creates a source table from a Kinesis Data Stream
    3. Creates a sink table writing to a Kinesis Data Stream
    4. Queries from the Source Table and
       creates a sliding window over 10 seconds to calculate the minimum value over the window.
    5. These sliding window results are inserted into the Sink table.
"""

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.window import Slide
import os
import json

# 1. Creates a Table Environment
env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on kda

is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment

if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis_2.12-1.13.2.jar",
    )

def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))

def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]

def create_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def perform_sliding_window_aggregation(input_table_name):
    # use SQL Table in the Table API
    input_table = table_env.from_path(input_table_name)

    sliding_window_table = (
        input_table.window(
            Slide.over("10.seconds")
            .every("5.seconds")
            .on("event_time")
            .alias("ten_second_window")
        )
        .group_by("ticker, ten_second_window")
        .select("ticker, price.min as price, ten_second_window.end as event_time")
    )

    return sliding_window_table

def create_print_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              WITH (
                'connector' = 'print'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    output_stream_key = "output.stream.name"
    output_region_key = "aws.region"

    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_stream = output_property_map[output_stream_key]
    output_region = output_property_map[output_region_key]

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_table(output_table_name, output_stream, output_region, stream_initpos)
    )

    # 4. Queries from the Source Table and creates a sliding window over 10 seconds to calculate the minimum value
    # over the window.
    sliding_window_table = perform_sliding_window_aggregation(input_table_name)
    table_env.create_temporary_view("sliding_window_table", sliding_window_table)

    # Create print table to print results to the console
    table_env.execute_sql(create_print_table("print_table", "a", "b", "c"))

    # 5. These sliding windows are inserted into the print table
    table_result1 = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                          .format("print_table", "sliding_window_table"))

    if is_local:
        table_result1.wait()
    else:
        # get job status through TableResult
        print(table_result1.get_job_client().get_job_status())

if __name__ == "__main__":
    main()

jeff1evesque commented 2 years ago

@jeremyber-aws, any chance I could have a sink that goes to some blackhole? I don't need a sink just yet.

jeremyber-aws commented 2 years ago

If you replace the line

                'connector' = 'print'

with

                'connector' = 'blackhole'

there will be no output. This is a built-in connector for the Python Table API.

jeff1evesque commented 2 years ago

@jeremyber-aws, what I was wondering was whether I could replace 'connector' = 'kinesis' with 'connector' = 'blackhole' for the sink. Specifically, I do not have an output_stream:

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_table(output_table_name, output_stream, output_region, stream_initpos)
    )

Otherwise, I'm curious whether steps 4 and 5 could be combined in a logical way to replace step 3, since I don't have a Kinesis stream on hand for my sink:

    # 4. Queries from the Source Table and creates a sliding window over 10 seconds to calculate the minimum value
    # over the window.
    sliding_window_table = perform_sliding_window_aggregation(input_table_name)
    table_env.create_temporary_view("sliding_window_table", sliding_window_table)

    # Create print table to print results to the console
    table_env.execute_sql(create_print_table("print_table", "a", "b", "c"))

    # 5. These sliding windows are inserted into the print table
    table_result1 = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                          .format("print_table", "sliding_window_table"))

jeremyber-aws commented 2 years ago

Yes, you can replace 'connector' = 'kinesis' with 'connector' = 'blackhole' and there will simply be no output. You may need to remove some of the other fields in the CREATE clause, such as 'stream' = '{1}'; blackhole behaves like the print connector and takes no additional fields.
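
A minimal sketch of what that sink DDL could look like (a hypothetical create_blackhole_table helper; only the table name is needed, since the blackhole connector takes no stream/region/initpos options):

    def create_blackhole_table(table_name):
        # the blackhole connector discards every record and needs no other options
        return """ CREATE TABLE {0} (
                    ticker VARCHAR(6),
                    price DOUBLE,
                    event_time TIMESTAMP(3)
                  )
                  WITH (
                    'connector' = 'blackhole'
                  ) """.format(table_name)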

jeff1evesque commented 2 years ago

That's somewhat what I tried to do in sliding_window.py. Specifically, for my local environment, I tried to use a print connector as the sink for development/testing purposes:

    if is_local:
        table_env.execute_sql(
            create_print_table(output_table_name)
        )

    else:
        table_env.execute_sql(
            create_print_table(output_table_name, connector='blackhole')
        )

However, when I run this variant, nothing is printed, despite records being put into the source Kinesis stream.
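
For context, the parameterized helper I'm describing is roughly the following sketch (the connector name is the only thing that changes between the two branches):

    def create_print_table(table_name, connector='print'):
        # sketch: same columns as the source, with only the connector option;
        # connector='blackhole' silently discards records instead of printing them
        return """ CREATE TABLE {0} (
                    ticker VARCHAR(6),
                    price DOUBLE,
                    utc TIMESTAMP(3)
                  )
                  WITH (
                    'connector' = '{1}'
                  ) """.format(table_name, connector)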

jeremyber-aws commented 2 years ago

@jeff1evesque, the BlackHole connector allows for swallowing all input records. It is designed for high performance testing and for UDF output where no substantive sink is needed, just like the /dev/null device on Unix-like operating systems.

If you have properly set the IS_LOCAL environment variable in your IDE, you will create the print_table and see printed results in your local IDE. On a remote cluster (Kubernetes, etc.) you may not see results on the Job Manager, as these print statements appear on the Task Managers.

Perhaps you've modified the create_print_table function to take the connector type as a parameter in your second block; if so, that second block will only execute if you have not set the IS_LOCAL environment variable. If it does execute, using the blackhole connector necessarily means nothing will be printed.

Hope this makes sense!

Jeremy

jeff1evesque commented 2 years ago

@jeremyber-aws, yup, my IS_LOCAL is properly set in PyCharm. What I was saying in the original post is that my print connector for the sink was not printing; instead it was acting like a blackhole.

jeremyber-aws commented 2 years ago

Can you try the code I shared here? It is working in my PyCharm IDE.

jeff1evesque commented 2 years ago

That code, if I'm not mistaken, requires a Kinesis stream for the sink? For local development, I'm using PyCharm with PyFlink simply to develop and test sliding window functionality, reading from a Kinesis stream source and (hopefully) writing to a print connector sink.

jeremyber-aws commented 2 years ago

It does not. If you look at #5:

    # 5. These sliding windows are inserted into the print table
    table_result1 = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                          .format("print_table", "sliding_window_table"))

This will insert the results of your sliding window table into the print_table defined above.

jeff1evesque commented 2 years ago

What about the following segment:

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_table(output_table_name, output_stream, output_region, stream_initpos)
    )

Do I need an output_stream, and does that correspond to a Kinesis stream?

jeremyber-aws commented 2 years ago

You can remove those lines since they aren't being used. I've just tested removing step #3 and it works the same.

jeff1evesque commented 2 years ago

@jeremyber-aws, I just tried running the suggested code with #3 removed, and added a trivial print('is_local: {}'.format(is_local)). Unfortunately, for me it gets stuck like my earlier code variation:

/Users/jeff1evesque/opt/miniconda3/envs/kinesis_analytics/bin/python3.8 /Users/jeff1evesque/application/kinesis-analytics/flink/sliding_window.py
is_local: True
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/jeff1evesque/opt/miniconda3/envs/kinesis_analytics/lib/python3.8/site-packages/pyflink/lib/flink-dist_2.11-1.13.2.jar) to field java.util.Collections$SingletonList.serialVersionUID
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

I did kick off datagen/stock.py, and I can see its trace putting JSON records into my Kinesis stream.

jeremyber-aws commented 2 years ago

Hi Jeff, those warnings appear for me as well; they aren't preventing the Flink application from running. Once you start sending records into your source Kinesis data stream, you should see print statements appear.

[screenshot: print output appearing in the console]

jeff1evesque commented 2 years ago

I tried the provided "a", "b", "c" example once again:

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
# -*- coding: utf-8 -*-

"""
sliding-windows.py
~~~~~~~~~~~~~~~~~~~
This module:
    1. Creates a table environment
    2. Creates a source table from a Kinesis Data Stream
    3. Creates a sink table writing to a Kinesis Data Stream
    4. Queries from the Source Table and
       creates a sliding window over 10 seconds to calculate the minimum value over the window.
    5. These sliding window results are inserted into the Sink table.
"""

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.window import Slide
import os
import json

# 1. Creates a Table Environment
env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on kda

is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment

if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis_2.12-1.13.2.jar",
    )

def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))

def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]

def create_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def perform_sliding_window_aggregation(input_table_name):
    # use SQL Table in the Table API
    input_table = table_env.from_path(input_table_name)

    sliding_window_table = (
        input_table.window(
            Slide.over("10.seconds")
            .every("5.seconds")
            .on("event_time")
            .alias("ten_second_window")
        )
        .group_by("ticker, ten_second_window")
        .select("ticker, price.min as price, ten_second_window.end as event_time")
    )

    return sliding_window_table

def create_print_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              WITH (
                'connector' = 'print'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def main():
    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    input_stream = "TestStream"
    input_region = "us-east-1"
    stream_initpos = "LATEST"

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 4. Queries from the Source Table and creates a sliding window over 10 seconds to calculate the minimum value
    # over the window.
    sliding_window_table = perform_sliding_window_aggregation(input_table_name)
    table_env.create_temporary_view("sliding_window_table", sliding_window_table)

    # Create print table to print results to the console
    table_env.execute_sql(create_print_table("print_table", "a", "b", "c"))

    # 5. These sliding windows are inserted into the print table
    table_result1 = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                          .format("print_table", "sliding_window_table"))

    if is_local:
        table_result1.wait()
    else:
        # get job status through TableResult
        print(table_result1.get_job_client().get_job_status())

if __name__ == "__main__":
    main()

With the following datagen/stock.py:

import datetime
import json
import random
import boto3

STREAM_NAME = "TestStream"

def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)
    }

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")

if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis'))

The following error trace is returned:

/Users/jeff1evesque/opt/miniconda3/envs/kinesis_analytics/bin/python3.8 /Users/jeff1evesque/application/kinesis-analytics/flink/sliding_window.py
Traceback (most recent call last):
  File "/Users/jeff1evesque/application/kinesis-analytics/flink/sliding_window.py", line 152, in <module>
    main()
  File "/Users/jeff1evesque/application/kinesis-analytics/flink/sliding_window.py", line 140, in main
    table_result1 = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
  File "/Users/jeff1evesque/opt/miniconda3/envs/kinesis_analytics/lib/python3.8/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/Users/jeff1evesque/opt/miniconda3/envs/kinesis_analytics/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
    return_value = get_return_value(
  File "/Users/jeff1evesque/opt/miniconda3/envs/kinesis_analytics/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/Users/jeff1evesque/opt/miniconda3/envs/kinesis_analytics/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.input_table'.

Table options are:

'aws.region'='us-east-1'
'connector'='kinesis'
'format'='json'
'json.timestamp-format.standard'='ISO-8601'
'scan.stream.initpos'='LATEST'
'sink.partitioner-field-delimiter'=';'
'sink.producer.collection-max-count'='100'
'stream'='TestStream'
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
    at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:351)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:150)
    at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4856)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:150)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:57)
    at org.apache.flink.table.operations.WindowAggregateQueryOperation.accept(WindowAggregateQueryOperation.java:111)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:150)
    at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4856)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:150)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
    at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:76)
    at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184)
    at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:76)
    at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
    at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kinesis'
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
    ... 57 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
    ... 59 more

Process finished with exit code 1

jeremyber-aws commented 2 years ago

Hi @jeff1evesque, this error indicates that the Kinesis connector jar was not properly loaded onto your classpath, as shown by the list of "Available factory identifiers":

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print

Could you validate that the jar you are referencing on line 41 is in fact available in your /lib/ folder?

This refers to the source, not the sink (print) table.
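
One quick way to check (a minimal sketch, mirroring the pipeline.jars path set at the top of the sample):

    import os

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    jar_path = os.path.join(CURRENT_DIR, "lib", "flink-sql-connector-kinesis_2.12-1.13.2.jar")
    # if this prints False, the 'kinesis' factory will not be discoverable at runtime
    print("connector jar present:", os.path.isfile(jar_path))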

jeff1evesque commented 2 years ago

I just reverted my code to the suggested example again, and ensured that the Flink Kinesis connector jar was in the lib/ directory:

[screenshot: project lib/ directory containing the Kinesis connector jar]

Since the path wasn't correct, I changed line 41 to the following (similar to my original posting):

[screenshot: updated pipeline.jars path]

Meanwhile, I have the datagen/stock.py executing in the background:

[screenshot: datagen/stock.py writing records in the background]

However, the former screenshot indicates that the PyFlink application is not reading anything that is being ingested. I think the original post and your adjusted sample code share the same underlying issue.

jeremyber-aws commented 2 years ago

@jeff1evesque, just to be clear, the screenshot with the warnings is not indicative of any issue; these are common warnings at the start of a PyFlink application's execution (from what I have seen).

You can validate this by removing all transformations and simply printing the data as it comes out of your Kinesis Data Stream in your Kinesis Data Analytics application.
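
For example, a minimal pass-through sketch (using the input_table and print_table names from the sample above), which skips the sliding window entirely:

    # sketch: print raw records straight from the Kinesis source table
    table_result = table_env.execute_sql(
        "INSERT INTO print_table SELECT * FROM input_table"
    )
    if is_local:
        table_result.wait()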

Feel free to send me an e-mail (jdber@amazon.com) and we can further debug this on a screenshare or via e-mail, but I do not see any issues underlying the code.

I will update the issue once we have resolved it.

jeff1evesque commented 2 years ago

I got some help from the Apache Flink community, who suggested the following code addition:

    table_env.get_config().get_configuration().set_string(
        'parallelism.default',
        '1'
    )

Now, local PyCharm is able to capture the desired output:

      /Users/jeff1evesque/opt/miniconda3/envs/kinesis-analytics/bin/python /Users/jeff1evesque/application/kinesis-analytics/flink/sliding_window.py
      is_local: True

      Source Schema
      (
        `ticker` VARCHAR(6),
        `price` DOUBLE,
        `utc` TIMESTAMP(3) *ROWTIME*,
        WATERMARK FOR `utc`: TIMESTAMP(3) AS `utc` - INTERVAL '20' SECOND
      )

      Sink Schema
      (
        `ticker` VARCHAR(6),
        `price` DOUBLE,
        `utc` TIMESTAMP(3) *ROWTIME*,
        WATERMARK FOR `utc`: TIMESTAMP(3) AS `utc` - INTERVAL '20' SECOND
      )
      sliding_window_over: 2.minutes
      sliding_window_every: 1.minutes
      sliding_window_on: utc

      sliding_window_table
      (
        `ticker` VARCHAR(6),
        `price` DOUBLE,
        `utc` TIMESTAMP(3)
      )

      creating temporary view for sliding window table to access within SQL
      WARNING: An illegal reflective access operation has occurred
      WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/jeff1evesque/opt/miniconda3/envs/kinesis-analytics/lib/python3.8/site-packages/pyflink/lib/flink-dist_2.11-1.13.2.jar) to field java.util.Collections$SingletonList.serialVersionUID
      WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
      WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
      WARNING: All illegal access operations will be denied in a future release
      +I[AMZN, 0.75, 2022-06-06T19:30]
      +I[TSLA, 0.59, 2022-06-06T19:30]
      +I[AAPL, 0.52, 2022-06-06T19:30]
      +I[MSFT, 0.26, 2022-06-06T19:30]
      +I[AMZN, 0.26, 2022-06-06T19:31]
      +I[MSFT, 0.26, 2022-06-06T19:31]
      +I[TSLA, 0.26, 2022-06-06T19:31]
      +I[AAPL, 0.01, 2022-06-06T19:31]
      +I[AMZN, 0.11, 2022-06-06T19:32]
      +I[MSFT, 0.17, 2022-06-06T19:32]
      +I[AAPL, 0.01, 2022-06-06T19:32]
      +I[TSLA, 0.03, 2022-06-06T19:32]
      +I[TSLA, 0.03, 2022-06-06T19:33]
      +I[AAPL, 0.06, 2022-06-06T19:33]
      +I[AMZN, 0.02, 2022-06-06T19:33]
      +I[MSFT, 0.16, 2022-06-06T19:33]
      +I[AAPL, 0.06, 2022-06-06T19:34]
      +I[MSFT, 0.01, 2022-06-06T19:34]
      +I[TSLA, 0.03, 2022-06-06T19:34]
      +I[AMZN, 0.02, 2022-06-06T19:34]
      +I[MSFT, 0.01, 2022-06-06T19:35]
      +I[AAPL, 0.18, 2022-06-06T19:35]
      +I[TSLA, 0.03, 2022-06-06T19:35]
      +I[AMZN, 0.03, 2022-06-06T19:35]

jeremyber-aws commented 2 years ago

Awesome! This issue will be considered resolved.

If you are running your application with more parallelism than there are Kinesis data stream shards, some consumer instances can be idle and therefore prevent watermarks in your event-time processing application from advancing. I suggest setting the shard idle interval (milliseconds) to a timeout value after which a shard should be considered idle. This keeps watermarks advancing, and therefore keeps window output being emitted.
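
If I recall correctly, the Kinesis SQL connector exposes this as the scan.shard.idle.interval table option (worth verifying against the connector docs for your Flink version). A sketch of the source table DDL with it added, reusing the values from this thread:

    # sketch: consider a shard idle after 10 seconds with no records,
    # so watermarks (and therefore window output) keep advancing
    table_env.execute_sql("""
        CREATE TABLE input_table (
            ticker VARCHAR(6),
            price DOUBLE,
            event_time TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        )
        WITH (
            'connector' = 'kinesis',
            'stream' = 'TestStream',
            'aws.region' = 'us-east-1',
            'scan.stream.initpos' = 'LATEST',
            'scan.shard.idle.interval' = '10000',
            'format' = 'json',
            'json.timestamp-format.standard' = 'ISO-8601'
        )
    """)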