diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index dc2bfbca..6229e64e 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -10,6 +10,7 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "time" "gitea.com/gitea/act_runner/internal/pkg/client" @@ -48,6 +49,10 @@ type Reporter struct { outputs sync.Map daemon chan struct{} + // Unix-nanos of the last successful UpdateTask. Atomic so the heartbeat + // guard in ReportState reads it without contending stateMu. + lastReportedAtNanos atomic.Int64 + // Adaptive batching control logReportInterval time.Duration logReportMaxLatency time.Duration @@ -489,8 +494,12 @@ func (r *Reporter) ReportState(reportResult bool) error { // Consume stateChanged atomically with the snapshot; restored on error // below so a concurrent Fire() during UpdateTask isn't silently lost. + // Heartbeat at stateReportInterval even when nothing changed, so the server + // doesn't time out long-running silent jobs as orphaned (#826). + last := r.lastReportedAtNanos.Load() + withinHeartbeatInterval := last != 0 && time.Since(time.Unix(0, last)) < r.stateReportInterval r.stateMu.Lock() - if !reportResult && !r.stateChanged && len(outputs) == 0 { + if !reportResult && !r.stateChanged && len(outputs) == 0 && withinHeartbeatInterval { r.stateMu.Unlock() return nil } @@ -517,6 +526,7 @@ func (r *Reporter) ReportState(reportResult bool) error { return err } metrics.ReportStateTotal.WithLabelValues(metrics.LabelResultSuccess).Inc() + r.lastReportedAtNanos.Store(time.Now().UnixNano()) for _, k := range resp.Msg.SentOutputs { r.outputs.Store(k, struct{}{}) diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 15b361ab..74697851 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -597,3 +597,45 @@ func TestReporter_StateNotifyFlush(t *testing.T) { }, 500*time.Millisecond, 10*time.Millisecond, "step transition should have triggered immediate state flush via stateNotify") } + +// TestReporter_StateHeartbeat verifies that ReportState sends a heartbeat +// UpdateTask once stateReportInterval has elapsed since the last successful +// report, even when nothing has changed. Without this, long-running silent +// jobs (no log output, no step transitions) cause the server to time the +// task out and cancel it (#826). +func TestReporter_StateHeartbeat(t *testing.T) { + var updateTaskCalls atomic.Int64 + + client := mocks.NewClient(t) + client.On("UpdateTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + updateTaskCalls.Add(1) + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + cfg, _ := config.LoadDefault("") + cfg.Runner.StateReportInterval = 50 * time.Millisecond + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + + // First call has no prior report — sends to seed lastReportedAt. + reporter.stateMu.Lock() + reporter.stateChanged = true + reporter.stateMu.Unlock() + require.NoError(t, reporter.ReportState(false)) + require.Equal(t, int64(1), updateTaskCalls.Load()) + + // Second call immediately after with nothing changed — must skip. + require.NoError(t, reporter.ReportState(false)) + assert.Equal(t, int64(1), updateTaskCalls.Load(), "no-op ReportState within stateReportInterval must skip") + + // After stateReportInterval elapses, a heartbeat must fire even with no changes. + time.Sleep(2 * cfg.Runner.StateReportInterval) + require.NoError(t, reporter.ReportState(false)) + assert.Equal(t, int64(2), updateTaskCalls.Load(), "ReportState must heartbeat after stateReportInterval even with no state change") +}