Skip to content

Commit

Permalink
Add debugger interface to allow extracting the workerStats from Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
ketsiambaku committed Sep 2, 2024
1 parent 3d320e7 commit d3de151
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 0 deletions.
4 changes: 4 additions & 0 deletions debug/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ type (
// Activities is a list of executing activities on the worker
// Deprecated: in development and very likely to change
Activities = internal.Activities

// Debugger exposes stats collected on a running Worker
// Deprecated: in development and very likely to change
Debugger = internal.Debugger
)
6 changes: 6 additions & 0 deletions internal/common/debug/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,10 @@ type (
Info ActivityInfo
Count int64
}

// Debugger exposes stats collected on a running Worker
// Deprecated: in development and very likely to change
Debugger interface {
GetWorkerStats() WorkerStats
}
)
8 changes: 8 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,11 @@ type aggregatedWorker struct {
shadowWorker *shadowWorker
logger *zap.Logger
registry *registry
workerstats debug.WorkerStats
}

var _ debug.Debugger = &aggregatedWorker{}

func (aw *aggregatedWorker) GetRegisteredWorkflows() []RegistryWorkflowInfo {
workflows := aw.registry.GetRegisteredWorkflows()
var result []RegistryWorkflowInfo
Expand Down Expand Up @@ -1010,6 +1013,10 @@ func (aw *aggregatedWorker) Stop() {
aw.logger.Info("Stopped Worker")
}

func (aw *aggregatedWorker) GetWorkerStats() debug.WorkerStats {
return aw.workerstats
}

// AggregatedWorker returns an instance to manage the workers. Use defaultConcurrentPollRoutineSize (which is 2) as
// poller size. The typical RTT (round-trip time) is below 1ms within data center. And the poll API latency is about 5ms.
// With 2 poller, we could achieve around 300~400 RPS.
Expand Down Expand Up @@ -1148,6 +1155,7 @@ func newAggregatedWorker(
shadowWorker: shadowWorker,
logger: logger,
registry: registry,
workerstats: workerParams.WorkerStats,
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,7 @@ func TestWorkerOptionDefaults(t *testing.T) {
require.NotNil(t, activityWorker.executionParameters.MetricsScope)
require.Nil(t, activityWorker.executionParameters.ContextPropagators)
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
assert.Equal(t, expected.WorkerStats, aggWorker.GetWorkerStats())
}

func TestWorkerOptionNonDefaults(t *testing.T) {
Expand Down Expand Up @@ -1176,6 +1177,7 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
activityWorker := aggWorker.activityWorker
require.True(t, len(activityWorker.executionParameters.ContextPropagators) > 0)
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
assert.Equal(t, expected.WorkerStats, aggWorker.GetWorkerStats())
}

func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParameters, paramsB workerExecutionParameters) {
Expand Down

0 comments on commit d3de151

Please sign in to comment.