9.4.13.1 توضیحات #
الگوی Subscription (یا Pub-Sub / Observer Pattern) یکی از پرکاربردترین الگوها در معماریهای رویداد-محور و همزمان (event-driven & concurrent) است که امکان ثبتنام (subscribe) یک یا چند مصرفکننده (consumer) را برای دریافت خودکار دادههای جدید از یک منبع یا سرویس فراهم میکند. در این الگو، یک یا چند مصرفکننده به یک “آدرس” یا منبع اشتراک (مثلاً یک topic، کانال یا event source) متصل میشوند و هر زمان که داده یا رویداد جدیدی منتشر شد (publish)، اطلاعات به طور خودکار و بینیاز از polling مکرر به همهی مصرفکنندههای عضو ارسال میشود.
در زبان Go، پیادهسازی Subscription اغلب با استفاده از channelها و goroutineها انجام میشود: یک goroutine به عنوان publisher وظیفه تولید و ارسال دادهها را دارد، و هر consumer میتواند با subscribe کردن (ثبت نام) در یک channel مشترک، دادههای جدید را دریافت کند. این مدل به شما اجازه میدهد تا به سادگی چندین consumer را همزمان به یک منبع داده وصل کنید و مدیریت رویدادهای همزمان، صفهای پیام (message queue)، بروزرسانیهای لحظهای، یا حتی سیستمهای نوتیفیکیشن را به صورت concurrent و بدون بلاک شدن یا پیچیدگی زیاد پیادهسازی کنید.
کاربردهای Subscription در Go بسیار گسترده است: از مدیریت پیامهای real-time (مثل ارسال اعلان در اپلیکیشنها)، اتصال میکروسرویسها، پیادهسازی سیستمهای event sourcing و message broker گرفته تا جمعآوری لاگهای زنده یا حتی مانیتورینگ سرویسهای حیاتی. مزیت اصلی این الگو جداسازی کامل بین تولیدکننده و مصرفکننده (decoupling)، مقیاسپذیری، و سادگی توسعه و تست در معماریهای concurrent و reactive است.
9.4.13.2 دیاگرام #
9.4.13.3 نمونه کد #
package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"time"
)
const exampleAPIAddress = "https://random-data-api.com/api/stripe/random_stripe"
type Card struct {
Id uint `json:"id"`
Uid string `json:"uid"`
ValidCard string `json:"valid_card"`
Token string `json:"token"`
InvalidCard string `json:"invalid_card"`
Month string `json:"month"`
Year string `json:"year"`
CCV string `json:"ccv"`
CCVAmex string `json:"ccv_amex"`
}
type Subscription interface {
Updates() <-chan Card
}
type Fetcher interface {
Fetch() (Card, error)
}
type sub struct {
fetcher Fetcher
updates chan Card
}
type fetcher struct {
url string
}
type fetchResult struct {
fetchedCard Card
err error
}
// NewSubscription create subscription for fetch data per freq time in second
func NewSubscription(ctx context.Context, fetcher Fetcher, freq uint) Subscription {
s := &sub{
fetcher: fetcher,
updates: make(chan Card),
}
go s.serve(ctx, freq)
return s
}
func NewFetcher(url string) Fetcher {
return &fetcher{
url: url,
}
}
func (f *fetcher) Fetch() (Card, error) {
return requestAPI(f.url)
}
func (s *sub) serve(ctx context.Context, freq uint) {
ticker := time.NewTicker(time.Duration(freq) * time.Second)
done := make(chan fetchResult, 1)
var (
fetchedCard Card
fetchResponseStream chan Card
pending bool
)
for {
if pending {
fetchResponseStream = s.updates
} else {
fetchResponseStream = nil
}
select {
case <-ticker.C:
if pending {
break
}
go func() {
fetched, err := s.fetcher.Fetch()
done <- fetchResult{fetched, err}
}()
case result := <-done:
fetchedCard = result.fetchedCard
if result.err != nil {
log.Printf("fetch got error %v", result.err)
break
}
pending = true
case fetchResponseStream <- fetchedCard:
pending = false
case <-ctx.Done():
return
}
}
}
func (s *sub) Updates() <-chan Card {
return s.updates
}
func requestAPI(url string) (Card, error) {
card := Card{}
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return Card{}, err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return Card{}, err
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return Card{}, err
}
if err := json.Unmarshal(body, &card); err != nil {
return Card{}, err
}
return card, nil
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
sub := NewSubscription(ctx, NewFetcher(exampleAPIAddress), 3)
time.AfterFunc(1*time.Minute, func() {
cancel()
log.Println("canceled subscription task")
os.Exit(0)
})
for card := range sub.Updates() {
fmt.Println(card)
}
}
در این مثال، ابتدا یک اینترفیس به نام Subscription تعریف شده است که متدی به نام Updates
دارد و کانالی از نوع Card
را برمیگرداند. این کانال نقش مسیر ارتباطی را بین تولیدکننده و مصرفکننده ایفا میکند، به طوری که مصرفکننده میتواند به طور همزمان و غیرمسدود دادههای جدید را دریافت کند. همچنین اینترفیس Fetcher طراحی شده که وظیفهی فراخوانی API و دریافت دادهها را بر عهده دارد و متد Fetch
را ارائه میدهد. این تفکیک وظایف باعث میشود کد قابلیت توسعه و تست بیشتری داشته باشد.
تابع NewSubscription
به عنوان سازنده Subscription عمل میکند؛ این تابع یک struct از نوع sub
ایجاد میکند که حاوی fetcher و یک کانال updates
است. سپس متد serve
به صورت یک goroutine اجرا میشود تا عملیات fetch را در فواصل زمانی مشخص (که با پارامتر freq تعیین میشود) تکرار کند. درون این متد از time.Ticker
برای زمانبندی دقیق استفاده شده است تا به صورت منظم و بدون ایجاد سربار اضافی، دادهها را از API فراخوانی کند و در صورت دریافت موفقیتآمیز، آنها را در کانال منتشر نماید. همچنین با کمک متغیر pending
اطمینان حاصل میشود که یک fetch جدید تا قبل از اتمام fetch قبلی آغاز نشود، بنابراین از فشار بیش از حد به سرویس جلوگیری میشود.
مصرفکنندگان دادهها از طریق متد Updates
به کانال updates
دسترسی دارند و به محض دریافت دادههای جدید، میتوانند پردازش خود را آغاز کنند. استفاده از context.Context
در این ساختار اجازه میدهد که در هر زمان عملیات fetch به صورت ایمن لغو شود و goroutine مربوطه به سرعت و بدون باقی ماندن در حالت بلاکشده خاتمه یابد. این طراحی باعث میشود که برنامه همزمانی بهینهای داشته باشد، منابع به خوبی مدیریت شود و کد قابلیت خوانایی، توسعه و تست آسان را حفظ کند.
در کل، این الگو ترکیبی از بهترین شیوههای Go در مدیریت جریان دادههای ناهمزمان، کنترل concurrency و ارتباط بین goroutineها است که برای دریافت دادههای زنده از API یا منابع خارجی بسیار مناسب است. با چنین معماری میتوان سیستمهایی تولید کرد که علاوه بر مقیاسپذیری بالا، مقاوم در برابر خطا و قابل کنترل نیز باشند.
9.4.13.4 کاربردها #
- دریافت اطلاعات از تولیدکنندهها (Publisher) یا سیستمهای Pub/Sub: الگوی Subscription به شما این امکان را میدهد که به سادگی به یک یا چند منبع داده (مانند سرویسهای پیامرسان، سیستمهای صف پیام، یا هر منبعی که به صورت publish/subscribe کار میکند) متصل شوید و دادههای جدید را به صورت همزمان و غیرمسدود دریافت کنید. این کار باعث میشود مصرفکنندهها به صورت real-time یا نزدیک به real-time اطلاعات را دریافت و پردازش نمایند و از پیچیدگیهای مدیریت اتصال یا polling مکرر بینیاز شوند.
- همگامسازی دادهها از APIهای خارجی: در بسیاری از برنامهها نیاز است دادهها یا وضعیت از سرویسهای خارجی (مانند RESTful APIها، سرویسهای ابری یا سیستمهای تحلیلی) به صورت دورهای یا بر اساس رویداد بهروزرسانی شوند. الگوی Subscription این امکان را فراهم میکند که بتوانید با تعریف یک سازوکار هوشمند برای دریافت بهروزرسانیها، مصرف دادهها را ساده، پایدار و بهینه کنید. این الگو به خصوص در سناریوهای real-time dashboards، اطلاعرسانی لحظهای و هماهنگسازی دادههای توزیعشده کاربرد فراوان دارد.