百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Maomi.MQ 2.0 | 功能强大的 .NET 消息队列通讯模型框架

cac55 2025-03-24 14:18 29 浏览 0 评论

说明

作者:痴者工良

文档地址:https://mmq.whuanle.cn

仓库地址:https://github.com/whuanle/Maomi.MQ

作者博客:

  • o https://www.whuanle.cn
  • o https://www.cnblogs.com/whuanle

导读

Maomi.MQ 是一个简化了消息队列使用方式的通讯框架,目前支持了 RabbitMQ。

Maomi.MQ.RabbitMQ 是一个用于专为 RabbitMQ 设计的发布者和消费者通讯模型,大大简化了发布和消息的代码,并提供一系列简便和实用的功能,开发者可以通过框架提供的消费模型实现高性能消费、事件编排,框架还支持发布者确认机制、自定义重试机制、补偿机制、死信队列、延迟队列、连接通道复用等一系列的便利功能。开发者可以把更多的精力放到业务逻辑中,通过 Maomi.MQ.RabbitMQ 框架简化跨进程消息通讯模式,使得跨进程消息传递更加简单和可靠。

此外,框架通过 runtime 内置的 api 支持了分布式可观测性,可以通过进一步使用 OpenTelemetry 等框架进一步收集可观测性信息,推送到基础设施平台中。

快速开始

在本篇教程中,将介绍 Maomi.MQ.RabbitMQ 的使用方法,以便读者能够快速了解该框架的使用方式和特点。


创建一个 Web 项目(可参考 WebDemo 项目),引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:


// using Maomi.MQ;
// using RabbitMQ.Client;

builder.Services.AddMaomiMQ((MqOptionsBuilder options)=>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},[typeof(Program).Assembly]);

var app = builder.Build();

  • o WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。
    每条消息生成一个唯一的 id,便于追踪。如果不设置雪花id,在分布式服务中,多实例并行工作时,可能会产生相同的 id。
  • o AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。
  • o Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory

定义消息模型类,模型类是 MQ 通讯的消息基础,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。


public classTestEvent
{
publicintId{get;set;}

public override string ToString()
{
returnId.ToString();
}
}

定义消费者,消费者需要实现 IConsumer 接口,以及使用 [Consumer] 特性注解配置消费者属性,如下所示,[Consumer("test")] 表示该消费者订阅的队列名称是 test

IConsumer 接口有三个方法,ExecuteAsync 方法用于处理消息,FaildAsync 会在 ExecuteAsync 异常时立即执行,如果代码一直异常,最终会调用 FallbackAsync 方法,Maomi.MQ 框架会根据 ConsumerState 值确定是否将消息放回队列重新消费,或者做其它处理动作。


[Consumer("test")]
publicclassMyConsumer:IConsumer<TestEvent>
{
// 消费
publicasyncTaskExecuteAsync(MessageHeader messageHeader,TestEvent message)
{
Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
awaitTask.CompletedTask;
}

// 每次消费失败时执行
publicTaskFaildAsync(MessageHeader messageHeader,Exception ex,int retryCount,TestEvent message)
=>Task.CompletedTask;

// 补偿
publicTask<ConsumerState>FallbackAsync(MessageHeader messageHeader,TestEvent? message,Exception? ex)
=>Task.FromResult(ConsumerState.Ack);
}

Maomi.MQ 还具有多种消费者模式,代码写法不一样,后续会详细讲解不同的消费者模式。


如果要发布消息,只需要注入 IMessagePublisher 服务即可。


[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;

publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}

[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
// 发布消息
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"test", message:newTestEvent
{
Id=123
});
return"ok";
}
}

启动 Web 服务,在 swagger 页面上请求 API 接口,MyConsumer 服务会立即接收到发布的消息。


如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包,以便让消费者在后台订阅队列消费消息。

参考 ConsoleDemo 项目。


using Maomi.MQ;
usingMicrosoft.Extensions.Hosting;
usingMicrosoft.Extensions.Logging;
usingRabbitMQ.Client;
usingSystem.Reflection;

var host =newHostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},newSystem.Reflection.Assembly[]{typeof(Program).Assembly});

}).Build();

// 后台运行
var task = host.RunAsync();

Console.ReadLine();

消息发布者

消息发布者用于推送消息到 RabbitMQ 服务器中,Maomi.MQ 支持多种消息发布者模式,支持 RabbitMQ 事务模式等,示例项目请参考 PublisherWeb

Maomi.MQ 通过 IMessagePublisher 向开发者提供消息推送服务。


在发布消息之前,需要定义一个事件模型类,用于传递消息。


public classTestEvent
{
publicintId{get;set;}

public override string ToString()
{
returnId.ToString();
}
}

然后注入 IMessagePublisher 服务,发布消息:


[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;

publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}

[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
});
}

return"ok";
}
}

一般情况下,一个模型类只应该被一个消费者所使用,那么通过事件可以找到唯一的消费者,也就是通过事件类型找到消费者的 IConsumerOptions,此时框架可以使用对应的配置发送消息。

TestMessageEvent 模型只有一个消费者:


[Consumer("publish", Qos = 1, RetryFaildRequeue = true)]
public class TestEventConsumer : IConsumer<TestMessageEvent>
{
// ... ...
}

可以直接发送事件,不需要填写交换器(Exchange)和路由键(RoutingKey)。


[HttpGet("publish_message")]
publicasyncTask<string>PublisherMessage()
{
// 如果在本项目中 TestMessageEvent 只指定了一个消费者,那么通过 TestMessageEvent 自动寻找对应的配置
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(model:newTestMessageEvent
{
Id= i
});
}

return"ok";
}

IMessagePublisher

IMessagePublisher 是 Maomi.MQ 的基础消息发布接口,有以下方法:


// 消息发布者.
publicinterfaceIMessagePublisher
{
TaskPublishAsync<TMessage>(string exchange, // 交换器名称.
string routingKey,// 队列/路由键名称.
TMessage message, // 事件对象.
Action<BasicProperties> properties,
CancellationToken cancellationToken =default)
whereTMessage:class;

TaskPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);

TaskPublishAsync<TMessage>(TMessage message,
Action<BasicProperties>? properties =,
CancellationToken cancellationToken =default)
whereTMessage:class;

TaskPublishAsync<TMessage>(TMessage model,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);

TaskCustomPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);
}

Maomi.MQ 的消息发布接口就这么几个,由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以接口比较简单,开发者使用接口时可以灵活一些,使用难度也不大。


BasicProperties 是 RabbitMQ 中的消息基础属性对象,直接面向开发者,可以消息的发布和消费变得灵活和丰富功能,例如,可以通过 BasicProperties 配置单条消息的过期时间:


await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
},(BasicProperties p)=>
{
p.Expiration="1000";
});

Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Scoped:


services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();

开发者也可以自行实现 IMessagePublisher 接口,实现自己的消息发布模型,具体示例请参考 DefaultMessagePublisher 类型。

原生通道

开发者可以通过 ConnectionPool 服务获取原生连接对象,直接在 IConnection 上使用 RabbitMQ 的接口发布消息:


private readonly ConnectionPool _connectionPool;

var connectionObject = _connectionPool.Get();
connectionObject.DefaultChannel.BasicPublishAsync(... ...);

常驻内存连接对象

Maomi.MQ 通过 ConnectionPool 管理

相关推荐

unetbootin中文版:能够将Linux系统装进U盘的U盘启动盘制作工具

unetbootin中文版是一款能够将Linux操作系统装进U盘或移动硬盘的U盘启动盘制作工具,制作好的U盘启动盘能够用于电脑的维护和系统还原等操作,使用起来非常地不错。该软件不会基于操作系统使用特定...

实用之选,实用之改:DELL 戴尔 灵越14CR-4528B 小改作业

昨天发布了一篇三脚架,今天有时间也写写早就准备写的DELL戴尔灵越14CR-4528B作业吧。话说上个笔记本还是2006年底买的华硕A6JE,电脑挺不错的,在家上上网也够用了,就是转轴设计缺陷,容...

教你如何制作一个启动U盘,从此电脑不用找专人做系统

在电脑使用中,老是遇到卡顿,蓝屏,重启等很多故障,大多都是因为自己日常使用习惯而造成的,很多用户在下载软件的时候不知不觉中都被安装许多乱七八糟的软件,当电脑乱七八糟的东西过多的时候我们就重新来装一个系...

8、Deepin操作系统启动盘(系统盘)制作

1、在Deepin官网https://www.deepin.org/zh/download/下载原版Deepin操作系统2、同时在Deepin官网https://www.deepin.org/zh/d...

电脑死机怎么办,电脑如何使用U盘重装系统

电脑死机是我们最常遇到的系统故障,遇到死机时通常重启就可以解决,不过系统损坏引起的死机就只能重装系统,那么电脑死机如何重装系统呢?下面来看看电脑死机怎么办如何使用U盘重装系统_小白一键重装系统官网。 ...

bootmgr is compressed无法启动系统

bootmgriscompressedPressCtrlAltDeltorestart,电脑启动后无法正常开机出现了这样的字样,就是说明你的C盘驱动被压缩解决方法:1、使用系统光盘或者...

新手教程!如何分辨BIOS启动列表(菜单)中的各种启动项

在BIOS启动菜单中识别各类启动项,是新手安装系统或调整启动顺序的必备技能。下面用最直观的方式,为你梳理常见启动项及其含义,帮助你快速上手:一、传统存储设备启动项1.Floppy(软盘驱动器)对应...

带回家的MINI客厅电脑,自学成才,分享U盘装系统教程

刚好老家新装修了房子,客厅买了个大电视,本来是想在客厅弄台主机,接电视玩,大屏幕玩的才爽,但是台式机箱太占地方了。网上逛了一圈,发现有专门的客厅电脑,就搞了一个,外形不错,放客厅很有档次,主要是主机太...

电脑基础知识:BIOS简介及其与Windows操作系统的关系

什么是BIOS?BIOS,全称BasicInputOutputSystem,即“基本输入输出系统”,是一段固化在电脑主板芯片上的底层固件程序。它类似于一款极简化的操作系统,负责电脑开机时的硬件初...

win 7 系统注册表文件丢失或损坏,求不重做系统的解决办法!

粉丝问题解答:win7系统注册表文件丢失或损坏,求不重做系统的解决办法!解决方法:你只需要有启动盘即可,不需要其他的。之所以要求启动盘,是因为下面要对系统文件进行还原覆盖,所以不能用原系统启动。用...

UEFI怎么装Win7 小编呕血解难点!

自从广开言路之后,小编就被你们害苦了,这不,一条评论又让小编彻夜难眠。另外某些小伙伴坐不上沙发后提出要上墙的需求,其实呢只要大家提出的问题具有普遍性、有难度、而且适合小编做微信内容的话,都有机会将你们...

固态攻坚战——ASUS 华硕k45v换固态、拆机清灰教程

作者:蘑菇爱上我现在固态白菜价固态对于电脑体验的提升还是很大的对于固态存储芯片的问题没什么好说的有钱mlc,没钱tlc,不需要考虑什么寿命的问题,我用了一年多的m600,写入才3TB品牌很重要,主控...

MBR启动报错?Win10不重装一样能好!

Win10一遇到启动故障,很多小伙伴可能就会抓瞎,这可怎么弄,我不会修复啊!其实大可不必惊慌,就像这种最常见的Winload启动错误,多半都是MBR分区表丢失造成的(UEFI分区模式的几乎没有这种故障...

从零开始:硬盘手动装系统全攻略

手动安装操作系统是计算机技术必备的基本技能。对于初学者来说,可能会感到有些挑战。但通过掌握硬盘手动装系统方法,你可以亲身体验整个安装过程,进而更好地理解操作系统的工作原理。本文将详细介绍硬盘手动装系统...

电脑开机后显示File:BCD错误0xc000000f

WIN7\WIN8\WIN101、一个win864位PE。这个64位PE的相关文件,路径在boot\BOOT.WIM实机测试,开机后显示File:\EFI\Microsoft\Boot\BCD,...

取消回复欢迎 发表评论: