godotengine / godot

Godot Engine – Multi-platform 2D and 3D game engine
https://godotengine.org
MIT License
91.04k stars 21.18k forks source link

Function calls made after awaiting on signal emitted from within a thread has a chance of failing abruptly without error. #92855

Open KnightNine opened 5 months ago

KnightNine commented 5 months ago

Tested versions

v4.2.rc.custom_build [d34f1ec4b], v4.3.beta.custom_build [44ac302e3]

System information

Windows 10

Issue description

I built a "thread hooking" system (which essentially allows me to jump in and out of a thread at any point in my code) to avoid creating multiple threads because Godot 3.X would stutter every time a thread was started. My code revolves around usage of this system which only now I've found to just stop working at random without any error. (And re-working all my thread reliant code to call functions directly with threads will give me Big Sad™ which is why I want some sorta fix.) It took me so long to find this issue because this code can run completely fine a few thousand times... until it doesn't.

When the issue occurs the function called within the thread just does not execute past a certain point as if it were awaiting on a signal that never comes.

Steps to reproduce

Here is the "thread hooking" code and the "stability/stress test" which crashes it:

extends Node2D
#optional label to show thread progress without std_out
#@onready var label = get_node("RichTextLabel")

var thread = Thread.new()
var semaphore = Semaphore.new()
var thread_active = true
signal thread_frame()
signal thread_finished()
var yielding_on_thread_process = false

func thread_process():
    Thread.set_thread_safety_checks_enabled(false)
    while thread_active:

        if !yielding_on_thread_process:
            print("held")
            semaphore.wait()
            print("released")
        #this fixes the deadlocking and error issues (because i think the thread will call emit_signal() at the exact same time as `await self.thread_frame` is called in reattach_to_thread(), which seems to break godot ) (maybe make the delay a bit larger than 1 usec just in case):
        OS.delay_usec(100)
        emit_signal("thread_frame")

    print("finished")

var ri = 0
func reattach_to_thread():
    var id = thread.get_id()
    #if not already in thread
    if OS.get_thread_caller_id() != int(id):
        #on semaphore.post(), the thread is "released" to emit a signal
        yielding_on_thread_process = true
        print("posted" + str(ri))
        semaphore.post()
        #this merges/hooks the function into the thread (awaiting on anything adds the code being run to whatever thread is being awaited on)
        await self.thread_frame
        print("hooked"+ str(ri))
        yielding_on_thread_process = false
        ri+=1
        if ri > 999:
            ri = 0
    else:
        print("reattach not needed")

#stability test functions==============================

func stability_test():
    label.text = ""
    #stability test:
    for i in range(1000):
        await reattach_to_thread()
        await get_tree().process_frame
        label.text += ", "+str(i)
        print(i)

#READY=============================================

# Called when the node enters the scene tree for the first time.
func _ready():
    #HOW TO USE THREAD HOOKING:
    #start the thread process
    var c = Callable(self,"thread_process")
    thread.start(c)

    print("you are currently in the main thread:"+str(OS.get_thread_caller_id()))

    await reattach_to_thread()

    print("you are in the thread now:"+str(OS.get_thread_caller_id()))

    #return to main thread by awaiting on process_frame
    await get_tree().process_frame
    print("you are in the main thread again:"+str(OS.get_thread_caller_id()))

    #TESTING STABILITY:
    while true:
        await stability_test()

Read comments under func awaitable_threader(i) for more info about the issue.

Minimal reproduction project (MRP)

N/A

AThousandShips commented 5 months ago

You are on a very outdated branch, please try on a supported branch and if you have your own code in your branch please remove it, bug reports are only for official code

KnightNine commented 5 months ago

After further testing I've found that this issue is only present in my project, I cannot seem to replicate it in a fresh project.

The potential causes I can think of (as someone who knows little about the core engine stuff) are:

(can someone with a large project and/or a project that was imported from 3.X try running a scene containing the above code?)

I synced my Godot build with the 4.3 master branch (v4.3.beta.custom_build [44ac302e3]), only preserving my modifications to astar so that my project wouldn't throw a bunch of errors due to making unsupported function calls. The result is that the issue persists.

I'll have to test removing stuff from my Autoload scripts to see if that changes anything and I'll add it here if I find anything.

KnightNine commented 5 months ago

yeah, I have no idea what causes it so far, I've disabled anything that could potentially run within my Autoload scripts and it hasn't resolved it. I'll note that in 4.3 the issue doesn't seem to appear as often when the project window is in focus. (Also I still can't replicate it in a new project so I'm leaning towards this being somehow related to the project's size)

Really want to know if anyone can replicate it or if it's just me.

AThousandShips commented 5 months ago

Pretty hard to tell, I'd say the first step is to establish if it's independent of your own code, but with the vague contexts it's pretty impossible to tell or to replicate

KnightNine commented 5 months ago

I've further narrowed down the code above.

I can simplify the stability test function to:

func stability_test():
    label.text = ""
    #stability test:
    for i in range(1000):
        await reattach_to_thread()
        await get_tree().process_frame
        label.text += ", "+str(i)
        print(i)

with the issue persisting.

I also noticed that adding a delay or a print statement after the await get_tree().process_frame like so:

    await get_tree().process_frame
    OS.delay_usec(100)

fixes the issue. 🤔 (EDIT: it works in my test project but not when i'm using it in my actual project.)

This leads me to believe that this is some sort of deadlocking issue where 'await get_tree().process_frame' is being called at the exact same time as the "process_frame" signal is being emitted, the await doesn't know what to do and crashes. This would explain why it's so inconsistent.

Awaiting on a process frame from a thread isn't something that is restricted in the thread_safety_checks so maybe this possibility wasn't accounted for or seen to be possible. Though I still can't explain why it only happens in my project... I'm probably missing something.

RadiantUwU commented 2 months ago

Hello! Await connects to the signal, if the signal is not protected by a mutex, it will result in thread safety issues and crashes.

KnightNine commented 2 months ago

@RadiantUwU I've found that, in my my main project which uses this system, even with relying on the OS.delay_usec(100), it still crashes.

I tried using mutex, then I realized it was redundant since I could use the semaphore as a mutex. letting the signal only be emitted a process frame after the await call has been made. (it was still crashing anyway)

commented out all the uses of mutex:

#optional label to show thread progress without std_out
@onready var label = get_node("RichTextLabel")

var thread = Thread.new()
var semaphore = Semaphore.new()
var thread_active = true
signal thread_frame()
signal thread_finished()

var semaphore_counter = 0
var yielding_on_thread_process = false

var mut = Mutex.new()

func thread_process():
    Thread.set_thread_safety_checks_enabled(false)
    while thread_active:

        if !yielding_on_thread_process:
            semaphore_counter+=1
            alt_print("held"+str(semaphore_counter))
            semaphore.wait()

            alt_print("released")

        #mut.lock()
        alt_print("lock")
        emit_signal("thread_frame")

    alt_print("finished")

var ri = 0
func reattach_to_thread():
    var id = thread.get_id()
    #if not already in thread
    if OS.get_thread_caller_id() != int(id):
        #on semaphore.post(), the thread is "released" to emit a signal

        #mut.lock()
        alt_print("lock2")

        yielding_on_thread_process = true

        call_deferred("delayed_unlocker")
        #this merges/hooks the function into the thread (awaiting on anything adds the code being run to whatever thread is being awaited on)
        await self.thread_frame

        while semaphore_counter < 0:
            alt_print("empty held"+str(semaphore_counter))
            semaphore_counter += 1
            semaphore.wait()

        alt_print("hooked"+ str(ri))
        yielding_on_thread_process = false
        alt_print("unlock")
        #mut.unlock()

        ri+=1
        if ri > 999:
            ri = 0
    else:
        alt_print("reattach not needed")

func delayed_unlocker():
    await get_tree().process_frame
    alt_print("unlock2")
    #mut.unlock()
    alt_print("posted" + str(ri))
    semaphore_counter -= 1
    semaphore.post()

#there is a chance that the get_tree().process_frame will deadlock
var awaiting_process_frame = false
func safe_merge_with_main_thread():
    #mut.lock()
    alt_print("lock3")
    awaiting_process_frame = true
    await get_tree().process_frame
    alt_print("unlock3")
    #mut.unlock()

#stability test functions==============================

const USE_ALT_PRINT = true

func stability_test():
    label.text = ""
    #stability test:
    for i in range(100):

        await reattach_to_thread()

        await safe_merge_with_main_thread()
        if !USE_ALT_PRINT:

            label.text += ", "+str(i)

func alt_print(text):
    if USE_ALT_PRINT:
        await get_tree().process_frame
        label.text += ", "+str(text)
    else:
        print(text)

Results:

Test1

Settings:

Test 2

Settings:

Test 3

Settings:

Conclusion: (EDIT: Dismiss this, it's incorrect.)

I suspect that even if i call await self.thread_frame 1 or 2 frames before posting the semaphore and/or unlocking the mutex for the signal to be emitted, there is still a chance that await self.thread_frame will not "complete" its call. by "completion" I mean the point where the await call is actually waiting and won't dismiss the signal when it is emitted.

There's no way to track the completion of an await call so threadsafety for awaiting upon a signal within another thread is ultimately impossible.😭

There'd need to be a feature dedicated to this, like a signal that is emitted when all the await calls in the engine are settled or a way to make calls after an await call like await awaitsignal : also_call_this_function_after().

KnightNine commented 2 months ago

@RadiantUwU OK NEVERMIND, I tried using my own logger program (which writes to a file) to see why it was crashing (since adding print calls, modifying the range() value, or changing what was added to the label drastically changed the frequency of the crashes.) and I found that it was actually deadlocking when awaiting on the process_frame to re-merge into the main thread and had nothing to do with the await self.thread_frame. I was completely incorrect.😔

I don't want to celebrate too early but this seems to work reliably now! Cleaned up the code and removed what was unnecessary:

#optional label to show thread progress without std_out
@onready var label = get_node("RichTextLabel")

var thread = Thread.new()
var semaphore = Semaphore.new()
var thread_active = true
signal thread_frame()
signal thread_finished()

var semaphore_counter = 0
var process_mut = Mutex.new()

signal re_merge_process_frame()
func _process(delta):

    if awaiting_process_frame:
        process_mut.lock()
        emit_signal("re_merge_process_frame")
        process_mut.unlock()

func thread_process():
    Thread.set_thread_safety_checks_enabled(false)
    while thread_active:
        semaphore_counter+=1
        semaphore.wait()

        emit_signal("thread_frame")

func reattach_to_thread():
    var id = thread.get_id()
    #if not already in thread
    if OS.get_thread_caller_id() != int(id):

        call_deferred("delayed_unlocker")
        #this merges/hooks the function into the thread (awaiting on anything adds the code being run to whatever thread is being awaited on)
        await self.thread_frame

    else:
        pass

var awaiting_process_frame = false
func safe_merge_with_main_thread():
    process_mut.lock()
    awaiting_process_frame = true
    #can't await on process frame here so call_deferred_thread_group() is used instead
    call_deferred_thread_group("delayed_unlocker2")
    await re_merge_process_frame
    awaiting_process_frame = false

#delayed unlockers make their call after the await call.
func delayed_unlocker():
    #on semaphore.post(), the thread is "released" to emit a signal
    semaphore_counter -= 1
    semaphore.post()

func delayed_unlocker2():
    process_mut.unlock()

Two major changes are that I found a use for call_deferred_thread_group() and the other is that I needed to use an alternate signal to hook back into the main thread.