Skip to content

Commit

Permalink
Fix discovery race condition in vsphere input (influxdata#5217)
Browse files Browse the repository at this point in the history
  • Loading branch information
prydin authored and bitcharmer committed Oct 18, 2019
1 parent 1ccb9a2 commit 29534c2
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions plugins/inputs/vsphere/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const maxMetadataSamples = 100 // Number of resources to sample for metric metad
type Endpoint struct {
Parent *VSphere
URL *url.URL
resourceKinds map[string]resourceKind
resourceKinds map[string]*resourceKind
hwMarks *TSCache
lun2ds map[string]string
discoveryTicker *time.Ticker
Expand Down Expand Up @@ -107,7 +107,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
clientFactory: NewClientFactory(ctx, url, parent),
}

e.resourceKinds = map[string]resourceKind{
e.resourceKinds = map[string]*resourceKind{
"datacenter": {
name: "datacenter",
pKey: "dcname",
Expand Down Expand Up @@ -363,6 +363,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
numRes := int64(0)

// Populate resource objects, and endpoint instance info.
newObjects := make(map[string]objectMap)
for k, res := range e.resourceKinds {
log.Printf("D! [input.vsphere] Discovering resources for %s", res.name)
// Need to do this for all resource types even if they are not enabled
Expand All @@ -385,13 +386,12 @@ func (e *Endpoint) discover(ctx context.Context) error {
// No need to collect metric metadata if resource type is not enabled
if res.enabled {
if res.simple {
e.simpleMetadataSelect(ctx, client, &res)
e.simpleMetadataSelect(ctx, client, res)
} else {
e.complexMetadataSelect(ctx, &res, objects, metricNames)
e.complexMetadataSelect(ctx, res, objects, metricNames)
}
}
res.objects = objects
resourceKinds[k] = res
newObjects[k] = objects

SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": res.name}, int64(len(objects)))
numRes += int64(len(objects))
Expand All @@ -413,7 +413,9 @@ func (e *Endpoint) discover(ctx context.Context) error {
e.collectMux.Lock()
defer e.collectMux.Unlock()

e.resourceKinds = resourceKinds
for k, v := range newObjects {
e.resourceKinds[k].objects = v
}
e.lun2ds = l2d

sw.Stop()
Expand Down Expand Up @@ -793,9 +795,9 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
latestSample := time.Time{}

// Divide workload into chunks and process them concurrently
e.chunkify(ctx, &res, now, latest, acc,
e.chunkify(ctx, res, now, latest, acc,
func(chunk []types.PerfQuerySpec) {
n, localLatest, err := e.collectChunk(ctx, chunk, &res, acc, now, estInterval)
n, localLatest, err := e.collectChunk(ctx, chunk, res, acc, now, estInterval)
log.Printf("D! [inputs.vsphere] CollectChunk for %s returned %d metrics", resourceType, n)
if err != nil {
acc.AddError(errors.New("While collecting " + res.name + ": " + err.Error()))
Expand Down

0 comments on commit 29534c2

Please sign in to comment.