neo4j / neo4j-go-driver

Neo4j Bolt Driver for Go
Apache License 2.0

Getting SIGSEGV: segmentation violation panic when using session.Run after Neo recovery #451

Closed jenyad20 closed 8 months ago

jenyad20 commented 1 year ago

Attachment: test.log

Neo4j Version: 3.5.23 Community
Neo4j Mode: HA cluster / bolt
Driver version: Go driver 5.5.0, also happened on version 3
Operating System: Ubuntu 22 locally, Ubuntu 14 on AWS

We noticed that every time a Neo4j instance restarts, our service panics. While investigating, I found that the panic doesn't happen while Neo4j is down, but rather when it recovers. I managed to reproduce this by running the service (which accesses Neo4j) locally, bombarding it with requests to reproduce production behaviour, blocking access to Neo4j in the middle of the run, and then restoring it.

This only happens under high load, when requests come from multiple goroutines after the connection was lost and while it is being reestablished.

To simulate this behaviour in testing, I created a test that starts 100 goroutines, each of which closes the connection to Neo4j and then tries to execute a query. We have an error wrapper for closed connections that reestablishes the connection in that case, and if session.Run returned an error we could handle it. Instead, it panics, apparently due to a race condition involving thread-unsafe buffer usage. The only solution I see would be to add a mutex that lets one goroutine run at a time, but that would heavily slow down our process.

The test function

func (s *DriverTestSuite) TestMultithreadedQueryRequestsWithConnectionRecovery() {
    s.driver.Close(s.ctx)
    count := 100
    wg := &sync.WaitGroup{}
    wg.Add(count)

    for i := 0; i < count; i++ {

        go func(wg *sync.WaitGroup, i int, s *DriverTestSuite) {
            defer wg.Done()
            s.driver.Close(s.ctx)
            err := s.executeSimpleQuery()
            s.Require().NoError(err)

        }(wg, i, s)
    }
    wg.Wait()

}

func executeSimpleQuery(ctx context.Context, driver *Driver) error {
    return driver.ExecuteQuery(ctx, "CREATE (test:Test) return true", map[string]interface{}{}, func(result neo4j.ResultWithContext) error {
        var record *neo4j.Record
        result.NextRecord(ctx, &record)
        if record == nil || len(record.Values) == 0 {
            return errors.New("no records")
        }
        _, ok := record.Values[0].(bool)
        if !ok {
            return errors.New("expected value to be bool")
        }
        return nil
    })
}

func (s *DriverTestSuite) executeSimpleQuery() error {
    return executeSimpleQuery(s.ctx, s.driver)
}

and the relevant functions in code:

// ResultsHookFn allows the caller to parse the query results safely
type ResultsHookFn func(result neo4j.ResultWithContext) error

// ExecuteQuery runs a query on a driver that is ensured to be connected via Bolt. It is used with a hook on the original neo4j.Result object for convenient usage.
func (d *Driver) ExecuteQuery(ctx context.Context, query string, params map[string]interface{}, onResults ResultsHookFn) (err error) {
    accessLock.RLock()
    defer accessLock.RUnlock()
    return d.nonblockExecuteQuery(ctx, query, params, onResults)

}

// nonblockExecuteQuery makes sure that a recursive retry to execute a query doesn't acquire more mutexes and thus deadlock.
// An example is when a query is executed and the RLock acquired, then the Close function is called and blocks trying to acquire the Lock, and then
// the function calls itself again for a retry, trying to acquire the RLock, but is blocked by the Lock that is blocked by the previous RLock.
func (d *Driver) nonblockExecuteQuery(ctx context.Context, query string, params map[string]interface{}, onResults ResultsHookFn) (err error) {

    session, err := d.NewSession(ctx)
    if err != nil {
        return err
    }
    defer d.CloseSession(ctx, session)

    result, err := session.Run(ctx, query, params)
    if err != nil {
        queryExecutionFailureMeter.Mark(1)
        if err.Error() == "Trying to create session on closed driver" || strings.HasPrefix(err.Error(), "ConnectivityError") {
            err = d.reconnect(ctx)
            if err != nil {
                return err
            }
            return d.nonblockExecuteQuery(ctx, query, params, onResults)
        }
        return err
    }
    err = executeHook(onResults, result) //<-- reporting metrics inside
    if err != nil {
        return err
    }
    return nil
}

// NewSession returns a new *connected* session only after ensuring the underlying connection is alive.
// it ensures liveliness by re-creating a new driver in case of connectivity issues.
// it returns an error in case any connectivity issue could not be resolved even after re-creating the driver.
func (d *Driver) NewSession(ctx context.Context) (neo4j.SessionWithContext, error) {
    return d.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}), nil
}

// CloseSession closes any open resources and marks this session as unusable.
// It wraps the original neo4j.Session.Close() func with metrics and logs.
func (d *Driver) CloseSession(ctx context.Context, session neo4j.SessionWithContext) {
    defer recoverLogPanic()
    err := session.Close(ctx)
    if err != nil {
        closeSessionFailureMeter.Mark(1)
    }
}

// reconnect will create a new driver if current one is not connected
// it uses double verification, as two queries might both get an error and try to reconnect, one will fix the connection
// the other doesn't need to reconnect
func (d *Driver) reconnect(ctx context.Context) error {
    recoveryLock.Lock()
    defer recoveryLock.Unlock()
    if err := d.driver.VerifyConnectivity(ctx); err == nil {
        return nil

    }

    connectionLostMeter.Mark(1)

    data, err := d.enclavedPassword.Open()
    if err != nil {
        return errors.New("couldnt open neo4j password enclave")
    }

    driver, err := NewDriver(Settings{d.dbURI, d.user, data.String()})
    if err != nil {
        return err
    }
    d.nonblockClose(ctx) //close old driver
    d.driver = driver.driver
    return nil
}

func (d *Driver) nonblockClose(ctx context.Context) {
    defer recoverLogPanic()
    if d.driver == nil {
        return
    }
    if err := d.driver.Close(ctx); err != nil {
        failedToCloseMeter.Mark(1)
        log.WarnWithError("failed to close existing driver", err)
    }
}

// Close safely closes the underlying open connections to the DB.
func (d *Driver) Close(ctx context.Context) {
    accessLock.Lock()
    defer accessLock.Unlock()
    d.nonblockClose(ctx)
}

fbiville commented 1 year ago

Hello and thanks for this detailed report.

Isn't reconnect inherently unsafe here? This closes a driver instance (and its connection pool) that may be in use by other goroutines, or am I misunderstanding something?

Also, would you be able to share a full stack trace of the panic you mention?

jenyad20 commented 1 year ago

For the sake of the test, I removed the line in reconnect that closes the old driver, just in case, but I get the same error. The code only reaches reconnect in the specific case of a connectivity error or an already closed driver. I use a mutex to make sure that only one goroutine enters reconnect at a time, and I double-check: if several goroutines get the same error, each one checks again, after entering the function, whether the connection is down. The first goroutine fixes the connection; the following ones see that it is working and return. And when closing the connection in the test I use the blocking method, which means it waits for all the goroutines to finish before closing the connection.

I shared the full trace; it's at the very top, in the test.log attachment.

fbiville commented 1 year ago

Ok, that is very strange... Each Packer is tied to a single Connection instance, I wonder why the append fails in such a way. Let me try to set up a similar test as yours and see if I can reproduce it.

fbiville commented 1 year ago

There are some bits that I don't understand in the setup: why is s.driver.Close(s.ctx) called twice before executeSimpleQuery is called? When is the driver actually open?

If I get rid of these two calls, the test seems to run fine.

fbiville commented 1 year ago

If you could have a look at https://github.com/neo4j/neo4j-go-driver/pull/452 and see what I'm missing, it would be great. I got rid of the "password enclave" and metrics reporting to simplify the setup.

jenyad20 commented 1 year ago

Yes, closing the driver is what causes the issue. It simulates what happens in production when Neo4j goes down: our service panics when Neo4j recovers. The connection is established during the suite setup. The very first close in this case might be redundant, but if the driver is already closed then the close function does nothing.

jenyad20 commented 1 year ago

In the test you need to add

func (s *DriverTestSuite) SetupSuite() {
    s.ctx = context.Background()
}

func (s *DriverTestSuite) TearDownSuite() {
    s.ctx.Done()
}

func (s *DriverTestSuite) connectToNeo() {
    require := s.Require()
    driver, err := NewDriver(connectionSettings)
    require.NoError(err)
    s.driver = driver
}

func (s *DriverTestSuite) SetupTest() {

    s.connectToNeo()
}

and in the main file you need

type Driver struct {
    driver           neo4j.DriverWithContext
    dbURI, user      string
    enclavedPassword *memguard.Enclave
}

// Settings holds the driver settings
type Settings struct {
    ConnectionString, User, Password string
}

func executeHook(onResults ResultsHookFn, result neo4j.ResultWithContext) (err error) {
    defer func() {
        if r := recover(); r != nil {
            parseResultsFailureMeter.Mark(1)
            log.WarnWithFields("[neo4j onResults] recovered from panic", logger.Fields{
                "recovered-value": fmt.Sprintf("%v", r),
            })
            err = fmt.Errorf("[neo4j onResults] recovered from panic: %v\n\n%s", r, string(debug.Stack()))
        }
    }()
    err = onResults(result)
    if err != nil {
        return err
    }
    return nil
}

func NewDriver(settings Settings) (*Driver, error) {
    enclave := memguard.NewEnclave([]byte(settings.Password))
    if enclave == nil {
        return nil, errors.New("failed to create enclave for neo4j password")
    }

    driver, err := neo4j.NewDriverWithContext(settings.ConnectionString, neo4j.BasicAuth(settings.User, settings.Password, ""))

    if err != nil {
        failedToCreateMeter.Mark(1)
        log.ErrorWithError("[neo4j driver] cannot create new driver", err)
        return nil, err
    }

    return &Driver{driver: driver, dbURI: settings.ConnectionString, user: settings.User, enclavedPassword: enclave}, nil
}

func recoverLogPanic() {
    if r := recover(); r != nil {
        failedToCloseMeter.Mark(1)
        log.WarnWithFields("[neo4j driver] recovered from panic", logger.Fields{
            "recovered-value": fmt.Sprintf("%v", r),
        })
    }
}

fbiville commented 1 year ago

If we get rid of testify, the password enclave and metrics (since they do not seem particularly relevant to the issue at hand), it seems we have a similar setup.

~I still don't understand how the test is supposed to work, since the driver is closed twice before executeSimpleQuery is called. executeSimpleQuery calls ExecuteQuery, leading to nonblockExecuteQuery and NewSession. NewSession will immediately fail with a usage error since you cannot create sessions on closed drivers.~

Would you be able to share a repository with the minimal reproducer, so that I can take it for a spin?

jenyad20 commented 1 year ago

nonblockExecuteQuery is recursive: if it gets that particular closed-driver error, it executes the reconnect function and then runs itself again. To reproduce the issue in a working environment, you can run a stress test with this driver against Neo4j, stop Neo4j midway through the test, wait around 5-10 minutes until the driver notices it lost the connection, and then turn Neo4j back on. The whole repo is just those two files. I shared about 90% of the code; the rest is functions for transactions, and more tests. All the single-threaded tests pass, and a low load doesn't cause a panic either, but with enough requests there appears to be a race condition.

fbiville commented 1 year ago

~The recursion you mention kicks in after the session.Run call, not NewSession call. Unless I am missing something (and it's likely that I am), the driver is already closed when NewSession is called, nonblockExecuteQuery returns the error right away, and nothing else happens.~

EDIT: ah gotcha, the error is deferred, so it indeed kicks in only when session.Run is called.

I appreciate you sharing most of the code. Sharing a small repository with everything ready would go a long way.

fbiville commented 1 year ago

Just set up https://github.com/fbiville/neo4j-go-driver-issue-451, I will see if I can get the long-running setup as you described earlier (since the test runs so far complete after a few seconds).

fbiville commented 1 year ago

@jenyad20 I still have a few more remarks/questions:

reconnect locking

It seems to me reconnect should rely on accessLock, not recoveryLock. Otherwise, the driver substitution is unsafe. In a multithreaded context like yours, closing the driver from one of the session's goroutines is unsafe. If you replace:

recoveryLock.Lock()
defer recoveryLock.Unlock()

with:

accessLock.Lock()
defer accessLock.Unlock()

Does the issue still persist?

session.Run

What are the reasons preventing you from running transaction functions (session.ExecuteRead / session.ExecuteWrite)? These have retry capabilities built in, session.Run does not.
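For illustration only, the difference between a bounded retry and the unbounded recursion in nonblockExecuteQuery can be sketched with the standard library alone. This is not the driver's retry implementation: withRetry, errTransient and demo are hypothetical names, and the doubling delay is an assumption chosen for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical marker for failures worth retrying.
var errTransient = errors.New("transient failure")

// withRetry calls work until it succeeds, fails with a non-transient
// error, uses up maxAttempts, or ctx is cancelled.
func withRetry(ctx context.Context, maxAttempts int, delay time.Duration, work func() error) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = work(); err == nil || !errors.Is(err, errTransient) {
			return err
		}
		if attempt == maxAttempts {
			break // no point in waiting after the last attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
			delay *= 2 // exponential backoff (assumed policy)
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

// demo fails twice, then succeeds, and reports how many calls were made.
func demo() (int, error) {
	calls := 0
	err := withRetry(context.Background(), 5, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err) // prints "3 <nil>"
}
```

Unlike a recursive retry, the loop has a fixed upper bound and honours context cancellation, so a server that stays down cannot keep a goroutine retrying forever.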

3.5 server

Have you been able to reproduce the issue with a recent server? Neo4j 3.5 reached End of Life a few months ago.

jenyad20 commented 1 year ago

I'm trying to create a repo to reproduce this. For some reason the very same code gives a different result in the original and in the cloned repo; I'm investigating. Meanwhile, I can't use accessLock in reconnect, because it's already used in ExecuteQuery, which in turn calls reconnect, so I would create a deadlock.

I saw the new functions and thought about switching to them, but they have a different signature, so we would have to update all the services that use this library on our side. It's a big project, and I was hoping to solve the issue without using different functions.

We don't have a new server; all our production environments and namespaces use Neo4j 3.5. But this seems like a client-side issue: an error would be acceptable and we can handle it, whereas a panic is a bigger issue.

fbiville commented 1 year ago

> because it's already used in ExecuteQuery, which in turn calls reconnect, so I would create a deadlock.

I see, and Go does not support reentrant locks, so we're out of luck.
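To make the non-reentrancy point concrete, here is a minimal standalone sketch using only the standard library. The names canUpgrade and demo are hypothetical. It uses TryLock (available since Go 1.18) so that the program reports the outcome instead of hanging; a blocking Lock() in the same place would deadlock.

```go
package main

import (
	"fmt"
	"sync"
)

// canUpgrade reports whether the write lock can be taken while the
// same goroutine still holds a read lock on mu.
func canUpgrade(mu *sync.RWMutex) bool {
	mu.RLock()
	defer mu.RUnlock()
	if mu.TryLock() { // a blocking Lock() here would never return
		mu.Unlock()
		return true
	}
	return false
}

// demo runs the check on a fresh mutex and formats the outcome.
func demo() string {
	var mu sync.RWMutex
	return fmt.Sprintf("upgrade possible: %t", canUpgrade(&mu))
}

func main() {
	fmt.Println(demo()) // prints "upgrade possible: false"
}
```

This is the situation reconnect would be in if it took accessLock.Lock() while ExecuteQuery still holds accessLock.RLock() further up the same call stack.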

What if you moved the locking from before the nonblockExecuteQuery call to just before the reconnect call? Something like:

    if err != nil {
        if err.Error() == "Trying to create session on closed driver" || strings.HasPrefix(err.Error(), "ConnectivityError") {
            accessLock.Lock()
            defer accessLock.Unlock()
            err = d.reconnect(ctx)
            if err != nil {
                return err
            }
            return d.nonblockExecuteQuery(ctx, query, params, onResults)
        }
        return err
    }

Would that help?

> We don't have a new server; all our production environments and namespaces use Neo4j 3.5. But this seems like a client-side issue: an error would be acceptable and we can handle it, whereas a panic is a bigger issue.

I concur, but a recent Neo4j server version would make it much easier for me to reproduce the results (I don't have any 3.5 instances around, and I cannot run local servers below version 4.4).

jenyad20 commented 1 year ago

For some reason the behavior is very inconsistent: all morning I've been trying to reproduce it and it worked normally, and now it fails all the time. I added another lock:

func (d *Driver) nonblockExecuteQuery(ctx context.Context, query string, params map[string]interface{}, onResults ResultsHookFn) (err error) {

    recoveryLock.RLock()
    session, err := d.NewSession(ctx)
    if err != nil {
        recoveryLock.RUnlock()
        return err
    }
    defer d.CloseSession(ctx, session)

    result, err := session.Run(ctx, query, params)
    recoveryLock.RUnlock()
    if err != nil {
        queryExecutionFailureMeter.Mark(1)
        if err.Error() == "Trying to create session on closed driver" || strings.HasPrefix(err.Error(), "ConnectivityError") {
            log.WarnWithError("[neo4j driver] existing neo4j connection lost. creating new one.", err)
            err = d.reconnect(ctx)
            if err != nil {
                return err
            }
            return d.nonblockExecuteQuery(ctx, query, params, onResults)
        }
        return err
    }
    err = executeHook(onResults, result) //<-- reporting metrics inside
    if err != nil {
        return err
    }
    return nil
}

That prevents the race condition where a session might be created from a driver that is in the process of being closed. Still the same result.

fbiville commented 1 year ago

What about my suggestion above? Have you tried it?

    if err != nil {
        if err.Error() == "Trying to create session on closed driver" || strings.HasPrefix(err.Error(), "ConnectivityError") {
            accessLock.Lock()
            defer accessLock.Unlock()
            err = d.reconnect(ctx)
            if err != nil {
                return err
            }
            return d.nonblockExecuteQuery(ctx, query, params, onResults)
        }
        return err
    }

jenyad20 commented 1 year ago

It would attempt to acquire the lock while I'm already inside the RLock, which would result in a deadlock. And if I release the RLock first, then there is always a risk of another goroutine getting in between the release and the new lock.

fbiville commented 1 year ago

Right, I missed these. Why do you need to lock in ExecuteQuery? The driver is designed to be thread-safe and supports scenarios such as creating 1 session per goroutine. Is it because of the metrics reporting bits? If not, it seems to me you could remove the lock there and lock only before calling reconnect.
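One possible shape for "lock only around the swap" is sketched below, using the standard library only. It is an illustration under assumptions, not the code from this issue: conn is a hypothetical stand-in for the driver, and holder, errClosed and simulate are made-up names. The lock protects only the pointer to the current connection, never the query, and a generation counter closes the gap between releasing the read lock and taking the write lock: a goroutine that arrives late sees a newer generation and skips the reconnect.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errClosed is a hypothetical stand-in for a "closed driver" error.
var errClosed = errors.New("connection closed")

// conn is a hypothetical stand-in for a driver instance.
type conn struct{ closed atomic.Bool }

func (c *conn) run() error {
	if c.closed.Load() {
		return errClosed
	}
	return nil
}

// holder guards only the pointer swap, never the query itself.
type holder struct {
	mu  sync.RWMutex
	cur *conn
	gen int // bumped on every replacement
}

// current returns the active connection and its generation.
func (h *holder) current() (*conn, int) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.cur, h.gen
}

// reconnect swaps in a fresh connection unless another goroutine has
// already replaced the generation the caller saw fail.
func (h *holder) reconnect(seenGen int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.gen != seenGen {
		return // someone else already reconnected
	}
	old := h.cur
	h.cur, h.gen = &conn{}, h.gen+1
	old.closed.Store(true)
}

// execute runs the query without holding a lock and retries a bounded
// number of times (attempts must be at least 1).
func (h *holder) execute(attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		c, gen := h.current()
		if err = c.run(); !errors.Is(err, errClosed) {
			return err
		}
		h.reconnect(gen)
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

// simulate starts n goroutines against a closed connection and returns
// the number of failed calls and the final generation.
func simulate(n int) (int64, int) {
	h := &holder{cur: &conn{}}
	h.cur.closed.Store(true) // simulate an outage
	var failed atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := h.execute(3); err != nil {
				failed.Add(1)
			}
		}()
	}
	wg.Wait()
	_, gen := h.current()
	return failed.Load(), gen
}

func main() {
	failures, gen := simulate(100)
	fmt.Println(failures, gen) // prints "0 1"
}
```

In this sketch only one of the 100 goroutines performs the swap. With a real driver, work that is still in flight on the old instance when it is closed may fail, and would then go through the same bounded retry.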

fbiville commented 1 year ago

In any case, I don't think the errors you have been seeing are due to a bug in the driver. I'm going to remove the bug label from this issue as a result.

I'm happy to continue providing guidance here, as best I can, but https://community.neo4j.com/ is a better medium for this, should you have similar questions in the future.

fbiville commented 1 year ago

@jenyad20 do you still need assistance with this?

jenyad20 commented 1 year ago

Yes, the issue still persists, and the bigger problem is that the behavior is inconsistent.

fbiville commented 1 year ago

As I hinted at earlier, I believe the only way I could help further is to have direct access to a repository and iterate from there.

The general advice remains true:

Doing anything else is fraught with issues and possibly breaches the thread-safety guarantees of the driver.

fbiville commented 1 year ago

@jenyad20 hello, we've improved the connection pool in the last two releases and I wonder if this could have any impact on the issues you were having. Do your issues persist with the latest 5.9.0?

robsdedude commented 8 months ago

I'll close this issue since there has been no activity in quite a while. If the error persists, feel free to keep commenting. But reading the supplied code, I come to the same conclusions as @fbiville.