Open dcboy opened 10 months ago
Can you provide a way to reproduce this? If possible, a simple Go file for the server and a curl
example to send the H2 request.
server side
func Serve() {
uploadDir, err := filepath.Abs(path.Join(Flags.UploadDir, "./tusd/"))
if err != nil {
log.Fatalf("Unable to make absolute path: %s", err)
}
store := filestore.New(uploadDir)
locker := filelocker.New(uploadDir)
composer := tusd.NewStoreComposer()
store.UseIn(composer)
locker.UseIn(composer)
handler, err := tusd.NewHandler(tusd.Config{
MaxSize: 0,
BasePath: Flags.Basepath,
StoreComposer: composer,
RespectForwardedHeaders: true,
DisableDownload: true,
DisableTermination: true,
PreUploadCreateCallback: func(hook tusd.HookEvent) (resp tusd.HTTPResponse, fileinfo tusd.FileInfoChanges, err error) {
log.Infof("PreUploadCreateCallback: %s", utils.ToJson(hook))
return resp, tusd.FileInfoChanges{}, nil
},
PreFinishResponseCallback: func(hook tusd.HookEvent) (resp tusd.HTTPResponse, err error) {
log.Infof("PreFinishResponseCallback: %s", utils.ToJson(hook))
log.Infof("postFinish from:%s to:%s", from, to)
// del bin
os.Remove(fmt.Sprintf("%s.info", hook.Upload.Storage["Path"]))
return resp, nil
},
Logger: logger.GetSLogger(),
})
if err != nil {
panic(fmt.Errorf("unable to create handler: %s", err))
}
basepath := Flags.Basepath
quicAddress := "0.0.0.0:" + Flags.QuicPort
httpAddress := "0.0.0.0:" + Flags.HttpPort
httpsAddress := "0.0.0.0:" + Flags.HttpsPort
log.Infof("Using %s as address to listen QUIC.", quicAddress)
log.Infof("Using %s as address to listen HTTP.", httpAddress)
log.Infof("Using %s as address to listen HTTPS.", httpsAddress)
log.Infof("Using %s as the base path.", basepath)
basepathWithoutSlash := strings.TrimSuffix(basepath, "/")
basepathWithSlash := basepathWithoutSlash + "/"
http.Handle(basepathWithSlash, authMiddleware(http.StripPrefix(basepathWithSlash, handler)))
http.Handle(basepathWithoutSlash, authMiddleware(http.StripPrefix(basepathWithoutSlash, handler)))
// https
go func() {
listener, err := net.Listen("tcp", httpsAddress)
if err = http.ServeTLS(listener, nil, Flags.TLSCertFile, Flags.TLSKeyFile); err != nil {
log.Errorf("Unable to serve: %s", err)
}
}()
// http
go func() {
listener, err := net.Listen("tcp", httpAddress)
if err = http.Serve(listener, nil); err != nil {
log.Errorf("Unable to serve: %s", err)
}
}()
// quci
go func() {
if err = http3.ListenAndServeQUIC(quicAddress, Flags.TLSCertFile, Flags.TLSKeyFile, nil); err != nil {
log.Fatalf("Unable to serve: %s", err)
}
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
select {
case <-c:
log.Infof("exit")
}
}
client side
package main
import (
"crypto/sha256"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/eventials/go-tus"
"github.com/quic-go/quic-go/http3"
)
type fileStore struct {
mu sync.Mutex
data map[string]string
filePath string
}
func newFileStore(filePath string) (*fileStore, error) {
fs := &fileStore{
data: make(map[string]string),
filePath: filePath,
}
if err := fs.loadFromFile(); err != nil {
return nil, err
}
return fs, nil
}
func (fs *fileStore) Get(key string) (string, bool) {
fs.mu.Lock()
defer fs.mu.Unlock()
value, exists := fs.data[key]
if !exists {
return "", false
}
return value, true
}
func (fs *fileStore) Set(key, value string) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.data[key] = value
fs.saveToFile()
}
func (fs *fileStore) Delete(key string) {
fs.mu.Lock()
defer fs.mu.Unlock()
delete(fs.data, key)
fs.saveToFile()
}
func (fs *fileStore) Close() {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.saveToFile()
}
func (fs *fileStore) loadFromFile() error {
file, err := os.OpenFile(fs.filePath, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return err
}
defer file.Close()
data, err := ioutil.ReadFile(fs.filePath)
if err != nil {
return err
}
if len(data) > 0 {
if err := json.Unmarshal(data, &fs.data); err != nil {
return err
}
}
return nil
}
func (fs *fileStore) saveToFile() error {
data, err := json.Marshal(fs.data)
if err != nil {
return err
}
if err := ioutil.WriteFile(fs.filePath, data, 0644); err != nil {
return err
}
return nil
}
type TusClientCallback interface {
OnProgress(currentSize, totalSize int64)
}
type TusClient struct {
tag string
endpoint string
deviceCode string
store *fileStore
httpClient *http.Client
}
func NewTusClient(endpoint string) *TusClient {
var c *TusClient
roundTripper = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true,},
}
http2.ConfigureTransport(roundTripper)
c = &TusClient{
tag: fmt.Sprintf("[TusClient|%s]: ", protocol),
deviceCode: "",
endpoint: endpoint,
httpClient: &http.Client{Transport: roundTripper},
}
c.init()
return c
}
func (b *TusClient) init() {
storeFile, _ := filepath.Abs(path.Join("./data.json"))
err := os.MkdirAll(filepath.Dir(storeFile), os.FileMode(0774))
if err != nil {
b.log(fmt.Sprintf("init make dir fail %s error: %s", storeFile, err.Error()))
}
b.store, err = newFileStore(storeFile)
if err != nil {
b.log(fmt.Sprintf("init store error: %s", err.Error()))
}
b.log("init success")
}
func (b *TusClient) ResumableUpload(localPath, cb TusClientCallback) error {
prefix := filepath.Base(localPath)
f, err := os.Open(localPath)
if err != nil {
return err
}
defer f.Close()
config := &tus.Config{
ChunkSize: 1 * 1024 * 1024,
Resume: true,
OverridePatchMethod: false,
Store: b.store,
HttpClient: b.httpClient,
}
client, err := tus.NewClient(b.endpoint, config)
if err != nil {
b.log(fmt.Sprintf("%s:new client error %s", prefix, err.Error()))
return err
}
upload, err := tus.NewUploadFromFile(f)
if err != nil {
b.log(fmt.Sprintf("%s:new upload error %s", prefix, err.Error()))
return err
}
uploader, err := client.CreateOrResumeUpload(upload)
if err != nil {
b.log(fmt.Sprintf("%s:create upload error %s", prefix, err.Error()))
return err
}
startTime := time.Now()
b.log(fmt.Sprintf("%s:upload start", prefix))
progressChan := make(chan tus.Upload)
defer close(progressChan)
uploader.NotifyUploadProgress(progressChan)
go func(notifyChan *chan tus.Upload) {
for {
up := <-*notifyChan
cb.OnProgress(up.Offset(), up.Size())
if up.Finished() {
break
}
}
fmt.Println("exit")
}(&progressChan)
outErr := uploader.Upload()
elapsedTime := time.Since(startTime).Seconds()
if outErr == nil {
b.log(fmt.Sprintf("%s:upload finish %fs", prefix, elapsedTime))
b.store.Delete(upload.Fingerprint)
}
return outErr
}
func (b *TusClient) log(v ...interface{}) {
log.Println(append([]interface{}{b.tag}, v...)...)
}
type tusClientCallback struct {
}
func (b *tusClientCallback) OnProgress(currentSize, totalSize int64) {
log.Printf("currentSize: %d totalSize: %d", currentSize, totalSize)
}
func main() {
// if use HTTP/2 need config tls for server
endpoint := "https://127.0.0.1:6001/upload/"
//TODO:
localPath := "path/to/file"
fileName := filepath.Base(localPath)
tusClient := NewTusClient(endpoint)
err := tusClient.ResumableUpload(localPath, &tusClientCallback{})
if err != nil {
fmt.Printf("error:%s", err.Error())
}
}
maybe this issues: https://github.com/golang/go/issues/58237
maybe this issues: golang/go#58237
That could very well be the issue. tusd uses SetReadDeadline
internally. How you tried using the latest version of x/net/http2
instead of net/http
to see if that fixes the problem?
since go version 1.6, The net/http package has provided transparent support for the HTTP/2 protocol Cannot be used directly x/net/http2
Ok, I had hoped that the H2 package can be upgraded independently. Until this bug is fixed in Go itself, you can try to prevent tusd calling SetReadDeadline
when not request body is present. AFAIU, this is the root cause for this error.
You can try wrapping https://github.com/tus/tusd/blob/79709611ce7c32e784f2c609e993b5484a09f5a0/pkg/handler/unrouted_handler.go#L163 into an if r.Body != nil
and see if that helps.
Ok, I had hoped that the H2 package can be upgraded independently. Until this bug is fixed in Go itself, you can try to prevent tusd calling
SetReadDeadline
when not request body is present. AFAIU, this is the root cause for this error.
thx very mush
// Set the initial read deadline for consuming the request body. All headers have already been read,
// so this is only for reading the request body. While reading, we regularly update the read deadline
// so this deadline is usually not final. See the bodyReader and writeChunk.
// We also update the write deadline, but makes sure that it is larger than the read deadline, so we
// can still write a response in the case of a read timeout.
if r.Body != nil {
if err := c.resC.SetReadDeadline(time.Now().Add(handler.config.NetworkTimeout)); err != nil {
c.log.Warn("NetworkControlError", "error", err)
}
}
this is will not fix the problem
https://github.com/dcboy/tusd/commit/e5f560e3d3ef3a55994a10291e07ea035c91304b
That's great to hear! Cloud you open a PR for this? Then we can merge an release the fix.
That's great to hear! Cloud you open a PR for this? Then we can merge an release the fix.
could not resolve problem -_-~~
Oh sorry, I misread your comment. We also call SetReadDeadline at https://github.com/tus/tusd/blob/79709611ce7c32e784f2c609e993b5484a09f5a0/pkg/handler/unrouted_handler.go#L851, but this is already wrapped in a body check.
There is also https://github.com/tus/tusd/blob/1a43e26f16f43bed5dd2219c27e6eb14c125fb03/pkg/handler/body_reader.go#L109, but closeWithError
should also only be called if the request body is not nil.
From your original comment, it also looks like this occurred for a PATCH requests with a body. So maybe this is another issue than https://github.com/golang/go/issues/58237.
tusd version: v2.1.0
Using the tusd package programmatically as same demo, and use filestore