乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      手把手教姐姐寫消息隊(duì)列

       F2967527 2021-04-24

      前言

      這周姐姐入職了新公司,老板想探探他的底,看了一眼他的簡歷,呦呵,精通kafka,這小姑娘有兩下子,既然這樣,那你寫一個(gè)消息隊(duì)列吧。因?yàn)橐胓o語言寫,這可給姐姐愁壞了。趕緊來求助我,我這么堅(jiān)貞不屈一人,在姐姐的軟磨硬泡下還是答應(yīng)他了,所以接下來我就手把手教姐姐怎么寫一個(gè)消息隊(duì)列。下面我們就來看一看我是怎么寫的吧~~~。

      本代碼已上傳到我的github:

      有需要的小伙伴,可自行下載,順便給個(gè)小星星吧~~~

      什么是消息隊(duì)列

      姐姐真是把我愁壞了,自己寫的精通kafka,竟然不知道什么是消息隊(duì)列,于是,一向好脾氣的我開始給姐姐講一講什么是消息隊(duì)列。

      消息隊(duì)列,我們一般稱它為MQ(Message Queue),兩個(gè)單詞的結(jié)合,這兩個(gè)英文單詞想必大家都應(yīng)該知道吧,其實(shí)最熟悉的還是Queue吧,即隊(duì)列。隊(duì)列是一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),隊(duì)列的使用還是比較普遍的,但是已經(jīng)有隊(duì)列了,怎么還需要MQ呢?

      我:問你呢,姐姐,知道嗎?為什么還需要MQ

      姐姐:快點(diǎn)講,想挨打呀?

      我:噗。。。算我多嘴,哼~~~

      欠欠的我開始了接下來的耐心講解......

      舉一個(gè)簡單的例子,假設(shè)現(xiàn)在我們要做一個(gè)系統(tǒng),該登陸系統(tǒng)需要在用戶登陸成功后,發(fā)送封郵件到用戶郵箱進(jìn)行提醒,需求還是很簡單的,我們先看一看沒有MQ,我們該怎么實(shí)現(xiàn)呢?畫一個(gè)時(shí)序圖來看一看:

      圖片

      看這個(gè)圖,郵件發(fā)送在請求登陸時(shí)進(jìn)行,當(dāng)密碼驗(yàn)證成功后,就發(fā)送郵件,然后返回登陸成功。這樣是可以的,但是他是有缺陷的。這讓我們的登陸操作變得復(fù)雜了,每次請求登陸都需要進(jìn)行郵件發(fā)送,如果這里出現(xiàn)錯(cuò)誤,整個(gè)登陸請求也出現(xiàn)了錯(cuò)誤,導(dǎo)致登陸不成功;還有一個(gè)問題,本來我們登陸請求調(diào)用接口僅僅需要100ms,因?yàn)橹虚g要做一次發(fā)送郵件的等待,那么調(diào)用一次登陸接口的時(shí)間就要增長,這就是問題所在,一封郵件他的優(yōu)先級 不是很高的,用戶也不需要實(shí)時(shí)收到這封郵件,所以這時(shí),就體現(xiàn)了消息隊(duì)列的重要性了,我們用消息隊(duì)列進(jìn)行改進(jìn)一下。

      圖片

      這里我們將發(fā)送郵件請求放到Mq中,這樣我們就能提高用戶體驗(yàn)的吞吐量,這個(gè)很重要,顧客就是上帝嘛,畢竟也沒有人喜歡用一個(gè)很慢很慢的app。

      這里只是舉了MQ眾多應(yīng)用中的其中一個(gè),即異步應(yīng)用,MQ還在系統(tǒng)解藕、削峰/限流中有著重要應(yīng)用,這兩個(gè)我就不具體講解了,原理都一樣,好好思考一下,你們都能懂得。

      channel

      好啦,姐姐終于知道什么是消息隊(duì)列了,但是現(xiàn)在還是沒法進(jìn)行消息隊(duì)列開發(fā)的,因?yàn)檫€差一個(gè)知識(shí)點(diǎn),即go語言中的channel。這個(gè)很重要,我們還需要靠這個(gè)來開發(fā)我們的消息隊(duì)列呢。

      因篇幅有限,這里不詳細(xì)介紹channel,只介紹基本使用方法。

      什么是channel

      Goroutine 和 Channel 是 Go 語言并發(fā)編程的兩大基石。Goroutine 用于執(zhí)行并發(fā)任務(wù),Channel 用于 goroutine 之間的同步、通信。Go提倡使用通信的方法代替共享內(nèi)存,當(dāng)一個(gè)Goroutine需要和其他Goroutine資源共享時(shí),Channel就會(huì)在他們之間架起一座橋梁,并提供確保安全同步的機(jī)制。channel本質(zhì)上其實(shí)還是一個(gè)隊(duì)列,遵循FIFO原則。具體規(guī)則如下:

      • 先從 Channel 讀取數(shù)據(jù)的 Goroutine 會(huì)先接收到數(shù)據(jù);
      • 先向 Channel 發(fā)送數(shù)據(jù)的 Goroutine 會(huì)得到先發(fā)送數(shù)據(jù)的權(quán)利;

      創(chuàng)建通道

      創(chuàng)建通道需要用到關(guān)鍵字 make ,格式如下:

      通道實(shí)例 := make(chan 數(shù)據(jù)類型)
      • 數(shù)據(jù)類型:通道內(nèi)傳輸?shù)脑仡愋汀?/section>
      • 通道實(shí)例:通過make創(chuàng)建的通道句柄。

      無緩沖通道的使用

      Go語言中無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。這種類型的通道要求發(fā)送 goroutine 和接收 goroutine 同時(shí)準(zhǔn)備好,才能完成發(fā)送和接收操作。

      無緩沖通道的定義方式如下:

      通道實(shí)例 := make(chan 通道類型)
      • 通道類型:和無緩沖通道用法一致,影響通道發(fā)送和接收的數(shù)據(jù)類型。
      • 緩沖大?。?
      • 通道實(shí)例:被創(chuàng)建出的通道實(shí)例。

      寫個(gè)例子來幫助大家理解一下吧:

      package main

      import (
          'sync'
          'time'
      )

      func main() {
          c := make(chan string)

          var wg sync.WaitGroup
          wg.Add(2)

          go func() {
              defer wg.Done()
              c <- `Golang夢工廠`
          }()

          go func() {
              defer wg.Done()

              time.Sleep(time.Second * 1)
              println(`Message: `+ <-c)
          }()

          wg.Wait()
      }

      帶緩沖的通道的使用

      Go語言中有緩沖的通道(buffered channel)是一種在被接收前能存儲(chǔ)一個(gè)或者多個(gè)值的通道。這種類型的通道并不強(qiáng)制要求 goroutine 之間必須同時(shí)完成發(fā)送和接收。通道會(huì)阻塞發(fā)送和接收動(dòng)作的條件也會(huì)不同。只有在通道中沒有要接收的值時(shí),接收動(dòng)作才會(huì)阻塞。只有在通道沒有可用緩沖區(qū)容納被發(fā)送的值時(shí),發(fā)送動(dòng)作才會(huì)阻塞。

      有緩沖通道的定義方式如下:

      通道實(shí)例 := make(chan 通道類型, 緩沖大小)
      • 通道類型:和無緩沖通道用法一致,影響通道發(fā)送和接收的數(shù)據(jù)類型。
      • 緩沖大小:決定通道最多可以保存的元素?cái)?shù)量。
      • 通道實(shí)例:被創(chuàng)建出的通道實(shí)例。

      來寫一個(gè)例子講解一下:

      package main

      import (
          'sync'
          'time'
      )

      func main() {
          c := make(chan string2)

          var wg sync.WaitGroup
          wg.Add(2)

          go func() {
              defer wg.Done()

              c <- `Golang夢工廠`
              c <- `asong`
          }()

          go func() {
              defer wg.Done()

              time.Sleep(time.Second * 1)
              println(`公眾號: `+ <-c)
              println(`作者: `+ <-c)
          }()

          wg.Wait()
      }

      好啦,通道的概念就介紹到這里了,如果需要,下一篇我出一個(gè)channel詳細(xì)講解的文章。

      消息隊(duì)列編碼實(shí)現(xiàn)

      準(zhǔn)備篇

      終于開始進(jìn)入主題了,姐姐都聽的快要睡著了,我轟隆一嗓子,立馬精神,但是呢,asong也是挨了一頓小電炮,代價(jià)慘痛呀,嗚嗚嗚............

      在開始編寫代碼編寫直接,我需要構(gòu)思我們的整個(gè)代碼架構(gòu),這才是正確的編碼方式。我們先來定義一個(gè)接口,把我們需要實(shí)現(xiàn)的方法先列出來,后期對每一個(gè)代碼進(jìn)行實(shí)現(xiàn)就可以了。因此可以列出如下方法:

      type Broker interface {
       publish(topic string, msg interface{}) error
       subscribe(topic string) (<-chan interface{}, error)
       unsubscribe(topic string, sub <-chan interface{}) error
       close()
       broadcast(msg interface{}, subscribers []chan interface{})
       setConditions(capacity int)
      }
      • publish:進(jìn)行消息的推送,有兩個(gè)參數(shù)即topicmsg,分別是訂閱的主題、要傳遞的消息
      • subscribe:消息的訂閱,傳入訂閱的主題,即可完成訂閱,并返回對應(yīng)的channel通道用來接收數(shù)據(jù)
      • unsubscribe:取消訂閱,傳入訂閱的主題和對應(yīng)的通道
      • Golang夢工廠 發(fā)起了一個(gè)讀者討論 小伙伴們,你們有什么想學(xué)的嘛,可以留言的呦
      • close:這個(gè)的作用就是很明顯了,就是用來關(guān)閉消息隊(duì)列的
      • broadCast:這個(gè)屬于內(nèi)部方法,作用是進(jìn)行廣播,對推送的消息進(jìn)行廣播,保證每一個(gè)訂閱者都可以收到
      • setConditions:這里是用來設(shè)置條件,條件就是消息隊(duì)列的容量,這樣我們就可以控制消息隊(duì)列的大小了

      細(xì)心的你們有沒有發(fā)現(xiàn)什么問題,這些代碼我都定義的是內(nèi)部方法,也就是包外不可用。為什么這么做呢,因?yàn)檫@里屬于代理要做的事情,我們還需要在封裝一層,也就是客戶端能直接調(diào)用的方法,這樣才符合軟件架構(gòu)。因此可以寫出如下代碼:

      package mq


      type Client struct {
       bro *BrokerImpl
      }

      func NewClient() *Client {
       return &Client{
        bro: NewBroker(),
       }
      }

      func (c *Client)SetConditions(capacity int)  {
       c.bro.setConditions(capacity)
      }

      func (c *Client)Publish(topic string, msg interface{}) error{
       return c.bro.publish(topic,msg)
      }

      func (c *Client)Subscribe(topic string) (<-chan interface{}, error){
       return c.bro.subscribe(topic)
      }

      func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {
       return c.bro.unsubscribe(topic,sub)
      }

      func (c *Client)Close()  {
        c.bro.close()
      }

      func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
       for val:= range sub{
        if val != nil{
         return val
        }
       }
       return nil
      }

      上面只是準(zhǔn)好了代碼結(jié)構(gòu),但是消息隊(duì)列實(shí)現(xiàn)的結(jié)構(gòu)我們還沒有設(shè)計(jì),現(xiàn)在我們就來設(shè)計(jì)一下。

      type BrokerImpl struct {
       exit chan bool
       capacity int

       topics map[string][]chan interface{} // key:topic value :queue
       sync.RWMutex // 同步鎖
      }
      • exit:也是一個(gè)通道,這個(gè)用來做關(guān)閉消息隊(duì)列用的
      • capacity:即用來設(shè)置消息隊(duì)列的容量
      • topics:這里使用一個(gè)map結(jié)構(gòu),key即是topic,其值則是一個(gè)切片,chan類型,這里這么做的原因是我們一個(gè)topic可以有多個(gè)訂閱者,所以一個(gè)訂閱者對應(yīng)著一個(gè)通道
      • sync.RWMutex:讀寫鎖,這里是為了防止并發(fā)情況下,數(shù)據(jù)的推送出現(xiàn)錯(cuò)誤,所以采用加鎖的方式進(jìn)行保證

      好啦,現(xiàn)在我們已經(jīng)準(zhǔn)備的很充分啦,開始接下來方法填充之旅吧~~~

      Publishbroadcast

      這里兩個(gè)合在一起講的原因是braodcast是屬于publish里的。這里的思路很簡單,我們只需要把傳入的數(shù)據(jù)進(jìn)行廣播即可了,下面我們來看代碼實(shí)現(xiàn):

      func (b *BrokerImpl) publish(topic string, pub interface{}) error {
       select {
       case <-b.exit:
        return errors.New('broker closed')
       default:
       }

       b.RLock()
       subscribers, ok := b.topics[topic]
       b.RUnlock()
       if !ok {
        return nil
       }

       b.broadcast(pub, subscribers)
       return nil
      }


      func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
       count := len(subscribers)
       concurrency := 1

       switch {
       case count > 1000:
        concurrency = 3
       case count > 100:
        concurrency = 2
       default:
        concurrency = 1
       }
       pub := func(start int) {
        for j := start; j < count; j += concurrency {
         select {
         case subscribers[j] <- msg:
         case <-time.After(time.Millisecond * 5):
         case <-b.exit:
          return
         }
        }
       }
       for i := 0; i < concurrency; i++ {
        go pub(i)
       }
      }

      publish方法中沒有什么好講的,這里主要說一下broadcast的實(shí)現(xiàn):

      這里主要對數(shù)據(jù)進(jìn)行廣播,所以數(shù)據(jù)推送出去就可以了,沒必要一直等著他推送成功,所以這里我們我們采用goroutine。在推送的時(shí)候,當(dāng)推送失敗時(shí),我們也不能一直等待呀,所以這里我們加了一個(gè)超時(shí)機(jī)制,超過5毫秒就停止推送,接著進(jìn)行下面的推送。

      可能你們會(huì)有疑惑,上面怎么還有一個(gè)switch選項(xiàng)呀,干什么用的呢?考慮這樣一個(gè)問題,當(dāng)有大量的訂閱者時(shí),,比如10000個(gè),我們一個(gè)for循環(huán)去做消息的推送,那推送一次就會(huì)耗費(fèi)很多時(shí)間,并且不同的消費(fèi)者之間也會(huì)產(chǎn)生延時(shí),,所以采用這種方法進(jìn)行分解可以降低一定的時(shí)間。

      subscribeunsubScribe

      我們先來看代碼:

      func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {
       select {
       case <-b.exit:
        return nil, errors.New('broker closed')
       default:
       }

       ch := make(chan interface{}, b.capacity)
       b.Lock()
       b.topics[topic] = append(b.topics[topic], ch)
       b.Unlock()
       return ch, nil
      }
      func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
       select {
       case <-b.exit:
        return errors.New('broker closed')
       default:
       }

       b.RLock()
       subscribers, ok := b.topics[topic]
       b.RUnlock()

       if !ok {
        return nil
       }
       // delete subscriber
       var newSubs []chan interface{}
       for _, subscriber := range subscribers {
        if subscriber == sub {
         continue
        }
        newSubs = append(newSubs, subscriber)
       }

       b.Lock()
       b.topics[topic] = newSubs
       b.Unlock()
       return nil
      }

      這里其實(shí)就很簡單了:

      • subscribe:這里的實(shí)現(xiàn)則是為訂閱的主題創(chuàng)建一個(gè)channel,然后將訂閱者加入到對應(yīng)的topic中就可以了,并且返回一個(gè)接收channel
      • unsubScribe:這里實(shí)現(xiàn)的思路就是將我們剛才添加的channel刪除就可以了。

      close

      func (b *BrokerImpl) close()  {
       select {
       case <-b.exit:
        return
       default:
        close(b.exit)
        b.Lock()
        b.topics = make(map[string][]chan interface{})
        b.Unlock()
       }
       return
      }

      這里就是為了關(guān)閉整個(gè)消息隊(duì)列,這句代碼b.topics = make(map[string][]chan interface{})比較重要,這里主要是為了保證下一次使用該消息隊(duì)列不發(fā)生沖突。

      setConditions GetPayLoad

      還差最后兩個(gè)方法,一個(gè)是設(shè)置我們的消息隊(duì)列容量,另一個(gè)是封裝一個(gè)方法來獲取我們訂閱的消息:

      func (b *BrokerImpl)setConditions(capacity int)  {
       b.capacity = capacity
      }
      func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
       for val:= range sub{
        if val != nil{
         return val
        }
       }
       return nil
      }

      測試

      好啦,代碼這么快就被寫完了,接下來我們進(jìn)行測試一下吧。

      單元測試

      正式測試之前,我們還是需要先進(jìn)行一下單元測試,養(yǎng)成好的習(xí)慣,只有先自測了,才能有底氣說我的代碼沒問題,要不直接跑程序,會(huì)出現(xiàn)很多bug的。

      這里我們測試方法如下:我們向不同的topic發(fā)送不同的信息,當(dāng)訂閱者收到消息后,就行取消訂閱。

      func TestClient(t *testing.T) {
       b := NewClient()
       b.SetConditions(100)
       var wg sync.WaitGroup

       for i := 0; i < 100; i++ {
        topic := fmt.Sprintf('Golang夢工廠%d', i)
        payload := fmt.Sprintf('asong%d', i)

        ch, err := b.Subscribe(topic)
        if err != nil {
         t.Fatal(err)
        }

        wg.Add(1)
        go func() {
         e := b.GetPayLoad(ch)
         if e != payload {
          t.Fatalf('%s expected %s but get %s', topic, payload, e)
         }
         if err := b.Unsubscribe(topic, ch); err != nil {
          t.Fatal(err)
         }
         wg.Done()
        }()

        if err := b.Publish(topic, payload); err != nil {
         t.Fatal(err)
        }
       }

       wg.Wait()
      }

      測試通過,沒問題,接下來我們在寫幾個(gè)方法測試一下

      測試

      這里分為兩種方式測試

      測試一:使用一個(gè)定時(shí)器,向一個(gè)主題定時(shí)推送消息.

      // 一個(gè)topic 測試
      func OnceTopic()  {
       m := mq.NewClient()
       m.SetConditions(10)
       ch,err :=m.Subscribe(topic)
       if err != nil{
        fmt.Println('subscribe failed')
        return
       }
       go OncePub(m)
       OnceSub(ch,m)
       defer m.Close()
      }

      // 定時(shí)推送
      func OncePub(c *mq.Client)  {
       t := time.NewTicker(10 * time.Second)
       defer t.Stop()
       for  {
        select {
        case <- t.C:
         err := c.Publish(topic,'asong真帥')
         if err != nil{
          fmt.Println('pub message failed')
         }
        default:

        }
       }
      }

      // 接受訂閱消息
      func OnceSub(m <-chan interface{},c *mq.Client)  {
       for  {
        val := c.GetPayLoad(m)
        fmt.Printf('get message is %s\n',val)
       }
      }

      測試二:使用一個(gè)定時(shí)器,定時(shí)向多個(gè)主題發(fā)送消息:

      //多個(gè)topic測試
      func ManyTopic()  {
       m := mq.NewClient()
       defer m.Close()
       m.SetConditions(10)
       top := ''
       for i:=0;i<10;i++{
        top = fmt.Sprintf('Golang夢工廠_%02d',i)
        go Sub(m,top)
       }
       ManyPub(m)
      }

      func ManyPub(c *mq.Client)  {
       t := time.NewTicker(10 * time.Second)
       defer t.Stop()
       for  {
        select {
        case <- t.C:
         for i:= 0;i<10;i++{
          //多個(gè)topic 推送不同的消息
          top := fmt.Sprintf('Golang夢工廠_%02d',i)
          payload := fmt.Sprintf('asong真帥_%02d',i)
          err := c.Publish(top,payload)
          if err != nil{
           fmt.Println('pub message failed')
          }
         }
        default:

        }
       }
      }

      func Sub(c *mq.Client,top string)  {
       ch,err := c.Subscribe(top)
       if err != nil{
        fmt.Printf('sub top:%s failed\n',top)
       }
       for  {
        val := c.GetPayLoad(ch)
        if val != nil{
         fmt.Printf('%s get message is %s\n',top,val)
        }
       }
      }

      總結(jié)

      終于幫助姐姐解決了這個(gè)問題,姐姐開心死了,給我一頓親,啊不對,是一頓夸,夸的人家都不好意思了。

      這一篇你學(xué)會(huì)了嗎?沒學(xué)會(huì)不要緊,趕快去把源代碼下載下來,好好通讀一下,很好理解的~~~。

      其實(shí)這一篇是為了接下來的kafka學(xué)習(xí)打基礎(chǔ)的,學(xué)好了這一篇,接下來學(xué)習(xí)的kafka就會(huì)容易很多啦~~~

      github地址:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/queue

      如果能給一個(gè)小星星就好了~~~

        本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多