owncloud / ocis

:atom_symbol: ownCloud Infinite Scale Stack
https://doc.owncloud.com/ocis/next/
Apache License 2.0

files may be stuck in postprocessing even though the upload session is deleted #10259

Open butonic opened 2 weeks ago

butonic commented 2 weeks ago

We found a file that is stuck in postprocessing even though its upload session has been deleted. I assume this happened because the virus scanner emitted an event that caused the upload session to be deleted without removing the postprocessing flag.

Maybe related: https://github.com/owncloud/ocis/issues/8848 https://github.com/owncloud/ocis/issues/9244 https://github.com/owncloud/ocis/issues/7177

butonic commented 1 week ago

The ocis charts always use "OCIS_ASYNC_UPLOADS=true", so postprocessing is always enabled. The postprocessing code in the decomposedfs handles these events:

1. case events.PostprocessingFinished:

By default this deletes the postprocessing status. Only when ev.Outcome is events.PPOutcomeContinue and session.Finalize() fails do we keep the postprocessing state. The Finalize() call tries to upload the blob; only that can fail.

2. case events.RestartPostprocessing:

does not change postprocessing state

3. case events.PostprocessingStepFinished:

does not change postprocessing state
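The rule described for PostprocessingFinished can be written down as a tiny predicate. This is only a sketch of the reasoning above, not the decomposedfs code: the `Outcome` type, its constants, `errBlobUpload` and `keepsProcessingState` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Outcome stands in for ev.Outcome; the type and values are illustrative only.
type Outcome string

const (
	OutcomeContinue Outcome = "continue"
	OutcomeDelete   Outcome = "delete"
	OutcomeAbort    Outcome = "abort"
)

// errBlobUpload is a placeholder for whatever error Finalize() might return.
var errBlobUpload = errors.New("blob upload failed")

// keepsProcessingState reports whether the postprocessing state would be left
// in place after a PostprocessingFinished event, following the description
// above: only "continue" combined with a failing Finalize() keeps it.
func keepsProcessingState(outcome Outcome, finalizeErr error) bool {
	return outcome == OutcomeContinue && finalizeErr != nil
}

func main() {
	fmt.Println(keepsProcessingState(OutcomeContinue, nil))
	fmt.Println(keepsProcessingState(OutcomeContinue, errBlobUpload))
	fmt.Println(keepsProcessingState(OutcomeDelete, nil))
	fmt.Println(keepsProcessingState(OutcomeAbort, errBlobUpload))
}
```

If this model is right, a file can only stay flagged through this handler when Finalize() fails, which is why the other code paths are examined next.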

There are other code paths that call session.store.Cleanup(), but they all delete the postprocessing state. The FinishUpload() handling might call session.store.Cleanup():

1. on a checksum error

On a calculation error or an unknown checksum we call session.store.Cleanup(ctx, session, true, false, false). The signature of that function is Cleanup(ctx context.Context, session Session, revertNodeMetadata, keepUpload, unmarkPostprocessing bool), and it calls session.Cleanup(revertNodeMetadata, !keepUpload, !keepUpload), whose signature is Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool). When revertNodeMetadata is true, there is a node for the session, and this is the first revision of a file, we completely delete the node:

// cleanup cleans up after the upload is finished
func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool) {
    ctx := session.Context(context.Background())

    if revertNodeMetadata {
        n, err := session.Node(ctx)
        if err != nil {
            appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed")
        } else {
            if session.NodeExists() && session.info.MetaData["versionsPath"] != "" {
                p := session.info.MetaData["versionsPath"]
                if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
                    return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
                        attributeName == prefixes.TypeAttr ||
                        attributeName == prefixes.BlobIDAttr ||
                        attributeName == prefixes.BlobsizeAttr ||
                        attributeName == prefixes.MTimeAttr
                }, true); err != nil {
                    appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("renaming version node failed")
                }

                if err := os.RemoveAll(p); err != nil {
                    appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("error removing version")
                }

            } else {
                // if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
                latestSession, err := n.ProcessingID(ctx)
                if err != nil {
                    appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed")
                }
                if latestSession == session.ID() {
                    // actually delete the node
                    session.removeNode(ctx)
                }
                // FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
            }
        }
    }

    if cleanBin {
        if err := os.Remove(session.binPath()); err != nil && !errors.Is(err, fs.ErrNotExist) {
            appctx.GetLogger(ctx).Error().Str("path", session.binPath()).Err(err).Msg("removing upload failed")
        }
    }

    if cleanInfo {
        if err := session.Purge(ctx); err != nil && !errors.Is(err, fs.ErrNotExist) {
            appctx.GetLogger(ctx).Error().Err(err).Str("session", session.ID()).Msg("removing upload info failed")
        }
    }
}
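
The double negation in that call is easy to misread, so here is a minimal sketch of just the argument mapping quoted above. `sessionFlags` and `mapStoreFlags` are hypothetical helpers for illustration; they do not exist in the code base.

```go
package main

import "fmt"

// sessionFlags holds the three booleans that session.Cleanup receives.
type sessionFlags struct {
	revertNodeMetadata bool
	cleanBin           bool
	cleanInfo          bool
}

// mapStoreFlags mirrors the quoted call
// session.Cleanup(revertNodeMetadata, !keepUpload, !keepUpload):
// keepUpload is negated into both cleanBin and cleanInfo.
func mapStoreFlags(revertNodeMetadata, keepUpload bool) sessionFlags {
	return sessionFlags{
		revertNodeMetadata: revertNodeMetadata,
		cleanBin:           !keepUpload,
		cleanInfo:          !keepUpload,
	}
}

func main() {
	// Checksum error path: store.Cleanup(ctx, session, true, false, false).
	fmt.Printf("%+v\n", mapStoreFlags(true, false))
	// A caller passing keepUpload=true would leave bin and info untouched.
	fmt.Printf("%+v\n", mapStoreFlags(false, true))
}
```

So on the checksum error path all three flags end up true: the node metadata is reverted and both the binary and the session info are removed.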
butonic commented 1 week ago
PrajwolAmatya commented 4 days ago

I tried the config POSTPROCESSING_STEPS=virusscan and ANTIVIRUS_INFECTED_FILE_HANDLING=continue and uploaded a file containing a virus. The file gets stuck in postprocessing. Also, the CLI command doesn't list that file. Tried with command

For both the cases the file with virus is not listed in the response.

butonic commented 1 day ago

I can reproduce files hanging in postprocessing when setting

    "OCIS_ASYNC_UPLOADS": "true",
    "POSTPROCESSING_DELAY": "1s",

Then, running an rclone (with the 425 patch) copy of a folder with 1k 10kb files will leave hundreds of files stuck in postprocessing in the web UI.

The log shows:

{"level":"error","event":"PostprocessingFinished","uploadid":"07c0a8cf-accd-40f4-a61f-5348a2321101","error":"ERR_UPLOAD_NOT_FOUND: upload not found","time":"2024-10-21T13:05:51+02:00","message":"Failed to get upload"}

and the files are actually not present on disk.

butonic commented 1 day ago

This also happens when uploading 1000 4k files through the web UI. All uploads in the web UI are finished, but after ~500 processed uploads the upload not found errors are logged.

butonic commented 1 day ago

ok i see this:

the outcome is continue ...

  1. a finished upload is moved from the upload folder to the final blobstore (in the ocis fs without s3)
            case events.PPOutcomeContinue:
                if err := session.Finalize(); err != nil {
  2. the upload session is then cleaned up, but now the rm of the blob returns an error
            fs.sessionStore.Cleanup(ctx, session, revertNodeMetadata, keepUpload, unmarkPostprocessing)

hm

// Cleanup cleans upload metadata, binary data and processing status as necessary
func (store OcisStore) Cleanup(ctx context.Context, session Session, revertNodeMetadata, keepUpload, unmarkPostprocessing bool) {
    ctx, span := tracer.Start(session.Context(ctx), "Cleanup")
    defer span.End()
    session.Cleanup(revertNodeMetadata, !keepUpload, !keepUpload)

    // unset processing status
    if unmarkPostprocessing {
        n, err := session.Node(ctx)
        if err != nil {
            appctx.GetLogger(ctx).Info().Str("session", session.ID()).Err(err).Msg("could not read node")
            return
        }
        // FIXME: after cleanup the node might already be deleted ...
        if n != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch)
            if err := n.UnmarkProcessing(ctx, session.ID()); err != nil {
                appctx.GetLogger(ctx).Info().Str("path", n.InternalPath()).Err(err).Msg("unmarking processing failed")
            }
        }
    }
}

session.Cleanup is called with revertNodeMetadata false, cleanBin true and cleanInfo true:


// cleanup cleans up after the upload is finished
func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool) {
    ctx := session.Context(context.Background())

    if revertNodeMetadata {
        n, err := session.Node(ctx)
        if err != nil {
            appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed")
        } else {
            if session.NodeExists() && session.info.MetaData["versionsPath"] != "" {
                p := session.info.MetaData["versionsPath"]
                if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
                    return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
                        attributeName == prefixes.TypeAttr ||
                        attributeName == prefixes.BlobIDAttr ||
                        attributeName == prefixes.BlobsizeAttr ||
                        attributeName == prefixes.MTimeAttr
                }, true); err != nil {
                    appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("renaming version node failed")
                }

                if err := os.RemoveAll(p); err != nil {
                    appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("error removing version")
                }

            } else {
                // if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
                latestSession, err := n.ProcessingID(ctx)
                if err != nil {
                    appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed")
                }
                if latestSession == session.ID() {
                    // actually delete the node
                    session.removeNode(ctx)
                }
                // FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
            }
        }
    }

    if cleanBin {
        if err := os.Remove(session.binPath()); err != nil && !errors.Is(err, fs.ErrNotExist) {
            appctx.GetLogger(ctx).Error().Str("path", session.binPath()).Err(err).Msg("removing upload failed")
        }
    }

    if cleanInfo {
        if err := session.Purge(ctx); err != nil && !errors.Is(err, fs.ErrNotExist) {
            appctx.GetLogger(ctx).Error().Err(err).Str("session", session.ID()).Msg("removing upload info failed")
        }
    }
}

so it wipes the bin ... and then calls session.Purge(ctx) which tries to wipe the bin again ...


// Purge deletes the upload session metadata and written binary data
func (s *OcisSession) Purge(ctx context.Context) error {
    sessionPath := sessionPath(s.store.root, s.info.ID)
    f, err := lockedfile.OpenFile(sessionPath+".lock", os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0600)
    if err != nil {
        return err
    }
    defer func() {
        f.Close()
        os.Remove(sessionPath + ".lock")
    }()
    if err := os.Remove(sessionPath); err != nil {
        return err
    }
    if err := os.Remove(s.binPath()); err != nil {
        return err
    }
    return nil
}

so we really try to wipe the bin ... hard ... and if it is gone we don't bother logging an error which is ok ...

but it seems another event tries to do something with the event ... 🤔

butonic commented 23 hours ago

the PostprocessingFinished event seems to be received twice:

> [Go 491]: PostprocessingFinished events.PostprocessingFinished {UploadID: "bf447069-3635-456f-9248-f491d30fe344", Filename: "f141.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc0040120a0), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc0040001e0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil}
decomposedfs.go:279
> [Go 491]: PostprocessingFinished events.PostprocessingFinished {UploadID: "bf447069-3635-456f-9248-f491d30fe344", Filename: "f141.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc008237680), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc006d291d0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil}
decomposedfs.go:279
butonic commented 23 hours ago

and we actually emit the event twice:

> [Go 230]: pp finished "e325b07a-2cc8-4866-bc01-862227740c1c"
postprocessing.go:95
> [Go 333]: PostprocessingFinished events.PostprocessingFinished {UploadID: "e325b07a-2cc8-4866-bc01-862227740c1c", Filename: "f13.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc003aa01e0), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc003a8cf50), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil}
decomposedfs.go:279
> [Go 230]: pp finished "e325b07a-2cc8-4866-bc01-862227740c1c"
postprocessing.go:95
> [Go 333]: PostprocessingFinished events.PostprocessingFinished {UploadID: "e325b07a-2cc8-4866-bc01-862227740c1c", Filename: "f13.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc0042cc0a0), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc00595e0f0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil}
decomposedfs.go:279
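
If the same PostprocessingFinished event can arrive more than once, one generic way to make a consumer tolerate it is to remember which upload IDs were already handled. The sketch below is an assumption about a possible guard, not how decomposedfs works: `finishedTracker` and `firstDelivery` are hypothetical, the set lives only in memory, and it grows without bound.

```go
package main

import (
	"fmt"
	"sync"
)

// finishedTracker remembers upload IDs whose finished event was handled.
type finishedTracker struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newFinishedTracker() *finishedTracker {
	return &finishedTracker{seen: make(map[string]struct{})}
}

// firstDelivery returns true only the first time an upload ID is seen.
func (t *finishedTracker) firstDelivery(uploadID string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.seen[uploadID]; ok {
		return false
	}
	t.seen[uploadID] = struct{}{}
	return true
}

func main() {
	tracker := newFinishedTracker()
	id := "e325b07a-2cc8-4866-bc01-862227740c1c"
	for i := 0; i < 2; i++ {
		if tracker.firstDelivery(id) {
			fmt.Println("handling", id)
		} else {
			fmt.Println("ignoring duplicate", id)
		}
	}
}
```

This would only hide the "upload not found" error on the second delivery; it does not explain why the event is emitted twice.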
butonic commented 23 hours ago

making the postprocessing service spawn a goroutine for every event:

index 13441567d9..73dd42b05e 100644
--- a/services/postprocessing/pkg/service/service.go
+++ b/services/postprocessing/pkg/service/service.go
@@ -66,17 +66,19 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger
 // Run to fulfil Runner interface
 func (pps *PostprocessingService) Run() error {
        for e := range pps.events {
-               err := pps.processEvent(e)
-               if err != nil {
-                       switch {
-                       case errors.Is(err, ErrFatal):
-                               return err
-                       case errors.Is(err, ErrEvent):
-                               continue
-                       default:
-                               pps.log.Fatal().Err(err).Msg("unknown error - exiting")
+               go func(e events.Event) {
+                       err := pps.processEvent(e)
+                       if err != nil {
+                               switch {
+                               case errors.Is(err, ErrFatal):
+                                       pps.log.Fatal().Err(err).Msg("fatal error - exiting")
+                               case errors.Is(err, ErrEvent):
+                                       pps.log.Error().Err(err).Msg("continuing")
+                               default:
+                                       pps.log.Fatal().Err(err).Msg("unknown error - exiting")
+                               }
                        }
-               }
+               }(e)
        }
        return nil
 }

I can no longer reproduce the issue .... 🤔

butonic commented 22 hours ago

we only emit BytesReceived once:

> [Go 5387]: BytesReceived "7a029f85-fda5-4dbb-ada4-d482d46bc325"
upload.go:202
> [Go 228]: pp finished "7a029f85-fda5-4dbb-ada4-d482d46bc325" "delay"
postprocessing.go:95
> [Go 586]: PostprocessingFinished events.PostprocessingFinished {UploadID: "7a029f85-fda5-4dbb-ada4-d482d46bc325", Filename: "f145.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc00b43e5a0), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc005552be0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil}
decomposedfs.go:279
> [Go 228]: pp finished "7a029f85-fda5-4dbb-ada4-d482d46bc325" "delay"
postprocessing.go:95
> [Go 586]: PostprocessingFinished events.PostprocessingFinished {UploadID: "7a029f85-fda5-4dbb-ada4-d482d46bc325", Filename: "f145.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc005f783c0), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc005f33a90), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil}
butonic commented 22 hours ago

we even get two delay events:

> [Go 3824]: BytesReceived "8a0a000d-25ac-4e7b-ac1b-7e088cc8c57c"
upload.go:202
> [Go 360]: StartPostprocessingStep "8a0a000d-25ac-4e7b-ac1b-7e088cc8c57c"
service.go:162
> [Go 360]: pp finished "8a0a000d-25ac-4e7b-ac1b-7e088cc8c57c" "delay"
postprocessing.go:95
> [Go 289]: PostprocessingFinished events.PostprocessingFinished {UploadID: "8a0a000d-25ac-4e7b-ac1b-7e088cc8c57c", Filename: "f152.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc002fe3180), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc004c2bcc0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "d45488ee-8e78-4daf-af27-7cad3d7343c9"
decomposedfs.go:279
> [Go 360]: StartPostprocessingStep "8a0a000d-25ac-4e7b-ac1b-7e088cc8c57c"
service.go:162
> [Go 360]: pp finished "8a0a000d-25ac-4e7b-ac1b-7e088cc8c57c" "delay"
postprocessing.go:95
> [Go 289]: PostprocessingFinished events.PostprocessingFinished {UploadID: "8a0a000d-25ac-4e7b-ac1b-7e088cc8c57c", Filename: "f152.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc005d77d60), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc005f06050), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "cd945f97-7ec0-4c88-969e-5b67ebf138c6"
butonic commented 22 hours ago

hm we only get one BytesReceived event ... but somehow produce multiple StartPostprocessingStep events:

> [Go 4137]: BytesReceived "8b20ebfb-3421-43e4-9269-de08165895f9"
upload.go:202
> [Go 366]: received BytesReceived "8b20ebfb-3421-43e4-9269-de08165895f9"
service.go:112
> [Go 366]: StartPostprocessingStep "8b20ebfb-3421-43e4-9269-de08165895f9"
service.go:162
> [Go 366]: pp finished "8b20ebfb-3421-43e4-9269-de08165895f9" "delay"
postprocessing.go:95
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "8b20ebfb-3421-43e4-9269-de08165895f9", Filename: "f126.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc003da86e0), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc008922870), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "73bc21cb-bc9d-4403-a9af-36faba7eab86"
decomposedfs.go:279
> [Go 366]: StartPostprocessingStep "8b20ebfb-3421-43e4-9269-de08165895f9"
service.go:162
> [Go 366]: pp finished "8b20ebfb-3421-43e4-9269-de08165895f9" "delay"
postprocessing.go:95
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "8b20ebfb-3421-43e4-9269-de08165895f9", Filename: "f126.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc004e68460), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc004de9220), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "161b47d5-9173-40f9-93f1-913c42f617a6"
butonic commented 22 hours ago

hmmm ... triple events processing:

> [Go 5222]: BytesReceived "6f6cc35c-753f-46df-b665-b430a924aad6"
upload.go:202
> [Go 366]: received BytesReceived "6f6cc35c-753f-46df-b665-b430a924aad6"
service.go:112
> [Go 366]: received BytesReceived "6f6cc35c-753f-46df-b665-b430a924aad6"
service.go:112
> [Go 366]: StartPostprocessingStep "6f6cc35c-753f-46df-b665-b430a924aad6"
service.go:162
> [Go 366]: pp finished "6f6cc35c-753f-46df-b665-b430a924aad6" "delay"
postprocessing.go:95
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "6f6cc35c-753f-46df-b665-b430a924aad6", Filename: "f158.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc0080665a0), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc007456dc0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "488c2a8b-b496-4dff-831c-30ec56a1ece0"
decomposedfs.go:279
> [Go 366]: StartPostprocessingStep "6f6cc35c-753f-46df-b665-b430a924aad6"
service.go:162
> [Go 366]: pp finished "6f6cc35c-753f-46df-b665-b430a924aad6" "delay"
postprocessing.go:95
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "6f6cc35c-753f-46df-b665-b430a924aad6", Filename: "f158.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc004b3b180), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc0046e2cd0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "3514e795-303b-4e87-b45b-80da12a30832"
decomposedfs.go:279
{"level":"error","event":"PostprocessingFinished","uploadid":"6f6cc35c-753f-46df-b665-b430a924aad6","error":"ERR_UPLOAD_NOT_FOUND: upload not found","time":"2024-10-21T15:59:32+02:00","message":"Failed to get upload"}
> [Go 366]: StartPostprocessingStep "6f6cc35c-753f-46df-b665-b430a924aad6"
service.go:162
> [Go 366]: pp finished "6f6cc35c-753f-46df-b665-b430a924aad6" "delay"
postprocessing.go:95
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "6f6cc35c-753f-46df-b665-b430a924aad6", Filename: "f158.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc005741360), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc0059156d0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "2aa106cf-3923-4fe7-9ec7-fec1ac4e4c4a"
decomposedfs.go:279
{"level":"error","event":"PostprocessingFinished","uploadid":"6f6cc35c-753f-46df-b665-b430a924aad6","error":"ERR_UPLOAD_NOT_FOUND: upload not found","time":"2024-10-21T16:05:47+02:00","message":"Failed to get upload"}

but ... on my disk:

✗ ls -l /home/jfd/.ocis/storage/users/spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/4c/51/0a/da/-c86b-4815-8820-42cdf82c3d51/1000_4k/f158.bin
lrwxrwxrwx 1 jfd jfd 55 21. Okt 15:48 /home/jfd/.ocis/storage/users/spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/4c/51/0a/da/-c86b-4815-8820-42cdf82c3d51/1000_4k/f158.bin -> ../../../../../3e/0f/5f/4f/-16df-4059-b9cc-d9469b33af2d
✗ cat /home/jfd/.ocis/storage/users/spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/3e/0f/5f/4f/-16df-4059-b9cc-d9469b33af2d.mpk
[binary .mpk metadata, not printable; the same attributes appear in readable form in the metadata dump that follows]

✗ go run ./ocis/cmd/ocis decomposedfs metadata -r /home/jfd/.ocis/storage/users -n /home/jfd/.ocis/storage/users/spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/3e/0f/5f/4f/-16df-4059-b9cc-d9469b33af2d dump
2024/10/21 16:11:43 INFO memory is not limited, skipping package=github.com/KimMachineGun/automemlimit/memlimit
user.ocis.blobid="6f6cc35c-753f-46df-b665-b430a924aad6"
user.ocis.blobsize="4096"
user.ocis.cs.adler32=0swjPZ/A==
user.ocis.cs.md5=0s2bSEyHS4K+ODpNIrHMlcJg==
user.ocis.cs.sha1=0siH6mrWJjUI+P566OC9HI05nke8s=
user.ocis.id="3e0f5f4f-16df-4059-b9cc-d9469b33af2d"
user.ocis.mtime="2022-11-02T13:46:59Z"
user.ocis.name="f158.bin"
user.ocis.parentid="af56b721-b7c9-43fe-8307-af8fb66c0da2"
user.ocis.type="1"

the postprocessing flag is not present ...

no error ... nothing to see here ... case closed 🤪

maybe the problem is that events are processed synchronously 🤔 and the user expectation is that these events should be processed faster ...

we could introduce a worker group of goroutines that process the events concurrently ...

butonic commented 22 hours ago

hmmm five times

> [Go 8478]: BytesReceived "f523d4b5-d064-4858-9f50-8882993e9e66"
upload.go:202
> [Go 366]: received BytesReceived "f523d4b5-d064-4858-9f50-8882993e9e66"
service.go:112
> [Go 366]: received BytesReceived "f523d4b5-d064-4858-9f50-8882993e9e66"
service.go:112
> [Go 366]: StartPostprocessingStep "f523d4b5-d064-4858-9f50-8882993e9e66"
service.go:162
> [Go 366]: pp finished "f523d4b5-d064-4858-9f50-8882993e9e66" "delay"
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "f523d4b5-d064-4858-9f50-8882993e9e66", Filename: "f188.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc003150f00), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc0020d88c0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "57ee1d74-a384-47c6-90dd-ee202ee387d1"
decomposedfs.go:279
> [Go 366]: StartPostprocessingStep "f523d4b5-d064-4858-9f50-8882993e9e66"
service.go:162
> [Go 366]: pp finished "f523d4b5-d064-4858-9f50-8882993e9e66" "delay"
postprocessing.go:95
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "f523d4b5-d064-4858-9f50-8882993e9e66", Filename: "f188.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc004b3a820), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc0064c6eb0), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "10bf3056-ec9a-479b-b0d6-dc1aafbd92b7"
decomposedfs.go:279
{"level":"error","event":"PostprocessingFinished","uploadid":"f523d4b5-d064-4858-9f50-8882993e9e66","error":"ERR_UPLOAD_NOT_FOUND: upload not found","time":"2024-10-21T16:00:07+02:00","message":"Failed to get upload"}
> [Go 366]: StartPostprocessingStep "f523d4b5-d064-4858-9f50-8882993e9e66"
service.go:162
> [Go 366]: pp finished "f523d4b5-d064-4858-9f50-8882993e9e66" "delay"
postprocessing.go:95
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "f523d4b5-d064-4858-9f50-8882993e9e66", Filename: "f188.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc00506a000), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc000892190), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "166322bd-03b3-4904-b7cb-491cce995f16"
decomposedfs.go:279
{"level":"error","event":"PostprocessingFinished","uploadid":"f523d4b5-d064-4858-9f50-8882993e9e66","error":"ERR_UPLOAD_NOT_FOUND: upload not found","time":"2024-10-21T16:07:01+02:00","message":"Failed to get upload"}
> [Go 366]: StartPostprocessingStep "f523d4b5-d064-4858-9f50-8882993e9e66"
service.go:162
> [Go 366]: pp finished "f523d4b5-d064-4858-9f50-8882993e9e66" "delay"
postprocessing.go:95
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "f523d4b5-d064-4858-9f50-8882993e9e66", Filename: "f188.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc000b3d680), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc00c5a0b90), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "38d06943-75fd-46a5-b37e-d494a497f814"
decomposedfs.go:279
{"level":"error","event":"PostprocessingFinished","uploadid":"f523d4b5-d064-4858-9f50-8882993e9e66","error":"ERR_UPLOAD_NOT_FOUND: upload not found","time":"2024-10-21T16:09:05+02:00","message":"Failed to get upload"}
> [Go 366]: StartPostprocessingStep "f523d4b5-d064-4858-9f50-8882993e9e66"
service.go:162
> [Go 366]: pp finished "f523d4b5-d064-4858-9f50-8882993e9e66" "delay"
postprocessing.go:95
> [Go 373]: PostprocessingFinished events.PostprocessingFinished {UploadID: "f523d4b5-d064-4858-9f50-8882993e9e66", Filename: "f188.bin", SpaceOwner: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId nil, ExecutingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User {state: (*"google.golang.org/protobuf/internal/impl.MessageState")(0xc0078f26e0), sizeCache: 0, unknownFields: []uint8 len: 0, cap: 0, nil, Id: *(*"github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.UserId")(0xc004754690), Username: "einstein", Mail: "", MailVerified: false, DisplayName: "Albert Einstein", Groups: []string len: 0, cap: 0, nil, Opaque: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Opaque nil, UidNumber: 0, GidNumber: 0}, Result: map[github.com/cs3org/reva/v2/pkg/events.Postprocessingstep]interface {} nil, Outcome: "continue", Timestamp: *github.com/cs3org/go-cs3apis/cs3/types/v1beta1.Timestamp nil, ImpersonatingUser: *github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1.User nil} "e440cd4a-e3ee-40f0-a90c-ab51e5cec91c"
decomposedfs.go:279
{"level":"error","event":"PostprocessingFinished","uploadid":"f523d4b5-d064-4858-9f50-8882993e9e66","error":"ERR_UPLOAD_NOT_FOUND: upload not found","time":"2024-10-21T16:16:00+02:00","message":"Failed to get upload"}

if these events are coming up again and again ... it might look like the postprocessing queue never ends ... hm, we still need to find the reason why the StartPostprocessingStep events are re-emitted ...

the PostprocessingFinished lines show that we emit a new event each time: they all have a different event id

butonic commented 21 hours ago

the upload processing is still going on and now i see a lot of:

nats: slow consumer, messages dropped on connection [37] for subscription on "main-queue"
nats: slow consumer, messages dropped on connection [37] for subscription on "main-queue"
nats: slow consumer, messages dropped on connection [37] for subscription on "main-queue"
nats: slow consumer, messages dropped on connection [37] for subscription on "main-queue"
nats: slow consumer, messages dropped on connection [37] for subscription on "main-queue"
nats: slow consumer, messages dropped on connection [37] for subscription on "main-queue"
nats: slow consumer, messages dropped on connection [37] for subscription on "main-queue"

IIRC @rhafer is seeing this as well.

now ... if the wrong messages are dropped because we are not fetching them fast enough, an upload might never be processed ...

butonic commented 20 hours ago

The autoAck option only acks messages after they have been fetched from the channel. The postprocessing service only uses one goroutine to fetch events. With delays set to 1s this causes nats to consider the postprocessing service a slow consumer. Furthermore, BytesReady and PostprocessingStepFinished events are handled on the main-queue, causing the postprocessing service to become the bottleneck. In k8s deployments we start two postprocessing services, which allows ocis to handle events twice as fast, but that is still not fast enough if virus scanning needs to handle many files.

To prevent the postprocessing from using excessive RAM we will not just spawn a goroutine for every event.

Instead, we will start a configurable worker group with 10 goroutines.

butonic commented 20 hours ago

hm the analysis is really skewed by the delay. That option has a direct impact on the processing of events, while the antivirus scanner, for example, does not block processing of new events while a scan is running ... AFAICT. @kobergj, or does it?

even a worker group of 10 will get stuck in delays ... the delay step should return immediately but start a goroutine that emits a PostprocessingStepFinished event after the delay ... currently it takes a shortcut.

kobergj commented 5 hours ago

hm the analysis is really skewed by the delay. That option has a direct impact on the processing of events, while the antivirus scanner, for example, does not block processing of new events while a scan is running ... AFAICT. @kobergj, or does it?

Correct - The antivirus service is a separate microservice. Therefore it will NOT stop the postprocessing service from processing events. However, there could be other limiting resources (CPU, memory) that may be depleted when the antivirus runs scans.

butonic commented 4 hours ago

hm the activity log also does not consume events fast enough

✗ ~/go/bin/nats consumer report
[ocis] ? Select a Stream main-queue
╭───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                              Consumer report for main-queue with 12 consumers                                             │
├──────────────────────────────────────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬───────────────┬───────────┬─────────┤
│ Consumer                                 │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed   │ Ack Floor │ Cluster │
├──────────────────────────────────────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼───────────────┼───────────┼─────────┤
│ activitylog                              │ Push │ Explicit   │ 30.00s   │ 1,000       │ 1,000       │ 68,987 / 44%  │ 85,585    │         │
│ clientlog                                │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
│ dcfs                                     │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
│ evhistory                                │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
│ frontend                                 │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
│ graph                                    │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
│ notifications                            │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
│ postprocessing                           │ Push │ Explicit   │ 30.00s   │ 1,000       │ 972         │ 135,841 / 87% │ 18,731    │         │
│ search                                   │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
│ sse-2e53bc47-33d6-4c69-8955-f162297a3a87 │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
│ storage-users                            │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
│ userlog                                  │ Push │ Explicit   │ 30.00s   │ 0           │ 0           │ 0             │ 155,572   │         │
╰──────────────────────────────────────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴───────────────┴───────────┴─────────╯

hm ... disabling all the logpoints allowed ocis to process the events and drain the queue in a few seconds.

also ... after all events were processed all files were accessible. no file was stuck.

rhafer commented 4 hours ago

This might be interesting: https://docs.nats.io/nats-concepts/jetstream/consumers#maxackpending

"For high throughput, set MaxAckPending to a high value. For applications with high latency due to external services, use a lower value and adjust AckWait to avoid re-deliveries."

So for postprocessing it might make sense to actually lower MaxAckPending (maybe to the size of the worker group processing the events?)

kobergj commented 3 hours ago

Good idea. Should we lower MaxAckPending then for all services?

butonic commented 3 hours ago

MaxAckPending needs to be added to go micro before we can add it in reva and then in ocis. I would save that for a new PR after https://github.com/owncloud/ocis/pull/10372

nicholas-wilson-au commented 1 hour ago

image

butonic commented 11 minutes ago

@kobergj @rhafer AFAICT the antivirus scanner has a similar bottleneck as the postprocessing service. But it is more severe: the postprocessing service can pull events fast enough, and increasing the number of replicas effectively increases the capacity.

Increasing the number of antivirus pods does not increase the frequency of pulls from the queue: it just adds one consumer to the group, but each consumer handles events synchronously. @nicholas-wilson-au is testing with 1000 10MB files, 10GB in total ... and every scan seems to take some time.

We increased the number of antivirus pods to 17, which effectively allows us to have 17 files in virus scanning at the same time. That was still not fast enough and nats tried to redeliver the 1000 messages, which only worsens the problem because we are rescanning the files 🤦

Now ... we can decrease the number of pending messages with something like:

nats consumer edit main-queue antivirus --max-pending=17

in order to allow one pending message per consumer in the antivirus group. But since scans may take some time, we may also need to tweak the ack wait time, e.g. with:

nats consumer edit main-queue antivirus --max-pending=5 --wait=60s

now ... this can be done manually by an admin ... but we should set better defaults.