From 3293d725a7cf70ca765c42b9563e776b0e9b176c Mon Sep 17 00:00:00 2001 From: ChristopherHX Date: Sun, 8 Jun 2025 12:41:42 +0200 Subject: [PATCH] feat: add parallel flag (#114) * add parallel flag --- cmd/input.go | 1 + cmd/root.go | 3 +++ pkg/runner/run_context.go | 11 +++++++++++ pkg/runner/runner.go | 6 ++++++ pkg/runner/runner_test.go | 4 +++- 5 files changed, 24 insertions(+), 1 deletion(-) diff --git a/cmd/input.go b/cmd/input.go index 9b989ed0..e1142a9b 100644 --- a/cmd/input.go +++ b/cmd/input.go @@ -65,6 +65,7 @@ type Input struct { listOptions bool validate bool strict bool + parallel int } func (i *Input) resolve(path string) string { diff --git a/cmd/root.go b/cmd/root.go index dd4be014..920f50ef 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -95,6 +95,8 @@ func createRootCommand(ctx context.Context, input *Input, version string) *cobra rootCmd.Flags().StringArrayVarP(&input.replaceGheActionWithGithubCom, "replace-ghe-action-with-github-com", "", []string{}, "If you are using GitHub Enterprise Server and allow specified actions from GitHub (github.com), you can set actions on this. (e.g. --replace-ghe-action-with-github-com =github/super-linter)") rootCmd.Flags().StringVar(&input.replaceGheActionTokenWithGithubCom, "replace-ghe-action-token-with-github-com", "", "If you are using replace-ghe-action-with-github-com and you want to use private actions on GitHub, you have to set personal access token") rootCmd.Flags().StringArrayVarP(&input.matrix, "matrix", "", []string{}, "specify which matrix configuration to include (e.g. --matrix java:13") + rootCmd.Flags().IntVarP(&input.parallel, "parallel", "", 0, "number of jobs to run in parallel") + rootCmd.Flags().IntVarP(&input.parallel, "concurrent-jobs", "", 0, "number of jobs to run in parallel") rootCmd.PersistentFlags().StringVarP(&input.actor, "actor", "a", "nektos/act", "user that triggered the event") rootCmd.PersistentFlags().StringVarP(&input.workflowsPath, "workflows", "W", "./.github/workflows/", "path to workflow file(s)") rootCmd.PersistentFlags().BoolVarP(&input.workflowRecurse, "recurse", "", false, "Flag to enable running workflows from subdirectories of specified path in '--workflows'/'-W' flag, this feature doesn't exist on GitHub Actions as of 2024/11") @@ -649,6 +651,7 @@ func newRunCommand(ctx context.Context, input *Input) func(*cobra.Command, []str ReplaceGheActionTokenWithGithubCom: input.replaceGheActionTokenWithGithubCom, Matrix: matrixes, ContainerNetworkMode: docker_container.NetworkMode(input.networkName), + Parallel: input.parallel, } if input.actionOfflineMode { config.ActionCache = &runner.GoGitActionCacheOfflineMode{ diff --git a/pkg/runner/run_context.go b/pkg/runner/run_context.go index 409a2d2c..9deaef3c 100644 --- a/pkg/runner/run_context.go +++ b/pkg/runner/run_context.go @@ -27,6 +27,7 @@ import ( "github.com/docker/go-connections/nat" "github.com/opencontainers/selinux/go-selinux" "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" ) // RunContext contains info about current job @@ -772,12 +773,22 @@ func (rc *RunContext) Executor() (common.Executor, error) { } } + if rc.Config != nil && rc.Config.Parallel > 0 && rc.Config.semaphore == nil { + rc.Config.semaphore = semaphore.NewWeighted(int64(rc.Config.Parallel)) + } + return func(ctx context.Context) error { res, err := rc.isEnabled(ctx) if err != nil { return err } if res { + if jobType == model.JobTypeDefault && rc.Config != nil && rc.Config.Parallel > 0 && rc.Config.semaphore != nil { + if err := rc.Config.semaphore.Acquire(ctx, 1); err != nil { + return fmt.Errorf("failed to acquire semaphore: %w", err) + } + defer rc.Config.semaphore.Release(1) + } return executor(ctx) } return nil diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index beadab45..4a998f5a 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -12,6 +12,7 @@ import ( "github.com/actions-oss/act-cli/pkg/model" docker_container "github.com/docker/docker/api/types/container" log "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" ) // Runner provides capabilities to run GitHub actions @@ -68,6 +69,8 @@ type Config struct { HostEnvironmentDir string // Custom folder for host environment, parallel jobs must be 1 CustomExecutor map[model.JobType]func(*RunContext) common.Executor // Custom executor to run jobs + semaphore *semaphore.Weighted + Parallel int // Number of parallel jobs to run } func (runnerConfig *Config) GetGitHubServerURL() string { @@ -234,6 +237,9 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { } pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...)) } + if runner.config.Parallel != 0 { + return common.NewParallelExecutor(len(pipeline), pipeline...)(ctx) + } ncpu := runtime.NumCPU() if 1 > ncpu { ncpu = 1 diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index 27018454..be437fc5 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -193,6 +193,7 @@ func (j *TestJobFileInfo) runTest(ctx context.Context, t *testing.T, cfg *Config ContainerArchitecture: cfg.ContainerArchitecture, Matrix: cfg.Matrix, ActionCache: cfg.ActionCache, + Parallel: cfg.Parallel, } runner, err := New(runnerConfig) @@ -338,7 +339,8 @@ func TestRunEvent(t *testing.T) { for _, table := range tables { t.Run(table.workflowPath, func(t *testing.T) { config := &Config{ - Secrets: table.secrets, + Secrets: table.secrets, + Parallel: 8, } eventFile := filepath.Join(workdir, table.workflowPath, "event.json")