Skip to content

Commit

Permalink
server: use separate goroutines for sotw bidi streams (#530)
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Jan 12, 2022
1 parent 01bb8ac commit 8e247d6
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 132 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/proto/otlp v0.7.0
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.36.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
180 changes: 80 additions & 100 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package sotw
import (
"context"
"errors"
"reflect"
"strconv"
"sync/atomic"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -63,15 +63,6 @@ type server struct {
streamCount int64
}

// Discovery response that is sent over GRPC stream
// We need to record what resource names are already sent to a client
// So if the client requests a new name we can respond back
// regardless current snapshot version (even if it is not changed yet)
type lastDiscoveryResponse struct {
nonce string
resources map[string]struct{}
}

// process handles a bi-di stream request
func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
// increment stream count
Expand All @@ -81,14 +72,12 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
// ignores stale nonces. nonce is only modified within send() function.
var streamNonce int64

streamState := stream.NewStreamState(false, map[string]string{})
lastDiscoveryResponses := map[string]lastDiscoveryResponse{}
streamState := stream.NewSTOWStreamState()

// a collection of stack allocated watches per request type
watches := newWatches()

defer func() {
watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID)
}
Expand All @@ -109,14 +98,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
streamNonce = streamNonce + 1
out.Nonce = strconv.FormatInt(streamNonce, 10)

lastResponse := lastDiscoveryResponse{
nonce: out.Nonce,
resources: make(map[string]struct{}),
}
for _, r := range resp.GetRequest().ResourceNames {
lastResponse.resources[r] = struct{}{}
}
lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse
lastResponse := stream.NewLastDiscoveryResponse(out.Nonce, resp.GetRequest().ResourceNames)
streamState.Set(resp.GetRequest().TypeUrl, lastResponse)

if s.callbacks != nil {
s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out)
Expand All @@ -133,103 +116,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
// node may only be set on the first discovery request
var node = &core.Node{}

// recompute dynamic channels for this stream
watches.recompute(s.ctx, reqCh)

for {
// The list of select cases looks like this:
// 0: <- ctx.Done
// 1: <- reqCh
// 2...: per type watches
index, value, ok := reflect.Select(watches.cases)
switch index {
// ctx.Done() -> if we receive a value here we return as no further computation is needed
case 0:
return nil
// Case 1 handles any request inbound on the stream and handles all initialization as needed
case 1:
// input stream ended or errored out
if !ok {
return nil
}
var resCh = make(chan cache.Response, 1)

req := value.Interface().(*discovery.DiscoveryRequest)
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
ctx, cancel := context.WithCancel(s.ctx)
eg, ctx := errgroup.WithContext(ctx)

// node field in discovery request is delta-compressed
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}

// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()
eg.Go(func() error {
defer func() {
watches.close() // this should remove all watches from the cache
close(resCh) // close resCh and let the second eg.Go drain it
}()

// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
for {
select {
case <-ctx.Done():
return nil
case req, more := <-reqCh:
if !more {
return nil
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
// node field in discovery request is delta-compressed
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}

// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()

if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
return err
// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
}

if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
return err
}
}
}

typeURL := req.GetTypeUrl()
responder := make(chan cache.Response, 1)
if w, ok := watches.responders[typeURL]; ok {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if w.nonce == "" || w.nonce == nonce {
w.close()
if lastResponse, ok := streamState.Get(req.TypeUrl); ok {
if lastResponse.Nonce == "" || lastResponse.Nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.Resources)
}
}

typeURL := req.GetTypeUrl()
if w := watches.getWatch(typeURL); w != nil {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if n := w.getNonce(); n == "" || n == nonce {
w.close()

watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh),
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh),
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
})
}
}
})

// Recompute the dynamic select cases for this stream.
watches.recompute(s.ctx, reqCh)
default:
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
if !ok {
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
eg.Go(func() (err error) {
var nonce string
for res := range resCh {
if res == nil || err != nil {
continue // this loop should not exit until resCh closed
}

res := value.Interface().(cache.Response)
nonce, err := send(res)
if err != nil {
return err
if nonce, err = send(res); err == nil {
if w := watches.getWatch(res.GetRequest().TypeUrl); w != nil {
w.setNonce(nonce)
}
} else {
cancel()
}

watches.responders[res.GetRequest().TypeUrl].nonce = nonce
}
}
return err
})

return eg.Wait()
}

// StreamHandler converts a blocking read call to channels and initiates stream processing
Expand Down
56 changes: 24 additions & 32 deletions pkg/server/sotw/v3/watches.go
Original file line number Diff line number Diff line change
@@ -1,70 +1,62 @@
package sotw

import (
"context"
"reflect"
"sync"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

// watches for all xDS resource types
type watches struct {
mu sync.RWMutex
responders map[string]*watch

// cases is a dynamic select case for the watched channels.
cases []reflect.SelectCase
}

// newWatches creates and initializes watches.
func newWatches() watches {
return watches{
responders: make(map[string]*watch, int(types.UnknownType)),
cases: make([]reflect.SelectCase, 0),
}
}

// addWatch creates a new watch entry in the watches map.
// Watches are sorted by typeURL.
func (w *watches) addWatch(typeURL string, watch *watch) {
w.mu.Lock()
defer w.mu.Unlock()
w.responders[typeURL] = watch
}

func (w *watches) getWatch(typeURL string) (watch *watch) {
w.mu.RLock()
defer w.mu.RUnlock()
return w.responders[typeURL]
}

// close all open watches
func (w *watches) close() {
for _, watch := range w.responders {
watch.close()
}
}

// recomputeWatches rebuilds the known list of dynamic channels if needed
func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) {
w.cases = w.cases[:0] // Clear the existing cases while retaining capacity.

w.cases = append(w.cases,
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
}, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(req),
},
)
// watch contains the necessary modifiables for receiving resource responses
type watch struct {
mu sync.RWMutex
cancel func()
nonce string
}

for _, watch := range w.responders {
w.cases = append(w.cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(watch.response),
})
}
func (w *watch) getNonce() (n string) {
w.mu.RLock()
defer w.mu.RUnlock()
return w.nonce
}

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
cancel func()
nonce string
response chan cache.Response
func (w *watch) setNonce(n string) {
w.mu.Lock()
defer w.mu.Unlock()
w.nonce = n
}

// close cancels an open watch
Expand Down
Loading

0 comments on commit 8e247d6

Please sign in to comment.