## What
Add an optional Prometheus `/metrics` HTTP endpoint to `act_runner` so operators can observe runner health, polling behavior, job outcomes, and RPC latency without scraping logs.
New surface:
- `internal/pkg/metrics/metrics.go` — metric definitions, custom `Registry`, static Go/process collectors, label constants, `ResultToStatusLabel` helper.
- `internal/pkg/metrics/server.go` — hardened `http.Server` serving `/metrics` and `/healthz` with Slowloris-safe timeouts (`ReadHeaderTimeout` 5s, `ReadTimeout`/`WriteTimeout` 10s, `IdleTimeout` 60s) and a 5s graceful shutdown (see the sketch after this list).
- `daemon.go` wires it up behind `cfg.Metrics.Enabled` (disabled by default).
- `poller.go` / `reporter.go` / `runner.go` instrument their existing hot paths with counters/histograms/gauges — no behavior change.
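A minimal sketch of what the hardened server could look like, assuming a `Serve` helper with the timeouts listed above; the function name, signature, and handler wiring are illustrative rather than the exact contents of `server.go`:

```go
package metrics

import (
	"context"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Serve exposes /metrics and /healthz on addr until ctx is cancelled,
// then shuts the listener down gracefully within 5 seconds.
func Serve(ctx context.Context, addr string, reg *prometheus.Registry) error {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		_, _ = w.Write([]byte("ok"))
	})

	srv := &http.Server{
		Addr:              addr,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second, // Slowloris protection
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      10 * time.Second,
		IdleTimeout:       60 * time.Second,
	}

	errCh := make(chan error, 1)
	go func() { errCh <- srv.ListenAndServe() }()

	select {
	case err := <-errCh:
		return err // listener failed (e.g. port in use)
	case <-ctx.Done():
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		return srv.Shutdown(shutdownCtx)
	}
}
```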
Metrics exported (namespace `act_runner_`):
| Subsystem | Metric | Type | Labels |
|---|---|---|---|
| — | `info` | Gauge | `version`, `name` |
| — | `capacity`, `uptime_seconds` | Gauge | — |
| `poll` | `fetch_total`, `client_errors_total` | Counter | `result` / `method` |
| `poll` | `fetch_duration_seconds`, `backoff_seconds` | Histogram / Gauge | — |
| `job` | `total` | Counter | `status` |
| `job` | `duration_seconds`, `running`, `capacity_utilization_ratio` | Histogram / GaugeFunc | — |
| `report` | `log_total`, `state_total` | Counter | `result` |
| `report` | `log_duration_seconds`, `state_duration_seconds` | Histogram | — |
| `report` | `log_buffer_rows` | Gauge | — |
| — | `go_*`, `process_*` | standard collectors | — |
All label values are predefined constants — **no high-cardinality labels** (no task IDs, repo URLs, branches, tokens, or secrets) so scraping is safe and bounded.
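For reference, a trimmed sketch of how the definitions in `metrics.go` might be laid out with `client_golang`; only the two `job` metrics are shown, and the variable names, bucket choices, and the exact `ResultToStatusLabel` mapping are assumptions based on the table above:

```go
package metrics

import (
	"sync"

	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
)

const namespace = "act_runner"

// Predefined label values keep cardinality bounded.
const (
	LabelStatusSuccess   = "success"
	LabelStatusFailure   = "failure"
	LabelStatusCancelled = "cancelled"
	LabelStatusSkipped   = "skipped"
	LabelStatusUnknown   = "unknown"
)

// Registry is a dedicated registry so nothing leaks into the global default one.
var Registry = prometheus.NewRegistry()

var (
	JobTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: namespace, Subsystem: "job", Name: "total",
		Help: "Jobs executed, partitioned by final status.",
	}, []string{"status"})

	JobDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: namespace, Subsystem: "job", Name: "duration_seconds",
		Help:    "Wall-clock job duration in seconds.",
		Buckets: prometheus.ExponentialBuckets(1, 2, 12), // example buckets: 1s .. ~68min
	})
)

// ResultToStatusLabel maps a protobuf result to one of the predefined label values.
func ResultToStatusLabel(r runnerv1.Result) string {
	switch r {
	case runnerv1.Result_RESULT_SUCCESS:
		return LabelStatusSuccess
	case runnerv1.Result_RESULT_FAILURE:
		return LabelStatusFailure
	case runnerv1.Result_RESULT_CANCELLED:
		return LabelStatusCancelled
	case runnerv1.Result_RESULT_SKIPPED:
		return LabelStatusSkipped
	default:
		return LabelStatusUnknown
	}
}

var initOnce sync.Once

// Init registers every collector exactly once; callers may invoke it repeatedly.
func Init() {
	initOnce.Do(func() {
		Registry.MustRegister(
			JobTotal,
			JobDuration,
			collectors.NewGoCollector(),
			collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
		)
	})
}
```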
## Why
Teams self-hosting Gitea + `act_runner` at scale need to answer basic SRE questions whose answers are currently invisible:
- How often are RPCs failing? Which RPC? (`act_runner_client_errors_total`)
- Are runners saturated? (`act_runner_job_capacity_utilization_ratio`, `act_runner_job_running`)
- How long do jobs take? (`act_runner_job_duration_seconds`)
- Is polling backing off? (`act_runner_poll_backoff_seconds`, `act_runner_poll_fetch_total{result="error"}`)
- Are log/state reports slow? (`act_runner_report_{log,state}_duration_seconds`)
- Is the log buffer draining? (`act_runner_report_log_buffer_rows`)
Today operators have to grep logs. This PR makes all of the above first-class metrics so they can feed dashboards and alerts (`rate(act_runner_client_errors_total[5m]) > 0.1`, capacity saturation alerts, etc.).
The endpoint is **disabled by default** and binds to `127.0.0.1:9101` when enabled, so it's opt-in and safe for existing deployments.
## How
### Config
```yaml
metrics:
  enabled: false       # opt-in
  addr: 127.0.0.1:9101 # change to 0.0.0.0:9101 only behind a reverse proxy
```
`config.example.yaml` documents both fields plus a security note about binding externally without auth.
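The corresponding config struct stays small; something along these lines, where the field names follow `config.example.yaml` but the struct tags and comments are assumptions:

```go
package config

// Metrics holds the /metrics endpoint settings (sketch; the actual struct in
// internal/pkg/config may differ).
type Metrics struct {
	Enabled bool   `yaml:"enabled"` // default: false (endpoint off)
	Addr    string `yaml:"addr"`    // default: "127.0.0.1:9101"
}
```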
### Wiring
1. `daemon.go` calls `metrics.Init()` (guarded by `sync.Once`), sets `act_runner_info`, `act_runner_capacity`, registers uptime + running-jobs GaugeFuncs, then starts the server goroutine with the daemon context — it shuts down cleanly on `ctx.Done()` (see the sketch after this list).
2. `poller.fetchTask` observes RPC latency / result / error counters. `DeadlineExceeded` (long-poll idle) is treated as an empty result and **not** observed into the histogram so the 5s timeout doesn't swamp the buckets.
3. `poller.pollOnce` reports `poll_backoff_seconds` using the pre-jitter base interval (the true backoff level), and only when it changes — prevents noisy no-op gauge updates at the `FetchIntervalMax` plateau.
4. `reporter.ReportLog` / `ReportState` record duration histograms and success/error counters; `log_buffer_rows` is updated only when the value changes, guarded by the already-held `clientM`.
5. `runner.Run` observes `job_duration_seconds` and increments `job_total` by outcome via `metrics.ResultToStatusLabel`.
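Roughly how the daemon-side wiring in step 1 could look; the helper names (`Init`, `Info`, `Capacity`, `Serve`, `Registry`) come from the description above, but the `startMetrics` wrapper, its signature, and the package layout here are assumptions:

```go
// Illustrative wiring only; the real daemon.go lives in a different package
// and may structure this differently.
package main

import (
	"context"

	log "github.com/sirupsen/logrus"

	"gitea.com/gitea/act_runner/internal/pkg/config"
	"gitea.com/gitea/act_runner/internal/pkg/metrics"
)

func startMetrics(ctx context.Context, cfg *config.Config, runnerName, version string) {
	if !cfg.Metrics.Enabled {
		return // opt-in: nothing is registered or served when disabled
	}
	metrics.Init() // safe to call more than once; guarded by sync.Once
	metrics.Info.WithLabelValues(version, runnerName).Set(1)
	metrics.Capacity.Set(float64(cfg.Runner.Capacity))

	go func() {
		// Serve blocks until ctx is cancelled, then shuts down within 5s.
		if err := metrics.Serve(ctx, cfg.Metrics.Addr, metrics.Registry); err != nil {
			log.WithError(err).Error("metrics server stopped")
		}
	}()
}
```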
### Safety / security review
- All timeouts set; Slowloris-safe.
- Custom `prometheus.NewRegistry()` — no global registration side-effects.
- No sensitive data in labels (reviewed every instrumentation site).
- Single new dependency: `github.com/prometheus/client_golang v1.23.2`.
- Endpoint is unauthenticated by design and documented as such; default localhost bind mitigates exposure. Operators exposing externally should front it with a reverse proxy.
## Verification
### Unit tests
```bash
go build ./...
go vet ./...
go test ./...
```
### Manual smoke test
1. Enable metrics in `config.yaml`:
```yaml
metrics:
  enabled: true
  addr: 127.0.0.1:9101
```
2. Start the runner against a Gitea instance: `./act_runner daemon`.
3. Scrape the endpoint:
```bash
curl -s http://127.0.0.1:9101/metrics | grep '^act_runner_'
curl -s http://127.0.0.1:9101/healthz   # → ok
```
4. Confirm the static series appear immediately: `act_runner_info`, `act_runner_capacity`, `act_runner_uptime_seconds`, `act_runner_job_running`, `act_runner_job_capacity_utilization_ratio`.
5. Trigger a workflow and confirm counters increment: `act_runner_poll_fetch_total{result="task"}`, `act_runner_job_total{status="success"}`, `act_runner_report_log_total{result="success"}`.
6. Leave the runner idle and confirm `act_runner_poll_backoff_seconds` settles (and does **not** churn on every poll).
7. Ctrl-C and confirm a clean "metrics server shutdown" log line (no port-in-use error on restart within 5s).
### Prometheus integration
Add to `prometheus.yml`:
```yaml
scrape_configs:
  - job_name: act_runner
    static_configs:
      - targets: ['127.0.0.1:9101']
```
Sample alert to try:
```
sum(rate(act_runner_client_errors_total[5m])) by (method) > 0.1
```
## Out of scope (follow-ups)
- TLS and auth on the metrics endpoint (mitigated today by localhost default; add when operators need external scraping).
- Per-task labels (intentionally avoided for cardinality safety).
---
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Reviewed-on: https://gitea.com/gitea/act_runner/pulls/820
Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com>
Co-authored-by: Bo-Yi Wu <appleboy.tw@gmail.com>
Co-committed-by: Bo-Yi Wu <appleboy.tw@gmail.com>
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package report

import (
	"context"
	"errors"
	"fmt"
	"regexp"
	"strings"
	"sync"
	"time"

	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
	"connectrpc.com/connect"
	"github.com/avast/retry-go/v4"
	log "github.com/sirupsen/logrus"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/timestamppb"

	"gitea.com/gitea/act_runner/internal/pkg/client"
	"gitea.com/gitea/act_runner/internal/pkg/config"
	"gitea.com/gitea/act_runner/internal/pkg/metrics"
)

type Reporter struct {
	ctx    context.Context
	cancel context.CancelFunc

	closed  bool
	client  client.Client
	clientM sync.Mutex

	logOffset   int
	logRows     []*runnerv1.LogRow
	logReplacer *strings.Replacer
	oldnew      []string

	// lastLogBufferRows is the last value written to the ReportLogBufferRows
	// gauge; guarded by clientM (the same lock held around each ReportLog call)
	// so the gauge skips no-op Set calls when the buffer size is unchanged.
	lastLogBufferRows int

	state        *runnerv1.TaskState
	stateChanged bool
	stateMu      sync.RWMutex
	outputs      sync.Map
	daemon       chan struct{}

	// Adaptive batching control
	logReportInterval   time.Duration
	logReportMaxLatency time.Duration
	logBatchSize        int
	stateReportInterval time.Duration

	// Event notification channels (non-blocking, buffered 1)
	logNotify   chan struct{} // signal: new log rows arrived
	stateNotify chan struct{} // signal: step transition (start/stop)

	debugOutputEnabled  bool
	stopCommandEndToken string
}

func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task, cfg *config.Config) *Reporter {
	var oldnew []string
	if v := task.Context.Fields["token"].GetStringValue(); v != "" {
		oldnew = append(oldnew, v, "***")
	}
	if v := task.Context.Fields["gitea_runtime_token"].GetStringValue(); v != "" {
		oldnew = append(oldnew, v, "***")
	}
	for _, v := range task.Secrets {
		oldnew = append(oldnew, v, "***")
	}

	rv := &Reporter{
		ctx:                 ctx,
		cancel:              cancel,
		client:              client,
		oldnew:              oldnew,
		logReplacer:         strings.NewReplacer(oldnew...),
		logReportInterval:   cfg.Runner.LogReportInterval,
		logReportMaxLatency: cfg.Runner.LogReportMaxLatency,
		logBatchSize:        cfg.Runner.LogReportBatchSize,
		stateReportInterval: cfg.Runner.StateReportInterval,
		logNotify:           make(chan struct{}, 1),
		stateNotify:         make(chan struct{}, 1),
		state: &runnerv1.TaskState{
			Id: task.Id,
		},
		daemon: make(chan struct{}),
	}

	if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {
		rv.debugOutputEnabled = true
	}

	return rv
}

// Result returns the final job result. Safe to call after Close() returns.
func (r *Reporter) Result() runnerv1.Result {
	r.stateMu.RLock()
	defer r.stateMu.RUnlock()
	return r.state.Result
}

func (r *Reporter) ResetSteps(l int) {
	r.stateMu.Lock()
	defer r.stateMu.Unlock()
	for i := range l {
		r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
			Id: int64(i),
		})
	}
}

func (r *Reporter) Levels() []log.Level {
	return log.AllLevels
}

func appendIfNotNil[T any](s []*T, v *T) []*T {
	if v != nil {
		return append(s, v)
	}
	return s
}

// isJobStepEntry is used to not report composite step results incorrectly as step result
// returns true if the logentry is on job level
// returns false for composite action step messages
func isJobStepEntry(entry *log.Entry) bool {
	if v, ok := entry.Data["stepID"]; ok {
		if v, ok := v.([]string); ok && len(v) > 1 {
			return false
		}
	}
	return true
}

// notifyLog sends a non-blocking signal that new log rows are available.
func (r *Reporter) notifyLog() {
	select {
	case r.logNotify <- struct{}{}:
	default:
	}
}

// notifyState sends a non-blocking signal that a UX-critical state change occurred (step start/stop, job result).
func (r *Reporter) notifyState() {
	select {
	case r.stateNotify <- struct{}{}:
	default:
	}
}

// unlockAndNotify releases stateMu and sends channel notifications.
// Must be called with stateMu held.
func (r *Reporter) unlockAndNotify(urgentState bool) {
	r.stateMu.Unlock()
	r.notifyLog()
	if urgentState {
		r.notifyState()
	}
}

func (r *Reporter) Fire(entry *log.Entry) error {
	urgentState := false

	r.stateMu.Lock()

	r.stateChanged = true

	if log.IsLevelEnabled(log.TraceLevel) {
		log.WithFields(entry.Data).Trace(entry.Message)
	}

	timestamp := entry.Time
	if r.state.StartedAt == nil {
		r.state.StartedAt = timestamppb.New(timestamp)
	}

	stage := entry.Data["stage"]

	if stage != "Main" {
		if v, ok := entry.Data["jobResult"]; ok {
			if jobResult, ok := r.parseResult(v); ok {
				// We need to ensure log upload before this upload
				r.state.Result = jobResult
				r.state.StoppedAt = timestamppb.New(timestamp)
				for _, s := range r.state.Steps {
					if s.Result == runnerv1.Result_RESULT_UNSPECIFIED {
						s.Result = runnerv1.Result_RESULT_CANCELLED
						if jobResult == runnerv1.Result_RESULT_SKIPPED {
							s.Result = runnerv1.Result_RESULT_SKIPPED
						}
					}
				}
				urgentState = true
			}
		}
		if !r.duringSteps() {
			r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
		}
		r.unlockAndNotify(urgentState)
		return nil
	}

	var step *runnerv1.StepState
	if v, ok := entry.Data["stepNumber"]; ok {
		if v, ok := v.(int); ok && len(r.state.Steps) > v {
			step = r.state.Steps[v]
		}
	}
	if step == nil {
		if !r.duringSteps() {
			r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
		}
		r.unlockAndNotify(false)
		return nil
	}

	if step.StartedAt == nil {
		step.StartedAt = timestamppb.New(timestamp)
		urgentState = true
	}

	// Force reporting log errors as raw output to prevent silent failures
	if entry.Level == log.ErrorLevel {
		entry.Data["raw_output"] = true
	}

	if v, ok := entry.Data["raw_output"]; ok {
		if rawOutput, ok := v.(bool); ok && rawOutput {
			if row := r.parseLogRow(entry); row != nil {
				if step.LogLength == 0 {
					step.LogIndex = int64(r.logOffset + len(r.logRows))
				}
				step.LogLength++
				r.logRows = append(r.logRows, row)
			}
		}
	} else if !r.duringSteps() {
		r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
	}
	if v, ok := entry.Data["stepResult"]; ok && isJobStepEntry(entry) {
		if stepResult, ok := r.parseResult(v); ok {
			if step.LogLength == 0 {
				step.LogIndex = int64(r.logOffset + len(r.logRows))
			}
			step.Result = stepResult
			step.StoppedAt = timestamppb.New(timestamp)
			urgentState = true
		}
	}

	r.unlockAndNotify(urgentState)
	return nil
}

func (r *Reporter) RunDaemon() {
	go r.runDaemonLoop()
}

func (r *Reporter) stopLatencyTimer(active *bool, timer *time.Timer) {
	if *active {
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		*active = false
	}
}

func (r *Reporter) runDaemonLoop() {
	logTicker := time.NewTicker(r.logReportInterval)
	stateTicker := time.NewTicker(r.stateReportInterval)

	// maxLatencyTimer ensures the first buffered log row is sent within logReportMaxLatency.
	// Start inactive — it is armed when the first log row arrives in an empty buffer.
	maxLatencyTimer := time.NewTimer(0)
	if !maxLatencyTimer.Stop() {
		<-maxLatencyTimer.C
	}
	maxLatencyActive := false

	defer logTicker.Stop()
	defer stateTicker.Stop()
	defer maxLatencyTimer.Stop()

	for {
		select {
		case <-logTicker.C:
			_ = r.ReportLog(false)
			r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)

		case <-stateTicker.C:
			_ = r.ReportState(false)

		case <-r.logNotify:
			r.stateMu.RLock()
			n := len(r.logRows)
			r.stateMu.RUnlock()

			if n >= r.logBatchSize {
				_ = r.ReportLog(false)
				r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)
			} else if !maxLatencyActive && n > 0 {
				maxLatencyTimer.Reset(r.logReportMaxLatency)
				maxLatencyActive = true
			}

		case <-r.stateNotify:
			// Step transition or job result — flush both immediately for frontend UX.
			_ = r.ReportLog(false)
			_ = r.ReportState(false)
			r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)

		case <-maxLatencyTimer.C:
			maxLatencyActive = false
			_ = r.ReportLog(false)

		case <-r.ctx.Done():
			close(r.daemon)
			return
		}

		r.stateMu.RLock()
		closed := r.closed
		r.stateMu.RUnlock()
		if closed {
			close(r.daemon)
			return
		}
	}
}

func (r *Reporter) Logf(format string, a ...any) {
	r.stateMu.Lock()
	defer r.stateMu.Unlock()

	r.logf(format, a...)
}

func (r *Reporter) logf(format string, a ...any) {
	if !r.duringSteps() {
		r.logRows = append(r.logRows, &runnerv1.LogRow{
			Time:    timestamppb.Now(),
			Content: fmt.Sprintf(format, a...),
		})
	}
}

func (r *Reporter) SetOutputs(outputs map[string]string) {
	r.stateMu.Lock()
	defer r.stateMu.Unlock()

	for k, v := range outputs {
		if len(k) > 255 {
			r.logf("ignore output because the key is too long: %q", k)
			continue
		}
		if l := len(v); l > 1024*1024 {
			log.Println("ignore output because the value is too long:", k, l)
			r.logf("ignore output because the value %q is too long: %d", k, l)
		}
		if _, ok := r.outputs.Load(k); ok {
			continue
		}
		r.outputs.Store(k, v)
	}
}

func (r *Reporter) Close(lastWords string) error {
	r.stateMu.Lock()
	r.closed = true
	if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
		if lastWords == "" {
			lastWords = "Early termination"
		}
		for _, v := range r.state.Steps {
			if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
				v.Result = runnerv1.Result_RESULT_CANCELLED
			}
		}
		r.state.Result = runnerv1.Result_RESULT_FAILURE
		r.logRows = append(r.logRows, &runnerv1.LogRow{
			Time:    timestamppb.Now(),
			Content: lastWords,
		})
		r.state.StoppedAt = timestamppb.Now()
	} else if lastWords != "" {
		r.logRows = append(r.logRows, &runnerv1.LogRow{
			Time:    timestamppb.Now(),
			Content: lastWords,
		})
	}
	r.stateMu.Unlock()

	// Wake up the daemon loop so it detects closed promptly.
	r.notifyLog()

	// Wait for Acknowledge
	select {
	case <-r.daemon:
	case <-time.After(60 * time.Second):
		close(r.daemon)
		log.Error("No Response from RunDaemon for 60s, continue best effort")
	}

	// Report the job outcome even when all log upload retry attempts have been exhausted
	return errors.Join(
		retry.Do(func() error {
			return r.ReportLog(true)
		}, retry.Context(r.ctx)),
		retry.Do(func() error {
			return r.ReportState(true)
		}, retry.Context(r.ctx)),
	)
}

func (r *Reporter) ReportLog(noMore bool) error {
	r.clientM.Lock()
	defer r.clientM.Unlock()

	r.stateMu.RLock()
	rows := r.logRows
	r.stateMu.RUnlock()

	if !noMore && len(rows) == 0 {
		return nil
	}

	start := time.Now()
	resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
		TaskId: r.state.Id,
		Index:  int64(r.logOffset),
		Rows:   rows,
		NoMore: noMore,
	}))
	metrics.ReportLogDuration.Observe(time.Since(start).Seconds())
	if err != nil {
		metrics.ReportLogTotal.WithLabelValues(metrics.LabelResultError).Inc()
		metrics.ClientErrors.WithLabelValues(metrics.LabelMethodUpdateLog).Inc()
		return err
	}
	metrics.ReportLogTotal.WithLabelValues(metrics.LabelResultSuccess).Inc()

	ack := int(resp.Msg.AckIndex)
	if ack < r.logOffset {
		return errors.New("submitted logs are lost")
	}

	r.stateMu.Lock()
	r.logRows = r.logRows[ack-r.logOffset:]
	submitted := r.logOffset + len(rows)
	r.logOffset = ack
	remaining := len(r.logRows)
	r.stateMu.Unlock()
	if remaining != r.lastLogBufferRows {
		metrics.ReportLogBufferRows.Set(float64(remaining))
		r.lastLogBufferRows = remaining
	}

	if noMore && ack < submitted {
		return errors.New("not all logs are submitted")
	}

	return nil
}

// ReportState only reports the job result if reportResult is true
// RunDaemon never reports results even if result is set
func (r *Reporter) ReportState(reportResult bool) error {
	r.clientM.Lock()
	defer r.clientM.Unlock()

	// Build the outputs map first (single Range pass instead of two).
	outputs := make(map[string]string)
	r.outputs.Range(func(k, v any) bool {
		if val, ok := v.(string); ok {
			outputs[k.(string)] = val
		}
		return true
	})

	// Consume stateChanged atomically with the snapshot; restored on error
	// below so a concurrent Fire() during UpdateTask isn't silently lost.
	r.stateMu.Lock()
	if !reportResult && !r.stateChanged && len(outputs) == 0 {
		r.stateMu.Unlock()
		return nil
	}
	state := proto.Clone(r.state).(*runnerv1.TaskState)
	r.stateChanged = false
	r.stateMu.Unlock()

	if !reportResult {
		state.Result = runnerv1.Result_RESULT_UNSPECIFIED
	}

	start := time.Now()
	resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
		State:   state,
		Outputs: outputs,
	}))
	metrics.ReportStateDuration.Observe(time.Since(start).Seconds())
	if err != nil {
		metrics.ReportStateTotal.WithLabelValues(metrics.LabelResultError).Inc()
		metrics.ClientErrors.WithLabelValues(metrics.LabelMethodUpdateTask).Inc()
		r.stateMu.Lock()
		r.stateChanged = true
		r.stateMu.Unlock()
		return err
	}
	metrics.ReportStateTotal.WithLabelValues(metrics.LabelResultSuccess).Inc()

	for _, k := range resp.Msg.SentOutputs {
		r.outputs.Store(k, struct{}{})
	}

	if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {
		r.cancel()
	}

	var noSent []string
	r.outputs.Range(func(k, v any) bool {
		if _, ok := v.(string); ok {
			noSent = append(noSent, k.(string))
		}
		return true
	})
	if len(noSent) > 0 {
		return fmt.Errorf("there are still outputs that have not been sent: %v", noSent)
	}

	return nil
}

func (r *Reporter) duringSteps() bool {
	if steps := r.state.Steps; len(steps) == 0 {
		return false
	} else if first := steps[0]; first.Result == runnerv1.Result_RESULT_UNSPECIFIED && first.LogLength == 0 {
		return false
	} else if last := steps[len(steps)-1]; last.Result != runnerv1.Result_RESULT_UNSPECIFIED {
		return false
	}
	return true
}

var stringToResult = map[string]runnerv1.Result{
	"success":   runnerv1.Result_RESULT_SUCCESS,
	"failure":   runnerv1.Result_RESULT_FAILURE,
	"skipped":   runnerv1.Result_RESULT_SKIPPED,
	"cancelled": runnerv1.Result_RESULT_CANCELLED,
}

func (r *Reporter) parseResult(result any) (runnerv1.Result, bool) {
	str := ""
	if v, ok := result.(string); ok { // for jobResult
		str = v
	} else if v, ok := result.(fmt.Stringer); ok { // for stepResult
		str = v.String()
	}

	ret, ok := stringToResult[str]
	return ret, ok
}

var cmdRegex = regexp.MustCompile(`^::([^ :]+)( .*)?::(.*)$`)

func (r *Reporter) handleCommand(originalContent, command, value string) *string {
	if r.stopCommandEndToken != "" && command != r.stopCommandEndToken {
		return &originalContent
	}

	switch command {
	case "add-mask":
		r.addMask(value)
		return nil
	case "debug":
		if r.debugOutputEnabled {
			return &value
		}
		return nil

	case "notice":
		// Not implemented yet, so just return the original content.
		return &originalContent
	case "warning":
		// Not implemented yet, so just return the original content.
		return &originalContent
	case "error":
		// Not implemented yet, so just return the original content.
		return &originalContent
	case "group":
		// Returning the original content, because I think the frontend
		// will use it when rendering the output.
		return &originalContent
	case "endgroup":
		// Ditto
		return &originalContent
	case "stop-commands":
		r.stopCommandEndToken = value
		return nil
	case r.stopCommandEndToken:
		r.stopCommandEndToken = ""
		return nil
	}
	return &originalContent
}

func (r *Reporter) parseLogRow(entry *log.Entry) *runnerv1.LogRow {
	content := strings.TrimRightFunc(entry.Message, func(r rune) bool { return r == '\r' || r == '\n' })

	matches := cmdRegex.FindStringSubmatch(content)
	if matches != nil {
		if output := r.handleCommand(content, matches[1], matches[3]); output != nil {
			content = *output
		} else {
			return nil
		}
	}

	content = r.logReplacer.Replace(content)

	return &runnerv1.LogRow{
		Time:    timestamppb.New(entry.Time),
		Content: strings.ToValidUTF8(content, "?"),
	}
}

func (r *Reporter) addMask(msg string) {
	r.oldnew = append(r.oldnew, msg, "***")
	r.logReplacer = strings.NewReplacer(r.oldnew...)
}