|
|
@@ -1,7 +1,11 @@
|
|
|
package io.connector.core;
|
|
|
|
|
|
+import io.connector.core.http.RequestUriComponent;
|
|
|
+import io.connector.core.http.RequestUriParser;
|
|
|
import io.vertx.core.eventbus.DeliveryOptions;
|
|
|
import io.vertx.core.eventbus.EventBus;
|
|
|
+import io.vertx.core.http.HttpServerRequest;
|
|
|
+import io.vertx.core.json.JsonObject;
|
|
|
import io.vertx.ext.web.Router;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
@@ -12,8 +16,9 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
|
-import static io.connector.core.AddressPath.*;
|
|
|
import static io.connector.core.AddressPath.Creator.createNormalized;
|
|
|
+import static io.connector.core.AddressPath.SCHEDULER_CONSUMER;
|
|
|
+import static io.connector.core.AddressPath.SCHEDULER_PROVIDER;
|
|
|
import static io.connector.core.MessageHeader.*;
|
|
|
import static java.lang.String.format;
|
|
|
|
|
|
@@ -28,12 +33,14 @@ public abstract class AbstractGateway {
|
|
|
private String moduleId;
|
|
|
protected String gatewayId;
|
|
|
private final Set<String> registeredConsumers;
|
|
|
+ private final Set<String> registeredScheduleConsumers;
|
|
|
private final Map<String, String> schedulerMapping;
|
|
|
|
|
|
protected AbstractGateway(String id, boolean isDefault) {
|
|
|
this.gatewayId = id;
|
|
|
this.isDefault = isDefault;
|
|
|
this.registeredConsumers = new HashSet<>();
|
|
|
+ this.registeredScheduleConsumers = new HashSet<>();
|
|
|
this.schedulerMapping = new HashMap<>();
|
|
|
}
|
|
|
|
|
|
@@ -73,8 +80,12 @@ public abstract class AbstractGateway {
|
|
|
registerSchedulerConsumers();
|
|
|
}
|
|
|
|
|
|
+ private boolean isConsumerRegistered(String consumerName) {
|
|
|
+ return registeredConsumers.contains(consumerName) || registeredScheduleConsumers.contains(consumerName);
|
|
|
+ }
|
|
|
+
|
|
|
private String createAddress(String consumerName) {
|
|
|
- if (!registeredConsumers.contains(consumerName)) {
|
|
|
+ if (!isConsumerRegistered(consumerName)) {
|
|
|
throw logger.throwing(new RuntimeException(
|
|
|
format("Consumer '%s' in module %s and gateway %s is not registered.",
|
|
|
consumerName, moduleId, gatewayId)
|
|
|
@@ -83,8 +94,13 @@ public abstract class AbstractGateway {
|
|
|
return createNormalized(moduleId, gatewayId, consumerName);
|
|
|
}
|
|
|
|
|
|
+ protected RequestUriComponent parseUriToComponents(HttpServerRequest request) {
|
|
|
+ return RequestUriParser.parse(request, moduleId, gatewayId);
|
|
|
+ }
|
|
|
+
|
|
|
public interface SchedulerMapping {
|
|
|
SchedulerMapping addMapping(String from, String to);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
protected final SchedulerMapping schedulerMapping() {
|
|
|
@@ -114,33 +130,70 @@ public abstract class AbstractGateway {
|
|
|
}
|
|
|
|
|
|
String baseAddr = isDefault ? SCHEDULER_PROVIDER : SCHEDULER_CONSUMER;
|
|
|
- eventBus.consumer(createNormalized(baseAddr, gatewayId), message -> {
|
|
|
+ String gatewayAddr = createNormalized(baseAddr, gatewayId);
|
|
|
+ logger.info("[{}/{}] Creating a consumer at the address {}.", moduleId, gatewayId, gatewayAddr);
|
|
|
+
|
|
|
+ eventBus.consumer(gatewayAddr, message -> {
|
|
|
+ logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, message.address());
|
|
|
+ String schedulerConsumerName = message.headers().get(ADDRESS);
|
|
|
+ if (!registeredScheduleConsumers.contains(schedulerConsumerName)) {
|
|
|
+ logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, schedulerConsumerName);
|
|
|
+ message.fail(400, "No registered consumer for the address " + schedulerConsumerName);
|
|
|
+ } else {
|
|
|
+ String schedulerConsumerAddr = createAddress(schedulerConsumerName);
|
|
|
+ logger.info("[{}/{}] Creating a request at the address {}.", moduleId, gatewayId, schedulerConsumerAddr);
|
|
|
+ eventBus.request(schedulerConsumerAddr, message.body(), schedulerConsumerReply -> {
|
|
|
+ if (schedulerConsumerReply.succeeded()) {
|
|
|
+ io.vertx.core.eventbus.Message<Object> schedulerConsumerMsg = schedulerConsumerReply.result();
|
|
|
+ logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, schedulerConsumerMsg.address());
|
|
|
+ message.reply(schedulerConsumerMsg.body());
|
|
|
+ } else {
|
|
|
+ logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, schedulerConsumerAddr);
|
|
|
+ message.fail(400, schedulerConsumerReply.cause().getMessage());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ /*
|
|
|
+ eventBus.consumer(gatewayAddr, message -> {
|
|
|
+ logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, message.address());
|
|
|
if (isDefault) {
|
|
|
String proxyConsumerName = message.headers().get(ADDRESS);
|
|
|
// this is a provider and must request a local consumer
|
|
|
DeliveryOptions proxyOptions = new DeliveryOptions().setHeaders(message.headers());
|
|
|
// request proxy consumer (eq scheduler-observations)
|
|
|
- eventBus.request(createAddress(proxyConsumerName), message.body(), proxyOptions, proxyReply -> {
|
|
|
+ String proxyAddr = createAddress(proxyConsumerName);
|
|
|
+ logger.info("[{}/{}] Creating a request at the address {}.", moduleId, gatewayId, proxyAddr);
|
|
|
+ eventBus.request(proxyAddr, message.body(), proxyOptions, proxyReply -> {
|
|
|
if (proxyReply.succeeded()) {
|
|
|
io.vertx.core.eventbus.Message<Object> proxyMsg = proxyReply.result();
|
|
|
+ logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, proxyMsg.address());
|
|
|
String providerProxyName = proxyMsg.headers().get(ADDRESS); // hit: consumerName is the same as providerProxyName
|
|
|
String providerName = schedulerMapping.get(providerProxyName);
|
|
|
if (providerName != null) {
|
|
|
DeliveryOptions providerOptions = new DeliveryOptions().setHeaders(proxyMsg.headers());
|
|
|
// request provider consumer (eq observations)
|
|
|
- eventBus.request(createAddress(providerName), proxyMsg.body(), providerOptions, providerReply -> {
|
|
|
+ String providerAddr = createAddress(providerName);
|
|
|
+ logger.info("[{}/{}] Creating a request at the address {}.", moduleId, gatewayId, providerAddr);
|
|
|
+ eventBus.request(providerAddr, proxyMsg.body(), providerOptions, providerReply -> {
|
|
|
if (providerReply.succeeded()) {
|
|
|
io.vertx.core.eventbus.Message<Object> providerMsg = providerReply.result();
|
|
|
+ logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, providerMsg.address());
|
|
|
DeliveryOptions replyOptions = new DeliveryOptions().setHeaders(providerMsg.headers());
|
|
|
+ logger.info("[{}/{}] Reply of the message {}.", moduleId, gatewayId, message.address());
|
|
|
message.reply(providerMsg.body(), replyOptions);
|
|
|
}else {
|
|
|
- message.fail(400, proxyReply.cause().getMessage());
|
|
|
+ logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
+ message.fail(400, providerReply.cause().getMessage());
|
|
|
}
|
|
|
});
|
|
|
} else {
|
|
|
+ logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
message.fail(400, "No conversion endpoint for " + providerName);
|
|
|
}
|
|
|
} else {
|
|
|
+ logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
message.fail(400, proxyReply.cause().getMessage());
|
|
|
}
|
|
|
});
|
|
|
@@ -149,26 +202,37 @@ public abstract class AbstractGateway {
|
|
|
String consumerName = schedulerMapping.get(providerName);
|
|
|
if (consumerName != null) {
|
|
|
DeliveryOptions consumerOptions = new DeliveryOptions().setHeaders(message.headers());
|
|
|
- eventBus.request(createAddress(consumerName), message.body(), consumerOptions, consumerReply -> {
|
|
|
+ String consumerAddr = createAddress(consumerName);
|
|
|
+ logger.info("[{}/{}] Creating a request at the address {}.", moduleId, gatewayId, consumerAddr);
|
|
|
+ eventBus.request(consumerAddr, message.body(), consumerOptions, consumerReply -> {
|
|
|
if (consumerReply.succeeded()) {
|
|
|
io.vertx.core.eventbus.Message<Object> consumerMsg = consumerReply.result();
|
|
|
+ logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, consumerMsg.address());
|
|
|
DeliveryOptions replyOptions = new DeliveryOptions().setHeaders(consumerMsg.headers());
|
|
|
+ logger.info("[{}/{}] Reply of the message {}.", moduleId, gatewayId, message.address());
|
|
|
message.reply(consumerMsg.body(), replyOptions);
|
|
|
} else {
|
|
|
+ logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
+ logger.catching(consumerReply.cause());
|
|
|
message.fail(400, consumerReply.cause().getMessage());
|
|
|
}
|
|
|
});
|
|
|
} else {
|
|
|
+ logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
message.fail(400, "No conversion endpoint for " + providerName);
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
+
|
|
|
+ */
|
|
|
}
|
|
|
|
|
|
|
|
|
protected interface Event {
|
|
|
+ <T> void consumeScheduler(String address, Consumer<Message<T>> handler);
|
|
|
<T> void consume(String address, Consumer<Message<T>> handler);
|
|
|
<T> void send(String address, Object body, DeliveryOptions options, Consumer<Message<T>> handler);
|
|
|
+ <T> void send(String address, Object body, Consumer<Message<T>> handler);
|
|
|
}
|
|
|
|
|
|
private class EventImpl implements Event {
|
|
|
@@ -180,9 +244,29 @@ public abstract class AbstractGateway {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
+ public <T> void consumeScheduler(String address, Consumer<Message<T>> handler) {
|
|
|
+ if (registeredScheduleConsumers.contains(address)) {
|
|
|
+ throw logger.throwing(new RuntimeException(
|
|
|
+ format("Consumer '%s' in module %s and gateway %s is already registered as a scheduler consumer.",
|
|
|
+ address, moduleId, gatewayId
|
|
|
+ )
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ registeredScheduleConsumers.add(address);
|
|
|
+ String consumerAddr = createAddress(address);
|
|
|
+ logger.info("[{}/{}] Creating a consumer at the address {}.", moduleId, gatewayId, consumerAddr);
|
|
|
+ eventBus.<T>consumer(consumerAddr, message -> {
|
|
|
+ // TODO vymyslet, jak registrovat scheudler tak, aby byl dosazitelny pouze ze scheduleru
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
public <T> void consume(String address, Consumer<Message<T>> handler) {
|
|
|
registeredConsumers.add(address);
|
|
|
- eventBus.<T>consumer(createAddress(address), message -> {
|
|
|
+ String consumerAddr = createAddress(address);
|
|
|
+ logger.info("[{}/{}] Creating a consumer at the address {}.", moduleId, gatewayId, consumerAddr);
|
|
|
+ eventBus.<T>consumer(consumerAddr, message -> {
|
|
|
+ logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, message.address());
|
|
|
Message<T> msg = new Message<>(message);
|
|
|
try {
|
|
|
handler.accept(msg);
|
|
|
@@ -191,6 +275,7 @@ public abstract class AbstractGateway {
|
|
|
}
|
|
|
if (msg.isFail()) {
|
|
|
Fail fail = msg.cause();
|
|
|
+ logger.error("[{}/{}] Creating a fail message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
message.fail(fail.getCode(), fail.getMessage());
|
|
|
} else {
|
|
|
Reply<Object> reply = msg.reply;
|
|
|
@@ -199,9 +284,11 @@ public abstract class AbstractGateway {
|
|
|
.addHeader(MODULE_TYPE, moduleId)
|
|
|
.addHeader(GATEWAY_TYPE, gatewayId)
|
|
|
.addHeader(ADDRESS, address);
|
|
|
+ logger.info("[{}/{}] Reply of the message {}.", moduleId, gatewayId, message.address());
|
|
|
message.reply(reply.data, reply.options());
|
|
|
} else {
|
|
|
- message.fail(204, "no content");
|
|
|
+ logger.info("[{}/{}] Reply empty message for the address {}.", moduleId, gatewayId, message.address());
|
|
|
+ message.reply(new JsonObject());
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
@@ -209,9 +296,12 @@ public abstract class AbstractGateway {
|
|
|
|
|
|
@Override
|
|
|
public <T> void send(String address, Object body, DeliveryOptions options, Consumer<Message<T>> handler) {
|
|
|
- eventBus.<T>request(createAddress(address), body, options, reply -> {
|
|
|
+ String requestAddr = createAddress(address);
|
|
|
+ logger.info("[{}/{}] Creating a request at the address {}.", moduleId, gatewayId, requestAddr);
|
|
|
+ eventBus.<T>request(requestAddr, body, options, reply -> {
|
|
|
Message<T> message;
|
|
|
if (reply.succeeded()) {
|
|
|
+ logger.info("[{}/{}] Handling request from address {}.", moduleId, gatewayId, reply.result().address());
|
|
|
message = new Message<>(reply.result());
|
|
|
} else {
|
|
|
message = new Message<>(reply.cause());
|
|
|
@@ -219,5 +309,10 @@ public abstract class AbstractGateway {
|
|
|
handler.accept(message);
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> void send(String address, Object body, Consumer<Message<T>> handler) {
|
|
|
+ send(address, body, new DeliveryOptions(), handler);
|
|
|
+ }
|
|
|
}
|
|
|
}
|