Open deitch opened 4 weeks ago
How about a `type ProgressManager interface` in `CopyOptions`?
> How about a `type ProgressManager interface` in `CopyOptions`?
What does it look like? How is it better than a channel or a func?
Maybe a practical example would help me understand it?
> How about a `type ProgressManager interface` in `CopyOptions`?
>
> What does it look like? How is it better than a channel or a func? Maybe a practical example would help me understand it?
Something like https://pkg.go.dev/oras.land/oras@v1.2.0/cmd/oras/internal/display/status/progress#Manager
```go
type Manager interface {
	Add() (Status, error)
	Close() error
}

type Status chan *status

// status is used as message to update progress view.
type status struct {
	done        bool // done is true when the end time is set
	prompt      string
	descriptor  ocispec.Descriptor
	offset      int64
	total       humanize.Bytes
	speedWindow *speedWindow
	startTime   time.Time
	endTime     time.Time
	mark        spinner
	lock        sync.Mutex
}
```
The `oras` CLI combines the status update and the view model in a single structure. Maybe we can improve it a bit by separating them. /cc @qweeah
I truly don't remember that part. 😁
I could see having an `err error` and a `done bool` property; I'm not sure I would want much else.
What is the `Manager` used for? Is that just so that a UI-implementing side (like the oras CLI) can manage it? If so, I would keep it separate. Pass the simplest possible mechanism (channel or func call) to `Copy()`, and let the consuming side worry about what it does with it.
If we want to add progress to the oras CLI, it becomes a consuming party; I think I would handle it as a separate addition in a separate PR.
I thought about this a bit more over the last few days. I still am having a hard time seeing the benefit of a `ProgressManager` interface over just having a `func` or `chan`. I am trying to keep it as simple as possible, which to me looks like:
```go
f := func(msg oras.Update) {
	fmt.Printf("copied %d bytes out of %d total for %s\n", msg.Copied, msg.Descriptor.Size, msg.Descriptor.Digest)
}
opts := oras.CopyOptions{
	ProgressHandler: f,
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)
```
With each copy of data, it just calls `f(update)`.
I have been thinking a bit more about the whole "how do we call the function without blocking". I had suggested earlier:
> I would consider having each call that sends to the channel or calls `func()` be in a separate short-lived goroutine, which calls the func or sends to the channel, wrapped in a timeout.
but I like this less. I am worried about a proliferation of goroutines. If we have frequent updates, that could get out of control.
Here is a possible alternative:

- Within `Copy()`, if the handler is not nil, we spin up a `chan Update` and one goroutine.
- `Copy()` (or its underlying functions) sends an `Update` on the channel. The goroutine we spun up receives that update and then calls the handler function.
- The `Copy()` thread can put a `select` with a `default` case on sending to the channel, so we do not have to worry about it blocking.

This is less of an issue if we pass a channel in the options, instead of a function, but I do think the option is cleaner.
Thoughts @shizhMSFT ?
I would like to have a more flexible interface and some utility functions for easy setup. Let me try to refactor the existing `oras` CLI code a bit in the next few days or a week and see if we can have better code. Besides `Copy()`, I'm also holistically thinking about how we can cover progress for things like `Fetch()` and `Push()`.
Good point there @shizhMSFT. I am trying hard to keep the abstract interface (is that redundant?) that goes to `Copy()` as simple as possible, while the implementation of the consumer - like the oras CLI - is where everything else goes. I hadn't thought about `Fetch()` and `Push()`, but it should go there as well.
Here's a solid PoC of how the `Manager` and related code look (see the two files below). File `track/interface.go` defines the interfaces and includes a simple utility function:
```go
package track

import (
	"io"

	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// State represents the state of a descriptor.
type State int

const (
	StateUnknown State = iota
	StateInitialized
	StateTransmitting
	StateTransmitted
	StateExists
	StateSkipped
	StateMounted
)

// Status represents the status of a descriptor.
type Status struct {
	// State represents the state of the descriptor.
	State State

	// Offset represents the current offset of the descriptor.
	// Offset is discarded if set to a negative value.
	Offset int64
}

// Tracker updates the status of a descriptor.
type Tracker interface {
	io.Closer

	// Update updates the status of the descriptor.
	Update(status Status) error

	// Fail marks the descriptor as failed.
	Fail(err error) error
}

// Manager tracks the progress of multiple descriptors.
type Manager interface {
	io.Closer

	// Track starts tracking the progress of a descriptor.
	Track(desc ocispec.Descriptor) (Tracker, error)
}

// Record adds the progress of a descriptor as a single entry.
func Record(m Manager, desc ocispec.Descriptor, status Status) error {
	tracker, err := m.Track(desc)
	if err != nil {
		return err
	}
	err = tracker.Update(status)
	if err != nil {
		return err
	}
	return tracker.Close()
}
```
File `track/reader.go` implements how a `Tracker` can hook into a `Reader`:
```go
package track

import "io"

// ReadTracker tracks the transmission based on the read operation.
type ReadTracker struct {
	base    io.Reader
	tracker Tracker
	offset  int64
}

// NewReadTracker attaches a tracker to a reader.
func NewReadTracker(track Tracker, r io.Reader) *ReadTracker {
	return &ReadTracker{
		base:    r,
		tracker: track,
	}
}

// Read reads from the base reader and updates the status.
func (rt *ReadTracker) Read(p []byte) (n int, err error) {
	n, err = rt.base.Read(p)
	rt.offset += int64(n)
	_ = rt.tracker.Update(Status{
		State:  StateTransmitting,
		Offset: rt.offset,
	})
	if err != nil && err != io.EOF {
		_ = rt.tracker.Fail(err)
	}
	return n, err
}

// Close closes the tracker.
func (rt *ReadTracker) Close() error {
	return rt.tracker.Close()
}

// Start starts tracking the transmission.
func (rt *ReadTracker) Start() error {
	return rt.tracker.Update(Status{
		State:  StateInitialized,
		Offset: -1,
	})
}

// Done marks the transmission as complete.
// Done should be called after the transmission is complete.
// Note: Reading all content from the reader does not imply the transmission is
// complete.
func (rt *ReadTracker) Done() error {
	return rt.tracker.Update(Status{
		State:  StateTransmitted,
		Offset: -1,
	})
}
```
Please refer to https://github.com/oras-project/oras/pull/1524 to understand how the above implementation can take effect in real code.
Here is an example using functions.
```go
package main

import (
	"context"
	"fmt"
	"io"

	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"oras.land/oras-go/v2/content"
	"oras.land/oras-go/v2/registry/remote"
	"oras.land/oras/internal/experimental/track"
)

type ProgressFunc func(status track.Status, err error) error

func (pf ProgressFunc) Update(status track.Status) error {
	return pf(status, nil)
}

func (pf ProgressFunc) Fail(err error) error {
	return pf(track.Status{}, err)
}

func (pf ProgressFunc) Close() error {
	return nil
}

type DescriptorProgressFunc func(desc ocispec.Descriptor, status track.Status, err error) error

func (dpf DescriptorProgressFunc) Track(desc ocispec.Descriptor) (ProgressFunc, error) {
	return func(status track.Status, err error) error {
		return dpf(desc, status, err)
	}, nil
}

func (dpf DescriptorProgressFunc) Close() error {
	return nil
}

func main() {
	// Create a new progress manager.
	prompt := map[track.State]string{
		track.StateInitialized:  "Pending",
		track.StateTransmitting: "Pulling",
		track.StateTransmitted:  "Pulled ",
	}
	manager := DescriptorProgressFunc(func(desc ocispec.Descriptor, status track.Status, err error) error {
		if err != nil {
			fmt.Println("Error ", desc.Digest, err)
			return err
		}
		if status.Offset < 0 {
			fmt.Println(prompt[status.State], desc.Digest)
		} else {
			fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
		}
		return nil
	})

	// Pull a manifest.
	repo, err := remote.NewRepository("ghcr.io/oras-project/oras")
	if err != nil {
		panic(err)
	}
	ctx := context.Background()
	desc, err := repo.Resolve(ctx, "v1.2.0")
	if err != nil {
		panic(err)
	}
	tracker, _ := manager.Track(desc)
	defer tracker.Close()
	tracker.Update(track.Status{
		State: track.StateInitialized,
	})
	r, err := repo.Fetch(ctx, desc)
	if err != nil {
		tracker.Fail(err)
		return
	}
	defer r.Close()
	rt := track.NewReadTracker(tracker, r)
	defer rt.Close()
	vr := content.NewVerifyReader(rt, desc)
	_, err = io.ReadAll(vr) // io.ReadAll uses a smaller buffer
	if err != nil {
		tracker.Fail(err)
		return
	}
	err = vr.Verify()
	if err != nil {
		tracker.Fail(err)
		return
	}
	rt.Done()
}
```
It outputs:

```console
$ go run internal/experimental/example
Pending sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 0.00%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 13.15%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 23.02%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 36.17%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 52.61%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 78.91%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 100.00%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 100.00%
Pulled  sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b
```
Currently, this model only covers descriptors and not references.
@shizhMSFT yes, this is pretty detailed. It is more advanced - but also more complex - than I had in mind.
From the `Copy()` perspective, all I would think it wants is a simple way to send an update: "For descriptor desc, be informed that I have copied x bytes out of y expected total, and my latest state update is Z (error, completed, in progress)." I would think we would want it to be as simple as possible for `Copy()` (and `Fetch()`, etc.; any core library function).
What I think you have constructed is a more advanced thing in a few respects:
I assume (please correct me) that your thought was something like:
```go
opts := oras.CopyOptions{
	// other stuff
	ProgressTracker: manager, // some implementation of manager
}
oras.Copy(ctx, src, srcRef, dst, dstRef, opts)
```
I think that is more complex than it needs to be. A manager is useful, but it is not the only way to do it. For example, I might have a simpler implementation, or just need progress for one descriptor, etc.
That is why I was thinking the simple method above:
```go
f := func(msg oras.Update) {
	fmt.Printf("copied %d bytes out of %d total for %s\n", msg.Copied, msg.Descriptor.Size, msg.Descriptor.Digest)
}
opts := oras.CopyOptions{
	ProgressHandler: f,
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)
```
Now, you might actually want a manager. In which case, the update function itself might look like:
```go
type ProgressHandler interface {
	Update(msg oras.Update)
}

// this can be the actual manager you had above; implements the ProgressHandler interface
type manager struct {
}

func (m *manager) Update(msg oras.Update) {
	// check the descriptor; if needed, create a tracker
}

mgr := NewManager()
opts := oras.CopyOptions{
	ProgressHandler: mgr,
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)
```
In the philosophy of "batteries optional but included", we can include a manager implementation just like you have above, on the assumption that many will want it. But keep the `Copy()` interaction really simple: I just send updates. How you manage them - discard, print to stdout, separate by desc or status or whatever - is beyond me, and I don't even care about the interaction.
Heh, crossed wires. I am reading your latest comments now.
Looking at your example, you didn't plumb it into `Copy()` or `Fetch()`, but you decorated the `desc`? What am I misunderstanding?

That is a lot of extra overhead to incur when calling `Fetch()`. Wouldn't I want it to be a simple additional option to the call?
Here is an example if you don't want a manager.
```go
package main

import (
	"context"
	"fmt"
	"io"

	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"oras.land/oras-go/v2/content"
	"oras.land/oras-go/v2/registry/remote"
	"oras.land/oras/internal/experimental/track"
)

type ProgressFunc func(status track.Status, err error) error

func (pf ProgressFunc) Update(status track.Status) error {
	return pf(status, nil)
}

func (pf ProgressFunc) Fail(err error) error {
	return pf(track.Status{}, err)
}

func (pf ProgressFunc) Close() error {
	return nil
}

func main() {
	// Pull a manifest.
	repo, err := remote.NewRepository("ghcr.io/oras-project/oras")
	if err != nil {
		panic(err)
	}
	ctx := context.Background()
	desc, err := repo.Resolve(ctx, "v1.2.0")
	if err != nil {
		panic(err)
	}

	// Create a new progress tracker.
	prompt := map[track.State]string{
		track.StateInitialized:  "Pending",
		track.StateTransmitting: "Pulling",
		track.StateTransmitted:  "Pulled ",
	}
	tracker := ProgressFunc(func(status track.Status, err error) error {
		if err != nil {
			fmt.Println("Error ", desc.Digest, err)
			return err
		}
		if status.Offset < 0 {
			fmt.Println(prompt[status.State], desc.Digest)
		} else {
			fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
		}
		return nil
	})
	tracker.Update(track.Status{
		State: track.StateInitialized,
	})
	r, err := repo.Fetch(ctx, desc)
	if err != nil {
		tracker.Fail(err)
		return
	}
	defer r.Close()
	rt := track.NewReadTracker(tracker, r)
	defer rt.Close()
	vr := content.NewVerifyReader(rt, desc)
	_, err = io.ReadAll(vr) // io.ReadAll uses a smaller buffer
	if err != nil {
		tracker.Fail(err)
		return
	}
	err = vr.Verify()
	if err != nil {
		tracker.Fail(err)
		return
	}
	rt.Done()
}
```
A manager is required since `Copy()` deals with multiple descriptors but `Fetch()` and `Push()` only deal with one descriptor at a time.
If `ProgressFunc` and `DescriptorProgressFunc` are pre-defined in `oras-go`, all you need is just a single function:

```go
func(desc ocispec.Descriptor, status track.Status, err error) error {
	fmt.Printf("copied %d bytes out of %d total for %s\n", status.Offset, desc.Size, desc.Digest)
	return nil
}
```

although this simple function does not consider error handling.
So... the complete example is:

```go
f := func(desc ocispec.Descriptor, status track.Status, err error) error {
	fmt.Printf("copied %d bytes out of %d total for %s\n", status.Offset, desc.Size, desc.Digest)
	return nil
}
opts := oras.CopyOptions{
	ProgressManager: DescriptorProgressFunc(f),
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)
```
> A manager is required since `Copy()` deals with multiple descriptors but `Fetch()` and `Push()` only deal with one descriptor at a time.
I don't think that changes anything. If `Update` includes the descriptor, it includes all I need to know.
Hold on a second. You are not decorating the desc, you are decorating the reader?
```go
r, err := repo.Fetch(ctx, desc)
if err != nil {
	tracker.Fail(err)
	return
}
defer r.Close()
rt := track.NewReadTracker(tracker, r)
defer rt.Close()
vr := content.NewVerifyReader(rt, desc)
_, err = io.ReadAll(vr) // io.ReadAll uses a smaller buffer
```
So all you are doing is wrapping a `ReadCloser` with a `ReadCloser` that has the "let me call some function to report updates once in a while" logic inside it? Chaining?
Yes, I don't think there is any other way to get the status of a read operation (e.g. on `resp.Body`). It is worth noting that receiving a status update does not imply you should render it. In other words, updates are call-based but rendering can be time-based.
That makes a lot of sense. It is the canonical way of doing things like that in Go.
I still don't get how we would do that with `Copy()`, which, as you said, is multiple blobs. Ah, is that what the manager is for? It is a "tracker factory", where `Copy()` can get a tracker for each desc on which it is working, and then it can do the chaining you just showed for each `ReadCloser`?

> Ah, is that what the manager is for? It is a "tracker factory", where `Copy()` can get a tracker for each desc on which it is working, and then it can do the chaining you just showed for each `ReadCloser`?
Exactly.
I think the name "manager" threw me off. It is a tracker factory, that lets copy say, "I am about to start working on desc, give me a tracker for it, and if I get none (no factory or nil tracker returned), I will just process the blobs as I want".
OK, it is interesting. Going to absorb it a bit. The trick is to get the documentation right.
What then are `DescriptorProgressFunc` and `ProgressFunc`? Don't I just pass a `Manager`, which has one main func on it, `Track()`, which lets me start tracking a desc?
> What then are `DescriptorProgressFunc` and `ProgressFunc`?
They are just utility types, like `http.HandlerFunc`, for your simple cases and for demo purposes.
What purpose do they serve? If I understood correctly
> What purpose do they serve?
For simple scenarios like yours, crafting a custom type and implementing those methods might be time consuming. Those utilities help you convert a function into a handler that meets the interfaces.
This pattern is common in the Go world. If you take a look at `http.HandlerFunc`, you can observe that it is a utility to convert a function into an `http.Handler`, which is convenient.
The analogy to `http.HandlerFunc` is a good one, although in our case it might be a bit too complex.
My perspective always bounces between engineer who needs to build something, and user who needs to consume it (product). I will take a stab at writing the brief form of docs here. Tell me if this describes what you are doing.
-----BEGIN-----
When transmitting blobs, whether individually via `Fetch()` or `Push()`, or as part of an entire image via `Copy()`, you have the option to track the progress of each blob being transmitted. The oras library does not care what you do with the progress updates; it only makes them available to you.
At the lowest level, each blob being read is fetched via `oras.Fetch()`, which returns an `io.ReadCloser`. You can capture progress by wrapping the `io.ReadCloser` with a struct that returns an `io.ReadCloser` but reports on progress.
```go
r, err := repo.Fetch(ctx, desc)  // returns an io.ReadCloser
rc := myReadCloserTrackerFunc(r) // also returns an io.ReadCloser
```
This is the simplest way to track progress for a specific blob. Whether you send updates to an observability system or stderr, and how often you do it - on every buffer copy, every 1MB, or every 3 seconds - is entirely up to how you build your wrapper function, in our example `myReadCloserTrackerFunc`.
At a higher level (not required just to wrap a single `ReadCloser`), oras provides the `Tracker` interface.
```go
type Tracker interface {
	io.Closer
	Update(status Status) error
	Fail(err error) error
	ReadCloser(r io.ReadCloser) io.ReadCloser
}
```
You can use a `Tracker` to generate a `ReadCloser` as well as keep track of several status updates. A `Tracker` can be generated via `ProgressFunc(func)`, which returns a `Tracker` you can use. You pass it the function you expect to be called with each update:
```go
tracker := ProgressFunc(func(status track.Status, err error) error {
	if err != nil {
		fmt.Println("Error ", desc.Digest, err)
		return err
	}
	if status.Offset < 0 {
		fmt.Println(prompt[status.State], desc.Digest)
	} else {
		fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
	}
	return nil
})
```
You then can generate your `io.ReadCloser` to wrap the `io.ReadCloser` returned from `Fetch()`:
```go
tracker := ProgressFunc() // etc
r, err := repo.Fetch(ctx, desc) // returns an io.ReadCloser
rc := tracker.ReadCloser(r)
```
If you are copying an entire artifact by tag, which likely involves multiple descriptors, you can track it by providing a manager, or factory, that generates trackers for each descriptor's content that is copied. The manager is called by `Copy()` each time it encounters a new descriptor, and `Copy()` uses the returned `Tracker` to track progress for each blob's `Fetch()` or `Push()`:
```go
type Manager interface {
	io.Closer
	Track(desc ocispec.Descriptor) (Tracker, error)
}
```
You then can pass the `Manager` to `Copy()`:
```go
opts := oras.CopyOptions{
	// other stuff
	ProgressTracker: manager, // some implementation of Manager
}
oras.Copy(ctx, src, srcRef, dst, dstRef, opts)
```
If the `ProgressTracker` property is nil, or if `Track()` returns nil for a given descriptor, then that descriptor is not tracked.
Of course, you are not required to create a `Manager` implementation. The oras library provides a convenient one for you with `oras.DescriptorProgressFunc()`, which returns a manager, or factory. The function is called with each update, whether of copied content or a change in status. For example:
```go
manager := DescriptorProgressFunc(func(desc ocispec.Descriptor, status track.Status, err error) error {
	if err != nil {
		fmt.Println("Error ", desc.Digest, err)
		return err
	}
	if status.Offset < 0 {
		fmt.Println(prompt[status.State], desc.Digest)
	} else {
		fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
	}
	return nil
})
```
Each update, whether a change in status or bytes copied, will call the function.

-----END-----
Is that roughly it? If so, I have a few suggestions:
1. `DescriptorProgressFunc` and `ProgressFunc` are nearly identical. The only difference is that one passes the descriptor and one doesn't. Does it hurt to pass it? And then make it a single type.
2. `Manager` is confusing. Why not just call it `type ProgressTracker interface`?
3. The "manager generator" (`DescriptorProgressFunc`) and "tracker generator" (`ProgressFunc`) have somewhat confusing names. Isn't the func that gets passed to them a `DescriptorProgressFunc`?

```go
type DescriptorProgressFunc func(desc ocispec.Descriptor, status track.Status, err error) error
```

We could then call the generators something like `NewProgressTracker` and `NewTracker` or similar.
Looking for feedback.
> Is that roughly it?
Yes. Just notice that the `Tracker` interface should not contain

```go
ReadCloser(r io.ReadCloser) io.ReadCloser
```

as we can provide a common function with performance optimizations, e.g. implementing `io.WriterTo` if the original `r` implements it.
Your suggestions are mainly about naming. We can revise the names in future PRs (I will prepare one).
Besides, the name `Manager` is common in the Go world, e.g. `github.com/containerd/containerd` has content.Manager. For now, `track.Manager` manages `track.Tracker`s.
For the package name, I'm considering `track` vs. `progress`. If I cannot find a scenario for general tracking, I'd like to rename the package to `progress` (i.e. `progress.Manager` and `progress.Tracker`).
Sounds good. Feel free to use (and modify) my "docs" in the documentation with the PR.
What assistance do you need?
As discussed in this comment (with thanks to @shizhMSFT for shepherding it through).
Summary
oras.Copy()
Proposed design
The signature for `oras.Copy` includes `oras.CopyOptions` as the last parameter. If this were variadic, I would suggest adding another `WithProgressUpdate()` or similar, but we use a single `CopyOptions`, so I propose adding another property: `CopyOptions.Progress`. The type depends on the design choice. There are two ways to do this:

1. `CopyOptions.Progress func(Update)`. With each update, `Copy()` (or its underlying functions) would call the passed function, passing it the `Update` (see below).
2. `CopyOptions.Progress chan<- Update`. With each update, `Copy()` would send an `Update` to the channel.

If `Progress` is nil, then this functionality is bypassed. Despite my linked PR using channels, I have no strong preference for channel over function.
Preventing Blocking

With both functions and channels, there is a concern that delivery might block. In principle, I think that if someone calls `Copy()` and passes it a blocking function or unbuffered channel, that is their issue. However, it can cause us headaches to support them. I would consider having each call that sends to the channel or calls `func()` be in a separate short-lived goroutine, which calls the func or sends to the channel, wrapped in a timeout.

Frequency of Update
My initial design is to rely on the underlying `io.CopyN()`. Whatever we use for that, we use for writing updates. However, that can overwhelm if the default `io.Copy()` is used. If I recall correctly, `io.Copy()` defaults to a 32K buffer. With a 100MB blob, that is ~3000 updates. That may or may not be good.

However we control the update frequency, I think it should be byte-based, not time-based, i.e. "updates every x KB" instead of "updates every y seconds." That is more useful, and also easier to implement.
In terms of controlling the update frequency, the simplest way is `CopyOption.ProgressFrequency uint`. If it is 0, stick to the default.

An alternative is to have `CopyOption.Progress` be a struct with both the channel/func (whichever is chosen) and an update-frequency property.

A third method, and probably the simplest, is not to control it at all, but rather have it be part of `CopyOption.Progress`: our `Copy()` calls that / sends to the channel as often as it wants, and the consumer buffers as it sees fit. This is the simplest, but is subject to our "blocking control", i.e. goroutines, being overwhelmed.

Open to ideas.
Structure of update message

The `oras.Update` should be simple and contain only 2 properties:

The descriptor is important for 2 reasons:
Sample
Channel:
Func: