3.3 پکیج sync

3.3 پکیج sync

پکیج sync یکی از کاربردی ترین پکیج ها در زمینه همزمانی می باشد و یکسری تایپ و توابع برایهمگام سازی و کنترل دسترسی همزمان به داده های مشترک ارائه می دهد. پرکاربرد ترین ویژگی های این پکیج شامل موارد زیر می شود :

  • Mutex : با استفاده از این تایپ می توانید آن بخش از داده ای که گوروتین ها بطور همزمان قصد دارند دسترسی داشته باشند را قفل کنید.
  • RWMutex : با استفاده از این تایپ می توانید به چندین گوروتین اجازه خواندن دهید اما فقط یک گوروتین قابلیت نوشتن در یک زمان مشخص را دارد.
  • WaitGroup: یک هماهنگ کننده گوروتین می باشد، برای اینکه ترتیب عملیات ها در هنگام همزمانی رعایت شود.
  • Once: این تایپ تضمین می کند که یک تابع فقط یک بار اجرا شود.
  • Pool: مجموعه‌ای از آبجکت های موقت که امکان ذخیره و دریافت دارند بدون اینکه بخشی از حافظه را اشغال کنند.
  • Cond:با استفاده از Cond می توانید چندین گوروتین ایجاد کنید و اجرای فرآیند آن ها را منتظر وقوع یک رویداد قرار دهید.
توجه کنید که پکیج sync فقط و فقط برای مدیریت و همگام سازی دسترسی های گوروتین ها به یک داده مشترک استفاده می شود.

3.3.1 Mutex #

در پکیج sync یک تایپ به نام Mutex وجود دارد که به شما این امکان را می دهد دسترسی به داده های مشترک را همگام سازی کنید. با استفاده از این قابلیت در واقع منابع مشترک محافظت شده و این اطمینان به شما داده می شود که تنها یک گوروتین در یک زمان مشخص، به دیتای مشترک بین گوروتین ها دسترسی دارد. تایپ Mutex شامل دو متد مهم Lock و Unlock است. وقتی Lock فراخوانی می شود فقط و فقط یک گوروتین می تواند برروی آن بخش از داده کار کند و تا زمانیکه شما Unlock نکنید سایر گوروتین ها نمی توانند به دیتای مورد نظر شما دسترسی داشته باشند و تا زمانیکه Unlock صورت گیرد سایر گوروتین هادر حالت انتظار باقی می مانند.

به مثال زیر توجه کنید :

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6	"time"
 7)
 8
 9var count int
10
11func main() {
12	mu := new(sync.Mutex)
13	
14	go increment(mu)
15	go increment(mu)
16	go increment(mu)
17	go increment(mu)
18
19	time.Sleep(time.Second)
20}
21
22func increment(mu *sync.Mutex) {
23	mu.Lock()
24	defer mu.Unlock()
25	count++
26	fmt.Printf("Incrementing: %d\n", count)
27}
1$ go run main.go
2
3Incrementing: 1
4Incrementing: 2
5Incrementing: 3
6Incrementing: 4
sync mutex

در بالا ما یک تابع به نام increment ایجاد کردیم و به عنوان ورودی تایپ Mutex را بصورت پوینتر پاس دادیم. سپس با استفاده از توابع Lock و Unlock برای افزایش مقدار count یک قفل گذاشتیم.

حال ۴ تا گوروتین داریم که قصد دارند همزمان روی بخش مشترک از حافظه عملیاتی را انجام دهند در اینجا ما با استفاده Mutex جلوی دسترسی همزمان گوروتین ها به حافظه را گرفتیم و فقط یک گوروتین می تواند عملیات انجام دهد و تا زمانیکه شما Unlock نکنید سایر گوروتین ها منتظر می مانند.

3.3.1.1 سناریوهای استفاده #

  1. همگام سازی دسترسی به متغیرهای مشترک: یک mutex می تواند برای همگام سازی دسترسی به متغیرهای مشترک بین چندین گوروتین استفاده شود. این می تواند در مواردی مفید باشد که چندین گوروتین نیاز به خواندن یا به روز رسانی یک متغیر به طور همزمان دارند.

  2. هماهنگی دسترسی به حالت مشترک: یک mutex می تواند برای هماهنگ کردن دسترسی به حالت مشترک بین چندین گوروتین استفاده شود. به عنوان مثال، ممکن است از یک mutex استفاده کنید تا اطمینان حاصل کنید که فقط یک گوروتین می تواند یک ساختار داده مشترک را در یک زمان تغییر دهد.

  3. پیاده سازی الگوهای تولیدکننده-مصرف کننده (producer-consumer): یک mutex می تواند برای پیاده سازی الگوهای تولیدکننده-مصرف کننده استفاده شود، که در آن یک یا چند گوروتین داده تولید می کنند و یک یا چند گوروتین آن را مصرف می کنند. mutex می تواند برای همگام سازی دسترسی به ساختار داده مشترک که داده ها را نگه می دارد استفاده شود.

۲ نکته خیلی مهم

  1. سعی کنید پس از اینکه تابع Lock را فراخوانی میکنید تابع Unlock را داخل defer قرار دهید.
  2. زمانیکه قصد دارید Mutex را به عنوان پارامتر ورودی برای توابع تعریف کنید بهتر است از نوع اشاره گر باشد.

3.3.2 RWMutex #

در پکیج sync یک تایپ به نام RWMutex وجود دارد که عملیات خواندن و نوشتن برروی یک داده مشترک را همگام سازی می کند. شما می توانید به چندین گوروتین اجازه خواندن یک داده مشترک را بدهید ولی فقط یک گوروتین می تواند عملیات نوشتن را برروی آن داده مشترک انجام دهد.

برخی مواقع ممکن است دسترسی به یک مقدار مشترک بین گوروتین ها‌را محدود کنیم اما بر اساس شرایط خاص. مثلا همه ی گورتونین ها بتوانند عملیات خواندن را انجام دهند، اما در آن واحد فقط یک گوروتین بتواند بنویسد.

در زیر یک مثال قرار دادیم توجه کنید :

 1package main
 2
 3import (
 4	"fmt"
 5	"math/rand"
 6	"strings"
 7	"sync"
 8	"time"
 9)
10
11func init() {
12	rand.Seed(time.Now().Unix())
13}
14func sleep() {
15	time.Sleep(time.Duration(rand.Intn(1000))*time.Millisecond)
16}
17func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
18	sleep()
19	m.RLock()
20	c <- 1
21	sleep()
22	c <- -1
23	m.RUnlock()
24	wg.Done()
25}
26func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
27	sleep()
28	m.Lock()
29	c <- 1
30	sleep()
31	c <- -1
32	m.Unlock()
33	wg.Done()
34}
35func main() {
36	var m sync.RWMutex
37	var rs, ws int
38	rsCh := make(chan int)
39	wsCh := make(chan int)
40	go func() {
41		for {
42			select {
43			case n := <-rsCh:
44				rs += n
45			case n := <-wsCh:
46				ws += n
47			}
48			fmt.Printf("%s%s\n", strings.Repeat("R", rs),
49				strings.Repeat("W", ws))
50		}
51	}()
52	wg := sync.WaitGroup{}
53	for i := 0; i < 10; i++ {
54		wg.Add(1)
55		go reader(rsCh, &m, &wg)
56	}
57	for i := 0; i < 3; i++ {
58		wg.Add(1)
59		go writer(wsCh, &m, &wg)
60	}
61	wg.Wait()
62}
 1$ go run main.go
 2R
 3RR
 4R
 5RR
 6R
 7W
 8R
 9RR
10RRR
11RRRR
12RRRRR
13RRRRRR
14RRRRRRR
15RRRRRR
16RRRRR
17RRRR
18RRR
19RR
20R
21W
22W

3.3.3 WaitGroup #

یک ساختاری داخل پکیج sync به نام WaitGroup وجود دارد. معمولا برای منتظر ماندن برای پایان اجرای گروهی از گوروتین ها استفاده می شود. این ساختار ۳ متد دارد که به شرح زیر می باشد:

  • Add: این متد که به عنوان ورودی عدد می گیرد تعداد گوروتین هایی که قرار است منتظر بمانند را تعیین میکند.
  • Done: این تابع زمانی استفاده می شود که فرآیند داخل هریک از گوروتین ها به اتمام برسد.
  • Wait: این متد کد ما را بلاک می کند تا زمانیکه سیگنال Done از تمامی گوروتین ها دریافت کند.

بگذارید با یک مثال توضیح دهیم:

 1package main
 2
 3import (
 4    "fmt"
 5    "sync"
 6    "time"
 7)
 8
 9func main() {
10    var wg sync.WaitGroup
11    wg.Add(2)
12    go sleep(&wg, time.Second*1)
13    go sleep(&wg, time.Second*2)
14    wg.Wait()
15    fmt.Println("All goroutines finished")
16}
17
18func sleep(wg *sync.WaitGroup, t time.Duration) {
19    defer wg.Done()
20    time.Sleep(t)
21    fmt.Println("Finished Execution")
22}
1$ go run main.go
2Finished Execution
3Finished Execution
4All goroutines finished

در بالا ما یک متغیر از ساختار WaitGroup ایجاد کردیم و پس از ایجاد متد Add را فراخوانی کردیم و تعداد گوروتین هایی که قرار است منتظر بماند را مشخص کردیم. سپس آدرس حافظه متغیر wg را به تابع sleep به عنوان ورودی پاس دادیم و در نهایت داخل تابع sleep با استفاده از defer متد Done را فراخوانی کردیم.

زمانیکه عملیات تابع sleep اتمام می شود متد Done فراخوانی می شود و یک گوروتین از لیست گوروتین های آبجکت WaitGroup کم می شود.

1// Done decrements the WaitGroup counter by one.
2func (wg *WaitGroup) Done() {  
3   wg.Add(-1)  
4}

حال وقتی فرآیند ۲ تا گوروتین اتمام شود اون بخش از کد از حالت بلاک بودن خارج می شود.

۳ نکته خیلی مهم

  1. ساختار WaitGroup یکی از پرکاربرد ترین قابلیت ها برای بحث همزمانی می باشد و برخی اوقات برای جلوگیری از Data race و همچنین برای ترتیب دهی عملیات همزمانی هم می توان از این استفاده کرد.
  2. سعی کنید WaitGroup را بصورت اشاره گر به توابعی که داخل گوروتین قرار دارند پاس دهید.
  3. هیچوقت تعداد گوروتین را بیشتر یا کمتر از اون تعدادی که دارید به متد Add ‌ندهید چون با خطای Panic مواجه خواهید شد.

3.3.4 Once #

در پکیج sync ما یک تایپ داریم به نام Once که برای اطمینان از اینکه یک تابع فقط یکبار فراخوانی می شود استفاده می شود. شما فرض کنید قصد دارید در طول برنامه از یک آبجکت فقط یک instance داشته باشید می توانید با استفاده از Once این کار را انجام دهید (شما با استفاده از Once می توانید الگو طراحی Singleton را پیاده سازی کنید.)

به مثال زیر توجه کنید:

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6)
 7
 8type singleton struct {
 9	data string
10}
11
12var instance *singleton
13var once sync.Once
14
15func GetInstance() *singleton {
16	once.Do(func() {
17		instance = &singleton{data: "some data"}
18	})
19	return instance
20}
21
22func main() {
23	s1 := GetInstance()
24	s2 := GetInstance()
25	if s1 == s2 {
26		fmt.Println("Same instance")
27	} else {
28		fmt.Println("Different instances")
29	}
30}
1$ go run main.go
2Same instance

در مثال فوق ما یک تابع داریم به نام GetInstance که به عنوان خروجی ساختار singleton را بصورت اشاره گر ارائه می دهد. قبل از تابع ۲ تا متغیر تعریف کردیم به نام once و instance که داخل تابع GetInstance از متد Do متغیر once یک تابع inline را قرار دادیم که فقط یک آبجکت از ساختار singleton می سازد و داخل متغیر instance قرار می دهد. در نهایت instance بازگشت داده می شود.

3.3.5 Pool #

در نسخه ۱.۳ زبان گو امکانی تایپی به نام Pool در پکیج sync که امکان ایجاد استخر آبجکت ها بطور موقت بدون اینکه بخواهد بخشی از حافظه را اشتغال کند اضافه شد. هر آبجکتی که در Pool ذخیره شود بطور خودکار در هرزمانی بدون اینکه اطلاع رسانی کند حذف می شود. امکان استفاده مجدد از آبجکت هایی که داخل استخر می گیرند وجود دارد و این باعث می شود سربار استفاده حافظه کاهش یابد.

sync pool
 1package main
 2
 3import (
 4	"bytes"
 5	"io"
 6	"os"
 7	"sync"
 8	"time"
 9)
10
11var bufPool = sync.Pool{
12	New: func() any {
13		// The Pool's New function should generally only return pointer
14		// types, since a pointer can be put into the return interface
15		// value without an allocation:
16		return new(bytes.Buffer)
17	},
18}
19
20// timeNow is a fake version of time.Now for tests.
21func timeNow() time.Time {
22	return time.Unix(1136214245, 0)
23}
24
25func Log(w io.Writer, key, val string) {
26	b := bufPool.Get().(*bytes.Buffer)
27	b.Reset()
28	// Replace this with time.Now() in a real logger.
29	b.WriteString(timeNow().UTC().Format(time.RFC3339))
30	b.WriteByte(' ')
31	b.WriteString(key)
32	b.WriteByte('=')
33	b.WriteString(val)
34	w.Write(b.Bytes())
35	bufPool.Put(b)
36}
37
38func main() {
39	Log(os.Stdout, "path", "/search?q=flowers")
40}
1$ go run main.go
22006-01-02T15:04:05Z path=/search?q=flowers

در بالا ما یک متغیر به نام bufPool ایجاد کردیم که یک آبجکت از نوع bytes.Buffer ایجاد می کند. سپس داخل تابع Log ما با استفاده از متد Get آبجکت مورد نظر را از Pool خارج و داخل متغیر b قرار دادیم و پس از آن عملیات لازم را برروی Buffer انجام و در نهایت با استفاده از متد Put آبجکت را به Pool اضافه کردیم. حالا اتفاقی که صورت گرفته ما عملیاتی که نیاز داشتیم را برروی آبجکت انجام دادیم بدون اینکه بخوایم بخشی از حافظه را درگیر کنیم.

3.3.5.1 بنچمارک در خصوص Pool #

 1package main  
 2  
 3import (  
 4   "sync"  
 5   "testing")  
 6  
 7type Person struct {  
 8   Age int  
 9}  
10  
11var personPool = sync.Pool{  
12   New: func() interface{} { return new(Person) },  
13}  
14  
15func BenchmarkWithoutPool(b *testing.B) {  
16   var p *Person  
17   b.ReportAllocs()  
18   b.ResetTimer()  
19   for i := 0; i < b.N; i++ {  
20      for j := 0; j < 10000; j++ {  
21         p = new(Person)  
22         p.Age = 23  
23      }  
24   }  
25}  
26  
27func BenchmarkWithPool(b *testing.B) {  
28   var p *Person  
29   b.ReportAllocs()  
30   b.ResetTimer()  
31   for i := 0; i < b.N; i++ {  
32      for j := 0; j < 10000; j++ {  
33         p = personPool.Get().(*Person)  
34         p.Age = 23  
35         personPool.Put(p)  
36      }  
37   }  
38}
 1$ go test -bench=. -benchmem 
 2
 3goos: linux
 4goarch: amd64
 5pkg: pool
 6cpu: Intel(R) Core(TM) i5-3570 CPU @ 3.40GHz
 7BenchmarkWithoutPool-4              5262            213177 ns/op           80000 B/op      10000 allocs/op
 8BenchmarkWithPool-4                 7699            152788 ns/op               0 B/op          0 allocs/op
 9PASS
10ok      pool    2.343s

3.3.5.2 مثال های کاربردی #

مثال اول :

فرض کنید شما می خواهید یک فایل csv را با کلی رکورد parse کنید. هر رکورد نیازمند این است که داخل یک ساختاری قرار بگیرد و ایجاد یک ساختار باعث می شود بخشی از حافظه اختصاص یابد به آن ساختار. حالا فکر کنید میلیون ها رکورد داشته باشید و این تخصیص حافظه می تواند باعث توقف برنامه شود.

حالا برای اینکه بخواهیم جلوی این اتفاق را بگیریم و سربار را کاهش دهیم بهتر است ما از sync.Pool استفاده کنیم و ساختارها را داخل استخر قرار دهیم. و زمانیکه هرکدام از ساختارها مورد نیاز نباشد می توانند داخل استخر قرار گیرند و مجدد استفاده شوند. اینکار باعث می شود بطور خیلی قابل توجهی تعداد تخصیص حافظه کاهش یابد و عملکرد برنامه چند برابر شود.

مثال دوم :

موارد استفاده دیگر از sync.Pool برای ذخیره کردن آبجکت های پرکاربرد نظیر کانکشن دیتابیس یا شبکه و همچنین آبجکت هایی که قصد داریم برروی آن عملیات serialize و deserialize انجام دهیم.

3.3.6 Cond #

با استفاده از Cond می توانید چندین گوروتین ایجاد کنید و گوروتین ها تا زمان وقوع یک رویداد منتظر می مانند و هیچ فرآیندی را اجرا نمیکنند. هر Cond که ایجاد میکنید داخلش یک قفل از نوع (Mutex یا RWMutex) وجود دارد که می توانید گوروتین ها را منتظر نگه دارید.

زمانیکه شما یک Cond میسازید به عنوان ورودی یک قفل از نوع (Mutex یا RWMutex) یه عنوان ورودی می دهید. حال شما ۳ تا متد درخصوص Cond خواهید داشت که به شرح زیر است :

  • Broadcast: با استفاده از این متد می توانید تمامی گوروتین هایی که در حالت منتظر هستند را آزاد کنید تا به فرآیند خود ادامه دهند.
  • Signal: با استفاده از این متد می توانید به گوروتین سیگنال بفرستید تا از حالت منتظر خارج شود و به فرآیند خود ادامه دهد.
  • Wait: این متد بصورت atomic منتظر می ماند تا زمانیکه unlocks صورت گیرد و تا اون موقع گوروتین ها را در حالت تعلیق نگه می دارد.

به مثال زیر توجه کنید :

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6)
 7
 8var sharedRsc = make(map[string]interface{})
 9
10func main() {
11	var wg sync.WaitGroup
12	wg.Add(2)
13	m := sync.Mutex{}
14	c := sync.NewCond(&m)
15	go func() {
16		// this go routine wait for changes to the sharedRsc
17		c.L.Lock()
18		for len(sharedRsc) == 0 {
19			c.Wait()
20		}
21		fmt.Println("goroutine1", sharedRsc["rsc1"])
22		c.L.Unlock()
23		wg.Done()
24	}()
25
26	go func() {
27		// this go routine wait for changes to the sharedRsc
28		c.L.Lock()
29		for len(sharedRsc) == 0 {
30			c.Wait()
31		}
32		fmt.Println("goroutine2", sharedRsc["rsc2"])
33		c.L.Unlock()
34		wg.Done()
35	}()
36
37	// this one writes changes to sharedRsc
38	c.L.Lock()
39	sharedRsc["rsc1"] = "foo"
40	sharedRsc["rsc2"] = "bar"
41	c.Broadcast()
42	c.L.Unlock()
43	wg.Wait()
44}
1$ go run main.go
2goroutine2 bar
3goroutine1 foo