Open aamirrashid opened 4 years ago
I was wondering about this example the other day.
The example encodes the input as a string https://github.com/confluentinc/libserdes/blob/master/examples/serdes-kafka-avro-client.c#L215, using the schema only to populate the magic byte. Maybe you should input just the values: https://avro.apache.org/docs/current/spec.html#binary_encoding
maybe instead of: str: {"Make":"BMW","Model":"M-4","Year":"2014","MSRP":55000}
try: str: "BMW","M-4","2014",???
I don't know how to handle that MSRP being an "int", but supplied as a string.
I truly don't understand the value in shipping example code that does not work out of the box. I believe there is a shortcoming in both libserdes and the example code: neither has the functionality to convert JSON to Avro. Surprisingly enough, libserdes does have the code to do the reverse, i.e. Avro to JSON! Go figure!
The right way to fix it would be to add functionality in libserdes to convert from json to avro, and then the example code should be modified to call that functionality.
For my purposes, I coded a function to convert from json to avro in example code, and then it all worked.
Here are the 2 functions that I added to convert from json to avro. I simply called json_to_avro() from run_producer() after } else if (!strncmp(buf, "str: ", 5)) {
and it just works.
`/*
 * Recursively populate the Avro value current_val from the JSON value json,
 * driven by the Avro schema.
 *
 *   schema       Avro schema describing current_val.
 *   json         JSON value to convert; NULL (e.g. a record field that is
 *                absent from the JSON object) is reported as a mismatch.
 *   current_val  Avro value to fill in; must not be NULL.
 *   quiet        Non-zero suppresses error messages (used while probing
 *                union branches).
 *   strjson      Non-zero: a non-string JSON value destined for an Avro
 *                string is stored as its compact JSON text.
 *   max_str_sz   If non-zero, Avro strings are truncated to this many bytes.
 *
 * Returns 0 on success, 1 on any mismatch or failure.
 */
int schema_traverse(const avro_schema_t schema, json_t *json, avro_value_t *current_val,
                    int quiet, int strjson, size_t max_str_sz)
{
    int rc = 0; /* result of the scalar avro_value_set_*() call, if any */

    assert(current_val != NULL);

    /* Reported as an error rather than asserted: json_object_get() returns
       NULL for every record field that is missing from the input. */
    if (!json) {
        if (!quiet)
            fprintf(stderr, "ERROR: Avro schema does not match JSON\n");
        return 1;
    }

    switch (schema->type) {
    case AVRO_RECORD: {
        int len, i;

        if (!json_is_object(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON object for Avro record, got something else\n");
            return 1;
        }
        len = (int)avro_schema_record_size(schema);
        for (i = 0; i < len; i++) {
            const char *name = avro_schema_record_field_name(schema, i);
            avro_schema_t field_schema = avro_schema_record_field_get_by_index(schema, i);
            avro_value_t field;

            if (avro_value_get_by_index(current_val, i, &field, NULL)) {
                if (!quiet)
                    fprintf(stderr, "ERROR: Cannot access Avro record field %d\n", i);
                return 1;
            }
            if (schema_traverse(field_schema, json_object_get(json, name), &field,
                                quiet, strjson, max_str_sz))
                return 1;
        }
        break;
    }

    case AVRO_LINK:
        /* TODO */
        fprintf(stderr, "ERROR: AVRO_LINK is not implemented\n");
        return 1;

    case AVRO_STRING:
        if (json_is_string(json)) {
            const char *js = json_string_value(json);

            if (max_str_sz && strlen(js) > max_str_sz) {
                /* Truncated copy: max_str_sz bytes plus the terminator. */
                char *jst = malloc(max_str_sz + 1);

                if (!jst) {
                    if (!quiet)
                        fprintf(stderr, "ERROR: Out of memory\n");
                    return 1;
                }
                memcpy(jst, js, max_str_sz);
                jst[max_str_sz] = '\0';
                rc = avro_value_set_string(current_val, jst);
                free(jst);
            } else {
                rc = avro_value_set_string(current_val, js);
            }
        } else if (strjson) {
            /* -j specified, just dump the remaining json as string */
            char *js = json_dumps(json, JSON_COMPACT | JSON_SORT_KEYS | JSON_ENCODE_ANY);

            if (!js) {
                if (!quiet)
                    fprintf(stderr, "ERROR: Cannot serialize JSON value to a string\n");
                return 1;
            }
            if (max_str_sz && strlen(js) > max_str_sz)
                js[max_str_sz] = '\0'; /* truncate the string - this will result in invalid JSON! */
            rc = avro_value_set_string(current_val, js);
            free(js);
        } else {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON string for Avro string, got something else\n");
            return 1;
        }
        break;

    case AVRO_BYTES: {
        const char *s;

        if (!json_is_string(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON string for Avro bytes, got something else\n");
            return 1;
        }
        /* NB: Jansson uses null-terminated strings, so embedded nulls are NOT
           supported, not even escaped ones */
        s = json_string_value(json);
        rc = avro_value_set_bytes(current_val, (void *)s, strlen(s));
        break;
    }

    case AVRO_INT32: {
        json_int_t v;

        if (!json_is_integer(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON integer for Avro int, got something else\n");
            return 1;
        }
        /* json_int_t is wider than an Avro int; reject instead of wrapping. */
        v = json_integer_value(json);
        if (v < INT32_MIN || v > INT32_MAX) {
            if (!quiet)
                fprintf(stderr, "ERROR: JSON integer is out of range for Avro int\n");
            return 1;
        }
        rc = avro_value_set_int(current_val, (int32_t)v);
        break;
    }

    case AVRO_INT64:
        if (!json_is_integer(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON integer for Avro long, got something else\n");
            return 1;
        }
        rc = avro_value_set_long(current_val, json_integer_value(json));
        break;

    case AVRO_FLOAT:
        if (!json_is_number(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON number for Avro float, got something else\n");
            return 1;
        }
        rc = avro_value_set_float(current_val, (float)json_number_value(json));
        break;

    case AVRO_DOUBLE:
        if (!json_is_number(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON number for Avro double, got something else\n");
            return 1;
        }
        rc = avro_value_set_double(current_val, json_number_value(json));
        break;

    case AVRO_BOOLEAN:
        if (!json_is_boolean(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON boolean for Avro boolean, got something else\n");
            return 1;
        }
        rc = avro_value_set_boolean(current_val, json_is_true(json));
        break;

    case AVRO_NULL:
        if (!json_is_null(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON null for Avro null, got something else\n");
            return 1;
        }
        rc = avro_value_set_null(current_val);
        break;

    case AVRO_ENUM: {
        int symbol;

        /* The JSON side carries the enum as its symbol name. */
        if (!json_is_string(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON string for Avro enum, got something else\n");
            return 1;
        }
        /* A negative index means the name is not one of the schema's symbols. */
        symbol = avro_schema_enum_get_by_name(schema, json_string_value(json));
        if (symbol < 0) {
            if (!quiet)
                fprintf(stderr, "ERROR: JSON string is not a symbol of the Avro enum\n");
            return 1;
        }
        rc = avro_value_set_enum(current_val, symbol);
        break;
    }

    case AVRO_ARRAY: {
        size_t i, len;
        avro_schema_t items;

        if (!json_is_array(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON array for Avro array, got something else\n");
            return 1;
        }
        len = json_array_size(json);
        items = avro_schema_array_items(schema);
        for (i = 0; i < len; i++) {
            avro_value_t val;

            if (avro_value_append(current_val, &val, NULL)) {
                if (!quiet)
                    fprintf(stderr, "ERROR: Cannot append element to Avro array\n");
                return 1;
            }
            if (schema_traverse(items, json_array_get(json, i), &val, quiet, strjson, max_str_sz))
                return 1;
        }
        break;
    }

    case AVRO_MAP: {
        avro_schema_t values;
        void *iter;

        if (!json_is_object(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON object for Avro map, got something else\n");
            return 1;
        }
        values = avro_schema_map_values(schema);
        for (iter = json_object_iter(json); iter; iter = json_object_iter_next(json, iter)) {
            avro_value_t val;

            if (avro_value_add(current_val, json_object_iter_key(iter), &val, 0, 0)) {
                if (!quiet)
                    fprintf(stderr, "ERROR: Cannot add entry to Avro map\n");
                return 1;
            }
            if (schema_traverse(values, json_object_iter_value(iter), &val, quiet, strjson, max_str_sz))
                return 1;
        }
        break;
    }

    case AVRO_UNION: {
        size_t i, branches = avro_schema_union_size(schema);

        /* Try each branch in schema order, silently, and keep the first one
           that accepts the JSON value. */
        for (i = 0; i < branches; i++) {
            avro_value_t branch;

            if (avro_value_set_branch(current_val, (int)i, &branch))
                continue;
            if (!schema_traverse(avro_schema_union_branch(schema, (int)i), json, &branch,
                                 1, strjson, max_str_sz))
                break;
        }
        if (i == branches) {
            if (!quiet)
                fprintf(stderr, "ERROR: No type in the Avro union matched the JSON type we got\n");
            return 1;
        }
        break;
    }

    case AVRO_FIXED: {
        const char *f;

        if (!json_is_string(json)) {
            if (!quiet)
                fprintf(stderr, "ERROR: Expecting JSON string for Avro fixed, got something else\n");
            return 1;
        }
        /* NB: Jansson uses null-terminated strings, so embedded nulls are NOT
           supported, not even escaped ones */
        f = json_string_value(json);
        if (avro_value_set_fixed(current_val, (void *)f, strlen(f))) {
            if (!quiet)
                fprintf(stderr, "ERROR: Setting Avro fixed value FAILED\n");
            return 1;
        }
        break;
    }

    default:
        fprintf(stderr, "ERROR: Unknown type: %d\n", schema->type);
        return 1;
    }

    if (rc) {
        if (!quiet)
            fprintf(stderr, "ERROR: Setting Avro value FAILED\n");
        return 1;
    }
    return 0;
}
/*
 * Parse buffer_len bytes of JSON text from buffer and store the result in
 * the Avro value val, which must already have been created for schema.
 *
 * Non-string JSON values that land in Avro string fields are stored as JSON
 * text, and strings are capped at 1024 bytes (see schema_traverse).
 *
 * Failures are reported on stderr only; the function returns nothing, so
 * after an error val is left unset or only partially filled.
 */
void json_to_avro(char *buffer, int buffer_len, avro_schema_t schema, avro_value_t *val)
{
    json_error_t err;
    json_t *json;
    const size_t max_str_sz = 1024;
    const int strjson = 1;

    assert(buffer != NULL);
    assert(val != NULL);

    json = json_loadb(buffer, buffer_len, 0, &err);
    if (!json) {
        fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, skipping to EOL\n",
                err.line, err.column, err.position, err.text);
        return; /* nothing was parsed, so there is nothing to convert */
    }

    if (schema_traverse(schema, json, val, 0, strjson, max_str_sz)) {
        /* buffer is length-delimited, so bound the print by buffer_len */
        fprintf(stderr, "Error processing record %.*s, skipping...\n", buffer_len, buffer);
    }

    json_decref(json);
}`
This sounds great. I'm going to try out your code.
I can't get it together properly. Your code uses an avro_schema_t but we are given a serdes_schema_t. Where did you get the avro_schema_t for the call to json_to_avro?
Here is how you do it:
`} else if (!strncmp(buf, "str: ", 5)) {
    /* Build an Avro value from the JSON text that follows "str: " */
    avro_value_t val;
    void *ser_buf = NULL;
    size_t ser_buf_size;
    char *input;
    int input_len;
    avro_schema_t avro_schema = NULL;
    avro_value_iface_t *iface = NULL;

    /* val is created exactly once, from the schema's generic class.
       Creating a string value first and then overwriting it here would
       leak that string value. */
    avro_schema = serdes_schema_avro(schema);
    iface = avro_generic_class_from_schema(avro_schema);
    avro_generic_value_new(iface, &val);

    input = buf + 5;
    input_len = (int)strlen(input);
    json_to_avro(input, input_len, avro_schema, &val);
`
Yes, that's good. You should get a PR going for this.
It works. Thanks much.
At the moment I am running into an issue with supporting ENUM types in the Avro schema. I noticed it keeps returning the first item, because the following case statement in the schema_traverse routine lacks an implementation.
case AVRO_ENUM: // TODO ??? break;
Could anyone get AVRO_ENUM to work at some point? If so, please post that part of the code.
I am trying out a few things too to get the AVRO_ENUM case block working and will post my response here.
Thanks.
Got it working. Here is the snippet.
case AVRO_ENUM:
{
    /* The JSON side carries the enum as its symbol name. */
    int symbol_value;

    if (!json_is_string(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON string for Avro enum, got something else\n");
        return 1;
    }
    /* A negative index means the name is not one of the schema's symbols. */
    symbol_value = avro_schema_enum_get_by_name(schema, json_string_value(json));
    if (symbol_value < 0) {
        if (!quiet)
            fprintf(stderr, "ERROR: JSON string is not a symbol of the Avro enum\n");
        return 1;
    }
    avro_value_set_enum(current_val, symbol_value);
}
break;
I'm trying to use the ./serdes-kafka-avro-client example as shipped with latest version of libserdes, but I cannot get it to work. Could someone please let me know if it's working code or not? Or maybe I'm not using it as intended. Here's my output:
1). The schema is already registered in schemaregistry:
curl -X GET localhost:60002/subjects/car/versions/latest | jq . % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 258 100 258 0 0 15246 0 --:--:-- --:--:-- --:--:-- 16125 { "subject": "car", "version": 1, "id": 22, "schema": "{\"type\":\"record\",\"name\":\"Car\",\"fields\":[{\"name\":\"Make\",\"type\":\"string\"},{\"name\":\"Model\",\"type\":\"string\"},{\"name\":\"Year\",\"type\":\"string\"},{\"name\":\"MSRP\",\"type\":\"int\"}]}" }
I start the consumer ./serdes-kafka-avro-client -C -b localhost:60000 -t aamir -p 0 -r localhost:60002 I start the producer ./serdes-kafka-avro-client -P -b localhost:60000 -t aamir -r localhost:60002 -s 22 % Using schema (null) with id 22 % Use "schema: " to specify a new schema % Use "str: " to produce an Avro-encoded string % Ctrl-D to exit
I enter just one record from producer str: {"Make":"BMW","Model":"M-4","Year":"2014","MSRP":55000}
The consumer complains that it cannot deserialize % serdes_deserialize_avro failed: Failed to read avro value: Cannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Can
Can someone please help?
Thanks!