diff --git a/act/runner/job_executor.go b/act/runner/job_executor.go index d4a3076b..7cc5fa9d 100644 --- a/act/runner/job_executor.go +++ b/act/runner/job_executor.go @@ -5,15 +5,46 @@ package runner import ( + "archive/tar" + "bytes" "context" + "encoding/base64" + "encoding/json" "fmt" + "io" + "net/http" + "path" + "slices" "strconv" + "strings" "time" + "unicode" "gitea.com/gitea/runner/act/common" + "gitea.com/gitea/runner/act/container" "gitea.com/gitea/runner/act/model" ) +const maxJobSummaryBytes = 1024 * 1024 + +// jobSummaryTruncationMarker is appended to a summary that exceeded the size limit +// so the rendered output makes the truncation visible instead of silently cutting off. +const jobSummaryTruncationMarker = "\n\n---\n\n*Job summary truncated: it exceeded the maximum allowed size.*\n" + +var ( + jobSummaryUploadRetryDelay = time.Second + // jobSummaryUploadRequestTimeout bounds a single step upload request. It is kept + // below jobSummaryUploadPhaseTimeout so one slow or unreachable request times out + // and lets the remaining steps still upload within the phase budget, instead of a + // single stuck request consuming the whole phase. + jobSummaryUploadRequestTimeout = 5 * time.Second + // jobSummaryUploadPhaseTimeout bounds the total time spent uploading all step + // summaries. The uploads run inside the job cleanup budget that is also used to + // stop and remove the container, so a slow or unreachable endpoint must not be + // allowed to consume it; this keeps the remaining budget available for teardown. + jobSummaryUploadPhaseTimeout = 15 * time.Second +) + type jobInfo interface { matrix() map[string]any steps() []*model.Step @@ -80,8 +111,10 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo return common.NewErrorExecutor(err) } + stepIdx := stepModel.Number preExec := step.pre() preSteps = append(preSteps, useStepLogger(rc, stepModel, stepStagePre, func(ctx context.Context) error { + rc.CurrentStepIndex = stepIdx preErr := preExec(ctx) if preErr != nil { reportStepError(ctx, preErr) @@ -93,6 +126,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo stepExec := step.main() steps = append(steps, useStepLogger(rc, stepModel, stepStageMain, func(ctx context.Context) error { + rc.CurrentStepIndex = stepIdx err := stepExec(ctx) if err != nil { reportStepError(ctx, err) @@ -104,6 +138,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo postFn := step.post() postExec := useStepLogger(rc, stepModel, stepStagePost, func(ctx context.Context) error { + rc.CurrentStepIndex = stepIdx err := postFn(ctx) if err != nil { reportStepError(ctx, err) @@ -129,6 +164,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo defer cancel() logger := common.Logger(ctx) + tryUploadJobSummary(ctx, rc) // For Gitea // We don't need to call `stopServiceContainers` here since it will be called by following `info.stopContainer` // logger.Infof("Cleaning up services for job %s", rc.JobName) @@ -235,6 +271,180 @@ func setJobOutputs(ctx context.Context, rc *RunContext) { } } +func tryUploadJobSummary(ctx context.Context, rc *RunContext) { + if rc == nil || rc.JobContainer == nil || rc.Config == nil { + return + } + // Bound the whole upload phase so a slow or unreachable endpoint cannot consume + // the job cleanup budget reserved for stopping and removing the container. + ctx, cancel := context.WithTimeout(ctx, jobSummaryUploadPhaseTimeout) + defer cancel() + env := rc.GetEnv() + caps := strings.TrimSpace(env["GITEA_ACTIONS_CAPABILITIES"]) + if !hasJobSummaryCapability(caps) { + // Server did not advertise support. Do not attempt upload. + return + } + runtimeURL := strings.TrimSpace(env["ACTIONS_RUNTIME_URL"]) + runtimeToken := strings.TrimSpace(env["ACTIONS_RUNTIME_TOKEN"]) + runID := strings.TrimSpace(env["GITEA_RUN_ID"]) + if runtimeURL == "" || runtimeToken == "" || runID == "" { + return + } + if rc.Run == nil || rc.Run.Job() == nil { + return + } + // The numeric ActionRunJob ID is not exposed in the proto Task message or task context, + // but the server signs it into the ACTIONS_RUNTIME_TOKEN JWT claims. We decode the + // unverified claims to retrieve it; the server re-verifies the token on the request. + jobID := extractJobIDFromRuntimeToken(runtimeToken) + if jobID <= 0 { + return + } + + base := strings.TrimRight(runtimeURL, "/") + "/_apis/pipelines/workflows/" + runID + + "/jobs/" + strconv.FormatInt(jobID, 10) + "/steps/" + actPath := rc.JobContainer.GetActPath() + // Reuse a single client across all step uploads so connections can be pooled. + client := &http.Client{Timeout: jobSummaryUploadRequestTimeout} + for i := range rc.Run.Job().Steps { + summaryPath := path.Join(actPath, "workflow", "step-summary-"+strconv.Itoa(i)+".md") + body, ok := readSingleFileFromContainerArchive(ctx, rc.JobContainer, summaryPath, maxJobSummaryBytes) + if !ok || len(body) == 0 { + continue + } + uploadJobSummary(ctx, client, base+strconv.Itoa(i)+"/summary", runtimeToken, body) + } +} + +// extractJobIDFromRuntimeToken returns the JobID claim from an ACTIONS_RUNTIME_TOKEN JWT +// without verifying its signature. Returns 0 if the token is unparseable or has no JobID. +func extractJobIDFromRuntimeToken(token string) int64 { + parts := strings.Split(token, ".") + if len(parts) != 3 { + return 0 + } + payload, err := base64.RawURLEncoding.DecodeString(parts[1]) + if err != nil { + return 0 + } + var claims struct { + JobID int64 `json:"JobID"` + } + if err := json.Unmarshal(payload, &claims); err != nil { + return 0 + } + return claims.JobID +} + +func hasJobSummaryCapability(caps string) bool { + return slices.Contains(strings.FieldsFunc(caps, func(r rune) bool { + return r == ',' || unicode.IsSpace(r) + }), "job-summary") +} + +func uploadJobSummary(ctx context.Context, client *http.Client, url, runtimeToken string, body []byte) { + logger := common.Logger(ctx) + + var lastStatus int + var lastErr error + for attempt := 0; attempt < 2; attempt++ { + status, err := putJobSummary(ctx, client, url, runtimeToken, body) + if err == nil && status/100 == 2 { + return + } + lastStatus = status + lastErr = err + if attempt == 1 || !isTransientJobSummaryUploadFailure(status, err) { + break + } + timer := time.NewTimer(jobSummaryUploadRetryDelay) + select { + case <-ctx.Done(): + timer.Stop() + lastErr = ctx.Err() + attempt = 1 + case <-timer.C: + } + } + + // Best-effort only; do not fail job, but log because capability was advertised. + if lastErr != nil { + logger.WithError(lastErr).Warn("job summary upload failed") + return + } + logger.Warnf("job summary upload failed: status=%d", lastStatus) +} + +func putJobSummary(ctx context.Context, client *http.Client, url, runtimeToken string, body []byte) (int, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body)) + if err != nil { + return 0, err + } + req.Header.Set("Authorization", "Bearer "+runtimeToken) + req.Header.Set("Content-Type", "text/markdown; charset=utf-8") + + resp, err := client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) + return resp.StatusCode, nil +} + +func isTransientJobSummaryUploadFailure(status int, err error) bool { + return err != nil || status == http.StatusRequestTimeout || status == http.StatusTooManyRequests || status/100 == 5 +} + +func readSingleFileFromContainerArchive(ctx context.Context, env container.ExecutionsEnvironment, p string, maxBytes int64) ([]byte, bool) { + rc, err := env.GetContainerArchive(ctx, p) + if err != nil { + return nil, false + } + defer rc.Close() + + tr := tar.NewReader(rc) + for { + header, err := tr.Next() + if err == io.EOF { + return nil, false + } + if err != nil { + return nil, false + } + if header.Typeflag != tar.TypeReg { + continue + } + if !archiveEntryMatchesPath(header.Name, p) { + continue + } + // Summaries larger than the limit are truncated rather than dropped, so the + // user still gets the leading content (mirroring how GitHub caps oversized + // step summaries instead of discarding them). Read one extra byte so an + // over-limit file is detected from the actual stream rather than trusting + // header.Size, then cap the returned content at maxBytes. + b, err := io.ReadAll(io.LimitReader(tr, maxBytes+1)) + if err != nil { + return nil, false + } + if int64(len(b)) > maxBytes { + // Reserve room for the marker so the marked-up result still fits in maxBytes. + marker := []byte(jobSummaryTruncationMarker) + keep := max(maxBytes-int64(len(marker)), 0) + b = append(b[:keep], marker...) + common.Logger(ctx).Warnf("job summary truncated: path=%s max=%d", p, maxBytes) + } + return b, true + } +} + +func archiveEntryMatchesPath(entryName, requestedPath string) bool { + entryName = path.Clean(strings.TrimPrefix(entryName, "/")) + requestedPath = path.Clean(strings.TrimPrefix(requestedPath, "/")) + return entryName == requestedPath || entryName == path.Base(requestedPath) +} + func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, executor common.Executor) common.Executor { return func(ctx context.Context) error { ctx = withStepLogger(ctx, stepModel.Number, stepModel.ID, rc.ExprEval.Interpolate(ctx, stepModel.String()), stage.String()) diff --git a/act/runner/job_executor_test.go b/act/runner/job_executor_test.go index a89ed0c7..55228e85 100644 --- a/act/runner/job_executor_test.go +++ b/act/runner/job_executor_test.go @@ -5,19 +5,29 @@ package runner import ( + "archive/tar" + "bytes" "context" + "encoding/base64" "errors" "fmt" "io" + "net/http" + "net/http/httptest" "slices" + "strconv" + "strings" "testing" + "time" "gitea.com/gitea/runner/act/common" "gitea.com/gitea/runner/act/container" "gitea.com/gitea/runner/act/model" + logrustest "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) func TestJobExecutor(t *testing.T) { @@ -336,3 +346,331 @@ func TestNewJobExecutor(t *testing.T) { }) } } + +func TestHasJobSummaryCapability(t *testing.T) { + assert.True(t, hasJobSummaryCapability("cache,job-summary artifacts")) + assert.True(t, hasJobSummaryCapability("cache,\njob-summary\tartifacts")) + assert.False(t, hasJobSummaryCapability("not-job-summary,job-summary-v2")) +} + +// fakeRuntimeToken builds a JWT-shaped string whose middle (claims) segment encodes +// the given JobID. The header and signature segments are filler — the runner does not +// verify the signature; the server does. +func fakeRuntimeToken(jobID int64) string { + header := base64.RawURLEncoding.EncodeToString([]byte(`{"alg":"HS256","typ":"JWT"}`)) + claims := base64.RawURLEncoding.EncodeToString(fmt.Appendf(nil, `{"JobID":%d}`, jobID)) + sig := base64.RawURLEncoding.EncodeToString([]byte("sig")) + return header + "." + claims + "." + sig +} + +func newJobSummaryRC(env map[string]string, jobContainer container.ExecutionsEnvironment, stepCount int) *RunContext { + steps := make([]*model.Step, stepCount) + for i := range steps { + steps[i] = &model.Step{ID: strconv.Itoa(i)} + } + return &RunContext{ + Config: &Config{}, + JobContainer: jobContainer, + Env: env, + Run: &model.Run{ + JobID: "test", + Workflow: &model.Workflow{ + Jobs: map[string]*model.Job{ + "test": {Steps: steps}, + }, + }, + }, + } +} + +func TestTryUploadJobSummaryRetriesTransientFailure(t *testing.T) { + oldDelay := jobSummaryUploadRetryDelay + jobSummaryUploadRetryDelay = 0 + defer func() { + jobSummaryUploadRetryDelay = oldDelay + }() + + runtimeToken := fakeRuntimeToken(34) + + requests := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requests++ + assert.Equal(t, http.MethodPut, r.Method) + assert.Equal(t, "/_apis/pipelines/workflows/12/jobs/34/steps/0/summary", r.URL.Path) + assert.Equal(t, "Bearer "+runtimeToken, r.Header.Get("Authorization")) + assert.Equal(t, "text/markdown; charset=utf-8", r.Header.Get("Content-Type")) + body, err := io.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, []byte("# summary"), body) + if requests == 1 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + ctx := context.Background() + cm := &containerMock{} + cm.On("GetContainerArchive", mock.Anything, "/var/run/act/workflow/step-summary-0.md").Return( + io.NopCloser(bytes.NewReader(tarArchive(t, tarEntry{name: "step-summary-0.md", body: "# summary"}))), + nil, + ).Once() + + rc := newJobSummaryRC(map[string]string{ + "GITEA_ACTIONS_CAPABILITIES": "cache, job-summary", + "ACTIONS_RUNTIME_URL": server.URL, + "ACTIONS_RUNTIME_TOKEN": runtimeToken, + "GITEA_RUN_ID": "12", + }, cm, 1) + + tryUploadJobSummary(ctx, rc) + + assert.Equal(t, 2, requests) + cm.AssertExpectations(t) +} + +func TestTryUploadJobSummaryStopsAtPhaseTimeout(t *testing.T) { + oldPhase := jobSummaryUploadPhaseTimeout + jobSummaryUploadPhaseTimeout = 100 * time.Millisecond + defer func() { + jobSummaryUploadPhaseTimeout = oldPhase + }() + + runtimeToken := fakeRuntimeToken(34) + + // The server blocks until either the request context is cancelled (the behaviour + // under test: the phase timeout aborts the in-flight upload) or the test tears it + // down. Without the phase timeout the upload would hang until the 30s client + // timeout instead of releasing the cleanup budget. The release channel guarantees + // the handler always returns so server.Close() cannot itself hang. + release := make(chan struct{}) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-r.Context().Done(): + case <-release: + } + })) + defer server.Close() + defer close(release) + + ctx := context.Background() + cm := &containerMock{} + cm.On("GetContainerArchive", mock.Anything, "/var/run/act/workflow/step-summary-0.md").Return( + io.NopCloser(bytes.NewReader(tarArchive(t, tarEntry{name: "step-summary-0.md", body: "# summary"}))), + nil, + ).Once() + + rc := newJobSummaryRC(map[string]string{ + "GITEA_ACTIONS_CAPABILITIES": "job-summary", + "ACTIONS_RUNTIME_URL": server.URL, + "ACTIONS_RUNTIME_TOKEN": runtimeToken, + "GITEA_RUN_ID": "12", + }, cm, 1) + + done := make(chan struct{}) + go func() { + defer close(done) + tryUploadJobSummary(ctx, rc) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("tryUploadJobSummary did not honour the phase timeout") + } + cm.AssertExpectations(t) +} + +func TestTryUploadJobSummaryUploadsEachStepIndependently(t *testing.T) { + runtimeToken := fakeRuntimeToken(34) + + type upload struct { + path string + body string + } + var got []upload + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + assert.NoError(t, err) + got = append(got, upload{r.URL.Path, string(body)}) + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + ctx := context.Background() + cm := &containerMock{} + // Three steps: 0 has content, 1 has empty content (skipped), 2 has content. + cm.On("GetContainerArchive", mock.Anything, "/var/run/act/workflow/step-summary-0.md").Return( + io.NopCloser(bytes.NewReader(tarArchive(t, tarEntry{name: "step-summary-0.md", body: "first"}))), + nil, + ).Once() + cm.On("GetContainerArchive", mock.Anything, "/var/run/act/workflow/step-summary-1.md").Return( + io.NopCloser(bytes.NewReader(tarArchive(t, tarEntry{name: "step-summary-1.md", body: ""}))), + nil, + ).Once() + cm.On("GetContainerArchive", mock.Anything, "/var/run/act/workflow/step-summary-2.md").Return( + io.NopCloser(bytes.NewReader(tarArchive(t, tarEntry{name: "step-summary-2.md", body: "third"}))), + nil, + ).Once() + + rc := newJobSummaryRC(map[string]string{ + "GITEA_ACTIONS_CAPABILITIES": "job-summary", + "ACTIONS_RUNTIME_URL": server.URL, + "ACTIONS_RUNTIME_TOKEN": runtimeToken, + "GITEA_RUN_ID": "12", + }, cm, 3) + + tryUploadJobSummary(ctx, rc) + + assert.Equal(t, []upload{ + {"/_apis/pipelines/workflows/12/jobs/34/steps/0/summary", "first"}, + {"/_apis/pipelines/workflows/12/jobs/34/steps/2/summary", "third"}, + }, got) + cm.AssertExpectations(t) +} + +func TestTryUploadJobSummaryRequiresExactCapability(t *testing.T) { + requests := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requests++ + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + rc := newJobSummaryRC(map[string]string{ + "GITEA_ACTIONS_CAPABILITIES": "not-job-summary,job-summary-v2", + "ACTIONS_RUNTIME_URL": server.URL, + "ACTIONS_RUNTIME_TOKEN": fakeRuntimeToken(34), + "GITEA_RUN_ID": "12", + }, &containerMock{}, 1) + + tryUploadJobSummary(context.Background(), rc) + + assert.Equal(t, 0, requests) +} + +func TestTryUploadJobSummarySkipsWhenJobIDMissingFromToken(t *testing.T) { + requests := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requests++ + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + rc := newJobSummaryRC(map[string]string{ + "GITEA_ACTIONS_CAPABILITIES": "job-summary", + "ACTIONS_RUNTIME_URL": server.URL, + "ACTIONS_RUNTIME_TOKEN": "not-a-jwt", + "GITEA_RUN_ID": "12", + }, &containerMock{}, 1) + + tryUploadJobSummary(context.Background(), rc) + + assert.Equal(t, 0, requests) +} + +func TestExtractJobIDFromRuntimeToken(t *testing.T) { + assert.Equal(t, int64(42), extractJobIDFromRuntimeToken(fakeRuntimeToken(42))) + assert.Equal(t, int64(0), extractJobIDFromRuntimeToken("not-a-jwt")) + assert.Equal(t, int64(0), extractJobIDFromRuntimeToken("a.b.c")) + assert.Equal(t, int64(0), extractJobIDFromRuntimeToken("")) +} + +func TestReadSingleFileFromContainerArchiveFindsMatchingRegularFile(t *testing.T) { + ctx := context.Background() + cm := &containerMock{} + cm.On("GetContainerArchive", ctx, "/var/run/act/workflow/SUMMARY.md").Return( + io.NopCloser(bytes.NewReader(tarArchive(t, + tarEntry{name: "workflow", typeflag: tar.TypeDir}, + tarEntry{name: "other.md", body: "wrong"}, + tarEntry{name: "SUMMARY.md", body: "right"}, + ))), + nil, + ).Once() + + body, ok := readSingleFileFromContainerArchive(ctx, cm, "/var/run/act/workflow/SUMMARY.md", 1024) + + assert.True(t, ok) + assert.Equal(t, []byte("right"), body) + cm.AssertExpectations(t) +} + +func TestReadSingleFileFromContainerArchiveTruncatesWhenTooLarge(t *testing.T) { + logger, hook := logrustest.NewNullLogger() + ctx := common.WithLogger(context.Background(), logger) + cm := &containerMock{} + content := strings.Repeat("a", 300) + cm.On("GetContainerArchive", ctx, "/var/run/act/workflow/SUMMARY.md").Return( + io.NopCloser(bytes.NewReader(tarArchive(t, tarEntry{name: "SUMMARY.md", body: content}))), + nil, + ).Once() + + const maxBytes = 200 + body, ok := readSingleFileFromContainerArchive(ctx, cm, "/var/run/act/workflow/SUMMARY.md", maxBytes) + + // Oversized summaries are truncated to the limit (reserving room for the marker) + // rather than dropped entirely, and the truncation marker is appended. + assert.True(t, ok) + assert.LessOrEqual(t, len(body), maxBytes) + keep := maxBytes - len(jobSummaryTruncationMarker) + assert.Equal(t, []byte(content[:keep]+jobSummaryTruncationMarker), body) + if assert.Len(t, hook.Entries, 1) { + assert.Contains(t, hook.Entries[0].Message, "job summary truncated") + } + cm.AssertExpectations(t) +} + +func TestReadSingleFileFromContainerArchiveKeepsExactLimitWithoutWarning(t *testing.T) { + logger, hook := logrustest.NewNullLogger() + ctx := common.WithLogger(context.Background(), logger) + cm := &containerMock{} + cm.On("GetContainerArchive", ctx, "/var/run/act/workflow/SUMMARY.md").Return( + io.NopCloser(bytes.NewReader(tarArchive(t, tarEntry{name: "SUMMARY.md", body: "abc"}))), + nil, + ).Once() + + body, ok := readSingleFileFromContainerArchive(ctx, cm, "/var/run/act/workflow/SUMMARY.md", 3) + + // A summary that is exactly at the limit is kept whole and not flagged as truncated. + assert.True(t, ok) + assert.Equal(t, []byte("abc"), body) + assert.Empty(t, hook.Entries) + cm.AssertExpectations(t) +} + +type tarEntry struct { + name string + body string + typeflag byte +} + +func tarArchive(t *testing.T, entries ...tarEntry) []byte { + t.Helper() + + buf := &bytes.Buffer{} + tw := tar.NewWriter(buf) + for _, entry := range entries { + typeflag := entry.typeflag + if typeflag == 0 { + typeflag = tar.TypeReg + } + header := &tar.Header{ + Name: entry.name, + Typeflag: typeflag, + Mode: 0o644, + Size: int64(len(entry.body)), + } + if typeflag == tar.TypeDir { + header.Mode = 0o755 + header.Size = 0 + } + require.NoError(t, tw.WriteHeader(header)) + if typeflag == tar.TypeReg { + _, err := tw.Write([]byte(entry.body)) + require.NoError(t, err) + } + } + require.NoError(t, tw.Close()) + return buf.Bytes() +} diff --git a/act/runner/run_context.go b/act/runner/run_context.go index a7f98ec9..0ff660bd 100644 --- a/act/runner/run_context.go +++ b/act/runner/run_context.go @@ -36,15 +36,19 @@ import ( // RunContext contains info about current job type RunContext struct { - Name string - Config *Config - Matrix map[string]any - Run *model.Run - EventJSON string - Env map[string]string - GlobalEnv map[string]string // to pass env changes of GITHUB_ENV and set-env correctly, due to dirty Env field - ExtraPath []string - CurrentStep string + Name string + Config *Config + Matrix map[string]any + Run *model.Run + EventJSON string + Env map[string]string + GlobalEnv map[string]string // to pass env changes of GITHUB_ENV and set-env correctly, due to dirty Env field + ExtraPath []string + CurrentStep string + // CurrentStepIndex is the index of the top-level job step currently executing + // (model.Step.Number). Composite sub-steps inherit the outer step's index by + // walking the Parent chain; see topLevelRunContext. + CurrentStepIndex int StepResults map[string]*model.StepResult IntraActionState map[string]map[string]string ExprEval ExpressionEvaluator @@ -57,6 +61,14 @@ type RunContext struct { Masks []string cleanUpJobContainer common.Executor caller *caller // job calling this RunContext (reusable workflows) + // summaryFileInitialized tracks which per-step summary files (workflow/step-summary-N.md) + // have already been created on the JobContainer. The runner sets up file-command files + // via JobContainer.Copy at the start of every phase, which truncates them — fine for + // GITHUB_ENV/OUTPUT/STATE/PATH (consumed per phase) but wrong for GITHUB_STEP_SUMMARY, + // which has accumulating semantics. We initialize each step's summary file exactly once + // so writes from later phases and from composite sub-steps append to the same file. + // Only populated on the top-level RunContext; child RCs walk Parent via topLevelRunContext. + summaryFileInitialized map[int]bool // outputTemplate is this combination's pristine snapshot of the job's output expressions, // captured before execution so each matrix combo interpolates from the originals rather // than from a sibling's already-resolved values written into the shared Job.Outputs. @@ -704,6 +716,17 @@ func (rc *RunContext) steps() []*model.Step { return steps } +// topLevelRunContext walks the Parent chain to the outermost RunContext. Composite +// actions create child RunContexts whose sub-steps need to share the outer job step's +// summary file path so that nested writes accumulate under the right step_index. +func (rc *RunContext) topLevelRunContext() *RunContext { + top := rc + for top.Parent != nil { + top = top.Parent + } + return top +} + // Executor returns a pipeline executor for all the steps in the job func (rc *RunContext) Executor() (common.Executor, error) { var executor common.Executor diff --git a/act/runner/step.go b/act/runner/step.go index 8a241acf..c67f5ba7 100644 --- a/act/runner/step.go +++ b/act/runner/step.go @@ -124,7 +124,12 @@ func runStepExecutor(step step, stage stepStage, executor common.Executor) commo envFileCommand := path.Join("workflow", "envs.txt") (*step.getEnv())["GITHUB_ENV"] = path.Join(actPath, envFileCommand) - summaryFileCommand := path.Join("workflow", "SUMMARY.md") + // Per-step summary file. Composite sub-steps share the outer job step's index + // via the Parent chain so all writes from within a composite action accumulate + // in the same file and upload under the outer step_index. + topRC := rc.topLevelRunContext() + stepSummaryIndex := topRC.CurrentStepIndex + summaryFileCommand := path.Join("workflow", "step-summary-"+strconv.Itoa(stepSummaryIndex)+".md") (*step.getEnv())["GITHUB_STEP_SUMMARY"] = path.Join(actPath, summaryFileCommand) { @@ -136,22 +141,23 @@ func runStepExecutor(step step, stage stepStage, executor common.Executor) commo (*step.getEnv())["GITEA_STEP_SUMMARY"] = (*step.getEnv())["GITHUB_STEP_SUMMARY"] } - _ = rc.JobContainer.Copy(actPath, &container.FileEntry{ - Name: outputFileCommand, - Mode: 0o666, - }, &container.FileEntry{ - Name: stateFileCommand, - Mode: 0o666, - }, &container.FileEntry{ - Name: pathFileCommand, - Mode: 0o666, - }, &container.FileEntry{ - Name: envFileCommand, - Mode: 0o666, - }, &container.FileEntry{ - Name: summaryFileCommand, - Mode: 0o666, - })(ctx) + // Reset the per-phase file-command files. GITHUB_STEP_SUMMARY is intentionally + // excluded here and initialized below at most once per step so writes from later + // phases and from composite sub-steps accumulate instead of being truncated. + files := []*container.FileEntry{ + {Name: outputFileCommand, Mode: 0o666}, + {Name: stateFileCommand, Mode: 0o666}, + {Name: pathFileCommand, Mode: 0o666}, + {Name: envFileCommand, Mode: 0o666}, + } + if topRC.summaryFileInitialized == nil { + topRC.summaryFileInitialized = map[int]bool{} + } + if !topRC.summaryFileInitialized[stepSummaryIndex] { + files = append(files, &container.FileEntry{Name: summaryFileCommand, Mode: 0o666}) + topRC.summaryFileInitialized[stepSummaryIndex] = true + } + _ = rc.JobContainer.Copy(actPath, files...)(ctx) timeoutctx, cancelTimeOut := evaluateStepTimeout(ctx, rc.ExprEval, stepModel) defer cancelTimeOut() diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go index 247d77f0..f533b017 100644 --- a/internal/app/cmd/daemon.go +++ b/internal/app/cmd/daemon.go @@ -148,6 +148,7 @@ func runDaemon(ctx context.Context, daemArgs *daemonArgs, configFile *string) fu log.Infof("runner: %s, with version: %s, with labels: %v, declare successfully", resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels) } + runner.SetCapabilitiesFromDeclare(resp) if cfg.Metrics.Enabled { metrics.Init() diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go index d3d787c7..9bcb9f4a 100644 --- a/internal/app/run/runner.go +++ b/internal/app/run/runner.go @@ -47,6 +47,7 @@ type Runner struct { labels labels.Labels envs map[string]string cacheHandler *artifactcache.Handler + capabilities string runningTasks sync.Map runningCount atomic.Int64 @@ -185,6 +186,14 @@ func (r *Runner) cleanupStaleTaskDirs(ctx context.Context, workdirRoot string) { } } +func (r *Runner) SetCapabilitiesFromDeclare(resp *connect.Response[runnerv1.DeclareResponse]) { + if resp == nil { + return + } + // Capability negotiation is done via response headers to avoid a hard proto bump. + r.capabilities = strings.TrimSpace(resp.Header().Get("X-Gitea-Actions-Capabilities")) +} + func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error { if _, ok := r.runningTasks.Load(task.Id); ok { return fmt.Errorf("task %d is already running", task.Id) @@ -219,9 +228,10 @@ func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error { } func (r *Runner) cloneEnvs() map[string]string { - // +3 reserves space for the per-task keys injected by run(): - // ACTIONS_ID_TOKEN_REQUEST_URL, ACTIONS_ID_TOKEN_REQUEST_TOKEN, ACTIONS_RUNTIME_TOKEN. - envs := make(map[string]string, len(r.envs)+3) + // Reserve space for the per-task keys injected by run(): + // ACTIONS_ID_TOKEN_REQUEST_URL, ACTIONS_ID_TOKEN_REQUEST_TOKEN, ACTIONS_RUNTIME_TOKEN, + // GITEA_ACTIONS_CAPABILITIES, GITEA_RUN_ID. + envs := make(map[string]string, len(r.envs)+5) maps.Copy(envs, r.envs) return envs } @@ -261,6 +271,13 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report. taskContext := task.Context.Fields envs := r.cloneEnvs() + if r.capabilities != "" { + envs["GITEA_ACTIONS_CAPABILITIES"] = r.capabilities + } + if v := taskContext["run_id"].GetStringValue(); v != "" { + envs["GITEA_RUN_ID"] = v + } + log.Infof("task %v repo is %v %v %v", task.Id, taskContext["repository"].GetStringValue(), r.getDefaultActionsURL(task), r.client.Address())