I am getting this error:

```
2023-08-01 21:06:13,500 [LeaseCoordinator-0000] INFO s.a.k.l.d.DynamoDBLeaseRefresher [NONE] - Transferred lease shardId-000000000000 ownership from f4457d0c-af35-4e59-b036-dca933497848 to 2ee036cd-7d94-4a56-b141-816baf05cdda
2023-08-01 21:06:13,501 [LeaseCoordinator-0000] INFO s.a.k.l.dynamodb.DynamoDBLeaseTaker [NONE] - Worker 2ee036cd-7d94-4a56-b141-816baf05cdda successfully took 1 leases: shardId-000000000000
2023-08-01 21:06:16,533 [multi-lang-daemon-0000] INFO s.a.k.r.f.FanOutConsumerRegistration [NONE] - abdul-stream2 : Waiting for StreamConsumer PythonKCLSample to have ACTIVE status...
2023-08-01 21:06:17,552 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Created new shardConsumer for : ShardInfo(streamIdentifierSerOpt=Optional.empty, shardId=shardId-000000000000, concurrencyToken=00c07f81-fe6d-4fd8-82da-1f6781bf3605, parentShardIds=[], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})
2023-08-01 21:06:17,554 [ShardRecordProcessor-0000] INFO s.a.k.l.BlockOnParentShardTask [NONE] - No need to block on parents [] of shard shardId-000000000000
2023-08-01 21:06:18,788 [ShardRecordProcessor-0000] ERROR s.a.k.m.MultiLangShardRecordProcessor [NONE] - Encountered an error while trying to initialize record processor
java.io.IOException: Failed to start client executable
    at software.amazon.kinesis.multilang.MultiLangShardRecordProcessor.initialize(MultiLangShardRecordProcessor.java:79)
    at software.amazon.kinesis.lifecycle.InitializeTask.call(InitializeTask.java:102)
    at software.amazon.kinesis.lifecycle.ShardConsumer.executeTask(ShardConsumer.java:336)
    at software.amazon.kinesis.lifecycle.ShardConsumer.lambda$initializeComplete$2(ShardConsumer.java:289)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.io.IOException: Cannot run program "sample_kclpy_app.py": CreateProcess error=193, %1 is not a valid Win32 application
    at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1140)
    at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1074)
    at software.amazon.kinesis.multilang.MultiLangShardRecordProcessor.startProcess(MultiLangShardRecordProcessor.java:321)
    at software.amazon.kinesis.multilang.MultiLangShardRecordProcessor.initialize(MultiLangShardRecordProcessor.java:73)
```
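The `Caused by` line is Windows error 193 (`ERROR_BAD_EXE_FORMAT`): `CreateProcess` found `sample_kclpy_app.py` but could not load it as a program. `CreateProcess` only starts native executables; it does not read shebang lines or use the `.py` file association. As a rough illustration, native Windows (PE) executables begin with the two-byte `MZ` DOS header, which a Python source file never does. The helper name `looks_like_windows_executable` below is hypothetical and not part of KCL:

```python
import os
import tempfile


def looks_like_windows_executable(path):
    """Rough heuristic: native Windows (PE) executables start with the 'MZ' DOS header.

    Hypothetical helper for illustration only. Passing the check does not prove the
    file is runnable, but a plain-text .py script always fails it.
    """
    with open(path, "rb") as f:
        return f.read(2) == b"MZ"


if __name__ == "__main__":
    # A stand-in for sample_kclpy_app.py: just Python source text.
    with tempfile.NamedTemporaryFile("wb", suffix=".py", delete=False) as tmp:
        tmp.write(b"print('hello from a script')\n")
    try:
        print(looks_like_windows_executable(tmp.name))  # False
    finally:
        os.remove(tmp.name)
```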
Here is the Python `sample_kclpy_app.py` file:

```python
#
# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
      to a scaling change.
    """

    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shut down. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is overburdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        # n is zero-based, so n + 1 attempts have been made at this point.
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n + 1))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this record.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds})\n"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost\n")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended, checkpointing\n")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.\n")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    print("inside main")
    kcl_process = kcl.KCLProcess(RecordProcessor())
    print("KCL Process: ", kcl_process)
    kcl_process.run()
```
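The `checkpoint` method above retries a failed checkpoint up to `_CHECKPOINT_RETRIES` times, stops immediately on `ShutdownException`, gives up when it is still throttled on the last attempt, and sleeps between attempts. A self-contained sketch of that loop, runnable without `amazon_kclpy` or the daemon: `FakeCheckpointError` and `FakeCheckpointer` are hypothetical stand-ins for `kcl.CheckpointError` and the checkpointer the daemon passes in, and the sleep defaults to zero so the sketch runs instantly:

```python
import time


class FakeCheckpointError(Exception):
    """Hypothetical stand-in for amazon_kclpy.kcl.CheckpointError; the error name is in .value."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value


class FakeCheckpointer:
    """Hypothetical checkpointer that is throttled a fixed number of times, then succeeds."""

    def __init__(self, throttle_times):
        self.throttle_times = throttle_times
        self.calls = 0

    def checkpoint(self, sequence_number=None, sub_sequence_number=None):
        self.calls += 1
        if self.calls <= self.throttle_times:
            raise FakeCheckpointError("ThrottlingException")


def checkpoint_with_retries(checkpointer, retries=5, sleep_seconds=0.0):
    """Mirrors RecordProcessor.checkpoint: True on success, False if it gives up."""
    for n in range(retries):
        try:
            checkpointer.checkpoint()
            return True
        except FakeCheckpointError as e:
            if e.value == "ShutdownException":
                return False  # another worker owns the lease, so skip the checkpoint
            if e.value == "ThrottlingException" and n == retries - 1:
                return False  # still throttled on the last attempt, give up
        time.sleep(sleep_seconds)  # back off before the next attempt
    return False


if __name__ == "__main__":
    cp = FakeCheckpointer(throttle_times=2)
    print(checkpoint_with_retries(cp), cp.calls)  # True 3
```

Two throttled attempts followed by a successful third call mean the checkpoint lands on the third try, well within the five allowed.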
Here is the `sample.properties` file:

```properties
# The script that abides by the multi-language protocol. This script will be executed by the MultiLangDaemon,
# which will communicate with this script over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py
streamArn = arn:aws:kinesis:us-east-1:249144947172:stream/abdul-stream2
streamName = abdul-stream2
applicationName = PythonKCLSample
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
processingLanguage = python/3.11
initialPositionInStream = TRIM_HORIZON
regionName = us-east-1
```
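The `Cannot run program "sample_kclpy_app.py"` message shows the daemon asking Windows to start the script file itself as the program. A minimal sketch, assuming the `executableName` value is split on spaces into a command list whose first element is the file Windows is asked to load; `to_command` and `first_token_is_script` are hypothetical helpers, not KCL code:

```python
import subprocess
import sys


def to_command(executable_name):
    """Hypothetical helper: split an executableName value on whitespace.

    Assumption: the daemon builds its command list the same way, so the
    first element is the file Windows is asked to load.
    """
    return executable_name.split()


def first_token_is_script(executable_name):
    """True when the program to load is a .py file, which CreateProcess cannot start."""
    command = to_command(executable_name)
    return bool(command) and command[0].lower().endswith(".py")


if __name__ == "__main__":
    for value in ("sample_kclpy_app.py", "python sample_kclpy_app.py"):
        print(value, "->", to_command(value), first_token_is_script(value))
    # Starting the interpreter explicitly, with no shell involved, is what lets a script run.
    subprocess.run([sys.executable, "-c", "print('started through the interpreter')"], check=True)
```

In the sketch, only the second value hands Windows a native program (the interpreter) to load, with the script passed to it as an argument.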
The Python `sample_kclpy_app.py` file starts with this shebang line pointing at the local Python 3.11 interpreter; the rest of the file is the code shown above:

```python
#!C:/Users/rehmana16/AppData/Local/Programs/Python/Python311/python.exe
```
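Windows does not look at the `#!` line when a program is started through `CreateProcess`; the line only matters to tools that read it on purpose, such as the Python launcher for Windows (`py.exe`). A minimal sketch of what reading that line involves, using a hypothetical `read_shebang` helper that is not part of KCL or `amazon_kclpy`:

```python
import os
import tempfile


def read_shebang(path):
    """Return the interpreter named on a script's '#!' first line, or None.

    Hypothetical helper: it takes the first whitespace-separated token after '#!',
    so interpreter paths that contain spaces are not handled.
    """
    with open(path, "r", encoding="utf-8") as f:
        first_line = f.readline().rstrip("\r\n")
    if not first_line.startswith("#!"):
        return None
    parts = first_line[2:].split()
    return parts[0] if parts else None


if __name__ == "__main__":
    shebang = "#!C:/Users/rehmana16/AppData/Local/Programs/Python/Python311/python.exe\n"
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as tmp:
        tmp.write(shebang + "print('hi')\n")
    try:
        # A launcher that honours shebangs would start this interpreter with the script
        # as its argument; CreateProcess does not, so the line has no effect there.
        print(read_shebang(tmp.name))
    finally:
        os.remove(tmp.name)
```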