The important line in the log seems to be
ERR node stopped error="node mysql-file:mysql stopped with error: source stream was stopped unexpectedly: error reading from source: read plugin error: failed to extract schema for key: failed to create schema for value: record.ID: can't get schema for type uint64: unsupported avro type" component=pipeline.Service node_id=mysql-file:mysql stack=[{"file":"/home/runner/work/conduit/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service).runPipeline.func2","line":584},{"file":"/home/runner/work/conduit/conduit/pkg/pipeline/stream/source.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*SourceNode).Run","line":140},{"file":"/home/runner/work/conduit/conduit/pkg/pipeline/stream/source.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*SourceNode).Run.func1","line":87}]
Here is a sample OpenCDC record - I have confirmed that it reaches nats unchanged. The stray semicolons (in place of commas) look suspicious...?
{
  "position": "eyJraW5kIjoic25hcHNob3QiLCJzbmFwc2hvdF9wb3NpdGlvbiI6eyJzbmFwc2hvdHMiOnsid3BfcG9zdHMiOnsibGFzdF9yZWFkIjoxMDQ1Niwic25hcHNob3RfZW5kIjoxMDQ1Nn19fX0=",
  "operation": "snapshot",
  "metadata": {
    "conduit.source.connector.id": "mysql-file:mysql",
    "mysql.serverID": "1",
    "opencdc.collection": "wp_posts",
    "opencdc.key.schema.subject": "mysql-file:mysql:wp_posts.key",
    "opencdc.key.schema.version": "1",
    "opencdc.payload.schema.subject": "mysql-file:mysql:wp_posts.payload",
    "opencdc.payload.schema.version": "1",
    "opencdc.readAt": "1726784960502868321";
  },
  "key": {
    "ID": 10456;
  },
  "payload": {
    "before": null,
    "after": {
      "ID": 10456,
      "comment_count": 0,
      "comment_status": "closed",
      "guid": "https://ddev.seeingtheforest.net/?p=10456",
      "menu_order": 0,
      "ping_status": "closed",
      "pinged": "",
      "post_author": 1,
      "post_content": "",
      "post_content_filtered": "",
      "post_date": "2024-09-19T16:29:09Z",
      "post_date_gmt": "2024-09-19T22:29:09Z",
      "post_excerpt": "",
      "post_mime_type": "",
      "post_modified": "2024-09-19T16:29:09Z",
      "post_modified_gmt": "2024-09-19T22:29:09Z",
      "post_name": "10455-revision-v1",
      "post_parent": 10455,
      "post_password": "",
      "post_status": "inherit",
      "post_title": "conduit post",
      "post_type": "revision",
      "to_ping": "";
    }
  }
}
Oh, surely it is because schema support was added in v0.11 and it seems like this connector doesn't yet support it? Or perhaps has a bug in the implementation.
Unfortunately, and bizarrely, when I now try to run the 0.10.3 binary, I get an error. No idea why, since it worked before. Perhaps this connector now requires v0.11.0+? I'm using the proper architecture binary...
WRN could not load standalone connector plugin error="failed to dispense connector specifier (tip: check if the file is a valid connector plugin binary and if you have permissions for running it): Unrecognized remote plugin message: \nFailed to read any lines from plugin's stdout\nThis usually means\n the plugin was not compiled for this architecture,\n the plugin is missing dynamic-link libraries necessary to run,\n the plugin is not executable by this process due to file permissions, or\n the plugin failed to negotiate the initial go-plugin protocol handshake\n\nAdditional notes about plugin:\n Path: /home/nick/tool-dev/conduit-binary/connectors/nats-pubsub\n Mode: -rwxr-xr-x\n Owner: 1002 [nick] (current: 1002 [nick])\n Group: 1002 [nick] (current: 1002 [nick])\n ELF architecture: EM_X86_64 (current architecture: amd64)\n" component=plugin.connector.standalone.Registry plugin_path=/home/nick/tool-dev/conduit-binary/connectors/nats-pubsub stack=[{"file":"/home/runner/work/conduit/conduit/pkg/plugin/connector/standalone/registry.go","func":"github.com/conduitio/conduit/pkg/plugin/connector/standalone.(*Registry).loadSpecifications","line":167}]
record.ID: can't get schema for type uint64: unsupported avro type
Seems like uint64 is not a supported Avro type. It can be encoded as a fixed schema with size 8, but the problem is that we can't know for sure that a fixed schema is actually a uint64 when decoding. And since we are not decoding it into any particular struct, but rather into a map, the destination will in that case get an [8]byte instead of a uint64.
One solution would be to create a fixed type with a logical schema attached that denotes it's actually a uint64. I played around with this today but it's easier said than done. I'll see if I get some time next week to make it work.
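A minimal sketch of that idea (the uint64_fixed name and the logicalType annotation are my assumptions, not anything the connector or the Avro library does today): the value itself is just the 8 bytes of the integer, and only a decoder that knows about the logical type can turn them back into a uint64 - anyone else just sees raw bytes.

package main

import (
	"encoding/binary"
	"fmt"
)

// Hypothetical schema the source would register for such a field:
//   {"type": "fixed", "name": "uint64_fixed", "size": 8, "logicalType": "uint64"}

// encodeUint64Fixed packs a uint64 into the 8-byte value an Avro fixed(8) field carries.
func encodeUint64Fixed(v uint64) [8]byte {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], v)
	return b
}

// decodeUint64Fixed is what a decoder aware of the logical type could do; without
// that awareness the destination only ever sees the [8]byte.
func decodeUint64Fixed(b [8]byte) uint64 {
	return binary.BigEndian.Uint64(b[:])
}

func main() {
	v := uint64(18446744073709551615) // max uint64, doesn't fit in Avro's signed long
	fixed := encodeUint64Fixed(v)
	fmt.Printf("fixed bytes: %x, round-tripped: %d\n", fixed, decodeUint64Fixed(fixed))
}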
That said, it's still unusual that restarting the pipeline fixes the issue 🤔 Restarting the pipeline should result in the last unprocessed record being read again, in this case the record that caused the pipeline to stop in the first place. So I'm wondering if the mysql connector actually restarts at the correct position, or if it maybe skips the record after it's restarted. It's all speculation right now; this would have to be debugged (cc @alarbada, you might have more insights about this).
Btw, what type is the field ID in the mysql database?
Thanks for looking into it!
Here's the structure of this table.
ID is of type bigint(20) unsigned. Here are MySQL's integer types. I just checked a couple of other WordPress sites and they all have the same type. This is the default, evidently, so it should be accounted for here. Apparently the (20) just relates to the display width when looking at the column.
Here are the supported types in Avro. The fact that it is unsigned seems to be the issue, not the length (their long type is 64 bits).
Here's an issue from the underlying go-mysql-driver about uint64. Apparently they support it now.
Here's a suggestion from the Avro team about how to deal with unsigned ints.
Here are the types supported by the Go Avro package that is in this connector's go.mod. It has uint64 under the same fixed type suggested above.
Perhaps it might be an easy enough thing to fix by using the fixed type when appropriate? I'm not sure where to look, but surely your team does.
And how avro does encoding, for reference
Though, yes, there's something else weird going on in that it simply works again after restarting. But I do believe it properly accounts for the changes upon restarting. I can dig into the binlogs if you need it.
I have absolutely no idea what I'm doing, but with the help of Copilot I just made this change to conduit-connector-mysql/common/utils.go->FormatValue() and it seems to be working... It's just changing uint64 to int64, or to a fixed avro value if it is too large...
func FormatValue(val any) any {
	switch val := val.(type) {
	case time.Time:
		return val.UTC().Format(time.RFC3339)
	case *time.Time:
		return val.UTC().Format(time.RFC3339)
	case []uint8:
		s := string(val)
		if parsed, err := time.Parse(time.DateTime, s); err == nil {
			return parsed.UTC().Format(time.RFC3339)
		}
		return s
	case uint64:
		// Convert uint64 to int64 when it fits.
		if val <= math.MaxInt64 { // could also just use <= 9223372036854775807 rather than import the math package
			return int64(val)
		}
		// Handle the case where uint64 is too large to fit in int64:
		// represent it as a fixed-style value (example with 8 bytes).
		return map[string]interface{}{
			"type":  "fixed",
			"size":  8,
			"name":  "uint64_fixed",
			"bytes": val,
		}
	default:
		return val
	}
}
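A quick, hypothetical sanity check for that change (assuming it sits in the connector's common package next to FormatValue - this is not part of the actual test suite):

package common

import (
	"math"
	"testing"
)

// TestFormatValueUint64Sketch checks both branches of the new uint64 case.
func TestFormatValueUint64Sketch(t *testing.T) {
	small := FormatValue(uint64(10456))         // fits in int64
	huge := FormatValue(uint64(math.MaxUint64)) // doesn't fit, falls back to the fixed-style map

	if _, ok := small.(int64); !ok {
		t.Errorf("expected int64 for small value, got %T", small)
	}
	if _, ok := huge.(map[string]interface{}); !ok {
		t.Errorf("expected fixed-style map for huge value, got %T", huge)
	}
}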
I also have no idea if this is more appropriate - converting the int64 or avro map to a string, as is done in the []uint8 case?
	case uint64:
		// Convert uint64 to a decimal string when it fits in int64.
		var s string
		if val <= math.MaxInt64 { // could also just use <= 9223372036854775807 rather than import the math package
			s = fmt.Sprintf("%d", int64(val)) // note: string(int64(val)) would produce a rune, not digits
			if parsed, err := time.Parse(time.DateTime, s); err == nil {
				return parsed.UTC().Format(time.RFC3339)
			}
		} else {
			// Handle the case where uint64 is too large to fit in int64:
			// represent it as a fixed-style value (example with 8 bytes).
			s = fmt.Sprintf(`{"type":"fixed","size":8,"name":"uint64_fixed","bytes":%d}`, val)
		}
		return s
Perhaps you guys could test it out? Surely something along these lines would work?
Would it have any knock-on effects if going in the opposite direction - an int64 value back into a uint64 column? Presumably not, because the int64 would surely just contain positive values?
Do any of the other mysql data types need to be accounted for in FormatValue()? There's just time, []uint8 and, now, uint64...
I did manage to reproduce the issue! @lovromazgon it's with the cdc mode, not during the snapshot (that was confusing me for a while). I'm still not sure why there is no error when the pipeline is restarted, maybe because the position has changed and the connector doesn't read the uint64 again in cdc mode. I also don't know why, if the problem is with avro not supporting uint64, I can't get an error during snapshot mode.
Let me see, @nickchomey, if I can fix it from the connector side. The issue might be more or less what you proposed, but we should be able to handle all mysql datatypes.
Great! Yes, I very much assume that my "solution" was an awful band-aid. It would be best to fix the problem with the restart "fixing" things when it probably shouldn't, as well as to ensure that all mysql datatypes map appropriately to avro.
I think I'm at the limit of my utility here, so I'll leave it to you experts! Let me know if you need any testing done, specific binlog or other data, etc...
Update: when doing the mysql snapshot, there's no "avro doesn't support uint64" error because the BIGINT(20) unsigned datatype is actually being read as an int64. I believe that this is because the current mysql driver that I'm using uses the driver.Value interface to comply with the database/sql package. Here's the proof:
https://github.com/go-sql-driver/mysql/blob/master/rows.go#L200-L232
That's what I saw locally at least, but I haven't explored the go-sql-driver/mysql repo much further.
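To see it for yourself, something like this (the DSN and table name are just placeholders) prints the concrete Go type the snapshot-style read produces, which you can compare against the uint64 that shows up in the CDC error:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Placeholder DSN; point it at the same database the connector reads.
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/wordpress")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	var id any
	if err := db.QueryRow("SELECT ID FROM wp_posts LIMIT 1").Scan(&id); err != nil {
		panic(err)
	}

	// Whatever concrete type the database/sql path hands back for the unsigned
	// bigint column - per the observation above it came back as an int64 locally,
	// not a uint64.
	fmt.Printf("snapshot-style read: %T (%v)\n", id, id)
}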
What could also be problematic (not really related to avro not supporting uint64) is that there's a difference in schemas for the same table structure in snapshot and cdc modes. Could that lead to conduit-level errors?
I can't be helpful with the encoding stuff, but am glad you're hot on the trail!
What could also be problematic (not really related to avro not supporting uint64) is that there's a difference in schemas for the same table structure in snapshot and cdc modes. Could that lead to conduit-level errors?
Could you elaborate on this?
I actually noticed the schema version changing and couldn't figure out why. I'm now thinking that perhaps it was due to what you've just described - the initial snapshot created v1, and then cdc created v2. I don't have much understanding of schemas/avro/schema registry yet, but surely that should not be happening?
I'm not that familiar with how conduit fetches the schema exactly, but what I mean is that a row might have a schema with an ID of int64 in snapshot mode, while in cdc mode, when a record is updated, the ID would be uint64 (triggering the error that you saw). The datatype did change, so there are 2 different schemas.
But now that I think about it, if avro doesn't support uint64, then this situation will never happen, because it will error out with what you saw.
I feel that we need a more exhaustive integration test with all mysql datatypes so that there's no discrepancy.
About you restarting the pipeline and not having any errors, @nickchomey: could you please restart it again and trigger an update to a table that has a column of type BIGINT(20) UNSIGNED? You should see an error with the current source connector at main.
I'm not quite sure what you're asking for - this is what I already did in the original post and screen recording. But I'm happy to do anything if you can clarify what you need.
Unless you're asking me to trigger another error? I've done that various times - it just fails, requires a restart, then proceeds successfully until I create a new change to one of those columns.
proceeds successfully until I create a new change to one of those columns
Exactly what I was asking for :smile:. If that's the behaviour that you are seeing, which is what I've also observed locally, then it definitely happens once the connector has finished the table snapshot and is capturing data changes - the cdc mode.
If you are comfortable with possible rounding errors (converting uint64 to int64), I did add your fix here:
https://github.com/conduitio-labs/conduit-connector-mysql/tree/fix-issue
lmk if that solves your issue.
I assume that will work! I don't see what rounding errors would come from integers - it's just a matter of allowing negative numbers or not. If the column contains purely positive/unsigned numbers, then they'll just fill the signed column without creating any negative numbers.
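One caveat I'd note (just a quick Go check, not connector code): a uint64 above math.MaxInt64 doesn't round, it wraps around to a negative int64, so the conversion is only lossy for values in the top half of the unsigned range:

package main

import (
	"fmt"
	"math"
)

func main() {
	ok := uint64(10456)                 // typical WordPress row ID, well below math.MaxInt64
	tooBig := uint64(math.MaxInt64) + 1 // first value that no longer fits in int64

	fmt.Println(int64(ok))     // 10456 - unchanged
	fmt.Println(int64(tooBig)) // -9223372036854775808 - wraps negative rather than rounding
}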
However, I just extracted all of the datatypes from all columns in my database. There are lots of columns with other unsigned integer types:
bigint(20) unsigned
int(1) unsigned
int(10) unsigned
int(11) unsigned
int(2) unsigned
int(3) unsigned
int(4) unsigned
int(5) unsigned
int(6) unsigned
mediumint(8) unsigned
mediumint(9) unsigned
smallint(5) unsigned
smallint(9) unsigned
tinyint(1) unsigned
tinyint(2) unsigned
tinyint(3) unsigned
tinyint(4) unsigned
The (#) can all be ignored, so it's really just tinyint, smallint, mediumint, int, bigint (all of the mysql int types): https://dev.mysql.com/doc/refman/8.0/en/integer-types.html
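In case it's useful, here's a rough sketch (a hypothetical helper, not connector code, and whether the drivers actually hand back uint8/uint16/uint32 for the smaller columns is an assumption on my part) of handling the other unsigned widths - everything up to uint32 fits losslessly in Avro's long, so only uint64 needs the special treatment discussed above:

package common

import (
	"encoding/binary"
	"math"
)

// formatUnsigned is a hypothetical helper, not part of the connector.
func formatUnsigned(val any) any {
	switch v := val.(type) {
	case uint8: // tinyint unsigned
		return int64(v)
	case uint16: // smallint unsigned
		return int64(v)
	case uint32: // mediumint / int unsigned
		return int64(v)
	case uint64: // bigint unsigned
		if v <= math.MaxInt64 {
			return int64(v)
		}
		var b [8]byte
		binary.BigEndian.PutUint64(b[:], v)
		return b // would still need a fixed(8) schema on the Avro side
	default:
		return val
	}
}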
And then here are all of the data types used in my db. Not sure if Avro supports them all or if they get handled somehow before avro. I don't see these in the avro specification:
bigint
bigint unsigned
binary
blob
char
date
datetime
decimal
decimal unsigned
double
double unsigned
enum('authenticator')
enum('ban','mute')
float
int
int unsigned
longblob
longtext
mediumint
mediumint unsigned
mediumtext
point
set('default','stripe','hover','order-column','row-border','compact','cell-border')
smallint unsigned
smallint
text
timestamp
tinyblob
tinyint
tinyint unsigned
tinytext
varbinary
varchar
I now see #17 - surely this issue is related
Bug description
I'm not sure what the issue is, but the mysql connector appears to be not working properly.
I have 2 pipelines:
Here's a screen recording showing that echoing a message to the text file outputs to the file and then to nats. But then when I make a change to my wordpress site (which sits on the mysql db), it doesn't propagate through to the file or nats, and a bunch of error messages show up. When I stop and restart conduit, the changes immediately show up in the file and then in nats.
Also, when I initially set up the pipelines, it did this for the initial snapshot - a few thousand records came through once I restarted the crashed conduit application.
PS: I was successfully using the mysql connector a month or two ago with the same connector configs. I don't recall which versions of the connector or conduit though. So perhaps something has changed, either in the connector, conduit itself, or both?
Here's the entire output in conduit, starting when I create the post and ending after I've restarted conduit. I don't know what to make of it...
Steps to reproduce
See the video and logs
Version
Connector: built from current main branch (standalone:mysql@ec4da2c)
Conduit: v0.11.1 linux/amd64 - binary downloaded from releases