Not all partitions are imported into datomic from mysql #5

uchouhan commented 8 years ago

When I run the following job (tried multiple times) it seems to import certain ranges of mysql ids but not all (for example ids 1-16538, 104023-164xxx, 1060xxx-1640xxx made it but the ones in b/w did not), not sure if its an issue with the partition setup in my code (was trying to build on the onyx-example in the github repo)

(ns datomic-mysql-transfer.core (:require [clojure.java.jdbc :as jdbc] [datomic.api :as d] [onyx.plugin.datomic] [onyx.plugin.sql] [onyx.api]) (:import [com.mchange.v2.c3p0 ComboPooledDataSource]))

;;;;;;;; First, some set up work for SQL ;;;;;;;;;;;;;

;;; Def some top-level constants to use below

(def db-name "optimis_development")

(def classname "com.mysql.jdbc.Driver")

(def subprotocol "mysql")

(def subname (format "//" db-name))

(def user "root")

(def password "")

;;; Throughput knob that you can tune (def batch-size 20)

;;; The table to read out of (def table :patients)

;;; A monotonically increasing integer to partition the table by (def id-column :id)

;;; JDBC spec to connect to MySQL (def db-spec {:classname classname :subprotocol subprotocol :subname subname :user user :password password})

;;; Create a pool of connections for the virtual peers of Onyx to share (defn pool [spec] {:datasource (doto (ComboPooledDataSource.) (.setDriverClass (:classname spec)) (.setJdbcUrl (str "jdbc:" (:subprotocol spec) ":" (:subname spec))) (.setUser (:user spec)) (.setPassword (:password spec)) (.setMaxIdleTimeExcessConnections (* 30 60)) (.setMaxIdleTime (* 3 60 60)))})

;;; Create the pool (def conn-pool (pool db-spec))

;;;;;;;; Next, some set up work for Datomic ;;;;;;;;;;;;;

;;; The URI for the Datomic database that we'll write to (def db-uri (str "datomic:sql://adb3?jdbc:mysql://localhost:3306/datomic?user=datomic&password=datomic"))

;;; The schema of the database. A user's name and age, semantic ;;; equivalent of the MySQL schema. (def schema [{:db/id #db/id [:db.part/db] :db/ident :com.optimispt/patients :db.install/_partition :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/id :db/valueType :db.type/long :db/unique :db.unique/identity :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/practice-id :db/valueType :db.type/long :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/photo-id :db/valueType :db.type/long :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/suffix :db/valueType :db.type/string :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/first-name :db/valueType :db.type/string :db/fulltext true :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/middle-name :db/valueType :db.type/string :db/fulltext true :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/last-name :db/valueType :db.type/string :db/fulltext true :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/birth-date :db/valueType :db.type/instant :db/index true :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/gender :db/valueType :db.type/string :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/ssn :db/valueType :db.type/string :db/cardinality :db.cardinality/one :db/fulltext true :db.install/_attribute :db.part/db} {:db/id #db/id [:db.part/db] :db/ident :patient/uuid :db/valueType :db.type/uuid :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db}])

;;; Create the DB, connect to it, and transact the schema (d/create-database db-uri)

(def datomic-conn (d/connect db-uri))

@(d/transact datomic-conn schema)

;;;;;;;;;;;; Next, we run the Onyx job to transfer the data ;;;;;;;;;;;;;; (def id (java.util.UUID/randomUUID))

(def env-config {:zookeeper/address "" :zookeeper/server? true :zookeeper.server/port 2188 :onyx/id id})

(def peer-config {:zookeeper/address "" :onyx/id id :onyx.peer/job-scheduler :onyx.job-scheduler/balanced :onyx.messaging/impl :aeron :onyx.messaging/peer-port 40200 :onyx.messaging/bind-addr "localhost"})

(def env (onyx.api/start-env env-config))

(def peer-group (onyx.api/start-peer-group peer-config))

;;; Partition the MySQL table by ID column, parallel read the rows, ;;; do a semantic transformation, write to Datomic. (def workflow [[:partition-keys :read-rows] [:read-rows :prepare-datoms] [:prepare-datoms :write-to-datomic]])

(def n-peers (count (set (mapcat identity workflow))))

(def v-peers (onyx.api/start-peers n-peers peer-group))

(def catalog [{:onyx/name :partition-keys :onyx/plugin :onyx.plugin.sql/partition-keys :onyx/type :input :onyx/medium :sql :sql/classname classname :sql/subprotocol subprotocol :sql/subname subname :sql/user user :sql/password password :sql/table table :sql/id id-column :sql/rows-per-segment 1000 :onyx/batch-size batch-size :onyx/max-peers 1 :onyx/doc "Partitions a range of primary keys into subranges"}

{:onyx/name :read-rows :onyx/fn :onyx.plugin.sql/read-rows :onyx/type :function :sql/classname classname :sql/subprotocol subprotocol :sql/subname subname :sql/user user :sql/password password :sql/table table :sql/id id-column :onyx/batch-size batch-size :onyx/doc "Reads rows of a SQL table bounded by a key range"}

{:onyx/name :prepare-datoms :onyx/fn :datomic-mysql-transfer.core/prepare-datoms :onyx/type :function :onyx/batch-size batch-size :onyx/doc "Semantically transform the SQL rows to Datomic datoms"}

{:onyx/name :write-to-datomic :onyx/plugin :onyx.plugin.datomic/write-bulk-datoms :onyx/type :output :onyx/medium :datomic :datomic/uri db-uri :datomic/partition :com.optimispt/patients :onyx/batch-size batch-size :onyx/doc "Transacts segments to storage"}])

(defn non-nil-datoms [segment](into {} %28remove #%28nil? %28val %%29%29 {:patient/id %28:id segment%29 :patient/practice-id %28:practice_id segment%29 :patient/suffix %28:suffix segment%29 :patient/first-name %28:first_name segment%29 :patient/middle-name %28:middle_name segment%29 :patient/last-name %28:last_name segment%29 :patient/birth-date %28:birth_date segment%29 :patient/gender %28:gender segment%29 :patient/ssn %28:gender segment%29 :patient/photo-id %28:photo_id segment%29}%29))

;;; We need to prepare the datoms before we send it to the Datomic plugin. ;;; Set the temp ids and batch the segments into the :datoms key. (defn prepare-datoms [segment] {:tx [(merge {:db/id (d/tempid :com.optimispt/patients)} (non-nil-datoms segment))]})

(def lifecycles [{:lifecycle/task :partition-keys :lifecycle/calls :onyx.plugin.sql/partition-keys-calls} {:lifecycle/task :read-rows :lifecycle/calls :onyx.plugin.sql/read-rows-calls} {:lifecycle/task :write-to-datomic :lifecycle/calls :onyx.plugin.datomic/write-bulk-tx-calls}])

;;; And off we go! (def job-id (:job-id (onyx.api/submit-job peer-config {:catalog catalog :workflow workflow :lifecycles lifecycles :task-scheduler :onyx.task-scheduler/balanced})))

;;; Block until the job is done, then check Datomic (onyx.api/await-job-completion peer-config job-id)

;;; Aaaaaand stop! (doseq [v-peer v-peers](onyx.api/shutdown-peer v-peer))

(onyx.api/shutdown-peer-group peer-group)

(onyx.api/shutdown-env env)


uchouhan commented 8 years ago

cc: @MichaelDrogalis @lbradstreet

MichaelDrogalis commented 8 years ago

I'll check it out after the Conj wraps up. Thanks!

uchouhan commented 8 years ago

I added :onyx/batch-timeout with a higher value but that does not make a difference. Also, added a prn statement in the write-batch function

(defrecord DatomicWriteBulkDatoms [conn]
[_ event](function/read-batch event))

[_ event]
;; Transact each tx individually to avoid tempid conflicts.
(doseq [tx (mapcat :leaves (:tree (:onyx.core/results event)))](prn %28:tx %28:message tx%29%29)
@(d/transact conn (:tx (:message tx))))
{:onyx.core/written? true})

[ ]

and it seems to skip a significant number of ranges (like shown below) [{:db/id #db/id[:com.optimispt/patients -1001007], :patient/id 1000}] [{:db/id #db/id[:com.optimispt/patients -1001008], :patient/id 1001}] [{:db/id #db/id[:com.optimispt/patients -1001009], :patient/id 1002}] [{:db/id #db/id[:com.optimispt/patients -1001010], :patient/id 1003}] [{:db/id #db/id[:com.optimispt/patients -1001011], :patient/id 1004}] [{:db/id #db/id[:com.optimispt/patients -1036012], :patient/id 600009}] [{:db/id #db/id[:com.optimispt/patients -1036013], :patient/id 600011}] [{:db/id #db/id[:com.optimispt/patients -1036014], :patient/id 600013}] [{:db/id #db/id[:com.optimispt/patients -1036015], :patient/id 600015}] [{:db/id #db/id[:com.optimispt/patients -1036016], :patient/id 600017}]

A similar statement in the read-rows function verified that the sql is generated for all ranges.

lbradstreet commented 8 years ago

Hi @uchouhan, so you can confirm that these ranges are being generated by your tasks (i.e. in prepare-datoms)? If so, are they passed in to the datomic task? Or does write-batch never see them?

lbradstreet commented 8 years ago

Please feel free to come onto https://gitter.im/onyx-platform/onyx and chat about this with us in real time. I'm also happy to work through the issue here.

uchouhan commented 8 years ago

I've seen some of these earlier, but not always

15-Nov-19 00:12:38 Umang-104.local WARN [onyx.messaging.aeron] - java.lang.Thread.run Thread.java: 745 uk.co.real_logic.agrona.concurrent.AgentRunner.run AgentRunner.java: 105 uk.co.real_logic.aeron.ClientConductor.doWork ClientConductor.java: 113 uk.co.real_logic.aeron.ClientConductor.doWork ClientConductor.java: 295 uk.co.real_logic.aeron.ClientConductor.onCheckTimeouts ClientConductor.java: 339 uk.co.real_logic.aeron.exceptions.ConductorServiceTimeoutException: Timeout between service calls over 10000000000ns