Skip to content

Commit

Permalink
prometheus metrics endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jul 31, 2024
1 parent cabcf84 commit 2cfe928
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 19 deletions.
6 changes: 4 additions & 2 deletions cmd/curio/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import (
"github.com/filecoin-project/curio/api/client"
"github.com/filecoin-project/curio/build"
"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/lib/metrics"
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/lib/repo"
"github.com/filecoin-project/curio/web"

lapi "github.com/filecoin-project/lotus/api"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
lotusmetrics "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/storage/pipeline/piece"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
Expand Down Expand Up @@ -71,6 +72,7 @@ func CurioHandler(
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
mux.PathPrefix("/remote").HandlerFunc(remote)
mux.Handle("/debug/metrics", metrics.Exporter())
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof

if !permissioned {
Expand Down Expand Up @@ -283,7 +285,7 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c
permissioned),
ReadHeaderTimeout: time.Minute * 3,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
ctx, _ := tag.New(context.Background(), tag.Upsert(lotusmetrics.APIInterface, "curio"))
return ctx
},
Addr: dependencies.ListenAddr,
Expand Down
9 changes: 1 addition & 8 deletions cmd/curio/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,7 @@ var runCmd = &cli.Command{
ctxclose()
}()
}
// Register all metric views
/*
if err := view.Register(
metrics.MinerNodeViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
*/

// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/filecoin-project/curio
go 1.22.3

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/BurntSushi/toml v1.3.2
github.com/KarpelesLab/reflink v1.0.1
github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921
Expand Down Expand Up @@ -56,9 +57,11 @@ require (
github.com/multiformats/go-multiaddr v0.12.4
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
github.com/puzpuzpuz/xsync/v2 v2.4.0
github.com/raulk/clock v1.1.0
github.com/samber/lo v1.39.0
github.com/sirupsen/logrus v1.9.2
github.com/snadrus/must v0.0.0-20240605044437-98cedd57f8eb
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.25.5
Expand All @@ -77,7 +80,6 @@ require (
)

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/GeertJohan/go.incremental v1.0.0 // indirect
github.com/GeertJohan/go.rice v1.0.3 // indirect
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee // indirect
Expand Down Expand Up @@ -267,7 +269,6 @@ require (
github.com/pion/webrtc/v3 v3.2.40 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand All @@ -279,7 +280,6 @@ require (
github.com/rivo/uniseg v0.4.7 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v2.18.12+incompatible // indirect
github.com/sirupsen/logrus v1.9.2 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/triplewz/poseidon v0.0.0-20230828015038-79d8165c88ed // indirect
Expand Down
30 changes: 29 additions & 1 deletion harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"sync/atomic"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/tag"

"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/resources"
)
Expand Down Expand Up @@ -256,6 +259,8 @@ func (e *TaskEngine) GracefullyTerminate() {
func (e *TaskEngine) poller() {
nextWait := POLL_NEXT_DURATION
for {
stats.Record(context.Background(), TaskMeasures.PollerIterations.M(1))

select {
case <-time.After(nextWait): // Find work periodically
case <-e.ctx.Done(): ///////////////////// Graceful exit
Expand All @@ -270,6 +275,22 @@ func (e *TaskEngine) poller() {
if time.Since(e.lastFollowTime) > FOLLOW_FREQUENCY {
e.followWorkInDB()
}

// update resource usage
availableResources := e.ResourcesAvailable()
totalResources := e.Resources()

cpuUsage := 1 - float64(availableResources.Cpu)/float64(totalResources.Cpu)
stats.Record(context.Background(), TaskMeasures.CpuUsage.M(cpuUsage*100))

if totalResources.Gpu > 0 {
gpuUsage := 1 - availableResources.Gpu/totalResources.Gpu
stats.Record(context.Background(), TaskMeasures.GpuUsage.M(gpuUsage*100))
}

ramUsage := 1 - float64(availableResources.Ram)/float64(totalResources.Ram)
stats.Record(context.Background(), TaskMeasures.RamUsage.M(ramUsage*100))

}
}

Expand Down Expand Up @@ -401,6 +422,13 @@ func (e *TaskEngine) Resources() resources.Resources {
var Registry = map[string]TaskInterface{}

// Reg stores t in Registry under the name reported by t.TypeDetails() and
// publishes an initial ActiveTasks value of zero tagged with that name, so
// the per-task series exists before any task of this type has been started.
//
// It always returns true.
func Reg(t TaskInterface) bool {
	// Read the name once and reuse it for both the registry entry and the
	// metric tag.
	name := t.TypeDetails().Name
	Registry[name] = t

	// reset metrics
	// The error is deliberately dropped: a failure to record a metric must
	// not prevent the task type from being registered.
	_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
		tag.Upsert(taskNameTag, name),
	}, TaskMeasures.ActiveTasks.M(0))

	return true
}
105 changes: 105 additions & 0 deletions harmony/harmonytask/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package harmonytask

import (
promclient "github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
	// taskNameTag labels a measurement with the name of the task type it
	// belongs to. MustNewKey panics on an invalid key name instead of
	// silently handing back an unusable key.
	taskNameTag = tag.MustNewKey("task_name")

	// sourceTag labels a started task with the "from" value passed along
	// when the work was accepted.
	sourceTag = tag.MustNewKey("source")

	// pre is the prefix shared by every harmonytask metric name.
	pre = "harmonytask_"

	// tasks can be short, but can extend to hours
	// Upper bounds, in seconds, of the task duration histogram buckets
	// (0.5s up to 10h).
	durationBuckets = []float64{0.5, 1, 3, 6, 10, 20, 30, 60, 120, 300, 600, 1800, 3600, 7200, 18000, 36000}
)

// TaskMeasures is the single place where every harmonytask metric is
// declared. All members are OpenCensus measures except TaskDuration, which
// is a native Prometheus histogram.
var TaskMeasures = struct {
	// Counters of task lifecycle events.
	TasksStarted   *stats.Int64Measure
	TasksCompleted *stats.Int64Measure
	TasksFailed    *stats.Int64Measure

	// TaskDuration observes how long a task ran, in seconds.
	TaskDuration promclient.Histogram

	// ActiveTasks is the number of tasks currently running.
	ActiveTasks *stats.Int64Measure

	// Resource usage, expressed as a percentage.
	CpuUsage *stats.Float64Measure
	GpuUsage *stats.Float64Measure
	RamUsage *stats.Float64Measure

	// Counters of scheduler activity.
	PollerIterations *stats.Int64Measure
	AddedTasks       *stats.Int64Measure
}{
	// Event counters.
	TasksStarted:     stats.Int64(pre+"tasks_started", "Total number of tasks started.", stats.UnitDimensionless),
	TasksCompleted:   stats.Int64(pre+"tasks_completed", "Total number of tasks completed successfully.", stats.UnitDimensionless),
	TasksFailed:      stats.Int64(pre+"tasks_failed", "Total number of tasks that failed.", stats.UnitDimensionless),
	AddedTasks:       stats.Int64(pre+"added_tasks", "Total number of tasks added.", stats.UnitDimensionless),
	PollerIterations: stats.Int64(pre+"poller_iterations", "Total number of poller iterations.", stats.UnitDimensionless),

	// Point-in-time values.
	ActiveTasks: stats.Int64(pre+"active_tasks", "Current number of active tasks.", stats.UnitDimensionless),
	CpuUsage:    stats.Float64(pre+"cpu_usage", "Percentage of CPU in use.", stats.UnitDimensionless),
	GpuUsage:    stats.Float64(pre+"gpu_usage", "Percentage of GPU in use.", stats.UnitDimensionless),
	RamUsage:    stats.Float64(pre+"ram_usage", "Percentage of RAM in use.", stats.UnitDimensionless),

	// Distribution of task run times, bucketed by durationBuckets.
	TaskDuration: promclient.NewHistogram(promclient.HistogramOpts{
		Name:    pre + "task_duration_seconds",
		Help:    "The histogram of task durations in seconds.",
		Buckets: durationBuckets,
	}),
}

// init registers one OpenCensus view per measure in TaskMeasures and then
// registers the TaskDuration histogram with the default Prometheus
// registerer. Counters are aggregated with Sum and point-in-time values
// with LastValue. It panics if either registration fails.
func init() {
	views := []*view.View{
		// Lifecycle counters, broken down by task name (and by source for
		// started tasks).
		{
			Measure:     TaskMeasures.TasksStarted,
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{taskNameTag, sourceTag},
		},
		{
			Measure:     TaskMeasures.TasksCompleted,
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{taskNameTag},
		},
		{
			Measure:     TaskMeasures.TasksFailed,
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{taskNameTag},
		},
		// Currently running tasks, per task name.
		{
			Measure:     TaskMeasures.ActiveTasks,
			Aggregation: view.LastValue(),
			TagKeys:     []tag.Key{taskNameTag},
		},
		// Untagged resource usage values.
		{
			Measure:     TaskMeasures.CpuUsage,
			Aggregation: view.LastValue(),
			TagKeys:     []tag.Key{},
		},
		{
			Measure:     TaskMeasures.GpuUsage,
			Aggregation: view.LastValue(),
			TagKeys:     []tag.Key{},
		},
		{
			Measure:     TaskMeasures.RamUsage,
			Aggregation: view.LastValue(),
			TagKeys:     []tag.Key{},
		},
		// Scheduler activity counters.
		{
			Measure:     TaskMeasures.PollerIterations,
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{},
		},
		{
			Measure:     TaskMeasures.AddedTasks,
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{taskNameTag},
		},
	}
	if err := view.Register(views...); err != nil {
		panic(err)
	}

	// The duration histogram is a native Prometheus collector, so it is
	// registered directly rather than through an OpenCensus view.
	if err := promclient.Register(TaskMeasures.TaskDuration); err != nil {
		panic(err)
	}
}
40 changes: 40 additions & 0 deletions harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"

"github.com/filecoin-project/curio/harmony/harmonydb"
)
Expand Down Expand Up @@ -50,6 +52,13 @@ retryAddTask:
log.Errorw("Could not add task. AddTasFunc failed", "error", err, "type", h.Name)
return
}

err = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.AddedTasks.M(1))
if err != nil {
log.Errorw("Could not record added task", "error", err)
}
}

const (
Expand Down Expand Up @@ -154,7 +163,16 @@ canAcceptAgain:
}
}

_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
tag.Upsert(sourceTag, from),
}, TaskMeasures.TasksStarted.M(1))

h.Count.Add(1)
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.ActiveTasks.M(int64(h.Count.Load())))

go func() {
log.Infow("Beginning work on Task", "id", *tID, "from", from, "name", h.Name)

Expand Down Expand Up @@ -204,6 +222,28 @@ canAcceptAgain:
func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
workEnd := time.Now()
retryWait := time.Millisecond * 100

{
// metrics

_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.ActiveTasks.M(int64(h.Count.Load())))

duration := workEnd.Sub(workStart).Seconds()
TaskMeasures.TaskDuration.Observe(duration)

if done {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksCompleted.M(1))
} else {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksFailed.M(1))
}
}

retryRecordCompletion:
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
var postedTime time.Time
Expand Down
29 changes: 29 additions & 0 deletions lib/metrics/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package metrics

import (
"net/http"

"contrib.go.opencensus.io/exporter/prometheus"
promclient "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

// Exporter returns the http.Handler that serves metrics in the Prometheus
// format under the "curio" namespace.
//
// It tries to build the exporter on top of the default Prometheus registry
// so that collectors registered there are exposed as well. If the exporter
// cannot be created, the error is logged and a handler that answers every
// request with 500 Internal Server Error is returned instead; the returned
// handler is therefore always safe to mount and call.
func Exporter() http.Handler {
	// Prometheus globals are exposed as interfaces, but the prometheus
	// OpenCensus exporter expects a concrete *Registry. The concrete type of
	// the globals are actually *Registry, so we downcast them, staying
	// defensive in case things change under the hood.
	registry, ok := promclient.DefaultRegisterer.(*promclient.Registry)
	if !ok {
		// registry is nil from here on.
		// NOTE(review): the exporter is expected to fall back to a fresh
		// registry when given nil — confirm against the exporter docs.
		log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer)
	}

	exporter, err := prometheus.NewExporter(prometheus.Options{
		Registry:  registry,
		Namespace: "curio",
	})
	if err != nil {
		log.Errorf("could not create the prometheus stats exporter: %v", err)
		// Do not return the nil *Exporter: wrapped in an http.Handler it
		// would be a non-nil interface whose ServeHTTP is called on a nil
		// receiver.
		return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
			http.Error(w, "metrics exporter unavailable", http.StatusInternalServerError)
		})
	}

	return exporter
}
4 changes: 0 additions & 4 deletions market/lmrpc/minerhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node/impl"
)
Expand Down Expand Up @@ -51,9 +50,6 @@ func MinerHandler(a api.StorageMiner, permissioned bool) (http.Handler, error) {
m := mux.NewRouter()
m.Handle("/rpc/v0", rpcServer)
m.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
// debugging
m.Handle("/debug/metrics", metrics.Exporter())
m.PathPrefix("/").Handler(http.DefaultServeMux) // pprof

var hnd http.Handler = m
if permissioned {
Expand Down
5 changes: 4 additions & 1 deletion web/static/pages/node_info/node-info.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ customElements.define('node-info',class NodeInfoElement extends LitElement {
<td>${this.data.Info.CPU}</td>
<td>${this.toHumanBytes(this.data.Info.Memory)}</td>
<td>${this.data.Info.GPU}</td>
<td><a href="http://${this.data.Info.Host}/debug/pprof">[pprof]</a></td>
<td>
<a href="http://${this.data.Info.Host}/debug/pprof">[pprof]</a>
<a href="http://${this.data.Info.Host}/debug/metrics">[metrics]</a>
</td>
</tr>
</table>
<hr>
Expand Down

0 comments on commit 2cfe928

Please sign in to comment.