Skip to content

Commit

Permalink
Added an option to exclude the list of workflows by Type (#1335)
Browse files Browse the repository at this point in the history
  • Loading branch information
agautam478 committed May 3, 2024
1 parent 7f81710 commit 73b12d5
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 2 deletions.
15 changes: 14 additions & 1 deletion internal/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ var (

type (
// QueryBuilder builds visibility query. It's shadower's own Query builders that processes the shadow filter
// options into a query to pull t he required workflows.
// options into a query to pull the required workflows.

QueryBuilder interface {
WorkflowTypes([]string) QueryBuilder
ExcludeWorkflowTypes([]string) QueryBuilder
WorkflowStatus([]WorkflowStatus) QueryBuilder
StartTime(time.Time, time.Time) QueryBuilder
CloseTime(time.Time, time.Time) QueryBuilder
Expand All @@ -100,6 +101,18 @@ func (q *queryBuilderImpl) WorkflowTypes(types []string) QueryBuilder {
return q
}

func (q *queryBuilderImpl) ExcludeWorkflowTypes(types []string) QueryBuilder {
if len(types) == 0 {
return q
}
excludeTypeQueries := make([]string, 0, len(types))
for _, workflowType := range types {
excludeTypeQueries = append(excludeTypeQueries, fmt.Sprintf(keyWorkflowType+` != "%v"`, workflowType))
}
q.appendPartialQuery(strings.Join(excludeTypeQueries, " and "))
return q
}

func (q *queryBuilderImpl) WorkflowStatus(statuses []WorkflowStatus) QueryBuilder {
workflowStatusQueries := make([]string, 0, len(statuses))
for _, status := range statuses {
Expand Down
32 changes: 32 additions & 0 deletions internal/query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,38 @@ func (s *queryBuilderSuite) TestStartTimeQuery() {
}
}

func (s *queryBuilderSuite) TestExcludeWorkflowTypesQuery() {
testCases := []struct {
msg string
excludeTypes []string
expectedQuery string
}{
{
msg: "empty excludeTypes",
excludeTypes: nil,
expectedQuery: "",
},
{
msg: "single excludeType",
excludeTypes: []string{"excludedWorkflowType"},
expectedQuery: `(WorkflowType != "excludedWorkflowType")`,
},
{
msg: "multiple excludeTypes",
excludeTypes: []string{"excludedWorkflowType1", "excludedWorkflowType2"},
expectedQuery: `(WorkflowType != "excludedWorkflowType1" and WorkflowType != "excludedWorkflowType2")`,
},
}

for _, test := range testCases {
s.T().Run(test.msg, func(t *testing.T) {
builder := NewQueryBuilder()
builder.ExcludeWorkflowTypes(test.excludeTypes)
s.Equal(test.expectedQuery, builder.Build())
})
}
}

func (s *queryBuilderSuite) TestMultipleFilters() {
maxStartTime := time.Now()
minStartTime := maxStartTime.Add(-time.Hour)
Expand Down
15 changes: 14 additions & 1 deletion internal/workflow_shadower.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ type (
// default: empty list, which matches all workflow types
WorkflowTypes []string

// Optional: A list of workflow type names that need to be excluded in the query.
// The list will be used to construct WorkflowQuery.The listed workflow types will be excluded from replay.
// default: empty list, which matches all workflow types
ExcludeTypes []string

// Optional: A list of workflow status.
// The list will be used to construct WorkflowQuery. Only workflows with status listed will be replayed.
// accepted values (case-insensitive): OPEN, CLOSED, ALL, COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT
Expand Down Expand Up @@ -310,7 +315,15 @@ func (o *ShadowOptions) validateAndPopulateFields() error {
}

if len(o.WorkflowQuery) == 0 {
queryBuilder := NewQueryBuilder().WorkflowTypes(o.WorkflowTypes)
queryBuilder := NewQueryBuilder()

if len(o.WorkflowTypes) > 0 {
queryBuilder.WorkflowTypes(o.WorkflowTypes)
}

if len(o.ExcludeTypes) > 0 {
queryBuilder.ExcludeWorkflowTypes(o.ExcludeTypes)
}

statuses := make([]WorkflowStatus, 0, len(o.WorkflowStatus))
for _, statusString := range o.WorkflowStatus {
Expand Down
26 changes: 26 additions & 0 deletions internal/workflow_shadower_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package internal

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -243,6 +245,30 @@ func (s *workflowShadowerSuite) TestShadowOptionsValidation() {
}
}

func (s *workflowShadowerSuite) TestShadowOptionsWithExcludeTypes() {
excludeTypes := []string{"excludedType1", "excludedType2"}
options := ShadowOptions{
WorkflowTypes: []string{"includedType1", "includedType2"},
ExcludeTypes: excludeTypes,
Mode: ShadowModeNormal,
}
expectedQuery := fmt.Sprintf(
`(WorkflowType = "includedType1" or WorkflowType = "includedType2") and (WorkflowType != "excludedType1" and WorkflowType != "excludedType2") and (CloseTime = missing)`,
)
shadower, err := NewWorkflowShadower(s.mockService, "testDomain", options, ReplayOptions{}, nil)
s.NoError(err)
s.mockService.EXPECT().
ScanWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, request *shared.ListWorkflowExecutionsRequest, opts ...interface{}) (*shared.ListWorkflowExecutionsResponse, error) {
s.Equal(expectedQuery, *request.Query)
return &shared.ListWorkflowExecutionsResponse{
Executions: nil,
NextPageToken: nil,
}, nil
}).Times(1)
s.NoError(shadower.shadowWorker())
}

func (s *workflowShadowerSuite) TestShadowWorkerExitCondition_ExpirationTime() {
totalWorkflows := 50
timePerWorkflow := 7 * time.Second
Expand Down

0 comments on commit 73b12d5

Please sign in to comment.