Bladeren bron

refactored scheduling (not done yet)

Lukas Cerny 5 jaren geleden
bovenliggende
commit
02130469e6

+ 2 - 2
config/test.yaml

@@ -58,10 +58,10 @@ services:
 scheduler:
 #  senslog1:
 #      period: 30
-#      consumer: "schedule-observations"
+#      consumer: "test"
 #      config:
 #        startDate: "2020-08-05T12:24:17[Europe/Prague]"
 #        allowedStations:
 #          # unitId: [sensorId...]
 #          10002222: [410010000, 560030000, 340020000, 380010000, 380090000]
-##          10002376: [410010000, 560030000, 340020000, 380010000, 380090000]
+#          10002376: [410010000, 560030000, 340020000, 380010000, 380090000]

+ 4 - 3
connector-core/src/main/java/io/connector/core/AbstractGateway.java

@@ -255,9 +255,10 @@ public abstract class AbstractGateway {
             registeredScheduleConsumers.add(address);
             String consumerAddr = createAddress(address);
             logger.info("[{}/{}] Creating a consumer at the address {}.", moduleId, gatewayId, consumerAddr);
-            eventBus.<T>consumer(consumerAddr, message -> {
-                // TODO vymyslet, jak registrovat scheudler tak, aby byl dosazitelny pouze ze scheduleru
-            });
+//            eventBus.<T>consumer(consumerAddr, message -> {
+//                // TODO vymyslet, jak registrovat scheudler tak, aby byl dosazitelny pouze ze scheduleru
+//            });
+            consume(address, handler); // temporary hack
         }
 
         @Override

+ 85 - 33
connector-core/src/main/java/io/connector/core/VertxScheduler.java

@@ -11,10 +11,13 @@ import io.vertx.ext.web.handler.BodyHandler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.time.LocalDateTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static cz.senslog.common.http.HttpContentType.APPLICATION_JSON;
+import static cz.senslog.common.http.HttpHeader.CONTENT_TYPE;
 import static io.connector.core.AddressPath.Creator.create;
 import static io.connector.core.AddressPath.Creator.createNormalized;
 import static io.connector.core.AddressPath.SCHEDULER_CONSUMER;
@@ -44,53 +47,87 @@ class VertxScheduler extends Module {
         this.scheduledModules = new HashMap<>(schedulerConfigs.size());
     }
 
+    private long registerNewSchedulingRole(String moduleId, SchedulerConfig config) {
+        EventBus eventBus = vertx.eventBus();
+        if (config != null && config.getPeriodSecond() > 0) {
+            return vertx.setPeriodic(config.getPeriodMillisecond(), ctx -> {
+                if (config.shouldTerminate()) { return; }
+                DeliveryOptions fetcherOpt = new DeliveryOptions()
+                        .addHeader(RESOURCE, SCHEDULER)
+                        .addHeader(ADDRESS, config.getConsumer());
+                String providerAddress = createNormalized(SCHEDULER_PROVIDER, moduleId);
+                eventBus.request(providerAddress, config.getConfig(), fetcherOpt, reply -> {
+                    if (reply.succeeded()) {
+
+                        Message<Object> result = reply.result();
+                        String source = result.headers().get(MODULE_TYPE);
+
+                        if (source == null) {
+                            logger.warn("Unknown module type of the address '{}'. The message will not be published.", providerAddress); return;
+                        }
+
+                        DeliveryOptions pusherOpt = new DeliveryOptions();
+                        MultiMap headers = result.headers().addAll(fetcherOpt.getHeaders());
+                        for (Map.Entry<String, String> entryHeader : headers.entries()) {
+                            if (entryHeader.getKey().startsWith(MessageHeader.getPrefix())) {
+                                pusherOpt.addHeader(entryHeader.getKey(), entryHeader.getValue());
+                            }
+                        }
+
+                        String consumerAddress = createNormalized(SCHEDULER_CONSUMER, source);
+                        eventBus.publish(consumerAddress, result.body(), pusherOpt);
+                    } else {
+                        logger.catching(reply.cause());
+                    }
+                });
+            });
+        }
+        return -1;
+    }
+
     @Override
     public void run() {
 
-        EventBus eventBus = vertx.eventBus();
+//        vertx.setPeriodic(1000, ctx -> { // every minute?
+//            // TODO delete all rules after deadLine
+//        });
+
         for (Map.Entry<String, SchedulerConfig> configEntry : schedulerConfigs.entrySet()) {
             String moduleId = configEntry.getKey();
             SchedulerConfig config = configEntry.getValue();
-
-            if (config != null && config.getPeriodSecond() > 0) {
-                long schedulerId = vertx.setPeriodic(config.getPeriodMillisecond(), ctx -> {
-                    DeliveryOptions fetcherOpt = new DeliveryOptions()
-                            .addHeader(RESOURCE, SCHEDULER)
-                            .addHeader(ADDRESS, config.getConsumer());
-                    String providerAddress = createNormalized(SCHEDULER_PROVIDER, moduleId);
-                    // TODO instance of new JsonObject(), put config as a json
-                    eventBus.request(providerAddress, new JsonObject(), fetcherOpt, reply -> {
-                        if (reply.succeeded()) {
-                            Message<Object> result = reply.result();
-                            String source = result.headers().get(MODULE_TYPE);
-                            DeliveryOptions pusherOpt = new DeliveryOptions();
-                            MultiMap headers = result.headers().addAll(fetcherOpt.getHeaders());
-                            for (Map.Entry<String, String> entryHeader : headers.entries()) {
-                                if (entryHeader.getKey().startsWith(MessageHeader.getPrefix())) {
-                                    pusherOpt.addHeader(entryHeader.getKey(), entryHeader.getValue());
-                                }
-                            }
-                            String consumerAddress = createNormalized(SCHEDULER_CONSUMER, source);
-                            eventBus.publish(consumerAddress, result.body(), pusherOpt);
-                        } else {
-                            logger.catching(reply.cause());
-                        }
-                    });
-                });
+            long schedulerId = registerNewSchedulingRole(moduleId, config);
+            if (schedulerId >= 0) {
                 scheduledModules.put(moduleId, schedulerId);
             }
         }
 
-        router().get(create("rules")).handler(BodyHandler.create()).handler(ctx -> {
+        router().get(create("rules")).handler(ctx -> {
             JsonArray rules = new JsonArray();
             for (Map.Entry<String, Long> entry : scheduledModules.entrySet()) {
+                String moduleId = entry.getKey();
+                Long schedulerId = entry.getValue();
+
+                SchedulerConfig config = schedulerConfigs.get(moduleId);
+
+                JsonObject settings = new JsonObject()
+                        .put("consumer", config.getConsumer())
+                        .put("period", config.getPeriodSecond());
+                if (!config.getDeadLine().equals(LocalDateTime.MAX)) {
+                    settings.put("deadLine", config.getDeadLine());
+                }
+                settings.put("config", config.getConfig());
+
                 rules.add(new JsonObject()
-                        .put("id", entry.getValue())
-                        .put("module", entry.getKey())
-                        .put("config", new JsonObject())
+                        .put("id", schedulerId)
+                        .put("module", moduleId)
+                        .put("settings", settings)
                 );
             }
-            ctx.response().end(rules.encode());
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(rules.encode());
+        });
+
+        router().post(create("rule")).handler(BodyHandler.create()).handler(ctx -> {
+            ctx.response().end("added new rule with id");
         });
 
         router().delete(create("rule/:id")).handler(ctx -> {
@@ -101,7 +138,22 @@ class VertxScheduler extends Module {
             } catch (NumberFormatException e) {
                 throw new RuntimeException(format("Param '%s' can not be converted to integer.", idString));
             }
-            ctx.response().end("Deleted rule " + id);
+
+            if (!scheduledModules.containsValue(id)) {
+                throw new RuntimeException(format("Identifier '%s' does not exist.", id));
+            }
+
+            boolean result = vertx.cancelTimer(id);
+
+            if (result) {
+                ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(new JsonObject()
+                        .put("message", format("Scheduling rule of the ID %s was successfully terminated.", id))
+                        .encode());
+            } else {
+                throw new RuntimeException(format(
+                        "Scheduling rule of the ID %s was not terminated successfully.", id
+                ));
+            }
         });
     }
 }

+ 34 - 4
connector-core/src/main/java/io/connector/core/config/SchedulerConfig.java

@@ -1,14 +1,31 @@
 package io.connector.core.config;
 
-public final class SchedulerConfig extends PropertyConfig {
+import io.vertx.core.json.JsonObject;
 
-    private final Integer period;
+import java.time.LocalDateTime;
+
+public final class SchedulerConfig {
+
+    private final String id;
     private final String consumer;
+    private final Integer period;
+    private final LocalDateTime deadLine;
+    private final JsonObject config;
 
-    public SchedulerConfig(String id, Integer period, String consumer) {
-        super(id);
+    public SchedulerConfig(String id, Integer period, String consumer, LocalDateTime deadLine, JsonObject config) {
+        this.id = id;
         this.period = period;
         this.consumer = consumer;
+        this.deadLine = deadLine;
+        this.config = config;
+    }
+
+    public SchedulerConfig(String id, Integer period, String consumer, JsonObject config) {
+        this(id, period, consumer, LocalDateTime.MAX, config);
+    }
+
+    public String getId() {
+        return id;
     }
 
     public Integer getPeriodMillisecond() {
@@ -22,4 +39,17 @@ public final class SchedulerConfig extends PropertyConfig {
     public String getConsumer() {
         return consumer;
     }
+
+    public LocalDateTime getDeadLine() {
+        return deadLine;
+    }
+
+    public boolean shouldTerminate() {
+        LocalDateTime now = LocalDateTime.now();
+        return now.isAfter(deadLine);
+    }
+
+    public JsonObject getConfig() {
+        return config;
+    }
 }

+ 4 - 5
connector-core/src/main/java/io/connector/core/config/file/FileConfigurationServiceImpl.java

@@ -4,8 +4,8 @@ import cz.senslog.common.exception.UnsupportedFileException;
 import cz.senslog.common.util.StringUtils;
 import io.connector.core.ModuleDescriptor;
 import io.connector.core.config.DefaultConfig;
-import io.connector.core.config.PropertyConfig;
 import io.connector.core.config.SchedulerConfig;
+import io.vertx.core.json.JsonObject;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.yaml.snakeyaml.Yaml;
@@ -172,19 +172,18 @@ class FileConfigurationServiceImpl  implements FileConfigurationService {
                     logger.warn(""); continue;
                 }
 
-                SchedulerConfig schedulerConfig = new SchedulerConfig(serviceId, period, consumer);
-
                 Object configObject = scheduleSettMap.get("config");
                 assert configObject instanceof Map;
                 Map<?, ?> configMap = (Map<?, ?>)configObject;
+                JsonObject jsonConfig = new JsonObject();
                 for (Map.Entry<?, ?> configEntry : configMap.entrySet()) {
                     Object name = configEntry.getKey();
                     if (name instanceof String) {
-                        schedulerConfig.setProperty((String)name, configEntry.getValue());
+                        jsonConfig.put((String)name, configEntry.getValue());
                     }
                 }
 
-                schedulerSettings.put(serviceId, schedulerConfig);
+                schedulerSettings.put(serviceId, new SchedulerConfig(serviceId, period, consumer, jsonConfig));
             }
 
             return schedulerSettings;

+ 0 - 25
connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1SchedulerConfig.java

@@ -1,25 +0,0 @@
-package io.connector.module.senslog1;
-
-import io.connector.core.config.AllowedStation;
-import io.connector.core.config.SchedulerConfig;
-
-import java.time.OffsetDateTime;
-
-public class SensLog1SchedulerConfig {
-
-    private final AllowedStation allowedStations;
-    private final OffsetDateTime startDate;
-
-    SensLog1SchedulerConfig(SchedulerConfig config) {
-        this.allowedStations = new AllowedStation(config.getPropertyConfig("allowedStations"));
-        this.startDate = config.getZonedDateTimeProperty("startDate").toOffsetDateTime();
-    }
-
-    public AllowedStation getAllowedStations() {
-        return allowedStations;
-    }
-
-    public OffsetDateTime getStartDate() {
-        return startDate;
-    }
-}

+ 15 - 47
connector-module-senslog1/src/main/java/io/connector/module/senslog1/gateway/SensLog1Gateway.java

@@ -1,11 +1,14 @@
 package io.connector.module.senslog1.gateway;
 
 import cz.senslog.common.util.Tuple;
-import io.connector.core.*;
-import io.connector.core.config.AllowedStation;
-import io.connector.model.senslog1.*;
+import io.connector.core.AbstractGateway;
+import io.connector.core.DataCollection;
+import io.connector.core.Message;
+import io.connector.core.MessageHeader;
+import io.connector.model.senslog1.Unit;
+import io.connector.model.senslog1.UnitData;
+import io.connector.model.senslog1.UnitInfo;
 import io.connector.module.senslog1.SensLog1HttpClient;
-import io.connector.module.senslog1.SensLog1SchedulerConfig;
 import io.vertx.core.MultiMap;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.json.JsonArray;
@@ -14,63 +17,20 @@ import io.vertx.core.json.JsonObject;
 import java.time.OffsetDateTime;
 import java.util.*;
 
-import static io.connector.core.Handler.replyToEventContext;
-import static java.time.OffsetDateTime.MAX;
-import static java.time.OffsetDateTime.MIN;
 import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 
 public class SensLog1Gateway extends AbstractGateway {
 
     private final SensLog1HttpClient client;
-    private final SensLog1SchedulerConfig schedulerConfig; // TODO delete in future
 
     public SensLog1Gateway(String id, SensLog1HttpClient client) {
         super(id, true);
         this.client = client;
-        this.schedulerConfig = null;
     }
 
     @Override
     public void run() {
 
-        // TODO rename to a general name
-//        schedulerMapping()
-//                .addMapping("schedule-observations", "observations-with-info");
-
-        event().consumeScheduler("schedule-observations", message -> {
-            List<UnitData> unitData = client.lastObservations();
-
-            AllowedStation allowedStation = schedulerConfig.getAllowedStations();
-            JsonArray stations = new JsonArray();
-            for (UnitData multiSensor : unitData) {
-                if (allowedStation.isAllowed(Long.toString(multiSensor.getId()))) {
-                    OffsetDateTime from = MAX, to = MIN;
-                    for (SensorData sensor : multiSensor.getSensors()) {
-                        for (Observation observation : sensor.getObservations()) {
-                            OffsetDateTime time = observation.getTimestamp();
-                            if (time.isBefore(from)) {
-                                from = observation.getTimestamp();
-                            }
-                            if (time.isAfter(to)) {
-                                to = observation.getTimestamp();
-                            }
-                        }
-                    }
-                    stations.add(new JsonObject()
-                            .put("id", multiSensor.getId())
-                            .put("fromDate", from.format(ISO_OFFSET_DATE_TIME))
-                    );
-                }
-            }
-            JsonObject configBody = new JsonObject()
-                    .put("startDate", schedulerConfig.getStartDate().format(ISO_OFFSET_DATE_TIME))
-                    .put("allowedStations", stations);
-
-            event().send("observations-with-info", configBody, replyToEventContext(message));
-
-//            message.reply(configBody);
-        });
-
         event().consume("units", message -> {
             List<UnitInfo> units = client.units();
             message.reply(new DataCollection<>(units));
@@ -174,6 +134,14 @@ public class SensLog1Gateway extends AbstractGateway {
                 message.fail(400, "Configuration in body is required.");
             }
         });
+
+        event().consumeScheduler("test", message -> {
+            JsonObject filter = message.body() != null && message.body() instanceof JsonObject ? (JsonObject)message.body() : new JsonObject();
+
+            System.out.println(filter.encode());
+
+            message.reply("ok");
+        });
     }
 
     private static <T> Tuple<OffsetDateTime, OffsetDateTime> getTimeRangeFromParam(Message<T> message) {