RabbitMQ 官方NET教程(二)【工作队列】
cac55 2024-11-15 16:38 20 浏览 0 评论
这篇中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务。
工作队列的主要任务是:避免立刻执行资源密集型任务和避免必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被这些工作进程共享执行。
这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。
准备
在本教程的前面部分,我们发送了一个包含Hello World!的消息。 现在我们将发送代替复杂任务的字符串。 我们没有一个现实世界的任务,比如图像被调整大小,或者是要渲染的pdf文件,所以假设我们很忙 - 通过使用Thread.sleep()函数来假冒它。 我们将把字符串中的点数作为其复杂度; 每个点都将占“work”的一秒钟。 例如,由Hello...描述的假任务将需要三秒钟。
我们将稍后从之前的例子中修改Send程序,以允许从命令行发送任意消息。 这个程序会将任务安排到我们的工作队列中,所以让我们命名为NewTask:
dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
dotnet restore
cd ../Worker
dotnet add package RabbitMQ.Client
dotnet restore
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
有些帮助从命令行参数获取消息:
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
我们的旧的Receive.cs脚本还需要一些更改:它需要为消息体中的每个点伪造一秒的工作时间。 它将处理RabbitMQ发送的消息并执行任务,因此我们将其复制到Worker项目并修改:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);
我们假任务到模拟执行时间:
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
循环调度
使用任务队列的优点之一是能够轻松地并行工作。 如果我们正在建立积压的工作,我们可以增加更多的工作者,这样可以轻松扩展。
首先,我们同时尝试运行两个Worker实例。 他们都会从队列中获取消息,但是究竟如何? 让我们来看看。
你需要三个控制台打开。 两个将运行Worker程序。 这些控制台将是我们两个消费者 - C1和C2。
# shell 1
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
在第三个我们将发布新的任务。 一旦您已经开始使用消费者,您可以发布一些消息:
# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."
让我们看看送给我们workers的内容:
# shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。 平均每个消费者将获得相同数量的消息。 这种分发消息的方式叫做循环(round-robin)。 与三名或更多的workers一起尝试。
消息应答(message acknowledgments)
执行一个任务需要花费几秒钟。你可能会担心当一个消费者在执行任务时发生中断。使用我们当前的代码,一旦RabbitMQ向客户发送消息,它立即将其从内存中删除。在这种情况下,如果杀死正在执行任务的某个工作者,我们会丢失它正在处理的信息。我们也会丢失已经转发给这个工作者且它还未执行的消息。
但是我们不想失去任何任务。如果一个worker挂了,我们希望把这个任务交给另一个工作者。
为了确保消息永远不会丢失,RabbitMQ支持消息确认。从消费者发送一个确认信息告诉RabbitMQ已经收到,处理了特定的消息,然后RabbitMQ可以自由删除它。
如果消费者死机(其通道关闭,连接关闭或TCP连接丢失),而不发送确认信息,RabbitMQ将会明白消息未被完全处理并重新排队。如果同时有其他消费者在线,则会迅速将其重新提供给另一个消费者。这样就可以确保没有消息丢失,即使工作者偶然死亡。
没有任何消息超时; RabbitMQ将在消费者挂了时重新发送消息。如果消费者处理一个信息需要耗费特别特别长的时间是允许的。
消息确认默认情况下打开。 在前面的例子中,我们通过将noAck(“no manual acks”)参数设置为true来明确地将其关闭。 一旦完成任务,现在该删除这个标志并发送正确的确认。
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
使用这个代码,我们可以确定即使在处理消息时,使用CTRL + C杀死一个工作者,也不会丢失任何东西。工作者挂了之后不久,所有未确认的消息将被重新发送。
忘记确认
丢失BasicAck是一个常见的错误。 这是一个容易的错误,但后果是严重的。
当您的客户端退出(可能看起来像随机重新传递)时,消息将被重新传递,但是RabbitMQ将会消耗越来越多的内存,因为它将无法释放任何未包含的消息。
为了调试这种错误,您可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化(Message durability)
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。 但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要丢失。需要两件事来确保消息不会丢失:我们需要将所有队列和消息标记为持久化。
首先,我们需要确保RabbitMQ不会丢失我们的队列。 为了这样做,我们需要将其声明为持久的:
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
虽然这个命令本身是正确的,但是在我们目前的设置中是不行的。 这是因为我们已经定义了一个非持久化的名为hello的队列。 RabbitMQ不允许您重新定义具有不同参数的现有队列,并会向尝试执行此操作的任何程序返回错误。 但是有一个快速的解决方法 - 让我们用不同的名称声明一个队列,例如task_queue:
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
这个queueDeclare更改需要应用于生产者和消费者代码。
在这一点上,我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。 现在我们需要将我们的消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
注意消息持久性
将消息标记为持久性不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且还没有保存时,仍然有一个很短的时间窗口。 此外,RabbitMQ不会对每个消息执行`fsync`(同步内存中所有已修改的文件数据到储存设备) - 它可能只是保存到缓存中,而不是真正写入磁盘。 持久性保证不强,但对我们的简单任务队列来说已经足够了。 如果您需要更强大的保证,那么您可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。
公平转发(Fair dispatch)
或许会发现,目前的消息转发机制(Round-robin)并非是我们想要的。例如,这样一种情况,对于两个消费者,有一系列的任务,奇数任务特别耗时,而偶数任务却很轻松,这样造成一个消费者一直繁忙,另一个消费者却很快执行完任务后等待。
造成这样的原因是因为RabbitMQ仅仅是当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。
channel.BasicQos(0, 1, false);
注意队列大小
如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。
完整的代码
NewTask.cs 类:
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
Worker.cs类:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
noAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
相关推荐
- 正点原子开拓者FPGA开发板资料连载第四十章 SD卡图片显示实验
-
1)实验平台:正点原子开拓者FPGA开发板2)摘自《开拓者FPGA开发指南》关注官方微信号公众号,获取更多资料:正点原子3)全套实验源码+手册+视频下载地址:http://www.openedv.c...
- 东芝存储改名为铠侠了,铠侠microSD卡128GB全网首测
-
作为一个数码爱好者,平时总爱把玩各种科技数码产品,最近又迷上了口袋云台相机,大疆OsmoPocket、飞宇口袋相机、SnoppaVmate口袋相机什么的,不过这类产品由于设计的机身体积很小(毕竟为...
- SD存储卡卡面上奇奇怪怪的图标,你知道几个?
-
现在对高像素照片、连拍、4K甚至8K的需求越来越多,对存储卡的传输速度、容量等,要求也越来越多了。但是,看到SD存储卡卡面上奇奇怪怪的图标,让人非常迷惑。这篇文章让你简单认识这些图标和奇奇怪怪的数字。...
- 拍摄4K视频上选!铠侠 EXCERIA PLUS microSD卡
-
大家好,我是波导终结者。今天跟大家分享的是铠侠的EXCERIAPLUS极至光速microSDXCUHS-1存储卡,名字有点长,但是不用担心,我会帮大家梳理好存储卡的选购建议。有不少刚入门的朋友...
- 高速稳定,一卡多用:铠侠极至光速microSD存储卡评测
-
Hello,大家好,我是小胖子。半个月前收到了KIOXIA铠侠寄来的一张256GB的TF卡,用了大半个月,让我们看看这款产品表现如何吧。其实很多人并不太了解铠侠,问我铠侠是什么品牌,好不好。其实,东芝...
- 读速205MB/s、V30规格,雷克沙SILVER系列存储卡再添新成员
-
IT之家6月19日消息,雷克沙今日推出3款SILVER系列SD/microSD存储卡新品,支持4K60fps录像。据介绍,该系列存储卡均符合V30标准,其中micr...
- 相机、无人机拍视频,选择SD存储卡有什么需要知道的?
-
本文章不涉及产品推荐导购行为,致力于给到小白带来基础知识。相机一般使用SD卡,无人机一般使用microSD卡(也叫TF卡),使用的标准和图标标识是一样的。相机、无人机拍视频,选择SD存储卡有什么需要知...
- PNY推出适用Switch 2的microSD Express卡,读取速度高达890MB/s
-
任天堂Switch2开始预订,其比前代产品变得更加昂贵,各种配件的价格都高于预期,这也包括转向microSDExpress存储。此时,PNY推出了新款microSDExpress闪存卡。新款mi...
- SD卡迎来25周年:全球售出120亿张,容量翻50万倍
-
IT之家5月21日消息,科技媒体betanews今天(5月21日)发布博文,报道称SD卡迎来了25周年的生日。自2000年首款SD存储卡问世以来,已走过25个年头...
- 微单相机买一款什么样的SD卡才够用?写入速度更为关键
-
最近,评价君朋友发现自己的卡拍摄视频时候总断流,于是感觉写入速度应该是不够的,打算换卡,评价君正好跟他说道说道。目前的SD存储卡,很多只标注读取速度,比如95MB/s,80MB/s等等,而没有写写入速...
- 金士顿Canvas Go!Plus 系列存储卡评测
-
前言2020年,金士顿推出了CanvasGo!Plus系列存储卡,凭借其优秀的读写速度和稳定性获得了广大用户的认可。时隔5年,金士顿推出了其全新升级产品:SDG4/SDCG4,可选容量覆盖64GB...
- TF卡速度等级|MK米客方德(tf卡速度等级图)
-
TF卡(TransFlash卡,又称MicroSD卡)是一种常见的便携式存储媒体,广泛用于智能手机、相机、平板电脑等设备中。TF卡的性能通常由速度等级来衡量,这些等级反映了TF卡的数据传输速度。拓优星...
- 关于SD卡,看这张表就够了(sd卡的作用)
-
这里是溢图科技(原“相机笔记”)。这两天有不少存储产品促销,随之而来的就是关于SD卡的一些提问。文章以前已经写过很多了,这里主要给大家看一张表格:上面就是SD卡协会官方制作的“族谱”,明确给出了不同版...
- 轻量化储存的首选——凯侠极致光速256G microSD存储卡实测
-
对于摄影师而言,我们经常会接触到相关存储设备,像照片拍摄中给相机安装的SD卡,视频录制中外录高规格画面的SSD等,都属于专业的存储介质,被应用于商业拍摄、电影级别拍摄之中。而针对生活中我们日常用于拍摄...
- 首发1569元,读取速度可达250MB/s,闪迪推出最新2TB至尊超极速存储卡
-
近日,闪迪(SanDisk)正式发布了其最新的2TB至尊超极速microSDXCUHS-I存储卡。据悉,这款存储卡的读取速度可达250MB/s,写入速度则达到150MB/s。这意味着用户在处理高分辨...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- 正点原子开拓者FPGA开发板资料连载第四十章 SD卡图片显示实验
- 东芝存储改名为铠侠了,铠侠microSD卡128GB全网首测
- SD存储卡卡面上奇奇怪怪的图标,你知道几个?
- 拍摄4K视频上选!铠侠 EXCERIA PLUS microSD卡
- 高速稳定,一卡多用:铠侠极至光速microSD存储卡评测
- 读速205MB/s、V30规格,雷克沙SILVER系列存储卡再添新成员
- 相机、无人机拍视频,选择SD存储卡有什么需要知道的?
- PNY推出适用Switch 2的microSD Express卡,读取速度高达890MB/s
- SD卡迎来25周年:全球售出120亿张,容量翻50万倍
- 微单相机买一款什么样的SD卡才够用?写入速度更为关键
- 标签列表
-
- 如何绘制折线图 (52)
- javaabstract (48)
- 新浪微博头像 (53)
- grub4dos (66)
- s扫描器 (51)
- httpfile dll (48)
- ps实例教程 (55)
- taskmgr (51)
- s spline (61)
- vnc远程控制 (47)
- 数据丢失 (47)
- wbem (57)
- flac文件 (72)
- 网页制作基础教程 (53)
- 镜像文件刻录 (61)
- ug5 0软件免费下载 (78)
- debian下载 (53)
- ubuntu10 04 (60)
- web qq登录 (59)
- 笔记本变成无线路由 (52)
- flash player 11 4 (50)
- 右键菜单清理 (78)
- cuteftp 注册码 (57)
- ospf协议 (53)
- ms17 010 下载 (60)