Concurrency Patterns
Introduction to Concurrency Patterns
Concurrency patterns help solve complex synchronization and communication challenges in Go programming. These patterns provide structured approaches to managing concurrent operations.
Common Concurrency Patterns
1. Worker Pool Pattern
graph TD
A[Job Queue] --> B[Worker 1]
A --> C[Worker 2]
A --> D[Worker 3]
B --> E[Result Channel]
C --> E
D --> E
Implementation Example:
func workerPool(jobs <-chan int, results chan<- int) {
for job := range jobs {
results <- processJob(job)
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// Create worker pool
for w := 1; w <= 3; w++ {
go workerPool(jobs, results)
}
// Send jobs
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// Collect results
for a := 1; a <= 5; a++ {
<-results
}
}
2. Fan-Out/Fan-In Pattern
Pattern |
Description |
Use Case |
Fan-Out |
Single input distributed to multiple workers |
Parallel processing |
Fan-In |
Multiple inputs consolidated into single output |
Result aggregation |
func fanOutFanIn() {
input := make(chan int)
output := make(chan int)
// Fan-out
go func() {
for i := 0; i < 5; i++ {
input <- i
}
close(input)
}()
// Multiple workers
go worker(input, output)
go worker(input, output)
// Fan-in results
for result := range output {
fmt.Println(result)
}
}
3. Semaphore Pattern
type Semaphore struct {
semaChan chan struct{}
}
func NewSemaphore(maxConcurrency int) *Semaphore {
return &Semaphore{
semaChan: make(chan struct{}, maxConcurrency),
}
}
func (s *Semaphore) Acquire() {
s.semaChan <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.semaChan
}
4. Pipeline Pattern
graph LR
A[Input] --> B[Stage 1]
B --> C[Stage 2]
C --> D[Stage 3]
D --> E[Output]
func pipeline() {
numbers := generateNumbers()
squared := squareNumbers(numbers)
printed := printNumbers(squared)
for v := range printed {
fmt.Println(v)
}
}
func generateNumbers() <-chan int {
out := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
out <- i
}
close(out)
}()
return out
}
Advanced Concurrency Techniques
Graceful Shutdown
func gracefulShutdown(ctx context.Context) {
done := make(chan bool)
go func() {
// Perform cleanup
select {
case <-ctx.Done():
// Handle cancellation
done <- true
}
}()
select {
case <-done:
fmt.Println("Shutdown complete")
case <-time.After(5 * time.Second):
fmt.Println("Forced shutdown")
}
}
Best Practices
- Use channels for communication
- Limit concurrent operations
- Implement proper error handling
- Use context for cancellation
- Avoid shared memory mutations
Conclusion
Mastering these concurrency patterns with LabEx's comprehensive approach will help you write efficient, scalable Go applications that leverage the power of concurrent programming.