kegaretail / react-native-rabbitmq

29 stars 56 forks source link

message expiration not work #24

Open CrazyMan2 opened 4 years ago

CrazyMan2 commented 4 years ago

import { Connection, Queue, Exchange } from 'react-native-rabbitmq';

let connection: Connection = null;
let queue: Queue;
let exchange: Exchange;
let mExchangeName = "weixinFace";
let mCallFun = null; // callback function

/**
 * Registers the callback that receives every message delivered to the queue.
 */
export function setCallFun(callFun) {
    mCallFun = callFun;
}

/**
 * Opens the connection and, once it reports 'connected', declares a durable
 * queue named `channel`, declares the direct exchange `mExchangeName`, binds
 * the two using `channel` as the routing key and forwards each received
 * message to the registered callback.
 */
export function connectRabbitMq(channel) {
    const config = {
        host: 'xxxxxx',
        port: 5672,
        username: 'xxxx', // your rabbitmq management username
        password: 'xxxx', // your rabbitmq management password
        virtualhost: '/',
    };

    connection = new Connection(config);
    connection.connect();
    let connected = false;

    connection.on('connected', (event) => {
        console.log("connected");
        queue = new Queue(connection, {
            name: channel,
            passive: false,
            durable: true,
            exclusive: false,
            // consumer_arguments: { 'x-message-ttl': 1000 },
            // Arguments:{'x-message-ttl': 1000 },
            // ttl:1000,
            autoDelete: false,
        });
        exchange = new Exchange(connection, {
            name: mExchangeName,
            type: 'direct',
            durable: true,
            autoDelete: false,
            internal: false,
        });
        queue.bind(exchange, channel);
        queue.on('message', (data) => {
            console.log(data);
            if (mCallFun) mCallFun(data);
        });
    });

    connection.on('error', event => {
        connected = false;
        console.log(connection);
        console.log(event);
        console.log("you are not connected");
    });
}

/**
 * Placeholder: the close call is intentionally left commented out, so this
 * function currently does nothing.
 */
export function closeRabbitMq() {
    // connection.close();
}

/**
 * Publishes `message` to the exchange with routing key `channel`, passing an
 * `expiration` property of 1000 (presumably milliseconds - confirm against
 * the library's publish properties).
 */
export function sendMessage(message, channel) {
    let properties = {expiration: 1000};
    exchange.publish(message, channel, properties);
}

Neither the queue TTL nor the `expiration` property passed to `exchange.publish` has any effect. Please help me.

CrazyMan2 commented 4 years ago

queue = new Queue(connection, { name: channel, passive: false, durable: true, exclusive: false, consumer_arguments: { 'x-message-ttl': 1000 }, // Arguments:{'x-message-ttl': 1000 }, // ttl:1000, autoDelete: false, }); (image) The x-message-ttl argument is not set successfully.

mrozkosz commented 4 years ago

I have the same problem. Someone knows how to solve it?

timhonders commented 4 years ago

IOS or Android?

timhonders commented 4 years ago

Can you try this:

// Suggested call: the x-message-ttl entry moves out of the config object and
// into a separate third constructor argument.
queue = new Queue(
    connection,
    {
        name: channel,
        passive: false,
        durable: true,
        exclusive: false,
        consumer_arguments: { },
        autoDelete: false,
    },
    { 'x-message-ttl': 1000 }
);

mrozkosz commented 4 years ago

I have this problem on both platforms. On Android I changed a variable in the file RabbitMqQueue.java. Before: this.channel.queueDeclare(this.name, this.durable, this.exclusive, this.autodelete, args); Now (it works): this.channel.queueDeclare(this.name, this.durable, this.exclusive, this.autodelete, consumer_args); I still don't know how to solve this problem on iOS.

Zrzut-ekranu-2019-12-19-o-18-22-43

timhonders commented 4 years ago

OK, that's not the way to do it. I will take a look at this.

timhonders commented 4 years ago

This example works on android:

// Example reported to work on Android: consumer settings stay in the config
// object, while the x-* queue settings go in a separate third argument.
this.queue = new Queue(
    this.connection,
    {
        name: 'test5.testqueue',
        passive: false,
        durable: true,
        exclusive: false,
        consumer_arguments: {
            'x-priority': this.priority,
        },
        buffer_delay: 600,
        autoBufferAck: true
    },
    {
        'x-queue-mode': 'lazy',
        'x-message-ttl': 1000,
        'x-expires': 1800000
    }
);

rmq

mrozkosz commented 4 years ago

Is there any solution for IOS?

Ro392 commented 4 years ago

Hello friend, do you know how to solve this error on IOS? It is very important!

Ro392 commented 4 years ago

IOS - sends arguments, but the queue created doesn't have them.

Zrzut ekranu 2020-01-7 o 12 09 35 Zrzut ekranu 2020-01-7 o 12 03 58
timhonders commented 4 years ago

Can you show your code?

Ro392 commented 4 years ago
Zrzut ekranu 2020-01-26 o 18 16 10
foo-io commented 9 months ago

Hi! Replace the code in `node_modules/react-native-rabbitmq/ios/RCTReactNativeRabbitMq/RabbitMqQueue.m` with the following:

#import "RabbitMqQueue.h"

// Private state of RabbitMqQueue; everything except `bridge` is assigned in
// -initWithConfig:channel:.
@interface RabbitMqQueue ()
    // Queue name, read from the "name" entry of the config dictionary.
    @property (nonatomic, readwrite) NSString *name;
    // Configuration dictionary handed to the initializer.
    @property (nonatomic, readwrite) NSDictionary *config;
    // Queue object returned by the channel when the queue is declared.
    @property (nonatomic, readwrite) RMQQueue *queue; 
    // Channel handed to the initializer; used to declare the queue and to ack deliveries.
    @property (nonatomic, readwrite) id<RMQChannel> channel;
    // Queue-declare option flags assembled from the boolean entries of the config.
    @property (nonatomic, readwrite) RMQQueueDeclareOptions options;
    // NOTE(review): not referenced anywhere in this file - presumably set by the
    // React Native bridge; confirm before relying on it.
    @property (nonatomic, readwrite) RCTBridge *bridge;
    // Consumer returned by the subscription; kept so it can be cancelled later.
    @property (nonatomic, readwrite) RMQConsumer *consumer;
@end

@implementation RabbitMqQueue

RCT_EXPORT_MODULE();

#pragma mark - Private helpers

/// Translates the boolean flags found in `config` into queue.declare option bits.
/// A missing key yields nil and messaging nil returns NO, so absent flags stay off.
static RMQQueueDeclareOptions RabbitMqQueueDeclareOptionsFromConfig(NSDictionary *config) {
    RMQQueueDeclareOptions options = RMQQueueDeclareNoOptions;

    if ([[config objectForKey:@"passive"] boolValue]) {
        options |= RMQQueueDeclarePassive;
    }
    if ([[config objectForKey:@"durable"] boolValue]) {
        options |= RMQQueueDeclareDurable;
    }
    if ([[config objectForKey:@"exclusive"] boolValue]) {
        options |= RMQQueueDeclareExclusive;
    }
    if ([[config objectForKey:@"autoDelete"] boolValue]) {
        // Must be the *queue* flag: the exchange auto-delete flag belongs to a
        // different option set and does not mean auto-delete for a queue.
        options |= RMQQueueDeclareAutoDelete;
    }
    if ([[config objectForKey:@"NoWait"] boolValue]) {
        options |= RMQQueueDeclareNoWait;
    }

    return options;
}

/// Boxes the supported "x-" entries of config[@"consumer_arguments"] into AMQP
/// field values. Returns an empty dictionary when nothing is configured.
static NSMutableDictionary *RabbitMqQueueArgumentsFromConfig(NSDictionary *config) {
    NSMutableDictionary *arguments = [[NSMutableDictionary alloc] init];
    NSDictionary *requested = [config objectForKey:@"consumer_arguments"];

    if (requested == nil) {
        return arguments;
    }

    NSNumber *ttl = [requested objectForKey:@"x-message-ttl"];
    if (ttl != nil) {
        // TTL is expressed in milliseconds; a 16-bit short cannot hold values
        // above 32767, so use the same 32-bit type as x-expires.
        [arguments setObject:[[RMQSignedLong alloc] init:[ttl intValue]] forKey:@"x-message-ttl"];
    }

    NSNumber *expires = [requested objectForKey:@"x-expires"];
    if (expires != nil) {
        [arguments setObject:[[RMQSignedLong alloc] init:[expires intValue]] forKey:@"x-expires"];
    }

    NSNumber *maxLength = [requested objectForKey:@"x-max-length"];
    if (maxLength != nil) {
        [arguments setObject:[[RMQSignedLong alloc] init:[maxLength intValue]] forKey:@"x-max-length"];
    }

    NSNumber *priority = [requested objectForKey:@"x-priority"];
    if (priority != nil) {
        [arguments setObject:[[RMQSignedShort alloc] init:[priority integerValue]] forKey:@"x-priority"];
    }

    NSNumber *singleActive = [requested objectForKey:@"x-single-active-consumer"];
    if (singleActive != nil) {
        [arguments setObject:[[RMQBoolean alloc] init:[singleActive boolValue]] forKey:@"x-single-active-consumer"];
    }

    return arguments;
}

#pragma mark - Lifecycle

/**
 * Declares the queue described by `config` on `channel` and subscribes a
 * consumer that forwards every delivery as a "RabbitMqQueueEvent".
 *
 * @param config  Dictionary with "name", the optional boolean flags "passive",
 *                "durable", "exclusive", "autoDelete" and "NoWait", and an
 *                optional "consumer_arguments" dictionary holding x-message-ttl,
 *                x-expires, x-max-length, x-priority and/or
 *                x-single-active-consumer.
 * @param channel Channel used to declare the queue and to acknowledge deliveries.
 * @return The initialized instance.
 */
-(id)initWithConfig:(NSDictionary *)config channel:(id<RMQChannel>)channel {
    if (self = [super init]) {

        self.config = config;
        self.channel = channel;
        self.name = [config objectForKey:@"name"];
        self.options = RabbitMqQueueDeclareOptionsFromConfig(config);

        NSMutableDictionary *tmp_arguments = RabbitMqQueueArgumentsFromConfig(config);
        RMQTable *arguments = [[RMQTable alloc] init:tmp_arguments];
        RMQBasicConsumeOptions consumer_options = RMQBasicConsumeNoOptions;

        // The same argument set is used for the declaration and the subscription.
        self.queue = [self.channel queue:self.name options:self.options arguments:tmp_arguments];

        // Capture the name instead of self: self owns the consumer, so a handler
        // that referenced self would keep this object alive forever.
        NSString *queueName = self.name;

        self.consumer = [self.queue subscribe:consumer_options
                    arguments:arguments
                    handler:^(RMQMessage * _Nonnull message) {

            // initWithData:encoding: returns nil for payloads that are not valid
            // UTF-8, and a nil value would raise inside the dictionary literal.
            NSString *body = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];

            //[self.channel ack:message.deliveryTag];

            [EventEmitter emitEventWithName:@"RabbitMqQueueEvent"
                body:@{
                    @"name": @"message",
                    @"queue_name": queueName ?: @"",
                    @"message": body ?: @"",
                    @"routingKey": message.routingKey, // Will be deprecated
                    @"routing_key": message.routingKey,
                    @"exchange": message.exchangeName,
                    @"consumer_tag": message.consumerTag,
                    @"delivery_tag": message.deliveryTag
                }
            ];

        }];

    }
    return self;
}

#pragma mark - Bindings

/// Binds the queue to `exchange`; an empty or nil `routing_key` binds without a key.
-(void) bind:(RMQExchange *)exchange routing_key:(NSString *)routing_key {
    if ([routing_key length] == 0){
        [self.queue bind:exchange];
    }else{
        [self.queue bind:exchange routingKey:routing_key];
    }
}

/// Removes the binding to `exchange`; an empty or nil `routing_key` unbinds without a key.
-(void) unbind:(RMQExchange *)exchange routing_key:(NSString *)routing_key {
    if ([routing_key length] == 0){
        [self.queue unbind:exchange];
    }else{
        [self.queue unbind:exchange routingKey:routing_key];
    }
}

#pragma mark - Queue operations

/// Deletes the queue.
-(void) delete {
    // The declare flags stored in self.options are a different option set from
    // the delete flags this call expects, so they must not be forwarded here.
    [self.queue delete:RMQQueueDeleteNoOptions];
}

/// Acknowledges the delivery identified by `deliveryTag` on the channel.
-(void) ack:(NSNumber *)deliveryTag {
    [self.channel ack:deliveryTag];
}

/// Cancels the consumer created by the initializer.
-(void) cancelConsumer {
    [self.consumer cancel];
}

@end

and example for use:

// Usage with the replacement RabbitMqQueue.m above: the x-* settings are read
// from consumer_arguments.
let queue = new Queue(connection, {
    name: 'queueName',
    passive: false,
    durable: true,
    exclusive: false,
    consumer_arguments: {
        'x-expires': 1800000,
        'x-max-length': 1,
    },
});