9.4.13 الگو Subscription

9.4.13 الگو Subscription

9.4.13.1 توضیحات #

الگوی Subscription (یا Pub-Sub / Observer Pattern) یکی از پرکاربردترین الگوها در معماری‌های رویداد-محور و همزمان (event-driven & concurrent) است که امکان ثبت‌نام (subscribe) یک یا چند مصرف‌کننده (consumer) را برای دریافت خودکار داده‌های جدید از یک منبع یا سرویس فراهم می‌کند. در این الگو، یک یا چند مصرف‌کننده به یک “آدرس” یا منبع اشتراک (مثلاً یک topic، کانال یا event source) متصل می‌شوند و هر زمان که داده یا رویداد جدیدی منتشر شد (publish)، اطلاعات به طور خودکار و بی‌نیاز از polling مکرر به همه‌ی مصرف‌کننده‌های عضو ارسال می‌شود.

در زبان Go، پیاده‌سازی Subscription اغلب با استفاده از channelها و goroutineها انجام می‌شود: یک goroutine به عنوان publisher وظیفه تولید و ارسال داده‌ها را دارد، و هر consumer می‌تواند با subscribe کردن (ثبت نام) در یک channel مشترک، داده‌های جدید را دریافت کند. این مدل به شما اجازه می‌دهد تا به سادگی چندین consumer را همزمان به یک منبع داده وصل کنید و مدیریت رویدادهای همزمان، صف‌های پیام (message queue)، بروزرسانی‌های لحظه‌ای، یا حتی سیستم‌های نوتیفیکیشن را به صورت concurrent و بدون بلاک شدن یا پیچیدگی زیاد پیاده‌سازی کنید.

کاربردهای Subscription در Go بسیار گسترده است: از مدیریت پیام‌های real-time (مثل ارسال اعلان در اپلیکیشن‌ها)، اتصال میکروسرویس‌ها، پیاده‌سازی سیستم‌های event sourcing و message broker گرفته تا جمع‌آوری لاگ‌های زنده یا حتی مانیتورینگ سرویس‌های حیاتی. مزیت اصلی این الگو جداسازی کامل بین تولیدکننده و مصرف‌کننده (decoupling)، مقیاس‌پذیری، و سادگی توسعه و تست در معماری‌های concurrent و reactive است.

9.4.13.2 دیاگرام #

flowchart TD Publisher[Publisher / Source] Sub1[Subscriber 1] Sub2[Subscriber 2] SubN[Subscriber N] Topic[Channel / Topic] Publisher -- "Publish Data" --> Topic Topic -- "Push Update" --> Sub1 Topic -- "Push Update" --> Sub2 Topic -- "Push Update" --> SubN Sub1 -- "Subscribe" --> Topic Sub2 -- "Subscribe" --> Topic SubN -- "Subscribe" --> Topic style Topic fill:#e2f0fc,stroke:#377dbf,stroke-width:2px style Publisher fill:#f5e8ff,stroke:#b486e5,stroke-width:2px style Sub1,Sub2,SubN fill:#e9fbe7,stroke:#6dc165,stroke-width:2px

9.4.13.3 نمونه کد #

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"time"
)

const exampleAPIAddress = "https://random-data-api.com/api/stripe/random_stripe"

type Card struct {
	Id          uint   `json:"id"`
	Uid         string `json:"uid"`
	ValidCard   string `json:"valid_card"`
	Token       string `json:"token"`
	InvalidCard string `json:"invalid_card"`
	Month       string `json:"month"`
	Year        string `json:"year"`
	CCV         string `json:"ccv"`
	CCVAmex     string `json:"ccv_amex"`
}

type Subscription interface {
	Updates() <-chan Card
}

type Fetcher interface {
	Fetch() (Card, error)
}

type sub struct {
	fetcher Fetcher
	updates chan Card
}

type fetcher struct {
	url string
}

type fetchResult struct {
	fetchedCard Card
	err         error
}

// NewSubscription create subscription for fetch data per freq time in second
func NewSubscription(ctx context.Context, fetcher Fetcher, freq uint) Subscription {
	s := &sub{
		fetcher: fetcher,
		updates: make(chan Card),
	}
	go s.serve(ctx, freq)
	return s
}

func NewFetcher(url string) Fetcher {
	return &fetcher{
		url: url,
	}
}

func (f *fetcher) Fetch() (Card, error) {
	return requestAPI(f.url)
}

func (s *sub) serve(ctx context.Context, freq uint) {
	ticker := time.NewTicker(time.Duration(freq) * time.Second)
	done := make(chan fetchResult, 1)

	var (
		fetchedCard         Card
		fetchResponseStream chan Card
		pending             bool
	)

	for {

		if pending {
			fetchResponseStream = s.updates
		} else {
			fetchResponseStream = nil
		}

		select {
		case <-ticker.C:
			if pending {
				break
			}
			go func() {
				fetched, err := s.fetcher.Fetch()
				done <- fetchResult{fetched, err}
			}()
		case result := <-done:
			fetchedCard = result.fetchedCard
			if result.err != nil {
				log.Printf("fetch got error %v", result.err)
				break
			}
			pending = true
		case fetchResponseStream <- fetchedCard:
			pending = false
		case <-ctx.Done():
			return
		}
	}
}

func (s *sub) Updates() <-chan Card {
	return s.updates
}

func requestAPI(url string) (Card, error) {
	card := Card{}
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return Card{}, err
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return Card{}, err
	}
	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return Card{}, err
	}
	if err := json.Unmarshal(body, &card); err != nil {
		return Card{}, err
	}
	return card, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	sub := NewSubscription(ctx, NewFetcher(exampleAPIAddress), 3)

	time.AfterFunc(1*time.Minute, func() {
		cancel()
		log.Println("canceled subscription task")
		os.Exit(0)
	})

	for card := range sub.Updates() {
		fmt.Println(card)
	}
}

در این مثال، ابتدا یک اینترفیس به نام Subscription تعریف شده است که متدی به نام Updates دارد و کانالی از نوع Card را برمی‌گرداند. این کانال نقش مسیر ارتباطی را بین تولیدکننده و مصرف‌کننده ایفا می‌کند، به طوری که مصرف‌کننده می‌تواند به طور همزمان و غیرمسدود داده‌های جدید را دریافت کند. همچنین اینترفیس Fetcher طراحی شده که وظیفه‌ی فراخوانی API و دریافت داده‌ها را بر عهده دارد و متد Fetch را ارائه می‌دهد. این تفکیک وظایف باعث می‌شود کد قابلیت توسعه و تست بیشتری داشته باشد.

تابع NewSubscription به عنوان سازنده Subscription عمل می‌کند؛ این تابع یک struct از نوع sub ایجاد می‌کند که حاوی fetcher و یک کانال updates است. سپس متد serve به صورت یک goroutine اجرا می‌شود تا عملیات fetch را در فواصل زمانی مشخص (که با پارامتر freq تعیین می‌شود) تکرار کند. درون این متد از time.Ticker برای زمان‌بندی دقیق استفاده شده است تا به صورت منظم و بدون ایجاد سربار اضافی، داده‌ها را از API فراخوانی کند و در صورت دریافت موفقیت‌آمیز، آن‌ها را در کانال منتشر نماید. همچنین با کمک متغیر pending اطمینان حاصل می‌شود که یک fetch جدید تا قبل از اتمام fetch قبلی آغاز نشود، بنابراین از فشار بیش از حد به سرویس جلوگیری می‌شود.

مصرف‌کنندگان داده‌ها از طریق متد Updates به کانال updates دسترسی دارند و به محض دریافت داده‌های جدید، می‌توانند پردازش خود را آغاز کنند. استفاده از context.Context در این ساختار اجازه می‌دهد که در هر زمان عملیات fetch به صورت ایمن لغو شود و goroutine مربوطه به سرعت و بدون باقی ماندن در حالت بلاک‌شده خاتمه یابد. این طراحی باعث می‌شود که برنامه همزمانی بهینه‌ای داشته باشد، منابع به خوبی مدیریت شود و کد قابلیت خوانایی، توسعه و تست آسان را حفظ کند.

در کل، این الگو ترکیبی از بهترین شیوه‌های Go در مدیریت جریان داده‌های ناهمزمان، کنترل concurrency و ارتباط بین goroutineها است که برای دریافت داده‌های زنده از API یا منابع خارجی بسیار مناسب است. با چنین معماری می‌توان سیستم‌هایی تولید کرد که علاوه بر مقیاس‌پذیری بالا، مقاوم در برابر خطا و قابل کنترل نیز باشند.

9.4.13.4 کاربردها #

  • دریافت اطلاعات از تولیدکننده‌ها (Publisher) یا سیستم‌های Pub/Sub: الگوی Subscription به شما این امکان را می‌دهد که به سادگی به یک یا چند منبع داده (مانند سرویس‌های پیام‌رسان، سیستم‌های صف پیام، یا هر منبعی که به صورت publish/subscribe کار می‌کند) متصل شوید و داده‌های جدید را به صورت همزمان و غیرمسدود دریافت کنید. این کار باعث می‌شود مصرف‌کننده‌ها به صورت real-time یا نزدیک به real-time اطلاعات را دریافت و پردازش نمایند و از پیچیدگی‌های مدیریت اتصال یا polling مکرر بی‌نیاز شوند.
  • همگام‌سازی داده‌ها از APIهای خارجی: در بسیاری از برنامه‌ها نیاز است داده‌ها یا وضعیت از سرویس‌های خارجی (مانند RESTful APIها، سرویس‌های ابری یا سیستم‌های تحلیلی) به صورت دوره‌ای یا بر اساس رویداد به‌روزرسانی شوند. الگوی Subscription این امکان را فراهم می‌کند که بتوانید با تعریف یک سازوکار هوشمند برای دریافت به‌روزرسانی‌ها، مصرف داده‌ها را ساده، پایدار و بهینه کنید. این الگو به خصوص در سناریوهای real-time dashboards، اطلاع‌رسانی لحظه‌ای و هماهنگ‌سازی داده‌های توزیع‌شده کاربرد فراوان دارد.