使用Redis作消息队列

基于内存的单线程数据库,使Redis的线程安全性与性能极高。而Redis的双向链表数据类型(List)天生就可做为消息队列存储消息.redis

在这里就不说消息队列的等等一些优势。可是补充一下Redis的List类型的几个命令,你能够指定将一个元素投送到列表的头部(左边)或者尾部(右边),固然也能够指定从列表的头部或尾部取出数据.数据库

LPush:添加元素至列表的头部安全

 

 RPush:添加元素至列表的尾部性能

 

LPop:移除并获取列表的头部的第一个元素spa

 RPop:移除并获取列表的尾部的第一个元素线程

 

BLpop:移出并获取列表头部的第一个元素, 若是列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。命令格式:blpop key timeout,当timeout=0时,表示一直阻塞等待,直到有其余客户端执行rpush或者lpush命令,插入数据后,阻塞才解除.3d

BRpop:与BLpop相同,不一样的是它是移除列表尾部的第一个元素.code

以下,开启两个客户端,一个客户端先使用BLpop阻塞读取数据,另外一个客户端写入数据.blog

OK,到此我想你已经明白了,List的做用已经显而易见。生产者投入消息,消费者拿到消息。并且双向链表的数据类型,投入和拿取数据都特别灵活。是否是感受很不错?接着往下看😄队列

下面在代码中实现消息队列的数据投递与拾取.

引入NuGet包:StackExchange.Redis

 生产者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    for (int i = 0; i < 10; i++)
    {
        redis.db.ListRightPush("datalist", $"data{i}");//向列表的尾部投递消息
        Console.WriteLine($"{DateTime.Now} 已投递消息data{i}!");
        Thread.Sleep(3000);
    }
    Console.ReadKey();
}

消费者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    while (true)
    {
        string result = redis.db.ListRightPop("datalist");//从列表的尾部拾取消息

        if (string.IsNullOrEmpty(result)) { }
        else
        {
            Console.WriteLine($"{DateTime.Now} 已接收消息,Message={result.ToString()}");
            Thread.Sleep(1000);
        }
    }
}

为了让你们看到效果,我故意让线程等待了几秒钟.

先进先出和先进后出的实现方式都比较灵活,若是要想实现先进先出的规则的话,要将上面的消费者代码改成redis.db.ListLeftPop("datalist")=>从头部开始读取消息

可是上面的代码有一个很大的弊端,虽然消息已经消费完了,可是仍然在不停的lpop,因此形成很大的浪费.就算是这里使用了Sleep,必定程度上减小了CPU的占用率,可是消息处理的时效性就削弱了.

不用担忧,对此确定有解决的方法😀,咱们上面提到了Redis有两个阻塞命令BRpop与BLpop,这两个命令能够解决上述问题.有消息的话它就会帮你拿出来,并且不用while(true)的方式也会减小CPU的开销.由于列表没有消息的话,它就会一直阻塞,能够理解为保持了一个长链接(就至关于你问你女友为何生气,而后她就说由于什么什么...,晚上你问她想吃点什么,而后她说想吃点什么什么...,你每次都要去问她,时间久了她就以为很烦,会以为你不懂她。因此你就住进她内心面,她内心面想什么你就能第一时间知道,用这个作比喻我相信大家都能懂😂)。

可是StackExchange.Redis并无提供BLpop与BRpop的API,咱们可使用使用pub/sub的方式.代码以下:

生产者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    var sub = RedisContent.redis.GetSubscriber();

    for (int i = 0; i < 10; i++)
    {
        sub.PublishAsync("datalist", $"data{i}").GetAwaiter();
        Console.WriteLine($"{DateTime.Now} 已投递消息data{i}!");
        Thread.Sleep(3000);
    }
    Console.ReadKey();
}

消费者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    var sub = RedisContent.redis.GetSubscriber();
    sub.Subscribe("datalist", (channel, message) =>
    {
        Console.WriteLine($"{DateTime.Now} 已接收消息,Message={message}");
        Thread.Sleep(1000);
    });
    Console.WriteLine("消费者0已启动成功!");
    Console.ReadKey();
}

分别启动两个消费者客户端

这种为广播模式,每个订阅者都会收到消息。可是该消息不保证是否被接收,生产者投递完消息若是没有消费者接收的话,消息会丢失.

还有一种方式消息不会丢失,将消息存在列表里面。首先生产者向列表投入数据,紧接着去通知订阅者,让订阅者从列表中取出数据。可是有一个弊端,若是有多个消费者订阅时,只有一个消费者能取到数据。代码以下:

生产者:

var redis = new RedisContent();
var sub = RedisContent.redis.GetSubscriber();

for (int i = 0; i < 10; i++)
{
    redis.db.ListLeftPush("datalist", $"data{i}", flags: CommandFlags.FireAndForget);
    sub.PublishAsync("channel1", "").GetAwaiter();
    Console.WriteLine($"{DateTime.Now} 已投递消息data{i}!");
    Thread.Sleep(3000);
}
Console.ReadKey();

消费者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    var sub = RedisContent.redis.GetSubscriber();
//若是消费者后启动,或者宕机重启,要先查询列表中是否有数据,若是有数据要消费掉
var len = redis.db.ListRange("datalist").Length; if (len > 0) { Task.Run(() => { for (int i = 0; i < len; i++) { string result = redis.db.ListRightPop("datalist"); //业务操做... } }); } sub.Subscribe("channel1", (channel, message) => { string result = redis.db.ListRightPop("datalist"); Console.WriteLine($"{DateTime.Now} 已接收消息,Message={result}"); Thread.Sleep(1000); }); Console.WriteLine("消费者0已启动成功!"); Console.ReadKey();

若有不足,请见谅!今天是十月二号,昨天是中秋佳节又是国庆,在这里祝你们双节快乐。原本昨天晚上写完这篇的,可是八九点的时候太困了,就😴... ...

昨晚坐在窗边,偶然间向外面瞄了一眼,月亮的光芒太耀眼了,也太漂亮了,赶忙拍了一张🌕