Practical Concurrency Patterns
Concurrency Design Patterns Overview
Concurrency patterns help manage complex parallel operations efficiently and safely in Go.
graph TD
A[Concurrency Patterns] --> B[Worker Pool]
A --> C[Fan-Out/Fan-In]
A --> D[Pipeline]
A --> E[Semaphore]
A --> F[Rate Limiting]
1. Worker Pool Pattern
Implementation
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Task is a unit of work handed to the pool.
type Task struct {
	ID int
}

// workerPool starts maxWorkers goroutines that drain tasks until the channel
// is closed or ctx is cancelled, then waits for all of them to exit.
func workerPool(ctx context.Context, tasks <-chan Task, maxWorkers int) {
	var wg sync.WaitGroup
	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for {
				select {
				case task, ok := <-tasks:
					if !ok {
						return // channel closed: no more work
					}
					processTask(workerID, task)
				case <-ctx.Done():
					return // cancelled or timed out
				}
			}
		}(i)
	}
	wg.Wait()
}

// processTask simulates 100 ms of work.
func processTask(workerID int, task Task) {
	fmt.Printf("Worker %d processing task %d\n", workerID, task.ID)
	time.Sleep(100 * time.Millisecond)
}

func main() {
	// Give the whole run a 2-second budget.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	tasks := make(chan Task, 100)

	// Generate tasks, then close the channel so the workers can exit.
	go func() {
		for i := 0; i < 50; i++ {
			tasks <- Task{ID: i}
		}
		close(tasks)
	}()

	workerPool(ctx, tasks, 5)
}
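The pool above only prints its progress. A common variation sends each result back over a second channel. The following is a minimal sketch of that idea, not part of the original example: `squareAll` and its squaring workload are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squareAll is a hypothetical helper (not from the article): it runs
// `workers` goroutines that square each input and collects the results.
func squareAll(inputs []int, workers int) []int {
	tasks := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range tasks {
				results <- n * n
			}
		}()
	}

	// Feed the tasks, then close the channel so workers leave their range loops.
	go func() {
		for _, n := range inputs {
			tasks <- n
		}
		close(tasks)
	}()

	// Close results once every worker has finished, which ends the loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	sort.Ints(out) // completion order is nondeterministic, so sort for stable output
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}, 2)) // prints [1 4 9 16]
}
```

Closing `results` from a separate goroutine after `wg.Wait()` is what lets the caller use a plain `range` loop to collect everything without knowing how many results to expect.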
2. Fan-Out/Fan-In Pattern
Pattern Characteristics
| Characteristic | Description |
|----------------|-------------|
| Fan-Out | Distribute work across multiple goroutines |
| Fan-In | Collect results from multiple goroutines |
| Use Case | Parallel processing of independent tasks |
Implementation Example
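// fanOutFanIn squares every value from input using several workers and
// returns a single channel carrying all results. Result order is not guaranteed.
func fanOutFanIn(ctx context.Context, input <-chan int) <-chan int {
	numWorkers := 3
	outputs := make([]<-chan int, numWorkers)

	// Fan-Out: every worker reads from the same input channel.
	for i := 0; i < numWorkers; i++ {
		outputs[i] = processWorker(ctx, input)
	}

	// Fan-In: merge the per-worker outputs into one channel.
	return mergeChannels(ctx, outputs...)
}

// processWorker squares each value it receives. It exits when input is closed
// or, while sending, when ctx is cancelled. It relies on the producer closing
// input; a worker blocked on an input that never closes will not see ctx.
func processWorker(ctx context.Context, input <-chan int) <-chan int {
	output := make(chan int)
	go func() {
		defer close(output)
		for num := range input {
			select {
			case output <- num * num:
			case <-ctx.Done():
				return
			}
		}
	}()
	return output
}

// mergeChannels forwards values from all channels onto one channel, which is
// closed once every forwarding goroutine has finished.
func mergeChannels(ctx context.Context, channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	mergedCh := make(chan int)

	multiplex := func(ch <-chan int) {
		defer wg.Done()
		for num := range ch {
			select {
			case mergedCh <- num:
			case <-ctx.Done():
				return
			}
		}
	}

	wg.Add(len(channels))
	for _, ch := range channels {
		go multiplex(ch)
	}

	// Close the merged channel only after all forwarders are done.
	go func() {
		wg.Wait()
		close(mergedCh)
	}()
	return mergedCh
}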
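The functions above need a producer and a consumer before they do anything. As a rough, self-contained sketch of how the pieces fit together, the block below fans a slice of numbers out to several workers and sums the merged results. `square` is a compact stand-in for `processWorker`, and `sumOfSquares` is a hypothetical driver that does not appear in the original.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// square is a compact stand-in for processWorker above.
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// sumOfSquares is a hypothetical driver: it fans the inputs out to `workers`
// goroutines, fans the results back in, and adds them up.
func sumOfSquares(nums []int, workers int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Producer: closes the input channel when all numbers are sent.
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range nums {
			select {
			case in <- n:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Fan-out to the workers, fan-in onto merged.
	merged := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(ch <-chan int) {
			defer wg.Done()
			for v := range ch {
				merged <- v
			}
		}(square(ctx, in))
	}
	go func() {
		wg.Wait()
		close(merged)
	}()

	total := 0
	for v := range merged {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3, 4}, 3)) // prints 30
}
```

A sum is used here because it does not depend on arrival order, which fan-in does not preserve.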
3. Pipeline Pattern
Pipeline Stages
graph LR
A[Input Stage] --> B[Processing Stage]
B --> C[Output Stage]
Implementation
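// generateNumbers emits 1..max, then closes the channel.
// It stops early if ctx is cancelled.
func generateNumbers(ctx context.Context, max int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 1; i <= max; i++ {
			select {
			case ch <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}

// filterEven forwards only the even values from input.
func filterEven(ctx context.Context, input <-chan int) <-chan int {
	output := make(chan int)
	go func() {
		defer close(output)
		for num := range input {
			if num%2 != 0 {
				continue
			}
			// Send inside the select so a cancelled context unblocks the
			// goroutine even when no consumer is reading from output.
			select {
			case output <- num:
			case <-ctx.Done():
				return
			}
		}
	}()
	return output
}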
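The two stages above are composed by passing the output channel of one stage as the input of the next. The sketch below shows one way to wire them and collect the result. The stage functions are repeated in compact form so the block compiles on its own, and `collectEven` is a hypothetical helper introduced only for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// generateNumbers emits 1..max and then closes its channel.
func generateNumbers(ctx context.Context, max int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 1; i <= max; i++ {
			select {
			case ch <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}

// filterEven forwards only the even values from input.
func filterEven(ctx context.Context, input <-chan int) <-chan int {
	output := make(chan int)
	go func() {
		defer close(output)
		for num := range input {
			if num%2 != 0 {
				continue
			}
			select {
			case output <- num:
			case <-ctx.Done():
				return
			}
		}
	}()
	return output
}

// collectEven is a hypothetical helper: it wires the stages together
// and gathers everything the last stage produces.
func collectEven(max int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops both stages if we ever return early

	var out []int
	for n := range filterEven(ctx, generateNumbers(ctx, max)) {
		out = append(out, n)
	}
	return out
}

func main() {
	fmt.Println(collectEven(10)) // prints [2 4 6 8 10]
}
```

Because each stage here runs in a single goroutine, values leave the pipeline in the order they entered it.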
Concurrency Pattern Best Practices
- Use context for cancellation
- Limit number of goroutines
- Avoid shared state
- Use channels for communication
- Implement proper error handling
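- Goroutines are cheap to create (each starts with a stack of only a few kilobytes), but they are not free
- Use buffered channels where they measurably reduce blocking between producer and consumer; a buffer does not replace backpressure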
- Monitor resource consumption
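One way to limit the number of goroutines, as the list above advises, is a counting semaphore built from a buffered channel. The following is a minimal sketch under that assumption. `peakConcurrency` is a hypothetical helper that also records the highest number of goroutines seen running at once, so the limit can be checked.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// peakConcurrency is a hypothetical helper: it runs `jobs` pieces of work with
// at most `limit` goroutines alive at once and returns the highest number it
// observed running at the same time. limit must be at least 1.
func peakConcurrency(jobs, limit int) int {
	sem := make(chan struct{}, limit) // buffered channel used as a counting semaphore

	var (
		mu      sync.Mutex
		running int
		peak    int
		wg      sync.WaitGroup
	)

	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // acquire before spawning; blocks while `limit` slots are taken
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the work is finished

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(10 * time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}

	wg.Wait()
	return peak
}

func main() {
	fmt.Println(peakConcurrency(20, 3) <= 3) // prints true
}
```

Acquiring the slot before the `go` statement bounds the number of goroutines themselves. Acquiring it inside the goroutine would bound only the concurrent work, while still creating one goroutine per job.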
Error Handling Strategies
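// performOperation is a hypothetical placeholder for the real work;
// it is not defined elsewhere in this article.
func performOperation(ctx context.Context) error {
	return nil
}

func robustConcurrentOperation(ctx context.Context) error {
	// A buffer of 1 lets the goroutine send and exit even if nobody receives.
	errCh := make(chan error, 1)
	go func() {
		// Always report the outcome, nil included, so the caller does not
		// wait for cancellation after a successful run.
		errCh <- performOperation(ctx)
	}()

	select {
	case err := <-errCh:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}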
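The same idea extends to several goroutines: give the error channel one buffer slot per worker and cancel a shared context on the first failure. The sketch below uses only the standard library. `firstError` and its `failAt` parameter are hypothetical, introduced to make the failure reproducible. The `golang.org/x/sync/errgroup` package offers a packaged form of a similar pattern.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// firstError is a hypothetical helper: it runs n workers, makes the worker
// whose index equals failAt fail, and returns the first error reported.
// Pass a failAt outside [0, n) to let every worker succeed.
func firstError(n, failAt int) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := make(chan error, n) // one slot per worker, so sends never block
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if id == failAt {
				errCh <- fmt.Errorf("worker %d failed", id)
				cancel() // tell the other workers to stop early
				return
			}
			select {
			case <-time.After(10 * time.Millisecond): // simulated work
			case <-ctx.Done(): // another worker failed; give up
			}
		}(i)
	}

	wg.Wait()
	close(errCh)
	return <-errCh // a closed, empty channel yields nil
}

func main() {
	fmt.Println(firstError(5, 2))  // prints worker 2 failed
	fmt.Println(firstError(5, -1)) // prints <nil>
}
```

Sizing the buffer to the number of workers means no goroutine can block on its send, so none is leaked even when the caller reads only the first error.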
LabEx recommends practicing these patterns to build scalable and efficient concurrent applications in Go.