From ad49378f24e0ddde42471353478ee965c0e45d78 Mon Sep 17 00:00:00 2001 From: Rueian Date: Tue, 21 Dec 2021 17:25:21 +0800 Subject: [PATCH 1/3] server: use separate goroutines for sotw bidi streams (#530) Signed-off-by: Rueian --- go.mod | 1 + go.sum | 2 + pkg/server/sotw/v3/server.go | 180 +++++++++++++++------------------ pkg/server/sotw/v3/watches.go | 56 +++++----- pkg/server/stream/v3/stream.go | 48 +++++++++ 5 files changed, 155 insertions(+), 132 deletions(-) diff --git a/go.mod b/go.mod index ce57eb360d..1894f8515c 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/stretchr/testify v1.7.1 go.opentelemetry.io/proto/otlp v0.15.0 + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 google.golang.org/grpc v1.45.0 google.golang.org/protobuf v1.28.0 diff --git a/go.sum b/go.sum index 950d7d44c6..30e87b0112 100644 --- a/go.sum +++ b/go.sum @@ -253,6 +253,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 91681237c9..f00169d4e1 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -18,10 +18,10 @@ package sotw import ( "context" "errors" - "reflect" "strconv" "sync/atomic" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -63,15 +63,6 @@ type server struct { streamCount int64 } -// Discovery response that is sent over GRPC stream -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]struct{} -} - // process handles a bi-di stream request func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { // increment stream count @@ -81,8 +72,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // ignores stale nonces. nonce is only modified within send() function. var streamNonce int64 - streamState := stream.NewStreamState(false, map[string]string{}) - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} + streamState := stream.NewSTOWStreamState() // a collection of stack allocated watches per request type watches := newWatches() @@ -91,7 +81,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq var node = &core.Node{} defer func() { - watches.close() if s.callbacks != nil { s.callbacks.OnStreamClosed(streamID, node) } @@ -112,14 +101,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) - lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, - resources: make(map[string]struct{}), - } - for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = struct{}{} - } - lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + lastResponse := stream.NewLastDiscoveryResponse(out.Nonce, resp.GetRequest().ResourceNames) + streamState.Set(resp.GetRequest().TypeUrl, lastResponse) if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) @@ -133,103 +116,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq } } - // recompute dynamic channels for this stream - watches.recompute(s.ctx, reqCh) - - for { - // The list of select cases looks like this: - // 0: <- ctx.Done - // 1: <- reqCh - // 2...: per type watches - index, value, ok := reflect.Select(watches.cases) - switch index { - // ctx.Done() -> if we receive a value here we return as no further computation is needed - case 0: - return nil - // Case 1 handles any request inbound on the stream and handles all initialization as needed - case 1: - // input stream ended or errored out - if !ok { - return nil - } + var resCh = make(chan cache.Response, 1) - req := value.Interface().(*discovery.DiscoveryRequest) - if req == nil { - return status.Errorf(codes.Unavailable, "empty request") - } + ctx, cancel := context.WithCancel(s.ctx) + eg, ctx := errgroup.WithContext(ctx) - // node field in discovery request is delta-compressed - if req.Node != nil { - node = req.Node - } else { - req.Node = node - } - - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() + eg.Go(func() error { + defer func() { + watches.close() // this should remove all watches from the cache + close(resCh) // close resCh and let the second eg.Go drain it + }() - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + for { + select { + case <-ctx.Done(): + return nil + case req, more := <-reqCh: + if !more { + return nil } - } else if req.TypeUrl == "" { - req.TypeUrl = defaultTypeURL - } + if req == nil { + return status.Errorf(codes.Unavailable, "empty request") + } + // node field in discovery request is delta-compressed + if req.Node != nil { + node = req.Node + } else { + req.Node = node + } + + // nonces can be reused across streams; we verify nonce only if nonce is not initialized + nonce := req.GetResponseNonce() - if s.callbacks != nil { - if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { - return err + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType { + if req.TypeUrl == "" { + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + } + } else if req.TypeUrl == "" { + req.TypeUrl = defaultTypeURL } - } - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + if s.callbacks != nil { + if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { + return err + } } - } - typeURL := req.GetTypeUrl() - responder := make(chan cache.Response, 1) - if w, ok := watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() + if lastResponse, ok := streamState.Get(req.TypeUrl); ok { + if lastResponse.Nonce == "" || lastResponse.Nonce == nonce { + // Let's record Resource names that a client has received. + streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.Resources) + } + } + typeURL := req.GetTypeUrl() + if w := watches.getWatch(typeURL); w != nil { + // We've found a pre-existing watch, lets check and update if needed. + // If these requirements aren't satisfied, leave an open watch. + if n := w.getNonce(); n == "" || n == nonce { + w.close() + + watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh), + }) + } + } else { + // No pre-existing watch exists, let's create one. + // We need to precompute the watches first then open a watch in the cache. watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, + cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh), }) } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, - }) } + } + }) - // Recompute the dynamic select cases for this stream. - watches.recompute(s.ctx, reqCh) - default: - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL - if !ok { - // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? - return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) + eg.Go(func() (err error) { + var nonce string + for res := range resCh { + if res == nil || err != nil { + continue // this loop should not exit until resCh closed } - - res := value.Interface().(cache.Response) - nonce, err := send(res) - if err != nil { - return err + if nonce, err = send(res); err == nil { + if w := watches.getWatch(res.GetRequest().TypeUrl); w != nil { + w.setNonce(nonce) + } + } else { + cancel() } - - watches.responders[res.GetRequest().TypeUrl].nonce = nonce } - } + return err + }) + + return eg.Wait() } // StreamHandler converts a blocking read call to channels and initiates stream processing diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index 45670d6a91..b5c659d3f1 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -1,36 +1,38 @@ package sotw import ( - "context" - "reflect" + "sync" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" ) // watches for all xDS resource types type watches struct { + mu sync.RWMutex responders map[string]*watch - - // cases is a dynamic select case for the watched channels. - cases []reflect.SelectCase } // newWatches creates and initializes watches. func newWatches() watches { return watches{ responders: make(map[string]*watch, int(types.UnknownType)), - cases: make([]reflect.SelectCase, 0), } } // addWatch creates a new watch entry in the watches map. // Watches are sorted by typeURL. func (w *watches) addWatch(typeURL string, watch *watch) { + w.mu.Lock() + defer w.mu.Unlock() w.responders[typeURL] = watch } +func (w *watches) getWatch(typeURL string) (watch *watch) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.responders[typeURL] +} + // close all open watches func (w *watches) close() { for _, watch := range w.responders { @@ -38,33 +40,23 @@ func (w *watches) close() { } } -// recomputeWatches rebuilds the known list of dynamic channels if needed -func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) { - w.cases = w.cases[:0] // Clear the existing cases while retaining capacity. - - w.cases = append(w.cases, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ctx.Done()), - }, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(req), - }, - ) +// watch contains the necessary modifiables for receiving resource responses +type watch struct { + mu sync.RWMutex + cancel func() + nonce string +} - for _, watch := range w.responders { - w.cases = append(w.cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(watch.response), - }) - } +func (w *watch) getNonce() (n string) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.nonce } -// watch contains the necessary modifiables for receiving resource responses -type watch struct { - cancel func() - nonce string - response chan cache.Response +func (w *watch) setNonce(n string) { + w.mu.Lock() + defer w.mu.Unlock() + w.nonce = n } // close cancels an open watch diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index b5832b7d58..093dd46d7d 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -1,6 +1,8 @@ package stream import ( + "sync" + "google.golang.org/grpc" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" @@ -124,3 +126,49 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St return state } + +func NewLastDiscoveryResponse(nonce string, resources []string) LastDiscoveryResponse { + resp := LastDiscoveryResponse{ + Nonce: nonce, + Resources: make(map[string]struct{}), + } + for _, r := range resources { + resp.Resources[r] = struct{}{} + } + return resp +} + +// LastDiscoveryResponse that is sent over GRPC stream +// We need to record what resource names are already sent to a client +// So if the client requests a new name we can respond back +// regardless current snapshot version (even if it is not changed yet) +type LastDiscoveryResponse struct { + Nonce string + Resources map[string]struct{} +} + +func NewSTOWStreamState() STOWStreamState { + return STOWStreamState{ + StreamState: NewStreamState(false, map[string]string{}), + responses: make(map[string]LastDiscoveryResponse), + } +} + +type STOWStreamState struct { + StreamState + responses map[string]LastDiscoveryResponse + mu sync.RWMutex +} + +func (l *STOWStreamState) Set(key string, value LastDiscoveryResponse) { + l.mu.Lock() + defer l.mu.Unlock() + l.responses[key] = value +} + +func (l *STOWStreamState) Get(key string) (value LastDiscoveryResponse, ok bool) { + l.mu.RLock() + defer l.mu.RUnlock() + value, ok = l.responses[key] + return +} From e0c74fbcf0781ca533c408d3964ebb219f1e512d Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 13 Jan 2022 23:54:05 +0800 Subject: [PATCH 2/3] server: test the deadlock between LinearCache and SOTW server (#530) Signed-off-by: Rueian --- pkg/server/v3/server_test.go | 63 ++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 02078bc3ad..9438682041 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -18,8 +18,11 @@ import ( "context" "errors" "fmt" + "math/rand" "reflect" + "strconv" "sync" + "sync/atomic" "testing" "time" @@ -682,3 +685,63 @@ func TestCallbackError(t *testing.T) { }) } } + +func TestSOTWLinearCacheIntegrationDeadLock(t *testing.T) { + for _, typ := range testTypes { + t.Run(typ, func(t *testing.T) { + t.Log("Integrating LinearCache with SOTW server. If this take too long, they might be dead locked") + + nonce := int64(0) + ver, targetVer := uint64(0), uint64(100000) + untilVerExceed := func(exceed uint64, fn func(current uint64)) { + for current := atomic.LoadUint64(&ver); current < exceed; current = atomic.LoadUint64(&ver) { + fn(current) + } + } + + config := cache.NewLinearCache(typ) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + + resources := make([]string, 10) + for i := range resources { + resources[i] = strconv.Itoa(i) + } + + mStream := makeMockStream(t) + + go func() { + untilVerExceed(targetVer, func(current uint64) { + mStream.recv <- &discovery.DiscoveryRequest{ + Node: node, + TypeUrl: typ, + ResourceNames: resources, + VersionInfo: strconv.FormatUint(current, 10), + ResponseNonce: strconv.FormatInt(atomic.LoadInt64(&nonce), 10), + } + }) + close(mStream.recv) + }() + + go func() { + untilVerExceed(targetVer, func(current uint64) { + config.SetResources(map[string]types.Resource{ + resources[rand.Intn(len(resources))]: opaque, //nolint + }) + }) + }() + + go func() { + for resp := range mStream.sent { + v, _ := strconv.ParseUint(resp.VersionInfo, 10, 64) + atomic.StoreUint64(&ver, v) + n, _ := strconv.ParseInt(resp.Nonce, 10, 64) + atomic.StoreInt64(&nonce, n) + } + }() + + err := s.StreamAggregatedResources(mStream) + assert.Nil(t, err) + close(mStream.sent) + }) + } +} From 6a2832937b0dcc2ec6265795f0b1f7b59adacbb9 Mon Sep 17 00:00:00 2001 From: Rueian Date: Fri, 21 Jan 2022 00:12:32 +0800 Subject: [PATCH 3/3] server: reword comment of the deadlock test (#530) Signed-off-by: Rueian --- pkg/server/v3/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 9438682041..665b954e4a 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -689,7 +689,7 @@ func TestCallbackError(t *testing.T) { func TestSOTWLinearCacheIntegrationDeadLock(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { - t.Log("Integrating LinearCache with SOTW server. If this take too long, they might be dead locked") + t.Log("Integrating LinearCache with SOTW server. If this is never completed, it might be because they are dead locked.") nonce := int64(0) ver, targetVer := uint64(0), uint64(100000)