From 1c3d23f1c4567c41a1e89a8697da30c87463a436 Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 13 Jan 2022 23:54:05 +0800 Subject: [PATCH] server: test the deadlock between LinearCache and SOTW server (#530) Signed-off-by: Rueian --- pkg/server/v3/server_test.go | 63 ++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index af349e0c8a..102d8fdb13 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -18,8 +18,11 @@ import ( "context" "errors" "fmt" + "math/rand" "reflect" + "strconv" "sync" + "sync/atomic" "testing" "time" @@ -667,3 +670,63 @@ func TestCallbackError(t *testing.T) { }) } } + +func TestSOTWLinearCacheIntegrationDeadLock(t *testing.T) { + for _, typ := range testTypes { + t.Run(typ, func(t *testing.T) { + t.Log("Integrating LinearCache with SOTW server. If this take too long, they might be dead locked") + + nonce := int64(0) + ver, targetVer := uint64(0), uint64(100000) + untilVerExceed := func(exceed uint64, fn func(current uint64)) { + for current := atomic.LoadUint64(&ver); current < exceed; current = atomic.LoadUint64(&ver) { + fn(current) + } + } + + config := cache.NewLinearCache(typ) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + + resources := make([]string, 10) + for i := range resources { + resources[i] = strconv.Itoa(i) + } + + mStream := makeMockStream(t) + + go func() { + untilVerExceed(targetVer, func(current uint64) { + mStream.recv <- &discovery.DiscoveryRequest{ + Node: node, + TypeUrl: typ, + ResourceNames: resources, + VersionInfo: strconv.FormatUint(current, 10), + ResponseNonce: strconv.FormatInt(atomic.LoadInt64(&nonce), 10), + } + }) + close(mStream.recv) + }() + + go func() { + untilVerExceed(targetVer, func(current uint64) { + config.SetResources(map[string]types.Resource{ + resources[rand.Intn(len(resources))]: opaque, //nolint + }) + }) + }() + + go func() { + for resp := range mStream.sent { + v, _ := strconv.ParseUint(resp.VersionInfo, 10, 64) + atomic.StoreUint64(&ver, v) + n, _ := strconv.ParseInt(resp.Nonce, 10, 64) + atomic.StoreInt64(&nonce, n) + } + }() + + err := s.StreamAggregatedResources(mStream) + assert.Nil(t, err) + close(mStream.sent) + }) + } +}