9.4.4 الگوی Worker Pool

9.4.4 الگوی Worker Pool

9.4.4.1 توضیحات #

الگوی Worker Pool یکی از مهم‌ترین الگوهای همزمانی در Go محسوب می‌شود و زمانی به‌کار می‌رود که بخواهید تعداد مشخصی goroutine (معمولاً با نقش کارگر یا worker) داشته باشید که وظایف مختلف را به صورت صف (queue) دریافت و اجرا کنند. این کار باعث کنترل بهتر منابع، جلوگیری از ایجاد goroutine بیش از حد (که ممکن است باعث مصرف بی‌رویه CPU و memory یا حتی crash برنامه شود) و مدیریت صف کارها در سیستم‌های real-world و پرلود می‌شود. در این الگو، یک یا چند کانال برای ارسال وظایف (task queue) و دریافت نتایج بین goroutineهای تولیدکننده (producer) و goroutineهای worker (مصرف‌کننده) استفاده می‌شود.

مثلاً در یک سیستم وب یا پردازش موازی داده، می‌توانید یک کانال برای صف کردن درخواست‌ها ایجاد کنید و چند goroutine به عنوان worker راه‌اندازی کنید تا هرکدام از این صف وظیفه برداشته و پردازش کنند. پس از اتمام کار، نتیجه را می‌توانند در یک کانال نتایج (result channel) قرار دهند تا main goroutine یا یک جمع‌کننده (collector) نتایج را جمع‌آوری کند. این معماری، بهترین شیوه برای مدیریت connection pool دیتابیس، پردازش موازی queueها، انجام وظایف تکراری (مثل scraping، پردازش تصاویر یا فایل‌ها) و افزایش مقیاس‌پذیری است.

در مجموع، Worker Pool با جلوگیری از ایجاد تعداد زیاد goroutine، افزایش کنترل بر مصرف منابع، افزایش throughput و جلوگیری از bottleneck شدن سیستم، یکی از حرفه‌ای‌ترین الگوهای تولیدی در Go محسوب می‌شود. استفاده هوشمندانه از کانال‌ها برای توزیع و جمع‌آوری وظایف و نتایج، کدنویسی را هم ساده‌تر و هم کاملاً idiomatic می‌کند.

9.4.4.2 دیاگرام #

flowchart LR subgraph Producer A1[Job Queue
job_0, job_1, ..., job_N] end subgraph WorkerPool direction TB W0[Worker 0
job_0] W1[Worker 1
job_1] WD[...] Wn[Worker N
job_N] end subgraph Collector B1[Result Queue
res_0, res_1, ..., res_N] end A1 -- "job_0" --> W0 A1 -- "job_1" --> W1 A1 -- "job_i ..." --> WD A1 -- "job_N" --> Wn W0 -- "res_0" --> B1 W1 -- "res_1" --> B1 WD -- "..." --> B1 Wn -- "res_N" --> B1 MGR((Manager)) MGR --- WorkerPool classDef worker fill:#e3ffe3,stroke:#6bc76b,stroke-width:2px; classDef queue fill:#f2f6fa,stroke:#4c78a8,stroke-width:2px; classDef mgr fill:#ffe9c6,stroke:#a58954,stroke-width:2px; class W0,W1,WD,Wn worker; class A1,B1 queue; class MGR mgr;

9.4.4.3 نمونه کد #

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6)
 7
 8// ساختار نتیجه خروجی هر کارگر
 9type JobResult struct {
10	JobID    int
11	Input    int
12	Output   int
13	WorkerID int
14	Err      error
15}
16
17func main() {
18	const (
19		numJobs    = 5
20		numWorkers = 3
21	)
22
23	jobs := make(chan int, numJobs)
24	results := make(chan JobResult, numJobs)
25
26	var wg sync.WaitGroup
27
28	// راه‌اندازی worker pool
29	for w := 1; w <= numWorkers; w++ {
30		wg.Add(1)
31		go worker(w, jobs, results, &wg)
32	}
33
34	// ارسال jobها
35	for j := 1; j <= numJobs; j++ {
36		jobs <- j
37	}
38	close(jobs)
39
40	// انتظار برای اتمام همه workerها و سپس بستن کانال نتایج
41	go func() {
42		wg.Wait()
43		close(results)
44	}()
45
46	// جمع‌آوری و پردازش نتایج
47	for result := range results {
48		if result.Err != nil {
49			fmt.Printf("[Job %d] خطا در Worker %d: %v\n", result.JobID, result.WorkerID, result.Err)
50			continue
51		}
52		fmt.Printf("[Job %d] Worker %d → input: %d, output: %d\n",
53			result.JobID, result.WorkerID, result.Input, result.Output)
54	}
55}
56
57// Worker function
58func worker(id int, jobs <-chan int, results chan<- JobResult, wg *sync.WaitGroup) {
59	defer wg.Done()
60	for input := range jobs {
61		// شبیه‌سازی کار و احتمال خطا
62		var output int
63		var err error
64		if input == 3 {
65			err = fmt.Errorf("مشکل در پردازش داده")
66		} else {
67			output = input * 2
68		}
69
70		result := JobResult{
71			JobID:    input,
72			Input:    input,
73			Output:   output,
74			WorkerID: id,
75			Err:      err,
76		}
77		results <- result
78	}
79}
1$ go run main.go
2[Job 1] Worker 3 → input: 1, output: 2
3[Job 2] Worker 3 → input: 2, output: 4
4[Job 4] Worker 2 → input: 4, output: 8
5[Job 5] Worker 2 → input: 5, output: 10
6[Job 3] خطا در Worker 3: مشکل در پردازش داده

در این مثال از Worker Pool، سعی شده معماری‌ای تولید شود که هم خوانایی و توسعه‌پذیری بالایی داشته باشد و هم از نظر اطمینان و مدیریت منابع کاملاً production-ready باشد. در ابتدای برنامه، تعداد jobها و workerها به صورت ثابت تعیین شده و دو کانال برای مدیریت ارسال کارها (jobs) و جمع‌آوری نتایج (results) تعریف شده است. با استفاده از یک حلقه، به تعداد workerها goroutine اجرا می‌شود؛ هرکدام با استفاده از یک اشاره‌گر به sync.WaitGroup، اتمام کار خود را اعلام می‌کنند. این باعث می‌شود که بدانیم دقیقاً چه زمانی همه کارگرها کارشان را به اتمام رسانده‌اند.

برای هر job که وارد صف می‌شود، اطلاعات آن در کانال jobs قرار می‌گیرد و پس از ارسال تمام jobها، کانال بسته می‌شود تا workerها پس از اتمام کار بتوانند از حلقه خارج شوند. پس از اتمام همه goroutineها (به کمک WaitGroup)، یک goroutine کمکی کانال results را می‌بندد تا حلقه جمع‌آوری نتایج نیز بدون مشکل به پایان برسد. خروجی هر کار به صورت یک ساختار JobResult است که هم شناسه job، هم ورودی و خروجی، هم شماره worker و هم خطای احتمالی را شامل می‌شود. این ساختار هم امکان لاگ‌گیری دقیق، هم مدیریت خطا و هم تحلیل بعدی را به سادگی ممکن می‌کند.

در این مثال، برای یکی از jobها به صورت شبیه‌سازی‌شده یک خطا تولید می‌شود تا نشان داده شود چگونه مدیریت خطا باید به صورت ایمن و جداگانه برای هر job انجام گیرد. در حلقه دریافت نتایج، ابتدا خطا بررسی می‌شود و در صورت وجود خطا، پیام مناسب نمایش داده می‌شود؛ در غیر این صورت، ورودی، خروجی و شماره worker برای هر job به صورت فرمت‌بندی‌شده چاپ می‌گردد. این رویکرد علاوه بر رعایت idiomatic بودن کد Go، کنترل کاملی روی منابع و وضعیت اجرایی همه بخش‌ها ایجاد می‌کند و پایه‌ای ایده‌آل برای پروژه‌های واقعی و مقیاس‌پذیر محسوب می‌شود.

در نهایت، این معماری به راحتی قابل گسترش برای jobهای پیچیده‌تر، مدیریت صف‌های بزرگ‌تر یا حتی پیاده‌سازی با context و timeout است و از بروز مشکلات رایج مانند goroutine leak یا deadlock جلوگیری می‌کند.

9.4.4.4 کاربردها #

  • تقسیم کارهای پردازشی (Parallel Data Processing): با استفاده از الگوی Worker Pool می‌توانید حجم زیادی از داده‌ها یا کارهای محاسباتی سنگین را به بخش‌های کوچک‌تر تقسیم کنید و به طور موازی بین چندین goroutine کارگر توزیع نمایید. این رویکرد علاوه بر افزایش سرعت پردازش، باعث استفاده بهینه‌تر از منابع سیستم (CPU و حافظه) می‌شود و از ایجاد goroutineهای بیش از حد یا سربار اضافی جلوگیری می‌کند. در نتیجه، سربار مدیریت همزمانی کاهش یافته و عملکرد نهایی سیستم به طور قابل ملاحظه‌ای بهبود می‌یابد.
  • مدیریت و محدودسازی منابع (Resource Management & Limiting): Worker Pool به شما امکان می‌دهد تعداد ثابتی goroutine برای انجام کارها داشته باشید و از مصرف بیش از حد منابع سیستم، مانند اتصال به دیتابیس یا پردازش همزمان بیش از حد، جلوگیری کنید. این کار برای کنترل بار روی سرویس‌های خارجی (مانند دیتابیس، API یا حتی سخت‌افزار) حیاتی است و جلوی شکست یا کندی سیستم را می‌گیرد.
  • اجرای موازی درخواست‌های خارجی (Parallel External Requests): این الگو برای ارسال همزمان تعداد زیادی درخواست به سرویس‌های خارجی (مانند APIهای وب، ذخیره‌سازی ابری یا دانلود فایل‌ها) بسیار کاربردی است. Worker Pool با محدودسازی تعداد کارگرها، امکان ارسال کنترل‌شده و پایدار درخواست‌ها را فراهم می‌کند.
  • پذیرش و پردازش صف کارها (Job Queue Processing): در معماری‌های صف محور (مانند پردازش پیام یا وظایف پس‌زمینه)، Worker Pool به شما اجازه می‌دهد کارها را از صف بخوانید و توسط کارگرها به شکل کنترل‌شده و موازی اجرا کنید. این الگو پایه بسیاری از سیستم‌های background task، notification و microservice است.
  • پردازش تصاویر، فایل‌ها و داده‌های بزرگ: Worker Pool برای سیستم‌هایی که باید تعداد زیادی تصویر یا فایل را به طور موازی پردازش کنند (مثلاً تغییر سایز عکس، رمزنگاری فایل یا پردازش ویدئو)، ایده‌آل است و بازدهی را به طرز چشمگیری افزایش می‌دهد.
  • مدیریت کانکشن‌های شبکه یا سرور: در سرورهایی که با تعداد زیادی اتصال همزمان مواجه هستند، Worker Pool می‌تواند برای مدیریت همزمان کانکشن‌ها یا درخواست‌های ورودی، و جلوگیری از overload شدن سیستم، استفاده شود.