diff --git a/.github/workflows/linkinator.yaml b/.github/workflows/linkinator.yaml index 47e7df14..b80b0f81 100644 --- a/.github/workflows/linkinator.yaml +++ b/.github/workflows/linkinator.yaml @@ -19,4 +19,4 @@ jobs: paths: "**/*.md" markdown: true retry: true - linksToSkip: "https://github.com/kedacore/http-add-on/pkgs/container/http-add-on-interceptor, https://github.com/kedacore/http-add-on/pkgs/container/http-add-on-operator, https://github.com/kedacore/http-add-on/pkgs/container/http-add-on-scaler, http://opentelemetry-collector.open-telemetry-system:4318" + linksToSkip: "https://github.com/kedacore/http-add-on/pkgs/container/http-add-on-interceptor, https://github.com/kedacore/http-add-on/pkgs/container/http-add-on-operator, https://github.com/kedacore/http-add-on/pkgs/container/http-add-on-scaler,http://opentelemetry-collector.open-telemetry-system:4318,http://opentelemetry-collector.open-telemetry-system:4318/v1/traces" diff --git a/CHANGELOG.md b/CHANGELOG.md index 0bd7a11d..ac6b7e8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ This changelog keeps track of work items that have been completed and are ready ### New +- **General**: Add configurable tracing support to the interceptor proxy ([#1021](https://github.com/kedacore/http-add-on/pull/1021)) - **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO)) ### Improvements diff --git a/config/interceptor/e2e-test/otel/deployment.yaml b/config/interceptor/e2e-test/otel/deployment.yaml index f3712fc5..51540b0f 100644 --- a/config/interceptor/e2e-test/otel/deployment.yaml +++ b/config/interceptor/e2e-test/otel/deployment.yaml @@ -19,3 +19,11 @@ spec: value: "http://opentelemetry-collector.open-telemetry-system:4318" - name: OTEL_METRIC_EXPORT_INTERVAL value: "1" + - name: OTEL_EXPORTER_OTLP_TRACES_ENABLED + value: "true" + - name: OTEL_EXPORTER_OTLP_TRACES_PROTOCOL + value: "http/protobuf" + - name: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT + value: "http://opentelemetry-collector.open-telemetry-system:4318/v1/traces" + - name: OTEL_EXPORTER_OTLP_TRACES_INSECURE + value: "true" diff --git a/docs/operate.md b/docs/operate.md index 691c3a32..c4e91759 100644 --- a/docs/operate.md +++ b/docs/operate.md @@ -23,3 +23,31 @@ If you need to provide any headers such as authentication details in order to ut The interceptor proxy has the ability to run both a HTTP and HTTPS server simultaneously to allow you to scale workloads that use either protocol. By default, the interceptor proxy will only serve over HTTP, but this behavior can be changed by configuring the appropriate environment variables on the deployment. The TLS server can be enabled by setting the environment variable `KEDA_HTTP_PROXY_TLS_ENABLED` to `true` on the interceptor deployment (`false` by default). The TLS server will start on port `8443` by default, but this can be configured by setting `KEDA_HTTP_PROXY_TLS_PORT` to your desired port number. The TLS server will require valid TLS certificates to start, the path to the certificates can be configured via the `KEDA_HTTP_PROXY_TLS_CERT_PATH` and `KEDA_HTTP_PROXY_TLS_KEY_PATH` environment variables (`/certs/tls.crt` and `/certs/tls.key` by default). + +# Configuring tracing for the KEDA HTTP Add-on interceptor proxy + +### Supported Exporters: +* **console** - The console exporter is useful for development and debugging tasks, and is the simplest to set up. +* **http/protobuf** - To send trace data to an OTLP endpoint (like the collector or Jaeger >= v1.35.0) you’ll want to configure an OTLP exporter that sends to your endpoint. +* * **grpc** - To configure exporter to send trace data over gRPC connection to an OTLP endpoint (like the collector or Jaeger >= v1.35.0) you’ll want to configure an OTLP exporter that sends to your endpoint. + +### Configuring tracing with console exporter + +To enable tracing with the console exporter, the `OTEL_EXPORTER_OTLP_TRACES_ENABLED` environment variable should be set to `true` on the interceptor deployment. (`false` by default). +Secondly set `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` to `console` (`console` by default). Other protocols include (`http/protobuf` and `grpc`). +Finally set `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` to `"http://localhost:4318/v1/traces"` (`"http://localhost:4318/v1/traces"` by default). + + +### Configuring tracing with OTLP exporter +When configured, the interceptor proxy can export metrics to a OTEL HTTP collector. + +To enable tracing with otlp exporter, the `OTEL_EXPORTER_OTLP_TRACES_ENABLED` environment variable should be set to `true` on the interceptor deployment. (`false` by default). +Secondly set `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` to `otlphttp` (`console` by default). Other protocols include (`http/protobuf` and `grpc`) +Finally set `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` to the collector to send the traces to (e.g. http://opentelemetry-collector.open-telemetry-system:4318/v1/traces) (`"http://localhost:4318/v1/traces"` by default). +NOTE: full path is required to be set including + + +Optional variables +`OTEL_EXPORTER_OTLP_HEADERS` - To pass any extra headers to the spans to utilise your OTEL collector e.g. authentication details (`"key1=value1,key2=value2"`) +`OTEL_EXPORTER_OTLP_TRACES_INSECURE` - To send traces to the tracing via HTTP rather than HTTPS (`false` by default) +`OTEL_EXPORTER_OTLP_TRACES_TIMEOUT` - The batcher timeout in seconds to send batch of data points (`5` by default) diff --git a/go.mod b/go.mod index 7a934175..cf96e435 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,18 @@ require ( github.com/go-logr/logr v1.4.2 github.com/google/go-cmp v0.6.0 github.com/hashicorp/go-immutable-radix/v2 v2.1.0 - github.com/kedacore/keda/v2 v2.14.1-0.20240429185716-c55e306af94b + github.com/kedacore/keda/v2 v2.14.0 github.com/kelseyhightower/envconfig v1.4.0 github.com/onsi/ginkgo/v2 v2.19.0 github.com/onsi/gomega v1.33.1 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 + go.opentelemetry.io/contrib/propagators/b3 v1.28.0 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.24.0 go.opentelemetry.io/otel/sdk v1.28.0 go.uber.org/mock v0.4.0 golang.org/x/sync v0.7.0 @@ -20,7 +25,7 @@ require ( google.golang.org/protobuf v1.34.2 k8s.io/api v0.29.4 k8s.io/apimachinery v0.29.4 - k8s.io/client-go v0.29.4 + k8s.io/client-go v1.5.2 k8s.io/code-generator v0.29.4 k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 sigs.k8s.io/controller-runtime v0.17.5 @@ -45,6 +50,21 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect + github.com/gorilla/websocket v1.5.1 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect + go.uber.org/zap v1.27.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect + k8s.io/component-base v0.29.4 // indirect + k8s.io/gengo v0.0.0-20240129211411-f967bbeff4b4 // indirect +) + +require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect @@ -53,12 +73,10 @@ require ( github.com/expr-lang/expr v1.16.9 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-errors/errors v1.5.1 // indirect - github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect - github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect @@ -67,8 +85,6 @@ require ( github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/gorilla/websocket v1.5.1 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/imdario/mergo v0.3.16 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -80,7 +96,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.1 @@ -90,14 +105,13 @@ require ( github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/xlab/treeprint v1.2.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.50.0 go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/otel/sdk/metric v1.28.0 - go.opentelemetry.io/otel/trace v1.28.0 // indirect - go.opentelemetry.io/proto/otlp v1.3.1 // indirect + go.opentelemetry.io/otel/trace v1.28.0 go.starlark.net v0.0.0-20231121155337-90ade8b19d09 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/net v0.26.0 // indirect @@ -108,15 +122,11 @@ require ( golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect - gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.29.4 // indirect - k8s.io/component-base v0.29.4 // indirect - k8s.io/gengo v0.0.0-20240129211411-f967bbeff4b4 // indirect k8s.io/klog/v2 v2.120.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect knative.dev/pkg v0.0.0-20240423132823-3c6badc82748 // indirect diff --git a/go.sum b/go.sum index 56fa0654..11f5e26c 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0 github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI= github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= @@ -82,8 +84,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/kedacore/keda/v2 v2.14.1-0.20240429185716-c55e306af94b h1:FCGepXWVVCrqPDhPFFAuMyeJfgC7bcu1GFgQQsXESVA= -github.com/kedacore/keda/v2 v2.14.1-0.20240429185716-c55e306af94b/go.mod h1:V03Uj1+jxKYAzqa09FSwlEzqj8HfYeXM6yhH4gm4PyA= +github.com/kedacore/keda/v2 v2.14.0 h1:0vxF1cEbEcAVWwbSHzgmanA43Hnnz5oGZZPN9yC7/rg= +github.com/kedacore/keda/v2 v2.14.0/go.mod h1:Gk8Bm9uiiQcUwhS31Aib72y+9K4LvBaMPZuA1n3kKR8= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -147,12 +149,24 @@ github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ= github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= +go.opentelemetry.io/contrib/propagators/b3 v1.28.0 h1:XR6CFQrQ/ttAYmTBX2loUEFGdk1h17pxYI8828dk/1Y= +go.opentelemetry.io/contrib/propagators/b3 v1.28.0/go.mod h1:DWRkzJONLquRz7OJPh2rRbZ7MugQj62rk7g6HRnEqh0= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0 h1:aLmmtjRke7LPDQ3lvpFz+kNEH43faFhzW7v8BFIEydg= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0/go.mod h1:TC1pyCt6G9Sjb4bQpShH+P5R53pO6ZuGnHuuln9xMeE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 h1:H2JFgRcGiyHg7H7bwcwaQJYrNFqCqrbTQ8K4p1OvDu8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0/go.mod h1:WfCWp1bGoYK8MeULtI15MmQVczfR+bFkk0DF3h06QmQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 h1:Xw8U6u2f8DK2XAkGRFV7BBLENgnTGX9i4rQRxJf+/vs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0/go.mod h1:6KW1Fm6R/s6Z3PGXwSJN2K4eT6wQB3vXX6CVnYX9NmM= go.opentelemetry.io/otel/exporters/prometheus v0.50.0 h1:2Ewsda6hejmbhGFyUvWZjUThC98Cf8Zy6g0zkIimOng= go.opentelemetry.io/otel/exporters/prometheus v0.50.0/go.mod h1:pMm5PkUo5YwbLiuEf7t2xg4wbP0/eSJrMxIMxKosynY= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.24.0 h1:s0PHtIkN+3xrbDOpt2M8OTG92cWqUESvzh2MxiR5xY8= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.24.0/go.mod h1:hZlFbDbRt++MMPCCfSJfmhkGIWnX1h3XjkfxZUjLrIA= go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= diff --git a/interceptor/config/tracing.go b/interceptor/config/tracing.go new file mode 100644 index 00000000..7fab5b06 --- /dev/null +++ b/interceptor/config/tracing.go @@ -0,0 +1,21 @@ +package config + +import ( + "github.com/kelseyhightower/envconfig" +) + +// Tracing is the configuration for configuring tracing through the interceptor. +type Tracing struct { + // States whether tracing should be enabled, False by default + Enabled bool `envconfig:"OTEL_EXPORTER_OTLP_TRACES_ENABLED" default:"false"` + // Sets what tracing export to use, must be one of: console,http/protobuf, grpc + Exporter string `envconfig:"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL" default:"console"` +} + +// Parse parses standard configs using envconfig and returns a pointer to the +// newly created config. Returns nil and a non-nil error if parsing failed +func MustParseTracing() *Tracing { + ret := new(Tracing) + envconfig.MustProcess("", ret) + return ret +} diff --git a/interceptor/handler/upstream.go b/interceptor/handler/upstream.go index e556be60..c55241df 100644 --- a/interceptor/handler/upstream.go +++ b/interceptor/handler/upstream.go @@ -5,6 +5,12 @@ import ( "net/http" "net/http/httputil" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + + "github.com/kedacore/http-add-on/interceptor/config" "github.com/kedacore/http-add-on/pkg/util" ) @@ -14,11 +20,13 @@ var ( type Upstream struct { roundTripper http.RoundTripper + tracingCfg *config.Tracing } -func NewUpstream(roundTripper http.RoundTripper) *Upstream { +func NewUpstream(roundTripper http.RoundTripper, tracingCfg *config.Tracing) *Upstream { return &Upstream{ roundTripper: roundTripper, + tracingCfg: tracingCfg, } } @@ -28,6 +36,21 @@ func (uh *Upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { r = util.RequestWithLoggerWithName(r, "UpstreamHandler") ctx := r.Context() + if uh.tracingCfg.Enabled { + p := otel.GetTextMapPropagator() + ctx = p.Extract(ctx, propagation.HeaderCarrier(r.Header)) + + p.Inject(ctx, propagation.HeaderCarrier(w.Header())) + + span := trace.SpanFromContext(ctx) + defer span.End() + + serviceValAttr := attribute.String("service", "keda-http-interceptor-proxy-upstream") + coldStartValAttr := attribute.String("cold-start", w.Header().Get("X-KEDA-HTTP-Cold-Start")) + + span.SetAttributes(serviceValAttr, coldStartValAttr) + } + stream := util.StreamFromContext(ctx) if stream == nil { sh := NewStatic(http.StatusInternalServerError, errNilStream) diff --git a/interceptor/handler/upstream_test.go b/interceptor/handler/upstream_test.go index d3f9da48..48df8355 100644 --- a/interceptor/handler/upstream_test.go +++ b/interceptor/handler/upstream_test.go @@ -1,6 +1,12 @@ package handler import ( + "context" + "fmt" + "github.com/kedacore/http-add-on/interceptor/tracing" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "log" "net/http" "net/http/httptest" @@ -14,8 +20,223 @@ import ( "github.com/kedacore/http-add-on/interceptor/config" kedanet "github.com/kedacore/http-add-on/pkg/net" "github.com/kedacore/http-add-on/pkg/util" + "go.opentelemetry.io/otel/sdk/trace" ) +const ( + traceId = "a8419b25ec2051e5" + fullW3CLengthTraceId = "29b3290dc5a93f2618b17502ccb2a728" + spanId = "97337bce1bc3e368" + parentSpanId = "2890e7e08fc6592b" + sampled = "1" + w3cPadding = "0000000000000000" +) + +func TestB3MultiPropagation(t *testing.T) { + + // Given + r := require.New(t) + + microservice, microserviceURL, closeServer := startMicroservice(t) + defer closeServer() + + exporter, tracerProvider := setupOTelSDKForTesting() + instrumentedServeHTTP := withAutoInstrumentation(serveHTTP) + + request, responseWriter := createRequestAndResponse("GET", microserviceURL) + + request.Header.Set("X-B3-Traceid", traceId) + request.Header.Set("X-B3-Spanid", spanId) + request.Header.Set("X-B3-Parentspanid", parentSpanId) + request.Header.Set("X-B3-Sampled", sampled) + + defer func(traceProvider *trace.TracerProvider, ctx context.Context) { + _ = traceProvider.Shutdown(ctx) + }(tracerProvider, request.Context()) + + // When + instrumentedServeHTTP.ServeHTTP(responseWriter, request) + + // Then + receivedRequest := microservice.IncomingRequests()[0] + receivedHeaders := receivedRequest.Header + + r.Equal(receivedHeaders.Get("X-B3-Parentspanid"), parentSpanId) + r.Equal(receivedHeaders.Get("X-B3-Traceid"), traceId) + r.Equal(receivedHeaders.Get("X-B3-Spanid"), spanId) + r.Equal(receivedHeaders.Get("X-B3-Sampled"), sampled) + + r.NotContains(receivedHeaders, "Traceparent") + r.NotContains(receivedHeaders, "B3") + r.NotContains(receivedHeaders, "b3") + + _ = tracerProvider.ForceFlush(request.Context()) + + exportedSpans := exporter.GetSpans() + if len(exportedSpans) != 1 { + t.Fatalf("Expected 1 Span, got %d", len(exportedSpans)) + } + sc := exportedSpans[0].SpanContext + r.Equal(w3cPadding+traceId, sc.TraceID().String()) + r.NotEqual(sc.SpanID().String(), spanId) +} + +func TestW3CAndB3MultiPropagation(t *testing.T) { + + // Given + r := require.New(t) + + microservice, microserviceURL, closeServer := startMicroservice(t) + defer closeServer() + + exporter, tracerProvider := setupOTelSDKForTesting() + instrumentedServeHTTP := withAutoInstrumentation(serveHTTP) + + request, responseWriter := createRequestAndResponse("GET", microserviceURL) + + request.Header.Set("X-B3-Traceid", traceId) + request.Header.Set("X-B3-Spanid", spanId) + request.Header.Set("X-B3-Parentspanid", parentSpanId) + request.Header.Set("X-B3-Sampled", sampled) + request.Header.Set("Traceparent", w3cPadding+traceId) + + defer func(traceProvider *trace.TracerProvider, ctx context.Context) { + _ = traceProvider.Shutdown(ctx) + }(tracerProvider, request.Context()) + + // When + instrumentedServeHTTP.ServeHTTP(responseWriter, request) + + // Then + receivedRequest := microservice.IncomingRequests()[0] + receivedHeaders := receivedRequest.Header + + r.Equal(receivedHeaders.Get("X-B3-Parentspanid"), parentSpanId) + r.Equal(receivedHeaders.Get("X-B3-Traceid"), traceId) + r.Equal(receivedHeaders.Get("X-B3-Spanid"), spanId) + r.Equal(receivedHeaders.Get("X-B3-Sampled"), sampled) + r.Equal(receivedHeaders.Get("Traceparent"), w3cPadding+traceId) + + r.NotContains(receivedHeaders, "B3") + r.NotContains(receivedHeaders, "b3") + + _ = tracerProvider.ForceFlush(request.Context()) + + exportedSpans := exporter.GetSpans() + if len(exportedSpans) != 1 { + t.Fatalf("Expected 1 Span, got %d", len(exportedSpans)) + } + sc := exportedSpans[0].SpanContext + r.Equal(w3cPadding+traceId, sc.TraceID().String()) + r.NotEqual(sc.SpanID().String(), spanId) +} + +func TestW3CPropagation(t *testing.T) { + + // Given + r := require.New(t) + + microservice, microserviceURL, closeServer := startMicroservice(t) + defer closeServer() + + exporter, tracerProvider := setupOTelSDKForTesting() + instrumentedServeHTTP := withAutoInstrumentation(serveHTTP) + + request, responseWriter := createRequestAndResponse("GET", microserviceURL) + + traceParent := fmt.Sprintf("00-%s-%s-01", fullW3CLengthTraceId, spanId) + request.Header.Set("Traceparent", traceParent) + + defer func(traceProvider *trace.TracerProvider, ctx context.Context) { + _ = traceProvider.Shutdown(ctx) + }(tracerProvider, request.Context()) + + // When + instrumentedServeHTTP.ServeHTTP(responseWriter, request) + + // Then + receivedRequest := microservice.IncomingRequests()[0] + receivedHeaders := receivedRequest.Header + + r.Equal(receivedHeaders.Get("Traceparent"), traceParent) + + r.NotContains(receivedHeaders, "B3") + r.NotContains(receivedHeaders, "b3") + r.NotContains(receivedHeaders, "X-B3-Parentspanid") + r.NotContains(receivedHeaders, "X-B3-Traceid") + r.NotContains(receivedHeaders, "X-B3-Spanid") + r.NotContains(receivedHeaders, "X-B3-Sampled") + + _ = tracerProvider.ForceFlush(request.Context()) + + exportedSpans := exporter.GetSpans() + if len(exportedSpans) != 1 { + t.Fatalf("Expected 1 Span, got %d", len(exportedSpans)) + } + sc := exportedSpans[0].SpanContext + r.Equal(fullW3CLengthTraceId, sc.TraceID().String()) + r.Equal(true, sc.IsSampled()) + r.NotEqual(sc.SpanID().String(), spanId) +} + +func TestPropagationWhenNoHeaders(t *testing.T) { + + // Given + r := require.New(t) + + microservice, microserviceURL, closeServer := startMicroservice(t) + defer closeServer() + + exporter, tracerProvider := setupOTelSDKForTesting() + instrumentedServeHTTP := withAutoInstrumentation(serveHTTP) + + request, responseWriter := createRequestAndResponse("GET", microserviceURL) + + defer func(traceProvider *trace.TracerProvider, ctx context.Context) { + _ = traceProvider.Shutdown(ctx) + }(tracerProvider, request.Context()) + + // When + instrumentedServeHTTP.ServeHTTP(responseWriter, request) + + // Then + receivedRequest := microservice.IncomingRequests()[0] + receivedHeaders := receivedRequest.Header + + r.NotContains(receivedHeaders, "Traceparent") + r.NotContains(receivedHeaders, "B3") + r.NotContains(receivedHeaders, "b3") + r.NotContains(receivedHeaders, "X-B3-Parentspanid") + r.NotContains(receivedHeaders, "X-B3-Traceid") + r.NotContains(receivedHeaders, "X-B3-Spanid") + r.NotContains(receivedHeaders, "X-B3-Sampled") + + _ = tracerProvider.ForceFlush(request.Context()) + + exportedSpans := exporter.GetSpans() + if len(exportedSpans) != 1 { + t.Fatalf("Expected 1 Span, got %d", len(exportedSpans)) + } + sc := exportedSpans[0].SpanContext + r.NotEmpty(sc.SpanID()) + r.NotEmpty(sc.TraceID()) + + hasServiceAttribute := false + hasColdStartAttribute := false + for _, attribute := range exportedSpans[0].Attributes { + + if attribute.Key == "service" && attribute.Value.AsString() == "keda-http-interceptor-proxy-upstream" { + hasServiceAttribute = true + } + + if attribute.Key == "cold-start" { + hasColdStartAttribute = true + } + } + r.True(hasServiceAttribute) + r.True(hasColdStartAttribute) +} + func TestForwarderSuccess(t *testing.T) { r := require.New(t) // this channel will be closed after the request was received, but @@ -43,7 +264,7 @@ func TestForwarderSuccess(t *testing.T) { timeouts := defaultTimeouts() dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff()) rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader) - uh := NewUpstream(rt) + uh := NewUpstream(rt, &config.Tracing{}) uh.ServeHTTP(res, req) r.True( @@ -88,7 +309,7 @@ func TestForwarderHeaderTimeout(t *testing.T) { r.NoError(err) req = util.RequestWithStream(req, originURL) rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader) - uh := NewUpstream(rt) + uh := NewUpstream(rt, &config.Tracing{}) uh.ServeHTTP(res, req) forwardedRequests := hdl.IncomingRequests() @@ -138,7 +359,7 @@ func TestForwarderWaitsForSlowOrigin(t *testing.T) { r.NoError(err) req = util.RequestWithStream(req, originURL) rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader) - uh := NewUpstream(rt) + uh := NewUpstream(rt, &config.Tracing{}) uh.ServeHTTP(res, req) // wait for the goroutine above to finish, with a little cusion ensureSignalBeforeTimeout(originWaitCh, originDelay*2) @@ -161,7 +382,7 @@ func TestForwarderConnectionRetryAndTimeout(t *testing.T) { r.NoError(err) req = util.RequestWithStream(req, noSuchURL) rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader) - uh := NewUpstream(rt) + uh := NewUpstream(rt, &config.Tracing{}) start := time.Now() uh.ServeHTTP(res, req) @@ -217,7 +438,7 @@ func TestForwardRequestRedirectAndHeaders(t *testing.T) { r.NoError(err) req = util.RequestWithStream(req, srvURL) rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader) - uh := NewUpstream(rt) + uh := NewUpstream(rt, &config.Tracing{}) uh.ServeHTTP(res, req) r.Equal(301, res.Code) r.Equal("abc123.com", res.Header().Get("Location")) @@ -281,3 +502,56 @@ func ensureSignalBeforeTimeout(signalCh <-chan struct{}, timeout time.Duration) return true } } + +func serveHTTP(w http.ResponseWriter, r *http.Request) { + timeouts := defaultTimeouts() + dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff()) + rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader) + upstream := NewUpstream(rt, &config.Tracing{Enabled: true}) + + upstream.ServeHTTP(w, r) +} + +func setupOTelSDKForTesting() (*tracetest.InMemoryExporter, *trace.TracerProvider) { + exporter := tracetest.NewInMemoryExporter() + traceProvider := trace.NewTracerProvider(trace.WithBatcher(exporter, trace.WithBatchTimeout(time.Second))) + otel.SetTracerProvider(traceProvider) + prop := tracing.NewPropagator() + otel.SetTextMapPropagator(prop) + return exporter, traceProvider +} + +func startMicroservice(t *testing.T) (*kedanet.TestHTTPHandlerWrapper, *url.URL, func()) { + assert := require.New(t) + requestReceiveChannel := make(chan struct{}) + + const respCode = 200 + const respBody = "Success Response" + microservice := kedanet.NewTestHTTPHandlerWrapper( + http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + close(requestReceiveChannel) + w.WriteHeader(respCode) + _, err := w.Write([]byte(respBody)) + assert.NoError(err) + }), + ) + server := httptest.NewServer(microservice) + + url, err := url.Parse(server.URL) + assert.NoError(err) + + return microservice, url, func() { + server.Close() + } +} + +func createRequestAndResponse(method string, url *url.URL) (*http.Request, http.ResponseWriter) { + ctx := util.ContextWithStream(context.Background(), url) + request, _ := http.NewRequestWithContext(ctx, method, url.String(), nil) + recorder := httptest.NewRecorder() + return request, recorder +} + +func withAutoInstrumentation(sut func(w http.ResponseWriter, r *http.Request)) http.Handler { + return otelhttp.NewHandler(http.HandlerFunc(sut), "SystemUnderTest") +} diff --git a/interceptor/main.go b/interceptor/main.go index 2d98776c..7c1e9caa 100644 --- a/interceptor/main.go +++ b/interceptor/main.go @@ -9,10 +9,12 @@ import ( "fmt" "net/http" "os" + "runtime" "time" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "golang.org/x/sync/errgroup" "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" @@ -22,6 +24,7 @@ import ( "github.com/kedacore/http-add-on/interceptor/handler" "github.com/kedacore/http-add-on/interceptor/metrics" "github.com/kedacore/http-add-on/interceptor/middleware" + "github.com/kedacore/http-add-on/interceptor/tracing" clientset "github.com/kedacore/http-add-on/operator/generated/clientset/versioned" informers "github.com/kedacore/http-add-on/operator/generated/informers/externalversions" "github.com/kedacore/http-add-on/pkg/build" @@ -41,9 +44,11 @@ var ( // +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch func main() { + defer os.Exit(1) timeoutCfg := config.MustParseTimeouts() servingCfg := config.MustParseServing() metricsCfg := config.MustParseMetrics() + tracingCfg := config.MustParseTracing() opts := zap.Options{ Development: true, @@ -55,7 +60,7 @@ func main() { if err := config.Validate(servingCfg, *timeoutCfg, ctrl.Log); err != nil { setupLog.Error(err, "invalid configuration") - os.Exit(1) + runtime.Goexit() } setupLog.Info( @@ -80,7 +85,7 @@ func main() { cl, err := kubernetes.NewForConfig(cfg) if err != nil { setupLog.Error(err, "creating new Kubernetes ClientSet") - os.Exit(1) + runtime.Goexit() } endpointsCache := k8s.NewInformerBackedEndpointsCache( ctrl.Log, @@ -89,14 +94,14 @@ func main() { ) if err != nil { setupLog.Error(err, "creating new endpoints cache") - os.Exit(1) + runtime.Goexit() } waitFunc := newWorkloadReplicasForwardWaitFunc(ctrl.Log, endpointsCache) httpCl, err := clientset.NewForConfig(cfg) if err != nil { setupLog.Error(err, "creating new HTTP ClientSet") - os.Exit(1) + runtime.Goexit() } queues := queue.NewMemory() @@ -105,7 +110,7 @@ func main() { routingTable, err := routing.NewTable(sharedInformerFactory, servingCfg.WatchNamespace, queues) if err != nil { setupLog.Error(err, "fetching routing table") - os.Exit(1) + runtime.Goexit() } setupLog.Info("Interceptor starting") @@ -115,6 +120,18 @@ func main() { eg, ctx := errgroup.WithContext(ctx) + if tracingCfg.Enabled { + shutdown, err := tracing.SetupOTelSDK(ctx, tracingCfg) + + if err != nil { + setupLog.Error(err, "Error setting up tracer") + } + + defer func() { + err = errors.Join(err, shutdown(context.Background())) + }() + } + // start the endpoints cache updater eg.Go(func() error { setupLog.Info("starting the endpoints cache") @@ -173,7 +190,7 @@ func main() { setupLog.Info("starting the proxy server with TLS enabled", "port", proxyTLSPort) - if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) { + if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig, tracingCfg); !util.IsIgnoredErr(err) { setupLog.Error(err, "tls proxy server failed") return err } @@ -185,7 +202,7 @@ func main() { eg.Go(func() error { setupLog.Info("starting the proxy server with TLS disabled", "port", proxyPort) - if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) { + if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyPort, false, nil, tracingCfg); !util.IsIgnoredErr(err) { setupLog.Error(err, "proxy server failed") return err } @@ -197,7 +214,7 @@ func main() { if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) { setupLog.Error(err, "fatal error") - os.Exit(1) + runtime.Goexit() } setupLog.Info("Bye!") @@ -242,6 +259,7 @@ func runProxyServer( port int, tlsEnabled bool, tlsConfig map[string]string, + tracingConfig *config.Tracing, ) error { dialer := kedanet.NewNetDialer(timeouts.Connect, timeouts.KeepAlive) dialContextFunc := kedanet.DialContextWithRetry(dialer, timeouts.DefaultBackoff()) @@ -256,7 +274,7 @@ func runProxyServer( caCert, err := os.ReadFile(tlsConfig["certificatePath"]) if err != nil { logger.Error(fmt.Errorf("error reading file from TLSCertPath"), "error", err) - os.Exit(1) + runtime.Goexit() } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) @@ -264,7 +282,7 @@ func runProxyServer( if err != nil { logger.Error(fmt.Errorf("error creating TLS configuration for proxy server"), "error", err) - os.Exit(1) + runtime.Goexit() } tlsCfg.RootCAs = caCertPool @@ -278,6 +296,7 @@ func runProxyServer( waitFunc, newForwardingConfigFromTimeouts(timeouts), &tlsCfg, + tracingConfig, ) upstreamHandler = middleware.NewCountingMiddleware( q, @@ -291,6 +310,11 @@ func runProxyServer( upstreamHandler, tlsEnabled, ) + + if tracingConfig.Enabled { + rootHandler = otelhttp.NewHandler(rootHandler, "keda-http-interceptor") + } + rootHandler = middleware.NewLogging( logger, rootHandler, diff --git a/interceptor/main_test.go b/interceptor/main_test.go index 3a08be53..6421d350 100644 --- a/interceptor/main_test.go +++ b/interceptor/main_test.go @@ -16,6 +16,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/kedacore/http-add-on/interceptor/config" + "github.com/kedacore/http-add-on/interceptor/tracing" "github.com/kedacore/http-add-on/pkg/k8s" kedanet "github.com/kedacore/http-add-on/pkg/net" "github.com/kedacore/http-add-on/pkg/queue" @@ -68,6 +69,15 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { <-waiterCh return false, nil } + + tracingCfg := config.Tracing{Enabled: true, Exporter: "otlphttp"} + + _, err = tracing.SetupOTelSDK(ctx, &tracingCfg) + + if err != nil { + fmt.Println(err, "Error setting up tracer") + } + g.Go(func() error { return runProxyServer( ctx, @@ -79,6 +89,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { port, false, map[string]string{}, + &tracingCfg, ) }) // wait for server to start @@ -107,6 +118,10 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { resp.StatusCode, ) } + if _, ok := resp.Header["Traceparent"]; !ok { + return fmt.Errorf("expected Traceparent header to exist, but the header wasn't found") + } + if resp.Header.Get("X-KEDA-HTTP-Cold-Start") != "false" { return fmt.Errorf("expected X-KEDA-HTTP-Cold-Start false, but got %s", resp.Header.Get("X-KEDA-HTTP-Cold-Start")) } @@ -199,6 +214,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) { <-waiterCh return false, nil } + tracingCfg := config.Tracing{Enabled: true, Exporter: "otlphttp"} g.Go(func() error { return runProxyServer( @@ -211,6 +227,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) { port, true, map[string]string{"certificatePath": "../certs/tls.crt", "keyPath": "../certs/tls.key"}, + &tracingCfg, ) }) diff --git a/interceptor/proxy_handlers.go b/interceptor/proxy_handlers.go index e39cf6d1..ff06a16f 100644 --- a/interceptor/proxy_handlers.go +++ b/interceptor/proxy_handlers.go @@ -9,6 +9,7 @@ import ( "time" "github.com/go-logr/logr" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/kedacore/http-add-on/interceptor/config" "github.com/kedacore/http-add-on/interceptor/handler" @@ -50,6 +51,7 @@ func newForwardingHandler( waitFunc forwardWaitFunc, fwdCfg forwardingConfig, tlsCfg *tls.Config, + tracingCfg *config.Tracing, ) http.Handler { roundTripper := &http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -63,10 +65,11 @@ func newForwardingHandler( TLSClientConfig: tlsCfg, } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var uh *handler.Upstream ctx := r.Context() httpso := util.HTTPSOFromContext(ctx) - waitFuncCtx, done := context.WithTimeout(r.Context(), fwdCfg.waitTimeout) + waitFuncCtx, done := context.WithTimeout(ctx, fwdCfg.waitTimeout) defer done() isColdStart, err := waitFunc( waitFuncCtx, @@ -83,7 +86,11 @@ func newForwardingHandler( } w.Header().Add("X-KEDA-HTTP-Cold-Start", strconv.FormatBool(isColdStart)) - uh := handler.NewUpstream(roundTripper) + if tracingCfg.Enabled { + uh = handler.NewUpstream(otelhttp.NewTransport(roundTripper), tracingCfg) + } else { + uh = handler.NewUpstream(roundTripper, &config.Tracing{}) + } uh.ServeHTTP(w, r) }) } diff --git a/interceptor/proxy_handlers_integration_test.go b/interceptor/proxy_handlers_integration_test.go index 6ae02691..147633b0 100644 --- a/interceptor/proxy_handlers_integration_test.go +++ b/interceptor/proxy_handlers_integration_test.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "github.com/kedacore/http-add-on/interceptor/config" "github.com/kedacore/http-add-on/interceptor/middleware" "github.com/kedacore/http-add-on/pkg/k8s" kedanet "github.com/kedacore/http-add-on/pkg/net" @@ -307,7 +308,8 @@ func newHarness( waitTimeout: activeEndpointsTimeout, respHeaderTimeout: time.Second, }, - &tls.Config{}), + &tls.Config{}, + &config.Tracing{}), false, ) diff --git a/interceptor/proxy_handlers_test.go b/interceptor/proxy_handlers_test.go index d4a5e41e..b3321423 100644 --- a/interceptor/proxy_handlers_test.go +++ b/interceptor/proxy_handlers_test.go @@ -78,6 +78,7 @@ func TestImmediatelySuccessfulProxy(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &tls.Config{}, + &config.Tracing{}, ) const path = "/testfwd" res, req, err := reqAndRes(path) @@ -129,6 +130,7 @@ func TestImmediatelySuccessfulProxyTLS(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &TestTLSConfig, + &config.Tracing{}, ) const path = "/testfwd" res, req, err := reqAndRes(path) @@ -174,6 +176,7 @@ func TestWaitFailedConnection(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &tls.Config{}, + &config.Tracing{}, ) stream, err := url.Parse("http://0.0.0.0:0") r.NoError(err) @@ -224,6 +227,7 @@ func TestWaitFailedConnectionTLS(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &TestTLSConfig, + &config.Tracing{}, ) stream, err := url.Parse("http://0.0.0.0:0") r.NoError(err) @@ -275,6 +279,7 @@ func TestTimesOutOnWaitFunc(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &tls.Config{}, + &config.Tracing{}, ) stream, err := url.Parse("http://1.1.1.1") r.NoError(err) @@ -347,6 +352,7 @@ func TestTimesOutOnWaitFuncTLS(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &TestTLSConfig, + &config.Tracing{}, ) stream, err := url.Parse("http://1.1.1.1") r.NoError(err) @@ -430,6 +436,7 @@ func TestWaitsForWaitFunc(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &tls.Config{}, + &config.Tracing{}, ) const path = "/testfwd" res, req, err := reqAndRes(path) @@ -496,6 +503,7 @@ func TestWaitsForWaitFuncTLS(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &TestTLSConfig, + &config.Tracing{}, ) const path = "/testfwd" res, req, err := reqAndRes(path) @@ -566,6 +574,7 @@ func TestWaitHeaderTimeout(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &tls.Config{}, + &config.Tracing{}, ) const path = "/testfwd" res, req, err := reqAndRes(path) @@ -624,6 +633,7 @@ func TestWaitHeaderTimeoutTLS(t *testing.T) { respHeaderTimeout: timeouts.ResponseHeader, }, &TestTLSConfig, + &config.Tracing{}, ) const path = "/testfwd" res, req, err := reqAndRes(path) diff --git a/interceptor/tracing/tracing.go b/interceptor/tracing/tracing.go new file mode 100644 index 00000000..aea9a12b --- /dev/null +++ b/interceptor/tracing/tracing.go @@ -0,0 +1,102 @@ +package tracing + +import ( + "context" + "errors" + "go.opentelemetry.io/contrib/propagators/b3" + "strings" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + + "github.com/kedacore/http-add-on/interceptor/config" +) + +var serviceName = "keda-http-interceptor" + +func SetupOTelSDK(ctx context.Context, tCfg *config.Tracing) (shutdown func(context.Context) error, err error) { + var shutdownFuncs []func(context.Context) error + + // shutdown calls cleanup functions registered via shutdownFuncs. + // The errors from the calls are joined. + // Each registered cleanup will be invoked once. + shutdown = func(ctx context.Context) error { + var err error + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + shutdownFuncs = nil + return err + } + + handleErr := func(inErr error) { + err = errors.Join(inErr, shutdown(ctx)) + } + + res, err := newResource(serviceName) + if err != nil { + handleErr(err) + return + } + + prop := NewPropagator() + otel.SetTextMapPropagator(prop) + + tracerProvider, err := newTraceProvider(ctx, res, tCfg) + if err != nil { + handleErr(err) + return + } + shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) + otel.SetTracerProvider(tracerProvider) + + return +} + +func newResource(serviceName string) (*resource.Resource, error) { + return resource.Merge(resource.Default(), + resource.NewWithAttributes(semconv.SchemaURL, + semconv.ServiceName(serviceName), + )) +} + +func NewPropagator() propagation.TextMapPropagator { + return propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + b3.New(), + ) +} + +func newTraceProvider(ctx context.Context, res *resource.Resource, tCfg *config.Tracing) (*trace.TracerProvider, error) { + traceExporter, err := newExporter(ctx, tCfg) + if err != nil { + return nil, err + } + + traceProvider := trace.NewTracerProvider( + trace.WithSampler(trace.AlwaysSample()), + trace.WithBatcher(traceExporter), + trace.WithResource(res), + ) + return traceProvider, nil +} + +func newExporter(ctx context.Context, tCfg *config.Tracing) (trace.SpanExporter, error) { + switch strings.ToLower(tCfg.Exporter) { + case "console": + return stdouttrace.New() + case "http/protobuf": + return otlptracehttp.New(ctx) + case "grpc": + return otlptracegrpc.New(ctx) + default: + return nil, errors.New("no valid tracing exporter defined") + } +} diff --git a/interceptor/tracing/tracing_test.go b/interceptor/tracing/tracing_test.go new file mode 100644 index 00000000..a21f53e2 --- /dev/null +++ b/interceptor/tracing/tracing_test.go @@ -0,0 +1,18 @@ +package tracing + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/kedacore/http-add-on/interceptor/config" +) + +func TestTracingConfig(t *testing.T) { + tracingCfg := config.MustParseTracing() + tracingCfg.Enabled = true + + // check defaults are set correctly + assert.Equal(t, "console", tracingCfg.Exporter) + assert.Equal(t, true, tracingCfg.Enabled) +} diff --git a/tests/checks/interceptor_otel_tracing/interceptor_otel_tracing_test.go b/tests/checks/interceptor_otel_tracing/interceptor_otel_tracing_test.go new file mode 100644 index 00000000..170d49fb --- /dev/null +++ b/tests/checks/interceptor_otel_tracing/interceptor_otel_tracing_test.go @@ -0,0 +1,328 @@ +//go:build e2e +// +build e2e + +package interceptor_otel_tracing_test + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/http-add-on/tests/helper" +) + +const ( + testName = "interceptor-otel-tracing-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + serviceName = fmt.Sprintf("%s-service", testName) + clientName = fmt.Sprintf("%s-client", testName) + httpScaledObjectName = fmt.Sprintf("%s-http-so", testName) + host = testName + minReplicaCount = 0 + maxReplicaCount = 1 + otelCollectorZipKinURL = "http://zipkin.zipkin:9411/api/v2/traces?serviceName=keda-http-interceptor\\&annotationQuery=net.host.name+and+net.host.name\\%3Dinterceptor-otel-tracing-test" + traces = Trace{} +) + +type templateData struct { + TestNamespace string + DeploymentName string + ServiceName string + ClientName string + HTTPScaledObjectName string + Host string + MinReplicas int + MaxReplicas int +} + +type Trace [][]struct { + TraceID string `json:"traceId"` + ParentID string `json:"parentId"` + ID string `json:"id"` + Kind string `json:"kind"` + Name string `json:"name"` + Timestamp int `json:"timestamp"` + Duration int `json:"duration"` + LocalEndpoint struct { + ServiceName string `json:"serviceName"` + } `json:"localEndpoint"` + Tags struct { + HTTPFlavor string `json:"http.flavor"` + HTTPMethod string `json:"http.method"` + HTTPResponseContentLength string `json:"http.response_content_length"` + HTTPStatusCode string `json:"http.status_code"` + HTTPURL string `json:"http.url"` + HTTPUserAgent string `json:"http.user_agent"` + NetPeerName string `json:"net.peer.name"` + OtelLibraryName string `json:"otel.library.name"` + OtelLibraryVersion string `json:"otel.library.version"` + TelemetrySdkLanguage string `json:"telemetry.sdk.language"` + TelemetrySdkName string `json:"telemetry.sdk.name"` + TelemetrySdkVersion string `json:"telemetry.sdk.version"` + } `json:"tags"` +} + +const ( + serviceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.ServiceName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + ports: + - port: 8080 + targetPort: http + protocol: TCP + name: http + selector: + app: {{.DeploymentName}} +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: {{.DeploymentName}} + image: registry.k8s.io/e2e-test-images/agnhost:2.45 + args: + - netexec + ports: + - name: http + containerPort: 8080 + protocol: TCP + readinessProbe: + httpGet: + path: / + port: http +` + + loadJobTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: generate-request + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: curl-client + image: curlimages/curl + imagePullPolicy: Always + command: ["curl", "-H", "Host: {{.Host}}", "keda-http-add-on-interceptor-proxy.keda:8080"] + restartPolicy: Never + activeDeadlineSeconds: 600 + backoffLimit: 5 +` + + httpScaledObjectTemplate = ` +kind: HTTPScaledObject +apiVersion: http.keda.sh/v1alpha1 +metadata: + name: {{.HTTPScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + hosts: + - {{.Host}} + targetPendingRequests: 100 + scaledownPeriod: 10 + scaleTargetRef: + name: {{.DeploymentName}} + service: {{.ServiceName}} + port: 8080 + replicas: + min: {{ .MinReplicas }} + max: {{ .MaxReplicas }} +` + + clientTemplate = ` +apiVersion: v1 +kind: Pod +metadata: + name: {{.ClientName}} + namespace: {{.TestNamespace}} +spec: + containers: + - name: {{.ClientName}} + image: curlimages/curl + command: + - sh + - -c + - "exec tail -f /dev/null"` + + zipkinTemplate = ` +apiVersion: v1 +kind: Namespace +metadata: + creationTimestamp: null + name: zipkin +spec: {} + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + creationTimestamp: null + labels: + app: zipkin + name: zipkin + namespace: zipkin +spec: + replicas: 1 + selector: + matchLabels: + app: zipkin + strategy: {} + template: + metadata: + creationTimestamp: null + labels: + app: zipkin + spec: + containers: + - image: openzipkin/zipkin + name: zipkin + env: + - name: "JAVA_OPTS" + value: "-Xmx500M" + resources: + limits: + memory: "700M" + requests: + memory: "500M" +--- +apiVersion: v1 +kind: Service +metadata: + creationTimestamp: null + labels: + app: zipkin + name: zipkin + namespace: zipkin +spec: + ports: + - port: 9411 + protocol: TCP + targetPort: 9411 + selector: + app: zipkin + type: ClusterIP +status: + loadBalancer: {} +` +) + +func TestTraceGeneration(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", minReplicaCount) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, "zipkin", "zipkin", 1, 12, 10), + "zipkin replica count should be %d after 1 minutes", 1) + + time.Sleep(5 * time.Second) + + // Send a test request to the interceptor + sendLoad(t, kc, data) + + // setting sleep for 5 sec so traces are sent over + time.Sleep(5 * time.Second) + + // Fetch metrics and validate them + traces = fetchAndParseZipkinTraces(t, fmt.Sprintf("curl %s", otelCollectorZipKinURL)) + assert.GreaterOrEqual(t, len(traces), 1) + + traceStatus := getTracesStatus(traces) + assert.EqualValues(t, "200", traceStatus) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func sendLoad(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- sending load ---") + + KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", maxReplicaCount) +} + +func fetchAndParseZipkinTraces(t *testing.T, cmd string) Trace { + out, _, err := ExecCommandOnSpecificPod(t, clientName, testNamespace, cmd) + assert.NoErrorf(t, err, "cannot execute command - %s", err) + + var traces Trace + + e := json.Unmarshal([]byte(out), &traces) + if e != nil { + assert.NoErrorf(t, err, "JSON decode error! - %s", e) + return nil + } + + return traces +} + +func getTracesStatus(traces Trace) string { + for _, t := range traces { + for _, t1 := range t { + if t1.Kind == "CLIENT" { + s := t1.Tags.HTTPStatusCode + return s + } + } + } + + return "" +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ServiceName: serviceName, + ClientName: clientName, + HTTPScaledObjectName: httpScaledObjectName, + Host: host, + MinReplicas: minReplicaCount, + MaxReplicas: maxReplicaCount, + }, []Template{ + {Name: "zipkinTemplate", Config: zipkinTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "serviceNameTemplate", Config: serviceTemplate}, + {Name: "clientTemplate", Config: clientTemplate}, + {Name: "httpScaledObjectTemplate", Config: httpScaledObjectTemplate}, + } +} diff --git a/tests/utils/setup_test.go b/tests/utils/setup_test.go index 1b30db30..5097531e 100644 --- a/tests/utils/setup_test.go +++ b/tests/utils/setup_test.go @@ -25,13 +25,19 @@ config: loglevel: debug prometheus: endpoint: 0.0.0.0:8889 + zipkin: + endpoint: http://zipkin.zipkin:9411/api/v2/spans receivers: jaeger: null prometheus: null zipkin: null service: pipelines: - traces: null + traces: + receivers: + - otlp + exporters: + - zipkin metrics: receivers: - otlp