ra1u / redis-dart

fast redis protocol parser and client
MIT License

FormatException with Byte data on topic #56

Closed dellorogiulio closed 2 years ago

dellorogiulio commented 2 years ago

Hi everyone, I'm running some tests with this package and I found a problem when talking to a Redis server whose topic messages are MessagePack-encoded data. As long as the topic carries only strings, this package works fine with the following code:

_subscriber!.getStream().listen((event) {
  print(event);
});

However, when I try to receive an "encoded" (binary) message, I get

[ERROR:flutter/lib/ui/ui_dart_state.cc(209)] Unhandled Exception: FormatException: Unexpected extension byte (at offset 0)
#0      _Utf8Decoder.convertSingle (dart:convert-patch/convert_patch.dart:1789:7)
#1      Utf8Decoder.convert (dart:convert/utf.dart:318:42)
#2      Utf8Codec.decode (dart:convert/utf.dart:63:20)
#3      RedisParser.parseBulk.<anonymous closure>.<anonymous closure> (package:redis/redisparser.dart:95)
#4      _rootRunUnary (dart:async/zone.dart:1436:47)
#5      _CustomZone.runUnary (dart:async/zone.dart:1335:19)
<asynchronous suspension>
#6      _CustomZone.bindUnaryCallbackGuarded.<anonymous closure> (dart:async/zone.dart)
<asynchronous suspension>

before the _subscriber!.getStream().listen callback is called.

How can we fix this?

Thanks in advance

ra1u commented 2 years ago

I think the issue is that we convert received bulk data to strings:

https://github.com/ra1u/redis-dart/blob/master/lib/redisparser.dart#L95

As a quick hack, you should be fine if you remove the UTF8.decode conversion there, but for backward compatibility I will have to come up with a better idea.
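
To illustrate why the decode step fails, here is a minimal sketch that uses only dart:convert. The payload bytes are an assumed example (MessagePack for {"a": 1}), not data taken from this issue, and tryDecodeUtf8 is a hypothetical helper. A MessagePack map header lies in the range 0x80 to 0x8f, which UTF-8 reserves for continuation bytes, so a strict decoder rejects the payload at offset 0.

```dart
import 'dart:convert';

/// Hypothetical helper: returns the decoded string, or null if [bytes]
/// is not valid UTF-8.
String? tryDecodeUtf8(List<int> bytes) {
  try {
    return utf8.decode(bytes);
  } on FormatException {
    return null;
  }
}

void main() {
  // Assumed sample payload: MessagePack for {"a": 1}.
  // 0x81 = map with one entry, 0xa1 0x61 = the string "a", 0x01 = the int 1.
  final packed = <int>[0x81, 0xa1, 0x61, 0x01];

  // Plain text survives the round trip through UTF-8.
  print(tryDecodeUtf8(utf8.encode('message')));
  // The binary payload is rejected because 0x81 cannot start a UTF-8 sequence.
  print(tryDecodeUtf8(packed));
}
```

This is why any fix has to hand the raw bytes to the caller instead of decoding every bulk reply inside the parser.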

dellorogiulio commented 2 years ago

Hi @ra1u, thanks for your response. I tried it and it works! Now event in

_subscriber!.getStream().listen((event) {
  print(event);
});

is a List of List<int> (of course), so I need to decode both event[0] and event[1]:

const UTF8 = Utf8Codec(); // Utf8Codec comes from dart:convert
String action = UTF8.decode(event[0]);
if (action != 'message') {
  return; // skip anything that is not a published message
}
String topicName = UTF8.decode(event[1]);

which doesn't sound so bad to me. However, a friendly interface is always a good thing.

As an implementation idea, you could mimic the Node ioredis package, which lets the user subscribe to either 'message' (payload delivered as a string) or 'messageBuffer' (payload delivered as a raw buffer). This obviously adds some complexity on your side, since you would have to wrap the Dart Stream and dispatch a different callback for each event type. Right now every event goes to the same callback and the user has to sort out what they need. With a wrapper, the user could register separate callbacks for 'subscribe', 'message', 'messageBuffer', etc.

Note: in ioredis, when you subscribe to 'messageBuffer', the topic name is a raw buffer too.
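
A rough sketch of that idea, assuming every event arrives as a three-element list of byte lists ([kind, channel, payload]) as it does with the quick hack. PubSubDispatcher, TextMessage and BufferMessage are hypothetical names and not part of the redis package; the simulated events in main stand in for a real subscription.

```dart
import 'dart:async';
import 'dart:convert';

/// A pub/sub message with channel and payload decoded as UTF-8 text.
class TextMessage {
  TextMessage(this.channel, this.payload);
  final String channel;
  final String payload;
}

/// A pub/sub message with channel and payload left as raw bytes.
class BufferMessage {
  BufferMessage(this.channel, this.payload);
  final List<int> channel;
  final List<int> payload;
}

/// Hypothetical wrapper around the raw event stream.
class PubSubDispatcher {
  PubSubDispatcher(Stream<dynamic> events)
      : _events = events.asBroadcastStream();

  final Stream<dynamic> _events;

  // True only for published messages, not for subscribe confirmations.
  bool _isMessage(dynamic e) =>
      e is List &&
      e.length == 3 &&
      e[0] is List<int> &&
      utf8.decode(e[0] as List<int>, allowMalformed: true) == 'message';

  /// Counterpart of ioredis 'messageBuffer': everything stays raw.
  Stream<BufferMessage> get onMessageBuffer => _events
      .where(_isMessage)
      .map((e) => BufferMessage(e[1] as List<int>, e[2] as List<int>));

  /// Counterpart of ioredis 'message': everything is decoded as text.
  Stream<TextMessage> get onMessage => onMessageBuffer.map((m) => TextMessage(
      utf8.decode(m.channel, allowMalformed: true),
      utf8.decode(m.payload, allowMalformed: true)));
}

Future<void> main() async {
  // Simulated events in place of a real subscription.
  final events = Stream<dynamic>.fromIterable([
    [utf8.encode('subscribe'), utf8.encode('topic_1'), utf8.encode('1')],
    [utf8.encode('message'), utf8.encode('topic_1'), utf8.encode('hello')],
    [utf8.encode('message'), utf8.encode('topic_2'), <int>[0x81, 0xa1, 0x61, 0x01]],
  ]);

  final dispatcher = PubSubDispatcher(events);
  dispatcher.onMessage.listen((m) => print('${m.channel}: ${m.payload}'));
  dispatcher.onMessageBuffer
      .listen((m) => print('${m.payload.length} raw bytes'));
}
```

The text stream decodes with allowMalformed: true so that a binary payload yields replacement characters instead of an exception; a caller that expects binary data would listen on onMessageBuffer instead.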

What do you think about that?

Thanks again!

EDIT: for anyone interested in using MessagePack in Dart, msgpack_dart works perfectly with this package. It only needs an extension to convert the List<int> (received as the raw buffer) to the Uint8List that msgpack_dart requires for deserialization:

extension AsUint8List on List<int> {
  Uint8List asUint8List() {
    final self = this;
    return (self is Uint8List) ? self : Uint8List.fromList(this);
  }
}

and then

List<int> message = event[2];
msgpack.deserialize(message.asUint8List());
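
As a small illustration of what that conversion does, here is the same check written as a plain function (toUint8List is a hypothetical name) using only dart:typed_data. It copies only when the input is not already a Uint8List.

```dart
import 'dart:typed_data';

/// Same idea as the extension above, written as a plain function.
Uint8List toUint8List(List<int> bytes) =>
    bytes is Uint8List ? bytes : Uint8List.fromList(bytes);

void main() {
  final plain = <int>[1, 2, 3, 4];
  final typed = Uint8List.fromList(plain);

  // A regular List<int> is copied into a new Uint8List, so the result
  // is a different object from the input.
  print(identical(toUint8List(plain), plain));
  // An existing Uint8List is returned unchanged, with no copy.
  print(identical(toUint8List(typed), typed));
}
```

Whether the parser hands back a Uint8List or a regular List<int> depends on its implementation, which is why the check is worth keeping.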

dellorogiulio commented 2 years ago

Just for completeness: what if I need to publish raw data? Something like a Uint8List as the third argument?

await _redis!.send_object(["PUBLISH", "topic_2", [1, 2, 3, 4]]);

Thanks in advance

ra1u commented 2 years ago

For that, take a peek at https://github.com/ra1u/redis-dart/blob/master/lib/redisserialise.dart. There is a class RedisBulk that you can use for this (just pass your data into it).

dellorogiulio commented 2 years ago

Thanks, I will try!

ra1u commented 2 years ago

For your case that should be

await _redis!.send_object(["PUBLISH", "topic_2", RedisBulk([1, 2, 3, 4])]);
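
For context, the Redis protocol (RESP) sends each argument as a length-prefixed bulk string, so arbitrary bytes can travel unchanged. The sketch below shows that framing for the PUBLISH command above; encodeBulk and encodeCommand are hypothetical helpers written for illustration and are not taken from redisserialise.dart.

```dart
import 'dart:convert';

/// Frames one argument as a RESP bulk string: $<length>\r\n<bytes>\r\n.
List<int> encodeBulk(List<int> data) => [
      ...ascii.encode('\$${data.length}\r\n'),
      ...data,
      ...ascii.encode('\r\n'),
    ];

/// Frames a whole command as a RESP array of bulk strings.
List<int> encodeCommand(List<List<int>> args) => [
      ...ascii.encode('*${args.length}\r\n'),
      for (final arg in args) ...encodeBulk(arg),
    ];

void main() {
  final wire = encodeCommand([
    utf8.encode('PUBLISH'),
    utf8.encode('topic_2'),
    <int>[1, 2, 3, 4], // raw payload, sent without any text encoding
  ]);
  print(wire);
}
```

Because the payload length is sent up front, the payload may contain any byte value, including \r and \n.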

ra1u commented 2 years ago

Support for binary data is now available on master. The idea is that the parser is embedded in the Command class.

final conn = RedisConnection();
Command _cmd = await conn.connect('localhost', 6379);
// Derive a command handler that uses the binary bulk parser.
Command cmd_binary = Command.from(_cmd).setParser(RedisParserBulkBinary());

List<int> data = [1, 2, 3, 4, 5, 6, 7, 8, 9];
await cmd_binary.send_object(["SET", "mykey", RedisBulk(data)]);
// send_object returns a Future, so the reply has to be awaited.
List<int> rsp = await cmd_binary.send_object(["GET", "mykey"]);

Because of a minor backward incompatibility, this is expected to be released as v3.0 in the near future.

dellorogiulio commented 2 years ago

Thanks!