Concurrency Patterns
Introduction to Concurrency Patterns
Concurrency patterns in Go provide structured approaches to solving complex concurrent programming challenges. These patterns help manage goroutines, synchronization, and communication effectively.
Common Concurrency Patterns
graph TD
A[Concurrency Patterns] --> B[Worker Pool]
A --> C[Fan-Out/Fan-In]
A --> D[Pipeline]
A --> E[Semaphore]
1. Worker Pool Pattern
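import "sync"

// Task is a unit of work executed by the pool.
type Task func()

// WorkerPool runs tasks on at most maxWorkers goroutines and returns once
// every task has finished. maxWorkers must be at least 1: with no workers,
// the first send on taskChan would block forever.
func WorkerPool(tasks []Task, maxWorkers int) {
	taskChan := make(chan Task)
	var wg sync.WaitGroup

	// Start the workers; each one exits when taskChan is closed.
	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range taskChan {
				task()
			}
		}()
	}

	// Submit tasks, then close the channel to signal that no more are coming.
	for _, task := range tasks {
		taskChan <- task
	}
	close(taskChan)

	// Wait for the workers to drain the channel and exit.
	wg.Wait()
}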
Pattern Characteristics
| Pattern | Use Case | Key Benefits |
| --- | --- | --- |
| Worker Pool | Parallel task processing | Resource control, limited concurrency |
| Fan-Out/Fan-In | Distributing work | Scalability, load balancing |
| Pipeline | Data processing | Efficient data flow |
| Semaphore | Resource limiting | Controlled access |
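The Semaphore row in the table has no accompanying code, so here is a minimal sketch of one common way to build it in Go: a buffered channel used as a counting semaphore. The helper name `runLimited` and the peak-tracking bookkeeping are assumptions made for illustration, and the sketch assumes `limit` is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// runLimited is a hypothetical helper: it starts one goroutine per job but
// lets at most limit of them work at once, using a buffered channel as a
// counting semaphore. It returns the peak number of jobs seen in flight.
// Assumption: limit >= 1 (a zero-capacity channel would block every job).
func runLimited(jobs, limit int) int {
	sem := make(chan struct{}, limit)
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		active int
		peak   int
	)
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while all slots are taken
			defer func() { <-sem }() // release the slot on exit

			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()

			// ... the rate-limited work would go here ...

			mu.Lock()
			active--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", runLimited(10, 3))
}
```

Unlike a worker pool, this starts a goroutine for every job and only limits how many proceed past the acquire step, which tends to suit cases where the jobs are created in many places but share one scarce resource.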
2. Fan-Out/Fan-In Pattern
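// fanOut copies every value from ch to both outputs, then closes them.
// Each send blocks until that output is read, so both consumers must
// keep receiving or the whole fan-out stalls.
func fanOut(ch <-chan int, out1, out2 chan<- int) {
	for v := range ch {
		out1 <- v
		out2 <- v
	}
	close(out1)
	close(out2)
}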
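// fanIn merges in1 and in2 into a single channel, which is closed once
// both inputs are closed.
func fanIn(in1, in2 <-chan int) <-chan int {
	merged := make(chan int)
	go func() {
		defer close(merged)
		// A nil channel blocks forever in select, so setting a closed
		// input to nil disables its case. The loop condition is checked
		// on every iteration, including those that end in continue, so
		// the goroutine exits as soon as both inputs are exhausted.
		for in1 != nil || in2 != nil {
			select {
			case v, ok := <-in1:
				if !ok {
					in1 = nil
					continue
				}
				merged <- v
			case v, ok := <-in2:
				if !ok {
					in2 = nil
					continue
				}
				merged <- v
			}
		}
	}()
	return merged
}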
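Fan-out is also often used to distribute work, with several workers reading from one shared channel so that each value is handled by exactly one of them. The following is a minimal sketch of that variant combined with a fan-in that accepts any number of inputs. The names `worker`, `merge`, and `sumOfSquares` are hypothetical, and the sketch assumes `numWorkers` is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// worker squares each value it receives. Several workers can read from the
// same input channel; each value is delivered to exactly one of them.
func worker(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// merge fans in any number of channels into one and closes the result once
// every input has been drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumOfSquares fans the values 1..n out to numWorkers workers and sums the
// merged results. Assumption: numWorkers >= 1.
func sumOfSquares(n, numWorkers int) int {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 1; i <= n; i++ {
			src <- i
		}
	}()
	outs := make([]<-chan int, numWorkers)
	for i := range outs {
		outs[i] = worker(src)
	}
	total := 0
	for v := range merge(outs...) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares(10, 3)) // 1*1 + 2*2 + ... + 10*10 = 385
}
```

The order in which results arrive is not deterministic here, which is why the example reduces them with a sum rather than printing them one by one.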
3. Pipeline Pattern
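// pipeline is the source stage: it emits 1 through 10 and then closes
// its output channel.
func pipeline() <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= 10; i++ {
			out <- i
		}
	}()
	return out
}

// square is a transform stage: it reads values from in, emits their
// squares, and closes its output when in is closed.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}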
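Because every stage takes a receive-only channel and returns one, stages compose like ordinary function calls. The sketch below shows one way to wire them together. `square` mirrors the stage above, while `generate` and `collect` are hypothetical helpers added for illustration.

```go
package main

import "fmt"

// generate is a hypothetical source stage: it emits the given values and
// then closes its output.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square emits the square of every value it receives.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// collect is a hypothetical sink stage: it drains a channel into a slice.
func collect(in <-chan int) []int {
	var result []int
	for v := range in {
		result = append(result, v)
	}
	return result
}

func main() {
	// generate -> square -> square -> collect
	fmt.Println(collect(square(square(generate(1, 2, 3))))) // [1 16 81]
}
```

Each stage runs in a single goroutine, so the order of values is preserved from one end of the pipeline to the other.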
Synchronization Mechanisms
graph TD
A[Synchronization] --> B[Mutex]
A --> C[Channels]
A --> D[WaitGroup]
A --> E[Atomic Operations]
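To make two of the mechanisms in the diagram concrete, here is a small sketch that protects the same shared counter first with a mutex and then with an atomic operation. The function names `countWithMutex` and `countWithAtomic` are assumptions chosen for this example; both use a WaitGroup to wait for the goroutines.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWithMutex increments a shared counter from n goroutines, guarding
// every access with a sync.Mutex.
func countWithMutex(n int) int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		count int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			count++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return count
}

// countWithAtomic does the same with a lock-free atomic add.
func countWithAtomic(n int) int64 {
	var (
		wg    sync.WaitGroup
		count int64
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&count, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&count)
}

func main() {
	fmt.Println(countWithMutex(100))  // 100
	fmt.Println(countWithAtomic(100)) // 100
}
```

As a rough guide, atomics fit single-word values such as counters and flags, while a mutex is the usual choice when several fields must change together.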
Advanced Concurrency Considerations
- Avoid shared memory when possible
- Use channels for communication
- Implement proper error handling
- Be mindful of goroutine lifecycles
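The last point, goroutine lifecycles, is often handled with `context` cancellation so that a producer stops when its consumer loses interest. The following is a minimal sketch under that assumption; `counter` and `firstN` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// counter emits 0, 1, 2, ... until ctx is cancelled, then closes its
// output so the goroutine does not outlive its consumer.
func counter(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstN reads n values from counter, cancels it, and waits for the
// producer goroutine to exit before returning.
func firstN(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	ch := counter(ctx)
	got := make([]int, 0, n)
	for len(got) < n {
		got = append(got, <-ch)
	}
	cancel()
	// Drain until the channel is closed; the loop ends once the producer
	// has observed the cancellation and returned.
	for range ch {
	}
	return got
}

func main() {
	fmt.Println(firstN(3)) // [0 1 2]
}
```

Without the `ctx.Done()` case, the producer would block forever on its next send after the consumer stopped reading, which is a typical goroutine leak.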
Error Handling in Concurrent Code
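// processWithErrorHandling runs every task in its own goroutine and
// returns the first error received, or nil if all tasks succeed.
// executeTask is assumed to be defined elsewhere as func(Task) error.
func processWithErrorHandling(tasks []Task) error {
	// The buffer holds one error per task, so no goroutine blocks on
	// send even if this function returns early.
	errChan := make(chan error, len(tasks))
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(t Task) {
			defer wg.Done()
			if err := executeTask(t); err != nil {
				errChan <- err
			}
		}(task)
	}
	// Close errChan once all tasks finish so the receive below cannot
	// block forever.
	go func() {
		wg.Wait()
		close(errChan)
	}()
	// Only non-nil errors are sent, so this yields the first failure, or
	// nil if the channel is closed without one. Tasks that are still
	// running are not cancelled.
	return <-errChan
}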
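When every failure matters rather than only the first, one alternative is to wait for all tasks and combine their errors. The sketch below assumes Go 1.20 or later for `errors.Join`. The names `runAll` and `errDisk` are hypothetical, and the tasks are plain `func() error` values rather than the `Task` type used above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// runAll runs every task concurrently, waits for all of them, and returns
// their errors combined with errors.Join (nil if every task succeeded).
func runAll(tasks []func() error) error {
	// One slot per task: each goroutine writes only its own index, so no
	// lock is needed.
	errs := make([]error, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() error) {
			defer wg.Done()
			errs[i] = task()
		}(i, task)
	}
	wg.Wait()
	return errors.Join(errs...) // nil entries are discarded
}

// errDisk is a hypothetical sample error used only in this example.
var errDisk = errors.New("disk full")

func main() {
	err := runAll([]func() error{
		func() error { return nil },
		func() error { return errDisk },
	})
	fmt.Println(err)                     // disk full
	fmt.Println(errors.Is(err, errDisk)) // true
}
```

The trade-off is latency: this version always waits for the slowest task, whereas returning the first error lets the caller react sooner.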
LabEx Concurrency Recommendations
At LabEx, we emphasize:
- Designing for concurrency from the start
- Using patterns that promote clean, maintainable code
- Avoiding over-complication of concurrent designs
- Minimizing lock contention
- Using buffered channels judiciously
- Profiling and benchmarking concurrent code
- Choosing the right pattern for your specific use case