Q [2,a] I would like to use RTDIP components to read from PythonDeltaSource , transform using BaseRawToMDMTransformer then write to EVMContractDestination by @lyndanajjar
from rtdip.pipeline import Pipeline
from rtdip.components.sources.python.delta import PythonDeltaSource
from rtdip.components.destinations.spark.evm_contract import EVMContractDestination
# Create a pipeline
pipeline = Pipeline()
# Add a Delta source component
delta_source = PythonDeltaSource(table_name='your_delta_table')
pipeline.add_component(delta_source)
# Add a destination component
evm_contract_destination = EVMContractDestination(contract_id='your_contract_id', connection_string='your_connection_string')
pipeline.add_component(evm_contract_destination)
# Connect the components
pipeline.connect(delta_source, evm_contract_destination)
# Run the pipeline
pipeline.run()
Q [2,b] I would like to use RTDIP components to read from PythonDeltaSharingSource, transform using SEMJsonToPCDMTransformer then write to SparkDeltaDestination by @lyndanajjar
from rtdip.pipeline import Pipeline
from rtdip.components.sources.python.delta_sharing import PythonDeltaSharingSource
from rtdip.components.transformers.spark.sem_json_to_pcdm import SEMJsonToPCDMTransformer
from rtdip.components.destinations.spark.delta import SparkDeltaDestination
# Create the pipeline
pipeline = Pipeline()
# Add the components to the pipeline
source = PythonDeltaSharingSource("path_to_delta_table")
transformer = SEMJsonToPCDMTransformer()
destination = SparkDeltaDestination("path_to_delta_table")
pipeline.add_component(source)
pipeline.add_component(transformer)
pipeline.add_component(destination)
# Connect the components in the pipeline
source.connect(transformer)
transformer.connect(destination)
# Execute the pipeline
pipeline.execute()
Q [2,c] I would like to use RTDIP components to read from SparkDeltaSource , transform using BaseRawToMDMTransformer then write to SparkKafkaDestination by @lyndanajjar
from rtdip_sdk.pipelines.sources import SparkDeltaSource
from rtdip_sdk.pipelines.transformers import BaseRawToMDMTransformer
from rtdip_sdk.pipelines.destinations import SparkKafkaDestination
def pipeline():
    source_df = SparkDeltaSource(
        spark=spark,
        table_name="delta_table_name"
    ).read_batch()
    transform_df = BaseRawToMDMTransformer(
        spark=spark,
        data=source_df
    ).transform()
    SparkKafkaDestination(
        data=transform_df,
        options={
            "kafka.bootstrap.servers": "localhost:9092",
            "topic": "output_topic"
        }
    ).write_batch()

if __name__ == "__main__":
    pipeline()
Q [2,d] I would like to use RTDIP components to read from PythonDeltaSource , transform using BaseRawToMDMTransformer then write to SparkKafkaEventhubDestination by @Nahrain1
from rtdip.pipeline import Pipeline
from rtdip.components.sources.python.delta import PythonDeltaSource
from rtdip.components.transformers.python.base_raw_to_mdm import BaseRawToMDMTransformer
from rtdip.components.destinations.spark.kafka_eventhub import SparkKafkaEventhubDestination
# Create the pipeline
pipeline = Pipeline()
# Add the source component
source = PythonDeltaSource(
    table_name="your_delta_table_name"
)
pipeline.add_component(source)
# Add the transformer component
transformer = BaseRawToMDMTransformer()
pipeline.add_component(transformer)
# Add the destination component
destination = SparkKafkaEventhubDestination(
    broker_list="your_kafka_broker_list",
    topic="your_kafka_topic"
)
pipeline.add_component(destination)
# Connect the components
pipeline.connect(source, transformer)
pipeline.connect(transformer, destination)
# Run the pipeline
pipeline.run()
Q [2,e] I would like to use RTDIP components to read from PythonDeltaSource, transform using PandasToPySparkTransformer then write to SparkDeltaMergeDestination by @Nahrain1
from rtdip_sdk.pipelines.sources import PythonDeltaSource
from rtdip_sdk.pipelines.transformers import PandasToPySparkTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaMergeDestination
def pipeline():
    source_df = PythonDeltaSource(
        delta_table_path="path/to/delta/table"
    ).read_batch()
    transformed_df = PandasToPySparkTransformer(
        data=source_df
    ).transform()
    SparkDeltaMergeDestination(
        data=transformed_df,
        options={
            "mergeCondition": "merge_condition",
            "mergeKey": "merge_key",
            "updateColumns": ["col1", "col2"],
            "deleteColumns": ["col3"]
        },
        destination="path/to/merge/destination"
    ).write_batch()

if __name__ == "__main__":
    pipeline()
Q [2,f] I would like to use RTDIP components to read from SparkDeltaSource , transform using SEMJsonToPCDMTransformer then write to SparkKafkaEventhubDestination by @Nahrain1
from rtdip_sdk.pipelines.sources.spark.delta import SparkDeltaSource
from rtdip_sdk.pipelines.transformers.spark.sem_json_to_pcdm import SEMJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations.spark.kafka_eventhub import SparkKafkaEventhubDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json
def pipeline():
    spark = SparkSessionUtility(config={}).execute()
    delta_source_configuration = {
        "delta.path": "/path/to/delta_table"
    }
    source_df = SparkDeltaSource(spark, delta_source_configuration).read_batch()
    pcdm_df = SEMJsonToPCDMTransformer(source_df, "body").transform()
    kafka_eventhub_destination_configuration = {
        "kafka.bootstrap.servers": "your.kafka.bootstrap.servers",
        "eventhubs.connectionString": "{EventhubConnectionString}"
    }
    SparkKafkaEventhubDestination(
        spark, data=pcdm_df, options=kafka_eventhub_destination_configuration
    ).write_batch()

if __name__ == "__main__":
    pipeline()
Q[2,a]: I would like to use RTDIP components to read from PythonDeltaSource , transform using BaseRawToMDMTransformer then write to EVMContractDestination
The import from rtdip.components.sources.python.delta import PythonDeltaSource is incorrect. It should be:
from rtdip_sdk.pipelines.sources.python.delta import PythonDeltaSource
The following import does not exist:
from rtdip.pipeline import Pipeline
NOTE: Best to follow the same format as 2c, where you use
def pipeline():
unless you are trying to use the pipeline execute components, in which case the import is
from rtdip_sdk.pipelines.execute import PipelineJob, PipelineStep, PipelineTask
which creates a job that executes a list of pipeline steps (see the sketch below). However, for the purpose of this challenge and the time constraint, I would suggest sticking to the format of 2c (NOTE ABOVE).
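For reference, a minimal sketch of how those execute components could be composed. This follows the documented getting-started pattern from memory, so the exact parameter names and the PipelineJobExecute runner should be treated as assumptions and verified against the SDK:
from rtdip_sdk.pipelines.execute import PipelineJob, PipelineStep, PipelineTask, PipelineJobExecute
from rtdip_sdk.pipelines.sources import SparkDeltaSource
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination

# NOTE: component parameters below are placeholders for illustration only
step_list = [
    PipelineStep(
        name="read_step",
        description="Read from a Delta table",
        component=SparkDeltaSource,
        component_parameters={"options": {}, "table_name": "source_table"},
        provide_output_to_step=["write_step"]
    ),
    PipelineStep(
        name="write_step",
        description="Write to a Delta table",
        component=SparkDeltaDestination,
        component_parameters={"options": {}, "destination": "destination_table"}
    )
]

# A task groups steps; a job groups tasks and is what gets executed
task = PipelineTask(
    name="example_task",
    description="Read from Delta and write to Delta",
    step_list=step_list,
    batch_task=True
)

job = PipelineJob(
    name="example_job",
    description="Example pipeline job",
    version="0.0.1",
    task_list=[task]
)

PipelineJobExecute(job).run()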
The source components need to include .read_batch() at the end (see below):
delta_source = PythonDeltaSource(table_name='your_delta_table').read_batch()
Transform step is missing. (NOTE: BaseRawToMDMTransformer is specific to ISO data, but you shouldn't use any files that include the word Base. If you would like to create an example using ISO components, please only use the components in the ISO source and transformer folders; those can then be written to any destination.)
The destination EVMContractDestination would only work for blockchain contracts; it would not be able to write data read from a data source.
Q[2,b]: I would like to use RTDIP components to read from PythonDeltaSharingSource , transform using SEMJsonToPCDMTransformer then write to SparkDeltaDestination
The imports
from rtdip.pipeline import Pipeline
from rtdip.components.sources.python.delta_sharing import PythonDeltaSharingSource
from rtdip.components.transformers.spark.sem_json_to_pcdm import SEMJsonToPCDMTransformer
from rtdip.components.destinations.spark.delta import SparkDeltaDestination
are incorrect. They should be:
from rtdip_sdk.pipelines.sources.python.delta_sharing import PythonDeltaSharingSource
from rtdip_sdk.pipelines.transformers.spark.sem_json_to_pcdm import SEMJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination
source = PythonDeltaSharingSource("path_to_delta_table")
is missing .read_batch() at the end and the parameters are incorrect. It should be:
PythonDeltaSharingSource(
    profile_path="{CREDENTIAL-FILE-LOCATION}",
    share_name="{SHARE-NAME}",
    schema_name="{SCHEMA-NAME}",
    table_name="{TABLE-NAME}"
).read_batch()
SEMJsonToPCDMTransformer is missing parameters and also .transform() at the end. It should be:
SEMJsonToPCDMTransformer(
    data={DATA},
    source_column_name="{SOURCE-COLUMN-NAME}",
    version={VERSION-NUMBER},
).transform()
SparkDeltaDestination is missing required parameters and .write_batch() at the end. It should be:
SparkDeltaDestination(
    data={DATA},
    options={},
    destination="{DELTA-TABLE-PATH}",
).write_batch()
A corrected version of the full example:
from rtdip_sdk.pipelines.sources.python.delta_sharing import PythonDeltaSharingSource
from rtdip_sdk.pipelines.transformers.spark.sem_json_to_pcdm import SEMJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination

def pipeline():
    source = PythonDeltaSharingSource(
        profile_path="{CREDENTIAL-FILE-LOCATION}",
        share_name="{SHARE-NAME}",
        schema_name="{SCHEMA-NAME}",
        table_name="{TABLE-NAME}"
    ).read_batch()
    transform = SEMJsonToPCDMTransformer(
        data=source,
        source_column_name="{SOURCE-COLUMN-NAME}",
        version={VERSION-NUMBER},
    ).transform()
    SparkDeltaDestination(
        data=transform,
        options={},
        destination="{DELTA-TABLE-PATH}",
    ).write_batch()

if __name__ == "__main__":
    pipeline()
Q[2,c]: I would like to use RTDIP components to read from SparkDeltaSource , transform using BaseRawToMDMTransformer then write to SparkKafkaDestination
The import from rtdip_sdk.pipelines.transformers import BaseRawToMDMTransformer is incorrect. BUT this transformer shouldn't be used as it is a base file. Please use other transformers in future tests.
It needs a Spark session, so it will need the following import:
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
and the following lines:
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
BaseRawToMDMTransformer should not be used; if this output had used another transformer with the correct parameters and included the Spark session, then this result would pass.
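For illustration, a sketch of what a passing version of [2,c] could look like, with SEMJsonToPCDMTransformer swapped in as a stand-in for the base transformer; the parameters shown for SparkDeltaSource and SparkKafkaDestination follow the patterns used elsewhere in this thread and should be treated as assumptions:
from rtdip_sdk.pipelines.sources import SparkDeltaSource
from rtdip_sdk.pipelines.transformers import SEMJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations import SparkKafkaDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

def pipeline():
    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    source_df = SparkDeltaSource(
        spark=spark,
        options={},
        table_name="delta_table_name"
    ).read_batch()

    # SEMJsonToPCDMTransformer used purely as a stand-in for a non-base transformer
    transform_df = SEMJsonToPCDMTransformer(
        data=source_df,
        source_column_name="body",
        version=10
    ).transform()

    SparkKafkaDestination(
        data=transform_df,
        options={
            "kafka.bootstrap.servers": "localhost:9092",
            "topic": "output_topic"
        }
    ).write_batch()

if __name__ == "__main__":
    pipeline()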
Q [2,d] I would like to use RTDIP components to read from PythonDeltaSource, transform using BaseRawToMDMTransformer then write to SparkKafkaEventhubDestination
The import from rtdip_sdk.pipelines.transformers import BaseRawToMDMTransformer is incorrect. BUT this transformer shouldn't be used as it is a base file. Please use other transformers in future tests.
We are seeing the same issues as in [2,a] and [2,b]: incorrect imports of the functions and calls to the non-existent Pipeline import.
The pipeline structure is causing issues with this script; you should aim for a structure like in [2,c]. All the component calls are missing their actions, i.e. read_batch, write_batch, etc.
A correct example is shown below:
from rtdip_sdk.pipelines.sources import PythonDeltaSource
from rtdip_sdk.pipelines.transformers import BaseRawToMDMTransformer
from rtdip_sdk.pipelines.destinations import SparkKafkaEventhubDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

def pipeline():
    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}"

    source_df = PythonDeltaSource(
        path=path,
        version=None,
        storage_options={
            "azure_storage_account_name": "{AZURE-STORAGE-ACCOUNT-NAME}",
            "azure_storage_account_key": "{AZURE-STORAGE-ACCOUNT-KEY}"
        },
        pyarrow_options=None,
        without_files=False
    ).read_batch()

    transform_df = BaseRawToMDMTransformer(
        spark=spark,
        data=source_df
    ).transform()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

    eventhub_destination = SparkKafkaEventhubDestination(
        spark=spark,
        data=transform_df,
        options={
            "kafka.bootstrap.servers": "host1:port1,host2:port2"
        },
        connection_string=connectionString,
        consumer_group="{YOUR-EVENTHUB-CONSUMER-GROUP}",
        trigger="10 seconds",
        query_name="KafkaEventhubDestination",
        query_wait_interval=None
    )

    eventhub_destination.write_batch()

if __name__ == "__main__":
    pipeline()
Q [2,e] I would like to use RTDIP components to read from PythonDeltaSource , transform using PandasToPySparkTransformer then write to SparkDeltaMergeDestination
Missing the Spark component for the transformation if not using Databricks.
When the write mode is not specified, this query shows how to write in batch; is write batch the default? Would it be possible to show both the write batch option and the write stream option (see the sketch after the corrected example below)?
The below is pretty good; I just made some minor adjustments to include the Spark component:
from rtdip_sdk.pipelines.sources import PythonDeltaSource
from rtdip_sdk.pipelines.transformers import PandasToPySparkTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaMergeDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

def pipeline():
    source_df = PythonDeltaSource(
        delta_table_path="path/to/delta/table"
    ).read_batch()

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    transformed_df = PandasToPySparkTransformer(
        data=source_df,
        spark=spark
    ).transform()

    SparkDeltaMergeDestination(
        data=transformed_df,
        options={
            "mergeCondition": "merge_condition",
            "mergeKey": "merge_key",
            "updateColumns": ["col1", "col2"],
            "deleteColumns": ["col3"]
        },
        destination="path/to/merge/destination"
    ).write_batch()

if __name__ == "__main__":
    pipeline()
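On the write stream question above: a minimal sketch of a streaming variant, assuming SparkDeltaMergeDestination exposes write_stream() alongside write_batch() like the other Spark destinations; the streaming read via SparkDeltaSource and the empty source options are illustrative assumptions:
from rtdip_sdk.pipelines.sources import SparkDeltaSource
from rtdip_sdk.pipelines.destinations import SparkDeltaMergeDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

def streaming_pipeline():
    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    # A streaming write needs a streaming DataFrame, so the source is read with
    # read_stream() rather than read_batch()
    source_df = SparkDeltaSource(
        spark=spark,
        options={},
        table_name="source_delta_table"
    ).read_stream()

    # Same destination and options shape as the batch example above, written as a stream
    SparkDeltaMergeDestination(
        data=source_df,
        options={
            "mergeCondition": "merge_condition",
            "mergeKey": "merge_key",
            "updateColumns": ["col1", "col2"],
            "deleteColumns": ["col3"]
        },
        destination="path/to/merge/destination"
    ).write_stream()

if __name__ == "__main__":
    streaming_pipeline()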
Q [2,f] I would like to use RTDIP components to read from SparkDeltaSource , transform using SEMJsonToPCDMTransformer then write to SparkKafkaEventhubDestination
The import paths can be simplified; I have changed this in the example.
SparkDeltaSource is missing options.
SEMJsonToPCDMTransformer is missing version.
SparkKafkaEventhubDestination is missing consumer_group.
A really good example with only a few minor issues; please find the updated code below:
from rtdip_sdk.pipelines.sources import SparkDeltaSource
from rtdip_sdk.pipelines.transformers import SEMJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations import SparkKafkaEventhubDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json

def pipeline():
    spark = SparkSessionUtility(config={}).execute()
    delta_source_configuration = {
        "delta.path": "/path/to/delta_table"
    }
    source_df = SparkDeltaSource(spark, delta_source_configuration, options={}).read_batch()
    pcdm_df = SEMJsonToPCDMTransformer(source_df, "body", version=10).transform()
    kafka_eventhub_destination_configuration = {
        "kafka.bootstrap.servers": "your.kafka.bootstrap.servers",
        "eventhubs.connectionString": "{EventhubConnectionString}"
    }
    SparkKafkaEventhubDestination(
        spark, data=pcdm_df, options=kafka_eventhub_destination_configuration, consumer_group="{YOUR-EVENTHUB-CONSUMER-GROUP}"
    ).write_batch()

if __name__ == "__main__":
    pipeline()
User story
Testing week 2 queries
For every query you must add "I would like to use RTDIP component... (and then the query)"
Q[2,a]: I would like to use RTDIP components to read from PythonDeltaSource , transform using BaseRawToMDMTransformer then write to EVMContractDestination
Q[2,b]: I would like to use RTDIP components to read from PythonDeltaSharingSource , transform using SEMJsonToPCDMTransformer then write to SparkDeltaDestination
Q[2,c]: I would like to use RTDIP components to read from SparkDeltaSource , transform using BaseRawToMDMTransformer then write to SparkKafkaDestination
Q[2,d]: I would like to use RTDIP components to read from PythonDeltaSource , transform using BaseRawToMDMTransformer then write to SparkKafkaEventhubDestination
Q[2,e]: I would like to use RTDIP components to read from PythonDeltaSource , transform using PandasToPySparkTransformer then write to SparkDeltaMergeDestination
Q[2,f]: I would like to use RTDIP components to read from SparkDeltaSource , transform using SEMJsonToPCDMTransformer then write to SparkKafkaEventhubDestination