Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cancel token for reconcile #541

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
391 changes: 256 additions & 135 deletions api/pkg/apis/v1alpha1/managers/solution/solution-manager.go

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions api/pkg/apis/v1alpha1/utils/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type (
// Dispatcher groups the job-dispatch operations implemented by the API client:
// queuing jobs and cancelling a deployment job.
Dispatcher interface {
// QueueJob queues a job for the object named id in namespace.
//
// Deprecated: use QueueDeploymentJob instead.
QueueJob(ctx context.Context, id string, namespace string, isDelete bool, isTarget bool, user string, password string) error
// QueueDeploymentJob queues a job for the supplied deployment in namespace.
// NOTE(review): presumably isDelete selects a removal job — confirm against the implementation.
QueueDeploymentJob(ctx context.Context, namespace string, isDelete bool, deployment model.DeploymentSpec, user string, password string) error
// CancelDeploymentJob requests cancellation of the job jobId belonging to
// the instance id in namespace; user and password are used to obtain a token.
CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error
}

ApiClient interface {
Expand Down Expand Up @@ -533,6 +534,28 @@ func (a *apiClient) QueueDeploymentJob(ctx context.Context, namespace string, is
return nil
}

// CancelDeploymentJob asks the API to cancel a deployment job of an instance.
//
// It first obtains a token from the client's token provider using user and
// password, then issues a POST to "solution/cancel" carrying three query
// parameters: "namespace", "instance" (set to id) and "jobId" (set to jobId).
// The response body is discarded; the error from the token provider or from
// the REST call is returned unchanged, and nil is returned on success.
func (a *apiClient) CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error {
	token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password)
	if err != nil {
		return err
	}

	// The job id key must be spelled "jobId": the solution vendor's cancel
	// handler reads request.Parameters["jobId"], so a differently-cased key
	// would leave the job id empty on the server side.
	path := "solution/cancel"
	query := url.Values{
		"namespace": []string{namespace},
		"instance":  []string{id},
		"jobId":     []string{jobId},
	}

	log.DebugfCtx(ctx, "apiClient.CancelDeploymentJob: Deployment id: %s, namespace: %v", id, namespace)
	_, err = a.callRestAPI(ctx, fmt.Sprintf("%s?%s", path, query.Encode()), "POST", nil, token)
	return err
}

// Deprecated: Use QueueDeploymentJob instead
func (a *apiClient) QueueJob(ctx context.Context, id string, namespace string, isDelete bool, isTarget bool, user string, password string) error {
token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password)
Expand Down
4 changes: 0 additions & 4 deletions api/pkg/apis/v1alpha1/vendors/job-vendor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ func (e *JobVendor) Init(config vendors.VendorConfig, factories []managers.IMana
},
})

if err != nil {
linyguo marked this conversation as resolved.
Show resolved Hide resolved
return err
}

return nil
}

Expand Down
62 changes: 60 additions & 2 deletions api/pkg/apis/v1alpha1/vendors/solution-vendor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/eclipse-symphony/symphony/api/constants"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution"
Expand All @@ -30,6 +31,10 @@ type SolutionVendor struct {
SolutionManager *solution.SolutionManager
}

// defaultTimeout is the timeout applied to the cancellable context that
// onReconcile creates around a single reconcile call (one hour).
const defaultTimeout = time.Hour

func (o *SolutionVendor) GetInfo() vendors.VendorInfo {
return vendors.VendorInfo{
Version: o.Vendor.Version,
Expand All @@ -51,6 +56,7 @@ func (e *SolutionVendor) Init(config vendors.VendorConfig, factories []managers.
if e.SolutionManager == nil {
return v1alpha2.NewCOAError(nil, "solution manager is not supplied", v1alpha2.MissingConfig)
}
e.SolutionManager.InitCancelMap()
return nil
}

Expand All @@ -73,6 +79,12 @@ func (o *SolutionVendor) GetEndpoints() []v1alpha2.Endpoint {
Parameters: []string{"delete?"},
Handler: o.onReconcile,
},
{
Methods: []string{fasthttp.MethodPost},
Route: route + "/cancel",
Version: o.Version,
Handler: o.onCancel,
},
{
Methods: []string{fasthttp.MethodGet, fasthttp.MethodPost, fasthttp.MethodDelete},
Route: route + "/queue",
Expand All @@ -81,6 +93,7 @@ func (o *SolutionVendor) GetEndpoints() []v1alpha2.Endpoint {
},
}
}

func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COAResponse {
rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{
"method": "onQueue",
Expand Down Expand Up @@ -155,6 +168,16 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon
})
}
instance = deployment.Instance.ObjectMeta.Name

if delete == "true" {
// cancel the jobs in queue
sLog.InfofCtx(rContext, "V (Solution): onQueue, delete instance: %s, job id: %s", instance, deployment.JobID)
c.SolutionManager.CancelPreviousJobs(rContext, namespace, instance, deployment.JobID)
} else {
// track the job id for an ongoing job
sLog.InfofCtx(rContext, "V (Solution): onQueue, add tracking job id for instance: %s, job id: %s", instance, deployment.JobID)
c.SolutionManager.TrackJob(rContext, namespace, instance, deployment.JobID)
}
}

if instance == "" {
Expand Down Expand Up @@ -245,14 +268,29 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe
Body: []byte(err.Error()),
})
}
delete := request.Parameters["delete"]
isRemove := request.Parameters["delete"]
targetName := ""
if request.Metadata != nil {
if v, ok := request.Metadata["active-target"]; ok {
targetName = v
}
}
summary, err := c.SolutionManager.Reconcile(ctx, deployment, delete == "true", namespace, targetName)

instance := deployment.Instance.ObjectMeta.Name
sLog.InfofCtx(ctx, "V (Solution): onReconcile create context with timeout, instance: %s, job id: %s, isRemove: %s", instance, deployment.JobID, isRemove)
linyguo marked this conversation as resolved.
Show resolved Hide resolved
cancelCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
if isRemove != "true" {
c.SolutionManager.AddCancelFunc(ctx, namespace, instance, deployment.JobID, cancel)
}
defer func() {
log.InfofCtx(rContext, "V (Solution): onReconcile complete, namespace: %s, instance: %s, job id: %s, isRemove: %s", namespace, instance, deployment.JobID, isRemove)
cancel()
if isRemove != "true" {
c.SolutionManager.UntrackJob(rContext, namespace, instance, deployment.JobID)
}
linyguo marked this conversation as resolved.
Show resolved Hide resolved
}()

summary, err := c.SolutionManager.Reconcile(cancelCtx, deployment, isRemove == "true", namespace, targetName)
data, _ := json.Marshal(summary)
if err != nil {
sLog.ErrorfCtx(ctx, "V (Solution): onReconcile failed POST - reconcile %s", err.Error())
Expand All @@ -275,6 +313,26 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe
})
}

// onCancel handles the POST "<route>/cancel" endpoint.
//
// It reads the "namespace", "instance" and "jobId" request parameters, logs
// the request, and forwards them to SolutionManager.CancelPreviousJobs. The
// handler always answers with an OK state and a fixed JSON body.
//
// NOTE(review): the job id is looked up under the exact key "jobId"; callers
// must use the same spelling if parameter lookup is case-sensitive — confirm.
func (c *SolutionVendor) onCancel(request v1alpha2.COARequest) v1alpha2.COAResponse {
	spanAttrs := map[string]string{
		"method": "onCancel",
	}
	rContext, span := observability.StartSpan("Solution Vendor", request.Context, &spanAttrs)
	defer span.End()

	params := request.Parameters
	namespace, instance, jobId := params["namespace"], params["instance"], params["jobId"]

	sLog.InfofCtx(rContext, "V (Solution): onCancel instance: %s job ID: %s", instance, jobId)
	c.SolutionManager.CancelPreviousJobs(rContext, namespace, instance, jobId)

	okResponse := v1alpha2.COAResponse{
		State:       v1alpha2.OK,
		Body:        []byte("{\"result\":\"200 - OK\"}"),
		ContentType: "application/json",
	}
	return observ_utils.CloseSpanWithCOAResponse(span, okResponse)
}

func (c *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2.COAResponse {
rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{
"method": "onApplyDeployment",
Expand Down
2 changes: 1 addition & 1 deletion api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestSolutionEndpoints(t *testing.T) {
vendor := createSolutionVendor()
vendor.Route = "solution"
endpoints := vendor.GetEndpoints()
assert.Equal(t, 3, len(endpoints))
assert.Equal(t, 4, len(endpoints))
}

func TestSolutionInfo(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions coa/pkg/apis/v1alpha2/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ func IsNotFound(err error) bool {
}
return coaE.State == NotFound
}

// IsCanceled reports whether err is a COAError whose State is Canceled.
// Any other error type, including nil, yields false.
func IsCanceled(err error) bool {
	if coaErr, isCOA := err.(COAError); isCOA {
		return coaErr.State == Canceled
	}
	return false
}

func IsDelayed(err error) bool {
coaE, ok := err.(COAError)
if !ok {
Expand Down
3 changes: 3 additions & 0 deletions coa/pkg/apis/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
ValidateFailed State = 8003
Updated State = 8004
Deleted State = 8005
Canceled State = 8006
// Workflow status
Running State = 9994
Paused State = 9995
Expand Down Expand Up @@ -209,6 +210,8 @@ func (s State) String() string {
return "Updated"
case Deleted:
return "Deleted"
case Canceled:
return "Canceled"
case Running:
return "Running"
case Paused:
Expand Down
6 changes: 6 additions & 0 deletions k8s/testing/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ func (c *MockApiClient) QueueDeploymentJob(ctx context.Context, namespace string
return args.Error(0)
}

// CancelDeploymentJob implements utils.ApiClient.
// It records the call on the mock and returns the first configured return
// value as an error.
func (c *MockApiClient) CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error {
	// NOTE(review): namespace is recorded twice (before id and after jobId)
	// while user and password are not recorded; this does not mirror the
	// parameter order — confirm against the test expectations before changing.
	return c.Called(ctx, namespace, id, jobId, namespace).Error(0)
}

// QueueJob implements ApiClient.
// Deprecated and not used.
func (c *MockApiClient) QueueJob(ctx context.Context, id string, scope string, isDelete bool, isTarget bool, user string, password string) error {
Expand Down
Loading