mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-04-24 12:50:31 +08:00
Merges the `gitea.com/gitea/act` fork into this repository as the `act/` directory and consumes it as a local package. The `replace github.com/nektos/act => gitea.com/gitea/act` directive is removed; act's dependencies are merged into the root `go.mod`. - Imports rewritten: `github.com/nektos/act/pkg/...` → `gitea.com/gitea/act_runner/act/...` (flattened — `pkg/` boundary dropped to match the layout forgejo-runner adopted). - Dropped act's CLI (`cmd/`, `main.go`) and all upstream project files; kept the library tree + `LICENSE`. - Added `// Copyright <year> The Gitea Authors ...` / `// Copyright <year> nektos` headers to 104 `.go` files. - Pre-existing act lint violations annotated inline with `//nolint:<linter> // pre-existing issue from nektos/act`. `.golangci.yml` is unchanged vs `main`. - Makefile test target: `-race -short` (matches forgejo-runner). - Pre-existing integration test failures fixed: race in parallel executor (atomic counters); TestSetupEnv / command_test / expression_test / run_context_test updated to match gitea fork runtime; TestJobExecutor and TestActionCache gated on `testing.Short()`. Full `gitea/act` commit history is reachable via the second parent. Co-Authored-By: Claude (Opus 4.7) <noreply@anthropic.com>
284 lines
7.8 KiB
Go
284 lines
7.8 KiB
Go
// Copyright 2026 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package common
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
// TestMaxParallelJobExecution tests actual job execution with max-parallel
|
|
func TestMaxParallelJobExecution(t *testing.T) {
|
|
t.Run("MaxParallel=1 Sequential", func(t *testing.T) {
|
|
var currentRunning atomic.Int32
|
|
var maxConcurrent int32
|
|
var executionOrder []int
|
|
var mu sync.Mutex
|
|
|
|
executors := make([]Executor, 5)
|
|
for i := range 5 {
|
|
taskID := i
|
|
executors[i] = func(ctx context.Context) error {
|
|
current := currentRunning.Add(1)
|
|
|
|
// Track max concurrent
|
|
for {
|
|
maxValue := atomic.LoadInt32(&maxConcurrent)
|
|
if current <= maxValue || atomic.CompareAndSwapInt32(&maxConcurrent, maxValue, current) {
|
|
break
|
|
}
|
|
}
|
|
|
|
mu.Lock()
|
|
executionOrder = append(executionOrder, taskID)
|
|
mu.Unlock()
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
currentRunning.Add(-1)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
err := NewParallelExecutor(1, executors...)(ctx)
|
|
assert.NoError(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
|
|
assert.Equal(t, int32(1), maxConcurrent, "Should never exceed 1 concurrent execution")
|
|
assert.Len(t, executionOrder, 5, "All tasks should execute")
|
|
})
|
|
|
|
t.Run("MaxParallel=3 Limited", func(t *testing.T) {
|
|
var currentRunning atomic.Int32
|
|
var maxConcurrent int32
|
|
|
|
executors := make([]Executor, 10)
|
|
for i := range 10 {
|
|
executors[i] = func(ctx context.Context) error {
|
|
current := currentRunning.Add(1)
|
|
|
|
for {
|
|
maxValue := atomic.LoadInt32(&maxConcurrent)
|
|
if current <= maxValue || atomic.CompareAndSwapInt32(&maxConcurrent, maxValue, current) {
|
|
break
|
|
}
|
|
}
|
|
|
|
time.Sleep(20 * time.Millisecond)
|
|
currentRunning.Add(-1)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
err := NewParallelExecutor(3, executors...)(ctx)
|
|
assert.NoError(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
|
|
assert.LessOrEqual(t, int(maxConcurrent), 3, "Should never exceed 3 concurrent executions")
|
|
assert.GreaterOrEqual(t, int(maxConcurrent), 1, "Should have at least 1 concurrent execution")
|
|
})
|
|
|
|
t.Run("MaxParallel=0 Uses1Worker", func(t *testing.T) {
|
|
var maxConcurrent int32
|
|
var currentRunning atomic.Int32
|
|
|
|
executors := make([]Executor, 5)
|
|
for i := range 5 {
|
|
executors[i] = func(ctx context.Context) error {
|
|
current := currentRunning.Add(1)
|
|
|
|
for {
|
|
maxValue := atomic.LoadInt32(&maxConcurrent)
|
|
if current <= maxValue || atomic.CompareAndSwapInt32(&maxConcurrent, maxValue, current) {
|
|
break
|
|
}
|
|
}
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
currentRunning.Add(-1)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
// When maxParallel is 0 or negative, it defaults to 1
|
|
err := NewParallelExecutor(0, executors...)(ctx)
|
|
assert.NoError(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
|
|
assert.Equal(t, int32(1), maxConcurrent, "Should use 1 worker when max-parallel is 0")
|
|
})
|
|
}
|
|
|
|
// TestMaxParallelWithErrors tests error handling with max-parallel
|
|
func TestMaxParallelWithErrors(t *testing.T) {
|
|
t.Run("OneTaskFailsOthersContinue", func(t *testing.T) {
|
|
var successCount int32
|
|
|
|
executors := make([]Executor, 5)
|
|
for i := range 5 {
|
|
taskID := i
|
|
executors[i] = func(ctx context.Context) error {
|
|
if taskID == 2 {
|
|
return assert.AnError
|
|
}
|
|
atomic.AddInt32(&successCount, 1)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
err := NewParallelExecutor(2, executors...)(ctx)
|
|
|
|
// Should return the error from task 2
|
|
assert.Error(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
|
|
// Other tasks should still execute
|
|
assert.Equal(t, int32(4), successCount, "4 tasks should succeed")
|
|
})
|
|
|
|
t.Run("ContextCancellation", func(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
var startedCount int32
|
|
executors := make([]Executor, 10)
|
|
for i := range 10 {
|
|
executors[i] = func(ctx context.Context) error {
|
|
atomic.AddInt32(&startedCount, 1)
|
|
time.Sleep(100 * time.Millisecond)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Cancel after a short delay
|
|
go func() {
|
|
time.Sleep(30 * time.Millisecond)
|
|
cancel()
|
|
}()
|
|
|
|
err := NewParallelExecutor(3, executors...)(ctx)
|
|
assert.Error(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
assert.ErrorIs(t, err, context.Canceled) //nolint:testifylint // pre-existing issue from nektos/act
|
|
|
|
// Not all tasks should start due to cancellation (but timing may vary)
|
|
// Just verify cancellation occurred
|
|
t.Logf("Started %d tasks before cancellation", startedCount)
|
|
})
|
|
}
|
|
|
|
// TestMaxParallelPerformance tests performance characteristics
|
|
func TestMaxParallelPerformance(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping performance test in short mode")
|
|
}
|
|
|
|
t.Run("ParallelFasterThanSequential", func(t *testing.T) {
|
|
executors := make([]Executor, 10)
|
|
for i := range 10 {
|
|
executors[i] = func(ctx context.Context) error {
|
|
time.Sleep(50 * time.Millisecond)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
// Sequential (max-parallel=1)
|
|
start := time.Now()
|
|
err := NewParallelExecutor(1, executors...)(ctx)
|
|
sequentialDuration := time.Since(start)
|
|
assert.NoError(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
|
|
// Parallel (max-parallel=5)
|
|
start = time.Now()
|
|
err = NewParallelExecutor(5, executors...)(ctx)
|
|
parallelDuration := time.Since(start)
|
|
assert.NoError(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
|
|
// Parallel should be significantly faster
|
|
assert.Less(t, parallelDuration, sequentialDuration/2,
|
|
"Parallel execution should be at least 2x faster")
|
|
})
|
|
|
|
t.Run("OptimalWorkerCount", func(t *testing.T) {
|
|
executors := make([]Executor, 20)
|
|
for i := range 20 {
|
|
executors[i] = func(ctx context.Context) error {
|
|
time.Sleep(10 * time.Millisecond)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
// Test with different worker counts
|
|
workerCounts := []int{1, 2, 5, 10, 20}
|
|
durations := make(map[int]time.Duration)
|
|
|
|
for _, count := range workerCounts {
|
|
start := time.Now()
|
|
err := NewParallelExecutor(count, executors...)(ctx)
|
|
durations[count] = time.Since(start)
|
|
assert.NoError(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
}
|
|
|
|
// More workers should generally be faster (up to a point)
|
|
assert.Less(t, durations[5], durations[1], "5 workers should be faster than 1")
|
|
assert.Less(t, durations[10], durations[2], "10 workers should be faster than 2")
|
|
})
|
|
}
|
|
|
|
// TestMaxParallelResourceSharing tests resource sharing scenarios
|
|
func TestMaxParallelResourceSharing(t *testing.T) {
|
|
t.Run("SharedResourceWithMutex", func(t *testing.T) {
|
|
var sharedCounter int
|
|
var mu sync.Mutex
|
|
|
|
executors := make([]Executor, 100)
|
|
for i := range 100 {
|
|
executors[i] = func(ctx context.Context) error {
|
|
mu.Lock()
|
|
sharedCounter++
|
|
mu.Unlock()
|
|
return nil
|
|
}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
err := NewParallelExecutor(10, executors...)(ctx)
|
|
assert.NoError(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
|
|
assert.Equal(t, 100, sharedCounter, "All tasks should increment counter")
|
|
})
|
|
|
|
t.Run("ChannelCommunication", func(t *testing.T) {
|
|
resultChan := make(chan int, 50)
|
|
|
|
executors := make([]Executor, 50)
|
|
for i := range 50 {
|
|
taskID := i
|
|
executors[i] = func(ctx context.Context) error {
|
|
resultChan <- taskID
|
|
return nil
|
|
}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
err := NewParallelExecutor(5, executors...)(ctx)
|
|
assert.NoError(t, err) //nolint:testifylint // pre-existing issue from nektos/act
|
|
|
|
close(resultChan)
|
|
|
|
results := make(map[int]bool)
|
|
for result := range resultChan {
|
|
results[result] = true
|
|
}
|
|
|
|
assert.Len(t, results, 50, "All task IDs should be received")
|
|
})
|
|
}
|