Closed michaelklishin closed 2 years ago
Some trial and error results:
RMQConnection#channelAllocator
weak results in test failures in ChannelLifecycleIntegrationTest
RMQAllocatedChannel#nameGenerator
weak results in test failures in ConsumersIntegrationTest
I ran this branch as is and we have retain cycle. What I would like to try next is the following.
@interface RMQSuspendResumeDispatcher ()
@property (nonatomic,weak, readwrite) id<RMQChannel> channel;
@property (nonatomic,weak, readwrite) id<RMQSender> sender;
What I've done is checked out this branch into my profiling test project. I create a connection and then destroy it. The memory graph shows me what object is holding onto an other. If an object is a member of one class and that object gets passed to another class , then thats a candidate for investigation.
If these changes cause no test failures then I can pull the branch down again and verify against profiler.
@interface RMQConnectionRecover ()
@property (nonatomic,weak, readwrite) id<RMQHeartbeatSender> heartbeatSender;
@end
The RMQSuspendResumeDispatcher
changes make unit tests for that class fail. Perhaps they can be reviewed but right now, that's the way it is.
The RMQConnectionRecover
one doesn't break anything, so I've added it.
Another strategy would be to review how connections are closed. We may depend on ARC too much and can be more proactive about releasing the resources we obviously won't need any more after a connection.close-ok
frame is received.
For example, the reader doesn't seem to be released here:
- (NSArray *)closeOperations {
return @[^{[self closeAllUserChannels];},
^{[self sendFrameset:[[RMQFrameset alloc] initWithChannelNumber:@0 method:self.amqClose]];},
^{[self.channelZero blockingWaitOn:[RMQConnectionCloseOk class]];},
^{[self.heartbeatSender stop];},
^{
self.transport.delegate = nil;
[self.transport close];
}];
}
Do I have permission to push to this branch? rabbitmq-objc-client-194-take-3
The reason I asked is I wanted to add a New iPhone app Target to the project that links again RMQClient. If I have that it will be a lot easier for me to:
If we dont want to give me permission just yet thats fine. Maybe you could update this branch with the new app target and I can pull down again.
This is the current state of the memory graph after
I 'think' the run method in Reader may be creating a retain cycle. Will look into that today.
More Progress here related to capturing self in callbacks. All Tests Passing Locally.
(I can't seem to push myself)
RMQConnection
- (void)start:(void (^)(void))completionHandler {
NSError *connectError = NULL;
[self.transport connectAndReturnError:&connectError];
if (connectError) {
[self.delegate connection:self failedToConnectWithError:connectError];
} else {
[self.transport write:[RMQProtocolHeader new].amqEncoded];
__weak id this = self;
[self.commandQueue enqueue:^{
__strong typeof(self) strongThis = this;
id<RMQWaiter> handshakeCompletion = [strongThis.waiterFactory makeWithTimeout:strongThis.handshakeTimeout];
RMQHandshaker *handshaker = [[RMQHandshaker alloc] initWithSender:strongThis
config:strongThis.config
completionHandler:^(NSNumber *heartbeatTimeout,
RMQTable *serverProperties) {
[strongThis.heartbeatSender startWithInterval:@(heartbeatTimeout.integerValue / 2)];
strongThis.handshakeComplete = YES;
[handshakeCompletion done];
[strongThis.reader run];
strongThis.serverProperties = serverProperties;
completionHandler();
}];
RMQReader *handshakeReader = [[RMQReader alloc] initWithTransport:strongThis.transport
frameHandler:handshaker];
handshaker.reader = handshakeReader;
[handshakeReader run];
if (handshakeCompletion.timesOut) {
NSError *error = [NSError errorWithDomain:RMQErrorDomain
code:RMQErrorConnectionHandshakeTimedOut
userInfo:@{NSLocalizedDescriptionKey: @"Handshake timed out."}];
[strongThis.delegate connection:strongThis failedToConnectWithError:error];
}
}];
}
}
RMQReader
- (void)handleMethodFrame:(RMQFrame *)frame {
id<RMQMethod> method = (id<RMQMethod>)frame.payload;
__weak id this = self;
if (method.hasContent) {
[self.transport readFrame:^(NSData * _Nonnull headerData) {
__strong typeof(self) strongThis = this;
RMQFrame *headerFrame = [strongThis frameWithData:headerData];
RMQContentHeader *header = (RMQContentHeader *)headerFrame.payload;
RMQFrameset *frameset = [[RMQFrameset alloc] initWithChannelNumber:frame.channelNumber
method:method
contentHeader:header
contentBodies:@[]];
if ([header.bodySize isEqualToNumber:@0]) {
[strongThis.frameHandler handleFrameset:frameset];
} else {
[strongThis readBodiesForIncompleteFrameset:frameset];
}
}];
} else {
RMQFrameset *frameset = [[RMQFrameset alloc] initWithChannelNumber:frame.channelNumber
method:method];
[self.frameHandler handleFrameset:frameset];
}
}
RMQTCPSocketTransport
- (void)readFrame:(void (^)(NSData * _Nonnull))complete {
__weak id this = self;
[self read:AMQP091_HEADER_SIZE complete:^(NSData * _Nonnull data) {
const struct AMQPHeader *header;
header = (const struct AMQPHeader *)data.bytes;
__strong typeof(self) strongThis = this;
UInt32 hostSize = CFSwapInt32BigToHost(header->size);
[strongThis read:hostSize complete:^(NSData * _Nonnull payload) {
[strongThis read:AMQP091_FINAL_OCTET_SIZE complete:^(NSData * _Nonnull frameEnd) {
NSMutableData *allData = [data mutableCopy];
[allData appendData:payload];
complete(allData);
}];
}];
}];
}
These changes leave us with a slightly healthier looking memory graph (just 1 issue left)
I will add you as a contributor so that you can push.
@BarryDuggan you should be able to push to this repository now.
Why was this branch deleted without a merge?
for some important dependencies of allocated channels and RMQConnection itself.
For example, in this client, RMQConnection auto-allocates a channel for the purpose of special "channel zero" (system communication in the protocol) purposes, and that leads to a loop of strong references that prevent RMQConnection instances from being released.
Based on object graph analysis by @BarryDuggan in https://github.com/rabbitmq/rabbitmq-objc-client/discussions/194.