HCADatalab / powderkeg

Live-coding the cluster!
Eclipse Public License 1.0

RDD dynamic partition write function #25

Open · clojurians-org opened 7 years ago

clojurians-org commented 7 years ago

I wonder whether there is any plan for a dynamic partition write function, as it is a very common use case.

In Cascalog, I use the :templatefields and :sink-template keywords to control the dynamic partitions:

      (?- (hfs-delimited "hdfs://192.168.1.3:9000/user/hive/warehouse/model.db/d_bolome_order"
                         :outfields ["?dw-dt" "?dw-ts" "?dw-src-id"
                                     "?product-dw-id" "?j-product-dw-src-id"
                                     "!!show-dw-id" "?j-show-dw-src-id"
                                     "!!preview-show-dw-id" "?j-preview-show-dw-src-id"
                                     "!!replay-show-dw-id" "?j-replay-show-dw-src-id"
                                     "?pay-dt" "?user-id" "?order-id"
                                     "?quantity" "?price" "?warehouse-id" "?coupon-id" "?event-id"
                                     "?copon-discount-amount" "?system-discount-amount" "?tax-amount" "?logistics-amount"]
                         :delimiter "\001"
                         :quote ""
                         :sinkmode :replace
                         :templatefields ["?dw-dt"]
                         :sink-template "p_dw_dt=%s"
                         :compression :enable)
          $)

Currently, I convert the RDD to a DataFrame and use partitionBy to accomplish this:

  (as-> (keg/rdd (-> ss .read
                     (.load "hdfs://192.168.1.3:9000/user/hive/warehouse/agg.db/d_bolome_user_order_trgx")
                     (.repartition 8)
                     .rdd)
                 ;; each Row -> vector of its column values
                 (map #(mapv (fn [idx] (.get % idx)) (-> % .length range)))
                 ;; explode each serialized [user-id user-trgx] pair into Rows
                 (mapcat (fn [[user-id-trgx-str]]
                           (let [[user-id user-trgx] (clojure.edn/read-string user-id-trgx-str)
                                 ....
                             (mapv #(RowFactory/create (into-array [(:dm-ds-kind %) (pr-str [user-id %])])) user-shift-tkvs)))))
      $
    (.createDataFrame ss $
                      (DataTypes/createStructType
                       (map #(DataTypes/createStructField % DataTypes/StringType false) ["p_ds" "user-id-tkv"])))
    (.write $)
    ;; one subdirectory per distinct p_ds value, Hive-style (p_ds=...)
    (.partitionBy $ (into-array ["p_ds"]))
    (.format $ "parquet")
    (.mode $ SaveMode/Overwrite)
    (.save $ "hdfs://192.168.1.3:9000/user/hive/warehouse/mlin.db/d_bolome_user_order"))

The RDD API does have saveAsHadoopFile and the MultipleTextOutputFormat class, but the class has to be extended, which is very inconvenient: http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
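For reference, a rough Clojure translation of the subclass from that Stack Overflow answer might look like the sketch below. The namespace and class names are made up, and the namespace has to be AOT-compiled so Hadoop can instantiate the generated class by name:

    (ns example.output-format
      ;; illustrative namespace -- must be AOT-compiled
      (:import [org.apache.hadoop.io NullWritable])
      (:gen-class
       :name example.KeyedTextOutputFormat
       :extends org.apache.hadoop.mapred.lib.MultipleTextOutputFormat))

    ;; route each record into a directory named after its key,
    ;; e.g. "p_dw_dt=2016-01-01/part-00000"
    (defn -generateFileNameForKeyValue [_this k _v file-name]
      (str k "/" file-name))

    ;; drop the key from the written lines; the directory already encodes it
    (defn -generateActualKey [_this _k _v]
      (NullWritable/get))

A pair RDD keyed by the partition string could then be written with something along the lines of (.saveAsHadoopFile pair-rdd path org.apache.hadoop.io.Text org.apache.hadoop.io.Text example.KeyedTextOutputFormat).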

cgrand commented 7 years ago

Isn't the :key option in keg/by-key what you are looking for? (keg/by-key some-data :key (juxt :a :b))

clojurians-org commented 7 years ago

It's about the HDFS directory layout when the files are saved to disk to map a Hive table, not about the data, e.g.:

    /user/hive/warehouse/model.db/tableA/a=1/b=3/part-0000
    /user/hive/warehouse/model.db/tableA/a=2/b=3/part-0000
    ...

    create external table tableA ( .... )
    partitioned by (a string, b string)
    stored as textfile;
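As a side note, once files land in those partition directories, the metastore still has to be told about the new partitions before the table can see them. A minimal sketch, assuming ss is the Hive-enabled SparkSession from the earlier snippet:

    ;; register newly written partition directories with the Hive metastore
    (.sql ss "msck repair table tableA")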