Skip to content

Commit

Permalink
attempted fix for evanw#2485: Add and Wait safety
Browse files Browse the repository at this point in the history
  • Loading branch information
evanw authored and ArrayZoneYour committed Dec 5, 2022
1 parent 48f7e23 commit 105adf7
Showing 1 changed file with 68 additions and 10 deletions.
78 changes: 68 additions & 10 deletions cmd/esbuild/service.go
Expand Up @@ -47,6 +47,7 @@ type serviceType struct {
keepAliveWaitGroup sync.WaitGroup
mutex sync.Mutex
nextRequestID uint32
disableSendRequest bool
}

func (service *serviceType) getActiveBuild(key int) *activeBuild {
Expand Down Expand Up @@ -108,10 +109,7 @@ func runService(sendPings bool) {
// Write packets on a single goroutine so they aren't interleaved
go func() {
for {
packet, ok := <-service.outgoingPackets
if !ok {
break // No more packets
}
packet := <-service.outgoingPackets
if _, err := os.Stdout.Write(packet.bytes); err != nil {
os.Exit(1) // I/O error
}
Expand All @@ -122,6 +120,37 @@ func runService(sendPings bool) {
// The protocol always starts with the version
os.Stdout.Write(append(writeUint32(nil, uint32(len(esbuildVersion))), esbuildVersion...))

// IMPORTANT: To avoid a data race, we must ensure that calling "Add()" on a
// "WaitGroup" with a counter of zero cannot possibly happen concurrently
// with calling "Wait()" on another goroutine. Thus we must ensure that the
// counter starts off positive before "Wait()" is called and that it only
// ever reaches zero exactly once (and that the counter never goes back above
// zero once it reaches zero). See this for discussion and more information:
// https://github.com/evanw/esbuild/issues/2485#issuecomment-1299318498
service.keepAliveWaitGroup.Add(1)
defer func() {
// Stop all future calls to "sendRequest()" from calling "Add()" on our
// "WaitGroup" while we're calling "Wait()", since it may have a counter of
// zero at that point. This is a mutex so calls to "sendRequest()" must
// fall into one of two cases:
//
// a) The critical section in "sendRequest()" comes before this critical
// section and "Add()" is called while the counter is non-zero, which
// is fine.
//
// b) The critical section in "sendRequest()" comes after this critical
// section and it does not call "Add()", which is also fine.
//
service.mutex.Lock()
service.disableSendRequest = true
service.keepAliveWaitGroup.Done()
service.mutex.Unlock()

// Wait for the last response to be written to stdout before returning from
// the enclosing function, which will return from "main()" and exit.
service.keepAliveWaitGroup.Wait()
}()

// Periodically ping the host even when we're idle. This will catch cases
// where the host has disappeared and will never send us anything else but
// we incorrectly think we are still needed. In that case we will now try
Expand Down Expand Up @@ -173,11 +202,11 @@ func runService(sendPings bool) {
// Move the remaining partial packet to the end to avoid reallocating
stream = append(stream[:0], bytes...)
}

// Wait for the last response to be written to stdout
service.keepAliveWaitGroup.Wait()
}

// This will either block until the request has been sent and a response has
// been received, or it will return nil to indicate failure to send due to
// stdin being closed.
func (service *serviceType) sendRequest(request interface{}) interface{} {
result := make(chan interface{})
var id uint32
Expand All @@ -193,7 +222,27 @@ func (service *serviceType) sendRequest(request interface{}) interface{} {
service.callbacks[id] = callback
return id
}()

// This function can be called from any thread. For example, it might be called
// by the implementation of watch or serve mode, or by esbuild's keep-alive
// timer goroutine. To avoid data races, we must ensure that it's not possible
// for "Add()" to be called on a "WaitGroup" with a counter of zero at the
// same time as "Wait()" is called on another goroutine.
//
// There's a potential data race when the stdin thread has finished reading
// from stdin because stdin has been closed but while it's calling "Wait()"
// on the "WaitGroup" with a counter of zero, some other thread calls
// "sendRequest()" which calls "Add()".
//
// This data race is prevented by not sending any more requests once the
// stdin thread has finished. This is ok because we are about to exit.
service.mutex.Lock()
if service.disableSendRequest {
service.mutex.Unlock()
return nil
}
service.keepAliveWaitGroup.Add(1) // The writer thread will call "Done()"
service.mutex.Unlock()
service.outgoingPackets <- outgoingPacket{
bytes: encodePacket(packet{
id: id,
Expand Down Expand Up @@ -769,10 +818,13 @@ func (service *serviceType) convertPlugins(key int, jsPlugins interface{}, activ
build.OnStart(func() (api.OnStartResult, error) {
result := api.OnStartResult{}

response := service.sendRequest(map[string]interface{}{
response, ok := service.sendRequest(map[string]interface{}{
"command": "on-start",
"key": key,
}).(map[string]interface{})
if !ok {
return result, errors.New("The service was stopped")
}

if value, ok := response["errors"]; ok {
result.Errors = decodeMessages(value.([]interface{}))
Expand All @@ -798,7 +850,7 @@ func (service *serviceType) convertPlugins(key int, jsPlugins interface{}, activ
return result, nil
}

response := service.sendRequest(map[string]interface{}{
response, ok := service.sendRequest(map[string]interface{}{
"command": "on-resolve",
"key": key,
"ids": ids,
Expand All @@ -809,6 +861,9 @@ func (service *serviceType) convertPlugins(key int, jsPlugins interface{}, activ
"kind": resolveKindToString(args.Kind),
"pluginData": args.PluginData,
}).(map[string]interface{})
if !ok {
return result, errors.New("The service was stopped")
}

if value, ok := response["id"]; ok {
id := value.(int)
Expand Down Expand Up @@ -883,7 +938,7 @@ func (service *serviceType) convertPlugins(key int, jsPlugins interface{}, activ
return result, nil
}

response := service.sendRequest(map[string]interface{}{
response, ok := service.sendRequest(map[string]interface{}{
"command": "on-load",
"key": key,
"ids": ids,
Expand All @@ -892,6 +947,9 @@ func (service *serviceType) convertPlugins(key int, jsPlugins interface{}, activ
"suffix": args.Suffix,
"pluginData": args.PluginData,
}).(map[string]interface{})
if !ok {
return result, errors.New("The service was stopped")
}

if value, ok := response["id"]; ok {
id := value.(int)
Expand Down

0 comments on commit 105adf7

Please sign in to comment.