Skip to content

Commit

Permalink
ingester: clean up chunk transfer code
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto committed Jul 24, 2019
1 parent f8155a1 commit 15cbba7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (i *Ingester) flushLoop(j int) {
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
instance, ok, _ := i.getInstanceByID(userID)
instance, ok := i.getInstanceByID(userID)
if !ok {
return nil
}
Expand Down
26 changes: 12 additions & 14 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,19 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

instance, readonly := i.getOrCreateInstance(instanceID)
if readonly {
} else if i.readonly {
return nil, ErrReadOnly
}

instance := i.getOrCreateInstance(instanceID)
err = instance.Push(ctx, req)
return &logproto.PushResponse{}, err
}

func (i *Ingester) getOrCreateInstance(instanceID string) (instance *instance, readonly bool) {
inst, ok, readonly := i.getInstanceByID(instanceID)
if ok || readonly {
return inst, readonly
func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
inst, ok := i.getInstanceByID(instanceID)
if ok || i.readonly {
return inst
}

i.instancesMtx.Lock()
Expand All @@ -186,7 +184,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) (instance *instance, r
inst = newInstance(instanceID, i.cfg.BlockSize)
i.instances[instanceID] = inst
}
return inst, i.readonly
return inst
}

// Query the ingests for log streams matching a set of matchers.
Expand All @@ -196,7 +194,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return err
}

instance, _ := i.getOrCreateInstance(instanceID)
instance := i.getOrCreateInstance(instanceID)
return instance.Query(req, queryServer)
}

Expand All @@ -207,7 +205,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return nil, err
}

instance, _ := i.getOrCreateInstance(instanceID)
instance := i.getOrCreateInstance(instanceID)
return instance.Label(ctx, req)
}

Expand Down Expand Up @@ -236,12 +234,12 @@ func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
}
}

func (i *Ingester) getInstanceByID(id string) (instance *instance, ok bool, readonly bool) {
func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()

inst, ok := i.instances[id]
return inst, ok, i.readonly
return inst, ok
}

func (i *Ingester) getInstances() []*instance {
Expand All @@ -268,7 +266,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
return err
}

instance, _ := i.getOrCreateInstance(instanceID)
instance := i.getOrCreateInstance(instanceID)
tailer, err := newTailer(instanceID, req.Query, req.Regex, queryServer)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
lbls = append(lbls, client.LabelAdapter{Name: lbl.Name, Value: lbl.Value})
}

instance, _ := i.getOrCreateInstance(chunkSet.UserId)
instance := i.getOrCreateInstance(chunkSet.UserId)
for _, chunk := range chunkSet.Chunks {
if err := instance.consumeChunk(userCtx, lbls, chunk); err != nil {
return err
Expand Down

0 comments on commit 15cbba7

Please sign in to comment.