Open tangguoEddy opened 1 month ago
@tangguoEddy Are you able to provide more details about how to reproduce this crash on a local machine?
Hey all, we are also facing same issue in prod. With no luck reproducing it locally. After a little deep dive we found several potential places that might lead to this crash.
Risk: The AWSIoTStreamThread relies on manually managing its lifecycle with start, isRunning, and cancel states. Improper synchronization can lead to race conditions, especially when starting or canceling the thread.
The isRunning flag is updated but not thread-safe, as it is accessed in both the main loop and external methods (cancel or cancelAndDisconnect).
Risk: Cleanup operations (e.g., invalidating streams, closing resources) are distributed across cleanUp, cancel, and cancelAndDisconnect. This can lead to inconsistent behavior or resource leaks if these methods are called out of sequence.
cleanUp depends on shouldDisconnect, which may not always be set consistently. cancelAndDisconnect sets shouldDisconnect, but it doesn't always guarantee that cleanUp completes successfully before releasing resources.
A canceled thread may still execute cleanup logic if isRunning is not synchronized.
Risk: The defaultRunLoopTimer is essential for maintaining the run loop's activity. If the timer is invalidated prematurely (e.g., in cleanUp or during reconnection), the run loop might exit unexpectedly.
The timer is invalidated during cleanUp:
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}
The run loop exits based on isRunning and isCancelled, but a missing timer could cause it to terminate prematurely.
Since there are plenty of things that could potentially lead to the crash can you review this updated implementation?
@interface AWSIoTStreamThread()
@property(atomic, strong, nullable) AWSMQTTSession *session;
@property(atomic, strong, nullable) NSOutputStream *encoderOutputStream;
@property(atomic, strong, nullable) NSInputStream *decoderInputStream;
@property(atomic, strong, nullable) NSOutputStream *outputStream;
@property(atomic, strong, nullable) NSTimer *defaultRunLoopTimer;
@property(atomic, strong, nullable) NSRunLoop *runLoopForStreamsThread;
@property(atomic, assign) NSTimeInterval defaultRunLoopTimeInterval;
@property(atomic, assign) BOOL isRunning;
@property(atomic, assign) BOOL shouldDisconnect;
@property(atomic, copy, nullable) dispatch_block_t onStop;
// Add synchronization primitives
@property(nonatomic, strong) dispatch_queue_t cleanupQueue;
@property(nonatomic, strong) dispatch_semaphore_t cleanupSemaphore;
@property(atomic, assign) BOOL isCleaningUp;
@end
@implementation AWSIoTStreamThread
- (instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream
outputStream:(nullable NSOutputStream *)outputStream {
if (self = [super init]) {
_session = session;
_decoderInputStream = decoderInputStream;
_encoderOutputStream = encoderOutputStream;
_outputStream = outputStream;
_defaultRunLoopTimeInterval = 10;
_shouldDisconnect = NO;
_isCleaningUp = NO;
// Initialize synchronization primitives
_cleanupQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.cleanup", DISPATCH_QUEUE_SERIAL);
_cleanupSemaphore = dispatch_semaphore_create(1);
}
return self;
}
- (void)main {
@autoreleasepool {
AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);
if (![self setupRunLoop]) {
AWSDDLogError(@"Failed to setup run loop for thread: [%@]", self);
return;
}
[self startIOOperations];
while ([self shouldContinueRunning]) {
@autoreleasepool {
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]];
}
}
[self performCleanup];
AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
}
}
- (BOOL)setupRunLoop {
if (self.isRunning) {
AWSDDLogError(@"Thread already running");
return NO;
}
self.runLoopForStreamsThread = [NSRunLoop currentRunLoop];
// Setup timer with weak reference to prevent retain cycles
__weak typeof(self) weakSelf = self;
self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0]
interval:60.0
target:weakSelf
selector:@selector(timerHandler:)
userInfo:nil
repeats:YES];
if (!self.defaultRunLoopTimer) {
AWSDDLogError(@"Failed to create run loop timer");
return NO;
}
[self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer
forMode:NSDefaultRunLoopMode];
self.isRunning = YES;
return YES;
}
- (void)startIOOperations {
if (self.outputStream) {
[self.outputStream scheduleInRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
[self.outputStream open];
}
[self.session connectToInputStream:self.decoderInputStream
outputStream:self.encoderOutputStream];
}
- (BOOL)shouldContinueRunning {
return self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil;
}
- (void)invalidateTimer {
dispatch_sync(self.cleanupQueue, ^{
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}
});
}
- (void)cancel {
AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self);
[self cancelWithDisconnect:NO];
}
- (void)cancelAndDisconnect:(BOOL)shouldDisconnect {
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]",
shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
[self cancelWithDisconnect:shouldDisconnect];
}
- (void)cancelWithDisconnect:(BOOL)shouldDisconnect {
// Ensure thread-safe property updates
dispatch_sync(self.cleanupQueue, ^{
if (!self.isCleaningUp) {
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];
// Invalidate timer to trigger run loop exit
[self invalidateTimer];
}
});
}
- (void)performCleanup {
dispatch_semaphore_wait(self.cleanupSemaphore, DISPATCH_TIME_FOREVER);
if (self.isCleaningUp) {
dispatch_semaphore_signal(self.cleanupSemaphore);
return;
}
self.isCleaningUp = YES;
dispatch_semaphore_signal(self.cleanupSemaphore);
dispatch_sync(self.cleanupQueue, ^{
[self cleanupResources];
});
}
- (void)cleanupResources {
if (self.shouldDisconnect) {
[self closeSession];
[self closeStreams];
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
}
// Handle onStop callback
dispatch_block_t stopBlock = self.onStop;
if (stopBlock) {
self.onStop = nil;
stopBlock();
}
}
- (void)closeSession {
if (self.session) {
[self.session close];
self.session = nil;
}
}
- (void)closeStreams {
if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
}
if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}
if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
}
- (void)timerHandler:(NSTimer*)theTimer {
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@",
self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO");
}
- (void)dealloc {
AWSDDLogVerbose(@"Deallocating AWSIoTStreamThread: [%@]", self);
}
@end
The improvements ensure that: Cleanup happens exactly once Timer invalidation is properly synchronized Resources are released in the correct order State transitions are thread-safe Run loop exits cleanly
I'm not sure if it is critical to have properties nonatomic
though
Thanks for your input @AndrKonovalov ! Someone will take a look at your code and provide comments as soon as they have bandwidth
when i use AWSiOSSDKV2 2.37.2, I get this crash report。
I use IoT APIs like this