Commit 9eaadc67 by AlexNasyr

гавно сраное

parent 1868ac05
...@@ -5,6 +5,7 @@ using DDO_Application.Model; ...@@ -5,6 +5,7 @@ using DDO_Application.Model;
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Net; using System.Net;
using ActiveMQ.Artemis.Client.Transactions;
namespace DDO_Application.Controllers { namespace DDO_Application.Controllers {
...@@ -41,7 +42,7 @@ namespace DDO_Application.Controllers { ...@@ -41,7 +42,7 @@ namespace DDO_Application.Controllers {
[HttpPost] [HttpPost]
[Route("[controller]/sendmsg/{msg}")] [Route("[controller]/sendmsg/{msg}")]
public async Task<IActionResult> SendMessage(string msg) { public async Task<IActionResult> SendMessage(string msg) {
var @event = new ReceivedMessage {Id = 1, Message = msg}; var @event = new TestMessage { Id = 1, Message = msg};
await _messageProducer.PublishAsync(@event); await _messageProducer.PublishAsync(@event);
return StatusCode((int)HttpStatusCode.Created, null); return StatusCode((int)HttpStatusCode.Created, null);
} }
......
...@@ -10,18 +10,20 @@ using System.Threading.Tasks; ...@@ -10,18 +10,20 @@ using System.Threading.Tasks;
namespace DDO_Application.Model { namespace DDO_Application.Model {
public static class ActiveMqExtensions { public static class ActiveMqExtensions {
public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder, RoutingType routingType) where TConsumer : class, ITypedConsumer<TMessage> { public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder, string address, RoutingType routingType) where TConsumer : class, ITypedConsumer<TMessage> {
builder.Services.TryAddScoped<TConsumer>(); builder.Services.TryAddScoped<TConsumer>();
builder.AddConsumer("test", routingType, HandleMessage<TMessage, TConsumer>); builder.AddConsumer(address, routingType, HandleCustomerMessage<TMessage, TConsumer>);
return builder; return builder;
} }
private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token) where TConsumer : class, ITypedConsumer<TMessage> { private static async Task HandleCustomerMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token) where TConsumer : class, ITypedConsumer<TMessage> {
var msg = JsonSerializer.Deserialize<TMessage>(message.GetBody<string>()); var msg = JsonSerializer.Deserialize<TMessage>(message.GetBody<string>());
using var scope = serviceProvider.CreateScope(); using var scope = serviceProvider.CreateScope();
var typedConsumer = scope.ServiceProvider.GetService<TConsumer>(); var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
await typedConsumer.ConsumeAsync(msg, token); await typedConsumer.ConsumeAsync(msg, token);
await consumer.AcceptAsync(message); await consumer.AcceptAsync(message);
} }
} }
} }
\ No newline at end of file
...@@ -5,12 +5,12 @@ using System.Threading.Tasks; ...@@ -5,12 +5,12 @@ using System.Threading.Tasks;
using ActiveMQ.Artemis.Client.Transactions; using ActiveMQ.Artemis.Client.Transactions;
namespace DDO_Application.Model { namespace DDO_Application.Model {
public class MessageCunsomer : ITypedConsumer<ReceivedMessage> { public class MessageCunsomer : ITypedConsumer<TestMessage> {
public MessageCunsomer() { public MessageCunsomer() {
} }
public async Task ConsumeAsync(ReceivedMessage message, CancellationToken cancellationToken) { public async Task ConsumeAsync(TestMessage message, CancellationToken cancellationToken) {
// тут логика // тут логика
......
using ActiveMQ.Artemis.Client; using ActiveMQ.Artemis.Client;
using ActiveMQ.Artemis.Client.Transactions;
using System.Text.Json; using System.Text.Json;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace DDO_Application.Model { namespace DDO_Application.Model {
...@@ -10,10 +12,11 @@ namespace DDO_Application.Model { ...@@ -10,10 +12,11 @@ namespace DDO_Application.Model {
_producer = producer; _producer = producer;
_address = "test"; _address = "test";
} }
public MessageProducer(IAnonymousProducer producer, string address) { public MessageProducer(IProducer producer, string address) {
_producer = producer; _producer = producer as IAnonymousProducer;
_address = address; _address = address;
} }
public async Task PublishAsync<T>(T message) { public async Task PublishAsync<T>(T message) {
var serialized = JsonSerializer.Serialize(message); var serialized = JsonSerializer.Serialize(message);
var msg = new Message(serialized); var msg = new Message(serialized);
......
namespace DDO_Application.Model { namespace DDO_Application.Model {
public class ReceivedMessage { public class TestMessage {
public int Id { get; set; } public int Id { get; set; }
public string Message { get; set; } public string Message { get; set; }
} }
......
...@@ -36,11 +36,12 @@ namespace DDO_Application { ...@@ -36,11 +36,12 @@ namespace DDO_Application {
services.AddHostedService<ApiHostedService>(); services.AddHostedService<ApiHostedService>();
services.AddSingleton<IApiService, ApiProcessingService>(); services.AddSingleton<IApiService, ApiProcessingService>();
// turn enable ActiveMQ support in project // turn enable ActiveMQ support in project
//services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "192.168.2.19", port: 5672, "guest", "guest") }) services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "192.168.2.19", port: 5672, "guest", "guest") })
services.AddActiveMq("ddoApp-cluster", new[] { ActiveMQ }) //services.AddActiveMq("ddoApp-cluster", new[] { ActiveMQ })
.AddAnonymousProducer<MessageProducer>() //.AddAnonymousProducer<MessageProducer>()
//.AddProducer<MessageProducer>(testQueue.Out, RoutingType.Multicast) .AddProducer<MessageProducer>(testQueue.Out)
.AddTypedConsumer<ReceivedMessage, MessageCunsomer>(RoutingType.Multicast); //.AddTypedProducer<TestMessage, TypedProducer>(testQueue.Out, RoutingType.Multicast)
.AddTypedConsumer<TestMessage, MessageCunsomer>(testQueue.In, RoutingType.Multicast);
services.AddActiveMqHostedService(); services.AddActiveMqHostedService();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment