使用Masstransit操作RabbitMQ

使用Masstransit操作RabbitMQ

简介

MassTransit 是一个自由、开源、轻量级的消息总线, 用于使用. NET 框架创建分布式应用程序。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。
RabbitMQ是成熟的MQ队列服务,是由 Erlang 语言开发的 AMQP 的开源实现。关于介绍RabbitMQ的中文资料也很多,有需要可以自行查找。

官网地址:https://masstransit-project.com/
文档地址:https://masstransit-project.com/getting-started/
GitHub地址:https://github.com/MassTransit/MassTransit
下面将介绍如何使用Masstransit与RabbitMQ实现消息的发布订阅

安装RabbitMQ

  1. 安装之前需要先安装 erlang
  2. 然后到RabbitMQ 官网下载安装包安装

Quick Start

先决条件

  1. 安装nuget MassTransit.Extensions.DependencyInjection
  2. 安装nuget MassTransit.RabbitMQ
  3. 本篇使用的.net core 版本为3.0

新建项目创建生产者

  1. 注册 Masstransit服务

Startup类中加入

public void RegisterBus(IServiceCollection services)
{
    services.AddMassTransit();
    services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host("host(127.0.0.1)", "virtual(test)", "connectionName(test)", h =>
        {
            h.Username("username(guest)");
            h.Password("password(guest)");
        });
    }));

    services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
    services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
    services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
}

ConfigureServices方法中加入

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();
    RegisterBus(services);
}

Configure中启动Masstransit

public void Configure(IApplicationBuilder app, IWebHostEnvironment env,IBusControl bus, IHostApplicationLifetime lifetime)
{
    app.UseRouting();

    app.UseAuthorization();

    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
    });

    lifetime.ApplicationStarted.Register(() => { bus.StartAsync(); });
    lifetime.ApplicationStopped.Register(() => { bus.StopAsync(); });
}

其中IHostApplicationLifetime为.net core 应用程序生命周期

  1. 定义消息

消息类型主要分为两种,事件和命令。在为消息选择名称时,消息的类型应决定消息的时态。

命令告诉服务做某事。命令被发送(使用Send)到端点,因为期望单个服务实例执行命令动作。永远不要发布命令。命令应遵循Tell风格,以动词-名词顺序表达,示例:UpdateOrderStatus 更改订单状态

事件表明发生了某些事情。通过(独立)或(在消息使用者内)发布事件(使用Publish)。事件不应直接发送到端点。事件应以名词-动词(过去式)序列表示,表明发生了某些事情。示例:OrderPaid 订单已支付

在创建消息时,应避免为消息创建基类,这会带来一些错误。这句话来自官网,具体是什么样的错误,暂时还未遇见

定义事件

public interface IOrderPaid
{
    Guid EventId { get;}
    string Text { get;}
    DateTime Date { get; }
}

定义命令

public interface IUpdateOrderStatus
{
    Guid CommandId { get;}
    string Text { get;}
    DateTime Date { get; }
}
  1. 发布事件

在Controller做如下操作

namespace Producer.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class HomeController : ControllerBase
    {
        private readonly IBus _bus;
        public HomeController(IBus bus) => _bus = bus;

        [HttpGet]
        public async Task<IActionResult> Index()
        {
            await _bus.Publish<IOrderPaid>(new
            {
                EventId = Guid.NewGuid(),
                Text = "订单已支付",
                Date = DateTime.Now
            });
            return Ok("ok");
        }
    }
}

新建项目创建消费者

  1. Startup中注册Masstransit
namespace Consumer
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            RegisterBus(services);
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env,
            IBusControl bus, IHostApplicationLifetime lifetime)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });

            lifetime.ApplicationStarted.Register(() => { bus.StartAsync(); });
            lifetime.ApplicationStopped.Register(() => { bus.StopAsync(); });
        }

        public void RegisterBus(IServiceCollection services)
        {
            services.AddMassTransit(conf =>
            {
                conf.AddConsumer<OrderPaidConsumer>();
                conf.AddConsumer<UpdateOrderStatusConsumer>();
            });

            services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host("127.0.0.1", "test", "test", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ReceiveEndpoint(host, "order-queue", e =>
                {
                    e.Consumer<OrderPaidConsumer>(provider);
                    e.Consumer<UpdateOrderStatusConsumer>(provider);
                });

                //失败后重试3次,每次间隔1分钟
                cfg.UseRetry(r => r.Interval(3, TimeSpan.FromMinutes(1)));
                //每分钟消息消费数1000以内
                cfg.UseRateLimit(1000, TimeSpan.FromMinutes(1));

                services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
                services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
                services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
            }));
        }
    }
}
  1. 实现IConsumer<T>来接收消息
namespace Consumer
{
    public class OrderPaidConsumer : IConsumer<IOrderPaid>
    {
        public Task Consume(ConsumeContext<IOrderPaid> context)
        {
            return Task.Run(async () =>
            {
                var message = context.Message;
                Console.WriteLine(JsonConvert.SerializeObject(message));
                // 发送命令
		var endPoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/test/order-queue"));
                await endPoint.Send<IUpdateOrderStatus>(new
                {
                    CommandId = Guid.NewGuid(),
                    Text = "修改订单状态",
                    Date = DateTime.Now
                });
            });
        }
    }
}

处理上面发送的命令,和上面差不多,只要实现IConsumer<T>即可

运行结果

生产者直接运行,访问 /Home/Index Producer

消费者接收到的消息 Consumer

结语

本篇只是极简的介绍了一下Masstransit操作RabbitMQ来实现消息的接收/发送及发布/订阅。使用Masstransit能够使RabbitMQ的操作变得极为简洁。如需了解更多,请查阅官网文档