Commit 4f3ed0f0 by AlexNasyr

22

parent 6fb87588
...@@ -37,11 +37,16 @@ namespace DDO_Application.Controllers { ...@@ -37,11 +37,16 @@ namespace DDO_Application.Controllers {
public int GetLifetime() => _apiService.Counter; public int GetLifetime() => _apiService.Counter;
[HttpPost] [HttpPost]
[Route("[controller]/sendmsg")] [Route("[controller]/sendmsg/{msg}")]
public async Task<IActionResult> SendMessage(string msg) { public async Task<IActionResult> SendMessage(string msg) {
await _messageProducer.PublishAsync(msg); await _messageProducer.PublishAsync(msg);
return StatusCode((int)HttpStatusCode.Created, null); return StatusCode((int)HttpStatusCode.Created, null);
} }
[HttpPost]
[Route("[controller]/receivemsg")]
public async Task<IActionResult> ReceiveMessage() {
return StatusCode((int)HttpStatusCode.Created, null);
}
} }
public class apiStatus { public class apiStatus {
......
using ActiveMQ.Artemis.Client;
using ActiveMQ.Artemis.Client.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace DDO_Application.Model {
public static class ActiveMqExtensions {
public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder, RoutingType routingType) where TConsumer : class, ITypedConsumer<TMessage> {
builder.Services.TryAddScoped<TConsumer>();
builder.AddConsumer(typeof(TMessage).Name, routingType, HandleMessage<TMessage, TConsumer>);
return builder;
}
public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder, RoutingType routingType, string queue) where TConsumer : class, ITypedConsumer<TMessage> {
builder.Services.TryAddScoped<TConsumer>();
var address = typeof(TMessage).Name;
var queueName = $"{queue}";
builder.AddConsumer(address, routingType, queueName, HandleMessage<TMessage, TConsumer>);
return builder;
}
private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token) where TConsumer : class, ITypedConsumer<TMessage> {
var msg = JsonSerializer.Deserialize<TMessage>(message.GetBody<string>());
using var scope = serviceProvider.CreateScope();
var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
await typedConsumer.ConsumeAsync(msg, token);
await consumer.AcceptAsync(message);
}
}
}
\ No newline at end of file
using ActiveMQ.Artemis.Client;
using System.Threading;
using System.Threading.Tasks;
using ActiveMQ.Artemis.Client.Transactions;
namespace DDO_Application.Model {
public interface ITransactionConsumer {
Task ConsumeAsync(Message message, Transaction transaction, CancellationToken cancellationToken);
}
}
using System.Threading;
using System.Threading.Tasks;
namespace DDO_Application.Model {
public interface ITypedConsumer<in T> {
public Task ConsumeAsync(T message, CancellationToken cancellationToken);
}
}
using ActiveMQ.Artemis.Client;
using System;
using System.Threading;
using System.Threading.Tasks;
using ActiveMQ.Artemis.Client.Transactions;
namespace DDO_Application.Model {
public class MessageCunsomer : ITransactionConsumer {
public async Task ConsumeAsync(Message message, Transaction transaction, CancellationToken cancellationToken) {
Console.WriteLine(message);
}
}
}
...@@ -11,7 +11,7 @@ namespace DDO_Application.Model { ...@@ -11,7 +11,7 @@ namespace DDO_Application.Model {
public async Task PublishAsync<T>(T message) { public async Task PublishAsync<T>(T message) {
var serialized = JsonSerializer.Serialize(message); var serialized = JsonSerializer.Serialize(message);
var address = typeof(T).Name; var address = "test";
var msg = new Message(serialized); var msg = new Message(serialized);
await _producer.SendAsync(address, msg); await _producer.SendAsync(address, msg);
} }
......
...@@ -33,9 +33,9 @@ namespace DDO_Application { ...@@ -33,9 +33,9 @@ namespace DDO_Application {
services.AddHostedService<ApiHostedService>(); services.AddHostedService<ApiHostedService>();
services.AddSingleton<IApiService, ApiProcessingService>(); services.AddSingleton<IApiService, ApiProcessingService>();
// turn enable ActiveMQ support in project // turn enable ActiveMQ support in project
services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "localhost", port: 5672, "guest", "guest") }) services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "192.168.2.19", port: 5672, "guest", "guest") })
.AddAnonymousProducer<MessageProducer>() .AddAnonymousProducer<MessageProducer>()
.AddConsumer("BookUpdated", RoutingType.Multicast, async (message, consumer, serviceProvider, cancellationToken) => { .AddConsumer("test", RoutingType.Multicast, async (message, consumer, serviceProvider, cancellationToken) => {
// your consuming logic // your consuming logic
await consumer.AcceptAsync(message); await consumer.AcceptAsync(message);
}); });
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment