diff --git a/go.mod b/go.mod index 830a795df3..8079cd04a1 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/stretchr/testify v1.7.0 go.opentelemetry.io/proto/otlp v0.7.0 golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect + golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.36.0 diff --git a/go.sum b/go.sum index 28c9b41f49..15b45aa4c0 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,7 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index d3b167379a..8d0776e89b 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -18,10 +18,10 @@ package sotw import ( "context" "errors" - "reflect" "strconv" "sync/atomic" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -63,15 +63,6 @@ type server struct { streamCount int64 } -// Discovery response that is sent over GRPC stream -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]struct{} -} - // process handles a bi-di stream request func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { // increment stream count @@ -81,14 +72,12 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // ignores stale nonces. nonce is only modified within send() function. var streamNonce int64 - streamState := stream.NewStreamState(false, map[string]string{}) - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} + streamState := stream.NewSTOWStreamState() // a collection of stack allocated watches per request type watches := newWatches() defer func() { - watches.close() if s.callbacks != nil { s.callbacks.OnStreamClosed(streamID) } @@ -109,14 +98,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) - lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, - resources: make(map[string]struct{}), - } - for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = struct{}{} - } - lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + lastResponse := stream.NewLastDiscoveryResponse(out.Nonce, resp.GetRequest().ResourceNames) + streamState.Set(resp.GetRequest().TypeUrl, lastResponse) if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) @@ -133,103 +116,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // node may only be set on the first discovery request var node = &core.Node{} - // recompute dynamic channels for this stream - watches.recompute(s.ctx, reqCh) - - for { - // The list of select cases looks like this: - // 0: <- ctx.Done - // 1: <- reqCh - // 2...: per type watches - index, value, ok := reflect.Select(watches.cases) - switch index { - // ctx.Done() -> if we receive a value here we return as no further computation is needed - case 0: - return nil - // Case 1 handles any request inbound on the stream and handles all initialization as needed - case 1: - // input stream ended or errored out - if !ok { - return nil - } + var resCh = make(chan cache.Response, 1) - req := value.Interface().(*discovery.DiscoveryRequest) - if req == nil { - return status.Errorf(codes.Unavailable, "empty request") - } + ctx, cancel := context.WithCancel(s.ctx) + eg, ctx := errgroup.WithContext(ctx) - // node field in discovery request is delta-compressed - if req.Node != nil { - node = req.Node - } else { - req.Node = node - } - - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() + eg.Go(func() error { + defer func() { + watches.close() // this should remove all watches from the cache + close(resCh) // close resCh and let the second eg.Go drain it + }() - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + for { + select { + case <-ctx.Done(): + return nil + case req, more := <-reqCh: + if !more { + return nil } - } else if req.TypeUrl == "" { - req.TypeUrl = defaultTypeURL - } + if req == nil { + return status.Errorf(codes.Unavailable, "empty request") + } + // node field in discovery request is delta-compressed + if req.Node != nil { + node = req.Node + } else { + req.Node = node + } + + // nonces can be reused across streams; we verify nonce only if nonce is not initialized + nonce := req.GetResponseNonce() - if s.callbacks != nil { - if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { - return err + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType { + if req.TypeUrl == "" { + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + } + } else if req.TypeUrl == "" { + req.TypeUrl = defaultTypeURL } - } - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + if s.callbacks != nil { + if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { + return err + } } - } - typeURL := req.GetTypeUrl() - responder := make(chan cache.Response, 1) - if w, ok := watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() + if lastResponse, ok := streamState.Get(req.TypeUrl); ok { + if lastResponse.Nonce == "" || lastResponse.Nonce == nonce { + // Let's record Resource names that a client has received. + streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.Resources) + } + } + typeURL := req.GetTypeUrl() + if w := watches.getWatch(typeURL); w != nil { + // We've found a pre-existing watch, lets check and update if needed. + // If these requirements aren't satisfied, leave an open watch. + if n := w.getNonce(); n == "" || n == nonce { + w.close() + + watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh), + }) + } + } else { + // No pre-existing watch exists, let's create one. + // We need to precompute the watches first then open a watch in the cache. watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, + cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh), }) } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, - }) } + } + }) - // Recompute the dynamic select cases for this stream. - watches.recompute(s.ctx, reqCh) - default: - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL - if !ok { - // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? - return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) + eg.Go(func() (err error) { + var nonce string + for res := range resCh { + if res == nil || err != nil { + continue // this loop should not exit until resCh closed } - - res := value.Interface().(cache.Response) - nonce, err := send(res) - if err != nil { - return err + if nonce, err = send(res); err == nil { + if w := watches.getWatch(res.GetRequest().TypeUrl); w != nil { + w.setNonce(nonce) + } + } else { + cancel() } - - watches.responders[res.GetRequest().TypeUrl].nonce = nonce } - } + return err + }) + + return eg.Wait() } // StreamHandler converts a blocking read call to channels and initiates stream processing diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index 45670d6a91..b5c659d3f1 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -1,36 +1,38 @@ package sotw import ( - "context" - "reflect" + "sync" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" ) // watches for all xDS resource types type watches struct { + mu sync.RWMutex responders map[string]*watch - - // cases is a dynamic select case for the watched channels. - cases []reflect.SelectCase } // newWatches creates and initializes watches. func newWatches() watches { return watches{ responders: make(map[string]*watch, int(types.UnknownType)), - cases: make([]reflect.SelectCase, 0), } } // addWatch creates a new watch entry in the watches map. // Watches are sorted by typeURL. func (w *watches) addWatch(typeURL string, watch *watch) { + w.mu.Lock() + defer w.mu.Unlock() w.responders[typeURL] = watch } +func (w *watches) getWatch(typeURL string) (watch *watch) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.responders[typeURL] +} + // close all open watches func (w *watches) close() { for _, watch := range w.responders { @@ -38,33 +40,23 @@ func (w *watches) close() { } } -// recomputeWatches rebuilds the known list of dynamic channels if needed -func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) { - w.cases = w.cases[:0] // Clear the existing cases while retaining capacity. - - w.cases = append(w.cases, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ctx.Done()), - }, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(req), - }, - ) +// watch contains the necessary modifiables for receiving resource responses +type watch struct { + mu sync.RWMutex + cancel func() + nonce string +} - for _, watch := range w.responders { - w.cases = append(w.cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(watch.response), - }) - } +func (w *watch) getNonce() (n string) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.nonce } -// watch contains the necessary modifiables for receiving resource responses -type watch struct { - cancel func() - nonce string - response chan cache.Response +func (w *watch) setNonce(n string) { + w.mu.Lock() + defer w.mu.Unlock() + w.nonce = n } // close cancels an open watch diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 3a4247c457..c32ef99046 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -1,9 +1,10 @@ package stream import ( - "google.golang.org/grpc" + "sync" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "google.golang.org/grpc" ) // Generic RPC stream. @@ -85,3 +86,49 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St return state } + +func NewLastDiscoveryResponse(nonce string, resources []string) LastDiscoveryResponse { + resp := LastDiscoveryResponse{ + Nonce: nonce, + Resources: make(map[string]struct{}), + } + for _, r := range resources { + resp.Resources[r] = struct{}{} + } + return resp +} + +// LastDiscoveryResponse that is sent over GRPC stream +// We need to record what resource names are already sent to a client +// So if the client requests a new name we can respond back +// regardless current snapshot version (even if it is not changed yet) +type LastDiscoveryResponse struct { + Nonce string + Resources map[string]struct{} +} + +func NewSTOWStreamState() STOWStreamState { + return STOWStreamState{ + StreamState: NewStreamState(false, map[string]string{}), + responses: make(map[string]LastDiscoveryResponse), + } +} + +type STOWStreamState struct { + StreamState + responses map[string]LastDiscoveryResponse + mu sync.RWMutex +} + +func (l *STOWStreamState) Set(key string, value LastDiscoveryResponse) { + l.mu.Lock() + defer l.mu.Unlock() + l.responses[key] = value +} + +func (l *STOWStreamState) Get(key string) (value LastDiscoveryResponse, ok bool) { + l.mu.RLock() + defer l.mu.RUnlock() + value, ok = l.responses[key] + return +}