Explorar o código

Refactored DB, Observation flow, Alerting system

Lukas Cerny hai 1 ano
pai
achega
8daa67ba0d
Modificáronse 52 ficheiros con 786 adicións e 908 borrados
  1. 4 1
      docker.dev.env
  2. 36 33
      init.sql
  3. 4 1
      local.dev.env
  4. 1 1
      sql/config.sql
  5. 0 67
      src/main/java/cz/senslog/analytics/MockData.java
  6. 2 1
      src/main/java/cz/senslog/analytics/app/Application.java
  7. 34 9
      src/main/java/cz/senslog/analytics/app/PropertyConfig.java
  8. 5 5
      src/main/java/cz/senslog/analytics/app/VertxDeployer.java
  9. 12 0
      src/main/java/cz/senslog/analytics/domain/AnalyticGroup.java
  10. 40 0
      src/main/java/cz/senslog/analytics/domain/AnalyticObservation.java
  11. 1 1
      src/main/java/cz/senslog/analytics/domain/AttributeType.java
  12. 5 0
      src/main/java/cz/senslog/analytics/domain/CollectorType.java
  13. 9 9
      src/main/java/cz/senslog/analytics/domain/DoubleStatistics.java
  14. 0 16
      src/main/java/cz/senslog/analytics/domain/Group.java
  15. 0 1
      src/main/java/cz/senslog/analytics/domain/MessageBrokerConfig.java
  16. 27 2
      src/main/java/cz/senslog/analytics/domain/Observation.java
  17. 10 0
      src/main/java/cz/senslog/analytics/domain/RawObservation.java
  18. 3 44
      src/main/java/cz/senslog/analytics/domain/Sensor.java
  19. 1 1
      src/main/java/cz/senslog/analytics/domain/StatisticRecord.java
  20. 1 1
      src/main/java/cz/senslog/analytics/domain/Threshold.java
  21. 17 0
      src/main/java/cz/senslog/analytics/domain/ThresholdMode.java
  22. 5 1
      src/main/java/cz/senslog/analytics/domain/ThresholdRule.java
  23. 6 0
      src/main/java/cz/senslog/analytics/domain/ThresholdViolationAlert.java
  24. 0 6
      src/main/java/cz/senslog/analytics/domain/ThresholdViolationNotification.java
  25. 7 7
      src/main/java/cz/senslog/analytics/domain/ValidationResult.java
  26. 1 2
      src/main/java/cz/senslog/analytics/domain/ViolationReport.java
  27. 0 63
      src/main/java/cz/senslog/analytics/messaging/MessageBroker.java
  28. 0 21
      src/main/java/cz/senslog/analytics/messaging/SensLogAlertMessageBroker.java
  29. 56 0
      src/main/java/cz/senslog/analytics/module/AlertModule.java
  30. 19 17
      src/main/java/cz/senslog/analytics/module/DoubleStatisticsModule.java
  31. 3 4
      src/main/java/cz/senslog/analytics/module/MoldAnalysisModule.java
  32. 0 62
      src/main/java/cz/senslog/analytics/module/NotificationModule.java
  33. 78 65
      src/main/java/cz/senslog/analytics/module/ObservationReceiverModule.java
  34. 9 11
      src/main/java/cz/senslog/analytics/module/ScheduleDBLoaderModule.java
  35. 12 13
      src/main/java/cz/senslog/analytics/module/api/CollectedStatistics.java
  36. 39 28
      src/main/java/cz/senslog/analytics/module/api/CollectorModule.java
  37. 21 12
      src/main/java/cz/senslog/analytics/module/api/Module.java
  38. 32 11
      src/main/java/cz/senslog/analytics/module/api/SimpleModule.java
  39. 0 128
      src/main/java/cz/senslog/analytics/repository/AnalyticsConfigRepository.java
  40. 0 34
      src/main/java/cz/senslog/analytics/repository/AnalyticsDataRepository.java
  41. 22 0
      src/main/java/cz/senslog/analytics/repository/AnalyticsRepository.java
  42. 159 0
      src/main/java/cz/senslog/analytics/repository/AnalyticsRepositoryImpl.java
  43. 0 22
      src/main/java/cz/senslog/analytics/repository/ConfigurationRepository.java
  44. 0 41
      src/main/java/cz/senslog/analytics/repository/MockConfigRepository.java
  45. 0 77
      src/main/java/cz/senslog/analytics/repository/SensLogDataRepository.java
  46. 1 1
      src/main/java/cz/senslog/analytics/server/ws/ContentTypeHandler.java
  47. 6 6
      src/main/java/cz/senslog/analytics/utils/validator/InstantNotifyTrigger.java
  48. 2 2
      src/main/java/cz/senslog/analytics/utils/validator/NotifyTrigger.java
  49. 3 3
      src/main/java/cz/senslog/analytics/utils/validator/OnChangeNotifyTrigger.java
  50. 18 10
      src/main/java/cz/senslog/analytics/utils/validator/ThresholdChecker.java
  51. 20 21
      src/main/java/cz/senslog/analytics/utils/validator/Validator.java
  52. 55 47
      src/test/java/cz/senslog/analytics/utils/ThresholdCheckerTest.java

+ 4 - 1
docker.dev.env

@@ -13,4 +13,7 @@ DATABASE_POOL_SIZE=5
 AUTH_DISABLED=true
 AUTH_KEYSTORE_PATH=/app/keystore.jceks
 AUTH_KEYSTORE_TYPE=PKCS12
-AUTH_KEYSTORE_PASSWORD=SENSlog
+AUTH_KEYSTORE_PASSWORD=SENSlog
+
+# Modules
+MODULE_ALERT_SERVICE_URL=http://127.0.0.1:9090/channel/alert

+ 36 - 33
init.sql

@@ -35,8 +35,6 @@ CREATE SCHEMA analytics;
 ALTER SCHEMA analytics OWNER TO senslog;
 ALTER SCHEMA public OWNER TO senslog;
 
--- CREATE EXTENSION IF NOT EXISTS postgis WITH SCHEMA public;
-
 
 CREATE TYPE analytics.attribute_type AS ENUM ('MIN', 'MAX', 'AVG', 'VAL');
 
@@ -46,36 +44,29 @@ CREATE TYPE analytics.collector_type AS ENUM ('DOUBLE', 'MOLD');
 
 CREATE TYPE analytics.notify_trigger_mode AS ENUM ('INSTANT', 'ON_CHANGE', 'DISABLED');
 
--- CREATE TYPE message_broker_type AS ENUM ('SENSLOG_ALERT');
-
 
-create table analytics.entity_source (
+create table analytics.sensor_to_unit (
     id SERIAL PRIMARY KEY NOT NULL,
     unit_id BIGINT NOT NULL,
     sensor_id BIGINT NOT NULL,
+    last_observation TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT to_timestamp(0),
     UNIQUE (unit_id, sensor_id)
 );
 
-alter table analytics.entity_source OWNER TO senslog;
+alter table analytics.sensor_to_unit OWNER TO senslog;
+
 
 create table analytics.analytic_group (
     id SERIAL PRIMARY KEY NOT NULL,
     name VARCHAR(200) NOT NULL DEFAULT 'no_name',
     time_interval INTEGER NOT NULL,     -- interval in seconds
     persistence BOOLEAN NOT NULL,
-    collector_type analytics.collector_type
+    collector_type analytics.collector_type NOT NULL,
+    CHECK ( time_interval > 0 )
 );
 
 alter table analytics.analytic_group OWNER TO senslog;
 
-create table analytics.entity_source_to_analytic_group (
-    id SERIAL PRIMARY KEY NOT NULL,
-    entity_source_id BIGINT REFERENCES analytics.entity_source(id),
-    analytic_group_id INTEGER REFERENCES analytics.analytic_group(id),
-    time_created TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
-);
-
-alter table analytics.entity_source_to_analytic_group OWNER TO senslog;
 
 create table analytics.record (
     id SERIAL PRIMARY KEY NOT NULL,
@@ -93,14 +84,15 @@ alter table analytics.record OWNER TO senslog;
 
 create table analytics.threshold (
    id SERIAL PRIMARY KEY NOT NULL,
-   analytic_group_id INTEGER NOT NULL,
    notify_trigger_mode analytics.notify_trigger_mode NOT NULL,
    attribute_type analytics.attribute_type NOT NULL,
-   process_on_fail BOOLEAN NOT NULL
+   process_on_fail BOOLEAN NOT NULL,
+   alert_enable BOOLEAN NOT NULL
 );
 
 alter table analytics.threshold OWNER TO senslog;
 
+
 create table analytics.threshold_rule (
    id SERIAL PRIMARY KEY NOT NULL,
    threshold_id INTEGER NOT NULL REFERENCES analytics.threshold(id),
@@ -110,19 +102,30 @@ create table analytics.threshold_rule (
 
 alter table analytics.threshold_rule OWNER TO senslog;
 
--- create table analytics.alert_broker (
---    id SERIAL PRIMARY KEY NOT NULL,
---    broker_type message_broker_type NOT NULL,
---    config jsonb NOT NULL
--- );
---
--- alter table analytics.alert_broker OWNER TO senslog;
---
--- create table analytics.notify_to_msg_broker (
---     id SERIAL PRIMARY KEY NOT NULL,
---     threshold_id integer REFERENCES analytics.thresholds(id),
---     message_broker_id integer REFERENCES analytics.alert_broker(id),
---     properties json NOT NULL
--- );
---
--- alter table analytics.notify_to_msg_broker OWNER TO senslog;
+
+create table analytics.sensor_to_unit_to_threshold (
+    sensor_to_unit_id INTEGER REFERENCES analytics.sensor_to_unit(id),
+    threshold_id INTEGER REFERENCES analytics.threshold(id),
+    UNIQUE (sensor_to_unit_id, threshold_id)
+);
+
+alter table analytics.sensor_to_unit_to_threshold OWNER TO senslog;
+
+
+create table analytics.sensor_to_unit_to_analytic_group (
+    sensor_to_unit_id INTEGER REFERENCES analytics.sensor_to_unit(id),
+    analytic_group_id INTEGER REFERENCES analytics.analytic_group(id),
+    time_created TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
+    UNIQUE (sensor_to_unit_id, analytic_group_id)
+);
+
+alter table analytics.sensor_to_unit_to_analytic_group OWNER TO senslog;
+
+
+create table analytics.analytic_group_to_threshold (
+    analytic_group_id INTEGER REFERENCES analytics.analytic_group(id),
+    threshold_id INTEGER REFERENCES analytics.threshold(id),
+    UNIQUE (analytic_group_id, threshold_id)
+);
+
+alter table analytics.analytic_group_to_threshold OWNER TO senslog;

+ 4 - 1
local.dev.env

@@ -12,4 +12,7 @@ DATABASE_POOL_SIZE=5
 # Auth
 AUTH_KEYSTORE_PATH=keystore.jceks
 AUTH_KEYSTORE_TYPE=PKCS12
-AUTH_KEYSTORE_PASSWORD=SENSlog
+AUTH_KEYSTORE_PASSWORD=SENSlog
+
+# Modules
+MODULE_ALERT_SERVICE_URL=http://127.0.0.1:9090/channel/alert

+ 1 - 1
sql/config.sql

@@ -9,7 +9,7 @@ JOIN export.units AS u ON u.unit_id = us.unit_id;
 INSERT INTO analytics.groups_interval(id, name, time_interval, persistence, collector_type)
 SELECT id, CONCAT(unit_id, '/', sensor_id), 3600, true, 'DOUBLE' FROM analytics.sensors;
 
--- Map every sensor to its group (1:1 relation)
+-- Map every sensor to its analyticGroup (1:1 relation)
 INSERT INTO analytics.sensor_to_group(sensor_id, group_id)
 SELECT id, id FROM analytics.groups_interval;
 

+ 0 - 67
src/main/java/cz/senslog/analytics/MockData.java

@@ -1,67 +0,0 @@
-package cz.senslog.analytics;
-
-import cz.senslog.analytics.domain.*;
-import cz.senslog.analytics.utils.validator.NotifyTrigger;
-import io.vertx.core.json.JsonObject;
-
-import java.time.OffsetDateTime;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static cz.senslog.analytics.messaging.MessageBroker.Type.SENSLOG_ALERT;
-
-public class MockData {
-
-    public static final long uniqueSensorId = 100L;
-    public static final long groupId = 1L;
-    public static final long unitId = 100000;
-    public static final long sensorId = 10343;
-    public static final double observationValue = 25.3;
-
-    public static final Sensor sensor = new Sensor(uniqueSensorId, unitId, sensorId);
-    public static final Observation observation = new Observation(sensor, observationValue, OffsetDateTime.now());
-
-    public static final Group doubleGroup = Group.of(groupId, "Rostenice_Temperature", CollectorType.DOUBLE);
-
-    public static Map<Sensor, List<Group>> mockSensorToGroupConfig() {
-        return Map.of(sensor, List.of(doubleGroup));
-    }
-
-    public static List<Threshold> mockThresholdsConfigForSensors() {
-        return List.of(new Threshold(1, uniqueSensorId, NotifyTrigger.Type.INSTANT, true, AttributeName.VAL, List.of(
-                        new ThresholdRule("gt", 10.0)
-        )));
-    }
-
-    public static List<Threshold> mockThresholdsConfigForGroups(CollectorType type) {
-        if (Objects.requireNonNull(type) == CollectorType.DOUBLE) {
-            return List.of(new Threshold(2, doubleGroup.id(), NotifyTrigger.Type.INSTANT, true, AttributeName.MIN, List.of(
-                    new ThresholdRule("lt", 0.0),
-                    new ThresholdRule("gt", 10.0)))
-            );
-        }
-        return Collections.emptyList();
-    }
-
-    public static Map<Long, Group> mockGroupsConfig(CollectorType type) {
-        if (Objects.requireNonNull(type) == CollectorType.DOUBLE) {
-            return Map.of(doubleGroup.id(), doubleGroup);
-        }
-        return Collections.emptyMap();
-    }
-
-    public static List<MessageBrokerConfig> mockMessageBrokers() {
-        return List.of(
-                new MessageBrokerConfig(1L, SENSLOG_ALERT, JsonObject.of())
-        );
-    }
-
-    public static List<SourceToMessageBroker> mockSourceToMessageBroker() {
-        return List.of(
-                new SourceToMessageBroker(uniqueSensorId, 1L, JsonObject.of()),
-                new SourceToMessageBroker(groupId, 1L, JsonObject.of())
-        );
-    }
-}

+ 2 - 1
src/main/java/cz/senslog/analytics/app/Application.java

@@ -67,7 +67,8 @@ public final class Application {
         PropertyConfig config = PropertyConfig.getInstance();
         DeploymentOptions options = new DeploymentOptions().setConfig(JsonObject.of(
                 "server", config.server(),
-                "auth", config.auth()
+                "auth", config.auth(),
+                "modules", config.modules()
         ));
 
         PropertyConfig.Database dbConfig = config.dbConfig();

+ 34 - 9
src/main/java/cz/senslog/analytics/app/PropertyConfig.java

@@ -10,8 +10,8 @@ public final class PropertyConfig {
 
     private final HttpServer httpServerConfig;
     private final Database dbConfig;
-
     private final Auth authConfig;
+    private final Modules modulesConfig;
 
     public static PropertyConfig getInstance() {
         return getInstance(System::getenv);
@@ -20,16 +20,18 @@ public final class PropertyConfig {
     private static PropertyConfig getInstance(Function<String, String> getEnv) {
         if (INSTANCE == null) {
             INSTANCE = new PropertyConfig(
-                    new HttpServer(getEnv), new Database(getEnv), new Auth(getEnv)
+                    new HttpServer(getEnv), new Database(getEnv),
+                    new Auth(getEnv), new Modules(getEnv)
             );
         }
         return INSTANCE;
     }
 
-    private PropertyConfig(HttpServer httpServer, Database dbConfig, Auth authConfig) {
+    private PropertyConfig(HttpServer httpServer, Database dbConfig, Auth authConfig, Modules modulesConfig) {
         this.httpServerConfig = httpServer;
         this.dbConfig = dbConfig;
         this.authConfig = authConfig;
+        this.modulesConfig = modulesConfig;
     }
 
     public static class HttpServer {
@@ -97,18 +99,31 @@ public final class PropertyConfig {
             return Boolean.parseBoolean(getEnv.apply("AUTH_DISABLED"));
         }
         public String getKeyStorePath() {
-            String user = getEnv.apply("AUTH_KEYSTORE_PATH");
-            return Objects.requireNonNull(user, "System environmental variable 'AUTH_KEYSTORE_PATH' is not set.");
+            String path = getEnv.apply("AUTH_KEYSTORE_PATH");
+            return Objects.requireNonNull(path, "System environmental variable 'AUTH_KEYSTORE_PATH' is not set.");
         }
 
         public String getKeyStoreType() {
-            String user = getEnv.apply("AUTH_KEYSTORE_TYPE");
-            return Objects.requireNonNull(user, "System environmental variable 'AUTH_KEYSTORE_TYPE' is not set.");
+            String type = getEnv.apply("AUTH_KEYSTORE_TYPE");
+            return Objects.requireNonNull(type, "System environmental variable 'AUTH_KEYSTORE_TYPE' is not set.");
         }
 
         public String getKeyStorePassword() {
-            String user = getEnv.apply("AUTH_KEYSTORE_PASSWORD");
-            return Objects.requireNonNull(user, "System environmental variable 'AUTH_KEYSTORE_PASSWORD' is not set.");
+            String password = getEnv.apply("AUTH_KEYSTORE_PASSWORD");
+            return Objects.requireNonNull(password, "System environmental variable 'AUTH_KEYSTORE_PASSWORD' is not set.");
+        }
+    }
+
+    public static class Modules {
+        private Modules(Function<String, String> getEnv) {
+            this.getEnv = getEnv;
+        }
+
+        private final Function<String, String> getEnv;
+
+        public String getAlertServiceUrl() {
+            String url = getEnv.apply("MODULE_ALERT_SERVICE_URL");
+            return Objects.requireNonNull(url, "System environmental variable 'MODULE_ALERT_SERVICE_URL' is not set.");
         }
     }
 
@@ -124,6 +139,10 @@ public final class PropertyConfig {
         return authConfig;
     }
 
+    public Modules modulesConfig() {
+        return modulesConfig;
+    }
+
     public JsonObject server() {
         return JsonObject.of(
                 "http.port", httpServerConfig.getPort()
@@ -149,4 +168,10 @@ public final class PropertyConfig {
             "pool.size", dbConfig.getPoolSize()
         );
     }
+
+    public JsonObject modules() {
+        return JsonObject.of(
+                "alert.service.url", modulesConfig.getAlertServiceUrl()
+        );
+    }
 }

+ 5 - 5
src/main/java/cz/senslog/analytics/app/VertxDeployer.java

@@ -1,7 +1,7 @@
 package cz.senslog.analytics.app;
 
-import cz.senslog.analytics.domain.ThresholdViolationNotification;
-import cz.senslog.analytics.domain.Observation;
+import cz.senslog.analytics.domain.ThresholdViolationAlert;
+import cz.senslog.analytics.domain.RawObservation;
 import io.vertx.core.*;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.eventbus.MessageCodec;
@@ -37,8 +37,8 @@ public class VertxDeployer extends AbstractVerticle {
 
     @Override
     public void start(Promise<Void> startPromise) {
-        vertx.eventBus().registerDefaultCodec(Observation.class, new IdentityCodec<>(Observation.class));
-        vertx.eventBus().registerDefaultCodec(ThresholdViolationNotification.class, new IdentityCodec<>(ThresholdViolationNotification.class));
+        vertx.eventBus().registerDefaultCodec(RawObservation.class, new IdentityCodec<>(RawObservation.class));
+        vertx.eventBus().registerDefaultCodec(ThresholdViolationAlert.class, new IdentityCodec<>(ThresholdViolationAlert.class));
 
 
         List<Future<Void>> futureModules = new ArrayList<>(verticles.length);
@@ -54,7 +54,7 @@ public class VertxDeployer extends AbstractVerticle {
     }
 
     private static Future<Void> deployHelper(Vertx vertx, DeploymentOptions options, AbstractVerticle verticle) {
-        logger.info("Deploying module: " + vertx.getClass().getName());
+        logger.info("Deploying module: " + verticle.getClass().getSimpleName());
         final Promise<Void> promise = Promise.promise();
         vertx.deployVerticle(verticle, options, res -> {
             if(res.failed()){

+ 12 - 0
src/main/java/cz/senslog/analytics/domain/AnalyticGroup.java

@@ -0,0 +1,12 @@
+package cz.senslog.analytics.domain;
+
+public record AnalyticGroup(long id, String name, int interval, boolean persistence, CollectorType type) {
+
+    public static AnalyticGroup of(long id, String name, int interval, boolean persistence, CollectorType type) {
+        return new AnalyticGroup(id, name, interval, persistence, type);
+    }
+
+    public static AnalyticGroup of(long id, String name, CollectorType type) {
+        return new AnalyticGroup(id, name, -1, false, type);
+    }
+}

+ 40 - 0
src/main/java/cz/senslog/analytics/domain/AnalyticObservation.java

@@ -0,0 +1,40 @@
+package cz.senslog.analytics.domain;
+
+import java.time.OffsetDateTime;
+
+public class AnalyticObservation implements TimeSeriesDatasource {
+
+    private final Observation observation;
+    private final long analyticGroupId;
+
+    public static AnalyticObservation of(Observation observation, long analyticGroup) {
+        return new AnalyticObservation(observation, analyticGroup);
+    }
+
+    private AnalyticObservation(Observation observation, long analyticGroupId) {
+        this.observation = observation;
+        this.analyticGroupId = analyticGroupId;
+    }
+
+    @Override
+    public long datasourceId() {
+        return observation.datasourceId();
+    }
+
+    @Override
+    public OffsetDateTime timestamp() {
+        return observation.timestamp();
+    }
+
+    public double value() {
+        return observation.value();
+    }
+
+    public long analyticGroupId() {
+        return analyticGroupId;
+    }
+
+    public Sensor sensor() {
+        return observation.sensor();
+    }
+}

+ 1 - 1
src/main/java/cz/senslog/analytics/domain/AttributeName.java → src/main/java/cz/senslog/analytics/domain/AttributeType.java

@@ -1,6 +1,6 @@
 package cz.senslog.analytics.domain;
 
-public enum AttributeName {
+public enum AttributeType {
 
     MIN,
     MAX,

+ 5 - 0
src/main/java/cz/senslog/analytics/domain/CollectorType.java

@@ -3,4 +3,9 @@ package cz.senslog.analytics.domain;
 public enum CollectorType {
     DOUBLE,
     MOLD,
+
+    ;
+    public static CollectorType of(String type) {
+        return valueOf(type.toUpperCase());
+    }
 }

+ 9 - 9
src/main/java/cz/senslog/analytics/domain/DoubleStatistics.java

@@ -14,27 +14,27 @@ public class DoubleStatistics implements TimeSeriesDatasource {
     private double max = -1.0D / 0.0;
     private final OffsetDateTime timestamp;
 
-    private final long sourceId;
+    private final long analyticGroupId;
 
     public static DoubleStatistics init(long sourceId, OffsetDateTime timestamp) {
         return new DoubleStatistics(sourceId, timestamp);
     }
 
-    private DoubleStatistics(long sourceId, OffsetDateTime timestamp) {
-        this.sourceId = sourceId;
+    private DoubleStatistics(long analyticGroupId, OffsetDateTime timestamp) {
+        this.analyticGroupId = analyticGroupId;
         this.timestamp = timestamp;
     }
 
     public DoubleStatistics(DoubleStatistics statistics) {
-        this(statistics.sourceId, statistics.timestamp);
+        this(statistics.analyticGroupId, statistics.timestamp);
     }
 
-    public DoubleStatistics(long sourceId, DoubleStatistics statistics, OffsetDateTime timestamp) {
-        this(sourceId, statistics.count, statistics.min, statistics.max, statistics.sum, timestamp);
+    public DoubleStatistics(long analyticGroupId, DoubleStatistics statistics, OffsetDateTime timestamp) {
+        this(analyticGroupId, statistics.count, statistics.min, statistics.max, statistics.sum, timestamp);
     }
 
-    public DoubleStatistics(long sourceId, long count, double min, double max, double sum, OffsetDateTime timestamp) {
-        this.sourceId = sourceId;
+    public DoubleStatistics(long analyticGroupId, long count, double min, double max, double sum, OffsetDateTime timestamp) {
+        this.analyticGroupId = analyticGroupId;
         this.timestamp = timestamp;
         if (count < 0L) {
             throw new IllegalArgumentException("Negative count value");
@@ -140,6 +140,6 @@ public class DoubleStatistics implements TimeSeriesDatasource {
 
     @Override
     public long datasourceId() {
-        return sourceId;
+        return analyticGroupId;
     }
 }

+ 0 - 16
src/main/java/cz/senslog/analytics/domain/Group.java

@@ -1,16 +0,0 @@
-package cz.senslog.analytics.domain;
-
-public record Group(long id, String name, int interval, boolean persistence, CollectorType type) {
-
-    public Group(long id, CollectorType type) {
-        this(id, null, -1, false, type);
-    }
-
-    public static Group of(long id) {
-        return new Group(id, null, -1, false, null);
-    }
-
-    public static Group of(long id, String name, CollectorType type) {
-        return new Group(id, name, 0, false, type);
-    }
-}

+ 0 - 1
src/main/java/cz/senslog/analytics/domain/MessageBrokerConfig.java

@@ -1,6 +1,5 @@
 package cz.senslog.analytics.domain;
 
-import cz.senslog.analytics.messaging.MessageBroker;
 import io.vertx.core.json.JsonObject;
 
 public record MessageBrokerConfig(long id, MessageBroker.Type senderType, JsonObject config) {}

+ 27 - 2
src/main/java/cz/senslog/analytics/domain/Observation.java

@@ -2,10 +2,35 @@ package cz.senslog.analytics.domain;
 
 import java.time.OffsetDateTime;
 
-public record Observation(Sensor source, double value, OffsetDateTime timestamp) implements TimeSeriesDatasource {
+public class Observation implements TimeSeriesDatasource {
+
+    private final Sensor sensor;
+    private final RawObservation rawObservation;
+
+    public static Observation of(Sensor sensor, RawObservation rawObservation) {
+        return new Observation(sensor, rawObservation);
+    }
+
+    private Observation(Sensor sensor, RawObservation rawObservation) {
+        this.sensor = sensor;
+        this.rawObservation = rawObservation;
+    }
+
+    @Override
+    public OffsetDateTime timestamp() {
+        return rawObservation.timestamp();
+    }
 
     @Override
     public long datasourceId() {
-        return source.id();
+        return sensor.id();
+    }
+
+    public Sensor sensor() {
+        return sensor;
+    }
+
+    public double value() {
+        return rawObservation.value();
     }
 }

+ 10 - 0
src/main/java/cz/senslog/analytics/domain/RawObservation.java

@@ -0,0 +1,10 @@
+package cz.senslog.analytics.domain;
+
+import java.time.OffsetDateTime;
+
+public record RawObservation(long unitId, long sensorId, double value, OffsetDateTime timestamp) {
+
+    public static RawObservation of(long unitId, long sensorId, double value, OffsetDateTime timestamp) {
+        return new RawObservation(unitId, sensorId, value, timestamp);
+    }
+}

+ 3 - 44
src/main/java/cz/senslog/analytics/domain/Sensor.java

@@ -1,50 +1,9 @@
 package cz.senslog.analytics.domain;
 
 
-import java.util.Objects;
+public record Sensor(long id, long unitId, long sensorId) {
 
-public record Sensor(long id, long unitId, long sensorId, long groupId) {
-
-    public static Sensor empty() {
-        return new Sensor(-1, -1, -1, -1);
-    }
-
-    public Sensor(long unitId, long  sensorId) {
-        this(-1, unitId, sensorId, -1);
-    }
-
-    public Sensor(long id, long unitId, long  sensorId) {
-        this(id, unitId, sensorId, -1);
-    }
-
-    public Sensor(Sensor sensor, long groupId) {
-        this(sensor.id(), sensor.unitId(), sensor.sensorId(), groupId);
-    }
-
-    public boolean isEmpty() {
-        return this.equals(empty());
-    }
-
-    public boolean isNotEmpty() {
-        return !isEmpty();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        Sensor sensor = (Sensor) o;
-        return unitId == sensor.unitId &&
-                sensorId == sensor.sensorId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(unitId, sensorId);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("[%d] %d-%d", id, unitId, sensorId);
+    public static Sensor of(long id, long unitId, long  sensorId) {
+        return new Sensor(id, unitId, sensorId);
     }
 }

+ 1 - 1
src/main/java/cz/senslog/analytics/domain/StatisticRecord.java

@@ -2,5 +2,5 @@ package cz.senslog.analytics.domain;
 
 import java.time.OffsetDateTime;
 
-public record StatisticRecord(long id, long groupId, String valueAttribute, double recordValue, int timeInterval, OffsetDateTime timestamp) {
+public record StatisticRecord(long id, long analyticGroupId, AttributeType attributeType, double calculatedValue, int timeInterval, OffsetDateTime timestamp) {
 }

+ 1 - 1
src/main/java/cz/senslog/analytics/domain/Threshold.java

@@ -5,5 +5,5 @@ import cz.senslog.analytics.utils.validator.NotifyTrigger;
 
 import java.util.List;
 
-public record Threshold(long id, long groupId, NotifyTrigger.Type notifyTriggerType, boolean allowProcess, AttributeName attribute, List<ThresholdRule> rules) {
+public record Threshold(long id, long datasourceId, NotifyTrigger.Mode notifyTriggerMode, AttributeType attributeType, boolean processOnFail, boolean notifyEnabled, List<ThresholdRule> rules) {
 }

+ 17 - 0
src/main/java/cz/senslog/analytics/domain/ThresholdMode.java

@@ -0,0 +1,17 @@
+package cz.senslog.analytics.domain;
+
+public enum ThresholdMode {
+
+    LT,
+    LE,
+    GE,
+    GT,
+    NE,
+    EQ
+
+    ;
+    public static ThresholdMode of(String mode) {
+        return valueOf(mode.toUpperCase());
+    } 
+
+}

+ 5 - 1
src/main/java/cz/senslog/analytics/domain/ThresholdRule.java

@@ -1,4 +1,8 @@
 package cz.senslog.analytics.domain;
 
-public record ThresholdRule(String mode, Double value) {
+public record ThresholdRule(ThresholdMode thresholdMode, double value) {
+
+    public static ThresholdRule of(ThresholdMode thresholdMode, double value) {
+        return new ThresholdRule(thresholdMode, value);
+    }
 }

+ 6 - 0
src/main/java/cz/senslog/analytics/domain/ThresholdViolationAlert.java

@@ -0,0 +1,6 @@
+package cz.senslog.analytics.domain;
+
+import java.time.OffsetDateTime;
+
+public record ThresholdViolationAlert(String moduleName, long analyticGroupId, String sourceName, ValidationResult[] violatedData, OffsetDateTime timestamp) {
+}

+ 0 - 6
src/main/java/cz/senslog/analytics/domain/ThresholdViolationNotification.java

@@ -1,6 +0,0 @@
-package cz.senslog.analytics.domain;
-
-import java.time.OffsetDateTime;
-
-public record ThresholdViolationNotification(String moduleName, long groupId, String sourceName, ValidationResult[] violatedData, OffsetDateTime timestamp) {
-}

+ 7 - 7
src/main/java/cz/senslog/analytics/domain/ValidationResult.java

@@ -5,14 +5,14 @@ import java.util.List;
 
 public class ValidationResult {
 
-    public record Record(String mode, double thresholdValue) {}
+    public record Record(ThresholdMode mode, double thresholdValue) {}
 
     private final double validatedValue;
-    private final AttributeName attribute;
+    private final AttributeType attributeType;
     private final List<Record> records;
 
-    public ValidationResult(AttributeName attribute, double validatedValue) {
-        this.attribute = attribute;
+    public ValidationResult(AttributeType attributeType, double validatedValue) {
+        this.attributeType = attributeType;
         this.validatedValue = validatedValue;
         this.records = new ArrayList<>();
     }
@@ -33,11 +33,11 @@ public class ValidationResult {
         return validatedValue;
     }
 
-    public AttributeName attribute() {
-        return attribute;
+    public AttributeType attributeType() {
+        return attributeType;
     }
 
-    public void addRecord(String mode, double thresholdValue) {
+    public void addRecord(ThresholdMode mode, double thresholdValue) {
         records.add(new Record(mode, thresholdValue));
     }
 }

+ 1 - 2
src/main/java/cz/senslog/analytics/domain/ViolationReport.java

@@ -1,7 +1,6 @@
 package cz.senslog.analytics.domain;
 
 import java.time.OffsetDateTime;
-import java.util.Map;
 
-public record ViolationReport(long sourceId, OffsetDateTime timestamp, ValidationResult[] violatedData) {
+public record ViolationReport(long datasourceId, OffsetDateTime timestamp, ValidationResult[] violatedData) {
 }

+ 0 - 63
src/main/java/cz/senslog/analytics/messaging/MessageBroker.java

@@ -1,63 +0,0 @@
-package cz.senslog.analytics.messaging;
-
-import cz.senslog.analytics.domain.ValidationResult;
-import cz.senslog.analytics.domain.ThresholdViolationNotification;
-import io.vertx.core.json.JsonObject;
-
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.Function;
-
-@FunctionalInterface
-public interface MessageBroker {
-    enum Type {
-        SENSLOG_ALERT   (SensLogAlertMessageBroker::new),
-
-        ;
-        private final Function<JsonObject, MessageBroker> constructCreator;
-
-        Type(Function<JsonObject, MessageBroker> constructCreator) {
-            this.constructCreator = constructCreator;
-        }
-
-        public MessageBroker createBroker(JsonObject config) {
-            return constructCreator.apply(config);
-        }
-    }
-
-    void send(ThresholdViolationNotification notification, JsonObject recipients);
-
-    default String createMessage(ThresholdViolationNotification n) {
-        ValidationResult[] violatedData = n.violatedData();
-        if (violatedData.length == 0) {
-            return null;
-        }
-        StringBuilder b = new StringBuilder();
-        b.append(createLineMessage(n.moduleName(), n.sourceName(), violatedData[0], n.timestamp()));
-        for (int i = 1; i < violatedData.length; i++) {
-            b.append("\n").append(createLineMessage(n.moduleName(), n.sourceName(), violatedData[i], n.timestamp()));
-        }
-        return b.toString();
-    }
-
-    default String createLineMessage(String moduleName, String sourceName, ValidationResult validationResult, OffsetDateTime timestamp) {
-        final String delimiter = " ";
-        String sourceStr = moduleName + delimiter + sourceName;
-        String attrValue = String.format("%s(%.2f)", validationResult.attribute(), validationResult.validatedValue());
-        String timestampStr = "at" + delimiter + timestamp.format(DateTimeFormatter.ISO_DATE_TIME);
-        List<ValidationResult.Record> records = validationResult.records();
-        if (records.isEmpty()) {
-            return sourceStr + delimiter + timestampStr;
-        }
-        Iterator<ValidationResult.Record> recIter = records.iterator();
-        ValidationResult.Record rec1 = recIter.next();
-        StringBuilder rulesStr = new StringBuilder(String.format("%s %s %.2f", attrValue, rec1.mode(), rec1.thresholdValue()));
-        while (recIter.hasNext()) {
-            ValidationResult.Record rec = recIter.next();
-            rulesStr.append(String.format("&& %s %s %.2f", attrValue, rec.mode(), rec.thresholdValue()));
-        }
-        return sourceStr + delimiter + rulesStr + delimiter + timestampStr;
-    }
-}

+ 0 - 21
src/main/java/cz/senslog/analytics/messaging/SensLogAlertMessageBroker.java

@@ -1,21 +0,0 @@
-package cz.senslog.analytics.messaging;
-
-import cz.senslog.analytics.domain.ThresholdViolationNotification;
-import io.vertx.core.json.JsonObject;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class SensLogAlertMessageBroker implements MessageBroker {
-    private static final Logger logger = LogManager.getLogger(SensLogAlertMessageBroker.class);
-
-    private final JsonObject config;
-
-    public SensLogAlertMessageBroker(JsonObject config) {
-        this.config = config;
-    }
-
-    @Override
-    public void send(ThresholdViolationNotification notification, JsonObject recipients) {
-        logger.info("SensLog Alert: {}", createMessage(notification));
-    }
-}

+ 56 - 0
src/main/java/cz/senslog/analytics/module/AlertModule.java

@@ -0,0 +1,56 @@
+package cz.senslog.analytics.module;
+
+import cz.senslog.analytics.domain.ThresholdViolationAlert;
+import cz.senslog.analytics.module.api.SimpleModule;
+import cz.senslog.analytics.repository.AnalyticsRepository;
+import io.vertx.core.Promise;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+
+
+public final class AlertModule extends SimpleModule {
+
+    private static final Logger logger = LogManager.getLogger(AlertModule.class);
+
+    private final AnalyticsRepository repo;
+
+    private String alertServiceURL;
+
+
+    public AlertModule(AnalyticsRepository repo) {
+        this.repo = repo;
+    }
+
+    @Override
+    public void configure(Promise<Void> completePromise) {
+        alertServiceURL = config().getJsonObject("modules").getString("alert.service.url");
+        completePromise.complete();
+    }
+
+    @Override
+    public void run(Promise<Void> completePromise) {
+        vertx.eventBus().<ThresholdViolationAlert>consumer(id(), msg -> vertx.executeBlocking(() -> blockingAlertHandler(msg.body()))
+                        .onSuccess(res -> logger.info("Alert sent successfully!"))
+                        .onFailure(logger::error))
+                .completionHandler(ar -> {
+                    if (ar.succeeded()) {
+                        completePromise.complete();
+                    } else {
+                        completePromise.fail(ar.cause());
+                    }
+                });
+    }
+
+    private Object blockingAlertHandler(ThresholdViolationAlert alert) {
+        logger.info("Sending the alert: Title: {}, Message: {}, At: {} to {}.", alert.sourceName(), Arrays.toString(alert.violatedData()), alert.timestamp(), alertServiceURL);
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        // TODO send the alert
+        return null;
+    }
+}

+ 19 - 17
src/main/java/cz/senslog/analytics/module/DoubleStatisticsModule.java

@@ -2,8 +2,7 @@ package cz.senslog.analytics.module;
 
 import cz.senslog.analytics.domain.*;
 import cz.senslog.analytics.module.api.CollectorModule;
-import cz.senslog.analytics.repository.ConfigurationRepository;
-import cz.senslog.analytics.repository.AnalyticsDataRepository;
+import cz.senslog.analytics.repository.AnalyticsRepository;
 import cz.senslog.analytics.utils.validator.ThresholdChecker;
 import cz.senslog.analytics.utils.validator.Validator;
 import org.apache.logging.log4j.LogManager;
@@ -13,49 +12,52 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.stream.Stream;
 
+import static cz.senslog.analytics.domain.AttributeType.*;
+
 public class DoubleStatisticsModule extends CollectorModule {
 
     private static final Logger logger = LogManager.getLogger(DoubleStatisticsModule.class);
 
+    private static final String DEFAULT_GROUP_NAME = "unknown";
     private static final Validator<DoubleStatistics> validator;
 
     private ThresholdChecker<DoubleStatistics> thresholdChecker;
 
     static {
         validator = Validator.<DoubleStatistics>create()
-                .addMapping(AttributeName.MIN, s -> s::min)
-                .addMapping(AttributeName.MAX, s -> s::max)
-                .addMapping(AttributeName.AVG, s -> s::average);
+                .addMapping(MIN, s -> s::min)
+                .addMapping(MAX, s -> s::max)
+                .addMapping(AVG, s -> s::average);
     }
 
-    public DoubleStatisticsModule(ConfigurationRepository configRepo, AnalyticsDataRepository statisticsRep) {
-        super(CollectorType.DOUBLE, configRepo, statisticsRep);
+    public DoubleStatisticsModule(AnalyticsRepository configRepo) {
+        super(CollectorType.DOUBLE, configRepo);
     }
 
     @Override
-    protected void postConfigure() {
-        thresholdChecker = new ThresholdChecker<>(thresholds(), validator, this::notifyIfViolation);
+    protected void init() {
+        thresholdChecker = new ThresholdChecker<>(thresholds(), validator, this::notifyIfViolation, true);
     }
 
     private void notifyIfViolation(ViolationReport report) {
-        Optional<Group> source = Optional.ofNullable(groups().get(report.sourceId()));
-        String sourceName = source.map(Group::name).orElse("unknown");
-        notify(new ThresholdViolationNotification(type().name(), report.sourceId(), sourceName, report.violatedData(), report.timestamp()));
+        Optional<AnalyticGroup> source = Optional.ofNullable(groups().get(report.datasourceId()));
+        String sourceName = source.map(AnalyticGroup::name).orElse(DEFAULT_GROUP_NAME);
+        notify(new ThresholdViolationAlert(type().name(), report.datasourceId(), sourceName, report.violatedData(), report.timestamp()));
     }
 
     private Stream<StatisticRecord> mapToStatisticRecord(DoubleStatistics st) {
-        Group group = groups().get(st.datasourceId());
+        AnalyticGroup analyticGroup = groups().get(st.datasourceId());
         return Stream.of(
-            new StatisticRecord(0, st.datasourceId(), AttributeName.MIN.name(), st.min(), group.interval(), st.timestamp()),
-            new StatisticRecord(0, st.datasourceId(), AttributeName.AVG.name(), st.average(), group.interval(), st.timestamp()),
-            new StatisticRecord(0, st.datasourceId(), AttributeName.MAX.name(), st.max(), group.interval(), st.timestamp())
+            new StatisticRecord(0, st.datasourceId(), MIN, st.min(), analyticGroup.interval(), st.timestamp()),
+            new StatisticRecord(0, st.datasourceId(), AVG, st.average(), analyticGroup.interval(), st.timestamp()),
+            new StatisticRecord(0, st.datasourceId(), MAX, st.max(), analyticGroup.interval(), st.timestamp())
         );
     }
 
     @Override
     protected void handle(Collection<DoubleStatistics> aggregations) {
         aggregations.stream()
-                .filter(thresholdChecker.check())
+                .filter(thresholdChecker::check)
                 .flatMap(this::mapToStatisticRecord)
                 .forEach(this::persist);
     }

+ 3 - 4
src/main/java/cz/senslog/analytics/module/MoldAnalysisModule.java

@@ -3,8 +3,7 @@ package cz.senslog.analytics.module;
 import cz.senslog.analytics.domain.CollectorType;
 import cz.senslog.analytics.domain.DoubleStatistics;
 import cz.senslog.analytics.module.api.CollectorModule;
-import cz.senslog.analytics.repository.ConfigurationRepository;
-import cz.senslog.analytics.repository.AnalyticsDataRepository;
+import cz.senslog.analytics.repository.AnalyticsRepository;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -14,8 +13,8 @@ public class MoldAnalysisModule extends CollectorModule {
 
     private static final Logger logger = LogManager.getLogger(MoldAnalysisModule.class);
 
-    public MoldAnalysisModule(ConfigurationRepository configRepo, AnalyticsDataRepository statisticsRep) {
-        super(CollectorType.MOLD, configRepo, statisticsRep);
+    public MoldAnalysisModule(AnalyticsRepository repo) {
+        super(CollectorType.MOLD, repo);
     }
 
     @Override

+ 0 - 62
src/main/java/cz/senslog/analytics/module/NotificationModule.java

@@ -1,62 +0,0 @@
-package cz.senslog.analytics.module;
-
-import cz.senslog.analytics.domain.MessageBrokerConfig;
-import cz.senslog.analytics.domain.SourceToMessageBroker;
-import cz.senslog.analytics.domain.ThresholdViolationNotification;
-import cz.senslog.analytics.module.api.SimpleModule;
-import cz.senslog.analytics.messaging.MessageBroker;
-import cz.senslog.analytics.repository.ConfigurationRepository;
-import io.vertx.core.json.JsonObject;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-
-public final class NotificationModule extends SimpleModule {
-
-    private static final Logger logger = LogManager.getLogger(NotificationModule.class);
-
-    private record SourceSender(MessageBroker messageBroker, JsonObject properties) {
-        public void send(ThresholdViolationNotification notification) {
-            messageBroker.send(notification, properties);
-        }
-    }
-
-    private static final SourceSender DEFAULT_SOURCE_SENDER = new SourceSender((n, props) ->
-            logger.warn("No notification sender for source {}.", n.groupId()), null);
-
-    private final ConfigurationRepository repo;
-
-    private Map<Long, SourceSender> senders;
-
-    public NotificationModule(ConfigurationRepository repo) {
-        this.repo = repo;
-        this.senders = Collections.emptyMap();
-    }
-
-    @Override
-    public void configure() {
-        repo.loadMessageBrokers().onSuccess(brokers -> {
-            Map<Long, MessageBroker> msgBrokerMap = new HashMap<>(brokers.size());
-            for (MessageBrokerConfig brokerConfig : brokers) {
-                msgBrokerMap.put(brokerConfig.id(), brokerConfig.senderType().createBroker(brokerConfig.config()));
-            }
-            repo.loadSourceToMessageBrokerMapping().onSuccess(mapping -> {
-                senders = new HashMap<>(mapping.size());
-                for (SourceToMessageBroker sToMsgConfig : mapping) {
-                    MessageBroker msgBroker = msgBrokerMap.getOrDefault(sToMsgConfig.messageBrokerId(), DEFAULT_SOURCE_SENDER.messageBroker);
-                    senders.put(sToMsgConfig.thresholdId(), new SourceSender(msgBroker, sToMsgConfig.properties()));
-                }
-            }).onFailure(logger::catching);
-        }).onFailure(logger::catching);
-    }
-
-    @Override
-    public void run() {
-        vertx.eventBus().<ThresholdViolationNotification>consumer(id(),
-                msg -> senders.getOrDefault(msg.body().groupId(), DEFAULT_SOURCE_SENDER).send(msg.body()));
-    }
-}

+ 78 - 65
src/main/java/cz/senslog/analytics/module/ObservationReceiverModule.java

@@ -1,19 +1,20 @@
 package cz.senslog.analytics.module;
 
 import cz.senslog.analytics.domain.*;
-import cz.senslog.analytics.repository.ConfigurationRepository;
+import cz.senslog.analytics.repository.AnalyticsRepository;
 import cz.senslog.analytics.utils.validator.ThresholdChecker;
 import cz.senslog.analytics.module.api.Module;
 import cz.senslog.analytics.module.api.ModuleDescriptor;
 import cz.senslog.analytics.module.api.SimpleModule;
 import cz.senslog.analytics.utils.validator.Validator;
 import cz.senslog.analytics.utils.Tuple;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
@@ -22,87 +23,99 @@ public class ObservationReceiverModule extends SimpleModule {
 
     private static final Logger logger = LogManager.getLogger(ObservationReceiverModule.class);
 
+    private record RawSensor(long unitId, long sensorId) {}
 
     private static final String MODULE_NAME = "RECEIVE";
-    private static final String DEFAULT_SOURCE_NAME = "unknown";
+    private final AnalyticsRepository repo;
 
-    private final ConfigurationRepository repo;
-
-    private Map<Sensor, List<Group>> sensorToGroupMap;
+    private Map<RawSensor, Sensor> sensorsMap;
+    private Map<Long, Sensor> sensorIdToSensorMap;
+    private Map<Long, List<AnalyticGroup>> sensorToMultiGroup;
     private ThresholdChecker<Observation> thresholdChecker;
 
-    private static final Validator<Observation> VALIDATOR;
-
-    static {
-        VALIDATOR = Validator.<Observation>create()
-                .addMapping(AttributeName.VAL, o -> o::value);
-    }
-
-    private AtomicInteger counter = new AtomicInteger(1);
-
-    public ObservationReceiverModule(ConfigurationRepository repo) {
+    public ObservationReceiverModule(AnalyticsRepository repo) {
         this.repo = repo;
-        this.sensorToGroupMap = emptyMap();
+        this.sensorsMap = emptyMap();
+        this.sensorToMultiGroup = emptyMap();
         this.thresholdChecker = ThresholdChecker.disabled();
     }
 
     @Override
-    public void configure() {
-        repo.loadSensorToGroupsMapping()
-                .onSuccess(data -> sensorToGroupMap = data)
-                .onFailure(logger::catching);
-
-        repo.loadSensorThresholds()
-                .onSuccess(data -> thresholdChecker = new ThresholdChecker<>(data, VALIDATOR, this::notifyIfViolated))
-                .onFailure(logger::catching);
+    public void configure(Promise<Void> completePromise) {
+
+        Future<List<Sensor>> allSensorsFuture = repo.loadAllSensors();
+        Future<List<Threshold>> thresholdsForSensorsFuture = repo.loadThresholdsForSensor();
+        Future<List<Tuple<Long, AnalyticGroup>>> datasourceToAnalyticGroupFuture = repo.loadAllAnalyticGroupsWithDatasource();
+
+        final Validator<Observation> validator = Validator.<Observation>create()
+                .addMapping(AttributeType.VAL, o -> o::value);
+
+        Future.all(allSensorsFuture, thresholdsForSensorsFuture, datasourceToAnalyticGroupFuture)
+                .onSuccess(ar -> {
+                    List<Sensor> allSensors = ar.resultAt(0);
+                    sensorsMap = new HashMap<>(allSensors.size());
+                    sensorIdToSensorMap = new HashMap<>(allSensors.size());
+                    for (Sensor s : allSensors) {
+                        sensorsMap.put(new RawSensor(s.unitId(), s.sensorId()), s);
+                        sensorIdToSensorMap.put(s.id(), s);
+                    }
+
+                    List<Threshold> sensorsThresholds = ar.resultAt(1);
+                    thresholdChecker = new ThresholdChecker<>(sensorsThresholds, validator, this::notifyIfViolated, true);
+
+                    List<Tuple<Long, AnalyticGroup>> groupsWithDS = ar.resultAt(2);
+                    sensorToMultiGroup = groupsWithDS.stream()
+                            .filter(t -> Module.containsCollectorOf(t.getItem2().type()))
+                            .collect(Collectors.groupingBy(Tuple::getItem1, TreeMap::new, Collectors.mapping(Tuple::getItem2, Collectors.toList())));
+
+                    completePromise.complete();
+                })
+                .onFailure(completePromise::fail);
     }
 
     private void notifyIfViolated(ViolationReport report) {
-        Optional<Sensor> sourceOpt = sensorToGroupMap.keySet().stream().filter(s -> s.id() == report.sourceId()).findFirst();
-        String sourceName = sourceOpt.map(s -> String.format("%d/%d", s.unitId(), s.sensorId())).orElse(DEFAULT_SOURCE_NAME);
-        notify(new ThresholdViolationNotification(MODULE_NAME, report.sourceId(), sourceName, report.violatedData(), report.timestamp()));
+        Sensor sensor = sensorIdToSensorMap.get(report.datasourceId());
+        String sourceName = String.format("unit(%d)/sensor(%d)", sensor.unitId(), sensor.sensorId());
+        notify(new ThresholdViolationAlert(MODULE_NAME, report.datasourceId(), sourceName, report.violatedData(), report.timestamp()));
     }
 
     @Override
-    public void run() {
-        // consume -> flatMap -> thsChecker -> filter(collector_type != null) -> publish
-        //-> consume -> map(collector_type == null) -> thsChecker -> map(collector_type != null) -> publish
-        eventBus().<Observation>consumer(id(), h -> Stream.of(h.body())
-                .flatMap(mappingToGroups())
-                .filter(o -> thresholdChecker.check().test(o.getItem2()))
-                .forEach(o -> o.getItem1().ifPresent(mId -> eventBus().publish(mId, o.getItem2())))
-        );
-        
-//        eventBus().<Observation>consumer(id(), v -> Stream.of(v.body())
-//                .filter(o -> sensorToGroupMap.containsKey(o.source()))
-//                .filter(thresholdChecker.check())
-//                .flatMap(mapToGroups())
-//                .forEach(t -> eventBus().publish(t.getItem1(), t.getItem2())));
-    }
-
-    private Function<Observation, Stream<Tuple<Optional<String>, Observation>>> mappingToGroups() {
-        return o -> {
-//            System.out.println(counter.getAndIncrement());
-            if (sensorToGroupMap.containsKey(o.source())) {
-                return sensorToGroupMap.get(o.source()).stream()
-                        .map(g -> Tuple.of(collectorTypeToId(g), new Observation(new Sensor(o.source(), g.id()), o.value(), o.timestamp())));
+    public void run(Promise<Void> completePromise) {
+
+        /*
+
+        1. [consume]    receive observations
+        2. [filter]     check if it is in sensorToSingleGroup OR sensorToCollectorGroup
+        2. [filter]     check thresholds by single analytic group (collector is null)
+            a) send alert if threshold alert is enabled
+            b) processed if processing on failed is enabled
+        3. [flatMap]    mapping source based on the sensorToCollectorGroup (collector is not null)
+        4. [forEach]    send rawObservation to each collector module
+
+         */
+
+        eventBus().<RawObservation>consumer(id(), h -> Stream.of(h.body())
+                .filter(o -> sensorsMap.containsKey(new RawSensor(o.unitId(), o.sensorId())))
+                .map(o -> Observation.of(sensorsMap.get(new RawSensor(o.unitId(), o.sensorId())), o))
+                .peek(o -> repo.updateLastTimestamp(o.sensor().id(), o.timestamp()).onFailure(logger::error))
+                .filter(thresholdChecker::check)
+                .flatMap(this::mappingToGroups)
+                .forEach(m -> eventBus().publish(m.getItem1().id(), m.getItem2()))
+        ).completionHandler(ar -> {
+            if (ar.succeeded()) {
+                completePromise.complete();
             } else {
-                return Stream.empty();
+                completePromise.fail(ar.cause());
             }
-        };
+        });
     }
 
-//    private Function<Observation, Stream<Tuple<Optional<String>, Observation>>> mapToGroups() {
-//        return o -> sensorToGroupMap.getOrDefault(o.source(), emptyList()) // TODO map directly to thresholdId
-//                .stream()
-//                .map(g -> Tuple.of(
-//                        collectorTypeToId(g),
-//                        Observation.of(new Sensor(o.source(), g.id()), o.value(), o.timestamp()))
-//                ).filter(t -> t.getItem1().isPresent()).map(t -> Tuple.of(t.getItem1().get(), t.getItem2()));
-//    }
-
-    private static Optional<String> collectorTypeToId(Group g) {
-        ModuleDescriptor descriptor = Module.collectorOf(g.type());
-        return descriptor != null ? Optional.of(descriptor.id()) : Optional.empty();
+    private Stream<Tuple<ModuleDescriptor, AnalyticObservation>> mappingToGroups(Observation o) {
+        if (sensorToMultiGroup.containsKey(o.datasourceId())) {
+            return sensorToMultiGroup.get(o.datasourceId()).stream()
+                    .map(g -> Tuple.of(Module.collectorOf(g.type()), AnalyticObservation.of(o, g.id())));
+        } else {
+            return Stream.empty();
+        }
     }
 }

+ 9 - 11
src/main/java/cz/senslog/analytics/module/ScheduleDBLoaderModule.java

@@ -1,9 +1,8 @@
 package cz.senslog.analytics.module;
 
-import cz.senslog.analytics.module.api.Module;
 import cz.senslog.analytics.module.api.SimpleModule;
-import cz.senslog.analytics.repository.ConfigurationRepository;
-import cz.senslog.analytics.repository.SensLogDataRepository;
+import cz.senslog.analytics.repository.AnalyticsRepository;
+import io.vertx.core.Promise;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -13,27 +12,26 @@ public class ScheduleDBLoaderModule extends SimpleModule {
 
     private static final Logger logger = LogManager.getLogger(ScheduleDBLoaderModule.class);
 
-    private final ConfigurationRepository configRepo;
+    private final AnalyticsRepository repo;
 
-    private final SensLogDataRepository sensLogRepo;
-
-    public ScheduleDBLoaderModule(ConfigurationRepository configRepo, SensLogDataRepository dataRepo) {
-        this.configRepo = configRepo;
-        this.sensLogRepo = dataRepo;
+    public ScheduleDBLoaderModule(AnalyticsRepository repo) {
+        this.repo = repo;
     }
 
     @Override
-    public void configure() {
+    public void configure(Promise<Void> completePromise) {
         // TODO load tasks from DB and creat timers
         List<Object> tasks = List.of(
                 new Object()
         );
+        completePromise.complete();
 
     }
 
     @Override
-    public void run() throws Exception {
+    public void run(Promise<Void> completePromise) {
         // TODO calculate FROM & TO
+        completePromise.complete();
 
 //        final String busId = Module.of(ObservationReceiverModule.class).id();
 //        sensLogRepo.cursorAllObservations(1000, o -> eventBus().publish(busId, o), logger::catching);

+ 12 - 13
src/main/java/cz/senslog/analytics/module/api/CollectedStatistics.java

@@ -1,7 +1,7 @@
 package cz.senslog.analytics.module.api;
 
+import cz.senslog.analytics.domain.AnalyticGroup;
 import cz.senslog.analytics.domain.DoubleStatistics;
-import cz.senslog.analytics.domain.Group;
 import cz.senslog.analytics.domain.Sensor;
 import cz.senslog.analytics.utils.DateTrunc;
 
@@ -13,18 +13,18 @@ import java.util.Map;
 
 public class CollectedStatistics {
 
-    private final Group group;
+    private final AnalyticGroup analyticGroup;
     private final OffsetDateTime startTime, endTime;
     private final Map<Long, DoubleStatistics> statistics;
 
-    public static CollectedStatistics init(Group group, OffsetDateTime timestamp) {
-        return new CollectedStatistics(group, DateTrunc.trunc(timestamp, group.interval()));
+    public static CollectedStatistics init(AnalyticGroup analyticGroup, OffsetDateTime timestamp) {
+        return new CollectedStatistics(analyticGroup, DateTrunc.trunc(timestamp, analyticGroup.interval()));
     }
 
-    private CollectedStatistics(Group group, OffsetDateTime startTime) {
-        this.group = group;
+    private CollectedStatistics(AnalyticGroup analyticGroup, OffsetDateTime startTime) {
+        this.analyticGroup = analyticGroup;
         this.startTime = startTime;
-        this.endTime = startTime.plusSeconds(group.interval());
+        this.endTime = startTime.plusSeconds(analyticGroup.interval());
         this.statistics = new HashMap<>();
     }
 
@@ -36,18 +36,17 @@ public class CollectedStatistics {
         return endTime;
     }
 
-    public Group group() {
-        return group;
+    public AnalyticGroup group() {
+        return analyticGroup;
     }
 
     public Collection<DoubleStatistics> statistics() {
         return statistics.values();
     }
 
-    public DoubleStatistics accept(Sensor source, double value) {
-        long sourceId = source.groupId();
-        DoubleStatistics st = statistics.computeIfAbsent(sourceId, s -> DoubleStatistics.init(s, startTime));
-        st.accept(sourceId, value);
+    public DoubleStatistics accept(long datasourceId, double value) {
+        DoubleStatistics st = statistics.computeIfAbsent(datasourceId, s -> DoubleStatistics.init(s, startTime));
+        st.accept(datasourceId, value);
         return st;
     }
 

+ 39 - 28
src/main/java/cz/senslog/analytics/module/api/CollectorModule.java

@@ -1,14 +1,15 @@
 package cz.senslog.analytics.module.api;
 
 import cz.senslog.analytics.domain.*;
-import cz.senslog.analytics.repository.ConfigurationRepository;
-import cz.senslog.analytics.repository.AnalyticsDataRepository;
+import cz.senslog.analytics.repository.AnalyticsRepository;
+import io.vertx.core.Promise;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.time.Instant;
 import java.time.OffsetDateTime;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static cz.senslog.analytics.utils.TimeUtils.minToMillis;
 import static cz.senslog.analytics.utils.TimeUtils.minToSec;
@@ -21,20 +22,17 @@ public abstract class CollectorModule extends SimpleModule {
 
     private final CollectorType type;
 
-    protected final ConfigurationRepository configRepo;
-
-    private final AnalyticsDataRepository statisticsRepo;
+    protected final AnalyticsRepository repo;
 
     private Map<Long, List<CollectedStatistics>> collectedStats;
-    private Map<Long, Group> groupMap;
+    private Map<Long, AnalyticGroup> groupMap;
     private List<Threshold> thresholds;
 
     private Instant lastHarvesting;
 
-    protected CollectorModule(CollectorType type, ConfigurationRepository configRepo, AnalyticsDataRepository statisticsRepo) {
+    protected CollectorModule(CollectorType type, AnalyticsRepository repo) {
         this.type = type;
-        this.configRepo = configRepo;
-        this.statisticsRepo = statisticsRepo;
+        this.repo = repo;
 
         this.groupMap = Collections.emptyMap();
         this.thresholds = Collections.emptyList();
@@ -47,20 +45,22 @@ public abstract class CollectorModule extends SimpleModule {
         return type;
     }
 
-    protected void postConfigure() {}
+    protected void init() {}
 
     @Override
-    protected final void configure() {
+    protected final void configure(Promise<Void> completePromise) {
         lastHarvesting = Instant.now();
-        configRepo.loadGroupsByCollectorType(type).onSuccess(groups -> {
-            collectedStats = new HashMap<>(groups.size());
+        repo.loadGroupsByCollectorType(type).onSuccess(groups -> {
+            collectedStats = new ConcurrentHashMap<>(groups.size());
             groupMap = groups;
 
-            configRepo.loadGroupThresholds(type).onSuccess(data -> {
+            repo.loadGroupThresholds(type).onSuccess(data -> {
                 thresholds = data;
-                postConfigure();
-            }).onFailure(logger::catching);
-        }).onFailure(logger::catching);
+                init();
+                completePromise.complete();
+            }).onFailure(completePromise::fail);
+        }).onFailure(completePromise::fail);
+
         long period = minToMillis(MAX_WAITING_TO_HARVEST_MIN);
         vertx.setPeriodic(period, period, l -> {
            if (overtakeHarvestTime()) {
@@ -92,16 +92,20 @@ public abstract class CollectorModule extends SimpleModule {
         return thresholds;
     }
 
-    protected Map<Long, Group> groups() {
+    protected Map<Long, AnalyticGroup> groups() {
         return groupMap;
     }
 
-    private void collect(Observation value) {
-        Group group = groupMap.get(value.source().groupId());
+    private void collect(AnalyticObservation observation) {
+
+        long datasourceId = observation.datasourceId();
+        OffsetDateTime timestamp = observation.timestamp();
+        double observedValue = observation.value();
+
+        AnalyticGroup analyticGroup = groupMap.get(observation.analyticGroupId());
 
-        List<CollectedStatistics> groupStatistics = collectedStats.computeIfAbsent(group.id(), g -> new ArrayList<>());
+        List<CollectedStatistics> groupStatistics = collectedStats.computeIfAbsent(analyticGroup.id(), g -> new ArrayList<>());
 
-        OffsetDateTime timestamp = value.timestamp();
         boolean newDataAccepted = false;
         Iterator<CollectedStatistics> statisticsIterator = groupStatistics.iterator();
         while(statisticsIterator.hasNext()) {
@@ -109,7 +113,7 @@ public abstract class CollectorModule extends SimpleModule {
             if (timestamp.isEqual(st.startTime()) ||    // startInterval <= timestamp < endInterval
                     (timestamp.isAfter(st.startTime()) && timestamp.isBefore(st.endTime()))
             ) {
-                st.accept(value.source(), value.value());
+                st.accept(datasourceId, observedValue);
                 newDataAccepted = true;
             } else if (timestamp.isAfter(st.endTime())) {
                 handle(st.statistics());
@@ -119,18 +123,25 @@ public abstract class CollectorModule extends SimpleModule {
         lastHarvesting = Instant.now();
 
         if (!newDataAccepted) {
-            CollectedStatistics newSt = CollectedStatistics.init(group, value.timestamp());
-            newSt.accept(value.source(), value.value());
+            CollectedStatistics newSt = CollectedStatistics.init(analyticGroup, timestamp);
+            newSt.accept(datasourceId, observedValue);
             groupStatistics.add(newSt);
         }
     }
 
     @Override
-    public void run() {
-        eventBus().<Observation>consumer(id(), v -> collect(v.body()));
+    public void run(Promise<Void> completePromise) {
+        eventBus().<AnalyticObservation>consumer(id(), v -> collect(v.body()))
+                .completionHandler(ar -> {
+                    if (ar.succeeded()) {
+                        completePromise.complete();
+                    } else {
+                        completePromise.fail(ar.cause());
+                    }
+                });
     }
 
     protected final void persist(StatisticRecord record) {
-        statisticsRepo.saveStatisticRecord(record).onFailure(logger::catching);
+        repo.saveStatisticRecord(record).onFailure(logger::catching);
     }
 }

+ 21 - 12
src/main/java/cz/senslog/analytics/module/api/Module.java

@@ -3,29 +3,27 @@ package cz.senslog.analytics.module.api;
 import cz.senslog.analytics.domain.CollectorType;
 import cz.senslog.analytics.module.*;
 import cz.senslog.analytics.repository.*;
-import cz.senslog.analytics.server.HttpVertxServer;
 import io.vertx.sqlclient.Pool;
 
-import java.util.HashMap;
+import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public final class Module {
 
-    private static final Map<Class<? extends SimpleModule>, SimpleModule> MODULES = new HashMap<>();
+    private static final Map<Class<? extends SimpleModule>, SimpleModule> MODULES = new ConcurrentHashMap<>();
 
-    private static final Map<CollectorType, CollectorModule> COLLECTORS  = new HashMap<>();
+    private static final Map<CollectorType, CollectorModule> COLLECTORS  = new ConcurrentHashMap<>();
 
     public static SimpleModule[] createModules(Pool sensLogDB) {
-        ConfigurationRepository configRepo = new AnalyticsConfigRepository(sensLogDB);
-        SensLogDataRepository dataRepo = new SensLogDataRepository(sensLogDB);
-        AnalyticsDataRepository statisticsRep = new AnalyticsDataRepository(sensLogDB);
+        AnalyticsRepository repo = new AnalyticsRepositoryImpl(sensLogDB);
 
-        createModule(new ObservationReceiverModule(configRepo));
-        createModule(new DoubleStatisticsModule(configRepo, statisticsRep));
-        createModule(new MoldAnalysisModule(configRepo, statisticsRep));
+        createModule(new ObservationReceiverModule(repo));
+        createModule(new DoubleStatisticsModule(repo));
+        createModule(new MoldAnalysisModule(repo));
 
-        createModule(new NotificationModule(configRepo));
-        createModule(new ScheduleDBLoaderModule(configRepo, dataRepo));
+        createModule(new AlertModule(repo));
+//        createModule(new ScheduleDBLoaderModule(repo));
 
         return MODULES.values().toArray(new SimpleModule[0]);
     }
@@ -42,6 +40,7 @@ public final class Module {
     }
 
     public static SimpleModule of(Class<? extends SimpleModule> aClass) {
+        if (aClass == null) { return null; }
         return MODULES.get(aClass);
     }
 
@@ -49,4 +48,14 @@ public final class Module {
         if (type == null) { return null; }
         return COLLECTORS.get(type);
     }
+
+    public static boolean containsCollectorOf(CollectorType type) {
+        if (type == null) { return false; }
+        return COLLECTORS.containsKey(type);
+    }
+
+    public static Collection<CollectorModule> collectors() {
+        // TODO remake to ModuleDescriptor
+        return COLLECTORS.values();
+    }
 }

+ 32 - 11
src/main/java/cz/senslog/analytics/module/api/SimpleModule.java

@@ -1,18 +1,27 @@
 package cz.senslog.analytics.module.api;
 
-import cz.senslog.analytics.domain.ThresholdViolationNotification;
-import cz.senslog.analytics.module.NotificationModule;
+import cz.senslog.analytics.domain.ThresholdViolationAlert;
+import cz.senslog.analytics.module.AlertModule;
 import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.eventbus.EventBus;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public abstract class SimpleModule extends AbstractVerticle implements ModuleDescriptor {
 
-    protected abstract void configure();
+    private static final Logger logger = LogManager.getLogger(SimpleModule.class);
 
-    protected abstract void run() throws Exception;
+    protected abstract void configure(Promise<Void> completePromise);
 
-    public final void reconfigure() {
-        configure();
+    protected abstract void run(Promise<Void> completePromise);
+
+    public final Future<Void> reconfigure() {
+        Promise<Void> configPromise = Promise.promise();
+        logger.info("Reconfiguration of the module: {}", getClass().getSimpleName());
+        configure(configPromise);
+        return configPromise.future();
     }
 
     @Override
@@ -20,17 +29,29 @@ public abstract class SimpleModule extends AbstractVerticle implements ModuleDes
         return String.format("__module.%s", getClass().getSimpleName());
     }
 
+
     @Override
-    public final void start() throws Exception {
-        configure();
-        run();
+    public void start(Promise<Void> startPromise) {
+        logger.info("Start of the module: {}", getClass().getSimpleName());
+        Promise<Void> configPromise = Promise.promise();
+        logger.info("Configuration of the module: {}", getClass().getSimpleName());
+        configure(configPromise);
+        configPromise.future().compose(v -> {
+                    Promise<Void> p = Promise.promise();
+                    logger.info("Running the module: {}", getClass().getSimpleName());
+                    run(p);
+                    return p.future();
+                })
+                .onSuccess(startPromise::complete)
+                .onFailure(startPromise::fail);
+
     }
 
     protected final EventBus eventBus() {
         return vertx.eventBus();
     }
 
-    protected final void notify(ThresholdViolationNotification notification) {
-        eventBus().publish(Module.of(NotificationModule.class).id(), notification);
+    protected final void notify(ThresholdViolationAlert notification) {
+        eventBus().publish(Module.of(AlertModule.class).id(), notification);
     }
 }

+ 0 - 128
src/main/java/cz/senslog/analytics/repository/AnalyticsConfigRepository.java

@@ -1,128 +0,0 @@
-package cz.senslog.analytics.repository;
-
-import cz.senslog.analytics.domain.*;
-import cz.senslog.analytics.utils.Tuple;
-import cz.senslog.analytics.utils.validator.NotifyTrigger;
-import io.vertx.core.Future;
-import io.vertx.sqlclient.Pool;
-
-import java.util.*;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-public final class AnalyticsConfigRepository implements ConfigurationRepository {
-
-    private final Pool client;
-
-    public AnalyticsConfigRepository(Pool client) {
-        this.client = client;
-    }
-
-
-    @Override
-    public Future<Map<Sensor, List<Group>>> loadSensorToGroupsMapping() {
-        return client.query("SELECT s.sensor_id, s.unit_id, sg.analytic_group_id, g.collector_type FROM analytics.entity_source_to_analytic_group sg " +
-                        "    JOIN analytics.analytic_group g on g.id = sg.id " +
-                        "    JOIN analytics.entity_source s on s.id = sg.entity_source_id")
-                .execute()
-                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
-                        .map(row -> Tuple.of(
-                                new Sensor(-1, // TODO is ID important?
-                                    row.getLong("unit_id"),
-                                    row.getLong("sensor_id"),
-                                    row.getLong("analytic_group_id")
-                                ), new Group(
-                                    row.getLong("analytic_group_id"),
-                                    CollectorType.valueOf(row.getString("collector_type"))
-                                ))
-                        ).collect(Collectors.groupingBy(
-                                Tuple::getItem1, HashMap::new, Collectors.mapping(Tuple::getItem2, Collectors.toList()))
-                        )
-                );
-    }
-
-    @Override
-    public Future<List<Threshold>> loadSensorThresholds() {
-        return client.query("SELECT th.id, th.analytic_group_id, th.notify_trigger_mode, th.attribute_type, th.process_on_fail, tr.threshold_mode, tr.threshold_value " +
-                        "FROM analytics.threshold AS th " +
-                        "JOIN analytics.analytic_group AS gi ON gi.id = th.analytic_group_id " +
-                        "JOIN analytics.threshold_rule tr on th.id = tr.threshold_id " +
-                        "WHERE gi.collector_type IS NULL")
-                .execute()
-                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
-                        .map(row -> new Threshold(
-                                        row.getLong("id"),
-                                        row.getLong("analytic_group_id"),
-                                        NotifyTrigger.Type.valueOf(row.getString("notify_trigger_mode")),
-                                        row.getBoolean("process_on_fail"),
-                                        AttributeName.valueOf(row.getString("attribute_type")),
-                                        new ArrayList<>() {{
-                                            add(new ThresholdRule(
-                                                    row.getString("threshold_mode"),
-                                                    row.getDouble("threshold_value")
-                                            ));
-                                        }}
-                                    )
-                        ).collect(Collectors.toMap(Threshold::id, t -> t, (p, q) -> {
-                            p.rules().add(q.rules().get(0)); return p;
-                        })).values().stream().toList()
-                );
-    }
-
-    @Override
-    public Future<List<MessageBrokerConfig>> loadMessageBrokers() {
-        // TODO waits to redesign the messaging
-        return Future.succeededFuture(Collections.emptyList());
-    }
-
-    @Override
-    public Future<List<SourceToMessageBroker>> loadSourceToMessageBrokerMapping() {
-        // TODO waits to redesign the messaging
-        return Future.succeededFuture(Collections.emptyList());
-    }
-
-    @Override
-    public Future<Map<Long, Group>> loadGroupsByCollectorType(CollectorType type) {
-        return client.preparedQuery("SELECT id, name, time_interval, persistence FROM analytics.analytic_group WHERE collector_type = $1")
-                .execute(io.vertx.sqlclient.Tuple.of(type.name()))
-                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
-                        .map((row -> new Group(
-                                row.getLong("id"),
-                                row.getString("name"),
-                                row.getInteger("time_interval"),
-                                row.getBoolean("persistence"),
-                                type
-                        )))
-                        .collect(Collectors.toMap(Group::id, Function.identity()))
-        );
-    }
-
-    @Override
-    public Future<List<Threshold>> loadGroupThresholds(CollectorType type) {
-        return client.preparedQuery("SELECT th.id, th.analytic_group_id, th.notify_trigger_mode, th.attribute_type, th.process_on_fail, tr.threshold_mode, tr.threshold_value " +
-                        "FROM analytics.threshold AS th " +
-                        "JOIN analytics.analytic_group AS gi ON gi.id = th.analytic_group_id " +
-                        "JOIN analytics.threshold_rule tr on th.id = tr.threshold_id " +
-                        "WHERE gi.collector_type = $1")
-                .execute(io.vertx.sqlclient.Tuple.of(type.name()))
-                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
-                        .map(row -> new Threshold(
-                                        row.getLong("id"),
-                                        row.getLong("analytic_group_id"),
-                                        NotifyTrigger.Type.valueOf(row.getString("notify_trigger_mode")),
-                                        row.getBoolean("process_on_fail"),
-                                        AttributeName.valueOf(row.getString("attribute_type")),
-                                        new ArrayList<>() {{
-                                            add(new ThresholdRule(
-                                                    row.getString("threshold_mode"),
-                                                    row.getDouble("threshold_value")
-                                            ));
-                                        }}
-                                )
-                        ).collect(Collectors.toMap(Threshold::id, t -> t, (p, q) -> {
-                            p.rules().add(q.rules().get(0)); return p;
-                        })).values().stream().toList()
-                );
-    }
-}

+ 0 - 34
src/main/java/cz/senslog/analytics/repository/AnalyticsDataRepository.java

@@ -1,34 +0,0 @@
-package cz.senslog.analytics.repository;
-
-import cz.senslog.analytics.domain.StatisticRecord;
-import io.vertx.core.Future;
-import io.vertx.pgclient.PgPool;
-import io.vertx.sqlclient.Pool;
-import io.vertx.sqlclient.Tuple;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public final class AnalyticsDataRepository {
-
-    private static final Logger logger = LogManager.getLogger(AnalyticsDataRepository.class);
-
-    private final Pool client;
-
-    public AnalyticsDataRepository(Pool client) {
-        this.client = client;
-    }
-
-    public Future<Integer> saveStatisticRecord(StatisticRecord record) {
-        if (record == null) { return Future.failedFuture("StatisticRecord to save is null."); }
-        return client.preparedQuery("INSERT INTO analytics.records(group_id, value_attribute, record_value, time_interval, time_stamp) " +
-                        "VALUES ($1, $2, $3, $4, $5) RETURNING (id)")
-                .execute(Tuple.of(record.groupId(),
-                        record.valueAttribute(),
-                        record.recordValue(),
-                        record.timeInterval(),
-                        record.timestamp()
-                ))
-                .map(rs -> rs.iterator().next().getInteger("id"))
-                .onFailure(logger::error);
-    }
-}

+ 22 - 0
src/main/java/cz/senslog/analytics/repository/AnalyticsRepository.java

@@ -0,0 +1,22 @@
+package cz.senslog.analytics.repository;
+
+import cz.senslog.analytics.domain.*;
+import cz.senslog.analytics.utils.Tuple;
+import io.vertx.core.Future;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Map;
+
+public interface AnalyticsRepository {
+
+    Future<Long> updateLastTimestamp(long unitSensorId, OffsetDateTime timestamp);
+    Future<Integer> saveStatisticRecord(StatisticRecord record);
+    Future<List<Sensor>> loadAllSensors();
+    Future<List<Threshold>> loadThresholdsForSensor();
+    Future<List<Tuple<Long, AnalyticGroup>>> loadAllAnalyticGroupsWithDatasource();
+
+    Future<Map<Long, AnalyticGroup>> loadGroupsByCollectorType(CollectorType type);
+
+    Future<List<Threshold>> loadGroupThresholds(CollectorType type);
+}

+ 159 - 0
src/main/java/cz/senslog/analytics/repository/AnalyticsRepositoryImpl.java

@@ -0,0 +1,159 @@
+package cz.senslog.analytics.repository;
+
+import cz.senslog.analytics.domain.*;
+import cz.senslog.analytics.utils.Tuple;
+import cz.senslog.analytics.utils.validator.NotifyTrigger;
+import io.vertx.core.Future;
+import io.vertx.sqlclient.Pool;
+import io.vertx.sqlclient.RowSet;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.OffsetDateTime;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public final class AnalyticsRepositoryImpl implements AnalyticsRepository {
+
+    private static final Logger logger = LogManager.getLogger(AnalyticsRepositoryImpl.class);
+
+    private final Pool client;
+
+    public AnalyticsRepositoryImpl(Pool client) {
+        this.client = client;
+    }
+
+
+    @Override
+    public Future<Long> updateLastTimestamp(long unitSensorId, OffsetDateTime timestamp) {
+        return client.withTransaction(tr -> tr.preparedQuery("UPDATE analytics.sensor_to_unit SET last_observation = GREATEST(last_observation, $2) WHERE id = $1")
+                .execute(io.vertx.sqlclient.Tuple.of(unitSensorId, timestamp))
+                .map(RowSet::iterator)
+                .map(it -> it.hasNext() ? it.next().getLong(0) : 0)
+        );
+    }
+
+    @Override
+    public Future<Integer> saveStatisticRecord(StatisticRecord record) {
+        if (record == null) {
+            return Future.failedFuture("StatisticRecord to save is null.");
+        }
+        return client.preparedQuery("INSERT INTO analytics.record(analytic_group_id, attribute_type, calculated_value, time_interval, time_stamp) " +
+                        "VALUES ($1, $2, $3, $4, $5) RETURNING (id)")
+                .execute(io.vertx.sqlclient.Tuple.of(record.analyticGroupId(),
+                        record.attributeType(),
+                        record.calculatedValue(),
+                        record.timeInterval(),
+                        record.timestamp()
+                ))
+                .map(rs -> rs.iterator().next().getInteger("id"))
+                .onFailure(logger::error);
+    }
+
+    @Override
+    public Future<List<Sensor>> loadAllSensors() {
+        return client.query("SELECT id, unit_id, sensor_id FROM analytics.sensor_to_unit")
+                .execute()
+                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
+                        .map(row -> Sensor.of(
+                                row.getLong("id"),
+                                row.getLong("unit_id"),
+                                row.getLong("sensor_id")
+                        )).toList()
+                );
+    }
+
+    @Override
+    public Future<List<Threshold>> loadThresholdsForSensor() {
+        return client.query("SELECT ss.id AS datasource_id, sstt.threshold_id, t.notify_trigger_mode, t.attribute_type, t.process_on_fail, t.alert_enable, tr.threshold_mode, tr.threshold_value " +
+                        "FROM analytics.sensor_to_unit ss " +
+                        "JOIN analytics.sensor_to_unit_to_threshold sstt on ss.id = sstt.sensor_to_unit_id " +
+                        "JOIN analytics.threshold t on t.id = sstt.threshold_id " +
+                        "JOIN analytics.threshold_rule tr on t.id = tr.threshold_id")
+                .execute()
+                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
+                        .map(row -> new Threshold(
+                                    row.getLong("threshold_id"),
+                                    row.getLong("datasource_id"),
+                                    NotifyTrigger.Mode.valueOf(row.getString("notify_trigger_mode")),
+                                    AttributeType.valueOf(row.getString("attribute_type")),
+                                    row.getBoolean("process_on_fail"),
+                                    row.getBoolean("alert_enable"),
+                                    new ArrayList<>() {{
+                                                add(ThresholdRule.of(
+                                                        ThresholdMode.of(row.getString("threshold_mode")),
+                                                        row.getDouble("threshold_value")
+                                                ));
+                                            }}
+                                        )
+                        ).collect(Collectors.toMap(Threshold::id, t -> t, (p, q) -> {
+                            p.rules().add(q.rules().get(0)); return p;
+                        })).values().stream().toList()
+                );
+    }
+
+    @Override
+    public Future<List<Tuple<Long, AnalyticGroup>>> loadAllAnalyticGroupsWithDatasource() {
+        return client.query("SELECT sstag.sensor_to_unit_id AS datasource_id, sstag.analytic_group_id, ag.collector_type, ag.name FROM analytics.analytic_group ag " +
+                "JOIN analytics.sensor_to_unit_to_analytic_group sstag on ag.id = sstag.analytic_group_id")
+                .execute()
+                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
+                        .map(row -> Tuple.of(
+                                row.getLong("datasource_id"),
+                                AnalyticGroup.of(
+                                        row.getLong("analytic_group_id"),
+                                        row.getString("name"),
+                                        CollectorType.of(row.getString("collector_type"))
+                                )
+                        )).toList()
+                );
+    }
+
+    @Override
+    public Future<Map<Long, AnalyticGroup>> loadGroupsByCollectorType(CollectorType type) {
+        return client.preparedQuery("SELECT id, name, time_interval, persistence FROM analytics.analytic_group WHERE collector_type = $1")
+                .execute(io.vertx.sqlclient.Tuple.of(type.name()))
+                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
+                        .map((row -> AnalyticGroup.of(
+                                row.getLong("id"),
+                                row.getString("name"),
+                                row.getInteger("time_interval"),
+                                row.getBoolean("persistence"),
+                                type
+                        )))
+                        .collect(Collectors.toMap(AnalyticGroup::id, Function.identity()))
+        );
+    }
+
+    @Override
+    public Future<List<Threshold>> loadGroupThresholds(CollectorType type) {
+        return client.preparedQuery("SELECT ag.id AS datasource_id, agtt.threshold_id, t.notify_trigger_mode, t.attribute_type, t.process_on_fail, t.alert_enable, tr.threshold_mode, tr.threshold_value " +
+                        "FROM analytics.analytic_group ag " +
+                        "JOIN analytics.analytic_group_to_threshold agtt on ag.id = agtt.analytic_group_id " +
+                        "JOIN analytics.threshold t on t.id = agtt.threshold_id\n" +
+                        "JOIN analytics.threshold_rule tr on t.id = tr.threshold_id " +
+                        "WHERE ag.collector_type = $1")
+                .execute(io.vertx.sqlclient.Tuple.of(type.name()))
+                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
+                        .map(row -> new Threshold(
+                                        row.getLong("threshold_id"),
+                                        row.getLong("datasource_id"),
+                                        NotifyTrigger.Mode.valueOf(row.getString("notify_trigger_mode")),
+                                        AttributeType.valueOf(row.getString("attribute_type")),
+                                        row.getBoolean("process_on_fail"),
+                                        row.getBoolean("alert_enable"),
+                                        new ArrayList<>() {{
+                                            add(new ThresholdRule(
+                                                    ThresholdMode.of(row.getString("threshold_mode")),
+                                                    row.getDouble("threshold_value")
+                                            ));
+                                        }}
+                                )
+                        ).collect(Collectors.toMap(Threshold::id, t -> t, (p, q) -> {
+                            p.rules().add(q.rules().get(0)); return p;
+                        })).values().stream().toList()
+                );
+    }
+}

+ 0 - 22
src/main/java/cz/senslog/analytics/repository/ConfigurationRepository.java

@@ -1,22 +0,0 @@
-package cz.senslog.analytics.repository;
-
-import cz.senslog.analytics.domain.*;
-import io.vertx.core.Future;
-
-import java.util.List;
-import java.util.Map;
-
-public interface ConfigurationRepository {
-
-    Future<Map<Sensor, List<Group>>> loadSensorToGroupsMapping();
-
-    Future<List<Threshold>> loadSensorThresholds();
-
-    Future<List<MessageBrokerConfig>> loadMessageBrokers();
-
-    Future<List<SourceToMessageBroker>> loadSourceToMessageBrokerMapping();
-
-    Future<Map<Long, Group>> loadGroupsByCollectorType(CollectorType type);
-
-    Future<List<Threshold>> loadGroupThresholds(CollectorType type);
-}

+ 0 - 41
src/main/java/cz/senslog/analytics/repository/MockConfigRepository.java

@@ -1,41 +0,0 @@
-package cz.senslog.analytics.repository;
-
-import cz.senslog.analytics.MockData;
-import cz.senslog.analytics.domain.*;
-import io.vertx.core.Future;
-
-import java.util.List;
-import java.util.Map;
-
-public class MockConfigRepository implements ConfigurationRepository {
-
-    @Override
-    public Future<Map<Sensor, List<Group>>> loadSensorToGroupsMapping() {
-        return Future.succeededFuture(MockData.mockSensorToGroupConfig());
-    }
-
-    @Override
-    public Future<List<Threshold>> loadSensorThresholds() {
-        return Future.succeededFuture(MockData.mockThresholdsConfigForSensors());
-    }
-
-    @Override
-    public Future<List<MessageBrokerConfig>> loadMessageBrokers() {
-        return Future.succeededFuture(MockData.mockMessageBrokers());
-    }
-
-    @Override
-    public Future<List<SourceToMessageBroker>> loadSourceToMessageBrokerMapping() {
-        return Future.succeededFuture(MockData.mockSourceToMessageBroker());
-    }
-
-    @Override
-    public Future<Map<Long, Group>> loadGroupsByCollectorType(CollectorType type) {
-        return Future.succeededFuture(MockData.mockGroupsConfig(type));
-    }
-
-    @Override
-    public Future<List<Threshold>> loadGroupThresholds(CollectorType type) {
-        return Future.succeededFuture(MockData.mockThresholdsConfigForGroups(type));
-    }
-}

+ 0 - 77
src/main/java/cz/senslog/analytics/repository/SensLogDataRepository.java

@@ -1,77 +0,0 @@
-package cz.senslog.analytics.repository;
-
-import cz.senslog.analytics.domain.Observation;
-import cz.senslog.analytics.domain.Sensor;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
-import io.vertx.pgclient.PgPool;
-import io.vertx.sqlclient.*;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.time.OffsetDateTime;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-public final class SensLogDataRepository {
-
-    private static final Logger logger = LogManager.getLogger(SensLogDataRepository.class);
-
-    private final Pool client;
-
-    public SensLogDataRepository(Pool client) {
-        this.client = client;
-    }
-
-    public Future<List<Observation>> loadObservations(OffsetDateTime from, OffsetDateTime to) {
-        return client.query("SELECT unit_id, sensor_id, observed_value, time_stamp " +
-                        "FROM export.observations ORDER BY time_stamp LIMIT 1000")
-                .execute()
-                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
-                        .map(row -> new Observation(
-                                new Sensor(
-                                        row.getLong("unit_id"),
-                                        row.getLong("sensor_id")
-                                ),
-                                row.getDouble("observed_value"),
-                                row.getOffsetDateTime("time_stamp"))
-                        ).collect(Collectors.toList())
-                );
-    }
-
-    public void cursorAllObservations(int fetch, Handler<Observation> dataStreamHandler, Handler<Throwable> exceptionHandler) {
-        client.getConnection().onSuccess(conn -> conn
-                .prepare("SELECT observation_id, unit_id, sensor_id, observed_value, time_stamp " +
-                        "FROM export.observations ORDER BY time_stamp")
-                .onComplete(ar -> {
-                    if (ar.succeeded()) {
-                        PreparedStatement pq = ar.result();
-                        conn.begin().onComplete(ar1 -> {
-                            if (ar1.succeeded()) {
-                                Transaction tx = ar1.result();
-
-                                RowStream<Row> stream = pq.createStream(fetch);
-
-                                stream.exceptionHandler(exceptionHandler);
-                                stream.endHandler(v -> stream.close()
-                                        .onComplete(closed -> tx.commit()
-                                            .onComplete(committed ->
-                                                    logger.info("End of stream")
-                                            )
-                                        )
-                                );
-                                stream.handler(row -> dataStreamHandler.handle(new Observation(
-                                        new Sensor(
-                                                row.getLong("unit_id"),
-                                                row.getLong("sensor_id")
-                                        ),
-                                        row.getDouble("observed_value"),
-                                        row.getOffsetDateTime("time_stamp"))
-                                ));
-                            }
-                        });
-                    }
-                }));
-    }
-}

+ 1 - 1
src/main/java/cz/senslog/analytics/server/ws/ContentTypeHandler.java

@@ -38,7 +38,7 @@ public final class ContentTypeHandler {
     
     public FluentHandler accept(String accept) {
         if (accept == null || accept.isBlank()) {
-            throw new HttpIllegalArgumentException(400, "The Media Type is missing. Add 'Accept' attribute in header.");
+            throw new HttpIllegalArgumentException(400, "The Media Type is missing. Add 'Accept' attributeType in header.");
         }
 
         Set<ContentType> parsedTypes = parseContentTypes(accept);

+ 6 - 6
src/main/java/cz/senslog/analytics/utils/validator/InstantNotifyTrigger.java

@@ -1,6 +1,6 @@
 package cz.senslog.analytics.utils.validator;
 
-import cz.senslog.analytics.domain.AttributeName;
+import cz.senslog.analytics.domain.AttributeType;
 import cz.senslog.analytics.domain.ValidationResult;
 
 import java.util.ArrayList;
@@ -10,8 +10,8 @@ import java.util.Map;
 
 public class InstantNotifyTrigger implements NotifyTrigger {
 
-    private final Map<AttributeName, ValidationResult> currentResultMap;
-    private List<AttributeName> tempAttrsToNotify;
+    private final Map<AttributeType, ValidationResult> currentResultMap;
+    private List<AttributeType> tempAttrsToNotify;
 
     public InstantNotifyTrigger() {
         this.currentResultMap = new HashMap<>();
@@ -19,7 +19,7 @@ public class InstantNotifyTrigger implements NotifyTrigger {
 
     @Override
     public void accept(ValidationResult validationResult) {
-        currentResultMap.put(validationResult.attribute(), validationResult);
+        currentResultMap.put(validationResult.attributeType(), validationResult);
     }
 
     @Override
@@ -29,7 +29,7 @@ public class InstantNotifyTrigger implements NotifyTrigger {
         for (ValidationResult res : currentResultMap.values()) {
             if (res.isNotValid()) {
                 notify = true;
-                tempAttrsToNotify.add(res.attribute());
+                tempAttrsToNotify.add(res.attributeType());
             }
         }
         return notify;
@@ -39,7 +39,7 @@ public class InstantNotifyTrigger implements NotifyTrigger {
     public ValidationResult[] resultsToNotify() {
         ValidationResult[] res = new ValidationResult[tempAttrsToNotify.size()];
         int ind = 0;
-        for (AttributeName attr : tempAttrsToNotify) {
+        for (AttributeType attr : tempAttrsToNotify) {
             res[ind++] = currentResultMap.get(attr);
         }
         tempAttrsToNotify = null;

+ 2 - 2
src/main/java/cz/senslog/analytics/utils/validator/NotifyTrigger.java

@@ -12,14 +12,14 @@ public interface NotifyTrigger {
     boolean shouldNotify();
     ValidationResult[] resultsToNotify();
 
-    enum Type {
+    enum Mode {
         DISABLED    (DisableNotifyTrigger::new),
         INSTANT     (InstantNotifyTrigger::new),
         ON_CHANGE   (OnChangeNotifyTrigger::new),
 
         ;
 
-        Type(Supplier<NotifyTrigger> constructCreator) {
+        Mode(Supplier<NotifyTrigger> constructCreator) {
             this.constructCreator = constructCreator;
         }
 

+ 3 - 3
src/main/java/cz/senslog/analytics/utils/validator/OnChangeNotifyTrigger.java

@@ -1,7 +1,7 @@
 package cz.senslog.analytics.utils.validator;
 
 
-import cz.senslog.analytics.domain.AttributeName;
+import cz.senslog.analytics.domain.AttributeType;
 import cz.senslog.analytics.domain.ValidationResult;
 
 import java.util.ArrayList;
@@ -23,7 +23,7 @@ public class OnChangeNotifyTrigger implements NotifyTrigger {
 
     private static final int CACHE_LEN = 2;
 
-    private final Map<AttributeName, CacheWrapper> cacheMap;
+    private final Map<AttributeType, CacheWrapper> cacheMap;
 
     private List<ValidationResult> tempToNotify;
 
@@ -33,7 +33,7 @@ public class OnChangeNotifyTrigger implements NotifyTrigger {
 
     @Override
     public void accept(ValidationResult validationResult) {
-        AttributeName attr = validationResult.attribute();
+        AttributeType attr = validationResult.attributeType();
         if (!cacheMap.containsKey(attr)) {
             cacheMap.put(attr, new CacheWrapper(new ValidationResult[CACHE_LEN], 0));
         }

+ 18 - 10
src/main/java/cz/senslog/analytics/utils/validator/ThresholdChecker.java

@@ -12,7 +12,7 @@ import static java.util.Collections.emptyList;
 public class ThresholdChecker<DS extends TimeSeriesDatasource> {
 
     public static <T extends TimeSeriesDatasource> ThresholdChecker<T> disabled() {
-        return ThresholdChecker.create(emptyList(), null, null);
+        return ThresholdChecker.create(emptyList(), null, null, true);
     }
 
     private final Map<Long, List<Threshold>> thresholds;
@@ -23,28 +23,32 @@ public class ThresholdChecker<DS extends TimeSeriesDatasource> {
 
     private final Map<Long, NotifyTrigger> notifyTriggerMap;
 
+    private final boolean defaultIfNotPresent;
+
     public static <DS extends TimeSeriesDatasource> ThresholdChecker<DS> create(List<Threshold> thresholds,
                                                            Validator<DS> validator,
-                                                           Consumer<ViolationReport> ifViolated)
+                                                           Consumer<ViolationReport> ifViolated, boolean defaultIfNotPresent)
     {
-        return new ThresholdChecker<>(thresholds, validator, ifViolated);
+        return new ThresholdChecker<>(thresholds, validator, ifViolated, defaultIfNotPresent);
     }
 
     public ThresholdChecker(List<Threshold> thresholds,
                             Validator<DS> validator,
-                            Consumer<ViolationReport> ifViolated
+                            Consumer<ViolationReport> ifViolated,
+                            boolean defaultIfNotPresent
     ) {
         this.validator = validator;
         this.ifViolated = ifViolated;
+        this.defaultIfNotPresent = defaultIfNotPresent;
 
         this.notifyTriggerMap = new HashMap<>();
         this.thresholds = new HashMap<>();
         for (Threshold th : thresholds) {
-            long sourceId = th.groupId();
+            long sourceId = th.datasourceId();
             if (!notifyTriggerMap.containsKey(sourceId)) {
-                this.notifyTriggerMap.put(sourceId, th.notifyTriggerType().createInstance());
+                this.notifyTriggerMap.put(sourceId, th.notifyTriggerMode().createInstance());
             }
-            this.thresholds.computeIfAbsent(th.groupId(), k -> new ArrayList<>()).add(th);
+            this.thresholds.computeIfAbsent(th.datasourceId(), k -> new ArrayList<>()).add(th);
         }
     }
 
@@ -52,12 +56,16 @@ public class ThresholdChecker<DS extends TimeSeriesDatasource> {
         return this::validateThreshold;
     }
 
+    public boolean check(DS data) {
+        return validateThreshold(data);
+    }
+
     private boolean validateThreshold(DS data) {
         long sourceId = data.datasourceId();
         OffsetDateTime timestamp = data.timestamp();
 
-        if (!thresholds.containsKey(sourceId)) {
-            return true;
+        if (sourceId <= 0 || !thresholds.containsKey(sourceId)) {
+            return defaultIfNotPresent;
         }
         boolean process = true;
         List<Threshold> rules = thresholds.get(sourceId);
@@ -67,7 +75,7 @@ public class ThresholdChecker<DS extends TimeSeriesDatasource> {
             ValidationResult res = validator.validate(data, th);
             notifier.accept(res);
 
-            if (res.isNotValid() && !th.allowProcess()) {
+            if (res.isNotValid() && !th.processOnFail()) {
                 process = false;
             }
         }

+ 20 - 21
src/main/java/cz/senslog/analytics/utils/validator/Validator.java

@@ -1,9 +1,6 @@
 package cz.senslog.analytics.utils.validator;
 
-import cz.senslog.analytics.domain.AttributeName;
-import cz.senslog.analytics.domain.Threshold;
-import cz.senslog.analytics.domain.ThresholdRule;
-import cz.senslog.analytics.domain.ValidationResult;
+import cz.senslog.analytics.domain.*;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -11,32 +8,34 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import static cz.senslog.analytics.domain.ThresholdMode.*;
+
 public class Validator<T> {
 
-    private static final Map<String, BiFunction<Double, Double, Boolean>> functions;
+    private static final Map<ThresholdMode, BiFunction<Double, Double, Boolean>> functions;
 
     static {
         functions = new HashMap<>(6);
-        functions.put("gt", (val, ths) -> val > ths);
-        functions.put("ge", (val, ths) -> val >= ths);
-        functions.put("lt", (val, ths) -> val < ths);
-        functions.put("le", (val, ths) -> val <= ths);
-        functions.put("eq", Double::equals);
-        functions.put("ne", (val, ths) -> !val.equals(ths));
+        functions.put(GT, (val, ths) -> val > ths);
+        functions.put(GE, (val, ths) -> val >= ths);
+        functions.put(LT, (val, ths) -> val < ths);
+        functions.put(LE, (val, ths) -> val <= ths);
+        functions.put(EQ, Double::equals);
+        functions.put(NE, (val, ths) -> !val.equals(ths));
     }
 
-    public static boolean checkThresholdValue(String mode, Double value, Double threshold) {
+    public static boolean checkThresholdValue(ThresholdMode mode, Double value, Double threshold) {
         if (mode == null || value == null || threshold == null) return false;
         return functions.getOrDefault(mode, (val, ths) -> false).apply(value, threshold);
     }
 
     public static boolean checkThresholdValue(ThresholdRule rule, Double value) {
         if (rule == null || value == null) return false;
-        return checkThresholdValue(rule.mode(), value, rule.value());
+        return checkThresholdValue(rule.thresholdMode(), value, rule.value());
     }
 
     public interface AttributeMapping<T> {
-        Validator<T> addMapping(AttributeName property, Function<T, Supplier<Double>> getter);
+        Validator<T> addMapping(AttributeType property, Function<T, Supplier<Double>> getter);
     }
 
 
@@ -44,20 +43,20 @@ public class Validator<T> {
         return new AttributeMapping<>() {
             private final Validator<T> validator = new Validator<>();
             @Override
-            public Validator<T> addMapping(AttributeName property, Function<T, Supplier<Double>> getter) {
+            public Validator<T> addMapping(AttributeType property, Function<T, Supplier<Double>> getter) {
                 validator.map.put(property, getter);
                 return validator;
             }
         };
     }
 
-    private final Map<AttributeName, Function<T, Supplier<Double>>> map;
+    private final Map<AttributeType, Function<T, Supplier<Double>>> map;
 
     private Validator() {
         this.map = new HashMap<>();
     }
 
-    public Validator<T> addMapping(AttributeName property, Function<T, Supplier<Double>> getter) {
+    public Validator<T> addMapping(AttributeType property, Function<T, Supplier<Double>> getter) {
         map.put(property, getter);
         return this;
     }
@@ -66,12 +65,12 @@ public class Validator<T> {
         return validate(object, threshold, map);
     }
 
-    public static <T> ValidationResult validate(T object, Threshold threshold, Map<AttributeName, Function<T, Supplier<Double>>> attributeMapping) {
-        Double value = attributeMapping.getOrDefault(threshold.attribute(), (o) -> () -> null).apply(object).get();
-        ValidationResult result = new ValidationResult(threshold.attribute(), value);
+    public static <T> ValidationResult validate(T object, Threshold threshold, Map<AttributeType, Function<T, Supplier<Double>>> attributeMapping) {
+        Double value = attributeMapping.getOrDefault(threshold.attributeType(), (o) -> () -> null).apply(object).get();
+        ValidationResult result = new ValidationResult(threshold.attributeType(), value);
         for (ThresholdRule rule : threshold.rules()) {
             if (checkThresholdValue(rule, value)) {
-                result.addRecord(rule.mode(), rule.value());
+                result.addRecord(rule.thresholdMode(), rule.value());
             }
         }
         return result;

+ 55 - 47
src/test/java/cz/senslog/analytics/utils/ThresholdCheckerTest.java

@@ -8,8 +8,6 @@ import org.junit.jupiter.api.Test;
 
 import java.time.OffsetDateTime;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
@@ -17,30 +15,33 @@ import static org.junit.jupiter.api.Assertions.*;
 
 class ThresholdCheckerTest {
 
-    private static final Sensor SOURCE = new Sensor(100L, 100000L, 10343L, 10L);
+    private static final Sensor SENSOR = new Sensor(100L, 1000L, 1001L);
 
     @Test
     void check_size_1_INSTANT() {
 
         ThresholdChecker<Observation> checker = new ThresholdChecker<>(
                 List.of(
-                    new Threshold(SOURCE.id(), NotifyTrigger.Type.INSTANT, true, AttributeName.VAL, List.of(new ThresholdRule("gt", 10.0)))
+                    new Threshold(1, SENSOR.id(), NotifyTrigger.Mode.INSTANT, AttributeType.VAL, true, false, List.of(
+                            new ThresholdRule(ThresholdMode.GT, 10.0)
+                    ))
                 ),
                 Validator.<Observation>create()
-                        .addMapping(AttributeName.VAL, o -> o::value),
+                        .addMapping(AttributeType.VAL, o -> o::value),
                 (report) -> {
-                    assertEquals(SOURCE.id(), report.sourceId());
+                    assertEquals(SENSOR.id(), report.datasourceId());
                     assertEquals(1, report.violatedData().length);
                     ValidationResult valResult = report.violatedData()[0];
-                    assertEquals(AttributeName.VAL, valResult.attribute());
-                    assertEquals(25.3, valResult.validatedValue());
+                    assertEquals(AttributeType.VAL, valResult.attributeType());
+                    assertEquals(23.3, valResult.validatedValue());
                     assertEquals(1, valResult.records().size());
-                }
+                },
+                true
         );
 
         long count = Stream.of(
-                Observation.of(SOURCE, 25.3, OffsetDateTime.now())
-        ).filter(checker.check()).count();
+                Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 23.3, OffsetDateTime.now()))
+        ).filter(checker::check).count();
         assertEquals(1, count);
     }
 
@@ -49,25 +50,28 @@ class ThresholdCheckerTest {
 
         ThresholdChecker<Observation> checker = new ThresholdChecker<>(
                 List.of(
-                        new Threshold(SOURCE.id(), NotifyTrigger.Type.ON_CHANGE, true, AttributeName.VAL, List.of(new ThresholdRule("gt", 10.0)))
+                        new Threshold(1, SENSOR.id(), NotifyTrigger.Mode.ON_CHANGE, AttributeType.VAL, true, false, List.of(
+                                new ThresholdRule(ThresholdMode.GT, 10.0)
+                        ))
                 ),
                 Validator.<Observation>create()
-                        .addMapping(AttributeName.VAL, o -> o::value),
+                        .addMapping(AttributeType.VAL, o -> o::value),
                 (report) -> {
-                    assertEquals(SOURCE.id(), report.sourceId());
+                    assertEquals(SENSOR.id(), report.datasourceId());
                     assertEquals(1, report.violatedData().length);
                     ValidationResult valResult = report.violatedData()[0];
-                    assertEquals(AttributeName.VAL, valResult.attribute());
+                    assertEquals(AttributeType.VAL, valResult.attributeType());
                     assertEquals(25.3, valResult.validatedValue());
                     assertEquals(1, valResult.records().size());
-                }
+                },
+                true
         );
 
         long count = Stream.of(
-                Observation.of(SOURCE, 9.9, OffsetDateTime.now()),
-                Observation.of(SOURCE, 25.3, OffsetDateTime.now()),
-                Observation.of(SOURCE, 30.5, OffsetDateTime.now())
-        ).filter(checker.check()).count();
+                Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(),9.9, OffsetDateTime.now())),
+                Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(),25.3, OffsetDateTime.now())),
+                Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(),30.5, OffsetDateTime.now()))
+        ).filter(checker::check).count();
         assertEquals(3, count);
     }
 
@@ -76,15 +80,17 @@ class ThresholdCheckerTest {
         AtomicInteger reportStep = new AtomicInteger(1);
         ThresholdChecker<Observation> checker = new ThresholdChecker<>(
                 List.of(
-                        new Threshold(SOURCE.id(), NotifyTrigger.Type.ON_CHANGE, true, AttributeName.VAL, List.of(new ThresholdRule("gt", 10.0)))
+                        new Threshold(1, SENSOR.id(), NotifyTrigger.Mode.ON_CHANGE, AttributeType.VAL, true, false, List.of(
+                                new ThresholdRule(ThresholdMode.GT, 10.0)
+                        ))
                 ),
                 Validator.<Observation>create()
-                        .addMapping(AttributeName.VAL, o -> o::value),
+                        .addMapping(AttributeType.VAL, o -> o::value),
                 (report) -> {
-                    assertEquals(SOURCE.id(), report.sourceId());
+                    assertEquals(SENSOR.id(), report.datasourceId());
                     assertEquals(1, report.violatedData().length);
                     ValidationResult valResult = report.violatedData()[0];
-                    assertEquals(AttributeName.VAL, valResult.attribute());
+                    assertEquals(AttributeType.VAL, valResult.attributeType());
 
                     if (reportStep.compareAndSet(1, 2)) {
                         assertEquals(25.3, valResult.validatedValue());
@@ -96,56 +102,58 @@ class ThresholdCheckerTest {
                         assertTrue(valResult.isValid());
                         assertEquals(0, valResult.records().size());
                     }
-                }
+                },
+                true
         );
 
         long count = Stream.of(
-                Observation.of(SOURCE, 9.9, OffsetDateTime.now()),
-                Observation.of(SOURCE, 25.3, OffsetDateTime.now()),
-                Observation.of(SOURCE, 30.5, OffsetDateTime.now()),
-                Observation.of(SOURCE, 8.5, OffsetDateTime.now())
-        ).filter(checker.check()).count();
+                Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 9.9, OffsetDateTime.now())),
+                Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.now())),
+                Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(),30.5, OffsetDateTime.now())),
+                Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 8.5, OffsetDateTime.now()))
+        ).filter(checker::check).count();
         assertEquals(4, count);
     }
     
     @Test
     void check_doubleStatistics_INSTANT() {
-        long sourceId = SOURCE.groupId();
+        long datasourceId = SENSOR.id();
 
         ThresholdChecker<DoubleStatistics> checker = new ThresholdChecker<>(
                 List.of(
-                        new Threshold(sourceId, NotifyTrigger.Type.INSTANT, true, AttributeName.MIN, List.of(new ThresholdRule("lt", 0.0))),
-                        new Threshold(sourceId, NotifyTrigger.Type.INSTANT, true, AttributeName.MAX, List.of(new ThresholdRule("gt", 5.0)))
+                        new Threshold(1, -1, NotifyTrigger.Mode.INSTANT, AttributeType.MIN, true, false, List.of(new ThresholdRule(ThresholdMode.LT, 0.0))),
+                        new Threshold(1, -1, NotifyTrigger.Mode.INSTANT, AttributeType.MAX, true, false, List.of(new ThresholdRule(ThresholdMode.GT, 5.0)))
                 ),
                 Validator.<DoubleStatistics>create()
-                        .addMapping(AttributeName.MIN, s -> s::min)
-                        .addMapping(AttributeName.MAX, s -> s::max)
-                        .addMapping(AttributeName.AVG, s -> s::average),
+                        .addMapping(AttributeType.MIN, s -> s::min)
+                        .addMapping(AttributeType.MAX, s -> s::max)
+                        .addMapping(AttributeType.AVG, s -> s::average),
                 (report) -> {
-                    assertEquals(sourceId, report.sourceId());
+                    assertEquals(datasourceId, report.datasourceId());
                     assertEquals(2, report.violatedData().length);
                     ValidationResult[] violatedData = report.violatedData();
-                    ValidationResult minResult = violatedData[0].attribute().equals(AttributeName.MIN) ? violatedData[0] : violatedData[1];
-                    ValidationResult maxResult = violatedData[0].attribute().equals(AttributeName.MAX) ? violatedData[0] : violatedData[1];
+                    ValidationResult minResult = violatedData[0].attributeType().equals(AttributeType.MIN) ? violatedData[0] : violatedData[1];
+                    ValidationResult maxResult = violatedData[0].attributeType().equals(AttributeType.MAX) ? violatedData[0] : violatedData[1];
 
-                    assertEquals(AttributeName.MIN, minResult.attribute());
+                    assertEquals(AttributeType.MIN, minResult.attributeType());
                     assertEquals(-1.5, minResult.validatedValue());
                     assertEquals(1, minResult.records().size());
 
-                    assertEquals(AttributeName.MAX, maxResult.attribute());
+                    assertEquals(AttributeType.MAX, maxResult.attributeType());
                     assertEquals(5.2, maxResult.validatedValue());
                     assertEquals(1, maxResult.records().size());
-                }
+                },
+                true
         );
 
-        DoubleStatistics ds = DoubleStatistics.init(sourceId, OffsetDateTime.now());
-        ds.accept(sourceId, -1.5);
-        ds.accept(sourceId, 3.3);
-        ds.accept(sourceId, 5.2);
+        DoubleStatistics ds = DoubleStatistics.init(datasourceId, OffsetDateTime.now());
+        ds.accept(datasourceId, -1.5);
+        ds.accept(datasourceId, 3.3);
+        ds.accept(datasourceId, 5.2);
 
 
         long count = Stream.of(ds)
-                .filter(checker.check()).count();
+                .filter(checker::check).count();
         assertEquals(1, count);
     }
 }