Skip to content

cassiobotaro/concorrencia-go

Repository files navigation

Concorrência em Go

Go é fundamentada no modelo CSP (Communicating sequential processes) proposto por Tony Hoare. Neste modelo os dados são compartilhados enviando mensagens através de canais.

As explicações e exemplos são altamente inspiradas na apresentação do @andrebq.

Uma outra influência é a artigo sobre pipelines e cancelamento em go.

Aqui serão apresentados alguns padrões de concorrência, porem sugiro também a leitura sobre context, select, canais com buffer e outros mecanismos de controle de concorrencia.

🔗 Canais

Canais (channels) são uma estrutura primitiva na linguagem, e você pode utilizá-los para envio e recebimento de valores entre rotinas (goroutines). Os valores podem ser de qualquer tipo, inclusive do tipo canal.

Um canal é um ponto de sincronização entre goroutines. Uma goroutine vai ficar bloqueada escrevendo em um canal até que aquele canal seja lido.

Ler de um canal é semelhante, uma goroutine vai ficar bloqueada lendo até que um valor seja enviado para o canal ou o canal seja fechado (quando isso ocorre, o valor zero do tipo é retornado).

Um canal pode ser fechado. Isso é útil para indicar que nenhum outro valor será escrito no canal.

Ler um canal fechado retorna um valor zero do tipo do canal.

Escrever em um canal fechado retorna um erro (panic).

🗺️ Olá Mundo

O primeiro exemplo mostra como criar um canal, que será utilizado como ponte entre a aplicação principal e uma goroutine.

O programa principal fica bloqueado até que a mensagem "Olá mundo" seja enviada para o canal.

Quando isto ocorre, a goroutine é desbloqueada e a mensagem é exibida.

Quando o programa principal termina, a goroutine é também terminada.

package main

import "fmt"

func main() {
	canal := make(chan string)
	go func() {
		canal <- "Olá, mundo!"
	}()

	fmt.Println(<-canal)
}

🆕 Geradores

Geradores são funções que iniciam uma goroutine para escrever uma lista de valores em um canal que é retornado para quem acionou a função.

No exemplo uma sequência de números inteiros é gerada e enviada para um canal.

A função principal (main) irá realizar a leitura do canal e imprimir os valores. Essa é uma característica interessante sobre canais, quando utilizados com o range, a iteração continuará até que o canal seja fechado.

package main

import "fmt"

func sequenciaNumeros(inicial, final int) <-chan int {
	saida := make(chan int)
	go func() {
		for i := inicial; i <= final; i++ {
			saida <- i
		}
		// após gerar todos os valores, fecha o canal
		close(saida)
	}()
	return saida
}

func main() {
	valores := sequenciaNumeros(1, 1000)
	for valor := range valores {
		fmt.Printf("valor: %v\n", valor)
	}
}

🚧 Trabalhador (worker)

Um trabalhador é uma goroutine que recebe valores de um canal e os processa.

No exemplo valores inteiros são enviados pela função principal (main) através do canal de entrada e processados por um trabalhador.

É possível criar vários trabalhadores para processarem um mesmo canal.

package main

import "fmt"

func trabalhador(entrada <-chan int) {
	for valor := range entrada {
		fmt.Printf("valor: %v\n", valor)
	}
}

func main() {
	entrada := make(chan int)
	// Um trabalhador é iniciado e aguarda por valores no canal de entrada
	go trabalhador(entrada)
	for i := 0; i < 10; i++ {
		entrada <- i
	}
	// Após ter enviado todos os valores, fecha o canal de entrada
	// avisando ao trabalhador que o trabalho terminou
	close(entrada)
}

👷‍♂️👷‍♀️ Grupo de Trabalhadores (pool of workers)

Uma coleção de goroutines que ficam esperando tarefas serem atribuídas a elas. Quando a goroutine finaliza a tarefa que foi atribuída, se torna disponível novamente para execução de uma nova tarefa.

No exemplo, um grupo de n trabalhadores aguardam a chegada de valores pelo canal de entrada. Cada trabalhador executa seu processmento e envia o resultado por um canal.

O tipo sync.WaitGroup fornece uma maneira simples de organizar o grupo de trabalhadores.

package main

import (
	"fmt"
	"sync"
)

func trabalhador(id int, entrada <-chan int, saida chan<- int, grupo *sync.WaitGroup) {
	for valor := range entrada {
		fmt.Printf("id: %d processou valor: %v\n", id, valor)
		saida <- valor * 2
	}
	grupo.Done()
}

func grupoDeTrabalhadores(entrada <-chan int, nTrabalhadores int) chan int {
	saida := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < nTrabalhadores; i++ {
		go trabalhador(i+1, entrada, saida, &wg)
	}
	wg.Add(nTrabalhadores)
	go func() {
		// Quando todos os trabalhadores estiverem terminado
		// informa que o grupo não vai mais enviar resultados
		wg.Wait()
		close(saida)
	}()
	return saida
}

func sequenciaNumeros(inicial, final int) <-chan int {
	saida := make(chan int)
	go func() {
		for i := inicial; i <= final; i++ {
			saida <- i
		}
		// após gerar todos os valores, fecha o canal
		close(saida)
	}()
	return saida
}

func main() {
	// Produz uma sequência de 10 valores
	entrada := sequenciaNumeros(1, 10)
	// Um grupo de trabalhadores irá processar esses números
	saida := grupoDeTrabalhadores(entrada, 2)

	// somente termina quando todo o trabalho for processado
	for s := range saida {
		fmt.Println(s)
	}
}

🧑‍🏭 Pipeline

Um pipeline trabalha recebendo valores de um canal e escrevendo em outro canal, normalmente após realizar alguma tranformação no valor.

No exemplo temos a função dobro atuando como um pipeline, que irá receber os valores enviados ao canal de entrada retornando os valores transformados.

Um canal pode ser definido como sendo apenas para leitura (<-chan) ou apenas para escrita (chan<-).

Os valores gerados pelo gerador sequenciaNumeros são enviados para o canal de entrada do pipeline e seu valor transformado recebido pelo canal de saída na função principal e é impresso.

Vários pipelines poderiam ser encadeados para realizar múltiplas transformações.

package main

import "fmt"

func dobro(entrada <-chan int) <-chan int {
	saida := make(chan int)
	go func() {
		for valor := range entrada {
			saida <- valor * 2
		}
		// Após ter terminado de transformar os valores de entrada,
		//  fecha o canal de saida
		close(saida)
	}()
	return saida
}

func sequenciaNumeros(inicial, final int) <-chan int {
	saida := make(chan int)
	go func() {
		for i := inicial; i <= final; i++ {
			saida <- i
		}
		// após gerar todos os valores, fecha o canal
		close(saida)
	}()
	return saida
}

func main() {
	for valor := range dobro(dobro(sequenciaNumeros(1, 10))) {
		fmt.Printf("valor: %v\n", valor)
	}
}

⚗️ Fan-in

Um fan-in copia dados de múltiplos canais de entrada e escreve em um único canal de saída. Normalmente um fan-in só termina quando todos os canais de entrada são fechados.

A função fan-in pode receber vários canais entrada através de parâmetros múltiplos.

No exemplo abaixo, enviamos vários geradores como entrada para a função fan-in e nos é retornado um único canal de saída. Internamente, uma goroutine é criada para ler os valores de cada canal de entrada, porém todas escrevem no mesmo canal de saída.

Envio de mensagem em um canal fechado causa um erro (panic), por isso é importante garantir que todos os canais de entrada estejam fechados antes de fechar o canal de saída. O tipo sync.WaitGroup fornece uma maneira simples de organizar essa sincronização.

Repare que temos uma goroutine que aguarda um sinal indicando que todas as todas entradas foram consumidas (wg.Wait), finalizando assim o canal de saída.

package main

import (
	"fmt"
	"sync"
)

func fanin(entradas ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	// canal de saída que será compartilhado entre os canais de entrada
	saida := make(chan int)

	// lê os valores de cada canal de entrada e envia para o canal de saída
	// quando todos os valores forem lidos, envia sinal avisando que terminou
	enviarSaida := func(c <-chan int) {
		for n := range c {
			saida <- n
		}
		// aviso que terminou de ler os valores de um canal
		wg.Done()
	}
	wg.Add(len(entradas))
	// Inicializa uma goroutine de saída para cada canal de entrada em canais_entrada.
	for _, c := range entradas {
		go enviarSaida(c)
	}

	// Inicia uma goroutine para fechar o canal de saída quando todas as
	// goroutines de entrada terminarem.
	// isto deve ser feito após o wg.Add
	go func() {
		wg.Wait()
		close(saida)
	}()
	return saida
}

func sequenciaNumeros(inicial, final int) <-chan int {
	saida := make(chan int)
	go func() {
		for i := inicial; i <= final; i++ {
			saida <- i
		}
		// após gerar todos os valores, fecha o canal
		close(saida)
	}()
	return saida
}

func main() {
	canal := fanin(
		sequenciaNumeros(1, 10),
		sequenciaNumeros(11, 20),
		sequenciaNumeros(21, 30),
	)
	for valor := range canal {
		fmt.Printf("valor: %v\n", valor)
	}
}

📣 Fan-out

Um fan-out copia dados de um canal de entrada para múltiplos canais de saída.

No exemplo uma sequência de números é gerada e enviada para múltiplos canais de saída. Estes canais possuem seus respectivos trabalhadores que irão fazer o processamento do valor.

Esta implementação de fan-out tenta garantir a entrega de todas as mensagens utilizando um agrupador (WaitGroup) para aguardar a publicação dos valores em todos os canais de saída. A publicação é feita em sua própria goroutine e conta também com um mecanismo(timer) de forma a previnir o bloqueio caso algum canal de saída não consiga consumir a mensagem. As mensagens não consumidas são descartadas.

package main

import (
	"fmt"
	"sync"
	"time"
)

func publicar(saida chan<- int, valor int, wg *sync.WaitGroup) {
	timer := time.NewTimer(1 * time.Second)
	// Aguarda 1 segundo ou o canal ser lido
	select {
	case saida <- valor:
	case <-timer.C:
	}
	// Independente do canal ser lido ou não,
	// avisa que a publicação terminou
	wg.Done()
	timer.Stop()
}

func fanout(entrada <-chan int, saidas ...chan<- int) {

	// O agrupamento das publicações é para evitar que
	// o processamento fique bloquando enquanto um canal de saída não é lido
	// e garante que todos os valores serão publicados
	var wg sync.WaitGroup
	for valor := range entrada {
		wg.Add(len(saidas))
		// Publica o valor de entrada em todas as saídas
		for _, saida := range saidas {
			go publicar(saida, valor, &wg)
		}
		wg.Wait()
	}
	// Como a entrada foi consumida, fecha os canais de saída
	for _, saida := range saidas {
		close(saida)
	}
}

func sequenciaNumeros(inicial, final int) <-chan int {
	saida := make(chan int)
	go func() {
		for i := inicial; i <= final; i++ {
			saida <- i
		}
		// após gerar todos os valores, fecha o canal
		close(saida)
	}()
	return saida
}

func trabalhador(in <-chan int, id int, wg *sync.WaitGroup) {
	for v := range in {
		fmt.Println("id: ", id, " valor: ", v)
	}
	wg.Done()
}

func main() {
	saida1 := make(chan int)
	saida2 := make(chan int)
	// Agrupamos os trabalhadores de forma
	// a aguardar o processamento de todos antes do programa principal
	// ser finalizado
	var wg sync.WaitGroup
	wg.Add(2)
	go trabalhador(saida1, 1, &wg)
	go trabalhador(saida2, 2, &wg)
	fanout(sequenciaNumeros(1, 10), saida1, saida2)
	wg.Wait()
}

🪟 Janela deslizante

Uma janela deslizante (sliding window) é utilizada para prevenir que um leitor lento trave um escritor rápido. Ela funciona deslizando sobre os dados. A ordem de entregas é garantida porém dados antigos podem ser descartados se o consumidor for muito lento.

No exemplo uma sequência de números é gerada, porém nosso consumidor é mais lento que o produtor, logo a medida que a janela desliza os valores antigos são descartados.

Para fazer a janela deslizante, utilizamos um buffer, que possui um tamanho fixo. Utilizamos uma técnica de seleção (select) onde caso o canal de saída seja lido, enviamos o valor para o consumidor e o removemos do buffer. Caso o canal de entrada seja lido, o valor é adicionado ao buffer.

package main

import (
	"container/list"
	"fmt"
	"time"
)

func janelaDeslizante(saida chan<- interface{}, entrada <-chan interface{}, tamanho int) {
	buffer := list.New()
	defer close(saida)
	for entrada != nil || buffer.Len() > 0 {
		if buffer.Len() == 0 {
			// nós temos um buffer vazio
			// e um canal de entrada válido
			val := <-entrada
			if val == nil { // assume que nil significa fechado
				entrada = nil // não vai mais ler dados
				continue
			}
			buffer.PushBack(val)
			continue
		}
		select {
		case saida <- buffer.Front().Value:
			// consumidor lê o dado
			buffer.Remove(buffer.Front()) // remove first item
		case val := <-entrada:
			// recebeu nova entrada
			if val == nil {
				// invalida entrada
				entrada = nil
				// continua já que podemos ter dados
				// no buffer
				continue
			}
			if buffer.Len() == tamanho {
				// buffer cheio, descarta dados antigos
				buffer.Remove(buffer.Front())
			}
			// adiciona novo dado no buffer
			buffer.PushBack(val)
		}
	}
}

func leitorLento(in <-chan interface{}) {
	for val := range in {
		fmt.Printf("valor: %v\n", val)
		time.Sleep(4 * time.Second)
	}
}

func sequenciaNumeros(inicial, final int) <-chan interface{} {
	saida := make(chan interface{})
	go func() {
		for i := inicial; i <= final; i++ {
			saida <- i
			time.Sleep(1 * time.Second)
		}
		// após gerar todos os valores, fecha o canal
		close(saida)
	}()
	return saida
}

func main() {
	valores := sequenciaNumeros(1, 10)
	saida := make(chan interface{})
	go leitorLento(saida)
	janelaDeslizante(saida, valores, 3)
}

🧑‍🤝‍🧑 Processamento em lote (batch processing)

Um processamento em lote (batch processing) é usado quando uma goroutine gera itens um-por-um mas o consumidor deseja processar os items em blocos. Normalmente um canal de conclusão é usado para notificar o escritor que o item foi processado. Um canal de descarga (flush) pode user usado para forçar que o buffer seja enviado antes que ele esteja cheio.

Exemplo: Ao invés de salvar cada item no banco de dados assim que ele é recebido, é possível utilizar um buffer de 100 itens ou 100ms e salvar os itens em uma única requisição.

No exemplo, quando a terceira requisição (req) é enviada, o buffer percebe que ele está cheio e envia os dados para o canal de saída.

Há um canal que permite enviar os dados antes que o buffer esteja cheio, chamado descarga.

Quando o canal de entrada é fechado, mas ainda há itens no buffer, o buffer é enviado para o canal de saída.

package main

import (
	"fmt"
)

type req struct {
	valor int
}

func processar(lote []req) {
	fmt.Println("processando lote com valores: ", lote)
}

func processadorLotes(entrada <-chan []req) chan bool {
	pronto := make(chan bool)
	go func() {
		for lote := range entrada {
			processar(lote)
		}
		pronto <- true
	}()
	return pronto
}

func processamentoLotes(entrada <-chan req, descarga <-chan struct{}, tamanhoLote int) chan []req {
	saida := make(chan []req)
	go func() {
		buf := make([]req, 0, tamanhoLote)
		valorZero := req{}
		var fechado bool
		// enquanto houver itens para processar
		for !fechado {
			var deveDescarregar bool

			select {
			case r := <-entrada:
				if r == valorZero {
					// close on zero value
					fechado = true
					continue
				}
				// Adiciona o item no buffer
				buf = append(buf, r)
				deveDescarregar = len(buf) == tamanhoLote
			case <-descarga:
				deveDescarregar = true
			}
			if deveDescarregar {
				saida <- buf
				buf = make([]req, 0, tamanhoLote)
			}
		}
		// garante que caso a entrada seja fechada sem preencher o buffer
		// os ultimos itens sejam enviados para processamento
		if len(buf) > 0 {
			saida <- buf
		}
		close(saida)
	}()
	return saida
}

func main() {
	entrada := make(chan req)
	descarga := make(chan struct{})
	// inicia de forma concorrente o processamento em lotes
	saida := processamentoLotes(entrada, descarga, 3)
	// O consumidor de lotes será iniciado de forma concorrente
	pronto := processadorLotes(saida)
	entrada <- req{valor: 1}
	entrada <- req{valor: 2}
	entrada <- req{valor: 3}

	// Envia mais dois itens, porém força a descarga
	// através de um sinal
	entrada <- req{valor: 4}
	entrada <- req{valor: 5}
	descarga <- struct{}{}

	// Envia mais dois itens, não o suficiente para descarregar
	// o lote.
	entrada <- req{valor: 6}
	entrada <- req{valor: 7}
	// Eles serão processados mesmo assim.

	close(entrada)
	// Aguarda todo o processamento do processador de lotes
	// antes de encerrar o programa
	<-pronto
}

🎫 Sistema de ticket

Um sistema de ticket é usado para controlar quando um determinado trabalho pode ser executado, normalmente é utilizado para limitar o uso de um recurso sobre um período de tempo.

Exemplo: Uma API pode ser acionada apenas 15 vezes em um perído de 15 minutos. A utilização é medida em blocos de 15 minutos.

No exemplo, a bilheteria é um sistema de ticket que garante que apenas 15 "tickets" sejam processados a cada segundo.

Enviamos através de um canal 30 processamentos a serem feitos, mas o sistema de ticket garante que apenas 15 processamentos sejam executados por segundo.

Como pode ser visto, o trabalhador fica bloqueado, até que um ticket seja enviado através do canal.

package main

import (
	"fmt"
	"time"
)

type (
	Trabalho func()
	ticket   int
)

func trabalhador(tickets <-chan ticket, work <-chan Trabalho) {
	for w := range work {
		<-tickets // espera por um ticket
		w()       // executa um trabalho
	}
}

func bilheteria(tickets chan<- ticket, timeout time.Duration, nTickets int) {
	for {
		for i := 0; i < nTickets; i++ {
			tickets <- ticket(i)
		}

		// espera até que mais tickets possam ser emitidos
		<-time.After(timeout)
	}
}

func main() {
	tickets := make(chan ticket)
	trabalhos := make(chan Trabalho)

	go bilheteria(tickets, 1*time.Second, 10)
	go trabalhador(tickets, trabalhos)

	for i := 0; i <= 30; i++ {

		trabalhos <- func() {
			fmt.Println("processando ticket")
		}
		fmt.Println("trabalho ", i, " enviado")
	}

	close(trabalhos)
	close(tickets)
}