Commit 71d80092 by Alex Nasyr

111

parent 4f3ed0f0
......@@ -11,6 +11,7 @@ namespace DDO_Application.Controllers {
[ApiController]
public class apiController : ControllerBase {
private readonly MessageProducer _messageProducer;
private readonly MessageCunsomer _messageCunsomer;
private IApiService _apiService;
public IServiceProvider Services { get; }
......@@ -39,12 +40,14 @@ namespace DDO_Application.Controllers {
[HttpPost]
[Route("[controller]/sendmsg/{msg}")]
public async Task<IActionResult> SendMessage(string msg) {
await _messageProducer.PublishAsync(msg);
var @event = new ReceivedMessage {Id = 1, Message = "hello"};
await _messageProducer.PublishAsync(@event);
return StatusCode((int)HttpStatusCode.Created, null);
}
[HttpPost]
[Route("[controller]/receivemsg")]
public async Task<IActionResult> ReceiveMessage() {
return StatusCode((int)HttpStatusCode.Created, null);
}
}
......
using ActiveMQ.Artemis.Client;
using System.Threading;
using System.Threading.Tasks;
using ActiveMQ.Artemis.Client.Transactions;
namespace DDO_Application.Model {
public interface ITransactionConsumer {
Task ConsumeAsync(Message message, Transaction transaction, CancellationToken cancellationToken);
}
}
......@@ -5,8 +5,9 @@ using System.Threading.Tasks;
using ActiveMQ.Artemis.Client.Transactions;
namespace DDO_Application.Model {
public class MessageCunsomer : ITransactionConsumer {
public async Task ConsumeAsync(Message message, Transaction transaction, CancellationToken cancellationToken) {
public class MessageCunsomer : ITypedConsumer<ReceivedMessage> {
public async Task ConsumeAsync(ReceivedMessage message, CancellationToken cancellationToken) {
Console.WriteLine(message);
}
}
......
namespace DDO_Application.Model {
public class ReceivedMessage {
public int Id { get; set; }
public string Message { get; set; }
}
}
......@@ -14,10 +14,7 @@ namespace DDO_Application.Services {
private async Task DoWork(CancellationToken stoppingToken) {
using (var scope = Services.CreateScope()) {
var uploadProcessingService =
scope.ServiceProvider
.GetRequiredService<IApiService>();
var uploadProcessingService = scope.ServiceProvider.GetRequiredService<IApiService>();
await uploadProcessingService.DoWork(stoppingToken);
}
}
......
......@@ -33,12 +33,14 @@ namespace DDO_Application {
services.AddHostedService<ApiHostedService>();
services.AddSingleton<IApiService, ApiProcessingService>();
// turn enable ActiveMQ support in project
services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "192.168.2.19", port: 5672, "guest", "guest") })
//services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "192.168.2.19", port: 5672, "guest", "guest") })
services.AddActiveMq("ddoApp-cluster", new[] { Endpoint.Create(host: "192.168.2.22", port: 5672, "guest", "guest") })
.AddAnonymousProducer<MessageProducer>()
.AddConsumer("test", RoutingType.Multicast, async (message, consumer, serviceProvider, cancellationToken) => {
// your consuming logic
await consumer.AcceptAsync(message);
});
.AddTypedConsumer<ReceivedMessage, MessageCunsomer>(RoutingType.Multicast);
//.AddConsumer("test", RoutingType.Multicast, async (message, consumer, serviceProvider, cancellationToken) => {
// // your consuming logic
// await consumer.AcceptAsync(message);
//});
services.AddActiveMqHostedService();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment