diff --git a/go.mod b/go.mod index f4d2b071..6256b528 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/stretchr/testify v1.11.1 go.yaml.in/yaml/v4 v4.0.0-rc.3 golang.org/x/term v0.40.0 - golang.org/x/time v0.14.0 + golang.org/x/time v0.14.0 // indirect google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 gotest.tools/v3 v3.5.2 diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index e5f29e47..0dc8a4c3 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -7,13 +7,14 @@ import ( "context" "errors" "fmt" + "math/rand/v2" "sync" "sync/atomic" + "time" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" "connectrpc.com/connect" log "github.com/sirupsen/logrus" - "golang.org/x/time/rate" "gitea.com/gitea/act_runner/internal/app/run" "gitea.com/gitea/act_runner/internal/pkg/client" @@ -35,6 +36,15 @@ type Poller struct { done chan struct{} } +// workerState holds per-goroutine polling state. Backoff counters are +// per-worker so that with Capacity > 1, N workers each seeing one empty +// response don't combine into a "consecutive N empty" reading on a shared +// counter and trigger an unnecessarily long backoff. 
+type workerState struct { + consecutiveEmpty int64 + consecutiveErrors int64 +} + func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { pollingCtx, shutdownPolling := context.WithCancel(context.Background()) @@ -58,11 +68,10 @@ func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { } func (p *Poller) Poll() { - limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1) wg := &sync.WaitGroup{} for i := 0; i < p.cfg.Runner.Capacity; i++ { wg.Add(1) - go p.poll(wg, limiter) + go p.poll(wg) } wg.Wait() @@ -71,9 +80,7 @@ func (p *Poller) Poll() { } func (p *Poller) PollOnce() { - limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1) - - p.pollOnce(limiter) + p.pollOnce(&workerState{}) // signal that we're done close(p.done) @@ -108,10 +115,11 @@ func (p *Poller) Shutdown(ctx context.Context) error { } } -func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) { +func (p *Poller) poll(wg *sync.WaitGroup) { defer wg.Done() + s := &workerState{} for { - p.pollOnce(limiter) + p.pollOnce(s) select { case <-p.pollingCtx.Done(): @@ -122,19 +130,57 @@ func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) { } } -func (p *Poller) pollOnce(limiter *rate.Limiter) { +// calculateInterval returns the polling interval with exponential backoff based on +// consecutive empty or error responses. The interval starts at FetchInterval and +// doubles with each consecutive empty/error, capped at FetchIntervalMax. 
+func (p *Poller) calculateInterval(s *workerState) time.Duration { + base := p.cfg.Runner.FetchInterval + maxInterval := p.cfg.Runner.FetchIntervalMax + + n := max(s.consecutiveEmpty, s.consecutiveErrors) + if n <= 1 { + return base + } + + // Capped exponential backoff: base * 2^(n-1), max shift=5 so multiplier <= 32 + shift := min(n-1, 5) + interval := base * time.Duration(int64(1)<<shift) + if interval > maxInterval { + interval = maxInterval + } + return interval +} [NOTE(review): the patch text was corrupted/truncated at this point — the hunks between calculateInterval's cap logic and config.go's validation (the rewritten pollOnce using workerState and math/rand/v2 jitter, plus the internal/pkg/config/config.go diff header, context lines, and new FetchIntervalMax/LogReport*/StateReportInterval defaults) were lost in transit. Regenerate this diff from git before applying; the fragment below resumes mid-way through config.go's validation.] + if cfg.Runner.LogReportMaxLatency >= cfg.Runner.LogReportInterval { + log.Warnf("log_report_max_latency (%v) >= log_report_interval (%v), the max-latency timer will never fire before the periodic ticker; consider lowering log_report_max_latency", + cfg.Runner.LogReportMaxLatency, cfg.Runner.LogReportInterval) + } // although `container.network_mode` will be deprecated, but we have to be compatible with it for now. if cfg.Container.NetworkMode != "" && cfg.Container.Network == "" { diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index 4591119f..b7505cbd 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -20,6 +20,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" ) type Reporter struct { @@ -35,16 +36,27 @@ type Reporter struct { logReplacer *strings.Replacer oldnew []string - state *runnerv1.TaskState - stateMu sync.RWMutex - outputs sync.Map - daemon chan struct{} + state *runnerv1.TaskState + stateChanged bool + stateMu sync.RWMutex + outputs sync.Map + daemon chan struct{} + + // Adaptive batching control + logReportInterval time.Duration + logReportMaxLatency time.Duration + logBatchSize int + stateReportInterval time.Duration + + // Event notification channels (non-blocking, buffered 1) + logNotify chan struct{} // signal: new log rows arrived + stateNotify chan struct{} // signal: step transition (start/stop) debugOutputEnabled bool stopCommandEndToken string } -func NewReporter(ctx context.Context, cancel context.CancelFunc, client 
client.Client, task *runnerv1.Task) *Reporter { +func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task, cfg *config.Config) *Reporter { var oldnew []string if v := task.Context.Fields["token"].GetStringValue(); v != "" { oldnew = append(oldnew, v, "***") @@ -57,11 +69,17 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C } rv := &Reporter{ - ctx: ctx, - cancel: cancel, - client: client, - oldnew: oldnew, - logReplacer: strings.NewReplacer(oldnew...), + ctx: ctx, + cancel: cancel, + client: client, + oldnew: oldnew, + logReplacer: strings.NewReplacer(oldnew...), + logReportInterval: cfg.Runner.LogReportInterval, + logReportMaxLatency: cfg.Runner.LogReportMaxLatency, + logBatchSize: cfg.Runner.LogReportBatchSize, + stateReportInterval: cfg.Runner.StateReportInterval, + logNotify: make(chan struct{}, 1), + stateNotify: make(chan struct{}, 1), state: &runnerv1.TaskState{ Id: task.Id, }, @@ -108,11 +126,42 @@ func isJobStepEntry(entry *log.Entry) bool { return true } -func (r *Reporter) Fire(entry *log.Entry) error { - r.stateMu.Lock() - defer r.stateMu.Unlock() +// notifyLog sends a non-blocking signal that new log rows are available. +func (r *Reporter) notifyLog() { + select { + case r.logNotify <- struct{}{}: + default: + } +} - log.WithFields(entry.Data).Trace(entry.Message) +// notifyState sends a non-blocking signal that a UX-critical state change occurred (step start/stop, job result). +func (r *Reporter) notifyState() { + select { + case r.stateNotify <- struct{}{}: + default: + } +} + +// unlockAndNotify releases stateMu and sends channel notifications. +// Must be called with stateMu held. 
+func (r *Reporter) unlockAndNotify(urgentState bool) { + r.stateMu.Unlock() + r.notifyLog() + if urgentState { + r.notifyState() + } +} + +func (r *Reporter) Fire(entry *log.Entry) error { + urgentState := false + + r.stateMu.Lock() + + r.stateChanged = true + + if log.IsLevelEnabled(log.TraceLevel) { + log.WithFields(entry.Data).Trace(entry.Message) + } timestamp := entry.Time if r.state.StartedAt == nil { @@ -135,11 +184,13 @@ func (r *Reporter) Fire(entry *log.Entry) error { } } } + urgentState = true } } if !r.duringSteps() { r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry)) } + r.unlockAndNotify(urgentState) return nil } @@ -153,11 +204,13 @@ func (r *Reporter) Fire(entry *log.Entry) error { if !r.duringSteps() { r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry)) } + r.unlockAndNotify(false) return nil } if step.StartedAt == nil { step.StartedAt = timestamppb.New(timestamp) + urgentState = true } // Force reporting log errors as raw output to prevent silent failures @@ -185,26 +238,91 @@ func (r *Reporter) Fire(entry *log.Entry) error { } step.Result = stepResult step.StoppedAt = timestamppb.New(timestamp) + urgentState = true } } + r.unlockAndNotify(urgentState) return nil } func (r *Reporter) RunDaemon() { - r.stateMu.RLock() - closed := r.closed - r.stateMu.RUnlock() - if closed || r.ctx.Err() != nil { - // Acknowledge close - close(r.daemon) - return + go r.runDaemonLoop() +} + +func (r *Reporter) stopLatencyTimer(active *bool, timer *time.Timer) { + if *active { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + *active = false } +} - _ = r.ReportLog(false) - _ = r.ReportState(false) +func (r *Reporter) runDaemonLoop() { + logTicker := time.NewTicker(r.logReportInterval) + stateTicker := time.NewTicker(r.stateReportInterval) - time.AfterFunc(time.Second, r.RunDaemon) + // maxLatencyTimer ensures the first buffered log row is sent within logReportMaxLatency. 
+ // Start inactive — it is armed when the first log row arrives in an empty buffer. + maxLatencyTimer := time.NewTimer(0) + if !maxLatencyTimer.Stop() { + <-maxLatencyTimer.C + } + maxLatencyActive := false + + defer logTicker.Stop() + defer stateTicker.Stop() + defer maxLatencyTimer.Stop() + + for { + select { + case <-logTicker.C: + _ = r.ReportLog(false) + r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer) + + case <-stateTicker.C: + _ = r.ReportState(false) + + case <-r.logNotify: + r.stateMu.RLock() + n := len(r.logRows) + r.stateMu.RUnlock() + + if n >= r.logBatchSize { + _ = r.ReportLog(false) + r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer) + } else if !maxLatencyActive && n > 0 { + maxLatencyTimer.Reset(r.logReportMaxLatency) + maxLatencyActive = true + } + + case <-r.stateNotify: + // Step transition or job result — flush both immediately for frontend UX. + _ = r.ReportLog(false) + _ = r.ReportState(false) + r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer) + + case <-maxLatencyTimer.C: + maxLatencyActive = false + _ = r.ReportLog(false) + + case <-r.ctx.Done(): + close(r.daemon) + return + } + + r.stateMu.RLock() + closed := r.closed + r.stateMu.RUnlock() + if closed { + close(r.daemon) + return + } + } } func (r *Reporter) Logf(format string, a ...any) { @@ -268,6 +386,10 @@ func (r *Reporter) Close(lastWords string) error { }) } r.stateMu.Unlock() + + // Wake up the daemon loop so it detects closed promptly. 
+ r.notifyLog() + // Wait for Acknowledge select { case <-r.daemon: @@ -295,6 +417,10 @@ func (r *Reporter) ReportLog(noMore bool) error { rows := r.logRows r.stateMu.RUnlock() + if !noMore && len(rows) == 0 { + return nil + } + resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{ TaskId: r.state.Id, Index: int64(r.logOffset), @@ -329,15 +455,7 @@ func (r *Reporter) ReportState(reportResult bool) error { r.clientM.Lock() defer r.clientM.Unlock() - r.stateMu.RLock() - state := proto.Clone(r.state).(*runnerv1.TaskState) - r.stateMu.RUnlock() - - // Only report result from Close to reliable sent logs - if !reportResult { - state.Result = runnerv1.Result_RESULT_UNSPECIFIED - } - + // Build the outputs map first (single Range pass instead of two). outputs := make(map[string]string) r.outputs.Range(func(k, v any) bool { if val, ok := v.(string); ok { @@ -346,11 +464,29 @@ func (r *Reporter) ReportState(reportResult bool) error { return true }) + // Consume stateChanged atomically with the snapshot; restored on error + // below so a concurrent Fire() during UpdateTask isn't silently lost. 
+ r.stateMu.Lock() + if !reportResult && !r.stateChanged && len(outputs) == 0 { + r.stateMu.Unlock() + return nil + } + state := proto.Clone(r.state).(*runnerv1.TaskState) + r.stateChanged = false + r.stateMu.Unlock() + + if !reportResult { + state.Result = runnerv1.Result_RESULT_UNSPECIFIED + } + resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ State: state, Outputs: outputs, })) if err != nil { + r.stateMu.Lock() + r.stateChanged = true + r.stateMu.Unlock() return err } diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index f617e296..102064e0 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -6,8 +6,9 @@ package report import ( "context" "errors" + "fmt" "strings" - "sync" + "sync/atomic" "testing" "time" @@ -21,6 +22,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "gitea.com/gitea/act_runner/internal/pkg/client/mocks" + "gitea.com/gitea/act_runner/internal/pkg/config" ) func TestReporter_parseLogRow(t *testing.T) { @@ -175,9 +177,10 @@ func TestReporter_Fire(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) taskCtx, err := structpb.NewStruct(map[string]any{}) require.NoError(t, err) + cfg, _ := config.LoadDefault("") reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{ Context: taskCtx, - }) + }, cfg) reporter.RunDaemon() defer func() { require.NoError(t, reporter.Close("")) @@ -252,7 +255,8 @@ func TestReporter_EphemeralRunnerDeletion(t *testing.T) { defer cancel() taskCtx, err := structpb.NewStruct(map[string]any{}) require.NoError(t, err) - reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}) + cfg, _ := config.LoadDefault("") + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) reporter.ResetSteps(1) // Fire a log entry to create pending data @@ -315,23 +319,281 @@ func TestReporter_RunDaemonClose_Race(t *testing.T) { ctx, cancel 
:= context.WithCancel(context.Background()) taskCtx, err := structpb.NewStruct(map[string]any{}) require.NoError(t, err) + cfg, _ := config.LoadDefault("") reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{ Context: taskCtx, - }) + }, cfg) reporter.ResetSteps(1) - // Start the daemon loop in a separate goroutine. - // RunDaemon reads r.closed and reschedules itself via time.AfterFunc. - var wg sync.WaitGroup - wg.Go(func() { - reporter.RunDaemon() - }) + // Start the daemon loop — RunDaemon spawns a goroutine internally. + reporter.RunDaemon() - // Close concurrently — this races with RunDaemon on r.closed. + // Close concurrently — this races with the daemon goroutine on r.closed. require.NoError(t, reporter.Close("")) - // Cancel context so pending AfterFunc callbacks exit quickly. + // Cancel context so the daemon goroutine exits cleanly. cancel() - wg.Wait() - time.Sleep(2 * time.Second) +} + +// TestReporter_MaxLatencyTimer verifies that the maxLatencyTimer flushes a +// single buffered log row before the periodic logTicker fires. +// +// Setup: logReportInterval=10s (effectively never), maxLatency=100ms. +// Fire one log line, then assert UpdateLog is called within 500ms. 
+func TestReporter_MaxLatencyTimer(t *testing.T) { + var updateLogCalls atomic.Int64 + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + updateLogCalls.Add(1) + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + + // Custom config: logTicker=10s (won't fire during test), maxLatency=100ms + cfg, _ := config.LoadDefault("") + cfg.Runner.LogReportInterval = 10 * time.Second + cfg.Runner.LogReportMaxLatency = 100 * time.Millisecond + cfg.Runner.LogReportBatchSize = 1000 // won't trigger batch flush + + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + reporter.RunDaemon() + defer func() { + _ = reporter.Close("") + }() + + // Fire a single log line — not enough to trigger batch flush + require.NoError(t, reporter.Fire(&log.Entry{ + Message: "single log line", + Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true}, + })) + + // maxLatencyTimer should flush within ~100ms. Wait up to 500ms. 
+ assert.Eventually(t, func() bool { + return updateLogCalls.Load() > 0 + }, 500*time.Millisecond, 10*time.Millisecond, + "maxLatencyTimer should have flushed the log before logTicker (10s)") +} + +// TestReporter_BatchSizeFlush verifies that reaching logBatchSize triggers +// an immediate log flush without waiting for any timer. +func TestReporter_BatchSizeFlush(t *testing.T) { + var updateLogCalls atomic.Int64 + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + updateLogCalls.Add(1) + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + + // Custom config: large timers, small batch size + cfg, _ := config.LoadDefault("") + cfg.Runner.LogReportInterval = 10 * time.Second + cfg.Runner.LogReportMaxLatency = 10 * time.Second + cfg.Runner.LogReportBatchSize = 5 + + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + reporter.RunDaemon() + defer func() { + _ = reporter.Close("") + }() + + // Fire exactly batchSize log lines + for i := range 5 { + require.NoError(t, reporter.Fire(&log.Entry{ + Message: fmt.Sprintf("log line %d", i), + Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true}, + })) + } + + // Batch threshold should trigger immediate flush + assert.Eventually(t, func() bool { + return 
updateLogCalls.Load() > 0 + }, 500*time.Millisecond, 10*time.Millisecond, + "batch size threshold should have triggered immediate flush") +} + +// TestReporter_StateChangedNotLostDuringReport asserts that a Fire() arriving +// mid-UpdateTask re-dirties the flag so the change is picked up by the next report. +func TestReporter_StateChangedNotLostDuringReport(t *testing.T) { + var updateTaskCalls atomic.Int64 + inFlight := make(chan struct{}) + release := make(chan struct{}) + + client := mocks.NewClient(t) + client.On("UpdateTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + n := updateTaskCalls.Add(1) + if n == 1 { + // Signal that the first UpdateTask is in flight, then block until released. + close(inFlight) + <-release + } + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + cfg, _ := config.LoadDefault("") + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(2) + + // Mark stateChanged=true so the first ReportState proceeds to UpdateTask. + reporter.stateMu.Lock() + reporter.stateChanged = true + reporter.stateMu.Unlock() + + // Kick off the first ReportState in a goroutine — it will block in UpdateTask. + done := make(chan error, 1) + go func() { + done <- reporter.ReportState(false) + }() + + // Wait until UpdateTask is in flight (snapshot taken, flag consumed). + <-inFlight + + // Concurrent Fire() modifies state — must re-flip stateChanged so the + // change is not lost when the in-flight ReportState finishes. 
+ require.NoError(t, reporter.Fire(&log.Entry{ + Message: "step starts", + Data: log.Fields{"stage": "Main", "stepNumber": 1, "raw_output": true}, + })) + + // Release the in-flight UpdateTask and wait for it to return. + close(release) + require.NoError(t, <-done) + + // stateChanged must still be true so the next ReportState picks up the + // concurrent Fire()'s change instead of skipping via the early-return path. + reporter.stateMu.RLock() + changed := reporter.stateChanged + reporter.stateMu.RUnlock() + assert.True(t, changed, "stateChanged must remain true after a concurrent Fire() during in-flight ReportState") + + // And the next ReportState must actually send a second UpdateTask. + require.NoError(t, reporter.ReportState(false)) + assert.Equal(t, int64(2), updateTaskCalls.Load(), "concurrent Fire() change must trigger a second UpdateTask, not be silently lost") +} + +// TestReporter_StateChangedRestoredOnError verifies that when UpdateTask fails, +// the dirty flag is restored so the snapshotted change isn't silently lost. 
+func TestReporter_StateChangedRestoredOnError(t *testing.T) { + var updateTaskCalls atomic.Int64 + + client := mocks.NewClient(t) + client.On("UpdateTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + n := updateTaskCalls.Add(1) + if n == 1 { + return nil, errors.New("transient network error") + } + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + cfg, _ := config.LoadDefault("") + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + + reporter.stateMu.Lock() + reporter.stateChanged = true + reporter.stateMu.Unlock() + + // First ReportState fails — flag must be restored to true. + require.Error(t, reporter.ReportState(false)) + + reporter.stateMu.RLock() + changed := reporter.stateChanged + reporter.stateMu.RUnlock() + assert.True(t, changed, "stateChanged must be restored to true after UpdateTask error so the change is retried") + + // The next ReportState should still issue a request because the flag was restored. + require.NoError(t, reporter.ReportState(false)) + assert.Equal(t, int64(2), updateTaskCalls.Load()) +} + +// TestReporter_StateNotifyFlush verifies that step transitions trigger +// an immediate state flush via the stateNotify channel. 
+func TestReporter_StateNotifyFlush(t *testing.T) { + var updateTaskCalls atomic.Int64 + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Maybe().Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + updateTaskCalls.Add(1) + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + + // Custom config: large state interval so only stateNotify can trigger + cfg, _ := config.LoadDefault("") + cfg.Runner.StateReportInterval = 10 * time.Second + cfg.Runner.LogReportInterval = 10 * time.Second + + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + reporter.RunDaemon() + defer func() { + _ = reporter.Close("") + }() + + // Fire a log entry that starts a step — this triggers stateNotify + require.NoError(t, reporter.Fire(&log.Entry{ + Message: "step starting", + Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true}, + })) + + // stateNotify should trigger immediate UpdateTask call + assert.Eventually(t, func() bool { + return updateTaskCalls.Load() > 0 + }, 500*time.Millisecond, 10*time.Millisecond, + "step transition should have triggered immediate state flush via stateNotify") }