简介
MassTransit 是一个自由、开源、轻量级的消息总线, 用于使用. NET 框架创建分布式应用程序。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。
RabbitMQ是成熟的MQ队列服务,是由 Erlang 语言开发的 AMQP 的开源实现。关于介绍RabbitMQ的中文资料也很多,有需要可以自行查找。
官网地址:https://masstransit-project.com/
文档地址:https://masstransit-project.com/getting-started/
GitHub地址:https://github.com/MassTransit/MassTransit
下面将介绍如何使用Masstransit与RabbitMQ实现消息的发布订阅
安装RabbitMQ
Quick Start
先决条件
- 安装nuget MassTransit.Extensions.DependencyInjection
- 安装nuget MassTransit.RabbitMQ
- 本篇使用的.net core 版本为3.0
新建项目创建生产者
- 注册 Masstransit服务
在Startup
类中加入
public void RegisterBus(IServiceCollection services)
{
services.AddMassTransit();
services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host("host(127.0.0.1)", "virtual(test)", "connectionName(test)", h =>
{
h.Username("username(guest)");
h.Password("password(guest)");
});
}));
services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
}
在ConfigureServices
方法中加入
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
RegisterBus(services);
}
在Configure
中启动Masstransit
public void Configure(IApplicationBuilder app, IWebHostEnvironment env,IBusControl bus, IHostApplicationLifetime lifetime)
{
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
lifetime.ApplicationStarted.Register(() => { bus.StartAsync(); });
lifetime.ApplicationStopped.Register(() => { bus.StopAsync(); });
}
其中IHostApplicationLifetime
为.net core 应用程序生命周期
- 定义消息
消息类型主要分为两种,事件和命令。在为消息选择名称时,消息的类型应决定消息的时态。
命令告诉服务做某事。命令被发送(使用Send)到端点,因为期望单个服务实例执行命令动作。永远不要发布命令。命令应遵循Tell风格,以动词-名词顺序表达,示例:UpdateOrderStatus 更改订单状态
事件表明发生了某些事情。通过(独立)或(在消息使用者内)发布事件(使用Publish)。事件不应直接发送到端点。事件应以名词-动词(过去式)序列表示,表明发生了某些事情。示例:OrderPaid 订单已支付
在创建消息时,应避免为消息创建基类,这会带来一些错误。这句话来自官网,具体是什么样的错误,暂时还未遇见
定义事件
public interface IOrderPaid
{
Guid EventId { get;}
string Text { get;}
DateTime Date { get; }
}
定义命令
public interface IUpdateOrderStatus
{
Guid CommandId { get;}
string Text { get;}
DateTime Date { get; }
}
- 发布事件
在Controller做如下操作
namespace Producer.Controllers
{
[ApiController]
[Route("[controller]")]
public class HomeController : ControllerBase
{
private readonly IBus _bus;
public HomeController(IBus bus) => _bus = bus;
[HttpGet]
public async Task<IActionResult> Index()
{
await _bus.Publish<IOrderPaid>(new
{
EventId = Guid.NewGuid(),
Text = "订单已支付",
Date = DateTime.Now
});
return Ok("ok");
}
}
}
新建项目创建消费者
- 在
Startup
中注册Masstransit
namespace Consumer
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
RegisterBus(services);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env,
IBusControl bus, IHostApplicationLifetime lifetime)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseHttpsRedirection();
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
lifetime.ApplicationStarted.Register(() => { bus.StartAsync(); });
lifetime.ApplicationStopped.Register(() => { bus.StopAsync(); });
}
public void RegisterBus(IServiceCollection services)
{
services.AddMassTransit(conf =>
{
conf.AddConsumer<OrderPaidConsumer>();
conf.AddConsumer<UpdateOrderStatusConsumer>();
});
services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host("127.0.0.1", "test", "test", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, "order-queue", e =>
{
e.Consumer<OrderPaidConsumer>(provider);
e.Consumer<UpdateOrderStatusConsumer>(provider);
});
//失败后重试3次,每次间隔1分钟
cfg.UseRetry(r => r.Interval(3, TimeSpan.FromMinutes(1)));
//每分钟消息消费数1000以内
cfg.UseRateLimit(1000, TimeSpan.FromMinutes(1));
services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
}));
}
}
}
- 实现
IConsumer<T>
来接收消息
namespace Consumer
{
public class OrderPaidConsumer : IConsumer<IOrderPaid>
{
public Task Consume(ConsumeContext<IOrderPaid> context)
{
return Task.Run(async () =>
{
var message = context.Message;
Console.WriteLine(JsonConvert.SerializeObject(message));
// 发送命令
var endPoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/test/order-queue"));
await endPoint.Send<IUpdateOrderStatus>(new
{
CommandId = Guid.NewGuid(),
Text = "修改订单状态",
Date = DateTime.Now
});
});
}
}
}
处理上面发送的命令,和上面差不多,只要实现IConsumer<T>
即可
运行结果
生产者直接运行,访问 /Home/Index
消费者接收到的消息
结语
本篇只是极简的介绍了一下Masstransit操作RabbitMQ来实现消息的接收/发送及发布/订阅。使用Masstransit能够使RabbitMQ的操作变得极为简洁。如需了解更多,请查阅官网文档