
spark sample #17

Open · getElementsByName opened 3 years ago

getElementsByName commented 3 years ago

pyspark

```python
import os

import numpy as np
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import desc, row_number

os.environ["HADOOP_USER_NAME"] = "hdfs"
sc = SparkContext(master="local[4]")
sc.setLogLevel('ERROR')
spark = SparkSession(sc)
print("==========[python start]===========")

with sc, spark:
    # RDD from a local numpy array
    lst = np.random.randint(0, 10, 20)
    A = sc.parallelize(lst)
    print(A)

    # DataFrame with a single "number" column
    my_range = spark.range(1000).toDF("number")
    print(my_range.count())

    # read newline-delimited JSON from the script's directory
    text_file_df = spark.read.json(
        f"file:///{os.path.dirname(os.path.realpath(__file__))}/test.json"
    )
    print(f'count: {text_file_df.count()}')
    text_file_df.show()

    # window per (id, value) pair, ordered by value descending
    id_window = Window.partitionBy("id", "value").orderBy(desc('value')) \
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    df_with_window = text_file_df.withColumn("rank", row_number().over(id_window))
    df_with_window.show()
```


- `test.json`
```json
{"id": "1", "value": "v1"}
{"id": "2"}
{"id": "3", "value": "v3"}
{"id": "1", "value": "v1"}
{"id": "1", "value": "v2"}
{"id": "1", "value": "v3"}
{"id": "1", "value": "v4"}
getElementsByName commented 3 years ago

rdd

group rows by key (`place_id`) at the RDD level

```python
import json

def jsonKeyValueMap(r):
    # each JSON row becomes (place_id, [row]) so the lists can be merged per key
    j = json.loads(r)
    return (j["place_id"], [j])

# df is assumed to be a DataFrame that has a place_id field
rdd = df.toJSON() \
    .map( # (key, [value])
        jsonKeyValueMap
    ).reduceByKey( # (key, [v1, v2, v3])
        lambda p, q: p + q
    )
print(rdd.toDebugString())

rddtaken = rdd.take(5)
print(rddtaken)
```
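
Merging lists with `p + q` re-allocates the accumulated list on every merge; `groupByKey` expresses the same grouping directly (a sketch against the same assumed `df`):

```python
def jsonKeyValue(r):
    # (place_id, row) pairs, one per record
    j = json.loads(r)
    return (j["place_id"], j)

rdd_grouped = df.toJSON().map(jsonKeyValue).groupByKey().mapValues(list)
```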

```python
def send_to_kafka(data_list):
    # runs once per partition on the executors, so the producer is created
    # there rather than serialized from the driver
    producer = get_kafka_producer()
    for data in data_list:
        (key, value_json) = data
        producer.produce(producing_topic_name, key=str(key), value=str(value_json), on_delivery=acked)
        producer.poll(0)
    producer.flush()

rdd.foreachPartition(send_to_kafka)
```
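
`get_kafka_producer`, `producing_topic_name`, and `acked` are defined elsewhere in the job; a minimal sketch of what they might look like with `confluent-kafka` (the broker address and topic name here are placeholders):

```python
from confluent_kafka import Producer

producing_topic_name = "receipts"  # placeholder topic name

def get_kafka_producer():
    # placeholder broker address
    return Producer({"bootstrap.servers": "localhost:9092"})

def acked(err, msg):
    # delivery callback, invoked from producer.poll()/flush()
    if err is not None:
        print(f"delivery failed for key {msg.key()}: {err}")
```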
getElementsByName commented 3 years ago

dataframe

join

```python
# right outer join: keep every row of filtered_visits_df, with the receipt
# columns null where no matching receipt exists
visits_base_receipts_join_df = filtered_receipts_df.join(
    filtered_visits_df,
    filtered_receipts_df["receipt_id"] == filtered_visits_df["visit_behavior_receipt_id"],
    "right_outer",
)

visits_base_receipts_join_df.printSchema()
```
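
The two inputs come from earlier filtering steps; a minimal sketch with assumed schemas shows the `right_outer` behavior:

```python
filtered_receipts_df = spark.createDataFrame(
    [("r1", "p1"), ("r2", "p2")], ["receipt_id", "place_id"]
)
filtered_visits_df = spark.createDataFrame(
    [("r1",), ("r3",)], ["visit_behavior_receipt_id"]
)
# the join keeps both visit rows; the "r3" visit gets null receipt columns
```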

aggregation

groupby

```python
from pyspark.sql.functions import col, collect_list

result_df = join_result_df.where(
        col("place_id").isNotNull()
    ).groupBy(
        col("place_id")
    ).agg(
        collect_list(col("receipt_id")).alias('receipt_id_list'),
        collect_list(col("body_len")).alias('body_len_list'),
        collect_list(col("author_id")).alias('author_id_list'),
        collect_list(col("rating")).alias('rating_list'),
        collect_list(col("status")).alias('status_list'),
        collect_list(col("update_time")).alias('update_time_list'),
        collect_list(col("receipt_url")).alias('receipt_url_list'),
    )
```
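
`collect_list` makes no ordering guarantee, so the parallel lists rely on every aggregate seeing rows in the same order; collecting a `struct` keeps each receipt's fields together in a single list element instead (a sketch over the same columns):

```python
from pyspark.sql.functions import col, collect_list, struct

result_df = join_result_df.where(col("place_id").isNotNull()).groupBy("place_id").agg(
    collect_list(
        struct("receipt_id", "body_len", "author_id", "rating",
               "status", "update_time", "receipt_url")
    ).alias("receipts")
)
```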

count

```python
from pyspark.sql.functions import col, count, desc

result_df = join_result_df.groupBy(col("place_id")).agg(
    count("receipt_id").alias('count'),
).orderBy(desc("count"))
```
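
`count("receipt_id")` counts only non-null values, so places whose joined rows carry a null `receipt_id` (e.g. from an outer join) come out as 0. To count every row per group regardless of nulls, one option:

```python
from pyspark.sql.functions import lit

result_df = join_result_df.groupBy("place_id").agg(count(lit(1)).alias("count"))
```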