forked from gitea/gitea
It may be prudent to add runtime finalizers to the git.Repository and git.blobReader objects to absolutely ensure that these are both properly cancelled, cleaned and closed out. This commit is a backport of an extract from #19448 Signed-off-by: Andrew Thornton <art27@cantab.net>
This commit is contained in:
parent
35a7db49b4
commit
88da50674f
|
@ -12,8 +12,11 @@ import (
|
|||
"bytes"
|
||||
"io"
|
||||
"math"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/process"
|
||||
)
|
||||
|
||||
// Blob represents a Git object.
|
||||
|
@ -54,11 +57,15 @@ func (b *Blob) DataAsync() (io.ReadCloser, error) {
|
|||
return io.NopCloser(bytes.NewReader(bs)), err
|
||||
}
|
||||
|
||||
return &blobReader{
|
||||
br := &blobReader{
|
||||
repo: b.repo,
|
||||
rd: rd,
|
||||
n: size,
|
||||
cancel: cancel,
|
||||
}, nil
|
||||
}
|
||||
runtime.SetFinalizer(br, (*blobReader).finalizer)
|
||||
|
||||
return br, nil
|
||||
}
|
||||
|
||||
// Size returns the uncompressed size of the blob
|
||||
|
@ -86,6 +93,10 @@ func (b *Blob) Size() int64 {
|
|||
}
|
||||
|
||||
type blobReader struct {
|
||||
lock sync.Mutex
|
||||
closed bool
|
||||
|
||||
repo *Repository
|
||||
rd *bufio.Reader
|
||||
n int64
|
||||
cancel func()
|
||||
|
@ -104,27 +115,57 @@ func (b *blobReader) Read(p []byte) (n int, err error) {
|
|||
}
|
||||
|
||||
// Close implements io.Closer
|
||||
func (b *blobReader) Close() error {
|
||||
func (b *blobReader) Close() (err error) {
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
||||
if b.closed {
|
||||
return
|
||||
}
|
||||
return b.close()
|
||||
}
|
||||
|
||||
func (b *blobReader) close() (err error) {
|
||||
defer b.cancel()
|
||||
b.closed = true
|
||||
if b.n > 0 {
|
||||
var n int
|
||||
for b.n > math.MaxInt32 {
|
||||
n, err := b.rd.Discard(math.MaxInt32)
|
||||
n, err = b.rd.Discard(math.MaxInt32)
|
||||
b.n -= int64(n)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
b.n -= math.MaxInt32
|
||||
}
|
||||
n, err := b.rd.Discard(int(b.n))
|
||||
n, err = b.rd.Discard(int(b.n))
|
||||
b.n -= int64(n)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
}
|
||||
if b.n == 0 {
|
||||
_, err := b.rd.Discard(1)
|
||||
_, err = b.rd.Discard(1)
|
||||
b.n--
|
||||
return err
|
||||
return
|
||||
}
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
func (b *blobReader) finalizer() error {
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
||||
if b.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
pid := ""
|
||||
if b.repo.Ctx != nil {
|
||||
pid = " from PID: " + string(process.GetPID(b.repo.Ctx))
|
||||
}
|
||||
log.Error("Finalizer running on unclosed blobReader%s: %s%s", pid, b.repo.Path)
|
||||
|
||||
return b.close()
|
||||
}
|
||||
|
|
|
@ -12,8 +12,12 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
gitealog "code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/process"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
|
||||
"github.com/go-git/go-billy/v5/osfs"
|
||||
|
@ -28,6 +32,9 @@ type Repository struct {
|
|||
|
||||
tagCache *ObjectCache
|
||||
|
||||
lock sync.Mutex
|
||||
closed bool
|
||||
|
||||
gogitRepo *gogit.Repository
|
||||
gogitStorage *filesystem.Storage
|
||||
gpgSettings *GPGSettings
|
||||
|
@ -63,23 +70,57 @@ func OpenRepositoryCtx(ctx context.Context, repoPath string) (*Repository, error
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return &Repository{
|
||||
repo := &Repository{
|
||||
Path: repoPath,
|
||||
gogitRepo: gogitRepo,
|
||||
gogitStorage: storage,
|
||||
tagCache: newObjectCache(),
|
||||
Ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
runtime.SetFinalizer(repo, (*Repository).finalizer)
|
||||
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
// Close this repository, in particular close the underlying gogitStorage if this is not nil
|
||||
func (repo *Repository) Close() {
|
||||
if repo == nil || repo.gogitStorage == nil {
|
||||
func (repo *Repository) Close() (err error) {
|
||||
if repo == nil {
|
||||
return
|
||||
}
|
||||
if err := repo.gogitStorage.Close(); err != nil {
|
||||
repo.lock.Lock()
|
||||
defer repo.lock.Unlock()
|
||||
return repo.close()
|
||||
}
|
||||
|
||||
func (repo *Repository) close() (err error) {
|
||||
repo.closed = true
|
||||
if repo.gogitStorage == nil {
|
||||
return
|
||||
}
|
||||
err = repo.gogitStorage.Close()
|
||||
if err != nil {
|
||||
gitealog.Error("Error closing storage: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (repo *Repository) finalizer() error {
|
||||
if repo == nil {
|
||||
return nil
|
||||
}
|
||||
repo.lock.Lock()
|
||||
defer repo.lock.Unlock()
|
||||
if !repo.closed {
|
||||
pid := ""
|
||||
if repo.Ctx != nil {
|
||||
pid = " from PID: " + string(process.GetPID(repo.Ctx))
|
||||
}
|
||||
log.Error("Finalizer running on unclosed repository%s: %s%s", pid, repo.Path)
|
||||
}
|
||||
|
||||
// We still need to run the close fn as it may be possible to reopen the gogitrepo after close
|
||||
return repo.close()
|
||||
}
|
||||
|
||||
// GoGitRepo gets the go-git repo representation
|
||||
|
|
|
@ -13,8 +13,11 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/process"
|
||||
)
|
||||
|
||||
// Repository represents a Git repository.
|
||||
|
@ -25,6 +28,10 @@ type Repository struct {
|
|||
|
||||
gpgSettings *GPGSettings
|
||||
|
||||
lock sync.Mutex
|
||||
|
||||
closed bool
|
||||
|
||||
batchCancel context.CancelFunc
|
||||
batchReader *bufio.Reader
|
||||
batchWriter WriteCloserError
|
||||
|
@ -64,29 +71,57 @@ func OpenRepositoryCtx(ctx context.Context, repoPath string) (*Repository, error
|
|||
repo.batchWriter, repo.batchReader, repo.batchCancel = CatFileBatch(ctx, repoPath)
|
||||
repo.checkWriter, repo.checkReader, repo.checkCancel = CatFileBatchCheck(ctx, repo.Path)
|
||||
|
||||
runtime.SetFinalizer(repo, (*Repository).finalizer)
|
||||
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
// CatFileBatch obtains a CatFileBatch for this repository
|
||||
func (repo *Repository) CatFileBatch(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) {
|
||||
if repo.batchCancel == nil || repo.batchReader.Buffered() > 0 {
|
||||
repo.lock.Lock()
|
||||
defer repo.lock.Unlock()
|
||||
|
||||
if repo.closed || repo.batchReader.Buffered() > 0 {
|
||||
log.Debug("Opening temporary cat file batch for: %s", repo.Path)
|
||||
return CatFileBatch(ctx, repo.Path)
|
||||
}
|
||||
|
||||
if repo.batchCancel == nil {
|
||||
repo.batchWriter, repo.batchReader, repo.batchCancel = CatFileBatch(ctx, repo.Path)
|
||||
}
|
||||
|
||||
return repo.batchWriter, repo.batchReader, func() {}
|
||||
}
|
||||
|
||||
// CatFileBatchCheck obtains a CatFileBatchCheck for this repository
|
||||
func (repo *Repository) CatFileBatchCheck(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) {
|
||||
if repo.checkCancel == nil || repo.checkReader.Buffered() > 0 {
|
||||
repo.lock.Lock()
|
||||
defer repo.lock.Unlock()
|
||||
|
||||
if repo.closed || repo.checkReader.Buffered() > 0 {
|
||||
log.Debug("Opening temporary cat file batch-check: %s", repo.Path)
|
||||
return CatFileBatchCheck(ctx, repo.Path)
|
||||
}
|
||||
|
||||
if repo.checkCancel == nil {
|
||||
repo.checkWriter, repo.checkReader, repo.checkCancel = CatFileBatchCheck(ctx, repo.Path)
|
||||
}
|
||||
|
||||
return repo.checkWriter, repo.checkReader, func() {}
|
||||
}
|
||||
|
||||
// Close this repository, in particular close the underlying gogitStorage if this is not nil
|
||||
func (repo *Repository) Close() {
|
||||
func (repo *Repository) Close() (err error) {
|
||||
if repo == nil {
|
||||
return
|
||||
}
|
||||
repo.lock.Lock()
|
||||
defer repo.lock.Unlock()
|
||||
|
||||
return repo.close()
|
||||
}
|
||||
|
||||
func (repo *Repository) close() (err error) {
|
||||
if repo == nil {
|
||||
return
|
||||
}
|
||||
|
@ -102,4 +137,26 @@ func (repo *Repository) Close() {
|
|||
repo.checkReader = nil
|
||||
repo.checkWriter = nil
|
||||
}
|
||||
repo.closed = true
|
||||
return
|
||||
}
|
||||
|
||||
func (repo *Repository) finalizer() (err error) {
|
||||
if repo == nil {
|
||||
return
|
||||
}
|
||||
repo.lock.Lock()
|
||||
defer repo.lock.Unlock()
|
||||
if repo.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
if repo.batchCancel != nil || repo.checkCancel != nil {
|
||||
pid := ""
|
||||
if repo.Ctx != nil {
|
||||
pid = " from PID: " + string(process.GetPID(repo.Ctx))
|
||||
}
|
||||
log.Error("Finalizer running on unclosed repository%s: %s%s", pid, repo.Path)
|
||||
}
|
||||
return repo.close()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue