百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Maomi.MQ 2.0 | 功能强大的 .NET 消息队列通讯模型框架

cac55 2025-03-24 14:18 26 浏览 0 评论

说明

作者:痴者工良

文档地址:https://mmq.whuanle.cn

仓库地址:https://github.com/whuanle/Maomi.MQ

作者博客:

  • o https://www.whuanle.cn
  • o https://www.cnblogs.com/whuanle

导读

Maomi.MQ 是一个简化了消息队列使用方式的通讯框架,目前支持了 RabbitMQ。

Maomi.MQ.RabbitMQ 是一个用于专为 RabbitMQ 设计的发布者和消费者通讯模型,大大简化了发布和消息的代码,并提供一系列简便和实用的功能,开发者可以通过框架提供的消费模型实现高性能消费、事件编排,框架还支持发布者确认机制、自定义重试机制、补偿机制、死信队列、延迟队列、连接通道复用等一系列的便利功能。开发者可以把更多的精力放到业务逻辑中,通过 Maomi.MQ.RabbitMQ 框架简化跨进程消息通讯模式,使得跨进程消息传递更加简单和可靠。

此外,框架通过 runtime 内置的 api 支持了分布式可观测性,可以通过进一步使用 OpenTelemetry 等框架进一步收集可观测性信息,推送到基础设施平台中。

快速开始

在本篇教程中,将介绍 Maomi.MQ.RabbitMQ 的使用方法,以便读者能够快速了解该框架的使用方式和特点。


创建一个 Web 项目(可参考 WebDemo 项目),引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:


// using Maomi.MQ;
// using RabbitMQ.Client;

builder.Services.AddMaomiMQ((MqOptionsBuilder options)=>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},[typeof(Program).Assembly]);

var app = builder.Build();

  • o WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。
    每条消息生成一个唯一的 id,便于追踪。如果不设置雪花id,在分布式服务中,多实例并行工作时,可能会产生相同的 id。
  • o AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。
  • o Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory

定义消息模型类,模型类是 MQ 通讯的消息基础,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。


public classTestEvent
{
publicintId{get;set;}

public override string ToString()
{
returnId.ToString();
}
}

定义消费者,消费者需要实现 IConsumer 接口,以及使用 [Consumer] 特性注解配置消费者属性,如下所示,[Consumer("test")] 表示该消费者订阅的队列名称是 test

IConsumer 接口有三个方法,ExecuteAsync 方法用于处理消息,FaildAsync 会在 ExecuteAsync 异常时立即执行,如果代码一直异常,最终会调用 FallbackAsync 方法,Maomi.MQ 框架会根据 ConsumerState 值确定是否将消息放回队列重新消费,或者做其它处理动作。


[Consumer("test")]
publicclassMyConsumer:IConsumer<TestEvent>
{
// 消费
publicasyncTaskExecuteAsync(MessageHeader messageHeader,TestEvent message)
{
Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
awaitTask.CompletedTask;
}

// 每次消费失败时执行
publicTaskFaildAsync(MessageHeader messageHeader,Exception ex,int retryCount,TestEvent message)
=>Task.CompletedTask;

// 补偿
publicTask<ConsumerState>FallbackAsync(MessageHeader messageHeader,TestEvent? message,Exception? ex)
=>Task.FromResult(ConsumerState.Ack);
}

Maomi.MQ 还具有多种消费者模式,代码写法不一样,后续会详细讲解不同的消费者模式。


如果要发布消息,只需要注入 IMessagePublisher 服务即可。


[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;

publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}

[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
// 发布消息
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"test", message:newTestEvent
{
Id=123
});
return"ok";
}
}

启动 Web 服务,在 swagger 页面上请求 API 接口,MyConsumer 服务会立即接收到发布的消息。


如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包,以便让消费者在后台订阅队列消费消息。

参考 ConsoleDemo 项目。


using Maomi.MQ;
usingMicrosoft.Extensions.Hosting;
usingMicrosoft.Extensions.Logging;
usingRabbitMQ.Client;
usingSystem.Reflection;

var host =newHostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},newSystem.Reflection.Assembly[]{typeof(Program).Assembly});

}).Build();

// 后台运行
var task = host.RunAsync();

Console.ReadLine();

消息发布者

消息发布者用于推送消息到 RabbitMQ 服务器中,Maomi.MQ 支持多种消息发布者模式,支持 RabbitMQ 事务模式等,示例项目请参考 PublisherWeb

Maomi.MQ 通过 IMessagePublisher 向开发者提供消息推送服务。


在发布消息之前,需要定义一个事件模型类,用于传递消息。


public classTestEvent
{
publicintId{get;set;}

public override string ToString()
{
returnId.ToString();
}
}

然后注入 IMessagePublisher 服务,发布消息:


[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;

publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}

[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
});
}

return"ok";
}
}

一般情况下,一个模型类只应该被一个消费者所使用,那么通过事件可以找到唯一的消费者,也就是通过事件类型找到消费者的 IConsumerOptions,此时框架可以使用对应的配置发送消息。

TestMessageEvent 模型只有一个消费者:


[Consumer("publish", Qos = 1, RetryFaildRequeue = true)]
public class TestEventConsumer : IConsumer<TestMessageEvent>
{
// ... ...
}

可以直接发送事件,不需要填写交换器(Exchange)和路由键(RoutingKey)。


[HttpGet("publish_message")]
publicasyncTask<string>PublisherMessage()
{
// 如果在本项目中 TestMessageEvent 只指定了一个消费者,那么通过 TestMessageEvent 自动寻找对应的配置
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(model:newTestMessageEvent
{
Id= i
});
}

return"ok";
}

IMessagePublisher

IMessagePublisher 是 Maomi.MQ 的基础消息发布接口,有以下方法:


// 消息发布者.
publicinterfaceIMessagePublisher
{
TaskPublishAsync<TMessage>(string exchange, // 交换器名称.
string routingKey,// 队列/路由键名称.
TMessage message, // 事件对象.
Action<BasicProperties> properties,
CancellationToken cancellationToken =default)
whereTMessage:class;

TaskPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);

TaskPublishAsync<TMessage>(TMessage message,
Action<BasicProperties>? properties =,
CancellationToken cancellationToken =default)
whereTMessage:class;

TaskPublishAsync<TMessage>(TMessage model,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);

TaskCustomPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);
}

Maomi.MQ 的消息发布接口就这么几个,由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以接口比较简单,开发者使用接口时可以灵活一些,使用难度也不大。


BasicProperties 是 RabbitMQ 中的消息基础属性对象,直接面向开发者,可以消息的发布和消费变得灵活和丰富功能,例如,可以通过 BasicProperties 配置单条消息的过期时间:


await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
},(BasicProperties p)=>
{
p.Expiration="1000";
});

Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Scoped:


services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();

开发者也可以自行实现 IMessagePublisher 接口,实现自己的消息发布模型,具体示例请参考 DefaultMessagePublisher 类型。

原生通道

开发者可以通过 ConnectionPool 服务获取原生连接对象,直接在 IConnection 上使用 RabbitMQ 的接口发布消息:


private readonly ConnectionPool _connectionPool;

var connectionObject = _connectionPool.Get();
connectionObject.DefaultChannel.BasicPublishAsync(... ...);

常驻内存连接对象

Maomi.MQ 通过 ConnectionPool 管理

相关推荐

用闲置电脑当软路由安装OpenWRT(小白教程)

话说软路由系统OpenWRT用起来真是香,里面的好多功能都是普通路由无法实现的,由于众所周知的原因,在这里就不细说,等安装完自己体验吧。今天就介绍用一台闲置的电脑(自带两个网口)充当软路由,安装Ope...

一招把废旧路由器改成交换机(用旧路由器做交换机)

家里面的路由器用个几年,就会WIFI变卡,新路由器买回来,旧路由器就没什么用了?我在这里教大家把老路由器变成交换机。近两年新出的路由器,基本都是2个LAN口,接网络设备还需要买交换机,淘汰下来的路由器...

如何将PC电脑变成web服务器:将内网主机映射到外网实现远程访问

我是艾西,今天跟大家分享内容还是比较多人问的一个问题:如何将PC电脑变成web服务器。内网主机作为web服务器,内容包括本地内网映射、多层内网映射解决方案、绕过电信80端口封锁、DDNS功能的实现(非...

电脑怎么改Wi-Fi密码(电脑怎么改wifi密码视频教程)

一.电脑打开“任意浏览器ie/google浏览器等”——>地址栏里输入管理ip地址然后按“回车键”打开该地址,如下图所示。二.输入正确的管理员密码——>点击“登录”即可(下图是PC版本的路...

旧路由器不要扔,可当电脑无线网卡使用,你还不知道吧!

家里有旧路由器,卖二手又不值钱,扔了又可惜。想不到路由器还有以下这些功能:扩大Wifi覆盖范围;充当电脑无线网卡;把这个技巧学起来,提升网络冲浪的幸福感!导航栏路由器恢复出厂设置(通用教程)有线桥接无...

硬件大师AIDA64 5.60.3716更新下载:“认准”Win10

著名硬件测试工具AIDA64更新至5.60.3716Beta版,本次更新修复了Win10Build版本号检测错误问题,识别更准确。另外还添加了对ITEIT8738F传感器、ASRock主板、NVI...

互联网病毒木马与盗版软件流量产业链(一)

A.相关地下产业链整体深度分析可能很多用户都有这样的经历,就是不管打开什么网站,甚至根本就没有打开浏览器,都会跳出来一堆的弹窗广告。那么,这个用户要么是中的病毒木马,或者是使用了盗版软件。不管是...

穿越火线tenparty.dat文件损坏怎么办?

很多玩家在玩火线的时候经常会因弹出错误代码,而被退出游戏。下面就教大家一些常见错误代码的解决方案。方法/步骤1SX提示码提示说明:您的电脑出现1,xxx,0(xxx代表任意数字)提示码,存在游...

办公小技巧015:如何关闭Windows Defender安全中心

WindowsDefenderWindowsDefender是Widows中自带杀毒软件,可以检测及清除潜藏在操作系统里的间谍软件及广告软件。为电脑提供最高强度的安全防护,也被誉为Windows的...

Win7/8.1/10团灭:微软发现严重漏洞

据外媒报道称,微软已经停止为Windows7发布新的安全更新了,理由是IE存在严重漏洞。存在严重漏洞的IE按照微软的说法,这个远程代码执行漏洞存在于IE浏览器处理脚本引擎对象的内存中。该漏洞可能以一...

WinCC flexible 2008 SP4 的安装步骤及系统要求

1、软件安装过程安装注意事项(必须严格遵守):软件仅支持以下操作系统(必须是微软原版的操作系统,Ghost版系统不支持,如番茄花园、雨林木风、电脑城装机版等):WinCCflexible2008...

Windows三方杀毒防护软件可能问题以及使用建议

在处理ECSWindows相关案例中,我们遇到很多奇怪的操作系统问题,例如软件安装失败,无法激活操作系统,无法访问本地磁盘,网络访问受到影响,系统蓝屏,系统Hang等,排查发现这与客户安装的各类杀...

杀毒软件被指泄露个人隐私(杀毒软件查出来一定是毒吗)

最近的多篇报道显示,你使用的杀毒软件在监视着你,而不仅仅是你计算机上的文件。2014年的一项研究使用虚拟机监视了杀毒软件产品向企业发送了什么信息。他们发现,所有测试的杀毒软件都给电脑分配了一个唯一的识...

开源杀毒软件ClamAV在推出约20年后终于到达1.0版本

ClamAV是一个开源的反病毒引擎,用于检测木马、病毒、恶意软件和其他恶意威胁。与商业Windows反恶意软件程序相比,它的检测水平相当低,但开发工作已经持续了几十年。该工具可用于所有平台,尽管它主要...

【Excel函数使用】时分秒时间怎么转换成秒?(二)

本节主要分享的函数是IFERROR和NUMBERVALUE上回我们用MID和FIND函数已经将数值提取出来,但是一些错误的返回值显示“#VALUE!”,此时我们需要检验错误返回值,并将错误值返回指定值...

取消回复欢迎 发表评论: