Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use GUID for summary queries during deployment #588

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
4 changes: 3 additions & 1 deletion api/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ var (
)

const (
FullGroupName = "symphony"
FullGroupName = "symphony"
TargetRuntimePrefix = "target-runtime"

// system annotations, reserved and should not be modified by client.
AzureCorrelationIdKey = "management.azure.com/correlationId"
Expand All @@ -28,6 +29,7 @@ const (
AzureResourceIdKey = "management.azure.com/resourceId"
AzureSystemDataKey = "management.azure.com/systemData"
AzureTenantIdKey = "management.azure.com/tenantId" // Not used
GuidKey = "Guid"
RunningAzureCorrelationIdKey = "management.azure.com/runningCorrelationId"
SummaryJobIdKey = "SummaryJobIdKey"
OperationStartTimeKeyPostfix = FullGroupName + "/started-at" // instance/target
Expand Down
3 changes: 2 additions & 1 deletion api/pkg/apis/v1alpha1/managers/jobs/jobs-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"time"

"github.com/eclipse-symphony/symphony/api/constants"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils"
api_utils "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils"
Expand Down Expand Up @@ -328,7 +329,7 @@ func (s *JobsManager) DelayOrSkipJob(ctx context.Context, namespace string, obje

key := "h_" + job.Id
if objectType == "target" {
key = fmt.Sprintf("h_%s-%s", "target-runtime", job.Id)
key = fmt.Sprintf("h_%s-%s", constants.TargetRuntimePrefix, job.Id)
}
//check if a manager is working on the job
var entry states.StateEntry
Expand Down
34 changes: 20 additions & 14 deletions api/pkg/apis/v1alpha1/managers/solution/solution-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (s *SolutionManager) GetSummary(ctx context.Context, key string, namespace
return model.SummaryResult{}, err
}

log.InfofCtx(ctx, " M (Solution): get summary, key: %s, namespace: %s, summary: %+v", key, namespace, result)
return result, nil
}

Expand Down Expand Up @@ -324,8 +325,13 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy
deploymentType = DeploymentType_Delete
}
summary.IsRemoval = remove
guid := deployment.Instance.ObjectMeta.GetGuid()
if guid == "" {
log.ErrorfCtx(ctx, " M (Solution): object GUID is null: %+v", err)
return summary, err
}

err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace)
err = s.saveSummaryProgress(ctx, guid, deployment.Generation, deployment.Hash, summary, namespace)
if err != nil {
log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err)
return summary, err
Expand All @@ -336,7 +342,7 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy
if deployment.IsDryRun {
summary.SuccessCount = 0
}
s.concludeSummary(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace)
s.concludeSummary(ctx, guid, deployment.Generation, deployment.Hash, summary, namespace)
} else {
log.ErrorfCtx(ctx, " M (Solution): panic happens: %v", debug.Stack())
panic(r)
Expand Down Expand Up @@ -424,7 +430,7 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy
summary.PlannedDeployment += len(step.Components)
}
summary.CurrentDeployed = 0
err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace)
err = s.saveSummaryProgress(ctx, guid, deployment.Generation, deployment.Hash, summary, namespace)
if err != nil {
log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err)
return summary, err
Expand Down Expand Up @@ -522,7 +528,7 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy
targetResult[step.Target] = 1
summary.AllAssignedDeployed = plannedCount == planSuccessCount
summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: "OK", Message: "", ComponentResults: componentResults})
err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace)
err = s.saveSummaryProgress(ctx, guid, deployment.Generation, deployment.Hash, summary, namespace)
if err != nil {
log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err)
return summary, err
Expand Down Expand Up @@ -559,7 +565,7 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy
}
planSuccessCount++
summary.CurrentDeployed += len(step.Components)
err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace)
err = s.saveSummaryProgress(ctx, guid, deployment.Generation, deployment.Hash, summary, namespace)
if err != nil {
log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err)
return summary, err
Expand Down Expand Up @@ -636,11 +642,11 @@ func (s *SolutionManager) getTargetStateForStep(step model.DeploymentStep, deplo
return targetSpec
}

func (s *SolutionManager) saveSummary(ctx context.Context, objectName string, generation string, hash string, summary model.SummarySpec, state model.SummaryState, namespace string) error {
func (s *SolutionManager) saveSummary(ctx context.Context, objectGuid string, generation string, hash string, summary model.SummarySpec, state model.SummaryState, namespace string) error {
// TODO: delete this state when time expires. This should probably be invoked by the vendor (via GetSummary method, for instance)
log.DebugfCtx(ctx, " M (Solution): saving summary, objectName: %s, state: %s, namespace: %s, jobid: %s, hash %s, targetCount %d, successCount %d",
objectName, state, namespace, summary.JobID, hash, summary.TargetCount, summary.SuccessCount)
oldSummary, err := s.GetSummary(ctx, objectName, namespace)
log.DebugfCtx(ctx, " M (Solution): saving summary, objectGuid: %s, state: %v, namespace: %s, jobid: %s, hash %s, targetCount %d, successCount %d",
linyguo marked this conversation as resolved.
Show resolved Hide resolved
objectGuid, state, namespace, summary.JobID, hash, summary.TargetCount, summary.SuccessCount)
oldSummary, err := s.GetSummary(ctx, objectGuid, namespace)
if err != nil && !v1alpha2.IsNotFound(err) {
log.ErrorfCtx(ctx, " M (Solution): failed to get previous summary: %+v", err)
return err
Expand All @@ -664,7 +670,7 @@ func (s *SolutionManager) saveSummary(ctx context.Context, objectName string, ge
}
_, err = s.StateProvider.Upsert(ctx, states.UpsertRequest{
Value: states.StateEntry{
ID: fmt.Sprintf("%s-%s", "summary", objectName),
ID: fmt.Sprintf("%s-%s", "summary", objectGuid),
Body: model.SummaryResult{
Summary: summary,
Generation: generation,
Expand All @@ -683,12 +689,12 @@ func (s *SolutionManager) saveSummary(ctx context.Context, objectName string, ge
return err
}

func (s *SolutionManager) saveSummaryProgress(ctx context.Context, objectName string, generation string, hash string, summary model.SummarySpec, namespace string) error {
return s.saveSummary(ctx, objectName, generation, hash, summary, model.SummaryStateRunning, namespace)
func (s *SolutionManager) saveSummaryProgress(ctx context.Context, objectGuid string, generation string, hash string, summary model.SummarySpec, namespace string) error {
return s.saveSummary(ctx, objectGuid, generation, hash, summary, model.SummaryStateRunning, namespace)
}

func (s *SolutionManager) concludeSummary(ctx context.Context, objectName string, generation string, hash string, summary model.SummarySpec, namespace string) error {
return s.saveSummary(ctx, objectName, generation, hash, summary, model.SummaryStateDone, namespace)
func (s *SolutionManager) concludeSummary(ctx context.Context, objectGuid string, generation string, hash string, summary model.SummarySpec, namespace string) error {
return s.saveSummary(ctx, objectGuid, generation, hash, summary, model.SummaryStateDone, namespace)
}

func (s *SolutionManager) canSkipStep(ctx context.Context, step model.DeploymentStep, target string, provider tgt.ITargetProvider, previousComponents []model.ComponentSpec, currentState model.DeploymentState) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ func TestMockGet(t *testing.T) {
},
},
}
guid := uuid.New().String()
deployment.Instance.ObjectMeta.SetGuid(guid)
targetProvider := &mock.MockTargetProvider{}
targetProvider.Init(mock.MockTargetProviderConfig{})
stateProvider := &memorystate.MemoryStateProvider{}
Expand All @@ -372,7 +374,7 @@ func TestMockGet(t *testing.T) {
assert.Equal(t, 0, len(components))
assert.Equal(t, 0, len(state.TargetComponent))

_, err = manager.GetSummary(context.Background(), "", "default")
_, err = manager.GetSummary(context.Background(), guid, "default")
assert.NotNil(t, err)

_, err = manager.Reconcile(context.Background(), deployment, false, "default", "")
Expand All @@ -387,17 +389,17 @@ func TestMockGet(t *testing.T) {
assert.Equal(t, "mock", state.TargetComponent["a::T1"])
assert.Equal(t, "mock", state.TargetComponent["b::T1"])

_, err = manager.GetSummary(context.Background(), "", "default")
_, err = manager.GetSummary(context.Background(), guid, "default")
assert.Nil(t, err)

// Test reconcile idempotency
_, err = manager.Reconcile(context.Background(), deployment, false, "default", "")
assert.Nil(t, err)

// Test summary deletion
err = manager.DeleteSummary(context.Background(), "", "default")
err = manager.DeleteSummary(context.Background(), guid, "default")
assert.Nil(t, err)
_, err = manager.GetSummary(context.Background(), "", "default")
_, err = manager.GetSummary(context.Background(), guid, "default")
assert.NotNil(t, err)
}
func TestMockGetTwoTargets(t *testing.T) {
Expand Down Expand Up @@ -464,6 +466,7 @@ func TestMockGetTwoTargets(t *testing.T) {
},
},
}
deployment.Instance.ObjectMeta.SetGuid(uuid.New().String())
targetProvider := &mock.MockTargetProvider{}
targetProvider.Init(mock.MockTargetProviderConfig{ID: id})
stateProvider := &memorystate.MemoryStateProvider{}
Expand Down Expand Up @@ -560,6 +563,7 @@ func TestMockGetTwoTargetsTwoProviders(t *testing.T) {
},
},
}
deployment.Instance.ObjectMeta.SetGuid(uuid.New().String())
targetProvider := &mock.MockTargetProvider{}
targetProvider.Init(mock.MockTargetProviderConfig{ID: id})
stateProvider := &memorystate.MemoryStateProvider{}
Expand Down Expand Up @@ -636,6 +640,7 @@ func TestMockApply(t *testing.T) {
},
},
}
deployment.Instance.ObjectMeta.SetGuid(uuid.New().String())
targetProvider := &mock.MockTargetProvider{}
targetProvider.Init(mock.MockTargetProviderConfig{ID: id})
stateProvider := &memorystate.MemoryStateProvider{}
Expand Down Expand Up @@ -698,6 +703,7 @@ func TestMockApplyMultiRoles(t *testing.T) {
},
},
}
deployment.Instance.ObjectMeta.SetGuid(uuid.New().String())
targetProvider := &mock.MockTargetProvider{}
targetProvider2 := &mock.MockTargetProvider{}
targetProvider.Init(mock.MockTargetProviderConfig{ID: id1})
Expand Down Expand Up @@ -759,6 +765,7 @@ func TestMockApplyWithUpdateAndRemove(t *testing.T) {
},
},
}
deployment.Instance.ObjectMeta.SetGuid(uuid.New().String())
targetProvider := &mock.MockTargetProvider{}
targetProvider.Init(mock.MockTargetProviderConfig{ID: id})
stateProvider := &memorystate.MemoryStateProvider{}
Expand Down Expand Up @@ -812,6 +819,7 @@ func TestMockApplyWithError(t *testing.T) {
},
},
}
deployment.Instance.ObjectMeta.SetGuid(uuid.New().String())
targetProvider := &mock.MockTargetProvider{}
targetProvider.Init(mock.MockTargetProviderConfig{ID: id})
stateProvider := &memorystate.MemoryStateProvider{}
Expand Down
18 changes: 18 additions & 0 deletions api/pkg/apis/v1alpha1/model/objectmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (c ObjectMeta) DeepEquals(other IDeepEquals) (bool, error) {
return false, errors.New("parameter is not a ObjectMeta type")
}

if c.GetGuid() != otherC.GetGuid() {
return false, nil
}

if c.Name != otherC.Name {
return false, nil
}
Expand Down Expand Up @@ -150,3 +154,17 @@ func (c *ObjectMeta) PreserveSystemMetadata(metadata ObjectMeta) {
}
}
}

func (c *ObjectMeta) GetGuid() string {
if c.Annotations == nil {
return ""
}
return c.Annotations[constants.GuidKey]
}

func (c *ObjectMeta) SetGuid(guid string) {
if c.Annotations == nil {
c.Annotations = make(map[string]string)
}
c.Annotations[constants.GuidKey] = guid
}
41 changes: 36 additions & 5 deletions api/pkg/apis/v1alpha1/providers/stage/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers/metrics"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers/stage"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils"
api_utils "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/contexts"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability"
observ_utils "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability/utils"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/utils"
"github.com/eclipse-symphony/symphony/coa/pkg/logger"
)

Expand Down Expand Up @@ -98,7 +98,7 @@ func toSymphonyStageProviderConfig(config providers.IProviderConfig) (CreateStag
if err != nil {
return ret, err
}
err = json.Unmarshal(data, &ret)
err = utils.Unmarshal[CreateStageProviderConfig](data, &ret)
linyguo marked this conversation as resolved.
Show resolved Hide resolved
return ret, err
}
func (i *CreateStageProvider) InitWithMap(properties map[string]string) error {
Expand Down Expand Up @@ -200,7 +200,7 @@ func (i *CreateStageProvider) Process(ctx context.Context, mgrContext contexts.M
return nil, false, err
}
for ic := 0; ic < i.Config.WaitCount; ic++ {
remainings := utils.FilterIncompleteDelete(ctx, &i.ApiClient, objectNamespace, []string{objectName}, true, i.Config.User, i.Config.Password)
remainings := api_utils.FilterIncompleteDelete(ctx, &i.ApiClient, objectNamespace, []string{objectName}, true, i.Config.User, i.Config.Password)
if len(remainings) == 0 {
return outputs, false, nil
}
Expand All @@ -219,7 +219,7 @@ func (i *CreateStageProvider) Process(ctx context.Context, mgrContext contexts.M
return nil, false, err
} else if strings.EqualFold(action, CreateAction) {
var instanceState model.InstanceState
err = json.Unmarshal(objectData, &instanceState)
err = utils.Unmarshal[model.InstanceState](objectData, &instanceState)
if err != nil {
mLog.ErrorfCtx(ctx, "Failed to unmarshal instance state %s: %s", objectName, err.Error())
providerOperationMetrics.ProviderOperationErrors(
Expand Down Expand Up @@ -277,8 +277,39 @@ func (i *CreateStageProvider) Process(ctx context.Context, mgrContext contexts.M
mLog.ErrorfCtx(ctx, " P (Create Stage) process failed, failed to create instance: %+v", err)
return nil, false, err
}

// check guid after instance created
ret, err := i.ApiClient.GetInstance(ctx, instanceState.ObjectMeta.Name, instanceState.ObjectMeta.Namespace, i.Config.User, i.Config.Password)
if err != nil {
mLog.ErrorfCtx(ctx, "Failed to get instance %s: %s", instanceState.ObjectMeta.Name, err.Error())
providerOperationMetrics.ProviderOperationErrors(
create,
functionName,
metrics.ProcessOperation,
metrics.RunOperationType,
v1alpha2.InstanceGetFailed.String(),
)
return outputs, false, err
}
objectGuid := ret.ObjectMeta.GetGuid()
if objectGuid == "" {
mLog.ErrorfCtx(ctx, "Instance GUID is empty: - %s", objectName)
providerOperationMetrics.ProviderOperationErrors(
create,
functionName,
metrics.ProcessOperation,
metrics.RunOperationType,
v1alpha2.CreateInstanceFailed.String(),
)
return outputs, false, v1alpha2.NewCOAError(nil, fmt.Sprintf("Empty instance guid: - %s", objectName), v1alpha2.BadRequest)
}

for ic := 0; ic < i.Config.WaitCount; ic++ {
remaining, failed := utils.FilterIncompleteDeploymentUsingSummary(ctx, &i.ApiClient, objectNamespace, []string{objectName}, true, i.Config.User, i.Config.Password)
obj := api_utils.ObjectInfo{
Name: objectName,
Guid: objectGuid,
}
remaining, failed := api_utils.FilterIncompleteDeploymentUsingSummary(ctx, &i.ApiClient, objectNamespace, []api_utils.ObjectInfo{obj}, true, i.Config.User, i.Config.Password)
if len(remaining) == 0 {
outputs["objectType"] = objectType
outputs["objectName"] = objectName
Expand Down
Loading
Loading