slingdata-io / sling-cli

Sling is a CLI tool that extracts data from a source storage/database and loads it in a target storage/database.
https://docs.slingdata.io
GNU General Public License v3.0
433 stars 34 forks source link

Converting MySQL Bit(1) data type to String #96

Closed ShahBinoy closed 10 months ago

ShahBinoy commented 10 months ago

I am trying to perform EL from MySQL DB to S3 Parquet with default values set The sync task fails with error

2024-01-02 22:28:43 DBG type is db-file                                                                                                                                                       
2024-01-02 22:28:43 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}                                                              
2024-01-02 22:28:43 DBG using target options: {"header":true,"compression":"snappy","concurrency":4,"datetime_format":"auto","delimiter":",","file_max_rows":100000,"format":"parquet","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-01-02 22:28:43 INF connecting to source database (mysql)                                                                                                                                 
2024-01-02 22:28:43 INF reading from source database                                                                                                                                          
2024-01-02 22:28:43 DBG select * from `acadia`.`form_field_entry_value`                                                                                                                       
2024-01-02 22:28:43 INF writing to target file system (s3)                                                                                                                                    
2024-01-02 22:28:43 DBG writing to s3://mybucket/data_landing/Tables/form_field_entry_value/2024_01_02/part.01 [fileRowLimit=100000 fileBytesLimit=0 compression=SNAPPY 
2024-01-02 22:28:43 DBG column type change for value_as_boolean (smallint to string)                                                                                                          
2024-01-02 22:28:43 INF execution failed                                                                                                                                                      
fatal:                                                                                                                                                                                        
--- sling.go:421 func1 ---                                                                                                                                                                    
--- sling.go:462 cliInit ---                                                                                                                                                                  
--- cli.go:283 CliProcess ---                                                                                                                                                                 
~ failure running task (see docs @ https://docs.slingdata.io/sling-cli)                                                                                                                       
--- sling_logic.go:191 processRun ---                                                                                                                                                         
--- sling_logic.go:282 runTask ---                                                                                                                                                            
~ execution failed                                                                                                                                                                            
--- task_run.go:128 Execute ---                                                                                                                                                               
--- datastream.go:622 func4 ---                                                                                                                                                               
context canceled                                                                                                                                                                              

~ Failed to scan                                                                                                                                                                              
--- datastream.go:619 func4 ---                                                                                                                                                               
context canceled                                                                                                                                                                              

~ error writing row                                                                                                                                                                           
--- datastream.go:1552 func1 ---                                                                                                                                                              
unsupported type for storing in int32 column: string =>                                                                                                                                       

~ error writing row                                                                                                                                                                           
--- datastream.go:1552 func1 ---                                                                                                                                                              
unsupported type for storing in int32 column: string =>                                                                                                                                       

--- fs.go:759 func1 ---                                                                                                                                                                       
context canceled                                                                                                                                                                              

~ error writing row                                                                                                                                                                           
--- datastream.go:1552 func1 ---                                                                                                                                                              
unsupported type for storing in int32 column: string =>

context canceled                                                                                                                                                                              

~ error writing row                                                                                                                                                                           
--- datastream.go:1552 func1 ---                                                                                                                                                              
unsupported type for storing in int32 column: string =>                                                                                                                                       

context canceled                                                                                                                                                                              

--- task_run.go:87 func1 ---                                                                                                                                                                  
--- task_run.go:277 runDbToFile ---                                                                                                                                                           
--- task_run_write.go:47 WriteToFile ---                                                                                                                                                      
~ Could not WriteToFile                                                                                                                                                                       
--- fs.go:593 WriteDataflow ---                                                                                                                                                               
~ Could not FileSysWriteDataflow                                                                                                                                                              
--- fs.go:802 WriteDataflowReady ---                                                                                                                                                          
--- datastream.go:622 func4 ---                                                                                                                                                               
context canceled                                                                                                                                                                              

~ Failed to scan                                                                                                                                                                              
--- datastream.go:619 func4 ---                                                                                                                                                               
context canceled                                                                                                                                                                              

~ error writing row                                                                                                                                                                           
--- datastream.go:1552 func1 ---                                                                                                                                                              
unsupported type for storing in int32 column: string =>                                                                                                                                       

~ error writing row                                                                                                                                                                           
--- datastream.go:1552 func1 ---                                                                                                                                                              
unsupported type for storing in int32 column: string =>                                                                                                                                       

--- fs.go:759 func1 ---                                                                                                                                                                       
context canceled                                                                                                                                                                              

~ error writing row                                                                                                                                                                           
--- datastream.go:1552 func1 ---                                                                                                                                                              
unsupported type for storing in int32 column: string =>                                                                                                                                       

context canceled

I am also attaching the source mysql data file

form_field_entry_value.sql.zip

How can I handle this failure?

flarco commented 10 months ago

Hi, it seems the column value_as_boolean is inferred as int but should be string.

Two things you can try:

See https://docs.slingdata.io/sling-cli/run/configuration for more details on both.

flarco commented 10 months ago

Actually, since it’s MySQL source, the types are directly transferred from the database column type in the SELECT result-set. So SAMPLE_SIZE would not work… Two second option should work though (columns). Let me know how that goes.

flarco commented 10 months ago

I’m curious now, what is the MySQL type for value_as_boolean? I didn’t think MySQL supported bools (stored as TINYINT right?).

ShahBinoy commented 10 months ago

I’m curious now, what is the MySQL type for value_as_boolean? I didn’t think MySQL supported bools (stored as TINYINT right?).

Here is the ddl for the table

CREATE TABLE `form_field_entry_value` (
  `id` bigint NOT NULL,
  `value_as_number` double DEFAULT NULL,
  `value_as_boolean` bit(1) DEFAULT NULL,
  `value_as_string` longtext,
  `value_as_file` varchar(1024) DEFAULT NULL,
  `file_name` varchar(255) DEFAULT NULL,
  `file_size` bigint DEFAULT NULL,
  `unit_id` bigint DEFAULT NULL,
  `form_field_entry_id` bigint NOT NULL,
  `score` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

going to try the suggestion that you provided

Update:

Since my code auto-generates the task file, it makes it really hacky to inject a deviant column name for handling manual type conversion.

Is there a fix intended to handle the bit(1) kind of dataset to map it to string ?

flarco commented 10 months ago

Ok I pushed https://github.com/flarco/dbio/commit/67d2b58187659ee9cd9707fe9bf81706ccd133bc And https://github.com/slingdata-io/sling-cli/commit/bb455a28a949791a56f9bf815939cc16766a1151 Can you build locally and test? I can release once you confirm.

ShahBinoy commented 10 months ago

Ok I pushed flarco/dbio@67d2b58 And bb455a2 Can you build locally and test? I can release once you confirm.

So I tested it, and it does not fail but the output in parquet has issues

When I load it in duckdb it appears as below:

D select * from user_profile_value;

┌─────────┬──────────────────────┬──────────────────────┬──────────────────────┬───────────────┬──────────────────┬───┬───────────────┬───────────────┬────────────┬───────────────┬───────────────┐
│   id    │ user_profile_field…  │   value_as_string    │   value_as_number    │ value_as_file │ value_as_boolean │ … │  created_on   │  updated_on   │ is_deleted │ created_by_id │ updated_by_id │
│  int64  │        int64         │       varchar        │    decimal(24,9)     │    varchar    │     varchar      │   │     int64     │     int64     │  varchar   │     int64     │     int64     │
├─────────┼──────────────────────┼──────────────────────┼──────────────────────┼───────────────┼──────────────────┼───┼───────────────┼───────────────┼────────────┼───────────────┼───────────────┤
│       1 │               514890 │ Organization/xxxxx…  │          0.000000000 │               │                  │ … │ 1524778226035 │ 1524778226035 │ \0         │         50338 │         50338 │
│       2 │               514891 │ Site/joinallofus     │          0.000000000 │               │                  │ … │               │ 1524778226074 │ \0         │               │         50338 │
│   17988 │                14628 │ 33 Test Lane         │          0.000000000 │               │                  │ … │ 1496236234314 │ 1496236234314 │ \0         │               │               │
│   17989 │                14629 │                      │          0.000000000 │               │                  │ … │ 1496236239864 │ 1496236239864 │ \0         │               │               │
│   17990 │                14630 │ Pittsburgh           │          0.000000000 │               │                  │ … │ 1496236239978 │ 1496236239978 │ \0         │               │               │
│   17991 │                14631 │ 15117                │          0.000000000 │               │                  │ … │ 1496236240415 │ 1496236240415 │ \0         │               │               │
│   17992 │                14632 │ PIIState_PA          │          0.000000000 │               │                  │ … │ 1496236240873 │ 1496236240873 │ \0         │               │               │
│   17993 │                14633 │ 1212121223           │          0.000000000 │               │                  │ … │ 1496236241227 │ 1496236241227 │ \0         │               │               │
│   17994 │                14634 │                      │  358111053257088872… │               │                  │ … │ 1496236241786 │ 1496236241786 │ \0         │               │               │
│   17995 │                14635 │ Organization/TEST    │          0.000000000 │               │                  │ … │ 1496236242365 │ 1668590623338 │ \0         │               │               │
│   17999 │                14639 │                      │          0.000000000 │               │ \0               │ … │ 1496236242832 │ 1496236242832 │ \0         │               │               │
│   18000 │                14640 │ Male                 │          0.000000000 │               │                  │ … │ 1496236831888 │ 1496236831888 │ \0         │               │               │
│   18001 │                14636 │                      │          0.000000000 │               │                  │ … │ 1496236831929 │ 1496236831929 │ \0         │               │               │

D select distinct value_as_boolean from user_profile_value;
┌──────────────────┐
│ value_as_boolean │
│     varchar      │
├──────────────────┤
│                  │
│ \0               │
│                  │
└──────────────────┘

When I try to open it with Pycharm Big Data Tool, it throws error

NumberFormatException: Zero length BigInteger

java.lang.NumberFormatException: Zero length BigInteger
    at java.base/java.math.BigInteger.<init>(BigInteger.java:312)
    at java.base/java.math.BigInteger.<init>(BigInteger.java:340)
    at com.jetbrains.bigdatatools.common.rfs.localcache.analyzing.parquet.DecimalBinaryPrinter.printCurrentValue(ValuePrinters.kt:70)
    at com.jetbrains.bigdatatools.common.rfs.localcache.analyzing.parquet.ColumnIterator.readValue(Iterators.kt:123)
    at com.jetbrains.bigdatatools.common.rfs.localcache.analyzing.parquet.ColumnIterator.next(Iterators.kt:89)
    at com.jetbrains.bigdatatools.common.rfs.localcache.analyzing.parquet.ColumnIterator.next(Iterators.kt:61)
    at com.jetbrains.bigdatatools.common.rfs.localcache.analyzing.parquet.CompoundIterator.next(Iterators.kt:18)
    at com.jetbrains.bigdatatools.common.rfs.localcache.analyzing.parquet.TotalSizeLimitingIterator.<init>(Iterators.kt:29)
    at com.jetbrains.bigdatatools.common.rfs.util.RfsFileUtil.writeDecompiledTableToFile(RfsFileUtil.kt:71)
    at com.jetbrains.bigdatatools.common.rfs.localcache.analyzing.parquet.ParquetPartsDownloader.downloadFullPage(ParquetAnalyzers.kt:505)
    at com.jetbrains.bigdatatools.common.rfs.localcache.filetypes.ParquetContentDownloader.download(ParquetContentDownloader.kt:46)
    at com.jetbrains.bigdatatools.common.rfs.localcache.RfsFileContentManager.downloadFilePart$lambda$8(RfsFileContentManager.kt:200)
    at com.intellij.openapi.progress.ProgressManager.lambda$runProcess$0(ProgressManager.java:73)
    at com.intellij.openapi.progress.impl.CoreProgressManager.lambda$runProcess$1(CoreProgressManager.java:192)
    at com.intellij.openapi.progress.impl.CoreProgressManager.lambda$executeProcessUnderProgress$12(CoreProgressManager.java:610)
    at com.intellij.openapi.progress.impl.CoreProgressManager.registerIndicatorAndRun(CoreProgressManager.java:685)
    at com.intellij.openapi.progress.impl.CoreProgressManager.computeUnderProgress(CoreProgressManager.java:641)
    at com.intellij.openapi.progress.impl.CoreProgressManager.executeProcessUnderProgress(CoreProgressManager.java:609)
    at com.intellij.openapi.progress.impl.ProgressManagerImpl.executeProcessUnderProgress(ProgressManagerImpl.java:78)
    at com.intellij.openapi.progress.impl.CoreProgressManager.runProcess(CoreProgressManager.java:179)
    at com.intellij.openapi.progress.ProgressManager.runProcess(ProgressManager.java:73)
    at com.jetbrains.bigdatatools.common.rfs.localcache.RfsFileContentManager.downloadFilePart(RfsFileContentManager.kt:197)
    at com.jetbrains.bigdatatools.common.rfs.localcache.RfsFileContentManager.getOrDownload(RfsFileContentManager.kt:130)
    at com.jetbrains.bigdatatools.common.rfs.localcache.RfsFileContentManager.getOrDownload$default(RfsFileContentManager.kt:98)
    at com.jetbrains.bigdatatools.common.rfs.localcache.RfsFileContentManager.getContentWithOffsets(RfsFileContentManager.kt:60)
    at com.jetbrains.bigdatatools.common.rfs.localcache.RfsFileContentManager.getContentWithOffsets$default(RfsFileContentManager.kt:56)
    at com.jetbrains.bigdatatools.common.rfs.view.GeneralContentTypeViewerBase.loadContent(GeneralContentTypeViewerBase.kt:22)
    at com.jetbrains.bigdatatools.common.rfs.view.RfsAsyncFileTypeViewer$openViewer$1.invoke(RfsAsyncFileTypeViewer.kt:27)
    at com.jetbrains.bigdatatools.common.rfs.view.RfsAsyncFileTypeViewer$openViewer$1.invoke(RfsAsyncFileTypeViewer.kt:25)
    at com.jetbrains.bigdatatools.common.util.ThreadUtilsKt.executeOnPooledThread$lambda$3(ThreadUtils.kt:40)
    at com.intellij.openapi.application.impl.ApplicationImpl$2.run(ApplicationImpl.java:249)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at com.intellij.util.concurrency.ContextCallable.call(ContextCallable.java:32)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at com.intellij.util.concurrency.ContextRunnable.run(ContextRunnable.java:27)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.util.concurrent.Executors$PrivilegedThreadFactory$1$1.run(Executors.java:702)
    at java.base/java.util.concurrent.Executors$PrivilegedThreadFactory$1$1.run(Executors.java:699)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:399)
    at java.base/java.util.concurrent.Executors$PrivilegedThreadFactory$1.run(Executors.java:699)
    at java.base/java.lang.Thread.run(Thread.java:840)
    at java.base/java.lang.Thread.getStackTrace(Thread.java:1619)
    at com.jetbrains.bigdatatools.common.rfs.driver.fileinfo.ErrorResult.throwException(SafeResut.kt:66)
    at com.jetbrains.bigdatatools.common.rfs.driver.fileinfo.SafeResult.resultOrThrow(SafeResut.kt:10)
    at com.jetbrains.bigdatatools.common.rfs.view.RfsAsyncFileTypeViewer$openViewer$1.invoke(RfsAsyncFileTypeViewer.kt:27)
    at com.jetbrains.bigdatatools.common.rfs.view.RfsAsyncFileTypeViewer$openViewer$1.invoke(RfsAsyncFileTypeViewer.kt:25)
    at com.jetbrains.bigdatatools.common.util.ThreadUtilsKt.executeOnPooledThread$lambda$3(ThreadUtils.kt:40)
    at com.intellij.openapi.application.impl.ApplicationImpl$2.run(ApplicationImpl.java:249)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at com.intellij.util.concurrency.ContextCallable.call(ContextCallable.java:32)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at com.intellij.util.concurrency.ContextRunnable.run(ContextRunnable.java:27)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.util.concurrent.Executors$PrivilegedThreadFactory$1$1.run(Executors.java:702)
    at java.base/java.util.concurrent.Executors$PrivilegedThreadFactory$1$1.run(Executors.java:699)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:399)
    at java.base/java.util.concurrent.Executors$PrivilegedThreadFactory$1.run(Executors.java:699)
    at java.base/java.lang.Thread.run(Thread.java:840)

So it kind of works 👍🏼 but incorrectly :-/ .

Attaching the parquet file if needed part.01.0001.parquet.zip

flarco commented 10 months ago

Ok, testing on my MySQL intance, I did the following:

CREATE TABLE `form_field_entry_value` (
  `value_as_boolean` bit(1) DEFAULT NULL,
  `score` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

insert into form_field_entry_value values (0, 0);
insert into form_field_entry_value values (1, 0);
insert into form_field_entry_value values (2, 0);
insert into form_field_entry_value values ('a', 0);

Then doing select * from form_field_entry_value; gives:

 value_as_boolean | score 
------------------+-------
 \x00             |     0 
 \x01             |     0 
 \x01             |     0 
 \x01             |     0 

So, it's binary (either 0 or 1).

Further testing inside Go, I'm getting raw values of \x00 or \x01, which have no string representation. When converting to string, it's an empty space. Only when I print as binary mode (fmt.Printf("%b", val) ) does it show 0 or 1.

So what I did is added a parse_bit transform (https://github.com/slingdata-io/sling-cli/pull/99/commits/de0553ecb5d65a529ed33252bd3ae70bd4bb8dbe) which is used by default when source is MySQL (https://github.com/slingdata-io/sling-cli/pull/99/commits/ce07058814fd59321d7a8fa26c07e57c1701d1da).

Can you re-compile and give it another shot?

ShahBinoy commented 10 months ago

Ok, testing on my MySQL intance, I did the following:

CREATE TABLE `form_field_entry_value` (
  `value_as_boolean` bit(1) DEFAULT NULL,
  `score` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

insert into form_field_entry_value values (0, 0);
insert into form_field_entry_value values (1, 0);
insert into form_field_entry_value values (2, 0);
insert into form_field_entry_value values ('a', 0);

Then doing select * from form_field_entry_value; gives:

 value_as_boolean | score 
------------------+-------
 \x00             |     0 
 \x01             |     0 
 \x01             |     0 
 \x01             |     0 

So, it's binary (either 0 or 1).

Further testing inside Go, I'm getting raw values of \x00 or \x01, which have no string representation. When converting to string, it's an empty space. Only when I print as binary mode (fmt.Printf("%b", val) ) does it show 0 or 1.

So what I did is added a parse_bit transform (de0553e) which is used by default when source is MySQL (ce07058).

Can you re-compile and give it another shot?

So I tested it again and it works

via Polars Data frame

pldsc:DataFrame = plds.collect()
pldsc.select(pl.col('value_as_boolean'))
shape: (5_793_245, 1)
┌──────────────────┐
│ value_as_boolean │
│ ---              │
│ str              │
╞══════════════════╡
│ null             │
│ null             │
│ null             │
│ null             │
│ …                │
│ null             │
│ 1                │
│ 0                │
│ null             │
└──────────────────┘

And via DuckDB

D select id, value_as_string, value_as_number, value_as_boolean from user_profile_value;
┌─────────┬────────────────────────────┬──────────────────────────────────┬──────────────────┐
│   id    │      value_as_string       │         value_as_number          │ value_as_boolean │
│  int64  │          varchar           │          decimal(24,9)           │     varchar      │
├─────────┼────────────────────────────┼──────────────────────────────────┼──────────────────┤
│       1 │ Organization/xxxxxxxxxxxxx │                      0.000000000 │                  │
│       2 │ Site/joinallofus           │                      0.000000000 │                  │
│   17988 │ 33 Test Lane               │                      0.000000000 │                  │
│   17989 │                            │                      0.000000000 │                  │
│   17990 │ Pittsburgh                 │                      0.000000000 │                  │
│   17991 │ 15117                      │                      0.000000000 │                  │
│   17992 │ PIIState_PA                │                      0.000000000 │                  │
│   17993 │ 1212121223                 │                      0.000000000 │                  │
│   17994 │                            │ 3581110532570888727442.108723248 │                  │
│   17995 │ Organization/TEST          │                      0.000000000 │                  │
│   17999 │                            │                      0.000000000 │ 0                │
│   18000 │ Male                       │                      0.000000000 │                  │
│   18001 │                            │                      0.000000000 │ 1                │
│   18002 │                            │                      0.000000000 │ 1                │
│   18003 │                            │                      0.000000000 │ 1                │

So it works.

Side Note, the Parquet file loading in Pycharm via Jetbrains Big Data Tool still fails, but since I could parse the Parquet file via 2 tools DuckDb and Python+Polars, I think this solution works.

Thank you @flarco for quick turn around.

flarco commented 10 months ago

@ShahBinoy hey, can you try again? Pushed changes to use https://github.com/parquet-go/parquet-go which is the better parquet lib in go. The one sling is currently using (https://github.com/fraugster/parquet-go) is unmaintained.

ShahBinoy commented 10 months ago

@ShahBinoy hey, can you try again? Pushed changes to use https://github.com/parquet-go/parquet-go which is the better parquet lib in go. The one sling is currently using (https://github.com/fraugster/parquet-go) is unmaintained.

So ran the new code.

Parquet file can be opened in Pycharm Big Data Tool

image

Can be loaded from DuckDb

D select id, value_as_string, value_as_number, value_as_boolean from user_profile_value;
100% ▕████████████████████████████████████████████████████████████▏
┌─────────┬────────────────────────────┬─────────────────┬──────────────────┐
│   id    │      value_as_string       │ value_as_number │ value_as_boolean │
│  int64  │          varchar           │     varchar     │     varchar      │
├─────────┼────────────────────────────┼─────────────────┼──────────────────┤
│       1 │ Organization/xxxxxxxxxxxxx │                 │                  │
│       2 │ Site/joinallofus           │                 │                  │
│   17994 │                            │ -314647200000   │                  │
│   17995 │ Organization/TEST          │                 │                  │
│   17999 │                            │                 │ 0                │
│   18000 │ Male                       │                 │                  │
│   18001 │                            │                 │ 1                │
│   18002 │                            │                 │ 1                │
│   18003 │                            │                 │ 1                │
│   18004 │ sdfsdfasdf                 │                 │                  │

And Read through Polars DataFrame

pldsc:DataFrame = plds.collect()
pldsc.select(pl.col('value_as_boolean'))
shape: (5_793_245, 1)
┌──────────────────┐
│ value_as_boolean │
│ ---              │
│ str              │
╞══════════════════╡
│                  │
│                  │
│                  │
│                  │
│ …                │
│                  │
│ 1                │
│ 0                │
│                  │
└──────────────────┘

And no perceivable drop in performance ( for smaller datasets )

👍🏼 @flarco

ShahBinoy commented 10 months ago

@flarco any ETA on 1.0.68 ?

flarco commented 10 months ago

Waiting on @nixent in https://github.com/slingdata-io/sling-cli/issues/92 Will give it another day or so.

flarco commented 10 months ago

v1.0.68 Released!