|
|
@@ -0,0 +1,187 @@
|
|
|
+package cz.senslog.messaging.app;
|
|
|
+
|
|
|
+import com.noenv.jsonpath.JsonPath;
|
|
|
+import cz.senslog.messaging.domain.ServiceType;
|
|
|
+import cz.senslog.messaging.service.AbstractService;
|
|
|
+import cz.senslog.messaging.utils.PromiseSupport;
|
|
|
+import io.vertx.core.AbstractVerticle;
|
|
|
+import io.vertx.core.Future;
|
|
|
+import io.vertx.core.Promise;
|
|
|
+import io.vertx.core.json.JsonArray;
|
|
|
+import io.vertx.core.json.JsonObject;
|
|
|
+import io.vertx.json.schema.*;
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
+import org.apache.logging.log4j.Logger;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+import java.util.function.Function;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+import static io.vertx.json.schema.common.dsl.Schemas.*;
|
|
|
+import static io.vertx.json.schema.common.dsl.Schemas.objectSchema;
|
|
|
+
|
|
|
+public class ChannelSubscriberDeployer extends AbstractVerticle {
|
|
|
+
|
|
|
+ private static final Logger logger = LogManager.getLogger(ChannelSubscriberDeployer.class);
|
|
|
+
|
|
|
+ public static final Function<String, String> CHANNEL_PATH_TO_DEPLOY = (deploymentID) -> ("___"+deploymentID+"#deploy");
|
|
|
+
|
|
|
+ public static final Function<String, String> CHANNEL_PATH_TO_LIST = (deploymentID) -> ("___"+deploymentID+"#channels");
|
|
|
+
|
|
|
+ public static final Function<String, String> CHANNEL_PATH_TO_SEND = (deploymentID) -> ("___"+deploymentID+"#send");
|
|
|
+
|
|
|
+ public static final Function<String, String> CHANNEL_PATH_TO_DESCRIPTOR = (deploymentID) -> ("___"+deploymentID+"#descriptor");
|
|
|
+
|
|
|
+ private static final JsonSchemaOptions JSON_SCHEMA_OPTIONS;
|
|
|
+
|
|
|
+ private static final Validator channelDescriptionValidator;
|
|
|
+
|
|
|
+ private String serviceDeploymentId;
|
|
|
+
|
|
|
+ private Map<String, ChannelDescriptor> registeredChannels = new HashMap<>();
|
|
|
+
|
|
|
+ static {
|
|
|
+ JSON_SCHEMA_OPTIONS = new JsonSchemaOptions()
|
|
|
+ .setBaseUri("http://localhost:8080")
|
|
|
+ .setDraft(Draft.DRAFT7);
|
|
|
+
|
|
|
+ final JsonObject schemaJson = arraySchema().items(objectSchema()
|
|
|
+ .requiredProperty("id", stringSchema())
|
|
|
+ .requiredProperty("schema", objectSchema())
|
|
|
+ .requiredProperty("subscribers", arraySchema().items(objectSchema()
|
|
|
+ .requiredProperty("serviceId", stringSchema())
|
|
|
+ .requiredProperty("messageTransform", arraySchema().items(objectSchema())))
|
|
|
+ ))
|
|
|
+ .toJson();
|
|
|
+
|
|
|
+ channelDescriptionValidator = Validator.create(JsonSchema.of(schemaJson), JSON_SCHEMA_OPTIONS) ;
|
|
|
+ }
|
|
|
+
|
|
|
+ private record ChannelDescriptor(String id, JsonObject messageSchema, Validator messageValidator, List<ServiceHandler> handlers) {}
|
|
|
+
|
|
|
+ private record ServiceHandler(String serviceId, ServiceType serviceType, JsonObject subscriberInfo, JsonArray transformRules) {
|
|
|
+ public JsonObject transformMessage(JsonObject message) {
|
|
|
+ final JsonObject copySubscriber = subscriberInfo.copy();
|
|
|
+ transformRules.stream().map(JsonObject.class::cast).forEach(rule -> {
|
|
|
+ for (String fieldName : rule.fieldNames()) {
|
|
|
+ JsonPath.put(copySubscriber, rule.getString(fieldName), JsonPath.getString(message, fieldName));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return copySubscriber;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void start(Promise<Void> startPromise) {
|
|
|
+ this.serviceDeploymentId = config().getString("services.deploymentId");
|
|
|
+ Future.all(registerEventBusDeploy(), registerEventBusListChannels(), registerEventBusSend(), registerEventBusDescriptor())
|
|
|
+ .onSuccess(s -> startPromise.complete()).onFailure(startPromise::fail);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Future<Void> registerEventBusDescriptor() {
|
|
|
+ final Promise<Void> deployPromise = Promise.promise();
|
|
|
+ vertx.eventBus().<JsonObject>consumer(CHANNEL_PATH_TO_DESCRIPTOR.apply(deploymentID()), msg -> {
|
|
|
+ final String channelId = msg.headers().get("channelId");
|
|
|
+ final ChannelDescriptor channelDescriptor = registeredChannels.get(channelId);
|
|
|
+ if (channelDescriptor == null) {
|
|
|
+ msg.fail(204, "No subscribers for the channel <" + channelId + ">.");
|
|
|
+ } else {
|
|
|
+ msg.reply(JsonObject.of(
|
|
|
+ "channelId", channelDescriptor.id(),
|
|
|
+ "schema", channelDescriptor.messageSchema(),
|
|
|
+ "subscribers", new JsonArray(channelDescriptor.handlers.stream().map(s -> JsonObject.of(
|
|
|
+ "serviceId", s.serviceId(),
|
|
|
+ "serviceType", s.serviceType()
|
|
|
+ )).toList())
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ }).completionHandler(PromiseSupport.onComplete(deployPromise));
|
|
|
+ return deployPromise.future();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Future<Void> registerEventBusSend() {
|
|
|
+ final Promise<Void> deployPromise = Promise.promise();
|
|
|
+ vertx.eventBus().<JsonObject>consumer(CHANNEL_PATH_TO_SEND.apply(deploymentID()), msg -> {
|
|
|
+ final String channelId = msg.headers().get("channelId");
|
|
|
+ final JsonObject channelMessage = msg.body();
|
|
|
+ final ChannelDescriptor channelDescriptor = registeredChannels.get(channelId);
|
|
|
+ if (channelDescriptor == null) {
|
|
|
+ msg.fail(204, "No subscribers for the channel <" + channelId + ">.");
|
|
|
+ } else {
|
|
|
+ final OutputUnit validationRes = channelDescriptor.messageValidator().validate(channelMessage);
|
|
|
+ if (validationRes.getValid()) {
|
|
|
+ channelDescriptor.handlers.forEach(h -> vertx.eventBus().
|
|
|
+ <JsonObject>request(AbstractService.EVENT_BUS_PATH_TO_SEND.apply(h.serviceId()), h.transformMessage(channelMessage), reply -> {
|
|
|
+ if (reply.succeeded()) {
|
|
|
+ logger.info("Message sent to the channel <" + channelId + "> with the result: " + reply.result().body());
|
|
|
+ } else {
|
|
|
+ logger.error("Message sent to the channel <" + channelId + "> failed: " + reply.cause());
|
|
|
+ }
|
|
|
+ })
|
|
|
+ );
|
|
|
+ msg.reply(JsonObject.of("message", "Accepted"));
|
|
|
+ } else {
|
|
|
+ msg.fail(422, "Unprocessable message: " + validationRes.getError());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }).completionHandler(PromiseSupport.onComplete(deployPromise));
|
|
|
+ return deployPromise.future();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Future<Void> registerEventBusListChannels() {
|
|
|
+ final Promise<Void> deployPromise = Promise.promise();
|
|
|
+ vertx.eventBus().<JsonArray>consumer(CHANNEL_PATH_TO_LIST.apply(deploymentID()), msg -> {
|
|
|
+ msg.reply(new JsonArray(registeredChannels.keySet().stream().map(channelId -> JsonObject.of(
|
|
|
+ "name", channelId
|
|
|
+ )).toList()));
|
|
|
+ }).completionHandler(PromiseSupport.onComplete(deployPromise));
|
|
|
+ return deployPromise.future();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Future<Void> registerEventBusDeploy() {
|
|
|
+ final Promise<Void> deployPromise = Promise.promise();
|
|
|
+ vertx.eventBus().<JsonArray>consumer(CHANNEL_PATH_TO_DEPLOY.apply(deploymentID()), msg -> {
|
|
|
+ final JsonArray descriptors = msg.body();
|
|
|
+ final OutputUnit vRes = channelDescriptionValidator.validate(descriptors);
|
|
|
+ if (!vRes.getValid()) {
|
|
|
+ msg.fail(400, vRes.getError());
|
|
|
+ } else {
|
|
|
+ vertx.eventBus().<JsonArray>request(ServiceDeployer.SERVICE_PATH_TO_LIST.apply(serviceDeploymentId), JsonArray.of(), reply -> {
|
|
|
+ if (reply.failed()) {
|
|
|
+ msg.fail(500, reply.cause().getMessage());
|
|
|
+ } else {
|
|
|
+ final Map<String, JsonObject> services = reply.result().body().stream().map(JsonObject.class::cast)
|
|
|
+ .collect(Collectors.toMap(k -> k.getString("serviceId"), Function.identity()));
|
|
|
+ final Map<String, JsonObject> channels = descriptors.stream().map(JsonObject.class::cast)
|
|
|
+ .collect(Collectors.toMap(k -> k.getString("id"), Function.identity()));
|
|
|
+ this.registeredChannels = mappingChannelsToServices(channels, services);
|
|
|
+ msg.reply(new JsonArray(registeredChannels.keySet().stream().toList()));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }).completionHandler(PromiseSupport.onComplete(deployPromise));
|
|
|
+ return deployPromise.future();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, ChannelDescriptor> mappingChannelsToServices(Map<String, JsonObject> channels, Map<String, JsonObject> services) {
|
|
|
+ final Map<String, ChannelDescriptor> registeredChannels = new HashMap<>(channels.size());
|
|
|
+ for (Map.Entry<String, JsonObject> channelEntry : channels.entrySet()) {
|
|
|
+ final String channelId = channelEntry.getKey();
|
|
|
+ final JsonObject channelDescriptor = channelEntry.getValue();
|
|
|
+ final JsonObject channelSchema = channelDescriptor.getJsonObject("schema");
|
|
|
+ final JsonArray channelSubscribers = channelDescriptor.getJsonArray("subscribers");
|
|
|
+ final List<ServiceHandler> serviceHandlers = new ArrayList<>(channelSubscribers.size());
|
|
|
+ final Validator channelValidator = Validator.create(JsonSchema.of(channelSchema), JSON_SCHEMA_OPTIONS);
|
|
|
+ registeredChannels.put(channelId, new ChannelDescriptor(channelId, channelSchema, channelValidator, serviceHandlers));
|
|
|
+ channelSubscribers.stream().map(JsonObject.class::cast).forEach(subscriber -> {
|
|
|
+ final String serviceId = subscriber.getString("serviceId");
|
|
|
+ final JsonArray messageTransform = subscriber.getJsonArray("messageTransform");
|
|
|
+ final JsonObject serviceDescriptor = services.get(serviceId);
|
|
|
+ final ServiceType serviceType = ServiceType.of(serviceDescriptor.getString("type"));
|
|
|
+ serviceHandlers.add(new ServiceHandler(serviceId, serviceType, subscriber, messageTransform));
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return registeredChannels;
|
|
|
+ }
|
|
|
+}
|