Closed: AviKatziuk closed this issue 7 months ago
Q[1,a] - by Obi
from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
from rtdip_sdk.pipelines.transformers.spark.edgex_opcua_json_to_pcdm import EdgeXOPCUAJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json
def pipeline():
    spark = SparkSessionUtility(config={}).execute()

    # Event Hub source configuration
    eventhub_source_configuration = {
        "eventhubs.connectionString": "your_eventhub_connection_string",
        "eventhubs.consumerGroup": "your_consumer_group",
        "eventhubs.startingPosition": json.dumps(
            {"offset": "0", "seqNo": -1, "enqueuedTime": None, "isInclusive": True}
        ),
    }

    # Read from the Event Hub, decode the binary body to a string, and map EdgeX OPC UA JSON to PCDM
    source = SparkEventhubSource(spark, eventhub_source_configuration).read_batch()
    string_data = BinaryToStringTransformer(source, "body", "body").transform()
    PCDM_data = EdgeXOPCUAJsonToPCDMTransformer(string_data, "body").transform()

    # Delta destination configuration
    delta_destination_configuration = {
        "delta.path": "path_to_delta_table",
        "delta.mode": "overwrite",
    }

    SparkDeltaDestination(
        PCDM_data, delta_destination_configuration
    ).write_batch()


if __name__ == "__main__":
    pipeline()
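One detail worth noting for Q[1,a]: the Azure Event Hubs Spark connector typically expects an encrypted connection string rather than the raw value. A minimal sketch of that step, assuming the azure-event-hubs-spark connector is on the classpath and `spark` is the active session:

# Sketch: encrypt the connection string before passing it in the options
# (assumes the azure-event-hubs-spark connector is available).
connection_string = "your_eventhub_connection_string"
eventhub_source_configuration["eventhubs.connectionString"] = (
    spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
)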
Q[1,b] - by Linda
from rtdip_sdk.pipelines.sources.spark.kafka import SparkKafkaSource
from rtdip_sdk.pipelines.transformers.json_parser import JSONParser
from rtdip_sdk.pipelines.destinations.hive import HiveDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility


def pipeline():
    spark = SparkSessionUtility(config={}).execute()

    # Kafka source configuration
    kafka_source_configuration = {
        "kafka.bootstrap.servers": "<bootstrap_server>",
        "subscribe": "<topic>",
        "startingOffsets": "earliest",
        "failOnDataLoss": "false",
    }

    # Read from Kafka and parse the JSON payload
    kafka_df = SparkKafkaSource(spark, kafka_source_configuration).read_batch()
    parsed_df = JSONParser(kafka_df).transform()

    # Hive destination configuration
    hive_destination_configuration = {
        "database": "<database>",
        "table": "<table>",
    }

    HiveDestination(parsed_df, hive_destination_configuration).write_batch()


if __name__ == "__main__":
    pipeline()
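If the JSONParser and HiveDestination modules referenced above are not available in the installed SDK, the same batch flow can be sketched with plain PySpark. The payload schema and table names below are assumptions for illustration, and `spark` is assumed to be an active Hive-enabled session:

# Plain-PySpark sketch of the same Kafka -> JSON -> Hive flow (schema is assumed).
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

assumed_schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("value", DoubleType()),
])

kafka_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "<bootstrap_server>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

# Parse the Kafka value column as JSON and flatten it
parsed_df = kafka_df.select(
    from_json(col("value").cast("string"), assumed_schema).alias("payload")
).select("payload.*")

# Requires a Spark session built with Hive support
parsed_df.write.mode("append").saveAsTable("<database>.<table>")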
Q[1,c] - Naharin
Removed
Q[1,d] - by Obi
from rtdip_sdk.pipelines.sources.spark.mqtt import SparkMQTTSource
from rtdip_sdk.pipelines.transformers.spark.string_to_json import StringToJsonTransformer
from rtdip_sdk.pipelines.transformers.spark.filter import FilterTransformer
from rtdip_sdk.pipelines.destinations.spark.elasticsearch import SparkElasticsearchDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Set up the MQTT configuration
mqtt_conf = {
"mqtt.serverUri": "mqtt://<broker-url>",
"mqtt.topic": "<mqtt-topic>",
"mqtt.username": "<username>",
"mqtt.password": "<password>",
"mqtt.cleanSession": True,
"mqtt.keepAliveInterval": 60
}
# Set up the Elasticsearch configuration
es_conf = {
"es.nodes": "<elasticsearch-host>",
"es.port": "<elasticsearch-port>",
"es.resource": "<index-name>/<document-type>",
"es.nodes.wan.only": True
}
# Set the threshold value to filter out readings
threshold = 50
def pipeline():
    spark = SparkSessionUtility(config={}).execute()

    # Read data from MQTT broker
    source = SparkMQTTSource(spark, mqtt_conf).read_stream()

    # Transform string data to JSON
    json_data = StringToJsonTransformer(source, "value", "json").transform()

    # Filter out readings below the threshold value
    filtered_data = FilterTransformer(json_data, f"value >= {threshold}").transform()

    # Write data to Elasticsearch
    SparkElasticsearchDestination(data=filtered_data, options=es_conf).write_stream()


if __name__ == "__main__":
    pipeline()
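If the MQTT source, FilterTransformer, or Elasticsearch destination named above are not present in the installed SDK, the filter and write steps can be sketched directly with Structured Streaming and the elasticsearch-spark (es-hadoop) connector. The column name, index name, and checkpoint path below are assumptions:

# Sketch of the threshold filter and Elasticsearch streaming write
# (assumes `json_data` is a streaming DataFrame with a numeric "value" column
# and the elasticsearch-spark connector is on the classpath).
filtered_data = json_data.filter(json_data["value"] >= threshold)

query = (
    filtered_data.writeStream
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "<elasticsearch-host>")
    .option("es.port", "<elasticsearch-port>")
    .option("checkpointLocation", "/tmp/es_checkpoint")  # assumed path
    .start("<index-name>")
)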
Q[1,e] - by Obi
import os
import boto3
import psycopg2
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
# Set AWS credentials
os.environ['AWS_ACCESS_KEY_ID'] = 'your_access_key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your_secret_access_key'
# Create SparkSession
spark = SparkSession.builder.appName("FinancialDataImport").getOrCreate()
# Read Parquet file from S3 bucket
s3_bucket = "your_s3_bucket"
parquet_file = "your_parquet_file"
s3_path = f"s3a://{s3_bucket}/{parquet_file}"
df = spark.read.parquet(s3_path)
# Apply Standard Scaler transformation
features = ["feature1", "feature2", "feature3"] # Replace with actual feature columns
assembler = VectorAssembler(inputCols=features, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
pipeline = Pipeline(stages=[assembler, scaler])
pipeline_model = pipeline.fit(df)
scaled_df = pipeline_model.transform(df).select("scaled_features")
# Convert Spark DataFrame to Pandas DataFrame
pandas_df = scaled_df.toPandas()
# Upload Pandas DataFrame to Redshift database
redshift_host = "your_redshift_host"
redshift_db = "your_redshift_db"
redshift_user = "your_redshift_user"
redshift_password = "your_redshift_password"
redshift_table = "your_redshift_table"
redshift_port = 5439
# Create Redshift connection
conn = psycopg2.connect(
host=redshift_host,
port=redshift_port,
user=redshift_user,
password=redshift_password,
database=redshift_db
)
# Convert Pandas DataFrame to CSV string
csv_string = pandas_df.to_csv(index=False, header=False)
# Copy CSV string to Redshift table
cursor = conn.cursor()
cursor.copy_from(StringIO(csv_string), redshift_table, sep=',')
conn.commit()
# Close Redshift connection
cursor.close()
conn.close()
# Print success message
print("Financial data imported and uploaded to Redshift successfully.")
Q[1,f] - by Obi
from rtdip_sdk.pipelines.sources.http import HttpSource
from rtdip_sdk.pipelines.transformers.spark.json_normalizer import JsonNormalizer
from rtdip_sdk.pipelines.destinations.mongodb import MongoDbDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
def pipeline():
    spark = SparkSessionUtility(config={}).execute()

    # Define the source configuration
    source_configuration = {
        "url": "https://api.example.com/temperature-data",
        "headers": {
            "Authorization": "Bearer {your_access_token}"
        },
    }

    # Read data from the REST API
    http_source = HttpSource(spark, source_configuration)
    source_df = http_source.read_batch()

    # Normalize the data
    normalizer = JsonNormalizer(source_df)
    normalized_df = normalizer.transform()

    # Define the MongoDB destination configuration
    destination_configuration = {
        "uri": "mongodb://{username}:{password}@{host}:{port}/{database}.{collection}",
        "mode": "append",
    }

    # Write the data to MongoDB
    mongodb_destination = MongoDbDestination(spark, normalized_df, destination_configuration)
    mongodb_destination.write_batch()


if __name__ == "__main__":
    pipeline()
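If MongoDbDestination is not available in the installed SDK, the final write can be sketched with the MongoDB Spark Connector (v10+ assumed on the classpath); connection details are placeholders:

# Sketch of the MongoDB write using the MongoDB Spark Connector
# (assumes `normalized_df` is a Spark DataFrame).
(
    normalized_df.write
    .format("mongodb")
    .mode("append")
    .option("connection.uri", "mongodb://{username}:{password}@{host}:{port}")
    .option("database", "{database}")
    .option("collection", "{collection}")
    .save()
)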
Q[1,g] - by Obi
from rtdip_sdk.pipelines.sources.gcp.gcs import GCSBlobSource
from rtdip_sdk.pipelines.transformers.python.json_sentiment_analysis import SentimentAnalysisTransformer
from rtdip_sdk.pipelines.destinations.gcp.bigquery import BigQueryDestination
from rtdip_sdk.pipelines.utilities import GCPClientUtility
def pipeline():
    # Set up GCP client
    gcp_client = GCPClientUtility()

    # Read data from Google Cloud Storage
    source = GCSBlobSource(
        gcp_client=gcp_client,
        bucket_name="{your_bucket_name}",
        blob_path_prefix="{your_blob_path_prefix}",
        file_extension=".json"
    ).read_batch()

    # Perform sentiment analysis on JSON logs
    sentiment_data = SentimentAnalysisTransformer(
        data=source,
        text_column="log_message",
        sentiment_column="sentiment_score"
    ).transform()

    # Write results to Google BigQuery
    BigQueryDestination(
        gcp_client=gcp_client,
        dataset_name="{your_dataset_name}",
        table_name="{your_table_name}",
        write_mode="overwrite"
    ).write_batch(data=sentiment_data)


if __name__ == "__main__":
    pipeline()
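If BigQueryDestination is not available in the installed SDK, and assuming `sentiment_data` is a Spark DataFrame, the final write can be sketched with the spark-bigquery connector; the staging bucket and dataset/table names are placeholders:

# Sketch of the BigQuery write using the spark-bigquery connector
# (assumed to be on the classpath).
(
    sentiment_data.write
    .format("bigquery")
    .mode("overwrite")
    .option("temporaryGcsBucket", "{your_staging_bucket}")  # required for indirect writes
    .save("{your_dataset_name}.{your_table_name}")
)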
User story
Testing week 1 queries
Q[1,a]: I would like to use RTDIP components to read from an Event Hub using 'connection string' as the connection string and 'consumer group' as the consumer group, transform with the binary-to-string and EdgeX transformers, then write to Delta.
Q[1,b]: I need to read data from Kafka using a specific bootstrap server and topic, then apply a JSON parser, and finally write the results to a Hive table.
Q[1,c]: Removed
Q[1,d]: Stream data from an MQTT broker, filter out readings below a threshold value, and store the data in Elasticsearch.
Q[1,e]: Import financial data from an S3 bucket in Parquet format, apply a standard scaler transformation, and then upload it to a Redshift database.
Q[1,f]: Retrieve temperature data from a REST API, normalize the data, and write it into a MongoDB collection.
Q[1,g]: Connect to Google Cloud Storage, download logs in JSON format, conduct sentiment analysis, and then store the results in a Google BigQuery table.