allwefantasy / spark-binlog

A library for querying Binlog with Apache Spark Structured Streaming, for Spark SQL, DataFrames and [MLSQL](https://www.mlsql.tech).
Apache License 2.0

BinLogSocketServerInExecutor Error (Support MySQL BitSet Type) #25

Closed gsrikanthchow closed 4 years ago

gsrikanthchow commented 4 years ago

I'm trying to implement this using PySpark, but I'm getting the following error:

[Stage 0:> (0 + 1) / 1]


Batch: 2

+-----+
|value|
+-----+
+-----+

[Stage 0:> (0 + 1) / 1]
20/02/04 13:54:10 ERROR BinLogSocketServerInExecutor: java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.util.BitSet)
    at com.fasterxml.jackson.core.JsonGenerator._writeSimpleObject(JsonGenerator.java:1725)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:327)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1415)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.io.UpdateRowsWriter.writeRow(UpdateRowsWriter.java:60)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.io.UpdateRowsWriter.writeEvent(UpdateRowsWriter.java:39)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.convertRawBinlogEventRecord(BinLogSocketServerInExecutor.scala:316)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anonfun$handleConnection$2$$anonfun$apply$4.apply(BinLogSocketServerInExecutor.scala:442)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anonfun$handleConnection$2$$anonfun$apply$4.apply(BinLogSocketServerInExecutor.scala:438)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anonfun$handleConnection$2.apply(BinLogSocketServerInExecutor.scala:438)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anonfun$handleConnection$2.apply(BinLogSocketServerInExecutor.scala:437)
    at org.apache.spark.streaming.BinlogWriteAheadLog.read(BinlogWriteAheadLog.scala:38)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.handleConnection(BinLogSocketServerInExecutor.scala:437)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:128)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:127)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anon$4$$anon$5.run(servers.scala:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/02/04 13:54:10 ERROR BinLogSocketServerInExecutor: java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.util.BitSet)
    at com.fasterxml.jackson.core.JsonGenerator._writeSimpleObject(JsonGenerator.java:1725)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:327)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1415)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.io.UpdateRowsWriter.writeRow(UpdateRowsWriter.java:60)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.io.UpdateRowsWriter.writeEvent(UpdateRowsWriter.java:39)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.convertRawBinlogEventRecord(BinLogSocketServerInExecutor.scala:316)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anonfun$handleConnection$2$$anonfun$apply$4.apply(BinLogSocketServerInExecutor.scala:442)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anonfun$handleConnection$2$$anonfun$apply$4.apply(BinLogSocketServerInExecutor.scala:438)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anonfun$handleConnection$2.apply(BinLogSocketServerInExecutor.scala:438)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anonfun$handleConnection$2.apply(BinLogSocketServerInExecutor.scala:437)
    at org.apache.spark.streaming.BinlogWriteAheadLog.read(BinlogWriteAheadLog.scala:38)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.handleConnection(BinLogSocketServerInExecutor.scala:437)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:128)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:127)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anon$4$$anon$5.run(servers.scala:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

PySpark shell command:

 pyspark --packages tech.mlsql:mysql-binlog_2.11:1.0.0,tech.mlsql:delta-plus_2.11:0.2.0,io.delta:delta-core_2.11:0.5.0 --jars /usr/share/java/mysql-connector-java.jar

df = spark.readStream. \
    format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource"). \
    option("host","xxxxxxxxxxx"). \
    option("port","3306"). \
    option("userName","xxxxx"). \
    option("password","xxxxxxx"). \
    option("databaseNamePattern","db1"). \
    option("tableNamePattern","table1"). \
    option("bingLogNamePrefix","mysql-binlog"). \
    option("binlogIndex","123354"). \
    option("binlogFileOffset","4"). \
    load()

query = df.writeStream. \
    format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource"). \
    option("path","/tmp/datahouse/{db}/{table}"). \
    option("path","{db}/{table}"). \
    option("mode","Append"). \
    option("idCols","id"). \
    option("duration","3"). \
    option("syncType","binlog"). \
    option("checkpointLocation", "/tmp/cpl-binlog2"). \
    outputMode("append") \
    .trigger(processingTime = "3 seconds") \
    .start()

Can you please help me figure out what is wrong with the code?

allwefantasy commented 4 years ago
BinLogSocketServerInExecutor:
java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.util.BitSet)

It seems that we cannot serialize a column whose value is read as a java.util.BitSet. Can you show your table schema?

harishchanderramesh commented 4 years ago

Hi,

This is from the same team. Here is the table schema.

CREATE TABLE table1 (
  id bigint(20) NOT NULL AUTO_INCREMENT,
  callguid varchar(100) NOT NULL,
  meeting_uuid varchar(100) NOT NULL,
  audio_log_status tinyint(4) NOT NULL DEFAULT '0',
  audio_logs tinyint(4) NOT NULL DEFAULT '0',
  connector_address varchar(32) DEFAULT NULL,
  direct_connect varchar(64) DEFAULT NULL,
  creation_time datetime NOT NULL,
  did_number varchar(32) DEFAULT NULL,
  disconnect_reason varchar(1024) DEFAULT NULL,
  participant_email varchar(75) DEFAULT NULL,
  endpoint varchar(32) NOT NULL,
  cpu_params varchar(1024) DEFAULT NULL,
  endpoint_sub_type varchar(255) DEFAULT NULL,
  instance_count int(11) DEFAULT '0',
  leader bit(1) NOT NULL DEFAULT b'0',
  local_address varchar(32) DEFAULT NULL,
  meeting_join_time datetime DEFAULT NULL,
  meeting_leave_time datetime DEFAULT NULL,
  name varchar(128) DEFAULT NULL,
  nat_detected bit(1) NOT NULL DEFAULT b'0',
  paired_remote_address varchar(32) DEFAULT NULL,
  proxy bit(1) NOT NULL DEFAULT b'0',
  proxy_info varchar(128) DEFAULT NULL,
  qualaroo_comments varchar(1024) DEFAULT NULL,
  qualaroo_feedback tinyint(4) NOT NULL DEFAULT '0',
  recording bit(1) NOT NULL DEFAULT b'0',
  remote_address varchar(32) DEFAULT NULL,
  secure_call bit(1) NOT NULL DEFAULT b'0',
  session_id varchar(10) DEFAULT NULL,
  sip_proxy_transport bit(1) NOT NULL DEFAULT b'0',
  uniq_id varchar(64) DEFAULT NULL,
  vendor varchar(255) DEFAULT NULL,
  version varchar(1024) DEFAULT NULL,
  video_share bit(1) NOT NULL DEFAULT b'0',
  visibility bit(1) NOT NULL DEFAULT b'0',
  location_id bigint(20) DEFAULT NULL,
  mixer_id tinyint(4) NOT NULL DEFAULT '0',
  roi_distance decimal(10,2) DEFAULT NULL,
  mpls bit(1) DEFAULT b'0',
  app_version varchar(45) DEFAULT NULL,
  browser varchar(45) DEFAULT NULL,
  operating_system varchar(64) DEFAULT NULL,
  machine_model varchar(64) DEFAULT NULL,
  pairing tinyint(4) DEFAULT NULL,
  dial_out bit(1) DEFAULT NULL,
  copper_address varchar(32) DEFAULT NULL,
  audio_play_selection_type varchar(32) DEFAULT NULL,
  audio_cap_selection_type varchar(32) DEFAULT NULL,
  reconnects smallint(6) NOT NULL DEFAULT '0',
  bjn_user_id int(11) DEFAULT NULL,
  meetme_location_id bigint(20) DEFAULT NULL,
  roster_absent bit(1) NOT NULL DEFAULT b'0',
  attr1 varchar(255) DEFAULT NULL,
  attr2 varchar(255) DEFAULT NULL,
  attr3 varchar(255) DEFAULT NULL,
  attr4 varchar(255) DEFAULT NULL,
  attr5 varchar(255) DEFAULT NULL,
  num1 bigint(20) DEFAULT NULL,
  num2 bigint(20) DEFAULT NULL,
  num3 bigint(20) DEFAULT NULL,
  num4 bigint(20) DEFAULT NULL,
  num5 bigint(20) DEFAULT NULL,
  PRIMARY KEY (id,creation_time),
  KEY IDX_EP_NAME1 (name),
  KEY app_version1 (app_version),
  KEY FK_MEETING_UUID_idx1 (meeting_uuid),
  KEY IDX_EP_PROXY1 (proxy_info),
  KEY machine_model1 (machine_model),
  KEY UQ_CALLGUID_MEETINGUUID1 (callguid,meeting_uuid,creation_time),
  KEY IDX_MEETING_JOIN_TIME1 (meeting_join_time),
  KEY IDX_EP_CALLGUID1 (callguid),
  KEY IDX_EP_CREATION_TIME1 (creation_time),
  KEY FK_EP_TO_LOCATION1 (location_id),
  KEY bjn_user_id1 (bjn_user_id),
  KEY IDX_MEETING_LEAVE_TIME1 (meeting_leave_time),
  KEY endpoint1 (endpoint),
  KEY reconnects1 (reconnects),
  KEY operating_system1 (operating_system),
  KEY qualaroo_feedback1 (qualaroo_feedback),
  KEY endpoint_sub_type1 (endpoint_sub_type)
) ENGINE=InnoDB AUTO_INCREMENT=1016408956 DEFAULT CHARSET=utf8
/*!50100 PARTITION BY RANGE (TO_DAYS(creation_time))
(PARTITION unused VALUES LESS THAN (0) ENGINE = InnoDB,
 PARTITION p201806 VALUES LESS THAN (737241) ENGINE = InnoDB,
 PARTITION p201807 VALUES LESS THAN (737272) ENGINE = InnoDB,
 PARTITION p201808 VALUES LESS THAN (737303) ENGINE = InnoDB,
 PARTITION p201809 VALUES LESS THAN (737333) ENGINE = InnoDB,
 PARTITION p201810 VALUES LESS THAN (737364) ENGINE = InnoDB,
 PARTITION p201811 VALUES LESS THAN (737394) ENGINE = InnoDB,
 PARTITION p201812 VALUES LESS THAN (737425) ENGINE = InnoDB,
 PARTITION p201901 VALUES LESS THAN (737456) ENGINE = InnoDB,
 PARTITION p201902 VALUES LESS THAN (737484) ENGINE = InnoDB,
 PARTITION p201903 VALUES LESS THAN (737515) ENGINE = InnoDB,
 PARTITION p201904 VALUES LESS THAN (737545) ENGINE = InnoDB,
 PARTITION p201905 VALUES LESS THAN (737576) ENGINE = InnoDB,
 PARTITION p201906 VALUES LESS THAN (737606) ENGINE = InnoDB,
 PARTITION p201907 VALUES LESS THAN (737637) ENGINE = InnoDB,
 PARTITION p201908 VALUES LESS THAN (737668) ENGINE = InnoDB,
 PARTITION p201909 VALUES LESS THAN (737698) ENGINE = InnoDB,
 PARTITION p201910 VALUES LESS THAN (737729) ENGINE = InnoDB,
 PARTITION p201911 VALUES LESS THAN (737759) ENGINE = InnoDB,
 PARTITION p201912 VALUES LESS THAN (737790) ENGINE = InnoDB,
 PARTITION p202001 VALUES LESS THAN (737821) ENGINE = InnoDB,
 PARTITION p202002 VALUES LESS THAN (737850) ENGINE = InnoDB,
 PARTITION p202003 VALUES LESS THAN (737881) ENGINE = InnoDB,
 PARTITION p202004 VALUES LESS THAN (737911) ENGINE = InnoDB,
 PARTITION p202005 VALUES LESS THAN (737942) ENGINE = InnoDB,
 PARTITION p202006 VALUES LESS THAN (737972) ENGINE = InnoDB,
 PARTITION p202007 VALUES LESS THAN (738003) ENGINE = InnoDB,
 PARTITION p202008 VALUES LESS THAN (738034) ENGINE = InnoDB,
 PARTITION p202009 VALUES LESS THAN (738064) ENGINE = InnoDB,
 PARTITION p202010 VALUES LESS THAN (738095) ENGINE = InnoDB,
 PARTITION p202011 VALUES LESS THAN (738125) ENGINE = InnoDB,
 PARTITION p202012 VALUES LESS THAN (738156) ENGINE = InnoDB,
 PARTITION p202101 VALUES LESS THAN (738187) ENGINE = InnoDB,
 PARTITION p202102 VALUES LESS THAN (738215) ENGINE = InnoDB,
 PARTITION p202103 VALUES LESS THAN (738246) ENGINE = InnoDB,
 PARTITION p202104 VALUES LESS THAN (738276) ENGINE = InnoDB,
 PARTITION p202105 VALUES LESS THAN (738307) ENGINE = InnoDB,
 PARTITION p202106 VALUES LESS THAN (738337) ENGINE = InnoDB,
 PARTITION p202107 VALUES LESS THAN (738368) ENGINE = InnoDB,
 PARTITION p202108 VALUES LESS THAN (738399) ENGINE = InnoDB,
 PARTITION p202109 VALUES LESS THAN (738429) ENGINE = InnoDB,
 PARTITION p202110 VALUES LESS THAN (738460) ENGINE = InnoDB,
 PARTITION p202111 VALUES LESS THAN (738490) ENGINE = InnoDB,
 PARTITION p202112 VALUES LESS THAN (738521) ENGINE = InnoDB,
 PARTITION p202201 VALUES LESS THAN (738552) ENGINE = InnoDB,
 PARTITION p202202 VALUES LESS THAN (738580) ENGINE = InnoDB,
 PARTITION p202203 VALUES LESS THAN (738611) ENGINE = InnoDB,
 PARTITION p202204 VALUES LESS THAN (738641) ENGINE = InnoDB,
 PARTITION p202205 VALUES LESS THAN (738672) ENGINE = InnoDB,
 PARTITION p202206 VALUES LESS THAN (738702) ENGINE = InnoDB,
 PARTITION p202207 VALUES LESS THAN (738733) ENGINE = InnoDB,
 PARTITION p202208 VALUES LESS THAN (738764) ENGINE = InnoDB,
 PARTITION p202209 VALUES LESS THAN (738794) ENGINE = InnoDB,
 PARTITION p202210 VALUES LESS THAN (738825) ENGINE = InnoDB,
 PARTITION p202211 VALUES LESS THAN (738855) ENGINE = InnoDB,
 PARTITION p202212 VALUES LESS THAN (738886) ENGINE = InnoDB,
 PARTITION p202301 VALUES LESS THAN (738917) ENGINE = InnoDB,
 PARTITION p202302 VALUES LESS THAN (738945) ENGINE = InnoDB,
 PARTITION p202303 VALUES LESS THAN (738976) ENGINE = InnoDB,
 PARTITION p202304 VALUES LESS THAN (739006) ENGINE = InnoDB,
 PARTITION p202305 VALUES LESS THAN (739037) ENGINE = InnoDB,
 PARTITION p202306 VALUES LESS THAN (739067) ENGINE = InnoDB,
 PARTITION p202307 VALUES LESS THAN (739098) ENGINE = InnoDB,
 PARTITION p202308 VALUES LESS THAN (739129) ENGINE = InnoDB,
 PARTITION p202309 VALUES LESS THAN (739159) ENGINE = InnoDB,
 PARTITION p202310 VALUES LESS THAN (739190) ENGINE = InnoDB,
 PARTITION p202311 VALUES LESS THAN (739220) ENGINE = InnoDB,
 PARTITION p202312 VALUES LESS THAN (739251) ENGINE = InnoDB,
 PARTITION doom VALUES LESS THAN MAXVALUE ENGINE = InnoDB) */

allwefantasy commented 4 years ago

I guess this issue is caused by the 'bit' type in MySQL. I will test it, and if it's a bug I will fix it.

allwefantasy commented 4 years ago

I have fixed this issue in the master branch. If there are any problems, please reopen this issue or open a new one.

You can build the jar yourself with a command like the following:

 mvn clean install -DskipTests -Pdisable-java8-doclint -Prelease-sign-artifacts

In the fix, we treat bit(1) as a boolean type and bit(n) where n > 1 as a long type.
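The mapping described above can be illustrated with a small Python sketch. This is not the actual fix (which lives in the library's Java/Scala code); `convert_bit_column` and its parameters are hypothetical names, and the input is assumed to be the indices of the set bits, which mirrors how a java.util.BitSet describes a value.

```python
import json
from typing import Iterable, Union


def convert_bit_column(set_bits: Iterable[int], width: int) -> Union[bool, int]:
    """Map a MySQL bit(width) value to a JSON-friendly Python value.

    set_bits: zero-based indices of the bits that are 1 (hypothetical input
              shape, modelled on java.util.BitSet).
    width:    the n declared in bit(n); MySQL allows 1 to 64.
    """
    if not 1 <= width <= 64:
        raise ValueError("bit width must be between 1 and 64")
    bits = set(set_bits)
    if any(i < 0 or i >= width for i in bits):
        raise ValueError("bit index outside the declared width")

    # bit(1) becomes a boolean: true when bit 0 is set.
    if width == 1:
        return 0 in bits

    # bit(n) with n > 1 becomes an integer built from the set bits.
    # Note: Python ints are unbounded, while a Java long is signed 64-bit.
    value = 0
    for i in bits:
        value |= 1 << i
    return value


# A row with a bit(1) column and an illustrative bit(8) column now
# serializes to plain JSON.
row = {"leader": convert_bit_column([0], 1), "flags": convert_bit_column([0, 2], 8)}
print(json.dumps(row))
```

With this kind of conversion, columns such as leader bit(1) in the schema above would be written as true/false instead of a raw BitSet that the JSON generator cannot handle.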