Kaynağa Gözat

scheduler config

Lukas Cerny 5 yıl önce
ebeveyn
işleme
69c8671f7e

+ 3 - 1
config/test.yaml

@@ -57,5 +57,7 @@ scheduler:
       period: 30
       consumer: "schedule-observations"
       config:
+        startDate: "2020-01-01T00:00:00.000" # yyyy-MM-DD hh:mm:ss.sss
         allowedStations:
-          10002222: [1, 2]
+          # unitId: [sensorId...] // [] = all sensors
+          10002222: []

+ 81 - 0
connector-core/src/main/java/io/connector/core/config/AllowedStation.java

@@ -0,0 +1,81 @@
+package io.connector.core.config;
+
+import java.util.*;
+
+public class AllowedStation {
+
+    private static class Sensor {
+        final Integer id;
+        final Set<Integer> channels;
+        Sensor(Integer id, Set<Integer> channels) {
+            this.id = id;
+            this.channels = channels;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Sensor sensor = (Sensor) o;
+            return id.equals(sensor.id);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id);
+        }
+    }
+
+    private final Map<String, Map<Integer, Sensor>> stations;
+
+    public AllowedStation(PropertyConfig config) {
+        Set<String> attributes = config.getAttributes();
+        Map<String, Map<Integer, Sensor>> stations = new HashMap<>(attributes.size());
+        for (String stationId : attributes) {
+            Object attrValue = config.getProperty(stationId);
+            if (attrValue instanceof List) {
+                List<?> sensors = (List<?>)attrValue;
+                for (Object sensor : sensors) {
+                    if (sensor instanceof Map) {
+                        Map<?, ?> sensorMap = (Map<?, ?>)sensor;
+                        for (Map.Entry<?, ?> sensorEntry : sensorMap.entrySet()) {
+                            if (sensorEntry.getKey() instanceof Integer) {
+                                Integer sensorId = (Integer)sensorEntry.getKey();
+                                if (sensorEntry.getValue() instanceof List) {
+                                    List<?> channels = (List<?>)sensorEntry.getValue();
+                                    Set<Integer> channelsNew = new HashSet<>(channels.size());
+                                    for (Object channel : channels) {
+                                        if (channel instanceof Integer) {
+                                            channelsNew.add((Integer)channel);
+                                        }
+                                    }
+                                    stations.computeIfAbsent(stationId, k -> new HashMap<>())
+                                            .put(sensorId, new Sensor(sensorId, channelsNew));
+                                }
+                            }
+                        }
+                    } else if (sensor instanceof Integer) {
+                        Integer sensorId = (Integer)sensor;
+                        stations.computeIfAbsent(stationId, k -> new HashMap<>())
+                                .put(sensorId, new Sensor(sensorId, Collections.emptySet()));
+                    }
+                }
+            }
+        }
+
+        this.stations = stations;
+    }
+
+    public boolean isAllowed(String stationId) {
+        return stations.containsKey(stationId);
+    }
+
+    public boolean isAllowed(String stationId, Long sensorId) {
+        return isAllowed(stationId) && stations.get(stationId).containsKey(sensorId.intValue());
+    }
+
+    public boolean isAllowed(String stationId, Long sensorId, Long channel) {
+        return isAllowed(stationId, sensorId) &&
+                stations.get(stationId).get(sensorId.intValue()).channels.contains(channel.intValue());
+    }
+}

+ 11 - 4
connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1SchedulerConfig.java

@@ -1,18 +1,25 @@
 package io.connector.module.senslog1;
 
+import io.connector.core.config.AllowedStation;
 import io.connector.core.config.SchedulerConfig;
 
-import java.util.Set;
+import java.time.LocalDateTime;
 
 public class SensLog1SchedulerConfig {
 
-    private final Set<String> allowedStations;
+    private final AllowedStation allowedStations;
+    private final LocalDateTime startDate;
 
     SensLog1SchedulerConfig(SchedulerConfig config) {
-        this.allowedStations = config.getPropertyConfig("allowedStations").getAttributes();
+        this.allowedStations = new AllowedStation(config.getPropertyConfig("allowedStation"));
+        this.startDate = config.getLocalDateTimeProperty("startDate");
     }
 
-    public Set<String> getAllowedStations() {
+    public AllowedStation getAllowedStations() {
         return allowedStations;
     }
+
+    public LocalDateTime getStartDate() {
+        return startDate;
+    }
 }

+ 31 - 12
connector-module-senslog1/src/main/java/io/connector/module/senslog1/gateway/SensLog1Gateway.java

@@ -5,11 +5,13 @@ import io.connector.core.AbstractGateway;
 import io.connector.core.DataCollection;
 import io.connector.core.Message;
 import io.connector.core.MessageHeader;
+import io.connector.core.config.AllowedStation;
 import io.connector.model.senslog1.*;
 import io.connector.module.senslog1.SensLog1Client;
 import io.connector.module.senslog1.SensLog1SchedulerConfig;
 import io.vertx.core.MultiMap;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.JsonArray;
 import io.vertx.core.json.JsonObject;
 
 import java.time.OffsetDateTime;
@@ -17,6 +19,7 @@ import java.util.*;
 
 import static java.time.OffsetDateTime.MAX;
 import static java.time.OffsetDateTime.MIN;
+import static java.time.format.DateTimeFormatter.ISO_DATE_TIME;
 import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 
 public class SensLog1Gateway extends AbstractGateway {
@@ -39,27 +42,43 @@ public class SensLog1Gateway extends AbstractGateway {
 
         event().consume("schedule-observations", message -> {
             List<UnitData> unitData = client.lastObservations();
-            OffsetDateTime from = MAX, to = MIN;
+
+            AllowedStation allowedStation = schedulerConfig.getAllowedStations();
+            JsonArray stations = new JsonArray();
             for (UnitData unit : unitData) {
-                for (SensorData sensor : unit.getSensors()) {
-                    for (Observation observation : sensor.getObservations()) {
-                        if (observation.getTimestamp().isBefore(from)) {
-                            from = observation.getTimestamp();
-                        }
-                        if (observation.getTimestamp().isAfter(to)) {
-                            to = observation.getTimestamp();
+                if (allowedStation.isAllowed(Long.toString(unit.getId()))) {
+                    OffsetDateTime from = MAX, to = MIN;
+                    JsonArray allowedSensors = new JsonArray();
+                    for (SensorData sensor : unit.getSensors()) {
+                        for (Observation observation : sensor.getObservations()) {
+                            OffsetDateTime time = observation.getTimestamp();
+                            if (time.isBefore(from)) {
+                                from = observation.getTimestamp();
+                            }
+                            if (time.isAfter(to)) {
+                                to = observation.getTimestamp();
+                            }
+                            allowedSensors.add(new JsonObject()
+                                    .put("id", sensor.getId())
+                                    .put("timestamp", time.format(ISO_OFFSET_DATE_TIME))
+                            );
                         }
                     }
+                    stations.add(new JsonObject()
+                            .put("id", unit.getId())
+                            .put("fromDate", from.format(ISO_OFFSET_DATE_TIME))
+                            .put("allowedSensors", allowedSensors));
                 }
             }
+            JsonObject configBody = new JsonObject()
+                    .put("startDate", schedulerConfig.getStartDate().format(ISO_DATE_TIME))
+                    .put("allowedStations", stations);
 
-            // TODO add scheduler config to the body
-            String allowedUnits = String.join(",", schedulerConfig.getAllowedStations());
 
-            message.reply(message.body()).options()
+            message.reply(configBody).options()
                     .addHeader("fromDate", "2020-02-10T06:30:00+01:00") // from.format(ISO_OFFSET_DATE_TIME))
                     .addHeader("toDate", "2020-02-10T07:30:00+01:00") // to.format(ISO_OFFSET_DATE_TIME))
-                    .addHeader("unitId", allowedUnits);
+                    .addHeader("unitId", "");
         });
 
         event().consume("units", message -> {