Commit 0243c96e by Alex Nasyr

1h: TypedConcumer generic type realized

parent 70984214
......@@ -19,31 +19,12 @@ namespace SocialMinistryDataExchange.Model {
}
private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token) where TConsumer : class, ITypedConsumer<TMessage> {
//var msg = JsonSerializer.Deserialize<TMessage>(message.GetBody<string>());
var msg = (TMessage)Convert.ChangeType(message.GetBody<string>(), typeof(TMessage));
using var scope = serviceProvider.CreateScope();
var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
await typedConsumer.ConsumeAsync(msg, token);
await consumer.AcceptAsync(message);
}
/*
//public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder, RoutingType routingType) where TConsumer : class, ITypedConsumer<TMessage> {
public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder, string address, RoutingType routingType) where TConsumer : class, ITypedConsumer<TMessage> {
builder.Services.TryAddScoped<TConsumer>();
//builder.AddConsumer(typeof(TMessage).Name, routingType, HandleCustomerMessage<TMessage, TConsumer>);
builder.AddConsumer(address, routingType, HandleCustomerMessage<TMessage, TConsumer>);
return builder;
}
private static async Task HandleCustomerMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token) where TConsumer : class, ITypedConsumer<TMessage> {
var msg = JsonSerializer.Deserialize<TMessage>(message.GetBody<string>());
using var scope = serviceProvider.CreateScope();
var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
await typedConsumer.ConsumeAsync(msg, token);
await consumer.AcceptAsync(message);
}
*/
}
}
\ No newline at end of file
......@@ -7,6 +7,7 @@ namespace SocialMinistryDataExchange.Model {
public StringMessageCunsomer() {
}
public async Task ConsumeAsync(String message, CancellationToken cancellationToken) {
// тут логика
Console.WriteLine(message);
}
......
......@@ -5,13 +5,12 @@ using System.Threading.Tasks;
namespace SocialMinistryDataExchange.Model {
public class MessageProducer {
private readonly IProducer _producer;
public MessageProducer(IProducer producer) {
_producer = producer;
}
public async Task PublishAsync<T>(T message) {
var msg = new Message(message);
try {
await _producer.SendAsync(msg);
}
......
......@@ -48,7 +48,7 @@ namespace SocialMinistryDataExchange {
string SmQueue = "MV.SMEV_INF_DAN_OBE_ZHIL.REQ";
try {
services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "172.17.100.121", port: 61616, "contingent", "RjQ66VWS") })
services.AddActiveMq("sm-cluster", new[] { Endpoint.Create(host: "172.17.100.121", port: 61616, "contingent", "RjQ66VWS") })
.AddProducer<MessageProducer>(SmQueue, RoutingType.Anycast)
.AddTypedConsumer<String, StringMessageCunsomer>(SmQueue, RoutingType.Anycast);
services.AddActiveMqHostedService();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment