Skip to content

Commit

Permalink
Back to using semaphores removed by #126.
Browse files Browse the repository at this point in the history
This solution is a bit more idiomatic and resonates with "Don't communicate by
sharing memory, share memory by communicating." Go proverb[^1].

Similar approach to use buffered channel as a semaphore to limit throughput is
discussed in Effective Go[^2].

[^1]: https://go-proverbs.github.io/
[^2]: https://golang.org/doc/effective_go.html#channels
  • Loading branch information
yarcat committed Sep 10, 2020
1 parent 5d514ca commit ba776bf
Showing 1 changed file with 19 additions and 35 deletions.
54 changes: 19 additions & 35 deletions goprocess/gp.go
Expand Up @@ -7,7 +7,6 @@ package goprocess

import (
"os"
"sync"

goversion "rsc.io/goversion/version"

Expand Down Expand Up @@ -39,51 +38,36 @@ func FindAll() []P {
type isGoFunc func(ps.Process) (path, version string, agent, ok bool, err error)

func findAll(pss []ps.Process, isGo isGoFunc, concurrencyLimit int) []P {
input := make(chan ps.Process, len(pss))
output := make(chan P, len(pss))

for _, ps := range pss {
input <- ps
}
close(input)

var wg sync.WaitGroup
wg.Add(concurrencyLimit) // used to wait for workers to be finished

// Run concurrencyLimit of workers until there
// is no more processes to be checked in the input channel.
for i := 0; i < concurrencyLimit; i++ {
output := make(chan []P, 1)
output <- nil
// Using buffered channel as a semaphore to limit throughput.
// See https://golang.org/doc/effective_go.html#channels
type token struct{}
sem := make(chan token, concurrencyLimit)
for _, pr := range pss {
sem <- token{}
pr := pr
go func() {
defer wg.Done()

for pr := range input {
path, version, agent, ok, err := isGo(pr)
if err != nil {
// TODO(jbd): Return a list of errors.
continue
}
if !ok {
continue
}
output <- P{
defer func() { <-sem }()
if path, version, agent, ok, err := isGo(pr); err != nil {
// TODO(jbd): Return a list of errors.
} else if ok {
output <- append(<-output, P{
PID: pr.Pid(),
PPID: pr.PPid(),
Exec: pr.Executable(),
Path: path,
BuildVersion: version,
Agent: agent,
}
})
}
}()
}
wg.Wait() // wait until all workers are finished
close(output) // no more results to be waited for

var results []P
for p := range output {
results = append(results, p)
// Acquire all semaphore slots to wait for work to complete.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}
return results
return <-output
}

// Find finds info about the process identified with the given PID.
Expand Down

0 comments on commit ba776bf

Please sign in to comment.