Skip to content

Commit

Permalink
feat: use channels instead of map
Browse files Browse the repository at this point in the history
maps will cause issues for long running programs, see
golang/go#20135
  • Loading branch information
fholzer committed Oct 26, 2022
1 parent 1f38df2 commit 79b2bcd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 49 deletions.
90 changes: 43 additions & 47 deletions parseq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
// that respects the order of input.
package parseq

import "sync"
import (
"sync"
)

type ParSeq struct {
// Input is the channel the client code should send to.
Expand All @@ -14,27 +16,33 @@ type ParSeq struct {
Output chan interface{}

parallelism int
order int64
unresolved []int64
l sync.Mutex
work chan input
outs chan output
process func(interface{}) interface{}
work []chan interface{}
outs []chan interface{}
process ProcessFunc
wg sync.WaitGroup
}

type ProcessFunc func(interface{}) interface{}

// New returns a new ParSeq. Processing doesn't begin until the Start method is called.
// ParSeq is concurrency-safe; multiple ParSeqs can run in parallel.
// `parallelism` determines how many goroutines read from the Input channel, and each
// of the goroutines uses the `process` function to process the inputs.
func New(parallelism int, process func(interface{}) interface{}) ParSeq {
return ParSeq{
func New(parallelism int, process ProcessFunc) *ParSeq {
work := make([]chan interface{}, parallelism)
outs := make([]chan interface{}, parallelism)
for i := 0; i < parallelism; i++ {
work[i] = make(chan interface{}, parallelism)
outs[i] = make(chan interface{}, parallelism)
}

return &ParSeq{
Input: make(chan interface{}, parallelism),
Output: make(chan interface{}),
Output: make(chan interface{}, parallelism),

parallelism: parallelism,
work: make(chan input, parallelism),
outs: make(chan output, parallelism),
work: work,
outs: outs,
process: process,
}
}
Expand All @@ -48,12 +56,14 @@ func (p *ParSeq) Start() {

for i := 0; i < p.parallelism; i++ {
p.wg.Add(1)
go p.processRequests()
go p.processRequests(p.work[i], p.outs[i], p.process)
}

go func() {
p.wg.Wait()
close(p.outs)
for _, o := range p.outs {
close(o)
}
}()
}

Expand All @@ -66,50 +76,36 @@ func (p *ParSeq) Close() {
}

func (p *ParSeq) readRequests() {
i := 0
for r := range p.Input {
p.order++
p.l.Lock()
p.unresolved = append(p.unresolved, p.order)
p.l.Unlock()
p.work <- input{order: p.order, request: r}
p.work[i%p.parallelism] <- r
i++
if i >= p.parallelism {
i = 0
}
}
for _, w := range p.work {
close(w)
}
close(p.work)
}

func (p *ParSeq) processRequests() {
func (p *ParSeq) processRequests(in chan interface{}, out chan interface{}, process ProcessFunc) {
defer p.wg.Done()

for r := range p.work {
p.outs <- output{order: r.order, product: p.process(r.request)}
for r := range in {
out <- process(r)
}
}

func (p *ParSeq) orderResults() {
rtBuf := make(map[int64]interface{})
for pr := range p.outs {
rtBuf[pr.order] = pr.product
loop:
if len(p.unresolved) > 0 {
u := p.unresolved[0]
if rtBuf[u] != nil {
p.l.Lock()
p.unresolved = p.unresolved[1:]
p.l.Unlock()
p.Output <- rtBuf[u]
delete(rtBuf, u)
goto loop
for {
for i := 0; i < p.parallelism; i++ {
val, ok := <-p.outs[i]
if !ok {
close(p.Output)
return
}
p.Output <- val
}
}
close(p.Output)
}

type input struct {
request interface{}
order int64
}

type output struct {
product interface{}
order int64
}
4 changes: 2 additions & 2 deletions parseq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ func noopProcessor(v interface{}) interface{} {
return v
}

func processAfter(d time.Duration) func(interface{}) interface{} {
func processAfter(d time.Duration) parseq.ProcessFunc {
return func(v interface{}) interface{} {
time.Sleep(d)
return v
}
}

func processAfterRandom(r *rand.Rand) func(interface{}) interface{} {
func processAfterRandom(r *rand.Rand) parseq.ProcessFunc {
var mu sync.Mutex
return func(v interface{}) interface{} {
mu.Lock()
Expand Down

0 comments on commit 79b2bcd

Please sign in to comment.