Azure / azure-stream-analytics

Azure Stream Analytics
MIT License

How can a custom deserializer output a native JSON field (record) ? #107

Open oising opened 2 years ago

oising commented 2 years ago

Here is my situation: I've got an Event Hub feed that contains escaped JSON in a string, i.e. "{\"foo\":\"bar\"}", so I cannot use the built-in JSON format. I've managed to build a StreamDeserializer based on the examples, but I can only yield back simple structures with string or other primitives as fields. I want to return data that will appear as a native JSON output, just like the built-in one does. Here's what my current (failing) attempt looks like:

Here's the structure that's yielded back from my StreamDeserializer:

    public class CustomJsonData
    {
        public string Source { get; set; } = "MySource";
        public string JsonString { get; set; } // works fine
        public JsonDocument Json { get; set; } // fails
    }

The serializer itself:

   public class CustomJsonDeserializer : StreamDeserializer<CustomJsonData>
    {
        // streamingDiagnostics is used to write errors to diagnostic logs
        private StreamingDiagnostics streamingDiagnostics;

        // Initializes the operator and provides context that is required for publishing diagnostics
        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }

        // Deserializes a stream into objects of type CustomJsonData
        public override IEnumerable<CustomJsonData> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                JsonDocument json = null;

                try {
                    line = Regex.Unescape(line.Trim('"'));
                    json = JsonDocument.Parse(line);
                } catch (Exception e) {
                    line = e.Message;
                }
                yield return new CustomJsonData() {
                        JsonString = line,
                        Json = json
                };
            }
        }
    }

This works fine dumping out a string, but then it's just a string field - we don't get any native JSON support in the Portal UI, nor the ability to dot-navigate the properties/fields.

So, we could probably use a JavaScript UDF in the query to JSON.parse() the string, but this seems inefficient and less safe than having our serializer emit native JSON.
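For reference, a minimal sketch of that UDF workaround (the function name udf.parseJson is an assumption; ASA JavaScript UDFs always export a main function and are invoked as udf.<name> from the query):

```javascript
// Hypothetical ASA JavaScript UDF body, registered (for example) as udf.parseJson.
// It turns the escaped-JSON string column into an object that ASA surfaces as a record.
function main(jsonString) {
    try {
        return JSON.parse(jsonString);
    } catch (e) {
        return null; // surface unparseable payloads as NULL instead of failing the job
    }
}
```

It would then be called from the query along the lines of SELECT udf.parseJson(JsonString) AS Json FROM input.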

The JsonDocument above is from System.Text.Json 5.x (to comply with netstandard 2.0) but we get:

[Info] 2022-01-26 1:33:45 PM : Job Start Successfully !
[Error] 2022-01-26 1:33:55 PM : **System Exception** Could not load file or assembly 'System.Text.Json, Version=5.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51'.
[Error] 2022-01-26 1:33:55 PM :    at System.Runtime.Loader.AssemblyLoadContext.LoadFromPath(IntPtr ptrNativeAssemblyLoadContext, String ilPath, String niPath, ObjectHandleOnStack retAssembly)
at System.Runtime.Loader.AssemblyLoadContext.LoadFromAssemblyPath(String assemblyPath)
at System.Reflection.Assembly.LoadFrom(String assemblyFile)
at System.Reflection.Assembly.LoadFromResolveHandler(Object sender, ResolveEventArgs args)
at System.Runtime.Loader.AssemblyLoadContext.InvokeResolveEvent(ResolveEventHandler eventHandler, RuntimeAssembly assembly, String name)
at System.Runtime.Loader.AssemblyLoadContext.OnAssemblyResolve(RuntimeAssembly assembly, String assemblyFullName)
[Info] Stopped Local Run.
Child process exited with code 4294967295
[Info] Shutdown local credential server http://localhost:8999/

Any tips? Do I need to use a very specific version of System.Text.Json? Do I need a magic attribute somewhere? Do I need to use a different base class (i.e. not StreamDeserializer)?

Thanks

Fleid commented 2 years ago

Hey @oising, let me ping the experts internally. Is that a time-sensitive ask?

oising commented 2 years ago

hey @Fleid - it was, but I figured out how to work around it by using a JavaScript UDF in conjunction with the deserializer to cast the JSON string to an ASA "record" type in the query itself. I was hung up on trying to do it all in the C# deserializer. It would be nice to know if it's possible nonetheless. I feel that pushing everything into the JavaScript engine is probably not the most efficient, but hey, I'm no expert here :)

oising commented 2 years ago

You guys might also warn people away from VS 2022 -- the entire ASA project subsystem seems broken. I had to reinstall VS 2019 to get anything working.

Fleid commented 2 years ago

VS Code is the recommended IDE for sure. I didn't know about VS 2022, but that's not my scope; I will let the right people know.

For the custom deser, I was looking for an excuse to play with it anyway ;) Will keep you posted, surely by next week.

oising commented 2 years ago

@Fleid - yeah, the VS Code experience is decent, but it feels a bit cluttered, and I noticed the extension will complain about the emitted job template ARM using an obsolete API version, so it offers to update the version string, and then that broke Visual Studio. I feel like I'm wrestling with an octopus in VS Code sometimes.

edit: also, it's confusing trying to understand why/when/how one would use the Azure Stream Analytics CI/CD NuGet package vs the npm azure-stream-analytics cli, when both offer CLIs -- I think the former would be much better off as a dotnet global tool or something.

Fleid commented 2 years ago

I've just created a bug on our side for the ARM version warning. I use it every day, but I never compile in VS Code; always on ADO/GitHub from the repo.

For the tooling, the recommended combo is the VS Code ASA Tools extension + the npm CLI package. I have to update the doc to make it clearer.

Fleid commented 2 years ago

@oising I just want to clarify a thing here.

The deserializer you're implementing will replace our built-in adapter in the pipeline. It will not "prefix" it. So it should output objects with properties that you will use as columns in your query, and whose types are supported by ASA. You should not output a JSON payload expecting the built-in JSON deserializer to pick it up.

From the doc above:

StreamDeserializer<T> deserializes a stream into objects of type T. The following conditions must be met:

  • T is a class or a struct.
  • All public fields in T are either
    • One of [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] or their nullable equivalents.
    • Another struct or class following the same rules.
    • Array of type T2 that follows the same rules.
    • IList<T2> where T2 follows the same rules.
    • Does not have any recursive types.

So for you that means something like:

public class myCustomType {
    public string foo { get; set; }
}

Which you could extract from your stream:

...
   using (var sr = new StreamReader(myStream))
   {
      string line = sr.ReadLine().Replace("\\", ""); // or something more clever if you're expecting backslashes in strings
      var lineT = JsonConvert.DeserializeObject<myCustomType>(line); //static parsing, I expect it to break if your schema drifts
      yield return lineT;
    }
...

Does that clarify things?

oising commented 2 years ago

Yes Florian, thank you, but I think my point still stands. If the built-in deserializer can output "record" (JSON) types, then you should permit the same thing for a user-supplied one. This is especially useful if the schema of the incoming JSON is not stable, or will change over time. Updating the ASA SQL is easy to do and a lot less work than updating the serializer.


Fleid commented 2 years ago

You can output records in a custom deserializer; they are going to be nested classes, not JsonDocuments. So the output object (a record-level event, i.e. a row) will contain a property (column) of type record (another object, defined by a class or a struct).

public class myNestedRecordType {
    public string nestedFoo { get; set; }
}

public class myOutputEventType {
    public string foo { get; set; }
    public myNestedRecordType bar { get; set; }
}

Now what you may need is a dynamic object. I need to play with it to see how dynamic data = JsonConvert.DeserializeObject(line); is ingested.

But that's not necessary to deal with multiple or drifting schemas. Since the whole deserialization happens at the record level, you can branch the output depending on the absence or presence of a field, and/or wrap the whole thing in a TRY/CATCH and encapsulate events you can't deserialize in a string or a byte[].

oising commented 2 years ago

I think maybe you're overthinking it :) All I'd really like is a way to return a class/struct with one or more string members that can opt in to being converted to the ASA type "Record." Example:

public class CustomJsonData
{
    public string Source { get; set; } = "MySource";
    public string JsonString { get; set; } // a string
    [ASARecordType]
    public string Json { get; set; } // will be exposed as an ASA record
}

Yes, I'd agree that the ASA record type is similar to the DLR's DynamicMetaObject / C# dynamic, but their domains are quite different.

Fleid commented 2 years ago

Let me sleep on it :D

Fleid commented 2 years ago

OK, I think I got it now. The way I see it, we'd rather have:

public class CustomJsonData
{
    public string Source { get; set; } = "MySource";
    public string JsonString { get; set; } // a string
    public dynamic JsonRecord { get; set; } // will be exposed as an ASA record
}

Where the deserializer can send dynamic objects that will be surfaced in the query as records.

This already works from the deserialization side of things, I checked. But this type is not supported by ASA. I'm starting a thread with the dev owner to see if we can make that happen (or if there's another way around it).

oising commented 2 years ago

Ah, okay - that would be nice to have. To emit dynamic as ASA record, by convention. I like that. Good stuff @Fleid ! Let us know how you get on :)

ruiminwang commented 2 years ago

octopus

If you never wish to see this prompt for any template file, you can disable it by setting azureResourceManagerTools.checkForLatestSchema to false in VS Code's settings. You can navigate to this setting in the VS Code UI by going to Settings -> Extensions -> Azure Resource Manager Tools.