diff --git a/go.mod b/go.mod index 6256b528..c22d6930 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,8 @@ require ( gotest.tools/v3 v3.5.2 ) +require github.com/prometheus/client_golang v1.23.2 + require ( cyphar.com/go-pathrs v0.2.3 // indirect dario.cat/mergo v1.0.2 // indirect @@ -29,6 +31,7 @@ require ( github.com/Masterminds/semver v1.5.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/ProtonMail/go-crypto v1.3.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/bmatcuk/doublestar/v4 v4.10.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/clipperhouse/uax29/v2 v2.7.0 // indirect @@ -75,12 +78,16 @@ require ( github.com/moby/sys/user v0.4.0 // indirect github.com/moby/sys/userns v0.1.0 // indirect github.com/moby/term v0.5.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect github.com/opencontainers/selinux v1.13.1 // indirect github.com/pjbgf/sha1cd v0.5.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/rhysd/actionlint v1.7.11 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sergi/go-diff v1.4.0 // indirect @@ -98,6 +105,7 @@ require ( go.opentelemetry.io/otel v1.40.0 // indirect go.opentelemetry.io/otel/metric v1.40.0 // indirect go.opentelemetry.io/otel/trace v1.40.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/crypto v0.48.0 // indirect golang.org/x/net v0.50.0 // indirect golang.org/x/sync v0.19.0 // indirect diff --git a/go.sum b/go.sum index 76984d61..c047481b 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/avast/retry-go/v4 v4.7.0 h1:yjDs35SlGvKwRNSykujfjdMxMhMQQM0TnIjJaHB+Zio= github.com/avast/retry-go/v4 v4.7.0/go.mod h1:ZMPDa3sY2bKgpLtap9JRUgk2yTAba7cgiFhqxY2Sg6Q= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bmatcuk/doublestar/v4 v4.10.0 h1:zU9WiOla1YA122oLM6i4EXvGW62DvKZVxIe6TYWexEs= github.com/bmatcuk/doublestar/v4 v4.10.0/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -129,6 +131,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= @@ -153,6 +157,8 @@ github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ= github.com/moby/term v0.5.2/go.mod h1:d3djjFCrjnB+fl8NJux+EJzu0msscUP+f8it8hPkFLc= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k= github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -167,6 +173,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/rhysd/actionlint v1.7.11 h1:m+aSuCpCIClS8X02xMG4Z8s87fCHPsAtYkAoWGQZgEE= github.com/rhysd/actionlint v1.7.11/go.mod h1:8n50YougV9+50niD7oxgDTZ1KbN/ZnKiQ2xpLFeVhsI= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= @@ -237,6 +251,10 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= go.yaml.in/yaml/v4 v4.0.0-rc.3 h1:3h1fjsh1CTAPjW7q/EMe+C8shx5d8ctzZTrLcs/j8Go= go.yaml.in/yaml/v4 v4.0.0-rc.3/go.mod h1:aZqd9kCMsGL7AuUv/m/PvWLdg5sjJsZ4oHDEnfPPfY0= diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go index b63d9557..fb7ab41f 100644 --- a/internal/app/cmd/daemon.go +++ b/internal/app/cmd/daemon.go @@ -27,6 +27,7 @@ import ( "gitea.com/gitea/act_runner/internal/pkg/config" "gitea.com/gitea/act_runner/internal/pkg/envcheck" "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/metrics" "gitea.com/gitea/act_runner/internal/pkg/ver" ) @@ -149,6 +150,15 @@ func runDaemon(ctx context.Context, daemArgs *daemonArgs, configFile *string) fu resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels) } + if cfg.Metrics.Enabled { + metrics.Init() + metrics.RunnerInfo.WithLabelValues(ver.Version(), resp.Msg.Runner.Name).Set(1) + metrics.RunnerCapacity.Set(float64(cfg.Runner.Capacity)) + metrics.RegisterUptimeFunc(time.Now()) + metrics.RegisterRunningJobsFunc(runner.RunningCount, cfg.Runner.Capacity) + metrics.StartServer(ctx, cfg.Metrics.Addr) + } + poller := poll.New(cfg, cli, runner) if daemArgs.Once || reg.Ephemeral { diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index 0dc8a4c3..eaf6dac1 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -19,6 +19,7 @@ import ( "gitea.com/gitea/act_runner/internal/app/run" "gitea.com/gitea/act_runner/internal/pkg/client" "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/metrics" ) type Poller struct { @@ -43,6 +44,10 @@ type Poller struct { type workerState struct { consecutiveEmpty int64 consecutiveErrors int64 + // lastBackoff is the last interval reported to the PollBackoffSeconds gauge + // from this worker; used to suppress redundant no-op Set calls when the + // backoff plateaus (e.g. at FetchIntervalMax). + lastBackoff time.Duration } func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { @@ -166,8 +171,12 @@ func (p *Poller) pollOnce(s *workerState) { for { task, ok := p.fetchTask(p.pollingCtx, s) if !ok { - interval := addJitter(p.calculateInterval(s)) - timer := time.NewTimer(interval) + base := p.calculateInterval(s) + if base != s.lastBackoff { + metrics.PollBackoffSeconds.Set(base.Seconds()) + s.lastBackoff = base + } + timer := time.NewTimer(addJitter(base)) select { case <-timer.C: case <-p.pollingCtx.Done(): @@ -205,15 +214,27 @@ func (p *Poller) fetchTask(ctx context.Context, s *workerState) (*runnerv1.Task, // Load the version value that was in the cache when the request was sent. v := p.tasksVersion.Load() + start := time.Now() resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{ TasksVersion: v, })) + + // DeadlineExceeded is the designed idle path for a long-poll: the server + // found no work within FetchTimeout. Treat it as an empty response and do + // not record the duration — the timeout value would swamp the histogram. if errors.Is(err, context.DeadlineExceeded) { - err = nil + s.consecutiveEmpty++ + s.consecutiveErrors = 0 // timeout is a healthy idle response + metrics.PollFetchTotal.WithLabelValues(metrics.LabelResultEmpty).Inc() + return nil, false } + metrics.PollFetchDuration.Observe(time.Since(start).Seconds()) + if err != nil { log.WithError(err).Error("failed to fetch task") s.consecutiveErrors++ + metrics.PollFetchTotal.WithLabelValues(metrics.LabelResultError).Inc() + metrics.ClientErrors.WithLabelValues(metrics.LabelMethodFetchTask).Inc() return nil, false } @@ -222,6 +243,7 @@ func (p *Poller) fetchTask(ctx context.Context, s *workerState) (*runnerv1.Task, if resp == nil || resp.Msg == nil { s.consecutiveEmpty++ + metrics.PollFetchTotal.WithLabelValues(metrics.LabelResultEmpty).Inc() return nil, false } @@ -231,11 +253,13 @@ func (p *Poller) fetchTask(ctx context.Context, s *workerState) (*runnerv1.Task, if resp.Msg.Task == nil { s.consecutiveEmpty++ + metrics.PollFetchTotal.WithLabelValues(metrics.LabelResultEmpty).Inc() return nil, false } - // got a task, set `tasksVersion` to zero to focre query db in next request. + // got a task, set `tasksVersion` to zero to force query db in next request. p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0) + metrics.PollFetchTotal.WithLabelValues(metrics.LabelResultTask).Inc() return resp.Msg.Task, true } diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go index db692a55..05158cca 100644 --- a/internal/app/run/runner.go +++ b/internal/app/run/runner.go @@ -12,6 +12,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" @@ -26,6 +27,7 @@ import ( "gitea.com/gitea/act_runner/internal/pkg/client" "gitea.com/gitea/act_runner/internal/pkg/config" "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/metrics" "gitea.com/gitea/act_runner/internal/pkg/report" "gitea.com/gitea/act_runner/internal/pkg/ver" ) @@ -41,6 +43,7 @@ type Runner struct { envs map[string]string runningTasks sync.Map + runningCount atomic.Int64 } func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client) *Runner { @@ -96,16 +99,25 @@ func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error { r.runningTasks.Store(task.Id, struct{}{}) defer r.runningTasks.Delete(task.Id) + r.runningCount.Add(1) + + start := time.Now() + ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout) defer cancel() reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg) var runErr error defer func() { + r.runningCount.Add(-1) + lastWords := "" if runErr != nil { lastWords = runErr.Error() } _ = reporter.Close(lastWords) + + metrics.JobDuration.Observe(time.Since(start).Seconds()) + metrics.JobsTotal.WithLabelValues(metrics.ResultToStatusLabel(reporter.Result())).Inc() }() reporter.RunDaemon() runErr = r.run(ctx, task, reporter) @@ -266,6 +278,10 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report. return execErr } +func (r *Runner) RunningCount() int64 { + return r.runningCount.Load() +} + func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Response[runnerv1.DeclareResponse], error) { return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{ Version: ver.Version(), diff --git a/internal/pkg/config/config.example.yaml b/internal/pkg/config/config.example.yaml index 2d4b9d98..14da4c73 100644 --- a/internal/pkg/config/config.example.yaml +++ b/internal/pkg/config/config.example.yaml @@ -132,3 +132,12 @@ host: # The parent directory of a job's working directory. # If it's empty, $HOME/.cache/act/ will be used. workdir_parent: + +metrics: + # Enable the Prometheus metrics endpoint. + # When enabled, metrics are served at http:///metrics and a liveness check at /healthz. + enabled: false + # The address for the metrics HTTP server to listen on. + # Defaults to localhost only. Set to ":9101" to allow external access, + # but ensure the port is firewall-protected as there is no authentication. + addr: "127.0.0.1:9101" diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 814634c8..eed587c9 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -70,6 +70,12 @@ type Host struct { WorkdirParent string `yaml:"workdir_parent"` // WorkdirParent specifies the parent directory for the host's working directory. } +// Metrics represents the configuration for the Prometheus metrics endpoint. +type Metrics struct { + Enabled bool `yaml:"enabled"` // Enabled indicates whether the metrics endpoint is exposed. + Addr string `yaml:"addr"` // Addr specifies the listen address for the metrics HTTP server (e.g., ":9101"). +} + // Config represents the overall configuration. type Config struct { Log Log `yaml:"log"` // Log represents the configuration for logging. @@ -77,6 +83,7 @@ type Config struct { Cache Cache `yaml:"cache"` // Cache represents the configuration for caching. Container Container `yaml:"container"` // Container represents the configuration for the container. Host Host `yaml:"host"` // Host represents the configuration for the host. + Metrics Metrics `yaml:"metrics"` // Metrics represents the configuration for the Prometheus metrics endpoint. } // LoadDefault returns the default configuration. @@ -157,6 +164,9 @@ func LoadDefault(file string) (*Config, error) { if cfg.Runner.StateReportInterval <= 0 { cfg.Runner.StateReportInterval = 5 * time.Second } + if cfg.Metrics.Addr == "" { + cfg.Metrics.Addr = "127.0.0.1:9101" + } // Validate and fix invalid config combinations to prevent confusing behavior. if cfg.Runner.FetchIntervalMax < cfg.Runner.FetchInterval { diff --git a/internal/pkg/metrics/metrics.go b/internal/pkg/metrics/metrics.go new file mode 100644 index 00000000..968ed9b8 --- /dev/null +++ b/internal/pkg/metrics/metrics.go @@ -0,0 +1,216 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package metrics + +import ( + "sync" + "time" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" +) + +// Namespace is the Prometheus namespace for all act_runner metrics. +const Namespace = "act_runner" + +// Label value constants for Prometheus metrics. +// Using constants prevents typos from silently creating new time-series. +// +// LabelResult* values are used on metrics with label key "result" (RPC outcomes). +// LabelStatus* values are used on metrics with label key "status" (job outcomes). +const ( + LabelResultTask = "task" + LabelResultEmpty = "empty" + LabelResultError = "error" + LabelResultSuccess = "success" + + LabelMethodFetchTask = "FetchTask" + LabelMethodUpdateLog = "UpdateLog" + LabelMethodUpdateTask = "UpdateTask" + + LabelStatusSuccess = "success" + LabelStatusFailure = "failure" + LabelStatusCancelled = "cancelled" + LabelStatusSkipped = "skipped" + LabelStatusUnknown = "unknown" +) + +// rpcDurationBuckets covers the expected latency range for short-running +// UpdateLog / UpdateTask RPCs. FetchTask uses its own buckets (it has a 10s tail). +var rpcDurationBuckets = []float64{0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5} + +// ResultToStatusLabel maps a runnerv1.Result to the "status" label value used on job metrics. +func ResultToStatusLabel(r runnerv1.Result) string { + switch r { + case runnerv1.Result_RESULT_SUCCESS: + return LabelStatusSuccess + case runnerv1.Result_RESULT_FAILURE: + return LabelStatusFailure + case runnerv1.Result_RESULT_CANCELLED: + return LabelStatusCancelled + case runnerv1.Result_RESULT_SKIPPED: + return LabelStatusSkipped + default: + return LabelStatusUnknown + } +} + +var ( + RunnerInfo = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "info", + Help: "Runner metadata. Always 1. Labels carry version and name.", + }, []string{"version", "name"}) + + RunnerCapacity = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "capacity", + Help: "Configured maximum concurrent jobs.", + }) + + PollFetchTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "poll", + Name: "fetch_total", + Help: "Total number of FetchTask RPCs by result (task, empty, error).", + }, []string{"result"}) + + PollFetchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: "poll", + Name: "fetch_duration_seconds", + Help: "Latency of FetchTask RPCs, excluding expected long-poll timeouts.", + Buckets: []float64{0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10}, + }) + + PollBackoffSeconds = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "poll", + Name: "backoff_seconds", + Help: "Last observed polling backoff interval. With Capacity > 1, reflects whichever worker wrote last.", + }) + + JobsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "job", + Name: "total", + Help: "Total jobs processed by status (success, failure, cancelled, skipped, unknown).", + }, []string{"status"}) + + JobDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: "job", + Name: "duration_seconds", + Help: "Duration of job execution from start to finish.", + Buckets: prometheus.ExponentialBuckets(1, 2, 14), // 1s to ~4.5h + }) + + ReportLogTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "report", + Name: "log_total", + Help: "Total UpdateLog RPCs by result (success, error).", + }, []string{"result"}) + + ReportLogDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: "report", + Name: "log_duration_seconds", + Help: "Latency of UpdateLog RPCs.", + Buckets: rpcDurationBuckets, + }) + + ReportStateTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "report", + Name: "state_total", + Help: "Total UpdateTask (state) RPCs by result (success, error).", + }, []string{"result"}) + + ReportStateDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: "report", + Name: "state_duration_seconds", + Help: "Latency of UpdateTask RPCs.", + Buckets: rpcDurationBuckets, + }) + + ReportLogBufferRows = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "report", + Name: "log_buffer_rows", + Help: "Current number of buffered log rows awaiting send.", + }) + + ClientErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "client", + Name: "errors_total", + Help: "Total client RPC errors by method.", + }, []string{"method"}) +) + +// Registry is the custom Prometheus registry used by the runner. +var Registry = prometheus.NewRegistry() + +var initOnce sync.Once + +// Init registers all static metrics and the standard Go/process collectors. +// Safe to call multiple times; only the first call has effect. +func Init() { + initOnce.Do(func() { + Registry.MustRegister( + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + RunnerInfo, RunnerCapacity, + PollFetchTotal, PollFetchDuration, PollBackoffSeconds, + JobsTotal, JobDuration, + ReportLogTotal, ReportLogDuration, + ReportStateTotal, ReportStateDuration, ReportLogBufferRows, + ClientErrors, + ) + }) +} + +// RegisterUptimeFunc registers a GaugeFunc that reports seconds since startTime. +func RegisterUptimeFunc(startTime time.Time) { + Registry.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "uptime_seconds", + Help: "Seconds since the runner daemon started.", + }, + func() float64 { return time.Since(startTime).Seconds() }, + )) +} + +// RegisterRunningJobsFunc registers GaugeFuncs for the running job count and +// capacity utilisation ratio, evaluated lazily at Prometheus scrape time. +func RegisterRunningJobsFunc(countFn func() int64, capacity int) { + capF := float64(capacity) + Registry.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "job", + Name: "running", + Help: "Number of jobs currently executing.", + }, + func() float64 { return float64(countFn()) }, + )) + Registry.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "job", + Name: "capacity_utilization_ratio", + Help: "Ratio of running jobs to configured capacity (0.0-1.0).", + }, + func() float64 { + if capF <= 0 { + return 0 + } + return float64(countFn()) / capF + }, + )) +} diff --git a/internal/pkg/metrics/server.go b/internal/pkg/metrics/server.go new file mode 100644 index 00000000..8195e8b7 --- /dev/null +++ b/internal/pkg/metrics/server.go @@ -0,0 +1,50 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package metrics //nolint:revive // "metrics" is the conventional package name for Prometheus instrumentation; runtime/metrics stdlib is not used here. + +import ( + "context" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + log "github.com/sirupsen/logrus" +) + +// StartServer starts an HTTP server that serves Prometheus metrics on /metrics +// and a liveness check on /healthz. The server shuts down when ctx is cancelled. +// Call Init() before StartServer to register metrics with the Registry. +func StartServer(ctx context.Context, addr string) { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(Registry, promhttp.HandlerOpts{})) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + srv := &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 60 * time.Second, + } + + go func() { + log.Infof("metrics server listening on %s", addr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.WithError(err).Error("metrics server failed") + } + }() + + go func() { + <-ctx.Done() + shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(shutCtx); err != nil { + log.WithError(err).Warn("metrics server shutdown error") + } + }() +} diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index b7505cbd..9f1bf2c2 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -21,6 +21,7 @@ import ( "gitea.com/gitea/act_runner/internal/pkg/client" "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/metrics" ) type Reporter struct { @@ -36,6 +37,11 @@ type Reporter struct { logReplacer *strings.Replacer oldnew []string + // lastLogBufferRows is the last value written to the ReportLogBufferRows + // gauge; guarded by clientM (the same lock held around each ReportLog call) + // so the gauge skips no-op Set calls when the buffer size is unchanged. + lastLogBufferRows int + state *runnerv1.TaskState stateChanged bool stateMu sync.RWMutex @@ -93,6 +99,13 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C return rv } +// Result returns the final job result. Safe to call after Close() returns. +func (r *Reporter) Result() runnerv1.Result { + r.stateMu.RLock() + defer r.stateMu.RUnlock() + return r.state.Result +} + func (r *Reporter) ResetSteps(l int) { r.stateMu.Lock() defer r.stateMu.Unlock() @@ -421,15 +434,20 @@ func (r *Reporter) ReportLog(noMore bool) error { return nil } + start := time.Now() resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{ TaskId: r.state.Id, Index: int64(r.logOffset), Rows: rows, NoMore: noMore, })) + metrics.ReportLogDuration.Observe(time.Since(start).Seconds()) if err != nil { + metrics.ReportLogTotal.WithLabelValues(metrics.LabelResultError).Inc() + metrics.ClientErrors.WithLabelValues(metrics.LabelMethodUpdateLog).Inc() return err } + metrics.ReportLogTotal.WithLabelValues(metrics.LabelResultSuccess).Inc() ack := int(resp.Msg.AckIndex) if ack < r.logOffset { @@ -440,7 +458,12 @@ func (r *Reporter) ReportLog(noMore bool) error { r.logRows = r.logRows[ack-r.logOffset:] submitted := r.logOffset + len(rows) r.logOffset = ack + remaining := len(r.logRows) r.stateMu.Unlock() + if remaining != r.lastLogBufferRows { + metrics.ReportLogBufferRows.Set(float64(remaining)) + r.lastLogBufferRows = remaining + } if noMore && ack < submitted { return errors.New("not all logs are submitted") @@ -479,16 +502,21 @@ func (r *Reporter) ReportState(reportResult bool) error { state.Result = runnerv1.Result_RESULT_UNSPECIFIED } + start := time.Now() resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ State: state, Outputs: outputs, })) + metrics.ReportStateDuration.Observe(time.Since(start).Seconds()) if err != nil { + metrics.ReportStateTotal.WithLabelValues(metrics.LabelResultError).Inc() + metrics.ClientErrors.WithLabelValues(metrics.LabelMethodUpdateTask).Inc() r.stateMu.Lock() r.stateChanged = true r.stateMu.Unlock() return err } + metrics.ReportStateTotal.WithLabelValues(metrics.LabelResultSuccess).Inc() for _, k := range resp.Msg.SentOutputs { r.outputs.Store(k, struct{}{})