Concurrency Patterns
Channel-Based Patterns
Worker Pool Pattern
package main
import (
"fmt"
"sync"
)
// worker receives jobs until the jobs channel is closed. For each job it logs
// which worker picked it up and sends the job value doubled on results.
// wg.Done is called when the worker returns.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		task, more := <-jobs
		if !more {
			// jobs was closed and fully drained.
			return
		}
		fmt.Printf("Worker %d processing job %d\n", id, task)
		doubled := task * 2
		results <- doubled
	}
}
// main runs the worker-pool demo: three workers process five jobs and the
// results are printed after every worker has finished.
func main() {
	const (
		workerCount = 3
		jobCount    = 5
	)

	var wg sync.WaitGroup
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// Create worker pool
	wg.Add(workerCount)
	for id := 1; id <= workerCount; id++ {
		go worker(id, jobs, results, &wg)
	}

	// Send jobs, then close the channel to signal that no more are coming.
	for n := 1; n <= jobCount; n++ {
		jobs <- n
	}
	close(jobs)

	// Wait for workers to complete. The results buffer (100) is larger than
	// jobCount, so sends on results cannot block while nothing is reading.
	wg.Wait()
	close(results)

	// Collect results
	for {
		r, ok := <-results
		if !ok {
			break
		}
		fmt.Println("Result:", r)
	}
}
Concurrency Patterns Visualization
```mermaid
graph TD
A[Concurrency Patterns] --> B[Channel Patterns]
A --> C[Synchronization Patterns]
B --> D[Worker Pool]
B --> E[Pipeline]
C --> F[Fan-Out/Fan-In]
C --> G[Semaphore]
```
Pipeline Pattern
package main
import (
"fmt"
)
// generator returns an unbuffered channel that yields each of nums in order
// and is closed once the last value has been sent. The sending is done by a
// goroutine started here, so the call itself returns immediately.
func generator(nums ...int) <-chan int {
	stream := make(chan int)
	emit := func() {
		for i := 0; i < len(nums); i++ {
			stream <- nums[i]
		}
		close(stream)
	}
	go emit()
	return stream
}
// square returns an unbuffered channel carrying n*n for every n received from
// in. The returned channel is closed after in has been closed and drained.
func square(in <-chan int) <-chan int {
	squared := make(chan int)
	go func() {
		for {
			n, ok := <-in
			if !ok {
				break
			}
			squared <- n * n
		}
		close(squared)
	}()
	return squared
}
func main() {
// Pipeline: generate -> square
pipeline := square(generator(1, 2, 3, 4))
for v := range pipeline {
fmt.Println(v)
}
}
Concurrency Pattern Types
| Pattern | Description | Use Case |
| --- | --- | --- |
| Worker Pool | Distribute tasks among fixed workers | CPU-bound tasks |
| Pipeline | Process data through multiple stages | Data transformation |
| Fan-Out/Fan-In | Distribute work and collect results | Parallel processing |
| Semaphore | Limit concurrent access | Resource management |
Fan-Out/Fan-In Pattern
package main
import (
"fmt"
"sync"
)
// fanOut copies every value received from ch to out1 and then to out2.
// When ch is closed and drained it closes out1 and then out2.
func fanOut(ch <-chan int, out1, out2 chan<- int) {
	for {
		value, open := <-ch
		if !open {
			break
		}
		// Each value goes to both outputs, out1 first.
		out1 <- value
		out2 <- value
	}
	close(out1)
	close(out2)
}
// main feeds the integers 1..5 into fanOut and runs two consumers, one per
// output channel, waiting for both to finish before exiting.
func main() {
	input := make(chan int)
	output1 := make(chan int)
	output2 := make(chan int)

	var wg sync.WaitGroup

	// consume prints everything arriving on ch, prefixed with label, until
	// ch is closed.
	consume := func(label string, ch <-chan int) {
		defer wg.Done()
		for {
			v, ok := <-ch
			if !ok {
				return
			}
			fmt.Println(label, v)
		}
	}

	wg.Add(2)
	go consume("Output 1:", output1)
	go consume("Output 2:", output2)

	go fanOut(input, output1, output2)

	// Send inputs
	for i := 1; i <= 5; i++ {
		input <- i
	}
	close(input)

	wg.Wait()
}
Context Pattern
package main
import (
"context"
"fmt"
"time"
)
// longRunningTask prints "Working..." roughly once per second until ctx is
// cancelled or its deadline passes, then prints "Task cancelled" and returns.
//
// The wait between iterations selects on ctx.Done(), so cancellation is
// observed immediately instead of only after a full one-second sleep.
func longRunningTask(ctx context.Context) {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		// Check before doing any work so an already-cancelled context
		// never prints "Working...".
		if ctx.Err() != nil {
			fmt.Println("Task cancelled")
			return
		}

		fmt.Println("Working...")

		// Pause until the next tick, but wake early on cancellation.
		select {
		case <-ctx.Done():
			fmt.Println("Task cancelled")
			return
		case <-tick.C:
		}
	}
}
// main starts longRunningTask with a three-second timeout and keeps the
// process alive for five seconds so the task's output can be seen.
func main() {
	const (
		taskTimeout = 3 * time.Second
		runFor      = 5 * time.Second
	)

	root := context.Background()
	ctx, cancel := context.WithTimeout(root, taskTimeout)
	// Release the context's resources when main returns.
	defer cancel()

	go longRunningTask(ctx)

	time.Sleep(runFor)
}
Best Practices
- Use channels for communication
- Avoid sharing memory between goroutines; pass data over channels instead
- Design for cancellation
- Use context for timeout and cancellation
Learning with LabEx
LabEx offers comprehensive tutorials and interactive environments to master Go's advanced concurrency patterns, helping developers build efficient and scalable applications.