// Patch summary (free text ahead of the first "diff --git" header).
//
// Purpose: export the per-directory clone mutex helper in act/common/git
// (acquireCloneLock -> AcquireCloneLock) and take that same lock in code that
// reads files from a cloned directory, so a concurrent NewGitCloneExecutor on
// the same dir cannot change the worktree while it is being read.
//
// Call sites that now take the lock, as shown in the hunks below:
//   - git.go: NewGitCloneExecutor (rename only; it already deferred the unlock).
//   - action.go: maybeCopyToActionDir defers the unlock just before
//     removeGitIgnore, i.e. after the early return that uses CopyTarStream.
//   - action.go: execAsDocker gains an actionDir parameter and, only when
//     buildContext is nil, wraps prepImage so the lock is held while the inner
//     build executor runs.
//   - reusable_workflow.go: newReusableWorkflowExecutor holds the lock only
//     around the modelNewWorkflowPlanner call.
//   - step_action_remote.go: prepareActionExecutor holds the lock around
//     sar.readAction.
//
// Test seams added as package-level function variables:
//   - ContainerImageExistsLocally, ContainerNewDockerBuildExecutor (action.go)
//   - modelNewWorkflowPlanner (reusable_workflow.go)
//
// NOTE(review): the mutex is looked up with cloneLocks.LoadOrStore(dir, ...)
// using the dir string as given; no path normalization is visible in this
// hunk, so readers and cloners must pass the identical string. Confirm that
// every caller derives actionDir/directory the same way as the clone's
// input.Dir.
// NOTE(review): mu.Lock() is a plain blocking acquire with no ctx involved in
// the visible lines; presumably a reader queued behind a long clone cannot be
// canceled while waiting. Confirm this is acceptable.
// NOTE(review): the two container seams are exported while
// modelNewWorkflowPlanner is not. The new tests that override them also use
// unexported identifiers (maybeCopyToActionDir, execAsDocker), so they appear
// to live in the same package; consider unexporting the seams for consistency.
// NOTE(review): the new tests assert "still blocked" with 50ms waits and
// mutate package-level variables; presumably they must not run in parallel
// with other tests that use those seams.
diff --git a/act/common/git/git.go b/act/common/git/git.go index 3c64098f..3ebe9724 100644 --- a/act/common/git/git.go +++ b/act/common/git/git.go @@ -38,9 +38,11 @@ var ( ErrNoRepo = errors.New("unable to find git repo") ) -// acquireCloneLock returns an unlock function after locking the per-directory mutex for dir. -// Only concurrent operations targeting the same directory are erialized; clones into different directories run in parallel. -func acquireCloneLock(dir string) func() { +// AcquireCloneLock returns an unlock function after locking the per-directory mutex for dir. +// Only concurrent operations targeting the same directory are serialized; clones into different directories run in parallel. +// Callers reading files inside dir (e.g. tarring a checked-out action into a job container) must hold this lock too, +// otherwise a concurrent NewGitCloneExecutor on the same dir can mutate the worktree mid-read. +func AcquireCloneLock(dir string) func() { v, _ := cloneLocks.LoadOrStore(dir, &sync.Mutex{}) mu := v.(*sync.Mutex) mu.Lock() @@ -308,7 +310,7 @@ func NewGitCloneExecutor(input NewGitCloneExecutorInput) common.Executor { logger.Infof("git clone '%s' # ref=%s", input.URL, input.Ref) logger.Debugf(" cloning %s to %s", input.URL, input.Dir) - defer acquireCloneLock(input.Dir)() + defer AcquireCloneLock(input.Dir)() refName := plumbing.ReferenceName("refs/heads/" + input.Ref) r, err := CloneIfRequired(ctx, refName, input, logger) diff --git a/act/common/git/git_test.go b/act/common/git/git_test.go index cfcdd09a..710674ca 100644 --- a/act/common/git/git_test.go +++ b/act/common/git/git_test.go @@ -310,11 +310,11 @@ func TestAcquireCloneLock(t *testing.T) { t.Run("same directory serializes", func(t *testing.T) { dir := t.TempDir() - unlock1 := acquireCloneLock(dir) + unlock1 := AcquireCloneLock(dir) secondAcquired := make(chan struct{}) go func() { - unlock := acquireCloneLock(dir) + unlock := AcquireCloneLock(dir) close(secondAcquired) unlock() }() @@ -338,12 
+338,12 @@ func TestAcquireCloneLock(t *testing.T) { dirA := t.TempDir() dirB := t.TempDir() - unlockA := acquireCloneLock(dirA) + unlockA := AcquireCloneLock(dirA) defer unlockA() done := make(chan struct{}) go func() { - unlock := acquireCloneLock(dirB) + unlock := AcquireCloneLock(dirB) unlock() close(done) }() diff --git a/act/runner/action.go b/act/runner/action.go index 63ea3434..5a50c2c1 100644 --- a/act/runner/action.go +++ b/act/runner/action.go @@ -19,6 +19,7 @@ import ( "strings" "gitea.com/gitea/runner/act/common" + "gitea.com/gitea/runner/act/common/git" "gitea.com/gitea/runner/act/container" "gitea.com/gitea/runner/act/model" @@ -44,6 +45,11 @@ type runAction func(step actionStep, actionDir string, remoteAction *remoteActio //go:embed res/trampoline.js var trampoline embed.FS +var ( + ContainerImageExistsLocally = container.ImageExistsLocally + ContainerNewDockerBuildExecutor = container.NewDockerBuildExecutor +) + func readActionImpl(ctx context.Context, step *model.Step, actionDir, actionPath string, readFile actionYamlReader, writeFile fileWriter) (*model.Action, error) { logger := common.Logger(ctx) allErrors := []error{} @@ -148,6 +154,8 @@ func maybeCopyToActionDir(ctx context.Context, step actionStep, actionDir, actio return rc.JobContainer.CopyTarStream(ctx, containerActionDirCopy, ta) } + defer git.AcquireCloneLock(actionDir)() + if err := removeGitIgnore(ctx, actionDir); err != nil { return err } @@ -197,7 +205,7 @@ func runActionImpl(step actionStep, actionDir string, remoteAction *remoteAction if remoteAction == nil { location = containerActionDir } - return execAsDocker(ctx, step, actionName, location, remoteAction == nil) + return execAsDocker(ctx, step, actionName, actionDir, location, remoteAction == nil) case x.IsComposite(): if err := maybeCopyToActionDir(ctx, step, actionDir, actionPath, containerActionDir); err != nil { return err @@ -265,7 +273,7 @@ func removeGitIgnore(ctx context.Context, directory string) error { } // TODO: 
break out parts of function to reduce complexicity -func execAsDocker(ctx context.Context, step actionStep, actionName, basedir string, localAction bool) error { +func execAsDocker(ctx context.Context, step actionStep, actionName, actionDir, basedir string, localAction bool) error { logger := common.Logger(ctx) rc := step.getRunContext() action := step.getActionModel() @@ -284,12 +292,12 @@ func execAsDocker(ctx context.Context, step actionStep, actionName, basedir stri image = strings.ToLower(image) contextDir, fileName := filepath.Split(filepath.Join(basedir, action.Runs.Image)) - anyArchExists, err := container.ImageExistsLocally(ctx, image, "any") + anyArchExists, err := ContainerImageExistsLocally(ctx, image, "any") if err != nil { return err } - correctArchExists, err := container.ImageExistsLocally(ctx, image, rc.Config.ContainerArchitecture) + correctArchExists, err := ContainerImageExistsLocally(ctx, image, rc.Config.ContainerArchitecture) if err != nil { return err } @@ -321,13 +329,21 @@ func execAsDocker(ctx context.Context, step actionStep, actionName, basedir stri } defer buildContext.Close() } - prepImage = container.NewDockerBuildExecutor(container.NewDockerBuildExecutorInput{ + prepImage = ContainerNewDockerBuildExecutor(container.NewDockerBuildExecutorInput{ ContextDir: contextDir, Dockerfile: fileName, ImageTag: image, BuildContext: buildContext, Platform: rc.Config.ContainerArchitecture, }) + if buildContext == nil { + // Held across the whole build: the daemon drains contextDir lazily. 
+ inner := prepImage + prepImage = func(ctx context.Context) error { + defer git.AcquireCloneLock(actionDir)() + return inner(ctx) + } + } } else { logger.Debugf("image '%s' for architecture '%s' already exists", image, rc.Config.ContainerArchitecture) } diff --git a/act/runner/action_test.go b/act/runner/action_test.go index a234f476..3814c6eb 100644 --- a/act/runner/action_test.go +++ b/act/runner/action_test.go @@ -9,8 +9,13 @@ import ( "io" "io/fs" "strings" + "sync" "testing" + "time" + "gitea.com/gitea/runner/act/common" + "gitea.com/gitea/runner/act/common/git" + "gitea.com/gitea/runner/act/container" "gitea.com/gitea/runner/act/model" "github.com/stretchr/testify/assert" @@ -252,3 +257,153 @@ func TestActionRunner(t *testing.T) { }) } } + +func TestMaybeCopyToActionDirHoldsCloneLock(t *testing.T) { + ctx := context.Background() + + actionDir := t.TempDir() + + releaseCopy := make(chan struct{}) + release := sync.OnceFunc(func() { close(releaseCopy) }) + defer release() + + copyEntered := make(chan struct{}) + + cm := &containerMock{} + cm.On("CopyDir", "/var/run/act/actions/", actionDir+"/", false).Return(func(ctx context.Context) error { + close(copyEntered) + <-releaseCopy + return nil + }) + + step := &stepActionRemote{ + Step: &model.Step{Uses: "remote/action@v1"}, + RunContext: &RunContext{ + Config: &Config{}, + JobContainer: cm, + }, + } + + copyDone := make(chan error, 1) + go func() { + copyDone <- maybeCopyToActionDir(ctx, step, actionDir, "", "/var/run/act/actions/") + }() + + select { + case <-copyEntered: + case err := <-copyDone: + t.Fatalf("maybeCopyToActionDir returned before CopyDir was entered: %v", err) + case <-time.After(time.Second): + t.Fatal("CopyDir was not entered within 1 second") + } + + peerAcquired := make(chan struct{}) + go func() { + unlock := git.AcquireCloneLock(actionDir) + close(peerAcquired) + unlock() + }() + + select { + case <-peerAcquired: + t.Fatal("peer AcquireCloneLock returned while CopyDir was running") + case 
<-time.After(50 * time.Millisecond): + } + + release() + + select { + case err := <-copyDone: + if err != nil { + t.Fatalf("maybeCopyToActionDir returned error: %v", err) + } + case <-time.After(time.Second): + t.Fatal("maybeCopyToActionDir did not return after CopyDir was unblocked") + } + + select { + case <-peerAcquired: + case <-time.After(time.Second): + t.Fatal("peer AcquireCloneLock did not proceed after lock released") + } + + cm.AssertExpectations(t) +} + +func TestExecAsDockerHoldsCloneLockForRemoteUncached(t *testing.T) { + actionDir := t.TempDir() + + unlockOnce := sync.OnceFunc(git.AcquireCloneLock(actionDir)) + defer unlockOnce() + + innerEntered := make(chan struct{}) + releaseInner := make(chan struct{}) + releaseOnce := sync.OnceFunc(func() { close(releaseInner) }) + defer releaseOnce() + + origImageExists := ContainerImageExistsLocally + ContainerImageExistsLocally = func(_ context.Context, _, _ string) (bool, error) { + return false, nil + } + defer func() { ContainerImageExistsLocally = origImageExists }() + + origBuildExec := ContainerNewDockerBuildExecutor + ContainerNewDockerBuildExecutor = func(_ container.NewDockerBuildExecutorInput) common.Executor { + return func(_ context.Context) error { + close(innerEntered) + <-releaseInner + return nil + } + } + defer func() { ContainerNewDockerBuildExecutor = origBuildExec }() + + step := &stepActionRemote{ + Step: &model.Step{ID: "1", Uses: "remote/action@v1", With: map[string]string{}}, + RunContext: &RunContext{ + Config: &Config{}, + Run: &model.Run{ + JobID: "1", + Workflow: &model.Workflow{ + Name: "wf", + Jobs: map[string]*model.Job{"1": {}}, + }, + }, + JobContainer: &containerMock{}, + }, + action: &model.Action{Runs: model.ActionRuns{Using: "docker", Image: "Dockerfile"}}, + env: map[string]string{}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { done <- execAsDocker(ctx, step, "test-action", actionDir, 
actionDir, false) }() + + select { + case <-innerEntered: + t.Fatal("inner build executor ran before clone lock was released") + case err := <-done: + t.Fatalf("execAsDocker returned before inner was entered: %v", err) + case <-time.After(50 * time.Millisecond): + } + + unlockOnce() + + select { + case <-innerEntered: + case err := <-done: + t.Fatalf("execAsDocker returned without entering inner: %v", err) + case <-time.After(time.Second): + t.Fatal("inner build executor not entered after lock released") + } + + cancel() + releaseOnce() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("execAsDocker did not return after inner was released and ctx was canceled") + } +} diff --git a/act/runner/reusable_workflow.go b/act/runner/reusable_workflow.go index e008931e..590ebc82 100644 --- a/act/runner/reusable_workflow.go +++ b/act/runner/reusable_workflow.go @@ -142,9 +142,16 @@ func cloneRemoteReusableWorkflow(rc *RunContext, cloneURL, ref, targetDirectory, } } +var modelNewWorkflowPlanner = model.NewWorkflowPlanner + func newReusableWorkflowExecutor(rc *RunContext, directory, workflow string) common.Executor { return func(ctx context.Context) error { - planner, err := model.NewWorkflowPlanner(path.Join(directory, workflow), true) + // Scoped to the yaml read so concurrent invocations don't serialize + // on the whole job run. 
+ planner, err := func() (model.WorkflowPlanner, error) { + defer git.AcquireCloneLock(directory)() + return modelNewWorkflowPlanner(path.Join(directory, workflow), true) + }() if err != nil { return err } diff --git a/act/runner/reusable_workflow_test.go b/act/runner/reusable_workflow_test.go index 7dfb5bea..8e618698 100644 --- a/act/runner/reusable_workflow_test.go +++ b/act/runner/reusable_workflow_test.go @@ -5,11 +5,15 @@ package runner import ( "context" + "errors" "os" "os/exec" "path/filepath" + "sync" "testing" + "time" + "gitea.com/gitea/runner/act/common/git" "gitea.com/gitea/runner/act/model" "github.com/stretchr/testify/require" @@ -71,6 +75,54 @@ func TestReusableWorkflowCachedBranchRefRefreshes(t *testing.T) { require.Equal(t, tmpl("v2"), string(got), "cached workflow file must reflect the updated branch tip") } +func TestNewReusableWorkflowExecutorHoldsCloneLock(t *testing.T) { + workflowDir := t.TempDir() + + unlockOnce := sync.OnceFunc(git.AcquireCloneLock(workflowDir)) + defer unlockOnce() + + plannerCalled := make(chan struct{}) + + origPlanner := modelNewWorkflowPlanner + modelNewWorkflowPlanner = func(string, bool) (model.WorkflowPlanner, error) { + close(plannerCalled) + return nil, errors.New("stop") + } + defer func() { modelNewWorkflowPlanner = origPlanner }() + + rc := &RunContext{ + Config: &Config{}, + Run: &model.Run{Workflow: &model.Workflow{Jobs: map[string]*model.Job{}}}, + } + exec := newReusableWorkflowExecutor(rc, workflowDir, "reusable.yml") + + done := make(chan error, 1) + go func() { done <- exec(context.Background()) }() + + select { + case <-plannerCalled: + t.Fatal("planner ran while clone lock was held") + case err := <-done: + t.Fatalf("executor returned before planner was reached: %v", err) + case <-time.After(50 * time.Millisecond): + } + + unlockOnce() + + select { + case <-plannerCalled: + case <-time.After(time.Second): + t.Fatal("planner not called after lock was released") + } + + select { + case err := <-done: 
+ require.Error(t, err) + case <-time.After(time.Second): + t.Fatal("executor did not return after planner ran") + } +} + func gitMust(t *testing.T, dir string, args ...string) { t.Helper() cmd := exec.Command("git", args...) diff --git a/act/runner/step_action_remote.go b/act/runner/step_action_remote.go index 72c7da96..e842ca97 100644 --- a/act/runner/step_action_remote.go +++ b/act/runner/step_action_remote.go @@ -145,6 +145,7 @@ func (sar *stepActionRemote) prepareActionExecutor() common.Executor { return common.NewPipelineExecutor( ntErr, func(ctx context.Context) error { + defer git.AcquireCloneLock(actionDir)() actionModel, err := sar.readAction(ctx, sar.Step, actionDir, sar.remoteAction.Path, remoteReader(ctx), os.WriteFile) sar.action = actionModel return err