diff --git a/cmd/curio/rpc/rpc.go b/cmd/curio/rpc/rpc.go index 984d8006f..882003c94 100644 --- a/cmd/curio/rpc/rpc.go +++ b/cmd/curio/rpc/rpc.go @@ -30,6 +30,7 @@ import ( "github.com/filecoin-project/curio/api/client" "github.com/filecoin-project/curio/build" "github.com/filecoin-project/curio/deps" + "github.com/filecoin-project/curio/lib/metrics" "github.com/filecoin-project/curio/lib/paths" "github.com/filecoin-project/curio/lib/repo" "github.com/filecoin-project/curio/web" @@ -37,7 +38,7 @@ import ( lapi "github.com/filecoin-project/lotus/api" cliutil "github.com/filecoin-project/lotus/cli/util" "github.com/filecoin-project/lotus/lib/rpcenc" - "github.com/filecoin-project/lotus/metrics" + lotusmetrics "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics/proxy" "github.com/filecoin-project/lotus/storage/pipeline/piece" "github.com/filecoin-project/lotus/storage/sealer/fsutil" @@ -71,6 +72,7 @@ func CurioHandler( mux.Handle("/rpc/v0", rpcServer) mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler) mux.PathPrefix("/remote").HandlerFunc(remote) + mux.Handle("/debug/metrics", metrics.Exporter()) mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof if !permissioned { @@ -283,7 +285,7 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c permissioned), ReadHeaderTimeout: time.Minute * 3, BaseContext: func(listener net.Listener) context.Context { - ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker")) + ctx, _ := tag.New(context.Background(), tag.Upsert(lotusmetrics.APIInterface, "curio")) return ctx }, Addr: dependencies.ListenAddr, diff --git a/cmd/curio/run.go b/cmd/curio/run.go index e6eb99275..9a359cddf 100644 --- a/cmd/curio/run.go +++ b/cmd/curio/run.go @@ -106,14 +106,7 @@ var runCmd = &cli.Command{ ctxclose() }() } - // Register all metric views - /* - if err := view.Register( - metrics.MinerNodeViews..., - ); err != nil { - log.Fatalf("Cannot register the view: %v", err) - } - */ + // Set the metric to one so it is published to the exporter stats.Record(ctx, metrics.LotusInfo.M(1)) diff --git a/go.mod b/go.mod index faa144ec9..00105232d 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/filecoin-project/curio go 1.22.3 require ( + contrib.go.opencensus.io/exporter/prometheus v0.4.2 github.com/BurntSushi/toml v1.3.2 github.com/KarpelesLab/reflink v1.0.1 github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921 @@ -56,9 +57,11 @@ require ( github.com/multiformats/go-multiaddr v0.12.4 github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.19.1 github.com/puzpuzpuz/xsync/v2 v2.4.0 github.com/raulk/clock v1.1.0 github.com/samber/lo v1.39.0 + github.com/sirupsen/logrus v1.9.2 github.com/snadrus/must v0.0.0-20240605044437-98cedd57f8eb github.com/stretchr/testify v1.9.0 github.com/urfave/cli/v2 v2.25.5 @@ -77,7 +80,6 @@ require ( ) require ( - contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect github.com/GeertJohan/go.incremental v1.0.0 // indirect github.com/GeertJohan/go.rice v1.0.3 // indirect github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee // indirect @@ -267,7 +269,6 @@ require ( github.com/pion/webrtc/v3 v3.2.40 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.89.0 // indirect - github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect @@ -279,7 +280,6 @@ require ( github.com/rivo/uniseg v0.4.7 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v2.18.12+incompatible // indirect - github.com/sirupsen/logrus v1.9.2 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect github.com/triplewz/poseidon v0.0.0-20230828015038-79d8165c88ed // indirect diff --git a/harmony/harmonytask/harmonytask.go b/harmony/harmonytask/harmonytask.go index d78d0ee00..9fd3cfbb9 100644 --- a/harmony/harmonytask/harmonytask.go +++ b/harmony/harmonytask/harmonytask.go @@ -7,6 +7,9 @@ import ( "sync/atomic" "time" + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/resources" ) @@ -256,6 +259,8 @@ func (e *TaskEngine) GracefullyTerminate() { func (e *TaskEngine) poller() { nextWait := POLL_NEXT_DURATION for { + stats.Record(context.Background(), TaskMeasures.PollerIterations.M(1)) + select { case <-time.After(nextWait): // Find work periodically case <-e.ctx.Done(): ///////////////////// Graceful exit @@ -270,6 +275,22 @@ func (e *TaskEngine) poller() { if time.Since(e.lastFollowTime) > FOLLOW_FREQUENCY { e.followWorkInDB() } + + // update resource usage + availableResources := e.ResourcesAvailable() + totalResources := e.Resources() + + cpuUsage := 1 - float64(availableResources.Cpu)/float64(totalResources.Cpu) + stats.Record(context.Background(), TaskMeasures.CpuUsage.M(cpuUsage*100)) + + if totalResources.Gpu > 0 { + gpuUsage := 1 - availableResources.Gpu/totalResources.Gpu + stats.Record(context.Background(), TaskMeasures.GpuUsage.M(gpuUsage*100)) + } + + ramUsage := 1 - float64(availableResources.Ram)/float64(totalResources.Ram) + stats.Record(context.Background(), TaskMeasures.RamUsage.M(ramUsage*100)) + } } @@ -401,6 +422,13 @@ func (e *TaskEngine) Resources() resources.Resources { var Registry = map[string]TaskInterface{} func Reg(t TaskInterface) bool { - Registry[t.TypeDetails().Name] = t + name := t.TypeDetails().Name + Registry[name] = t + + // reset metrics + _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, name), + }, TaskMeasures.ActiveTasks.M(0)) + return true } diff --git a/harmony/harmonytask/metrics.go b/harmony/harmonytask/metrics.go new file mode 100644 index 000000000..76a6b2e5a --- /dev/null +++ b/harmony/harmonytask/metrics.go @@ -0,0 +1,105 @@ +package harmonytask + +import ( + promclient "github.com/prometheus/client_golang/prometheus" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + taskNameTag, _ = tag.NewKey("task_name") + sourceTag, _ = tag.NewKey("source") + pre = "harmonytask_" + + // tasks can be short, but can extend to hours + durationBuckets = []float64{0.5, 1, 3, 6, 10, 20, 30, 60, 120, 300, 600, 1800, 3600, 7200, 18000, 36000} +) + +// TaskMeasures groups all harmonytask metrics. +var TaskMeasures = struct { + TasksStarted *stats.Int64Measure + TasksCompleted *stats.Int64Measure + TasksFailed *stats.Int64Measure + TaskDuration promclient.Histogram + ActiveTasks *stats.Int64Measure + CpuUsage *stats.Float64Measure + GpuUsage *stats.Float64Measure + RamUsage *stats.Float64Measure + PollerIterations *stats.Int64Measure + AddedTasks *stats.Int64Measure +}{ + TasksStarted: stats.Int64(pre+"tasks_started", "Total number of tasks started.", stats.UnitDimensionless), + TasksCompleted: stats.Int64(pre+"tasks_completed", "Total number of tasks completed successfully.", stats.UnitDimensionless), + TasksFailed: stats.Int64(pre+"tasks_failed", "Total number of tasks that failed.", stats.UnitDimensionless), + TaskDuration: promclient.NewHistogram(promclient.HistogramOpts{ + Name: pre + "task_duration_seconds", + Buckets: durationBuckets, + Help: "The histogram of task durations in seconds.", + }), + ActiveTasks: stats.Int64(pre+"active_tasks", "Current number of active tasks.", stats.UnitDimensionless), + CpuUsage: stats.Float64(pre+"cpu_usage", "Percentage of CPU in use.", stats.UnitDimensionless), + GpuUsage: stats.Float64(pre+"gpu_usage", "Percentage of GPU in use.", stats.UnitDimensionless), + RamUsage: stats.Float64(pre+"ram_usage", "Percentage of RAM in use.", stats.UnitDimensionless), + PollerIterations: stats.Int64(pre+"poller_iterations", "Total number of poller iterations.", stats.UnitDimensionless), + AddedTasks: stats.Int64(pre+"added_tasks", "Total number of tasks added.", stats.UnitDimensionless), +} + +// TaskViews groups all harmonytask-related default views. +func init() { + err := view.Register( + &view.View{ + Measure: TaskMeasures.TasksStarted, + Aggregation: view.Sum(), + TagKeys: []tag.Key{taskNameTag, sourceTag}, + }, + &view.View{ + Measure: TaskMeasures.TasksCompleted, + Aggregation: view.Sum(), + TagKeys: []tag.Key{taskNameTag}, + }, + &view.View{ + Measure: TaskMeasures.TasksFailed, + Aggregation: view.Sum(), + TagKeys: []tag.Key{taskNameTag}, + }, + &view.View{ + Measure: TaskMeasures.ActiveTasks, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{taskNameTag}, + }, + &view.View{ + Measure: TaskMeasures.CpuUsage, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{}, + }, + &view.View{ + Measure: TaskMeasures.GpuUsage, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{}, + }, + &view.View{ + Measure: TaskMeasures.RamUsage, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{}, + }, + &view.View{ + Measure: TaskMeasures.PollerIterations, + Aggregation: view.Sum(), + TagKeys: []tag.Key{}, + }, + &view.View{ + Measure: TaskMeasures.AddedTasks, + Aggregation: view.Sum(), + TagKeys: []tag.Key{taskNameTag}, + }, + ) + if err != nil { + panic(err) + } + + err = promclient.Register(TaskMeasures.TaskDuration) + if err != nil { + panic(err) + } +} diff --git a/harmony/harmonytask/task_type_handler.go b/harmony/harmonytask/task_type_handler.go index da7f9e36d..ed56c4d47 100644 --- a/harmony/harmonytask/task_type_handler.go +++ b/harmony/harmonytask/task_type_handler.go @@ -10,6 +10,8 @@ import ( "time" logging "github.com/ipfs/go-log/v2" + "go.opencensus.io/stats" + "go.opencensus.io/tag" "github.com/filecoin-project/curio/harmony/harmonydb" ) @@ -50,6 +52,13 @@ retryAddTask: log.Errorw("Could not add task. AddTasFunc failed", "error", err, "type", h.Name) return } + + err = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, h.Name), + }, TaskMeasures.AddedTasks.M(1)) + if err != nil { + log.Errorw("Could not record added task", "error", err) + } } const ( @@ -154,7 +163,16 @@ canAcceptAgain: } } + _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, h.Name), + tag.Upsert(sourceTag, from), + }, TaskMeasures.TasksStarted.M(1)) + h.Count.Add(1) + _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, h.Name), + }, TaskMeasures.ActiveTasks.M(int64(h.Count.Load()))) + go func() { log.Infow("Beginning work on Task", "id", *tID, "from", from, "name", h.Name) @@ -204,6 +222,28 @@ canAcceptAgain: func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) { workEnd := time.Now() retryWait := time.Millisecond * 100 + + { + // metrics + + _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, h.Name), + }, TaskMeasures.ActiveTasks.M(int64(h.Count.Load()))) + + duration := workEnd.Sub(workStart).Seconds() + TaskMeasures.TaskDuration.Observe(duration) + + if done { + _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, h.Name), + }, TaskMeasures.TasksCompleted.M(1)) + } else { + _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, h.Name), + }, TaskMeasures.TasksFailed.M(1)) + } + } + retryRecordCompletion: cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { var postedTime time.Time diff --git a/lib/metrics/exporter.go b/lib/metrics/exporter.go new file mode 100644 index 000000000..a098d71c8 --- /dev/null +++ b/lib/metrics/exporter.go @@ -0,0 +1,29 @@ +package metrics + +import ( + "net/http" + + "contrib.go.opencensus.io/exporter/prometheus" + promclient "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +func Exporter() http.Handler { + // Prometheus globals are exposed as interfaces, but the prometheus + // OpenCensus exporter expects a concrete *Registry. The concrete type of + // the globals are actually *Registry, so we downcast them, staying + // defensive in case things change under the hood. + registry, ok := promclient.DefaultRegisterer.(*promclient.Registry) + if !ok { + log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer) + } + exporter, err := prometheus.NewExporter(prometheus.Options{ + Registry: registry, + Namespace: "curio", + }) + if err != nil { + log.Errorf("could not create the prometheus stats exporter: %v", err) + } + + return exporter +} diff --git a/market/lmrpc/minerhandler.go b/market/lmrpc/minerhandler.go index 8d60bb360..8bb2ab907 100644 --- a/market/lmrpc/minerhandler.go +++ b/market/lmrpc/minerhandler.go @@ -11,7 +11,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/lib/rpcenc" - "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics/proxy" "github.com/filecoin-project/lotus/node/impl" ) @@ -51,9 +50,6 @@ func MinerHandler(a api.StorageMiner, permissioned bool) (http.Handler, error) { m := mux.NewRouter() m.Handle("/rpc/v0", rpcServer) m.Handle("/rpc/streams/v0/push/{uuid}", readerHandler) - // debugging - m.Handle("/debug/metrics", metrics.Exporter()) - m.PathPrefix("/").Handler(http.DefaultServeMux) // pprof var hnd http.Handler = m if permissioned { diff --git a/web/static/pages/node_info/node-info.mjs b/web/static/pages/node_info/node-info.mjs index ca6034e50..a5ce32b08 100644 --- a/web/static/pages/node_info/node-info.mjs +++ b/web/static/pages/node_info/node-info.mjs @@ -38,7 +38,10 @@ customElements.define('node-info',class NodeInfoElement extends LitElement { ${this.data.Info.CPU} ${this.toHumanBytes(this.data.Info.Memory)} ${this.data.Info.GPU} - [pprof] + + [pprof] + [metrics] +