supplyon / gremcos

Golang Gremlin Tinkerpop client with Azure CosmosDB compatibility
MIT License
11 stars 6 forks source link

ExecuteAsync Swallowing Errors #127

Open mattKorwel opened 2 years ago

mattKorwel commented 2 years ago

I'm working on using ExecuteAsync and I see in the logs that I'm getting an error (in my case a 409 conflict error), but it's not coming back in any of the responses. I may be using it wrong, but I think it should be telling me there is an error.

I'm running on 1.38.

In an example case the code below returns on the last line with an empty response array even though there are one or more errors on the back end that are visible in the logs.

It seems that 'gotResponse' is false, so there are no responses on the response channel at all, but there are also no errors?

        // Buffered channel (capacity 2) handed to ExecuteAsync to carry the
        // asynchronous responses for this query.
        responseChannel := make(chan gremcosinterfaces.AsyncResponse, 2)

    // Submit the query; an error returned directly by ExecuteAsync is
    // propagated to the caller immediately.
    err := c.cosmos.ExecuteAsync(query.String(), responseChannel)
    if err != nil {
        return nil, err
    }

    // Collect every non-empty response delivered on the channel.
    responses := make([]gremcosinterfaces.Response, 0)
        // Debug flag: records whether the loop below received anything at all.
        gotResponse := false
    // The range loop only terminates once responseChannel is closed.
    // NOTE(review): presumably ExecuteAsync closes the channel when the
    // request is finished -- confirm against the library.
    for response := range responseChannel {
        // An error message attached to the async response aborts the call.
        // NOTE(review): ErrorMessage is used as the format string here, so any
        // '%' it contains is interpreted as a formatting verb; errors.New or
        // fmt.Errorf("%s", ...) would avoid that.
        if response.ErrorMessage != "" {
            return nil, fmt.Errorf(response.ErrorMessage)
        }
        if !response.Response.IsEmpty() {
            // Status codes of 300 and above are treated as failures.
            if response.Response.Status.Code >= 300 {
                return nil, fmt.Errorf("(%d) %s", response.Response.Status.Code, response.Response.Status.Message)
            }
            responses = append(responses, response.Response)
        }
                // Set for every item received, including empty responses.
                gotResponse = true
    }
        // Debug output: prints false when the channel delivered no items.
        fmt.Println(gotResponse)
    return responses, nil
jiping-s commented 1 year ago

We have the same problem. I tried the internal client without pooling, and ExecuteAsync hangs after a few requests:

    ws, wsErr := gremcos.NewWebsocket(cfg.GraphURL, gremcos.SetWritingWait(time.Second*5), gremcos.SetReadingWait(time.Second*5), gremcos.SetBufferSize(1048576, 1048576))
    if wsErr != nil {
        logger.Fatalf("failed to create ws: %v", wsErr)
    }
    errCh := make(chan error, 1000)

    cosmos, cosmosErr := gremcos.Dial(ws, errCh, gremcos.SetAuth(gremcos.StaticCredentialProvider{
        UsernameStatic: cfg.GraphUsername,
        PasswordStatic: cfg.GraphPassword,
    }))
    if cosmosErr != nil {
        logger.Fatalf("failed to create Cosmos client: %v", cosmosErr)
    }

    for {
        if async {
            c := make(chan interfaces.AsyncResponse, 100)
            go func() {
                r := <-c
                concurrentIssue := false
                if r.ErrorMessage != "" {
                    if !strings.Contains(r.ErrorMessage, "PreconditionFailedException") &&
                        !strings.Contains(r.ErrorMessage, "Conflict") {
                        logger.Panic(r.ErrorMessage)
                    }
                }
                logger.Infof("inserted to gremlin: %s %d %v", r.Response.RequestID, r.Response.Status.Code, concurrentIssue)
            }()
            if err := cosmos.ExecuteAsync(query, c); err != nil {
                logger.Panic(err)
            }
            logger.Info("inserting")
            time.Sleep(100 * time.Millisecond)