koralium / flowtide

Streaming integration engine
https://koralium.github.io/flowtide/
Apache License 2.0

SqlServer UniqueIdentifier type can't be written into OpenFGA sink #369

Closed le-yams closed 7 months ago

le-yams commented 7 months ago

Hello,

I have a SQL Server source and an OpenFGA sink with the following plan:

INSERT INTO openfga
SELECT
    'company' as user_type,
    company.Id as user_id,
    'company_of' as relation,
    'organization' as object_type,
    company.OrganizationId as object_id
FROM [MyDB].[dbo].[Company] company;

The OrganizationId column is of type uniqueidentifier, and writing it to OpenFGA fails with the following error:

fail: FlowtideDotNet.Base.Engine.Internal.StateMachine.StreamContext[25]
      Stream error, stream: `stream`
      System.AggregateException: One or more errors occurred. (Unsupported type Blob)
       ---> System.InvalidOperationException: Unsupported type Blob
         at FlowtideDotNet.Connector.OpenFGA.Internal.FlowtideOpenFgaSink.ColumnToString(FlxValueRef& flxValue)
         at FlowtideDotNet.Connector.OpenFGA.Internal.FlowtideOpenFgaSink.GetClientTupleKey(SimpleChangeEvent row)
         at FlowtideDotNet.Connector.OpenFGA.Internal.FlowtideOpenFgaSink.UploadChanges(IAsyncEnumerable`1 rows, Watermark watermark, CancellationToken cancellationToken)
         at FlowtideDotNet.Connector.OpenFGA.Internal.FlowtideOpenFgaSink.UploadChanges(IAsyncEnumerable`1 rows, Watermark watermark, CancellationToken cancellationToken)
         at FlowtideDotNet.Core.Operators.Write.SimpleGroupedWriteOperator.SendData()
         at FlowtideDotNet.Core.Operators.Write.SimpleGroupedWriteOperator.Checkpoint(Int64 checkpointTime)
         at FlowtideDotNet.Core.Operators.Write.GroupedWriteBaseOperator`1.OnCheckpoint(Int64 checkpointTime)
         at FlowtideDotNet.Base.Vertices.Egress.EgressVertex`2.HandleCheckpoint(ICheckpointEvent checkpointEvent)
         at FlowtideDotNet.Base.Vertices.Egress.Internal.NonParallelEgressVertex`1.HandleLockingEvent(ILockingEvent lockingEvent)
         --- End of inner exception stack trace ---

I would expect the unique identifier to be written as a string.

I tried wrapping the column in LOWER(), i.e. LOWER(company.OrganizationId) as object_id. The error goes away, but the object_id value is always null.

Regards

Ulimo commented 7 months ago

Thank you for adding the issue!

I have created a PR that makes uniqueidentifier values from SQL Server come through as strings instead.
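
As a rough illustration of the conversion involved (not the code from the PR), the sketch below turns a 16-byte uniqueidentifier blob into its canonical string form. It is written in Python for brevity; `guid_blob_to_string` is a hypothetical helper, the sample identifier is made up, and it assumes the blob uses the mixed-endian byte layout that SQL Server and .NET use for GUIDs.

```python
import uuid


def guid_blob_to_string(raw: bytes) -> str:
    """Convert a 16-byte uniqueidentifier blob to its canonical string form.

    Hypothetical helper for illustration only; assumes the SQL Server / .NET
    byte layout (first three GUID fields stored little-endian).
    """
    if len(raw) != 16:
        raise ValueError(f"expected 16 bytes, got {len(raw)}")
    # Python's `bytes_le` uses the same mixed-endian layout as .NET's Guid.
    return str(uuid.UUID(bytes_le=raw))


# Made-up identifier, round-tripped through its blob representation.
sample = uuid.UUID("6f9619ff-8b86-d011-b42d-00c04fc964ff")
print(guid_blob_to_string(sample.bytes_le))
```

With a string like this, the sink can build an object such as `organization:<id>` without having to handle a blob value.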

Ulimo commented 7 months ago

Release 0.9.0-alpha 15 contains the fix.

le-yams commented 7 months ago

Thanks 👍