Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

clear min and max on every ExportView #1182

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd // indirect
golang.org/x/text v0.3.2 // indirect
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb // indirect
google.golang.org/grpc v1.20.1
)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
5 changes: 3 additions & 2 deletions plugin/runmetrics/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package runmetrics

import (
"errors"
"runtime"
"sync"

"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"runtime"
"sync"
)

type (
Expand Down
6 changes: 3 additions & 3 deletions stats/view/aggregation_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Po
// N+1 buckets.
type DistributionData struct {
Count int64 // number of data points aggregated
Min float64 // minimum value in the distribution
Max float64 // max value in the distribution
Min float64 // minimum value in the distribution, value is non-monotonic
Max float64 // max value in the distribution, value is non-monotonic
Mean float64 // mean of the distribution
SumOfSquaredDev float64 // sum of the squared deviation from the mean
SumOfSquaredDev float64 // sum of the squared deviation from the mean, non-monotnic exporters shouldn't use this
CountPerBucket []int64 // number of occurrences per bucket
// ExemplarsPerBucket is slice the same length as CountPerBucket containing
// an exemplar for the associated bucket, or nil.
Expand Down
11 changes: 11 additions & 0 deletions stats/view/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package view

import (
"math"
"sort"
"time"

Expand Down Expand Up @@ -56,6 +57,16 @@ func (c *collector) clearRows() {
c.signatures = make(map[string]AggregationData)
}

func (c *collector) resetValues() {
for _, ad := range c.signatures {
switch ad := ad.(type) {
case *DistributionData:
ad.Min = math.MaxFloat64
ad.Max = math.SmallestNonzeroFloat64
}
}
}

// encodeWithKeys encodes the map by using values
// only associated with the keys provided.
func encodeWithKeys(m *tag.Map, keys []tag.Key) []byte {
Expand Down
6 changes: 6 additions & 0 deletions stats/view/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ func (v *viewInternal) clearRows() {
v.collector.clearRows()
}

// resetValues resets values that need to be
// completely isolated for each bucket
func (v *viewInternal) resetValues() {
v.collector.resetValues()
}

func (v *viewInternal) collectedRows() []*Row {
return v.collector.collectedRows(v.view.TagKeys)
}
Expand Down
2 changes: 2 additions & 0 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (w *worker) reportView(v *viewInternal, now time.Time) {
e.ExportView(viewData)
}
exportersMu.Unlock()
v.resetValues()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This should be also called from w.Read() method. This is for new mechanism (ExportMetrics)
  2. Apart from that reportUsage() should not go through all the views unless there is an exporter registered (used for old mechanism ExportView())
    If user configures both ways or multiple exporter using new mechanism then values of min/max is unpredictable.
  3. Min/Max doc should be updated to reflect the behavior.
  4. Add tests for min/max.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rghetia done, let me know if the last commit is what you were looking for 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test is good.
Item 2 above is not taken care of.

There should be new option in ReadAndExport method. If the option to reset min/max is specified then only do the reset.

Similarly, for old approach (export viewdata) there should be a new method view.ResetMinMaxOnExport() - It is rather a long name but I don't have a good suggestion. Alternatively, provide reset functionality only using new approach with ReadAndExport().

}

func (w *worker) reportUsage(now time.Time) {
Expand Down Expand Up @@ -275,6 +276,7 @@ func (w *worker) Read() []*metricdata.Metric {
metric := w.toMetric(v, now)
if metric != nil {
metrics = append(metrics, metric)
v.resetValues()
}
}
return metrics
Expand Down
147 changes: 123 additions & 24 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package view
import (
"context"
"errors"
"math"
"runtime/debug"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -261,36 +263,107 @@ func TestReportUsage(t *testing.T) {
}

for _, tt := range tests {
restart()
SetReportingPeriod(25 * time.Millisecond)
t.Run(tt.name, func(t *testing.T) {
restart()
SetReportingPeriod(25 * time.Millisecond)

if err := Register(tt.view); err != nil {
t.Fatalf("%v: cannot register: %v", tt.name, err)
}
if err := Register(tt.view); err != nil {
t.Fatalf("%v: cannot register: %v", tt.name, err)
}

e := &countExporter{}
RegisterExporter(e)
e := &countExporter{}
RegisterExporter(e)
defer UnregisterExporter(e)

stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))

time.Sleep(50 * time.Millisecond)
time.Sleep(50 * time.Millisecond)

stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))

time.Sleep(50 * time.Millisecond)
time.Sleep(50 * time.Millisecond)

e.Lock()
count := e.count
e.Unlock()
if got, want := count, tt.wantMaxCount; got > want {
t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want)
}
e.Lock()
count := e.count
e.Unlock()
if got, want := count, tt.wantMaxCount; got > want {
t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want)
}
})
}
}

func TestReportUsageMinMax(t *testing.T) {
ctx := context.Background()

m := stats.Float64("measure", "desc", "unit")

tests := []struct {
name string
view *View
data [][]float64
wantMin float64
wantMax float64
}{
{
name: "reset_data",
view: &View{Name: "const", Measure: m, Aggregation: Distribution(1, 4, 10, 12)},
data: [][]float64{{2, 2, 2, 2}, {4, 4, 4, 1}},
wantMin: 1,
wantMax: 4,
},
{
name: "no_data",
view: &View{Name: "const", Measure: m, Aggregation: Distribution(1, 4, 10, 12)},
wantMin: 0,
wantMax: 0,
},
{
name: "constant_data",
view: &View{Name: "const", Measure: m, Aggregation: Distribution(1, 4, 10, 12)},
data: [][]float64{{1, 1, 1, 1}, {1, 1, 1, 1}},
wantMin: 1,
wantMax: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
restart()
SetReportingPeriod(25 * time.Millisecond)

if err := Register(tt.view); err != nil {
t.Fatalf("%v: cannot register: %v", tt.name, err)
}

e := &distributionExporter{}
RegisterExporter(e)
defer UnregisterExporter(e)

for _, batch := range tt.data {
for _, val := range batch {
stats.Record(ctx, m.M(val))
}
time.Sleep(50 * time.Millisecond)
}

e.Lock()
min := e.min
max := e.max
e.Unlock()
if got, want := min, tt.wantMin; got != want {
t.Errorf("%v: got min = %v; want %v", tt.name, got, want)
}
if got, want := max, tt.wantMax; got != want {
t.Errorf("%v: got max = %v; want %v", tt.name, got, want)
}
})
}

}
Expand Down Expand Up @@ -494,7 +567,11 @@ func (e *countExporter) ExportView(vd *Data) {
if len(vd.Rows) == 0 {
return
}
d := vd.Rows[0].Data.(*CountData)
d, ok := vd.Rows[0].Data.(*CountData)
if !ok {
debug.PrintStack()
panic("BYE")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

}

e.Lock()
defer e.Unlock()
Expand All @@ -514,6 +591,28 @@ func (e *vdExporter) ExportView(vd *Data) {
e.vds = append(e.vds, vd)
}

type distributionExporter struct {
sync.Mutex
min float64
max float64
}

func (e *distributionExporter) ExportView(vd *Data) {
if len(vd.Rows) == 0 {
return
}
d := vd.Rows[0].Data.(*DistributionData)

e.Lock()
defer e.Unlock()
if d.Min != math.MaxFloat64 {
e.min = d.Min
}
if d.Max != math.SmallestNonzeroFloat64 {
e.max = d.Max
}
}

// restart stops the current processors and creates a new one.
func restart() {
defaultWorker.stop()
Expand Down