Concurrency Patterns
Fan-Out/Fan-In Pattern
A pattern for distributing work across multiple goroutines and collecting results:
func fanOutFanIn() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// Spawn worker goroutines
	for w := 1; w <= 3; w++ {
		go worker(jobs, results)
	}

	// Send jobs
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)

	// Collect results
	for a := 1; a <= 5; a++ {
		<-results
	}
}

func worker(jobs <-chan int, results chan<- int) {
	for job := range jobs {
		results <- job * 2
	}
}
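The collector above hard-codes the number of results it expects (five). When that count is not known in advance, one common variation is to close the results channel once every worker has returned, so the collector can simply range over it. The following is a minimal sketch of that idea, not part of the original listing: `doubleAll` is a hypothetical helper, and it assumes `numWorkers` is at least 1.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// doubleAll is a hypothetical helper: it fans the inputs out to
// numWorkers goroutines and fans the doubled values back in.
// It assumes numWorkers >= 1.
func doubleAll(inputs []int, numWorkers int) []int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				results <- job * 2
			}
		}()
	}

	// Feed the jobs, then signal that no more are coming.
	go func() {
		for _, in := range inputs {
			jobs <- in
		}
		close(jobs)
	}()

	// Close results once every worker has returned; this ends the range below.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	sort.Ints(out) // arrival order is nondeterministic, so sort for stable output
	return out
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3, 4, 5}, 3)) // [2 4 6 8 10]
}
```

The `sync.WaitGroup` is what makes the fan-in safe: the results channel is closed exactly once, and only after the last worker has stopped sending.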
Pipeline Pattern
graph LR
A[Stage 1] --> B[Stage 2]
B --> C[Stage 3]
C --> D[Final Output]
Implementing a data processing pipeline:
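func pipeline() {
	numbers := generateNumbers()
	squared := squareNumbers(numbers)
	printed := printNumbers(squared)

	// Consume the final stage
	for range printed {
		// Drain the channel
	}
}

// generateNumbers emits 1 through 10, then closes its output.
func generateNumbers() <-chan int {
	out := make(chan int)
	go func() {
		for i := 1; i <= 10; i++ {
			out <- i
		}
		close(out)
	}()
	return out
}

// squareNumbers squares every value it receives.
func squareNumbers(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// printNumbers is not defined in the original listing; this is an assumed
// final stage that prints each value and passes it along (requires "fmt").
func printNumbers(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			fmt.Println(n)
			out <- n
		}
		close(out)
	}()
	return out
}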
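The stages above run to completion only if the consumer reads every value. If the consumer stops early, each upstream goroutine stays blocked on its send. A common remedy is to pass every stage a `done` channel and select on it alongside the send. The sketch below illustrates this under stated assumptions: `generate`, `square`, and `firstSquares` are hypothetical names, not functions from the original listing.

```go
package main

import "fmt"

// generate emits 1..n, stopping early if done is closed.
func generate(done <-chan struct{}, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			select {
			case out <- i:
			case <-done:
				return
			}
		}
	}()
	return out
}

// square squares each value it receives, stopping early if done is closed.
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// firstSquares returns at most k squares of the numbers 1..n.
func firstSquares(n, k int) []int {
	done := make(chan struct{})
	defer close(done) // unblocks any stage still trying to send

	var got []int
	for v := range square(done, generate(done, n)) {
		if len(got) >= k {
			break
		}
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(firstSquares(10, 3)) // [1 4 9]
}
```

Closing `done` when `firstSquares` returns lets both stage goroutines exit even though the consumer read only part of the stream.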
Worker Pool Pattern
| Pattern Characteristic | Description |
| --- | --- |
| Concurrency Control | Limits the number of concurrent workers |
| Resource Management | Prevents system overload |
| Scalability | Easily adjustable worker count |
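type Task struct {
	ID int
}

func workerPool() {
	tasks := make(chan Task, 100)
	results := make(chan int, 100)

	// Create worker pool
	for w := 1; w <= 3; w++ {
		go poolWorker(tasks, results)
	}

	// Dispatch tasks
	go func() {
		for i := 0; i < 10; i++ {
			tasks <- Task{ID: i}
		}
		close(tasks)
	}()

	// Collect results
	for i := 0; i < 10; i++ {
		<-results
	}
}

// poolWorker is named differently from the fan-out worker so that both
// examples can live in the same package.
func poolWorker(tasks <-chan Task, results chan<- int) {
	for task := range tasks {
		// Process task
		results <- processTask(task)
	}
}

// processTask is not defined in the original listing; this placeholder
// stands in for real work and simply returns the task ID.
func processTask(task Task) int {
	return task.ID
}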
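To make the "limits number of concurrent workers" property concrete, the sketch below records how many tasks are in flight at once. It is an illustration under stated assumptions rather than a reference implementation: `runPool` is a hypothetical helper, and the one-millisecond sleep stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	ID int
}

// runPool is a hypothetical helper: it processes numTasks tasks with
// numWorkers goroutines and reports how many tasks were processed and
// the peak number that were running at the same time.
func runPool(numWorkers, numTasks int) (processed, peak int) {
	tasks := make(chan Task)

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex // guards running, processed, and peak
		running int
	)

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range tasks {
				mu.Lock()
				running++
				if running > peak {
					peak = running
				}
				mu.Unlock()

				time.Sleep(time.Millisecond) // stand-in for real work

				mu.Lock()
				running--
				processed++
				mu.Unlock()
			}
		}()
	}

	for i := 0; i < numTasks; i++ {
		tasks <- Task{ID: i}
	}
	close(tasks)
	wg.Wait() // all workers have exited, so the counters are final
	return processed, peak
}

func main() {
	processed, peak := runPool(3, 10)
	fmt.Println("processed:", processed)
	fmt.Println("peak within worker limit:", peak <= 3)
}
```

However many tasks are queued, the peak never exceeds `numWorkers`, because only that many goroutines ever read from the task channel. Changing the first argument is all it takes to resize the pool.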
Context Cancellation Pattern
graph TD
A[Start Operation] --> B{Context Active?}
B -->|Yes| C[Continue Processing]
B -->|No| D[Terminate Goroutine]
Implementing graceful shutdown:
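// Requires imports: "context", "fmt", "time"
func contextCancellation() {
	// Cancel automatically after 5 seconds
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel() // release the timer's resources

	done := make(chan struct{})

	// Long-running operation
	go func() {
		defer close(done)
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Operation cancelled")
				return
			default:
				// Perform one bounded unit of work, then re-check the context
			}
		}
	}()

	// Wait for the goroutine to observe cancellation. Without this wait,
	// the deferred cancel would fire as soon as the function returned and
	// the timeout would never be reached.
	<-done
}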
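A worker usually needs to report why it stopped, and it should block between units of work instead of spinning in a `default` branch. The sketch below shows one way to do both, assuming a ticker-driven worker. `countUntilCancelled` and `runWithDeadline` are hypothetical names chosen for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// countUntilCancelled is a hypothetical worker: it increments a counter on
// every tick and returns the total along with the reason it stopped.
func countUntilCancelled(ctx context.Context, interval time.Duration) (int, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	ticks := 0
	for {
		select {
		case <-ctx.Done():
			// ctx.Err() is context.Canceled or context.DeadlineExceeded
			return ticks, ctx.Err()
		case <-ticker.C:
			ticks++
		}
	}
}

// runWithDeadline runs the worker under a 50 ms deadline.
func runWithDeadline() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return countUntilCancelled(ctx, 10*time.Millisecond)
}

func main() {
	ticks, err := runWithDeadline()
	fmt.Println("ticks:", ticks)
	fmt.Println("stopped because:", err)
}
```

Because the `select` blocks on either the ticker or `ctx.Done()`, the goroutine uses no CPU while idle and still reacts to cancellation promptly.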
Synchronization Patterns
Mutex vs Channel Synchronization
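| Synchronization Type | Use Case | Pros | Cons |
| --- | --- | --- | --- |
| Mutex | Protecting shared resources | Low overhead | Can lead to deadlocks if locks are acquired in inconsistent order or never released |
| Channels | Communicating between goroutines | Helps avoid data races by handing values from one goroutine to another | Slight performance overhead compared with a mutex |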
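The difference is easiest to see on the same problem solved both ways. The sketch below counts to `n` from `n` goroutines, first with a mutex around a shared counter and then with a channel feeding a single owner. Both function names are hypothetical and chosen only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithMutex increments a shared counter from n goroutines,
// guarding every access with a mutex.
func countWithMutex(n int) int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		total int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			total++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return total
}

// countWithChannel has n goroutines send increments to a single owner
// (here, the calling goroutine), so the counter is never shared.
func countWithChannel(n int) int {
	increments := make(chan int)
	for i := 0; i < n; i++ {
		go func() { increments <- 1 }()
	}

	total := 0
	for i := 0; i < n; i++ {
		total += <-increments
	}
	return total
}

func main() {
	fmt.Println(countWithMutex(1000), countWithChannel(1000)) // 1000 1000
}
```

The mutex version keeps one variable and protects it. The channel version removes the sharing altogether, at the cost of one channel operation per increment.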
Error Handling in Concurrent Code
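// Requires imports: "errors", "time"
func robustConcurrency() error {
	// Buffered so the goroutine can still send and exit after a timeout
	errChan := make(chan error, 1)

	go func() {
		defer close(errChan)
		// Perform concurrent operation
		if err := riskyOperation(); err != nil {
			errChan <- err
			return
		}
	}()

	select {
	case err := <-errChan:
		// err is nil when the channel was closed without an error
		return err
	case <-time.After(5 * time.Second):
		return errors.New("operation timeout")
	}
}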
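The same error-channel idea extends to several goroutines. The sketch below runs a set of operations concurrently and returns the first error received, or nil if all succeed. `runAll` and `errBoom` are hypothetical names for illustration; the sketch uses only the standard library.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBoom is a hypothetical sentinel error used only in this example.
var errBoom = errors.New("boom")

// runAll is a hypothetical helper: it runs every operation in its own
// goroutine and returns the first error received, or nil if all succeed.
func runAll(ops ...func() error) error {
	// Buffered to len(ops) so no goroutine ever blocks on send.
	errs := make(chan error, len(ops))
	var wg sync.WaitGroup

	for _, op := range ops {
		wg.Add(1)
		go func(op func() error) {
			defer wg.Done()
			if err := op(); err != nil {
				errs <- err
			}
		}(op)
	}

	wg.Wait()
	close(errs)

	// Receiving from a closed, empty channel yields nil.
	return <-errs
}

func main() {
	ok := func() error { return nil }
	bad := func() error { return errBoom }

	fmt.Println(runAll(ok, ok))      // <nil>
	fmt.Println(runAll(ok, bad, ok)) // boom
}
```

Sizing the buffer to the number of operations means a failing goroutine can always deliver its error and exit, even though the caller reads only one value.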
LabEx Concurrency Best Practices
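- Choose the pattern that fits the workload: fan-out/fan-in for independent jobs, a pipeline for staged processing, a worker pool for bounded concurrency
- Minimize shared state, and guard whatever remains with a mutex
- Use channels to communicate between goroutines
- Handle errors and cancellation in every goroutine you start
- Consider performance implications such as channel overhead and worker count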
By mastering these concurrency patterns, developers can create robust, efficient, and scalable Go applications that leverage the full power of goroutines.