Skip to content

Commit

Permalink
handle creation of new watche more explicitly
Browse files Browse the repository at this point in the history
Signed-off-by: Alec Holmes <[email protected]>
  • Loading branch information
Alec Holmes committed Nov 30, 2021
1 parent 8784bbf commit 3946fb8
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
return out.Nonce, str.Send(out)
}

open := func(w *watch, req *discovery.DiscoveryRequest, responder chan cache.Response) {
w.cancel = s.cache.CreateWatch(req, streamState, responder)
watches.cases[w.index] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(responder),
}
}

if s.callbacks != nil {
if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
Expand Down Expand Up @@ -191,25 +199,23 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
}

typeURL := req.GetTypeUrl()

w, ok := watches.responders[typeURL]
if !ok {
responder := make(chan cache.Response, 1)
if w, ok := watches.responders[typeURL]; ok {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if w.nonce == "" || w.nonce == nonce {
w.Cancel()

open(w, req, responder)
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.responders[typeURL] = &watch{}

w = watches.responders[typeURL]
watches.RecomputeWatches(s.ctx, reqCh)
}
if w.nonce == "" || w.nonce == nonce {
if w.cancel != nil {
w.cancel()
}

responder := make(chan cache.Response, 1)
w.cancel = s.cache.CreateWatch(req, streamState, responder)
watches.cases[w.index] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(responder),
}
open(w, req, responder)
}
default:
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
Expand Down

0 comments on commit 3946fb8

Please sign in to comment.