Commit 3004d43e by Alex Nasyr

produce / consume msgs added

parent 71d80092
...@@ -16,7 +16,7 @@ namespace DDO_Application.Controllers { ...@@ -16,7 +16,7 @@ namespace DDO_Application.Controllers {
public IServiceProvider Services { get; } public IServiceProvider Services { get; }
public apiController(IServiceProvider services, MessageProducer messageProducer) { public apiController(IServiceProvider services, MessageProducer messageProducer, MessageCunsomer messageCunsomer) {
Services = services; Services = services;
using (var scope = Services.CreateScope()) { using (var scope = Services.CreateScope()) {
...@@ -26,6 +26,7 @@ namespace DDO_Application.Controllers { ...@@ -26,6 +26,7 @@ namespace DDO_Application.Controllers {
_apiService = uploadProcessingService; _apiService = uploadProcessingService;
} }
_messageProducer = messageProducer; _messageProducer = messageProducer;
_messageCunsomer = messageCunsomer;
} }
[HttpGet] [HttpGet]
...@@ -40,16 +41,10 @@ namespace DDO_Application.Controllers { ...@@ -40,16 +41,10 @@ namespace DDO_Application.Controllers {
[HttpPost] [HttpPost]
[Route("[controller]/sendmsg/{msg}")] [Route("[controller]/sendmsg/{msg}")]
public async Task<IActionResult> SendMessage(string msg) { public async Task<IActionResult> SendMessage(string msg) {
var @event = new ReceivedMessage {Id = 1, Message = "hello"}; var @event = new ReceivedMessage {Id = 1, Message = msg};
await _messageProducer.PublishAsync(@event); await _messageProducer.PublishAsync(@event);
return StatusCode((int)HttpStatusCode.Created, null); return StatusCode((int)HttpStatusCode.Created, null);
} }
[HttpPost]
[Route("[controller]/receivemsg")]
public async Task<IActionResult> ReceiveMessage() {
return StatusCode((int)HttpStatusCode.Created, null);
}
} }
public class apiStatus { public class apiStatus {
......
using ActiveMQ.Artemis.Client; using ActiveMQ.Artemis.Client;
using ActiveMQ.Artemis.Client.Extensions.DependencyInjection; using ActiveMQ.Artemis.Client.Extensions.DependencyInjection;
using ActiveMQ.Artemis.Client.Transactions;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.DependencyInjection.Extensions;
using System; using System;
...@@ -11,14 +12,7 @@ namespace DDO_Application.Model { ...@@ -11,14 +12,7 @@ namespace DDO_Application.Model {
public static class ActiveMqExtensions { public static class ActiveMqExtensions {
public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder, RoutingType routingType) where TConsumer : class, ITypedConsumer<TMessage> { public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder, RoutingType routingType) where TConsumer : class, ITypedConsumer<TMessage> {
builder.Services.TryAddScoped<TConsumer>(); builder.Services.TryAddScoped<TConsumer>();
builder.AddConsumer(typeof(TMessage).Name, routingType, HandleMessage<TMessage, TConsumer>); builder.AddConsumer("test", routingType, HandleMessage<TMessage, TConsumer>);
return builder;
}
public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder, RoutingType routingType, string queue) where TConsumer : class, ITypedConsumer<TMessage> {
builder.Services.TryAddScoped<TConsumer>();
var address = typeof(TMessage).Name;
var queueName = $"{queue}";
builder.AddConsumer(address, routingType, queueName, HandleMessage<TMessage, TConsumer>);
return builder; return builder;
} }
......
...@@ -7,7 +7,12 @@ using ActiveMQ.Artemis.Client.Transactions; ...@@ -7,7 +7,12 @@ using ActiveMQ.Artemis.Client.Transactions;
namespace DDO_Application.Model { namespace DDO_Application.Model {
public class MessageCunsomer : ITypedConsumer<ReceivedMessage> { public class MessageCunsomer : ITypedConsumer<ReceivedMessage> {
public MessageCunsomer() {
}
public async Task ConsumeAsync(ReceivedMessage message, CancellationToken cancellationToken) { public async Task ConsumeAsync(ReceivedMessage message, CancellationToken cancellationToken) {
Console.WriteLine(message); Console.WriteLine(message);
} }
} }
......
using ActiveMQ.Artemis.Client;
namespace DDO_Application.Model {
public class configActiveMQ {
public string Host { get; set; }
public int Port { get; set; }
public string User { get; set; }
public string Pass { get; set; }
public Endpoint Endpoint => Endpoint.Create(Host, Port, User, Pass);
}
}
...@@ -28,21 +28,25 @@ namespace DDO_Application { ...@@ -28,21 +28,25 @@ namespace DDO_Application {
// This method gets called by the runtime. Use this method to add services to the container. // This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services) { public void ConfigureServices(IServiceCollection services) {
var ActiveMQ = Configuration.GetSection("ActiveMQ").Get<configActiveMQ>().Endpoint;
services.AddControllers(); services.AddControllers();
services.AddSwaggerGen(c => {c.SwaggerDoc("v1", new OpenApiInfo { Title = "DDO_Application", Version = "v1" }); }); services.AddSwaggerGen(c => {c.SwaggerDoc("v1", new OpenApiInfo { Title = "DDO_Application", Version = "v1" }); });
services.AddHostedService<ApiHostedService>(); services.AddHostedService<ApiHostedService>();
services.AddSingleton<IApiService, ApiProcessingService>(); services.AddSingleton<IApiService, ApiProcessingService>();
// turn enable ActiveMQ support in project // turn enable ActiveMQ support in project
//services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "192.168.2.19", port: 5672, "guest", "guest") }) //services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "192.168.2.19", port: 5672, "guest", "guest") })
services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "192.168.2.22", port: 5672, "guest", "guest") }) services.AddActiveMq("ddoApp-cluster", new[] { ActiveMQ })
.AddAnonymousProducer<MessageProducer>() .AddAnonymousProducer<MessageProducer>()
.AddTypedConsumer<ReceivedMessage, MessageCunsomer>(RoutingType.Multicast); .AddTypedConsumer<ReceivedMessage, MessageCunsomer>(RoutingType.Multicast);
//.AddConsumer("test", RoutingType.Multicast, async (message, consumer, serviceProvider, cancellationToken) => { //.AddConsumer("test", RoutingType.Multicast, async (message, consumer, serviceProvider, cancellationToken) => {
// // your consuming logic // // your consuming logic
// await consumer.AcceptAsync(message); // await consumer.AcceptAsync(message);
//}); //});
services.AddActiveMqHostedService(); services.AddActiveMqHostedService();
} }
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
......
{ {
"ActiveMQ": {
"Host": "192.168.2.22",
"Port": 5672,
"User": "guest",
"Pass": "guest"
},
"Logging": { "Logging": {
"LogLevel": { "LogLevel": {
"Default": "Information", "Default": "Information",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment