forked from gitea/gitea
		
	Fix content holes in Actions task logs file (#25560)
Fix #25451. Bugfixes: - When stopping the zombie or endless tasks, set `LogInStorage` to true after transferring the file to storage. It was missing, it could write to a nonexistent file in DBFS because `LogInStorage` was false. - Always update `ActionTask.Updated` when there's a new state reported by the runner, even if there's no change. This is to avoid the task being judged as a zombie task. Enhancement: - Support `Stat()` for DBFS file. - `WriteLogs` refuses to write if it could result in content holes. --------- Co-authored-by: Giteabot <teabot@gitea.io>
This commit is contained in:
		
							parent
							
								
									b6693a2c9a
								
							
						
					
					
						commit
						6daf21c9b7
					
				| @ -344,6 +344,9 @@ func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error { | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // UpdateTaskByState updates the task by the state. | ||||
| // It will always update the task if the state is not final, even there is no change. | ||||
| // So it will update ActionTask.Updated to avoid the task being judged as a zombie task. | ||||
| func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionTask, error) { | ||||
| 	stepStates := map[int64]*runnerv1.StepState{} | ||||
| 	for _, v := range state.Steps { | ||||
| @ -384,6 +387,12 @@ func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionT | ||||
| 		}, nil); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} else { | ||||
| 		// Force update ActionTask.Updated to avoid the task being judged as a zombie task | ||||
| 		task.Updated = timeutil.TimeStampNow() | ||||
| 		if err := UpdateTask(ctx, task, "updated"); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if err := task.LoadAttributes(ctx); err != nil { | ||||
|  | ||||
| @ -7,6 +7,7 @@ import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"io/fs" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 	"strconv" | ||||
| @ -21,6 +22,7 @@ var defaultFileBlockSize int64 = 32 * 1024 | ||||
| type File interface { | ||||
| 	io.ReadWriteCloser | ||||
| 	io.Seeker | ||||
| 	fs.File | ||||
| } | ||||
| 
 | ||||
| type file struct { | ||||
| @ -193,10 +195,26 @@ func (f *file) Close() error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (f *file) Stat() (os.FileInfo, error) { | ||||
| 	if f.metaID == 0 { | ||||
| 		return nil, os.ErrInvalid | ||||
| 	} | ||||
| 
 | ||||
| 	fileMeta, err := findFileMetaByID(f.ctx, f.metaID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return fileMeta, nil | ||||
| } | ||||
| 
 | ||||
| func timeToFileTimestamp(t time.Time) int64 { | ||||
| 	return t.UnixMicro() | ||||
| } | ||||
| 
 | ||||
| func fileTimestampToTime(timestamp int64) time.Time { | ||||
| 	return time.UnixMicro(timestamp) | ||||
| } | ||||
| 
 | ||||
| func (f *file) loadMetaByPath() (*dbfsMeta, error) { | ||||
| 	var fileMeta dbfsMeta | ||||
| 	if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil { | ||||
|  | ||||
| @ -5,7 +5,10 @@ package dbfs | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"io/fs" | ||||
| 	"os" | ||||
| 	"path" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"code.gitea.io/gitea/models/db" | ||||
| ) | ||||
| @ -100,3 +103,29 @@ func Remove(ctx context.Context, name string) error { | ||||
| 	defer f.Close() | ||||
| 	return f.delete() | ||||
| } | ||||
| 
 | ||||
| var _ fs.FileInfo = (*dbfsMeta)(nil) | ||||
| 
 | ||||
| func (m *dbfsMeta) Name() string { | ||||
| 	return path.Base(m.FullPath) | ||||
| } | ||||
| 
 | ||||
| func (m *dbfsMeta) Size() int64 { | ||||
| 	return m.FileSize | ||||
| } | ||||
| 
 | ||||
| func (m *dbfsMeta) Mode() fs.FileMode { | ||||
| 	return os.ModePerm | ||||
| } | ||||
| 
 | ||||
| func (m *dbfsMeta) ModTime() time.Time { | ||||
| 	return fileTimestampToTime(m.ModifyTimestamp) | ||||
| } | ||||
| 
 | ||||
| func (m *dbfsMeta) IsDir() bool { | ||||
| 	return false | ||||
| } | ||||
| 
 | ||||
| func (m *dbfsMeta) Sys() any { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -111,6 +111,19 @@ func TestDbfsBasic(t *testing.T) { | ||||
| 
 | ||||
| 	_, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY) | ||||
| 	assert.Error(t, err) | ||||
| 
 | ||||
| 	// test stat | ||||
| 	f, err = OpenFile(db.DefaultContext, "test/test.txt", os.O_RDWR|os.O_CREATE) | ||||
| 	assert.NoError(t, err) | ||||
| 	stat, err := f.Stat() | ||||
| 	assert.NoError(t, err) | ||||
| 	assert.EqualValues(t, "test.txt", stat.Name()) | ||||
| 	assert.EqualValues(t, 0, stat.Size()) | ||||
| 	_, err = f.Write([]byte("0123456789")) | ||||
| 	assert.NoError(t, err) | ||||
| 	stat, err = f.Stat() | ||||
| 	assert.NoError(t, err) | ||||
| 	assert.EqualValues(t, 10, stat.Size()) | ||||
| } | ||||
| 
 | ||||
| func TestDbfsReadWrite(t *testing.T) { | ||||
|  | ||||
| @ -29,12 +29,28 @@ const ( | ||||
| ) | ||||
| 
 | ||||
| func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) { | ||||
| 	flag := os.O_WRONLY | ||||
| 	if offset == 0 { | ||||
| 		// Create file only if offset is 0, or it could result in content holes if the file doesn't exist. | ||||
| 		flag |= os.O_CREATE | ||||
| 	} | ||||
| 	name := DBFSPrefix + filename | ||||
| 	f, err := dbfs.OpenFile(ctx, name, os.O_WRONLY|os.O_CREATE) | ||||
| 	f, err := dbfs.OpenFile(ctx, name, flag) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err) | ||||
| 	} | ||||
| 	defer f.Close() | ||||
| 
 | ||||
| 	stat, err := f.Stat() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("dbfs Stat %q: %w", name, err) | ||||
| 	} | ||||
| 	if stat.Size() < offset { | ||||
| 		// If the size is less than offset, refuse to write, or it could result in content holes. | ||||
| 		// However, if the size is greater than offset, we can still write to overwrite the content. | ||||
| 		return nil, fmt.Errorf("size of %q is less than offset", name) | ||||
| 	} | ||||
| 
 | ||||
| 	if _, err := f.Seek(offset, io.SeekStart); err != nil { | ||||
| 		return nil, fmt.Errorf("dbfs Seek %q: %w", name, err) | ||||
| 	} | ||||
|  | ||||
| @ -56,12 +56,20 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error { | ||||
| 			return nil | ||||
| 		}); err != nil { | ||||
| 			log.Warn("Cannot stop task %v: %v", task.ID, err) | ||||
| 			// go on | ||||
| 		} else if remove, err := actions.TransferLogs(ctx, task.LogFilename); err != nil { | ||||
| 			log.Warn("Cannot transfer logs of task %v: %v", task.ID, err) | ||||
| 		} else { | ||||
| 			remove() | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		remove, err := actions.TransferLogs(ctx, task.LogFilename) | ||||
| 		if err != nil { | ||||
| 			log.Warn("Cannot transfer logs of task %v: %v", task.ID, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		task.LogInStorage = true | ||||
| 		if err := actions_model.UpdateTask(ctx, task, "log_in_storage"); err != nil { | ||||
| 			log.Warn("Cannot update task %v: %v", task.ID, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		remove() | ||||
| 	} | ||||
| 
 | ||||
| 	CreateCommitStatus(ctx, jobs...) | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 GitHub
							GitHub