|
|
@@ -145,8 +145,9 @@ public abstract class AbstractGateway {
|
|
|
eventBus.request(schedulerConsumerAddr, message.body(), schedulerConsumerReply -> {
|
|
|
if (schedulerConsumerReply.succeeded()) {
|
|
|
io.vertx.core.eventbus.Message<Object> schedulerConsumerMsg = schedulerConsumerReply.result();
|
|
|
+ DeliveryOptions options = new DeliveryOptions().setHeaders(schedulerConsumerMsg.headers());
|
|
|
logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, schedulerConsumerMsg.address());
|
|
|
- message.reply(schedulerConsumerMsg.body());
|
|
|
+ message.reply(schedulerConsumerMsg.body(), options);
|
|
|
} else {
|
|
|
logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, schedulerConsumerAddr);
|
|
|
message.fail(400, schedulerConsumerReply.cause().getMessage());
|
|
|
@@ -154,77 +155,6 @@ public abstract class AbstractGateway {
|
|
|
});
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
- /*
|
|
|
- eventBus.consumer(gatewayAddr, message -> {
|
|
|
- logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, message.address());
|
|
|
- if (isDefault) {
|
|
|
- String proxyConsumerName = message.headers().get(ADDRESS);
|
|
|
- // this is a provider and must request a local consumer
|
|
|
- DeliveryOptions proxyOptions = new DeliveryOptions().setHeaders(message.headers());
|
|
|
- // request proxy consumer (eq scheduler-observations)
|
|
|
- String proxyAddr = createAddress(proxyConsumerName);
|
|
|
- logger.info("[{}/{}] Creating a request at the address {}.", moduleId, gatewayId, proxyAddr);
|
|
|
- eventBus.request(proxyAddr, message.body(), proxyOptions, proxyReply -> {
|
|
|
- if (proxyReply.succeeded()) {
|
|
|
- io.vertx.core.eventbus.Message<Object> proxyMsg = proxyReply.result();
|
|
|
- logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, proxyMsg.address());
|
|
|
- String providerProxyName = proxyMsg.headers().get(ADDRESS); // hit: consumerName is the same as providerProxyName
|
|
|
- String providerName = schedulerMapping.get(providerProxyName);
|
|
|
- if (providerName != null) {
|
|
|
- DeliveryOptions providerOptions = new DeliveryOptions().setHeaders(proxyMsg.headers());
|
|
|
- // request provider consumer (eq observations)
|
|
|
- String providerAddr = createAddress(providerName);
|
|
|
- logger.info("[{}/{}] Creating a request at the address {}.", moduleId, gatewayId, providerAddr);
|
|
|
- eventBus.request(providerAddr, proxyMsg.body(), providerOptions, providerReply -> {
|
|
|
- if (providerReply.succeeded()) {
|
|
|
- io.vertx.core.eventbus.Message<Object> providerMsg = providerReply.result();
|
|
|
- logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, providerMsg.address());
|
|
|
- DeliveryOptions replyOptions = new DeliveryOptions().setHeaders(providerMsg.headers());
|
|
|
- logger.info("[{}/{}] Reply of the message {}.", moduleId, gatewayId, message.address());
|
|
|
- message.reply(providerMsg.body(), replyOptions);
|
|
|
- }else {
|
|
|
- logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
- message.fail(400, providerReply.cause().getMessage());
|
|
|
- }
|
|
|
- });
|
|
|
- } else {
|
|
|
- logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
- message.fail(400, "No conversion endpoint for " + providerName);
|
|
|
- }
|
|
|
- } else {
|
|
|
- logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
- message.fail(400, proxyReply.cause().getMessage());
|
|
|
- }
|
|
|
- });
|
|
|
- } else {
|
|
|
- String providerName = message.headers().get(ADDRESS);
|
|
|
- String consumerName = schedulerMapping.get(providerName);
|
|
|
- if (consumerName != null) {
|
|
|
- DeliveryOptions consumerOptions = new DeliveryOptions().setHeaders(message.headers());
|
|
|
- String consumerAddr = createAddress(consumerName);
|
|
|
- logger.info("[{}/{}] Creating a request at the address {}.", moduleId, gatewayId, consumerAddr);
|
|
|
- eventBus.request(consumerAddr, message.body(), consumerOptions, consumerReply -> {
|
|
|
- if (consumerReply.succeeded()) {
|
|
|
- io.vertx.core.eventbus.Message<Object> consumerMsg = consumerReply.result();
|
|
|
- logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, consumerMsg.address());
|
|
|
- DeliveryOptions replyOptions = new DeliveryOptions().setHeaders(consumerMsg.headers());
|
|
|
- logger.info("[{}/{}] Reply of the message {}.", moduleId, gatewayId, message.address());
|
|
|
- message.reply(consumerMsg.body(), replyOptions);
|
|
|
- } else {
|
|
|
- logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
- logger.catching(consumerReply.cause());
|
|
|
- message.fail(400, consumerReply.cause().getMessage());
|
|
|
- }
|
|
|
- });
|
|
|
- } else {
|
|
|
- logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
- message.fail(400, "No conversion endpoint for " + providerName);
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- */
|
|
|
}
|
|
|
|
|
|
|