9.4.15.1 توضیحات #
الگو Subscription یکی از الگوهای کاربردی برای پیاده سازی consumer می باشد که به یک آدرسی مشترک شوید و در بازده زمانی مختلف درخواست دهید و یکسری اطلاعات دریافت کنید.
9.4.15.2 دیاگرام #
9.4.15.3 نمونه کد #
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io/ioutil"
8 "log"
9 "net/http"
10 "os"
11 "time"
12)
13
14const exampleAPIAddress = "https://random-data-api.com/api/stripe/random_stripe"
15
16type Card struct {
17 Id uint `json:"id"`
18 Uid string `json:"uid"`
19 ValidCard string `json:"valid_card"`
20 Token string `json:"token"`
21 InvalidCard string `json:"invalid_card"`
22 Month string `json:"month"`
23 Year string `json:"year"`
24 CCV string `json:"ccv"`
25 CCVAmex string `json:"ccv_amex"`
26}
27
28type Subscription interface {
29 Updates() <-chan Card
30}
31
32type Fetcher interface {
33 Fetch() (Card, error)
34}
35
36type sub struct {
37 fetcher Fetcher
38 updates chan Card
39}
40
41type fetcher struct {
42 url string
43}
44
45type fetchResult struct {
46 fetchedCard Card
47 err error
48}
49
50// NewSubscription create subscription for fetch data per freq time in second
51func NewSubscription(ctx context.Context, fetcher Fetcher, freq uint) Subscription {
52 s := &sub{
53 fetcher: fetcher,
54 updates: make(chan Card),
55 }
56 go s.serve(ctx, freq)
57 return s
58}
59
60func NewFetcher(url string) Fetcher {
61 return &fetcher{
62 url: url,
63 }
64}
65
66func (f *fetcher) Fetch() (Card, error) {
67 return requestAPI(f.url)
68}
69
70func (s *sub) serve(ctx context.Context, freq uint) {
71 ticker := time.NewTicker(time.Duration(freq) * time.Second)
72 done := make(chan fetchResult, 1)
73
74 var (
75 fetchedCard Card
76 fetchResponseStream chan Card
77 pending bool
78 )
79
80 for {
81
82 if pending {
83 fetchResponseStream = s.updates
84 } else {
85 fetchResponseStream = nil
86 }
87
88 select {
89 case <-ticker.C:
90 if pending {
91 break
92 }
93 go func() {
94 fetched, err := s.fetcher.Fetch()
95 done <- fetchResult{fetched, err}
96 }()
97 case result := <-done:
98 fetchedCard = result.fetchedCard
99 if result.err != nil {
100 log.Printf("fetch got error %v", result.err)
101 break
102 }
103 pending = true
104 case fetchResponseStream <- fetchedCard:
105 pending = false
106 case <-ctx.Done():
107 return
108 }
109 }
110}
111
112func (s *sub) Updates() <-chan Card {
113 return s.updates
114}
115
116func requestAPI(url string) (Card, error) {
117 card := Card{}
118 req, err := http.NewRequest(http.MethodGet, url, nil)
119 if err != nil {
120 return Card{}, err
121 }
122 res, err := http.DefaultClient.Do(req)
123 if err != nil {
124 return Card{}, err
125 }
126 body, err := ioutil.ReadAll(res.Body)
127 if err != nil {
128 return Card{}, err
129 }
130 if err := json.Unmarshal(body, &card); err != nil {
131 return Card{}, err
132 }
133 return card, nil
134}
135
136func main() {
137 ctx, cancel := context.WithCancel(context.Background())
138 sub := NewSubscription(ctx, NewFetcher(exampleAPIAddress), 3)
139
140 time.AfterFunc(1*time.Minute, func() {
141 cancel()
142 log.Println("canceled subscription task")
143 os.Exit(0)
144 })
145
146 for card := range sub.Updates() {
147 fmt.Println(card)
148 }
149}
1$ go run main.go
2{4643 add2475a-ed64-4039-831d-0e95469752d9 371449635398431 tok_mastercard_debit 4000000000000101 01 2024 920 7875}
3{6992 a89e3d71-785a-4d37-9639-be3ce7534257 2223003122003222 tok_discover 4000000000000069 11 2024 660 5241}
4{9287 f665526e-1b34-46f5-9d9d-50362631ed0f 5200828282828210 tok_mastercard_debit 4000000000000036 05 2026 993 6272}
5{4956 e8ae8e75-0ff2-42e8-921c-5cc438d64fac 3566002020360505 tok_amex 4000000000000044 10 2024 371 9989}
6{1193 954d1b36-829b-4726-bbb7-0f5f01b3dd40 6011000990139424 tok_mastercard_debit 4000000000000341 12 2026 331 5119}
- برای گرفتن دیتا ما یک اینترفیس به نام Subscription با متد Updates تعریف کردیم.
- سپس یک اینترفیس دیگر به نام Fetcher برای گرفتن داده از API با متد Fetch تعریف کردیم.
- اکنون یک تابع به نام NewSubscription ایجاد کردیم context, NetFetcher و مدت زمان فاصله بین هر درخواست را پاس دادیم.
1ctx, cancel := context.WithCancel(context.Background())
2sub := NewSubscription(ctx, NewFetcher(exampleAPIAddress), 3)
- بعد از اینکه ساختار sub را راه اندازی کردیم متد serve را برای اجرا داخل گوروتین قرار دادیم.
1func NewSubscription(ctx context.Context, fetcher Fetcher, freq uint) Subscription {
2 s := &sub{
3 fetcher: fetcher,
4 updates: make(chan Card),
5 }
6 go s.serve(ctx, freq)
7 return s
8}
حال پس از هر time.Ticker به آدرس API مورد نظر بواسطه متد Fetch درخواست ارسال می شود و اطلاعات دریافت می شود به داخل کانال می فرستیم. در نهایت از طریق متد Updates می توانیم اطلاعات را دریافت کنیم.
9.4.15.4 کاربردها #
- دریافت اطلاعات از یک تولید کننده (Publisher) یا آدرسی (Pub/Sub)
- همگام سازی اطلاعات از یک API