Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: use separate goroutines for sotw bidi streams (#530) #531

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/proto/otlp v0.15.0
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7
google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.28.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
180 changes: 80 additions & 100 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package sotw
import (
"context"
"errors"
"reflect"
"strconv"
"sync/atomic"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -63,15 +63,6 @@ type server struct {
streamCount int64
}

// Discovery response that is sent over GRPC stream
// We need to record what resource names are already sent to a client
// So if the client requests a new name we can respond back
// regardless current snapshot version (even if it is not changed yet)
type lastDiscoveryResponse struct {
nonce string
resources map[string]struct{}
}

// process handles a bi-di stream request
func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
// increment stream count
Expand All @@ -81,8 +72,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
// ignores stale nonces. nonce is only modified within send() function.
var streamNonce int64

streamState := stream.NewStreamState(false, map[string]string{})
lastDiscoveryResponses := map[string]lastDiscoveryResponse{}
streamState := stream.NewSTOWStreamState()

// a collection of stack allocated watches per request type
watches := newWatches()
Expand All @@ -91,7 +81,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
var node = &core.Node{}

defer func() {
watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID, node)
}
Expand All @@ -112,14 +101,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
streamNonce = streamNonce + 1
out.Nonce = strconv.FormatInt(streamNonce, 10)

lastResponse := lastDiscoveryResponse{
nonce: out.Nonce,
resources: make(map[string]struct{}),
}
for _, r := range resp.GetRequest().ResourceNames {
lastResponse.resources[r] = struct{}{}
}
lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse
lastResponse := stream.NewLastDiscoveryResponse(out.Nonce, resp.GetRequest().ResourceNames)
streamState.Set(resp.GetRequest().TypeUrl, lastResponse)

if s.callbacks != nil {
s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out)
Expand All @@ -133,103 +116,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
}
}

// recompute dynamic channels for this stream
watches.recompute(s.ctx, reqCh)

for {
// The list of select cases looks like this:
// 0: <- ctx.Done
// 1: <- reqCh
// 2...: per type watches
index, value, ok := reflect.Select(watches.cases)
switch index {
// ctx.Done() -> if we receive a value here we return as no further computation is needed
case 0:
return nil
// Case 1 handles any request inbound on the stream and handles all initialization as needed
case 1:
// input stream ended or errored out
if !ok {
return nil
}
var resCh = make(chan cache.Response, 1)

req := value.Interface().(*discovery.DiscoveryRequest)
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
ctx, cancel := context.WithCancel(s.ctx)
eg, ctx := errgroup.WithContext(ctx)

// node field in discovery request is delta-compressed
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}

// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()
eg.Go(func() error {
defer func() {
watches.close() // this should remove all watches from the cache
close(resCh) // close resCh and let the second eg.Go drain it
}()

// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
for {
select {
case <-ctx.Done():
return nil
case req, more := <-reqCh:
if !more {
return nil
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
// node field in discovery request is delta-compressed
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}

// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()

if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
return err
// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
}

if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
return err
}
}
}

typeURL := req.GetTypeUrl()
responder := make(chan cache.Response, 1)
if w, ok := watches.responders[typeURL]; ok {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if w.nonce == "" || w.nonce == nonce {
w.close()
if lastResponse, ok := streamState.Get(req.TypeUrl); ok {
if lastResponse.Nonce == "" || lastResponse.Nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.Resources)
}
}

typeURL := req.GetTypeUrl()
if w := watches.getWatch(typeURL); w != nil {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if n := w.getNonce(); n == "" || n == nonce {
w.close()

watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh),
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh),
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
})
}
}
})

// Recompute the dynamic select cases for this stream.
watches.recompute(s.ctx, reqCh)
default:
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
if !ok {
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
eg.Go(func() (err error) {
var nonce string
for res := range resCh {
if res == nil || err != nil {
continue // this loop should not exit until resCh closed
}

res := value.Interface().(cache.Response)
nonce, err := send(res)
if err != nil {
return err
if nonce, err = send(res); err == nil {
if w := watches.getWatch(res.GetRequest().TypeUrl); w != nil {
w.setNonce(nonce)
}
} else {
cancel()
}

watches.responders[res.GetRequest().TypeUrl].nonce = nonce
}
}
return err
})

return eg.Wait()
}

// StreamHandler converts a blocking read call to channels and initiates stream processing
Expand Down
56 changes: 24 additions & 32 deletions pkg/server/sotw/v3/watches.go
Original file line number Diff line number Diff line change
@@ -1,70 +1,62 @@
package sotw

import (
"context"
"reflect"
"sync"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

// watches for all xDS resource types
type watches struct {
mu sync.RWMutex
responders map[string]*watch

// cases is a dynamic select case for the watched channels.
cases []reflect.SelectCase
}

// newWatches creates and initializes watches.
func newWatches() watches {
return watches{
responders: make(map[string]*watch, int(types.UnknownType)),
cases: make([]reflect.SelectCase, 0),
}
}

// addWatch creates a new watch entry in the watches map.
// Watches are sorted by typeURL.
func (w *watches) addWatch(typeURL string, watch *watch) {
w.mu.Lock()
defer w.mu.Unlock()
w.responders[typeURL] = watch
}

func (w *watches) getWatch(typeURL string) (watch *watch) {
w.mu.RLock()
defer w.mu.RUnlock()
return w.responders[typeURL]
}

// close all open watches
func (w *watches) close() {
for _, watch := range w.responders {
watch.close()
}
}

// recomputeWatches rebuilds the known list of dynamic channels if needed
func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) {
w.cases = w.cases[:0] // Clear the existing cases while retaining capacity.

w.cases = append(w.cases,
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
}, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(req),
},
)
// watch contains the necessary modifiables for receiving resource responses
type watch struct {
mu sync.RWMutex
cancel func()
nonce string
}

for _, watch := range w.responders {
w.cases = append(w.cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(watch.response),
})
}
func (w *watch) getNonce() (n string) {
w.mu.RLock()
defer w.mu.RUnlock()
return w.nonce
}

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
cancel func()
nonce string
response chan cache.Response
func (w *watch) setNonce(n string) {
w.mu.Lock()
defer w.mu.Unlock()
w.nonce = n
}

// close cancels an open watch
Expand Down
Loading