乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      C#隊列學(xué)習(xí)筆記:RabbitMQ延遲隊列

       悅光陰 2021-04-05

          一、引言

          日常生活中,很多的APP都有延遲隊列的影子。比如在手機淘寶上,經(jīng)常遇到APP派發(fā)的限時消費紅包,一般有幾個小時或24小時不等。假如在紅包倒計時的過程中,沒有消費掉紅包的話,紅包會自動失效。假如上述行為使用RabbitMQ延時隊列來理解的話,就是在你收到限時消費紅包的時候,手機淘寶會自動發(fā)一條延時消息到隊列中以供消費。在規(guī)定時間內(nèi),則可正常消費,否則依TTL自動失效。

          在RabbitMQ中,有兩種方式來實現(xiàn)延時隊列:一種是基于隊列方式,另外一種是基于消息方式。

          二、示例

          2.1、發(fā)送端(生產(chǎn)端)

          新建一個控制臺項目Send,并添加一個類RabbitMQConfig。

          class RabbitMQConfig
          {
              public static string Host { get; set; }
      
              public static string VirtualHost { get; set; }
      
              public static string UserName { get; set; }
      
              public static string Password { get; set; }
      
              public static int Port { get; set; }
      
              static RabbitMQConfig()
              {
                  Host = "192.168.2.242";
                  VirtualHost = "/";
                  UserName = "hello";
                  Password = "world";
                  Port = 5672;
              }
          }
      RabbitMQConfig.cs
          class Program
          {
              static void Main(string[] args)
              {
                  Console.WriteLine("C# RabbitMQ實現(xiàn)延遲隊列有以下兩種方式:");
                  Console.WriteLine("1、基于隊列方式實現(xiàn)延遲隊列,請按1開始生產(chǎn)。");
                  Console.WriteLine("2、基于消息方式實現(xiàn)延遲隊列,請按2開始生產(chǎn)。");
      
                  string chooseChar = Console.ReadLine();
                  if (chooseChar == "1")
                  {
                      DelayMessagePublishByQueueExpires();
                  }
                  else if (chooseChar == "2")
                  {
                      DelayMessagePublishByMessageTTL();
                  }
                  Console.ReadLine();
              }
      
              /// <summary>
              /// 基于隊列方式實現(xiàn)延遲隊列
              /// 將隊列中所有消息的TTL(Time To Live,即過期時間)設(shè)置為一樣
              /// </summary>
              private static void DelayMessagePublishByQueueExpires()
              {
                  const string MessagePrefix = "message_";
                  const int PublishMessageCount = 6;
                  const int QuequeExpirySeconds = 1000 * 30;
                  const int MessageExpirySeconds = 1000 * 10;
      
                  var factory = new ConnectionFactory()
                  {
                      HostName = RabbitMQConfig.Host,
                      Port = RabbitMQConfig.Port,
                      VirtualHost = RabbitMQConfig.VirtualHost,
                      UserName = RabbitMQConfig.UserName,
                      Password = RabbitMQConfig.Password,
                      Protocol = Protocols.DefaultProtocol
                  };
      
                  using (var connection = factory.CreateConnection())
                  {
                      using (var channel = connection.CreateModel())
                      {
                          //當(dāng)同時指定了queue和message的TTL值,則兩者中較小的那個才會起作用。
                          Dictionary<string, object> dict = new Dictionary<string, object>
                          {
                              { "x-expires", QuequeExpirySeconds },//隊列過期時間
                              { "x-message-ttl", MessageExpirySeconds },//消息過期時間
                              { "x-dead-letter-exchange", "dead exchange 1" },//過期消息轉(zhuǎn)向路由
                              { "x-dead-letter-routing-key", "dead routing key 1" }//過期消息轉(zhuǎn)向路由的routing key
                          };
      
                          //聲明隊列
                          channel.QueueDeclare(queue: "delay1", durable: true, exclusive: false, autoDelete: false, arguments: dict);
      
      
                          //向該消息隊列發(fā)送消息message
                          for (int i = 0; i < PublishMessageCount; i++)
                          {
                              var message = MessagePrefix + i.ToString();
                              var body = Encoding.UTF8.GetBytes(message);
                              channel.BasicPublish(exchange: "", routingKey: "delay1", basicProperties: null, body: body);
                              Thread.Sleep(1000 * 2);
                              Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}");
                          }
                      }
                  }
              }
      
              /// <summary>
              /// 基于消息方式實現(xiàn)延遲隊列
              /// 對隊列中消息進(jìn)行單獨設(shè)置,每條消息的TTL可以不同。
              /// </summary>
              private static void DelayMessagePublishByMessageTTL()
              {
                  const string MessagePrefix = "message_";
                  const int PublishMessageCount = 6;
                  int MessageExpirySeconds = 0;
      
                  var factory = new ConnectionFactory()
                  {
                      HostName = RabbitMQConfig.Host,
                      Port = RabbitMQConfig.Port,
                      VirtualHost = RabbitMQConfig.VirtualHost,
                      UserName = RabbitMQConfig.UserName,
                      Password = RabbitMQConfig.Password,
                      Protocol = Protocols.DefaultProtocol
                  };
      
                  using (var connection = factory.CreateConnection())
                  {
                      using (var channel = connection.CreateModel())
                      {
                          Dictionary<string, object> dict = new Dictionary<string, object>
                          {
                              { "x-dead-letter-exchange", "dead exchange 2" },//過期消息轉(zhuǎn)向路由
                              { "x-dead-letter-routing-key", "dead routing key 2" }//過期消息轉(zhuǎn)向路由的routing key
                          };
      
                          //聲明隊列
                          channel.QueueDeclare(queue: "delay2", durable: true, exclusive: false, autoDelete: false, arguments: dict);
      
                          //向該消息隊列發(fā)送消息message
                          Random random = new Random();
                          for (int i = 0; i < PublishMessageCount; i++)
                          {
                              MessageExpirySeconds = i * 1000;
                              var properties = channel.CreateBasicProperties();
                              properties.Expiration = MessageExpirySeconds.ToString();
                              var message = MessagePrefix + i.ToString();
                              var body = Encoding.UTF8.GetBytes(message);
                              channel.BasicPublish(exchange: "", routingKey: "delay2", basicProperties: properties, body: body);
                              Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}");
                          }
                      }
                  }
              }
          }
      Program.cs

          2.2、接收端(消費端)

          新建一個控制臺項目Receive,按住Alt鍵,將發(fā)送端RabbitMQConfig類拖一個快捷方式到Receive項目中。

          class Program
          {
              static void Main(string[] args)
              {
                  Console.WriteLine("C# RabbitMQ實現(xiàn)延遲隊列有以下兩種方式:");
                  Console.WriteLine("1、基于隊列方式實現(xiàn)延遲隊列,請按1開始消費。");
                  Console.WriteLine("2、基于消息方式實現(xiàn)延遲隊列,請按2開始消費。");
      
                  string chooseChar = Console.ReadLine();
                  if (chooseChar == "1")
                  {
                      DelayMessageConsumeByQueueExpires();
                  }
                  else if (chooseChar == "2")
                  {
                      DelayMessageConsumeByMessageTTL();
                  }
                  Console.ReadLine();
              }
      
              public static void DelayMessageConsumeByQueueExpires()
              {
                  var factory = new ConnectionFactory()
                  {
                      HostName = RabbitMQConfig.Host,
                      Port = RabbitMQConfig.Port,
                      VirtualHost = RabbitMQConfig.VirtualHost,
                      UserName = RabbitMQConfig.UserName,
                      Password = RabbitMQConfig.Password,
                      Protocol = Protocols.DefaultProtocol
                  };
      
                  using (var connection = factory.CreateConnection())
                  {
                      using (var channel = connection.CreateModel())
                      {
                          channel.ExchangeDeclare(exchange: "dead exchange 1", type: "direct");
                          string name = channel.QueueDeclare().QueueName;
                          channel.QueueBind(queue: name, exchange: "dead exchange 1", routingKey: "dead routing key 1");
      
                          var consumer = new EventingBasicConsumer(channel);
                          consumer.Received += (model, ea) =>
                          {
                              var message = Encoding.UTF8.GetString(ea.Body);
                              Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                          };
                          channel.BasicConsume(queue: name, noAck: true, consumer: consumer);
                          Console.ReadKey();
                      }
                  }
              }
      
              public static void DelayMessageConsumeByMessageTTL()
              {
                  var factory = new ConnectionFactory()
                  {
                      HostName = RabbitMQConfig.Host,
                      Port = RabbitMQConfig.Port,
                      VirtualHost = RabbitMQConfig.VirtualHost,
                      UserName = RabbitMQConfig.UserName,
                      Password = RabbitMQConfig.Password,
                      Protocol = Protocols.DefaultProtocol
                  };
      
                  using (var connection = factory.CreateConnection())
                  {
                      using (var channel = connection.CreateModel())
                      {
                          channel.ExchangeDeclare(exchange: "dead exchange 2", type: "direct");
                          string name = channel.QueueDeclare().QueueName;
                          channel.QueueBind(queue: name, exchange: "dead exchange 2", routingKey: "dead routing key 2");
      
                          var consumer = new EventingBasicConsumer(channel);
                          consumer.Received += (model, ea) =>
                          {
                              var message = Encoding.UTF8.GetString(ea.Body);
                              Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                          };
                          channel.BasicConsume(queue: name, noAck: true, consumer: consumer);
                          Console.ReadKey();
                      }
                  }
              }
          }
      Program.cs

          2.3、運行結(jié)果

      -----------------------------------------------------------------------------------------------------------

        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多