瀏覽代碼

Reimplementation of alert triggers, legacy ThresholdChecker is present

Lukas Cerny 1 年之前
父節點
當前提交
9829cc2d8e
共有 58 個文件被更改,包括 1933 次插入273 次删除
  1. 1 0
      build.gradle
  2. 5 1
      docker-compose.yaml
  3. 5 1
      docker.dev.env
  4. 29 14
      init.sql
  5. 2 1
      src/main/java/cz/senslog/analytics/app/Application.java
  6. 28 4
      src/main/java/cz/senslog/analytics/app/PropertyConfig.java
  7. 9 2
      src/main/java/cz/senslog/analytics/app/VertxDeployer.java
  8. 6 0
      src/main/java/cz/senslog/analytics/domain/AttributeType.java
  9. 27 0
      src/main/java/cz/senslog/analytics/domain/ComparisonOperator.java
  10. 48 0
      src/main/java/cz/senslog/analytics/domain/ListHolder.java
  11. 0 5
      src/main/java/cz/senslog/analytics/domain/MessageBrokerConfig.java
  12. 5 3
      src/main/java/cz/senslog/analytics/domain/Threshold.java
  13. 12 0
      src/main/java/cz/senslog/analytics/domain/ThresholdDimension.java
  14. 8 0
      src/main/java/cz/senslog/analytics/domain/ThresholdDimensionRule.java
  15. 0 17
      src/main/java/cz/senslog/analytics/domain/ThresholdMode.java
  16. 0 8
      src/main/java/cz/senslog/analytics/domain/ThresholdRule.java
  17. 5 1
      src/main/java/cz/senslog/analytics/domain/ThresholdViolationAlert.java
  18. 13 0
      src/main/java/cz/senslog/analytics/domain/VariableType.java
  19. 0 6
      src/main/java/cz/senslog/analytics/domain/ViolationReport.java
  20. 51 25
      src/main/java/cz/senslog/analytics/module/AlertModule.java
  21. 2 1
      src/main/java/cz/senslog/analytics/module/DoubleStatisticsModule.java
  22. 27 28
      src/main/java/cz/senslog/analytics/module/ObservationReceiverModule.java
  23. 1 1
      src/main/java/cz/senslog/analytics/module/api/Module.java
  24. 41 30
      src/main/java/cz/senslog/analytics/repository/AnalyticsRepositoryImpl.java
  25. 31 1
      src/main/java/cz/senslog/analytics/server/HttpVertxServer.java
  26. 44 5
      src/main/java/cz/senslog/analytics/server/ws/OpenAPIHandler.java
  27. 6 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/DataSource.java
  28. 11 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/DisableNotifyTrigger.java
  29. 20 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/InstantNotifyTrigger.java
  30. 152 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/MessageFormatter.java
  31. 6 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/NotifyReport.java
  32. 32 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/NotifyTrigger.java
  33. 58 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/OnChangeNotifyTrigger.java
  34. 12 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/Threshold.java
  35. 77 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/ThresholdChecker.java
  36. 102 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/ThresholdManager.java
  37. 4 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/ThresholdViolationReport.java
  38. 6 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/ValidationReport.java
  39. 28 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/ValidationResult.java
  40. 93 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/Validator.java
  41. 6 0
      src/main/java/cz/senslog/analytics/utils/newvalidator/ViolationReport.java
  42. 12 5
      src/main/java/cz/senslog/analytics/utils/validator/DisableNotifyTrigger.java
  43. 46 20
      src/main/java/cz/senslog/analytics/utils/validator/InstantNotifyTrigger.java
  44. 10 3
      src/main/java/cz/senslog/analytics/utils/validator/NotifyTrigger.java
  45. 58 23
      src/main/java/cz/senslog/analytics/utils/validator/OnChangeNotifyTrigger.java
  46. 44 25
      src/main/java/cz/senslog/analytics/utils/validator/ThresholdChecker.java
  47. 81 0
      src/main/java/cz/senslog/analytics/utils/validator/ThresholdManager.java
  48. 27 16
      src/main/java/cz/senslog/analytics/utils/validator/Validator.java
  49. 14 0
      src/main/java/cz/senslog/analytics/utils/validator/domain/ThresholdValidationResult.java
  50. 41 0
      src/main/java/cz/senslog/analytics/utils/validator/domain/ValidationReport.java
  51. 11 13
      src/main/java/cz/senslog/analytics/utils/validator/domain/ValidationResult.java
  52. 6 0
      src/main/java/cz/senslog/analytics/utils/validator/domain/ViolationReport.java
  53. 44 0
      src/main/resources/openAPISpec.yaml
  54. 43 0
      src/test/java/cz/senslog/analytics/utils/AttributeRegexTest.java
  55. 184 14
      src/test/java/cz/senslog/analytics/utils/ThresholdCheckerTest.java
  56. 183 0
      src/test/java/cz/senslog/analytics/utils/newvalidator/ThresholdCheckerTest.java
  57. 41 0
      src/test/java/cz/senslog/analytics/utils/validator/MessageFormatterTest.java
  58. 75 0
      src/test/java/cz/senslog/analytics/utils/validator/OnChangeNotifyTriggerTest.java

+ 1 - 0
build.gradle

@@ -55,6 +55,7 @@ dependencies {
     implementation 'io.vertx:vertx-core:4.5.7'
     implementation 'io.vertx:vertx-web:4.5.7'
     implementation 'io.vertx:vertx-web-openapi:4.5.7'
+    implementation 'io.vertx:vertx-json-schema:4.5.7'
     implementation 'io.vertx:vertx-auth-jwt:4.5.7'
     implementation 'io.vertx:vertx-pg-client:4.5.7'
     implementation 'org.postgresql:postgresql:42.7.3'

+ 5 - 1
docker-compose.yaml

@@ -16,6 +16,8 @@ services:
     build:
       target: dev-debug
       context: .
+    environment:
+      - TZ=Europe/Prague
     env_file:
       - docker.dev.env
     ports:
@@ -30,6 +32,8 @@ services:
     build:
       target: test
       context: .
+    environment:
+      - TZ=Europe/Prague
     depends_on:
       - analytics-db
 
@@ -42,4 +46,4 @@ services:
     ports:
       - '5432:5432'
     volumes:
-      - ./init.sql:/docker-entrypoint-initdb.d/create_tables.sql
+      - ./init.sql:/docker-entrypoint-initdb.d/create_tables.sql

+ 5 - 1
docker.dev.env

@@ -16,4 +16,8 @@ AUTH_KEYSTORE_TYPE=PKCS12
 AUTH_KEYSTORE_PASSWORD=SENSlog
 
 # Modules
-MODULE_ALERT_SERVICE_URL=http://127.0.0.1:9090/channel/alert
+MODULE_ALERT_SERVICE_DISABLED=true
+MODULE_ALERT_SERVICE_HOST=172.17.0.1
+MODULE_ALERT_SERVICE_PORT=9090
+MODULE_ALERT_SERVICE_PATH_MESSAGE=/channels/analytics_alert
+MODULE_ALERT_SERVICE_PATH_INFO=/info

+ 29 - 14
init.sql

@@ -36,19 +36,23 @@ ALTER SCHEMA analytics OWNER TO senslog;
 ALTER SCHEMA public OWNER TO senslog;
 
 
-CREATE TYPE analytics.attribute_type AS ENUM ('MIN', 'MAX', 'AVG', 'VAL');
+CREATE TYPE analytics.attribute_type AS ENUM ('MIN', 'MAX', 'AVG', 'VAL', 'TIME');
+ALTER TYPE analytics.attribute_type OWNER TO senslog;
 
-CREATE TYPE analytics.threshold_mode AS ENUM ('LT', 'LE', 'GE', 'GT', 'NE', 'EQ');
+CREATE TYPE analytics.comparison_operator AS ENUM ('LT', 'LE', 'GE', 'GT', 'NE', 'EQ');
+ALTER TYPE analytics.comparison_operator OWNER TO senslog;
 
 CREATE TYPE analytics.collector_type AS ENUM ('DOUBLE', 'MOLD');
+ALTER TYPE analytics.collector_type OWNER TO senslog;
 
 CREATE TYPE analytics.notify_trigger_mode AS ENUM ('INSTANT', 'ON_CHANGE', 'DISABLED');
-
+ALTER TYPE analytics.notify_trigger_mode OWNER TO senslog;
 
 create table analytics.sensor_to_unit (
     id SERIAL PRIMARY KEY NOT NULL,
     unit_id BIGINT NOT NULL,
     sensor_id BIGINT NOT NULL,
+    name VARCHAR(200) NOT NULL,
     last_observation TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT to_timestamp(0),
     UNIQUE (unit_id, sensor_id)
 );
@@ -83,24 +87,35 @@ alter table analytics.record OWNER TO senslog;
 
 
 create table analytics.threshold (
-   id SERIAL PRIMARY KEY NOT NULL,
-   notify_trigger_mode analytics.notify_trigger_mode NOT NULL,
-   attribute_type analytics.attribute_type NOT NULL,
-   process_on_fail BOOLEAN NOT NULL,
-   alert_enable BOOLEAN NOT NULL
+     id SERIAL PRIMARY KEY NOT NULL,
+     enable BOOLEAN NOT NULL,
+     enable_alert BOOLEAN NOT NULL,
+     enable_process BOOLEAN NOT NULL,
+     custom_alert_message TEXT
 );
 
 alter table analytics.threshold OWNER TO senslog;
 
 
-create table analytics.threshold_rule (
-   id SERIAL PRIMARY KEY NOT NULL,
-   threshold_id INTEGER NOT NULL REFERENCES analytics.threshold(id),
-   threshold_mode analytics.threshold_mode NOT NULL,
-   threshold_value DOUBLE PRECISION NOT NULL
+create table analytics.threshold_dimension (
+    id SERIAL PRIMARY KEY NOT NULL,
+    attribute_type analytics.attribute_type NOT NULL,
+    notify_trigger_mode analytics.notify_trigger_mode NOT NULL,
+    threshold_id INTEGER NOT NULL REFERENCES analytics.threshold(id),
+    UNIQUE (attribute_type, threshold_id)
+);
+
+alter table analytics.threshold_dimension OWNER TO senslog;
+
+
+create table analytics.threshold_dimension_rule (
+    id SERIAL PRIMARY KEY NOT NULL,
+    threshold_dimension_id INTEGER NOT NULL REFERENCES analytics.threshold_dimension(id),
+    comparison_operator analytics.comparison_operator NOT NULL,
+    value DOUBLE PRECISION NOT NULL
 );
 
-alter table analytics.threshold_rule OWNER TO senslog;
+alter table analytics.threshold_dimension_rule OWNER TO senslog;
 
 
 create table analytics.sensor_to_unit_to_threshold (

+ 2 - 1
src/main/java/cz/senslog/analytics/app/Application.java

@@ -17,6 +17,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.time.Duration;
+import java.time.ZoneId;
 import java.util.Properties;
 
 public final class Application {
@@ -62,7 +63,7 @@ public final class Application {
     }
 
     public static void start() {
-        logger.info("Starting app '{}', version '{}', build '{}'.", PROJECT_NAME, COMPILED_VERSION, BUILD_VERSION);
+        logger.info("Starting app '{}', version '{}', build '{}', timezone '{}'.", PROJECT_NAME, COMPILED_VERSION, BUILD_VERSION, ZoneId.systemDefault());
 
         PropertyConfig config = PropertyConfig.getInstance();
         DeploymentOptions options = new DeploymentOptions().setConfig(JsonObject.of(

+ 28 - 4
src/main/java/cz/senslog/analytics/app/PropertyConfig.java

@@ -121,9 +121,29 @@ public final class PropertyConfig {
 
         private final Function<String, String> getEnv;
 
-        public String getAlertServiceUrl() {
-            String url = getEnv.apply("MODULE_ALERT_SERVICE_URL");
-            return Objects.requireNonNull(url, "System environmental variable 'MODULE_ALERT_SERVICE_URL' is not set.");
+        public String getAlertServiceHost() {
+            String url = getEnv.apply("MODULE_ALERT_SERVICE_HOST");
+            return Objects.requireNonNull(url, "System environmental variable 'MODULE_ALERT_SERVICE_HOST' is not set.");
+        }
+
+        public int getAlertServicePort() {
+            String port = getEnv.apply("MODULE_ALERT_SERVICE_PORT");
+            return port != null ? Integer.parseInt(port) : 80;
+        }
+
+        public String getAlertServicePathMessage() {
+            String url = getEnv.apply("MODULE_ALERT_SERVICE_PATH_MESSAGE");
+            return Objects.requireNonNull(url, "System environmental variable 'MODULE_ALERT_SERVICE_PATH_MESSAGE' is not set.");
+        }
+
+        public String getAlertServicePathInfo() {
+            String url = getEnv.apply("MODULE_ALERT_SERVICE_PATH_INFO");
+            return Objects.requireNonNull(url, "System environmental variable 'MODULE_ALERT_SERVICE_PATH_INFO' is not set.");
+        }
+
+        public boolean getAlertServiceDisabled() {
+            String disabled = getEnv.apply("MODULE_ALERT_SERVICE_DISABLED");
+            return Boolean.parseBoolean(disabled);
         }
     }
 
@@ -171,7 +191,11 @@ public final class PropertyConfig {
 
     public JsonObject modules() {
         return JsonObject.of(
-                "alert.service.url", modulesConfig.getAlertServiceUrl()
+                "alert.service.disabled", modulesConfig.getAlertServiceDisabled(),
+                "alert.service.host", modulesConfig.getAlertServiceHost(),
+                "alert.service.port", modulesConfig.getAlertServicePort(),
+                "alert.service.path.message", modulesConfig.getAlertServicePathMessage(),
+                "alert.service.path.info", modulesConfig.getAlertServicePathInfo()
         );
     }
 }

+ 9 - 2
src/main/java/cz/senslog/analytics/app/VertxDeployer.java

@@ -1,8 +1,14 @@
 package cz.senslog.analytics.app;
 
+import cz.senslog.analytics.domain.ListHolder;
 import cz.senslog.analytics.domain.ThresholdViolationAlert;
 import cz.senslog.analytics.domain.RawObservation;
 import io.vertx.core.*;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.DeploymentOptions;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.eventbus.MessageCodec;
 import org.apache.logging.log4j.LogManager;
@@ -37,14 +43,15 @@ public class VertxDeployer extends AbstractVerticle {
 
     @Override
     public void start(Promise<Void> startPromise) {
-        vertx.eventBus().registerDefaultCodec(RawObservation.class, new IdentityCodec<>(RawObservation.class));
+//        vertx.eventBus().registerDefaultCodec(RawObservation.class, new IdentityCodec<>(RawObservation.class));
+        vertx.eventBus().registerDefaultCodec(ListHolder.class, new IdentityCodec<>(ListHolder.class));
         vertx.eventBus().registerDefaultCodec(ThresholdViolationAlert.class, new IdentityCodec<>(ThresholdViolationAlert.class));
 
 
         List<Future<Void>> futureModules = new ArrayList<>(verticles.length);
         for (AbstractVerticle v : verticles) {
             DeploymentOptions options = new DeploymentOptions()
-                    .setThreadingModel(ThreadingModel.WORKER)
+                    .setThreadingModel(io.vertx.core.ThreadingModel.WORKER)
                     .setConfig(config());
             futureModules.add(deployHelper(vertx, options, v));
         }

+ 6 - 0
src/main/java/cz/senslog/analytics/domain/AttributeType.java

@@ -1,5 +1,7 @@
 package cz.senslog.analytics.domain;
 
+import java.io.Serializable;
+
 public enum AttributeType {
 
     MIN,
@@ -8,6 +10,10 @@ public enum AttributeType {
     COUNT,
     AVG,
     VAL,
+    TIME,
 
     ;
+    public static AttributeType of(String type) {
+        return valueOf(type.toUpperCase());
+    }
 }

+ 27 - 0
src/main/java/cz/senslog/analytics/domain/ComparisonOperator.java

@@ -0,0 +1,27 @@
+package cz.senslog.analytics.domain;
+
+public enum ComparisonOperator {
+
+    LT  ("<"),
+    LE  ("<="),
+    GE  (">="),
+    GT  (">"),
+    NE  ("!="),
+    EQ  ("=")
+
+    ;
+    private final String format;
+
+    ComparisonOperator(String format) {
+        this.format = format;
+    }
+
+    public String format() {
+        return format;
+    }
+
+    public static ComparisonOperator of(String mode) {
+        return valueOf(mode.toUpperCase());
+    } 
+
+}

+ 48 - 0
src/main/java/cz/senslog/analytics/domain/ListHolder.java

@@ -0,0 +1,48 @@
+package cz.senslog.analytics.domain;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+public final class ListHolder<T> {
+
+    private final List<T> dataList;
+
+    public static <T> ListHolder<T> of(final List<T> dataList) {
+        return new ListHolder<>(Objects.requireNonNullElse(dataList, Collections.emptyList()));
+    }
+
+    private ListHolder(List<T> dataList) {
+        this.dataList = dataList;
+    }
+
+    public List<T> list() {
+        return this.dataList;
+    }
+
+    public boolean isEmpty() {
+        return this.dataList.isEmpty();
+    }
+
+    public boolean isNotEmpty() {
+        return !this.isEmpty();
+    }
+
+    public Stream<T> stream() {
+        return this.dataList.stream();
+    }
+
+    public int size() {
+        return this.dataList.size();
+    }
+
+    public T first() {
+        return this.dataList.get(0);
+    }
+
+    public T last() {
+        if (dataList.isEmpty()) { return null; }
+        return this.dataList.get(this.dataList.size() - 1);
+    }
+}

+ 0 - 5
src/main/java/cz/senslog/analytics/domain/MessageBrokerConfig.java

@@ -1,5 +0,0 @@
-package cz.senslog.analytics.domain;
-
-import io.vertx.core.json.JsonObject;
-
-public record MessageBrokerConfig(long id, MessageBroker.Type senderType, JsonObject config) {}

+ 5 - 3
src/main/java/cz/senslog/analytics/domain/Threshold.java

@@ -1,9 +1,11 @@
 package cz.senslog.analytics.domain;
 
 
-import cz.senslog.analytics.utils.validator.NotifyTrigger;
-
 import java.util.List;
 
-public record Threshold(long id, long datasourceId, NotifyTrigger.Mode notifyTriggerMode, AttributeType attributeType, boolean processOnFail, boolean notifyEnabled, List<ThresholdRule> rules) {
+public record Threshold(long id, long datasourceId, boolean enable, boolean enableProcess, boolean enableAlert, String customAlertMessage, List<ThresholdDimension> thresholdDimensions) {
+
+    public static Threshold of(long id, long datasourceId, boolean enable, boolean enableProcess, boolean enableAlert, String alertMessage, List<ThresholdDimension> thresholdDimensions) {
+        return new Threshold(id, datasourceId, enable, enableProcess, enableAlert, alertMessage, thresholdDimensions);
+    }
 }

+ 12 - 0
src/main/java/cz/senslog/analytics/domain/ThresholdDimension.java

@@ -0,0 +1,12 @@
+package cz.senslog.analytics.domain;
+
+import cz.senslog.analytics.utils.validator.NotifyTrigger;
+
+import java.util.List;
+
+public record ThresholdDimension(long id, NotifyTrigger.Mode notifyTriggerMode, AttributeType attributeType, List<ThresholdDimensionRule> rules) {
+
+    public static ThresholdDimension of(long id, NotifyTrigger.Mode notifyTriggerMode, AttributeType attributeType, List<ThresholdDimensionRule> rules) {
+        return new ThresholdDimension(id, notifyTriggerMode, attributeType, rules);
+    }
+}

+ 8 - 0
src/main/java/cz/senslog/analytics/domain/ThresholdDimensionRule.java

@@ -0,0 +1,8 @@
+package cz.senslog.analytics.domain;
+
+public record ThresholdDimensionRule(ComparisonOperator comparisonOperator, double value) {
+
+    public static ThresholdDimensionRule of(ComparisonOperator comparisonOperator, double value) {
+        return new ThresholdDimensionRule(comparisonOperator, value);
+    }
+}

+ 0 - 17
src/main/java/cz/senslog/analytics/domain/ThresholdMode.java

@@ -1,17 +0,0 @@
-package cz.senslog.analytics.domain;
-
-public enum ThresholdMode {
-
-    LT,
-    LE,
-    GE,
-    GT,
-    NE,
-    EQ
-
-    ;
-    public static ThresholdMode of(String mode) {
-        return valueOf(mode.toUpperCase());
-    } 
-
-}

+ 0 - 8
src/main/java/cz/senslog/analytics/domain/ThresholdRule.java

@@ -1,8 +0,0 @@
-package cz.senslog.analytics.domain;
-
-public record ThresholdRule(ThresholdMode thresholdMode, double value) {
-
-    public static ThresholdRule of(ThresholdMode thresholdMode, double value) {
-        return new ThresholdRule(thresholdMode, value);
-    }
-}

+ 5 - 1
src/main/java/cz/senslog/analytics/domain/ThresholdViolationAlert.java

@@ -2,5 +2,9 @@ package cz.senslog.analytics.domain;
 
 import java.time.OffsetDateTime;
 
-public record ThresholdViolationAlert(String moduleName, long analyticGroupId, String sourceName, ValidationResult[] violatedData, OffsetDateTime timestamp) {
+public record ThresholdViolationAlert(String moduleName, long analyticGroupId, String sourceName, String message, OffsetDateTime timestamp) {
+
+    public static ThresholdViolationAlert of(String moduleName, long analyticGroupId, String sourceName, String alertMessage, OffsetDateTime timestamp) {
+        return new ThresholdViolationAlert(moduleName, analyticGroupId, sourceName, alertMessage, timestamp);
+    }
 }

+ 13 - 0
src/main/java/cz/senslog/analytics/domain/VariableType.java

@@ -0,0 +1,13 @@
+package cz.senslog.analytics.domain;
+
+public enum VariableType {
+
+    TIMESTAMP,
+    MODULE,
+    DATASOURCE
+
+    ;
+    public static VariableType of(String type) {
+        return valueOf(type.toUpperCase());
+    }
+}

+ 0 - 6
src/main/java/cz/senslog/analytics/domain/ViolationReport.java

@@ -1,6 +0,0 @@
-package cz.senslog.analytics.domain;
-
-import java.time.OffsetDateTime;
-
-public record ViolationReport(long datasourceId, OffsetDateTime timestamp, ValidationResult[] violatedData) {
-}

+ 51 - 25
src/main/java/cz/senslog/analytics/module/AlertModule.java

@@ -1,39 +1,76 @@
 package cz.senslog.analytics.module;
 
+import cz.senslog.analytics.app.Application;
 import cz.senslog.analytics.domain.ThresholdViolationAlert;
 import cz.senslog.analytics.module.api.SimpleModule;
-import cz.senslog.analytics.repository.AnalyticsRepository;
+import cz.senslog.analytics.server.ws.ContentType;
+import io.vertx.core.Future;
 import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.*;
+import io.vertx.core.json.JsonObject;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Arrays;
-
+import java.util.function.Function;
 
 public final class AlertModule extends SimpleModule {
 
     private static final Logger logger = LogManager.getLogger(AlertModule.class);
 
-    private final AnalyticsRepository repo;
-
-    private String alertServiceURL;
+    private HttpClient httpClient;
 
+    private Function<ThresholdViolationAlert, Future<Buffer>> alertHandler;
 
-    public AlertModule(AnalyticsRepository repo) {
-        this.repo = repo;
-    }
+    private String alertServerMsgPath;
 
     @Override
     public void configure(Promise<Void> completePromise) {
-        alertServiceURL = config().getJsonObject("modules").getString("alert.service.url");
-        completePromise.complete();
+        JsonObject moduleConfig = config().getJsonObject("modules");
+
+        boolean disabled = moduleConfig.getBoolean("alert.service.disabled");
+        if (disabled) {
+            this.alertHandler = alert -> {
+                logger.info("Alert: {}", alert.message());
+                return Future.succeededFuture(Buffer.buffer("Alert service disabled. Logging to console."));
+            };
+            completePromise.complete(); return;
+        }
+
+        HttpClientOptions httpOpts = new HttpClientOptions()
+                .setDefaultHost(moduleConfig.getString("alert.service.host"))
+                .setDefaultPort(moduleConfig.getInteger("alert.service.port"));
+
+        httpClient = vertx.createHttpClient(httpOpts);
+        alertServerMsgPath = moduleConfig.getString("alert.service.path.message");
+
+        logger.info("Connecting to the alert server {} at {}.", httpOpts.getDefaultHost(), httpOpts.getDefaultPort());
+        httpClient.request(HttpMethod.GET, moduleConfig.getString("alert.service.path.info"))
+                        .compose(req -> req.send().compose(HttpClientResponse::body))
+                .onSuccess(info -> {
+                    String alertServerInfo = String.join(" | ", info.toJsonObject().stream().map(e -> e.getKey() + ": " + e.getValue()).toList());
+                    logger.info("Connected to Alert Server: {}", alertServerInfo);
+                    this.alertHandler = this::sendToAlertService;
+                    completePromise.complete();
+                })
+                .onFailure(completePromise::fail);
+    }
+
+    private Future<Buffer> sendToAlertService(ThresholdViolationAlert alert) {
+        return httpClient.request(HttpMethod.POST, alertServerMsgPath)
+                .compose(req -> req.putHeader(HttpHeaders.CONTENT_TYPE, ContentType.JSON.contentType())
+                        .send(JsonObject.of(
+                                "title",  String.format("[%s] %s", Application.PROJECT_NAME, alert.sourceName()),
+                                "message", alert.message()
+                        ).encode())
+                        .compose(HttpClientResponse::body));
     }
 
     @Override
     public void run(Promise<Void> completePromise) {
-        vertx.eventBus().<ThresholdViolationAlert>consumer(id(), msg -> vertx.executeBlocking(() -> blockingAlertHandler(msg.body()))
-                        .onSuccess(res -> logger.info("Alert sent successfully!"))
-                        .onFailure(logger::error))
+        vertx.eventBus().<ThresholdViolationAlert>consumer(id(), msg -> alertHandler.apply(msg.body())
+                            .onSuccess(b -> logger.info(b.toString()))
+                            .onFailure(logger::error))
                 .completionHandler(ar -> {
                     if (ar.succeeded()) {
                         completePromise.complete();
@@ -42,15 +79,4 @@ public final class AlertModule extends SimpleModule {
                     }
                 });
     }
-
-    private Object blockingAlertHandler(ThresholdViolationAlert alert) {
-        logger.info("Sending the alert: Title: {}, Message: {}, At: {} to {}.", alert.sourceName(), Arrays.toString(alert.violatedData()), alert.timestamp(), alertServiceURL);
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-        // TODO send the alert
-        return null;
-    }
 }

+ 2 - 1
src/main/java/cz/senslog/analytics/module/DoubleStatisticsModule.java

@@ -5,6 +5,7 @@ import cz.senslog.analytics.module.api.CollectorModule;
 import cz.senslog.analytics.repository.AnalyticsRepository;
 import cz.senslog.analytics.utils.validator.ThresholdChecker;
 import cz.senslog.analytics.utils.validator.Validator;
+import cz.senslog.analytics.utils.validator.domain.ViolationReport;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -42,7 +43,7 @@ public class DoubleStatisticsModule extends CollectorModule {
     private void notifyIfViolation(ViolationReport report) {
         Optional<AnalyticGroup> source = Optional.ofNullable(groups().get(report.datasourceId()));
         String sourceName = source.map(AnalyticGroup::name).orElse(DEFAULT_GROUP_NAME);
-        notify(new ThresholdViolationAlert(type().name(), report.datasourceId(), sourceName, report.violatedData(), report.timestamp()));
+        notify(new ThresholdViolationAlert(type().name(), report.datasourceId(), sourceName, report.message(), report.timestamp()));
     }
 
     private Stream<StatisticRecord> mapToStatisticRecord(DoubleStatistics st) {

+ 27 - 28
src/main/java/cz/senslog/analytics/module/ObservationReceiverModule.java

@@ -2,12 +2,14 @@ package cz.senslog.analytics.module;
 
 import cz.senslog.analytics.domain.*;
 import cz.senslog.analytics.repository.AnalyticsRepository;
-import cz.senslog.analytics.utils.validator.ThresholdChecker;
+import cz.senslog.analytics.utils.newvalidator.NotifyTrigger;
+import cz.senslog.analytics.utils.newvalidator.ThresholdChecker;
 import cz.senslog.analytics.module.api.Module;
 import cz.senslog.analytics.module.api.ModuleDescriptor;
 import cz.senslog.analytics.module.api.SimpleModule;
-import cz.senslog.analytics.utils.validator.Validator;
+import cz.senslog.analytics.utils.newvalidator.Validator;
 import cz.senslog.analytics.utils.Tuple;
+import cz.senslog.analytics.utils.newvalidator.ViolationReport;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import org.apache.logging.log4j.LogManager;
@@ -25,7 +27,8 @@ public class ObservationReceiverModule extends SimpleModule {
 
     private record RawSensor(long unitId, long sensorId) {}
 
-    private static final String MODULE_NAME = "RECEIVE";
+    private static final String MODULE_NAME = "RECEIVER";
+
     private final AnalyticsRepository repo;
 
     private Map<RawSensor, Sensor> sensorsMap;
@@ -48,7 +51,8 @@ public class ObservationReceiverModule extends SimpleModule {
         Future<List<Tuple<Long, AnalyticGroup>>> datasourceToAnalyticGroupFuture = repo.loadAllAnalyticGroupsWithDatasource();
 
         final Validator<Observation> validator = Validator.<Observation>create()
-                .addMapping(AttributeType.VAL, o -> o::value);
+                .addMapping(AttributeType.VAL, o -> o::value)
+                .addMapping(AttributeType.TIME, o -> () ->  (double) o.timestamp().toLocalTime().toSecondOfDay());
 
         Future.all(allSensorsFuture, thresholdsForSensorsFuture, datasourceToAnalyticGroupFuture)
                 .onSuccess(ar -> {
@@ -61,7 +65,12 @@ public class ObservationReceiverModule extends SimpleModule {
                     }
 
                     List<Threshold> sensorsThresholds = ar.resultAt(1);
-                    thresholdChecker = new ThresholdChecker<>(sensorsThresholds, validator, this::notifyIfViolated, true);
+                    // TODO refactor
+                    thresholdChecker = ThresholdChecker.create(MODULE_NAME,
+                            sensorsThresholds.stream().map(t -> new cz.senslog.analytics.utils.newvalidator.Threshold(t.id(), t.datasourceId(), t.enableProcess(), t.enableAlert(), t.customAlertMessage(),
+                                    t.thresholdDimensions().stream().map(d -> new cz.senslog.analytics.utils.newvalidator.Threshold.Dimension(d.attributeType(), NotifyTrigger.Mode.of(d.notifyTriggerMode().name()),
+                                            d.rules().stream().map(r -> new cz.senslog.analytics.utils.newvalidator.Threshold.Dimension.Rule(r.comparisonOperator(), r.value())).toList())).toList())).toList(),
+                            validator, this::notifyIfViolated, true);
 
                     List<Tuple<Long, AnalyticGroup>> groupsWithDS = ar.resultAt(2);
                     sensorToMultiGroup = groupsWithDS.stream()
@@ -74,34 +83,24 @@ public class ObservationReceiverModule extends SimpleModule {
     }
 
     private void notifyIfViolated(ViolationReport report) {
-        Sensor sensor = sensorIdToSensorMap.get(report.datasourceId());
+        Sensor sensor = sensorIdToSensorMap.get(report.dataSourceId());
         String sourceName = String.format("unit(%d)/sensor(%d)", sensor.unitId(), sensor.sensorId());
-        notify(new ThresholdViolationAlert(MODULE_NAME, report.datasourceId(), sourceName, report.violatedData(), report.timestamp()));
+        notify(ThresholdViolationAlert.of(MODULE_NAME, report.dataSourceId(), sourceName, report.message(), report.timestamp()));
     }
 
     @Override
     public void run(Promise<Void> completePromise) {
-
-        /*
-
-        1. [consume]    receive observations
-        2. [filter]     check if it is in sensorToSingleGroup OR sensorToCollectorGroup
-        2. [filter]     check thresholds by single analytic group (collector is null)
-            a) send alert if threshold alert is enabled
-            b) processed if processing on failed is enabled
-        3. [flatMap]    mapping source based on the sensorToCollectorGroup (collector is not null)
-        4. [forEach]    send rawObservation to each collector module
-
-         */
-
-        eventBus().<RawObservation>consumer(id(), h -> Stream.of(h.body())
-                .filter(o -> sensorsMap.containsKey(new RawSensor(o.unitId(), o.sensorId())))
-                .map(o -> Observation.of(sensorsMap.get(new RawSensor(o.unitId(), o.sensorId())), o))
-                .peek(o -> repo.updateLastTimestamp(o.sensor().id(), o.timestamp()).onFailure(logger::error))
-                .filter(thresholdChecker::check)
-                .flatMap(this::mappingToGroups)
-                .forEach(m -> eventBus().publish(m.getItem1().id(), m.getItem2()))
-        ).completionHandler(ar -> {
+        eventBus().<ListHolder<RawObservation>>consumer(id(), h -> {
+            logger.info("Received {} observations.", h.body().size());
+            h.body().stream()
+                            .filter(o -> sensorsMap.containsKey(new RawSensor(o.unitId(), o.sensorId())))
+                            .map(o -> Observation.of(sensorsMap.get(new RawSensor(o.unitId(), o.sensorId())), o))
+                            .peek(o -> repo.updateLastTimestamp(o.sensor().id(), o.timestamp()).onFailure(logger::error))
+                            .filter(thresholdChecker::check)
+                            .flatMap(this::mappingToGroups)
+                            .forEach(m -> eventBus().publish(m.getItem1().id(), m.getItem2()));
+            h.reply(String.format("Processed %d observations.", h.body().size()));
+        }).completionHandler(ar -> {
             if (ar.succeeded()) {
                 completePromise.complete();
             } else {

+ 1 - 1
src/main/java/cz/senslog/analytics/module/api/Module.java

@@ -22,7 +22,7 @@ public final class Module {
         createModule(new DoubleStatisticsModule(repo));
         createModule(new MoldAnalysisModule(repo));
 
-        createModule(new AlertModule(repo));
+        createModule(new AlertModule());
 //        createModule(new ScheduleDBLoaderModule(repo));
 
         return MODULES.values().toArray(new SimpleModule[0]);

+ 41 - 30
src/main/java/cz/senslog/analytics/repository/AnalyticsRepositoryImpl.java

@@ -2,7 +2,9 @@ package cz.senslog.analytics.repository;
 
 import cz.senslog.analytics.domain.*;
 import cz.senslog.analytics.utils.Tuple;
+import cz.senslog.analytics.domain.ComparisonOperator;
 import cz.senslog.analytics.utils.validator.NotifyTrigger;
+import io.vertx.core.CompositeFuture;
 import io.vertx.core.Future;
 import io.vertx.sqlclient.Pool;
 import io.vertx.sqlclient.RowSet;
@@ -67,30 +69,40 @@ public final class AnalyticsRepositoryImpl implements AnalyticsRepository {
 
     @Override
     public Future<List<Threshold>> loadThresholdsForSensor() {
-        return client.query("SELECT ss.id AS datasource_id, sstt.threshold_id, t.notify_trigger_mode, t.attribute_type, t.process_on_fail, t.alert_enable, tr.threshold_mode, tr.threshold_value " +
-                        "FROM analytics.sensor_to_unit ss " +
-                        "JOIN analytics.sensor_to_unit_to_threshold sstt on ss.id = sstt.sensor_to_unit_id " +
-                        "JOIN analytics.threshold t on t.id = sstt.threshold_id " +
-                        "JOIN analytics.threshold_rule tr on t.id = tr.threshold_id")
+        return client.query("SELECT stutt.sensor_to_unit_id AS datasource_id, stutt.threshold_id, t.enable, t.enable_process, t.enable_alert, t.custom_alert_message " +
+                "FROM analytics.sensor_to_unit_to_threshold stutt " +
+                "JOIN analytics.threshold t on t.id = stutt.threshold_id")
                 .execute()
                 .map(rs -> StreamSupport.stream(rs.spliterator(), false)
-                        .map(row -> new Threshold(
-                                    row.getLong("threshold_id"),
-                                    row.getLong("datasource_id"),
-                                    NotifyTrigger.Mode.valueOf(row.getString("notify_trigger_mode")),
-                                    AttributeType.valueOf(row.getString("attribute_type")),
-                                    row.getBoolean("process_on_fail"),
-                                    row.getBoolean("alert_enable"),
-                                    new ArrayList<>() {{
-                                                add(ThresholdRule.of(
-                                                        ThresholdMode.of(row.getString("threshold_mode")),
-                                                        row.getDouble("threshold_value")
-                                                ));
-                                            }}
-                                        )
-                        ).collect(Collectors.toMap(Threshold::id, t -> t, (p, q) -> {
-                            p.rules().add(q.rules().get(0)); return p;
-                        })).values().stream().toList()
+                        .map(row -> Threshold.of(
+                                row.getLong("threshold_id"),
+                                row.getLong("datasource_id"),
+                                row.getBoolean("enable"),
+                                row.getBoolean("enable_process"),
+                                row.getBoolean("enable_alert"),
+                                row.getString("custom_alert_message"),
+                                new ArrayList<>()
+                        )).toList()
+                ).compose(thresholds -> Future.join(thresholds.stream()
+                        .map(th -> client.preparedQuery("SELECT e.id, e.notify_trigger_mode, e.attribute_type, er.comparison_operator, er.value FROM analytics.threshold_dimension e " +
+                                            "JOIN analytics.threshold_dimension_rule er on e.id = er.threshold_dimension_id WHERE e.threshold_id = $1")
+                            .execute(io.vertx.sqlclient.Tuple.of(th.id()))
+                            .map(rs -> StreamSupport.stream(rs.spliterator(), false)
+                                    .map(row -> ThresholdDimension.of(
+                                            row.getLong("id"),
+                                            NotifyTrigger.Mode.of(row.getString("notify_trigger_mode")),
+                                            AttributeType.of(row.getString("attribute_type")),
+                                                    new ArrayList<>() {{
+                                                        add(ThresholdDimensionRule.of(
+                                                                ComparisonOperator.of(row.getString("comparison_operator")),
+                                                                row.getDouble("value")
+                                                        ));
+                                                    }}
+                                            )
+                                    ).collect(Collectors.toMap(ThresholdDimension::id, t -> t, (p, q) -> {
+                                        p.rules().add(q.rules().get(0)); return p;
+                                    })).values().stream().collect(Collectors.toCollection(th::thresholdDimensions))
+                            ).compose(edges -> Future.succeededFuture(th))).toList()).map(CompositeFuture::list)
                 );
     }
 
@@ -129,12 +141,9 @@ public final class AnalyticsRepositoryImpl implements AnalyticsRepository {
 
     @Override
     public Future<List<Threshold>> loadGroupThresholds(CollectorType type) {
-        return client.preparedQuery("SELECT ag.id AS datasource_id, agtt.threshold_id, t.notify_trigger_mode, t.attribute_type, t.process_on_fail, t.alert_enable, tr.threshold_mode, tr.threshold_value " +
-                        "FROM analytics.analytic_group ag " +
-                        "JOIN analytics.analytic_group_to_threshold agtt on ag.id = agtt.analytic_group_id " +
-                        "JOIN analytics.threshold t on t.id = agtt.threshold_id\n" +
-                        "JOIN analytics.threshold_rule tr on t.id = tr.threshold_id " +
-                        "WHERE ag.collector_type = $1")
+        return Future.succeededFuture(Collections.emptyList());
+        /*
+        return client.preparedQuery("")
                 .execute(io.vertx.sqlclient.Tuple.of(type.name()))
                 .map(rs -> StreamSupport.stream(rs.spliterator(), false)
                         .map(row -> new Threshold(
@@ -145,8 +154,8 @@ public final class AnalyticsRepositoryImpl implements AnalyticsRepository {
                                         row.getBoolean("process_on_fail"),
                                         row.getBoolean("alert_enable"),
                                         new ArrayList<>() {{
-                                            add(new ThresholdRule(
-                                                    ThresholdMode.of(row.getString("threshold_mode")),
+                                            add(new ThresholdEdgeRule(
+                                                    ComparisonOperator.of(row.getString("threshold_mode")),
                                                     row.getDouble("threshold_value")
                                             ));
                                         }}
@@ -155,5 +164,7 @@ public final class AnalyticsRepositoryImpl implements AnalyticsRepository {
                             p.rules().add(q.rules().get(0)); return p;
                         })).values().stream().toList()
                 );
+
+         */
     }
 }

+ 31 - 1
src/main/java/cz/senslog/analytics/server/HttpVertxServer.java

@@ -1,11 +1,16 @@
 package cz.senslog.analytics.server;
 
+import cz.senslog.analytics.domain.ListHolder;
+import cz.senslog.analytics.domain.RawObservation;
+import cz.senslog.analytics.module.ObservationReceiverModule;
+import cz.senslog.analytics.module.api.Module;
 import cz.senslog.analytics.server.ws.AuthorizationType;
 import cz.senslog.analytics.server.ws.DisableAuthorizationHandler;
 import cz.senslog.analytics.server.ws.ExceptionHandler;
 import cz.senslog.analytics.server.ws.OpenAPIHandler;
 import cz.senslog.analytics.utils.ResourcesUtils;
 import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.json.JsonObject;
@@ -14,11 +19,13 @@ import io.vertx.ext.auth.jwt.JWTAuth;
 import io.vertx.ext.auth.jwt.JWTAuthOptions;
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.handler.*;
+import io.vertx.ext.web.openapi.OpenAPIHolder;
 import io.vertx.ext.web.openapi.RouterBuilder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.nio.file.Path;
+import java.util.function.Function;
 
 import static cz.senslog.analytics.server.ws.AuthorizationType.BEARER;
 import static java.util.Objects.requireNonNull;
@@ -74,9 +81,12 @@ public final class HttpVertxServer extends AbstractVerticle {
                     // The order matters, so adding the body handler should happen after any PLATFORM or SECURITY_POLICY handler(s).
                     openAPIRouterBuilder.rootHandler(BodyHandler.create());
 
-                    OpenAPIHandler apiHandler = OpenAPIHandler.create();
+
+                    OpenAPIHandler apiHandler = getOpenAPIHandler(openAPIRouterBuilder.getOpenAPI());
 
                     openAPIRouterBuilder.operation("infoGET").handler(apiHandler::info);
+                    openAPIRouterBuilder.operation("observationsPOST").handler(apiHandler::observationsPOST);
+                    openAPIRouterBuilder.operation("observationsGET").handler(apiHandler::observationsGET);
 
                     Router mainRouter = openAPIRouterBuilder.createRouter();
 
@@ -101,4 +111,24 @@ public final class HttpVertxServer extends AbstractVerticle {
                 })
                 .onFailure(startPromise::fail);
     }
+
+    private OpenAPIHandler getOpenAPIHandler(OpenAPIHolder openAPI) {
+        final String observationReceiverBusId = Module.of(ObservationReceiverModule.class).id();
+
+        Function<ListHolder<RawObservation>, Future<String>> observationReceiver = listHolder -> {
+            Promise<String> replyPromise = Promise.promise();
+
+            vertx.eventBus().request(observationReceiverBusId, listHolder, reply -> {
+                if (reply.succeeded()) {
+                    replyPromise.complete(reply.result().body().toString());
+                } else {
+                    replyPromise.fail(reply.cause());
+                }
+            });
+
+            return replyPromise.future();
+        };
+
+        return OpenAPIHandler.create(openAPI, observationReceiver);
+    }
 }

+ 44 - 5
src/main/java/cz/senslog/analytics/server/ws/OpenAPIHandler.java

@@ -2,25 +2,39 @@ package cz.senslog.analytics.server.ws;
 
 import cz.senslog.analytics.app.Application;
 import cz.senslog.analytics.app.PropertyConfig;
+import cz.senslog.analytics.domain.ListHolder;
+import cz.senslog.analytics.domain.RawObservation;
 import io.vertx.core.Future;
-import io.vertx.core.json.JsonArray;
 import io.vertx.core.json.JsonObject;
+import io.vertx.core.json.pointer.JsonPointer;
 import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.openapi.OpenAPIHolder;
+import io.vertx.json.schema.JsonSchema;
+import io.vertx.json.schema.impl.JsonObjectSchema;
 
-import java.time.ZoneId;
+import java.time.OffsetDateTime;
+import java.util.Comparator;
 import java.util.List;
+import java.util.function.Function;
 
 import static cz.senslog.analytics.server.ws.AuthorizationType.BEARER;
 import static cz.senslog.analytics.server.ws.AuthorizationType.NONE;
 import static cz.senslog.analytics.server.ws.ContentType.JSON;
 import static io.vertx.core.http.HttpHeaders.ACCEPT;
 import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
-import static java.util.stream.Collectors.toList;
 
 public class OpenAPIHandler {
 
-    public static OpenAPIHandler create() {
-        return new OpenAPIHandler();
+    private final Function<ListHolder<RawObservation>, Future<String>> observationConsumer;
+    private final OpenAPIHolder openAPI;
+
+    public static OpenAPIHandler create(OpenAPIHolder openAPI, Function<ListHolder<RawObservation>, Future<String>> observationConsumer) {
+        return new OpenAPIHandler(openAPI, observationConsumer);
+    }
+
+    private OpenAPIHandler(OpenAPIHolder openAPI, Function<ListHolder<RawObservation>, Future<String>> observationConsumer) {
+        this.openAPI = openAPI;
+        this.observationConsumer = observationConsumer;
     }
 
     public void info(RoutingContext rc) {
@@ -37,4 +51,29 @@ public class OpenAPIHandler {
                                 "authType", authEnable ? BEARER : NONE
                         ).encode()));
     }
+
+    public void observationsPOST(RoutingContext rc) {
+        List<RawObservation> observations = rc.body().asJsonArray()
+                .stream().map(JsonObject.class::cast)
+                .map(e -> RawObservation.of(
+                        e.getLong("unitId"),
+                        e.getLong("sensorId"),
+                        e.getDouble("observedValue"),
+                        OffsetDateTime.parse(e.getString("timestamp"))
+                ))
+                .sorted(Comparator.comparing(RawObservation::timestamp))
+                .toList();
+
+        observationConsumer.apply(ListHolder.of(observations))
+                .onSuccess(msg -> rc.end(JsonObject.of(
+                        "message", msg
+                ).encode()))
+                .onFailure(rc::fail);
+    }
+
+    public void observationsGET(RoutingContext rc) {
+        final JsonSchema schema = JsonSchema.of(openAPI.getCached(JsonPointer.from("/components/schemas/ObservationList")));
+        rc.response().putHeader(CONTENT_TYPE, "application/json+schema")
+                .end(schema.toString());
+    }
 }

+ 6 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/DataSource.java

@@ -0,0 +1,6 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import java.util.List;
+
+public record DataSource(long id, String name, String moduleName) {
+}

+ 11 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/DisableNotifyTrigger.java

@@ -0,0 +1,11 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import java.util.Optional;
+
+class DisableNotifyTrigger implements NotifyTrigger {
+
+    @Override
+    public Optional<NotifyReport> accept(ValidationResult validationResult) {
+        return Optional.empty();
+    }
+}

+ 20 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/InstantNotifyTrigger.java

@@ -0,0 +1,20 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import org.apache.logging.log4j.util.Strings;
+
+import java.util.Optional;
+
+class InstantNotifyTrigger implements NotifyTrigger {
+
+    @Override
+    public Optional<NotifyReport> accept(ValidationResult validationResult) {
+        if (validationResult.hasViolation()) {
+            return Optional.of(new NotifyReport(
+                    validationResult.violatedValue(),
+                    validationResult.violatedAttribute(),
+                    "Inside " + validationResult
+            ));
+        }
+        return Optional.empty();
+    }
+}

+ 152 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/MessageFormatter.java

@@ -0,0 +1,152 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import cz.senslog.analytics.domain.AttributeType;
+import cz.senslog.analytics.domain.VariableType;
+import cz.senslog.analytics.utils.Tuple;
+import org.apache.logging.log4j.util.Strings;
+
+import java.time.DateTimeException;
+import java.time.OffsetDateTime;
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+import static java.time.format.DateTimeFormatter.ofPattern;
+
+public class MessageFormatter {
+
+    private final Pattern attributePattern;
+    private final Pattern variablePattern;
+
+    public static MessageFormatter createOf(Set<VariableType> variables, Set<AttributeType> attributes) {
+        if (variables.isEmpty() || attributes.isEmpty()) {
+            return null;
+        }
+        return new MessageFormatter(variables, attributes);
+    }
+
+    public static MessageFormatter create() {
+        return new MessageFormatter(Set.of(VariableType.values()), Set.of(AttributeType.values()));
+    }
+
+    private MessageFormatter(Set<VariableType> variables, Set<AttributeType> attributes) {
+        final Function<Enum<?>, String> wrapWord = w -> String.format("\\b%s\\b", w.name().toLowerCase());
+
+        Iterator<VariableType> varIter = variables.iterator();
+        StringBuilder varBuilder = new StringBuilder(wrapWord.apply(varIter.next()));
+        while (varIter.hasNext()) {
+            varBuilder.append("|").append(wrapWord.apply(varIter.next()));
+        }
+        final String variableRegexStr = "(?<variable>\\$("+varBuilder+"))(\\((?<format>\\S*)\\))?";
+
+        Iterator<AttributeType> attrIter = attributes.iterator();
+        StringBuilder attrBuilder = new StringBuilder(wrapWord.apply(attrIter.next()));
+        while (attrIter.hasNext()) {
+            attrBuilder.append("|").append(wrapWord.apply(attrIter.next()));
+        }
+        final String attributeRegexStr = "(?<attribute>\\$("+attrBuilder+"))(\\((?<format>\\S*)\\))?";
+
+        attributePattern = Pattern.compile(attributeRegexStr, Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL );
+        variablePattern = Pattern.compile(variableRegexStr, Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL );
+    }
+
+    public String format(final String pattern, final Map<AttributeType, Double> attributeArgs, final Map<VariableType, Function<Optional<String>, String>> variableArgs) {
+
+        String message = pattern;
+        int iterationCount = 0;
+        boolean found;
+        do {
+            Matcher matcher = variablePattern.matcher(message);
+            found = matcher.find();
+            if (found) {
+                String variable = matcher.group("variable").substring(1);
+                String format = matcher.group("format");
+
+                message = matcher.replaceFirst(variableArgs.get(VariableType.of(variable)).apply(Optional.ofNullable(format)));
+            }
+        } while (found && iterationCount++ < 10);
+
+        iterationCount = 0;
+        do {
+            Matcher matcher = attributePattern.matcher(message);
+            found = matcher.find();
+            if (found) {
+                String attribute = matcher.group("attribute").substring(1);
+                String format = matcher.group("format");
+
+                double value = attributeArgs.get(AttributeType.of(attribute));
+                message = matcher.replaceFirst(String.format(Strings.isBlank(format) ? "%f" : format, value));
+            }
+        } while (found && iterationCount++ < 10);
+
+        return message;
+    }
+
+    /*
+    public boolean isValid(final String patternMessage, List<ThresholdDimension> thresholdDimensions) {
+        AttributeType[] types = new AttributeType[thresholdDimensions.size()];
+        for (int i = 0; i < thresholdDimensions.size(); i++) {
+            types[i] = thresholdDimensions.get(i).attributeType();
+        }
+        return isValid(patternMessage, types);
+    }
+
+    public boolean isValid(final String patternMessage, AttributeType[] attributeTypes) {
+
+        if (patternMessage == null || Strings.isBlank(patternMessage) || patternMessage.isEmpty()) {
+            return true;
+        }
+
+        AttributeType[] enableTypes = new AttributeType[AttributeType.values().length];
+        for (AttributeType type : attributeTypes) {
+            enableTypes[type.ordinal()] = type;
+        }
+
+        double testValue = 0.0;
+
+        Matcher attrMatcher = attributePattern.matcher(patternMessage);
+        while (attrMatcher.find()) {
+            String attribute = attrMatcher.group("attribute").substring(1);
+            String format = attrMatcher.group("format");
+
+            AttributeType attrType = AttributeType.of(attribute);
+            if (attrType == null || enableTypes[attrType.ordinal()] == null) {
+                return false;
+            }
+            if (Strings.isNotBlank(format)) {
+                try {
+                    String.format(format, testValue);
+                } catch (IllegalFormatException e) {
+                    return false;
+                }
+            }
+        }
+
+        Matcher variableMatcher = variablePattern.matcher(patternMessage);
+        while (variableMatcher.find()) {
+            String variable = variableMatcher.group("variable").substring(1);
+            String format = variableMatcher.group("format");
+
+            if (variable.equalsIgnoreCase("timestamp") && Strings.isNotBlank(format)) {
+                try {
+                    OffsetDateTime.now().format(ofPattern(format));
+                } catch (IllegalArgumentException | DateTimeException e) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+    public boolean isNotValid(final String patternMessage, Map<AttributeType, Double> args) {
+        return !isValid(patternMessage, args);
+    }
+
+     */
+}

+ 6 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/NotifyReport.java

@@ -0,0 +1,6 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import cz.senslog.analytics.domain.AttributeType;
+
+public record NotifyReport(double value, AttributeType attributeType, String violatedRulesMessage) {
+}

+ 32 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/NotifyTrigger.java

@@ -0,0 +1,32 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public interface NotifyTrigger {
+
+    Optional<NotifyReport> accept(ValidationResult validationResult);
+
+    enum Mode {
+        DISABLED    (DisableNotifyTrigger::new),
+        INSTANT     (InstantNotifyTrigger::new),
+        ON_CHANGE   (OnChangeNotifyTrigger::new),
+
+        ;
+
+        Mode(Supplier<NotifyTrigger> constructCreator) {
+            this.constructCreator = constructCreator;
+        }
+
+        private final Supplier<NotifyTrigger> constructCreator;
+
+        NotifyTrigger createInstance() {
+            return constructCreator.get();
+        }
+
+        public static NotifyTrigger.Mode of(String mode) {
+            return NotifyTrigger.Mode.valueOf(mode.toUpperCase());
+        }
+    }
+
+}

+ 58 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/OnChangeNotifyTrigger.java

@@ -0,0 +1,58 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import java.util.Optional;
+
+class OnChangeNotifyTrigger implements NotifyTrigger {
+
+    private static final int CACHE_LEN = 2;
+
+    private static int nextIndex(int index) {
+        return  (index + 1) % CACHE_LEN;
+    }
+
+    private static int previousIndex(int index) {
+        return (index <= 0 ? CACHE_LEN : index) - 1;
+    }
+
+    private static final class ResultCache {
+        private final ValidationResult[] resultsArr;
+        private int freeIndex;
+
+        private ResultCache(ValidationResult[] resultsArr, int freeIndex) {
+            this.resultsArr = resultsArr;
+            this.freeIndex = freeIndex;
+        }
+    }
+
+    private final ResultCache resultCache;
+
+    public OnChangeNotifyTrigger() {
+        this.resultCache = new ResultCache(new ValidationResult[CACHE_LEN], 0);
+    }
+
+    @Override
+    public Optional<NotifyReport> accept(ValidationResult validationResult) {
+
+        resultCache.resultsArr[resultCache.freeIndex] = validationResult;
+        resultCache.freeIndex = nextIndex(resultCache.freeIndex);
+
+        int currentDataInd = previousIndex(resultCache.freeIndex);
+        ValidationResult currentRes = resultCache.resultsArr[currentDataInd];
+        ValidationResult previousRes = resultCache.resultsArr[previousIndex(currentDataInd)];
+
+        if ((previousRes == null || !previousRes.hasViolation()) && currentRes.hasViolation()) {
+            // notify about the enter the threshold
+            return Optional.of(new NotifyReport(
+                currentRes.violatedValue(), currentRes.violatedAttribute(), ("Entry " + currentRes)
+            ));
+        } else if(!currentRes.hasViolation() && (previousRes != null && previousRes.hasViolation())) {
+            // notify about the exit the threshold
+            return Optional.of(new NotifyReport(
+                    currentRes.violatedValue(), currentRes.violatedAttribute(), ("Exit " + currentRes)
+            ));
+        }
+
+        // nothing to notify
+        return Optional.empty();
+    }
+}

+ 12 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/Threshold.java

@@ -0,0 +1,12 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import cz.senslog.analytics.domain.AttributeType;
+import cz.senslog.analytics.domain.ComparisonOperator;
+
+import java.util.List;
+
+public record Threshold(long id, long dataSourceId, boolean enableProcess, boolean enableAlert, String messagePattern, List<Dimension> dimensions) {
+    public record Dimension(AttributeType attributeType, NotifyTrigger.Mode notifyTriggerMode, List<Rule> rules) {
+        public record Rule(ComparisonOperator comparisonOperator, double value) {}
+    }
+}

+ 77 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/ThresholdChecker.java

@@ -0,0 +1,77 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import cz.senslog.analytics.domain.TimeSeriesDatasource;
+
+import java.time.OffsetDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+
+public class ThresholdChecker<DS extends TimeSeriesDatasource> {
+
+    public static <T extends TimeSeriesDatasource> ThresholdChecker<T> disabled() {
+        return ThresholdChecker.create("disabled", emptyList(), null, r -> {}, true);
+    }
+
+    private final Map<Long, ThresholdManager<DS>> thresholdManagers;
+    private final Consumer<ViolationReport> notifier;
+    private final boolean defaultEnablingProcess;
+
+    public static <DS extends TimeSeriesDatasource> ThresholdChecker<DS> create(
+            final String moduleName,
+            final List<Threshold> thresholds,
+            final Validator<DS> validator,
+            final Consumer<ViolationReport> notifier,
+            final boolean defaultEnablingProcess
+    ) {
+//        Objects.requireNonNull(validator);
+        Objects.requireNonNull(thresholds);
+        Objects.requireNonNull(notifier);
+
+        final Map<Long, List<Threshold>> dsToTh = thresholds.stream()
+                .collect(Collectors.groupingBy(Threshold::dataSourceId, mapping(Function.identity(), toList())));
+
+        // TODO get datasource
+        final Map<Long, ThresholdManager<DS>> thresholdManagers = new HashMap<>(dsToTh.size());
+        for (Map.Entry<Long, List<Threshold>> datasourceEntry : dsToTh.entrySet()) {
+            thresholdManagers.put(datasourceEntry.getKey(), new ThresholdManager<>(
+                    new DataSource(datasourceEntry.getKey(), "datasource("+datasourceEntry.getKey()+")", moduleName), validator, datasourceEntry.getValue()
+            ));
+        }
+
+        return new ThresholdChecker<>(notifier, thresholdManagers, defaultEnablingProcess);
+    }
+
+    private ThresholdChecker(Consumer<ViolationReport> notifier, Map<Long, ThresholdManager<DS>> thresholdManagers, boolean defaultEnablingProcess) {
+        this.defaultEnablingProcess = defaultEnablingProcess;
+        this.notifier = notifier;
+        this.thresholdManagers = thresholdManagers;
+    }
+
+    public boolean check(DS data) {
+
+        long sourceId = data.datasourceId();
+        OffsetDateTime timestamp = data.timestamp();
+
+        ThresholdManager<DS> thresholdManager = thresholdManagers.get(sourceId);
+        if (sourceId <= 0 || thresholdManager == null) {
+            return defaultEnablingProcess;
+        }
+
+        ValidationReport validationReport = thresholdManager.accept(data);
+
+        if (validationReport.message().isPresent()) {
+            notifier.accept(new ViolationReport(sourceId, timestamp, validationReport.message().get()));
+        }
+
+        return validationReport.process();
+    }
+}

+ 102 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/ThresholdManager.java

@@ -0,0 +1,102 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import cz.senslog.analytics.domain.AttributeType;
+import cz.senslog.analytics.domain.TimeSeriesDatasource;
+import cz.senslog.analytics.domain.VariableType;
+
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+
+import static cz.senslog.analytics.domain.VariableType.*;
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+
+class ThresholdManager<DS extends TimeSeriesDatasource> {
+
+    private final DataSource datasource;
+    private final Validator<DS> validator;
+    private final List<Threshold> thresholds;
+    private final Map<Long, Map<AttributeType, NotifyTrigger>> notifyTriggers;
+    private final MessageFormatter messageFormatter;
+
+    protected ThresholdManager(DataSource datasource, Validator<DS> validator, List<Threshold> thresholds) {
+        this.datasource = datasource;
+        this.validator = validator;
+        this.thresholds = thresholds;
+
+        Set<AttributeType> supportedAttributes = validator.attributes();
+        this.messageFormatter = MessageFormatter.createOf(Set.of(VariableType.values()), supportedAttributes);
+        this.notifyTriggers = new HashMap<>(thresholds.size());
+        for (Threshold th : thresholds) {
+            // TODO check if Threshold#message is valid according to the dimension#attributeType
+            for (Threshold.Dimension dim : th.dimensions()) {
+                notifyTriggers.computeIfAbsent(th.id(), thId -> new HashMap<>(dim.rules().size()))
+                        .computeIfAbsent(dim.attributeType(), attr -> dim.notifyTriggerMode().createInstance());
+            }
+        }
+    }
+
+    public ValidationReport accept(final DS data) {
+        if (data == null || data.datasourceId() != this.datasource.id()) {
+            return null;
+        }
+
+        OffsetDateTime timestamp = data.timestamp();
+        List<String> messagesToNotify = new ArrayList<>(thresholds.size());
+
+        boolean shouldProcess = false;
+        boolean shouldNotify = false;
+
+        for (Threshold threshold : thresholds) {
+            ValidationResult[] validationResults = validator.validate(data, threshold);
+            Map<AttributeType, NotifyTrigger> triggers = notifyTriggers.get(threshold.id());
+            List<NotifyReport> thReports = new ArrayList<>(validationResults.length);
+
+            boolean thIsViolated = true;
+            boolean thShouldNotify = threshold.enableAlert(); // if alert is disabled then it is always false
+
+            for (ValidationResult result : validationResults) {
+                Optional<NotifyReport> dimNotifyReportOpt = triggers.get(result.violatedAttribute()).accept(result); // all trigger must have something to notify
+                dimNotifyReportOpt.ifPresent(thReports::add);
+                thShouldNotify = thShouldNotify && dimNotifyReportOpt.isPresent(); // isPresent() = true = violation should notify
+                thIsViolated = thIsViolated && result.hasViolation(); // all must be violated
+            }
+
+            // should process if at least one threshold is valid over all dimensions OR the process is enabled if violation
+            shouldProcess = shouldProcess || !thIsViolated || threshold.enableProcess();
+
+            // none or at least one has something to notify
+            shouldNotify = shouldNotify || thShouldNotify;
+
+            if (thShouldNotify) {
+                if (threshold.messagePattern() != null) {
+                    String patternMessage = threshold.messagePattern();
+                    Map<AttributeType, Double> messageArgs = new HashMap<>(thReports.size());
+                    for (NotifyReport thR : thReports) {
+                        messageArgs.put(thR.attributeType(), thR.value());
+                    }
+
+                    messagesToNotify.add(messageFormatter.format(patternMessage, messageArgs, Map.of(
+                            TIMESTAMP, (f -> timestamp.format(f.map(DateTimeFormatter::ofPattern).orElse(ISO_OFFSET_DATE_TIME))),
+                            MODULE, (f -> datasource.moduleName()),
+                            DATASOURCE, (f -> datasource.moduleName())
+                    )));
+
+                } else {
+                    StringBuilder thMessage = new StringBuilder();
+                    NotifyReport r0 = thReports.get(0);
+                    thMessage.append(r0.violatedRulesMessage());
+                    for (int i = 1; i < thReports.size(); i++) {
+                        thMessage.append("\n").append(thReports.get(i).violatedRulesMessage());
+                    }
+                    messagesToNotify.add(thMessage.toString());
+                }
+            }
+        }
+
+        return new ValidationReport(
+                shouldProcess,
+                shouldNotify ? Optional.of( String.join("\n\n", messagesToNotify)) : Optional.empty()
+        );
+    }
+}

+ 4 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/ThresholdViolationReport.java

@@ -0,0 +1,4 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+record ThresholdViolationReport() {
+}

+ 6 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/ValidationReport.java

@@ -0,0 +1,6 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import java.util.Optional;
+
+record ValidationReport(boolean process, Optional<String> message) {
+}

+ 28 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/ValidationResult.java

@@ -0,0 +1,28 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import cz.senslog.analytics.domain.AttributeType;
+
+import java.util.List;
+
+public record ValidationResult(double violatedValue, AttributeType violatedAttribute, List<Threshold.Dimension.Rule> rules) {
+
+    public boolean hasViolation() {
+        return rules != null && !rules.isEmpty();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder result = new StringBuilder(violatedAttribute.name().toLowerCase()).append(": ");
+        if (rules != null && !rules.isEmpty()) {
+            Threshold.Dimension.Rule r0 = rules.get(0);
+            result.append(String.format("%.2f %s %.2f", violatedValue, r0.comparisonOperator().format(), r0.value()));
+            for (int i = 1; i < rules.size(); i++) {
+                Threshold.Dimension.Rule ri = rules.get(i);
+                result.append(String.format(" AND %.2f %s %.2f", violatedValue, ri.comparisonOperator().format(), ri.value()));
+            }
+        } else {
+            result.append(String.format("%.2f", violatedValue));
+        }
+        return result.toString();
+    }
+}

+ 93 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/Validator.java

@@ -0,0 +1,93 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import cz.senslog.analytics.domain.AttributeType;
+import cz.senslog.analytics.domain.ComparisonOperator;
+
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static cz.senslog.analytics.domain.ComparisonOperator.*;
+import static cz.senslog.analytics.domain.ComparisonOperator.NE;
+import static java.util.Collections.emptyList;
+
+public class Validator<T> {
+
+    private static final Map<ComparisonOperator, BiFunction<Double, Double, Boolean>> functions;
+
+    static {
+        functions = new HashMap<>(6);
+        functions.put(GT, (val, ths) -> val > ths);
+        functions.put(GE, (val, ths) -> val >= ths);
+        functions.put(LT, (val, ths) -> val < ths);
+        functions.put(LE, (val, ths) -> val <= ths);
+        functions.put(EQ, Double::equals);
+        functions.put(NE, (val, ths) -> !val.equals(ths));
+    }
+
+    public static boolean checkThresholdValue(ComparisonOperator mode, Double value, Double threshold) {
+        if (mode == null || value == null || threshold == null) return false;
+        return functions.getOrDefault(mode, (val, ths) -> false).apply(value, threshold);
+    }
+
+    public static boolean checkThresholdValue(Threshold.Dimension.Rule rule, Double value) {
+        if (rule == null || value == null) return false;
+        return checkThresholdValue(rule.comparisonOperator(), value, rule.value());
+    }
+
+    public interface AttributeMapping<T> {
+        Validator<T> addMapping(AttributeType property, Function<T, Supplier<Double>> getter);
+    }
+
+
+    public static <T> Validator.AttributeMapping<T> create() {
+        return new Validator.AttributeMapping<>() {
+            private final Validator<T> validator = new Validator<>();
+            @Override
+            public Validator<T> addMapping(AttributeType property, Function<T, Supplier<Double>> getter) {
+                validator.map.put(property, getter);
+                return validator;
+            }
+        };
+    }
+
+    private final Map<AttributeType, Function<T, Supplier<Double>>> map;
+
+    private Validator() {
+        this.map = new HashMap<>();
+    }
+
+    public Set<AttributeType> attributes() {
+        return map.keySet();
+    }
+
+    public Validator<T> addMapping(AttributeType property, Function<T, Supplier<Double>> getter) {
+        map.put(property, getter);
+        return this;
+    }
+
+    public ValidationResult[] validate(T object, Threshold threshold) {
+        return validate(object, threshold, map);
+    }
+
+    private static <T> ValidationResult validate(T object, Threshold.Dimension dimension, Map<AttributeType, Function<T, Supplier<Double>>> attributeMapping) {
+        final Double testingValue = attributeMapping.getOrDefault(dimension.attributeType(), (o) -> () -> null).apply(object).get();
+        final List<Threshold.Dimension.Rule> violatedRules = new ArrayList<>(dimension.rules().size());
+        boolean passed = true;
+        for (Threshold.Dimension.Rule rule : dimension.rules()) {
+            passed &= checkThresholdValue(rule, testingValue);
+            violatedRules.add(rule);
+        }
+        return new ValidationResult(testingValue, dimension.attributeType(), passed ? violatedRules : emptyList());
+    }
+
+    private static <T> ValidationResult[] validate(T object, Threshold threshold, Map<AttributeType, Function<T, Supplier<Double>>> attributeMapping) {
+        final ValidationResult[] results = new ValidationResult[threshold.dimensions().size()];
+        int resultIndex = 0;
+        for (Threshold.Dimension dimension : threshold.dimensions()) {
+            results[resultIndex++] = validate(object, dimension, attributeMapping);
+        }
+        return results;
+    }
+}

+ 6 - 0
src/main/java/cz/senslog/analytics/utils/newvalidator/ViolationReport.java

@@ -0,0 +1,6 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import java.time.OffsetDateTime;
+
+public record ViolationReport(long dataSourceId, OffsetDateTime timestamp, String message) {
+}

+ 12 - 5
src/main/java/cz/senslog/analytics/utils/validator/DisableNotifyTrigger.java

@@ -1,14 +1,21 @@
 package cz.senslog.analytics.utils.validator;
 
 
-import cz.senslog.analytics.domain.ValidationResult;
+import cz.senslog.analytics.domain.Threshold;
+import cz.senslog.analytics.utils.validator.domain.ValidationReport;
+import cz.senslog.analytics.utils.validator.domain.ValidationResult;
 
 public class DisableNotifyTrigger implements NotifyTrigger {
 
     public DisableNotifyTrigger() {}
 
     @Override
-    public void accept(ValidationResult validationResult) {}
+    public void accept(Threshold threshold, ValidationResult... validationResults) {}
+
+    @Override
+    public Mode mode() {
+        return Mode.DISABLED;
+    }
 
     @Override
     public boolean shouldNotify() {
@@ -16,7 +23,7 @@ public class DisableNotifyTrigger implements NotifyTrigger {
     }
 
     @Override
-    public ValidationResult[] resultsToNotify() {
-        return new ValidationResult[0];
+    public ValidationReport resultsToNotify() {
+        return new ValidationReport(null, new ValidationResult[0]);
     }
-}
+}

+ 46 - 20
src/main/java/cz/senslog/analytics/utils/validator/InstantNotifyTrigger.java

@@ -1,48 +1,74 @@
 package cz.senslog.analytics.utils.validator;
 
-import cz.senslog.analytics.domain.AttributeType;
-import cz.senslog.analytics.domain.ValidationResult;
+import cz.senslog.analytics.domain.*;
+import cz.senslog.analytics.utils.validator.domain.ThresholdValidationResult;
+import cz.senslog.analytics.utils.validator.domain.ValidationReport;
+import cz.senslog.analytics.utils.validator.domain.ValidationResult;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+/**
+ * The class InstantNotifyTrigger is unique for each datasource, i.e., unit/sensor = one NotifyTrigger
+ */
 public class InstantNotifyTrigger implements NotifyTrigger {
 
-    private final Map<AttributeType, ValidationResult> currentResultMap;
-    private List<AttributeType> tempAttrsToNotify;
+    private final Map<Long, ThresholdValidationResult> currentResultMap;
+    private Set<Long> tempAttrsToNotify;
 
     public InstantNotifyTrigger() {
         this.currentResultMap = new HashMap<>();
     }
 
     @Override
-    public void accept(ValidationResult validationResult) {
-        currentResultMap.put(validationResult.attributeType(), validationResult);
+    public void accept(Threshold threshold, ValidationResult... validationResults) {
+        currentResultMap.put(threshold.id(), new ThresholdValidationResult(threshold, validationResults));
+    }
+
+    @Override
+    public Mode mode() {
+        return Mode.INSTANT;
     }
 
     @Override
     public boolean shouldNotify() {
         boolean notify = false;
-        tempAttrsToNotify = new ArrayList<>();
-        for (ValidationResult res : currentResultMap.values()) {
-            if (res.isNotValid()) {
+        tempAttrsToNotify = new HashSet<>();
+        for (Map.Entry<Long, ThresholdValidationResult> thresholdResultEntry : currentResultMap.entrySet()) {
+            if (thresholdResultEntry.getValue().isViolated()) {
                 notify = true;
-                tempAttrsToNotify.add(res.attributeType());
+                tempAttrsToNotify.add(thresholdResultEntry.getKey());
             }
         }
         return notify;
     }
 
     @Override
-    public ValidationResult[] resultsToNotify() {
-        ValidationResult[] res = new ValidationResult[tempAttrsToNotify.size()];
-        int ind = 0;
-        for (AttributeType attr : tempAttrsToNotify) {
-            res[ind++] = currentResultMap.get(attr);
+    public ValidationReport resultsToNotify() {
+
+        if (tempAttrsToNotify.isEmpty()) {
+            return ValidationReport.empty();
         }
+
+        StringBuilder alertMessageBuilder = new StringBuilder();
+        List<ValidationResult> attributeValidationResults = new ArrayList<>();
+
+        for (Long thresholdId : tempAttrsToNotify) {
+            ThresholdValidationResult thResult = currentResultMap.get(thresholdId);
+            attributeValidationResults.addAll(Arrays.asList(thResult.validationResults()));
+
+            if (thResult.threshold().enableAlert()) {
+                String alertMessage = thResult.threshold().customAlertMessage();
+                if (alertMessage != null) {
+                    alertMessageBuilder.append(alertMessage).append("\n");
+                }
+            }
+        }
+
         tempAttrsToNotify = null;
-        return res;
+
+        return new ValidationReport(
+                alertMessageBuilder.toString(),
+                attributeValidationResults.toArray(new ValidationResult[0])
+        );
     }
 }

+ 10 - 3
src/main/java/cz/senslog/analytics/utils/validator/NotifyTrigger.java

@@ -1,16 +1,19 @@
 package cz.senslog.analytics.utils.validator;
 
 
-import cz.senslog.analytics.domain.ValidationResult;
+import cz.senslog.analytics.domain.Threshold;
+import cz.senslog.analytics.utils.validator.domain.ValidationReport;
+import cz.senslog.analytics.utils.validator.domain.ValidationResult;
 
 import java.util.function.Supplier;
 
 public interface NotifyTrigger {
 
-    void accept(ValidationResult validationResult);
+    void accept(Threshold threshold, ValidationResult... validationResults);
 
+    Mode mode();
     boolean shouldNotify();
-    ValidationResult[] resultsToNotify();
+    ValidationReport resultsToNotify();
 
     enum Mode {
         DISABLED    (DisableNotifyTrigger::new),
@@ -28,6 +31,10 @@ public interface NotifyTrigger {
         public NotifyTrigger createInstance() {
             return constructCreator.get();
         }
+
+        public static Mode of(String mode) {
+            return Mode.valueOf(mode.toUpperCase());
+        }
     }
 
 

+ 58 - 23
src/main/java/cz/senslog/analytics/utils/validator/OnChangeNotifyTrigger.java

@@ -1,8 +1,11 @@
 package cz.senslog.analytics.utils.validator;
 
 
-import cz.senslog.analytics.domain.AttributeType;
-import cz.senslog.analytics.domain.ValidationResult;
+import cz.senslog.analytics.domain.Threshold;
+import cz.senslog.analytics.domain.ThresholdDimension;
+import cz.senslog.analytics.utils.validator.domain.ThresholdValidationResult;
+import cz.senslog.analytics.utils.validator.domain.ValidationReport;
+import cz.senslog.analytics.utils.validator.domain.ValidationResult;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -12,10 +15,10 @@ import java.util.Map;
 public class OnChangeNotifyTrigger implements NotifyTrigger {
 
     private static final class CacheWrapper {
-        private final ValidationResult[] resultsArr;
+        private final ThresholdValidationResult[] resultsArr;
         private int freeIndex;
 
-        private CacheWrapper(ValidationResult[] resultsArr, int freeIndex) {
+        private CacheWrapper(ThresholdValidationResult[] resultsArr, int freeIndex) {
             this.resultsArr = resultsArr;
             this.freeIndex = freeIndex;
         }
@@ -23,25 +26,30 @@ public class OnChangeNotifyTrigger implements NotifyTrigger {
 
     private static final int CACHE_LEN = 2;
 
-    private final Map<AttributeType, CacheWrapper> cacheMap;
+    private final Map<Long, CacheWrapper> cacheMap;
 
-    private List<ValidationResult> tempToNotify;
+    private List<ThresholdValidationResult> tempToNotify;
 
     public OnChangeNotifyTrigger() {
         this.cacheMap = new HashMap<>();
     }
 
     @Override
-    public void accept(ValidationResult validationResult) {
-        AttributeType attr = validationResult.attributeType();
-        if (!cacheMap.containsKey(attr)) {
-            cacheMap.put(attr, new CacheWrapper(new ValidationResult[CACHE_LEN], 0));
+    public void accept(Threshold threshold, ValidationResult... validationResults) {
+        long thresholdId = threshold.id();
+        if (!cacheMap.containsKey(thresholdId)) {
+            cacheMap.put(thresholdId, new CacheWrapper(new ThresholdValidationResult[CACHE_LEN], 0));
         }
-        CacheWrapper cache = cacheMap.get(attr);
-        cache.resultsArr[cache.freeIndex] = validationResult;
+        CacheWrapper cache = cacheMap.get(thresholdId);
+        cache.resultsArr[cache.freeIndex] = new ThresholdValidationResult(threshold, validationResults);
         cache.freeIndex = nextIndex(cache.freeIndex);
     }
 
+    @Override
+    public Mode mode() {
+        return Mode.ON_CHANGE;
+    }
+
     private static int nextIndex(int index) {
         return  (index + 1) % CACHE_LEN;
     }
@@ -54,15 +62,15 @@ public class OnChangeNotifyTrigger implements NotifyTrigger {
     public boolean shouldNotify() {
         boolean notify = false;
         tempToNotify = new ArrayList<>(cacheMap.size());
-        for (CacheWrapper cache : cacheMap.values()) {
+        for (CacheWrapper cache : cacheMap.values()) { // over all thresholds
             int currentDataInd = previousIndex(cache.freeIndex);
-            ValidationResult currentRes = cache.resultsArr[currentDataInd];
-            ValidationResult previousRes = cache.resultsArr[previousIndex(currentDataInd)];
-            if ((previousRes == null || previousRes.isValid()) && currentRes.isNotValid()) {
-                notify = true; // pass the threshold
+            ThresholdValidationResult currentRes = cache.resultsArr[currentDataInd];
+            ThresholdValidationResult previousRes = cache.resultsArr[previousIndex(currentDataInd)];
+            if ((previousRes == null || !previousRes.isViolated()) && currentRes.isViolated()) {
+                notify = true; // enter the threshold
                 tempToNotify.add(currentRes);
-            } else if(currentRes.isValid() && (previousRes != null && previousRes.isNotValid())) {
-                notify = true; // leave the threshold
+            } else if(!currentRes.isViolated() && (previousRes != null && previousRes.isViolated())) {
+                notify = true; // exit the threshold
                 tempToNotify.add(currentRes);
             }
         }
@@ -70,11 +78,38 @@ public class OnChangeNotifyTrigger implements NotifyTrigger {
     }
 
     @Override
-    public ValidationResult[] resultsToNotify() {
-        if (tempToNotify == null) { return new ValidationResult[0]; }
-        ValidationResult[] res = tempToNotify.toArray(new ValidationResult[0]);
+    public ValidationReport resultsToNotify() {
+
+        if (tempToNotify == null) {
+            return ValidationReport.empty();
+        }
+
+        StringBuilder alertMessageBuilder = new StringBuilder();
+        List<ValidationResult> attributeValidationResults = new ArrayList<>();
+
+        for (ThresholdValidationResult thResult : tempToNotify) {
+            for (ThresholdDimension thEdge : thResult.threshold().thresholdDimensions()) {
+                    for (ValidationResult validationResult : thResult.validationResults()) {
+                        if (validationResult.attributeType().equals(thEdge.attributeType())) {
+                            attributeValidationResults.add(validationResult);
+                        }
+                }
+            }
+
+            if (thResult.threshold().enableAlert()) {
+                String alertMessage = thResult.threshold().customAlertMessage();
+                if (alertMessage != null) {
+                    alertMessageBuilder.append(alertMessage).append("\n");
+                }
+            }
+        }
+
         tempToNotify = null;
-        return res;
+
+        return new ValidationReport(
+                alertMessageBuilder.toString(),
+                attributeValidationResults.toArray(new ValidationResult[0])
+        );
     }
 
 }

+ 44 - 25
src/main/java/cz/senslog/analytics/utils/validator/ThresholdChecker.java

@@ -1,28 +1,33 @@
 package cz.senslog.analytics.utils.validator;
 
 import cz.senslog.analytics.domain.*;
+import cz.senslog.analytics.utils.newvalidator.MessageFormatter;
+import cz.senslog.analytics.utils.validator.domain.ValidationReport;
+import cz.senslog.analytics.utils.validator.domain.ValidationResult;
+import cz.senslog.analytics.utils.validator.domain.ViolationReport;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.time.OffsetDateTime;
 import java.util.*;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 import static java.util.Collections.emptyList;
 
 public class ThresholdChecker<DS extends TimeSeriesDatasource> {
 
+    private static final Logger logger = LogManager.getLogger(ThresholdChecker.class);
+
     public static <T extends TimeSeriesDatasource> ThresholdChecker<T> disabled() {
         return ThresholdChecker.create(emptyList(), null, null, true);
     }
 
-    private final Map<Long, List<Threshold>> thresholds;
+    private final Map<Long, ThresholdManager> datasourceThresholdManagers;
 
     private final Validator<DS> validator;
 
     private final Consumer<ViolationReport> ifViolated;
 
-    private final Map<Long, NotifyTrigger> notifyTriggerMap;
-
     private final boolean defaultIfNotPresent;
 
     public static <DS extends TimeSeriesDatasource> ThresholdChecker<DS> create(List<Threshold> thresholds,
@@ -40,48 +45,62 @@ public class ThresholdChecker<DS extends TimeSeriesDatasource> {
         this.validator = validator;
         this.ifViolated = ifViolated;
         this.defaultIfNotPresent = defaultIfNotPresent;
+        this.datasourceThresholdManagers = new HashMap<>();
 
-        this.notifyTriggerMap = new HashMap<>();
-        this.thresholds = new HashMap<>();
+        Map<Long, Map<NotifyTrigger.Mode, NotifyTrigger>> dataSourceToNotifyTriggers = new HashMap<>();
         for (Threshold th : thresholds) {
-            long sourceId = th.datasourceId();
-            if (!notifyTriggerMap.containsKey(sourceId)) {
-                this.notifyTriggerMap.put(sourceId, th.notifyTriggerMode().createInstance());
+            Set<NotifyTrigger> triggers = new HashSet<>(th.thresholdDimensions().size());
+            for (int i = 0; i < th.thresholdDimensions().size(); i++) {
+                ThresholdDimension thDim = th.thresholdDimensions().get(i);
+                triggers.add(dataSourceToNotifyTriggers
+                    .computeIfAbsent(th.datasourceId(), dataSource -> new HashMap<>(NotifyTrigger.Mode.values().length))
+                    .computeIfAbsent(thDim.notifyTriggerMode(), NotifyTrigger.Mode::createInstance));
             }
-            this.thresholds.computeIfAbsent(th.datasourceId(), k -> new ArrayList<>()).add(th);
-        }
-    }
 
-    public Predicate<DS> check() {
-        return this::validateThreshold;
+            this.datasourceThresholdManagers.computeIfAbsent(th.datasourceId(), ThresholdManager::new).register(th, triggers.toArray(new NotifyTrigger[0]));
+        }
     }
 
+    /**
+     * The method checks the DS as datasource over all thresholds and returns boolean value of enable/disable further processing.
+     * If the validation result if 'false' but processing on failure is true, the return value is true.
+     * @param data DS to check
+     * @return boolean of enable/disable further processing
+     */
     public boolean check(DS data) {
         return validateThreshold(data);
     }
 
     private boolean validateThreshold(DS data) {
+
         long sourceId = data.datasourceId();
         OffsetDateTime timestamp = data.timestamp();
 
-        if (sourceId <= 0 || !thresholds.containsKey(sourceId)) {
+        if (sourceId <= 0 || !datasourceThresholdManagers.containsKey(sourceId)) {
             return defaultIfNotPresent;
         }
         boolean process = true;
-        List<Threshold> rules = thresholds.get(sourceId);
-
-        NotifyTrigger notifier = notifyTriggerMap.get(sourceId);
-        for (Threshold th : rules) {
-            ValidationResult res = validator.validate(data, th);
-            notifier.accept(res);
 
-            if (res.isNotValid() && !th.processOnFail()) {
-                process = false;
+        ThresholdManager thresholdManager = datasourceThresholdManagers.get(sourceId);
+        for (Threshold th : thresholdManager.thresholds()) {
+            ValidationResult[] res = validator.validate(data, th);
+            thresholdManager.accept(th, res);
+
+            if (!th.enableProcess()) {
+                // process on failure is set to false then calculate if the validation is true or false
+                boolean isNotValid = res[0].isNotValid();
+                for (int i = 1; i < res.length; i++) {
+                    isNotValid &= res[i].isNotValid();
+                }
+                if (isNotValid) {
+                    process = false;
+                }
             }
         }
 
-        if (notifier.shouldNotify()) {
-            ifViolated.accept(new ViolationReport(sourceId, timestamp, notifier.resultsToNotify()));
+        if (thresholdManager.shouldNotify()) {
+            ValidationReport report = thresholdManager.resultsToNotify();
+            ifViolated.accept(new ViolationReport(sourceId, timestamp, report.message(),  report.validationResults()));
         }
 
         return process;

+ 81 - 0
src/main/java/cz/senslog/analytics/utils/validator/ThresholdManager.java

@@ -0,0 +1,81 @@
+package cz.senslog.analytics.utils.validator;
+
+import cz.senslog.analytics.domain.AttributeType;
+import cz.senslog.analytics.domain.Threshold;
+import cz.senslog.analytics.domain.ThresholdDimension;
+import cz.senslog.analytics.utils.validator.domain.ValidationReport;
+import cz.senslog.analytics.utils.validator.domain.ValidationResult;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+
+class ThresholdManager {
+
+    private static final NotifyTrigger DISABLED_TRIGGER = NotifyTrigger.Mode.DISABLED.createInstance();
+
+    private final long datasourceId;
+
+    private final Map<Long, Threshold> thresholds;
+    private final Map<Long, Map<AttributeType, NotifyTrigger>> notifyTriggers;
+
+    ThresholdManager(long datasourceId) {
+        this.datasourceId = datasourceId;
+        this.thresholds = new HashMap<>();
+        this.notifyTriggers = new HashMap<>();
+    }
+
+    void register(Threshold threshold, NotifyTrigger... notifyTrigger) {
+        thresholds.put(threshold.id(), threshold);
+
+        for (ThresholdDimension dimension : threshold.thresholdDimensions()) {
+            for (NotifyTrigger trigger : notifyTrigger) {
+                if (dimension.notifyTriggerMode().equals(trigger.mode())) {
+                    notifyTriggers.computeIfAbsent(threshold.id(), id -> new HashMap<>())
+                            .put(dimension.attributeType(), trigger);
+                }
+            }
+        }
+    }
+
+    Collection<Threshold> thresholds() {
+        return thresholds.values();
+    }
+
+    void accept(Threshold threshold, ValidationResult... res) {
+        if (threshold.datasourceId() == this.datasourceId) {
+            Map<AttributeType, NotifyTrigger> triggers = notifyTriggers.getOrDefault(threshold.id(), emptyMap());
+            for (ValidationResult vRes : res) {
+                triggers.getOrDefault(vRes.attributeType(), DISABLED_TRIGGER).accept(threshold, vRes);
+            }
+        }
+    }
+
+    boolean shouldNotify() {
+        boolean shouldNotify = true;
+        for (Map<AttributeType, NotifyTrigger> triggers : notifyTriggers.values()) { // over all thresholds
+            for (NotifyTrigger trigger : triggers.values()) {
+                shouldNotify &= trigger.shouldNotify();
+            }
+        }
+        return shouldNotify;
+    }
+
+    ValidationReport resultsToNotify() {
+        ValidationReport finalReport = ValidationReport.empty();
+        for (Map.Entry<Long, Map<AttributeType, NotifyTrigger>> entry : notifyTriggers.entrySet()) {
+            Threshold threshold = thresholds.get(entry.getKey());
+            ValidationReport thReport = ValidationReport.empty();
+            for (NotifyTrigger trigger : entry.getValue().values()) {
+                thReport = thReport.join(trigger.resultsToNotify());
+            }
+
+
+            finalReport.join(thReport);
+        }
+        return finalReport;
+    }
+}

+ 27 - 16
src/main/java/cz/senslog/analytics/utils/validator/Validator.java

@@ -1,18 +1,20 @@
 package cz.senslog.analytics.utils.validator;
 
 import cz.senslog.analytics.domain.*;
+import cz.senslog.analytics.domain.ComparisonOperator;
+import cz.senslog.analytics.utils.validator.domain.ValidationResult;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-import static cz.senslog.analytics.domain.ThresholdMode.*;
+import static cz.senslog.analytics.domain.ComparisonOperator.*;
+import static java.util.Collections.emptyList;
 
 public class Validator<T> {
 
-    private static final Map<ThresholdMode, BiFunction<Double, Double, Boolean>> functions;
+    private static final Map<ComparisonOperator, BiFunction<Double, Double, Boolean>> functions;
 
     static {
         functions = new HashMap<>(6);
@@ -24,14 +26,14 @@ public class Validator<T> {
         functions.put(NE, (val, ths) -> !val.equals(ths));
     }
 
-    public static boolean checkThresholdValue(ThresholdMode mode, Double value, Double threshold) {
+    public static boolean checkThresholdValue(ComparisonOperator mode, Double value, Double threshold) {
         if (mode == null || value == null || threshold == null) return false;
         return functions.getOrDefault(mode, (val, ths) -> false).apply(value, threshold);
     }
 
-    public static boolean checkThresholdValue(ThresholdRule rule, Double value) {
+    public static boolean checkThresholdValue(ThresholdDimensionRule rule, Double value) {
         if (rule == null || value == null) return false;
-        return checkThresholdValue(rule.thresholdMode(), value, rule.value());
+        return checkThresholdValue(rule.comparisonOperator(), value, rule.value());
     }
 
     public interface AttributeMapping<T> {
@@ -61,18 +63,27 @@ public class Validator<T> {
         return this;
     }
 
-    public ValidationResult validate(T object, Threshold threshold) {
+    public ValidationResult[] validate(T object, Threshold threshold) {
         return validate(object, threshold, map);
     }
 
-    public static <T> ValidationResult validate(T object, Threshold threshold, Map<AttributeType, Function<T, Supplier<Double>>> attributeMapping) {
-        Double value = attributeMapping.getOrDefault(threshold.attributeType(), (o) -> () -> null).apply(object).get();
-        ValidationResult result = new ValidationResult(threshold.attributeType(), value);
-        for (ThresholdRule rule : threshold.rules()) {
-            if (checkThresholdValue(rule, value)) {
-                result.addRecord(rule.thresholdMode(), rule.value());
-            }
+    private static <T> ValidationResult validate(T object, ThresholdDimension dimension, Map<AttributeType, Function<T, Supplier<Double>>> attributeMapping) {
+        final Double testingValue = attributeMapping.getOrDefault(dimension.attributeType(), (o) -> () -> null).apply(object).get();
+        final List<ValidationResult.Rule> violatedRules = new ArrayList<>(dimension.rules().size());
+        boolean passed = true;
+        for (ThresholdDimensionRule rule : dimension.rules()) {
+            passed &= checkThresholdValue(rule, testingValue);
+            violatedRules.add(new ValidationResult.Rule(rule.comparisonOperator(), rule.value()));
+        }
+        return new ValidationResult(dimension.attributeType(), testingValue, passed ? violatedRules : emptyList());
+    }
+
+    private static <T> ValidationResult[] validate(T object, Threshold threshold, Map<AttributeType, Function<T, Supplier<Double>>> attributeMapping) {
+        final ValidationResult[] results = new ValidationResult[threshold.thresholdDimensions().size()];
+        int resultIndex = 0;
+        for (ThresholdDimension dimension : threshold.thresholdDimensions()) {
+            results[resultIndex++] = validate(object, dimension, attributeMapping);
         }
-        return result;
+        return results;
     }
 }

+ 14 - 0
src/main/java/cz/senslog/analytics/utils/validator/domain/ThresholdValidationResult.java

@@ -0,0 +1,14 @@
+package cz.senslog.analytics.utils.validator.domain;
+
+import cz.senslog.analytics.domain.Threshold;
+
+public record ThresholdValidationResult(Threshold threshold, ValidationResult[] validationResults) {
+
+    public boolean isViolated() {
+        boolean notValid = validationResults[0].isNotValid();
+        for (int i = 1; i < validationResults.length; i++) {
+            notValid = notValid && validationResults[i].isNotValid();
+        }
+        return notValid;
+    }
+}

+ 41 - 0
src/main/java/cz/senslog/analytics/utils/validator/domain/ValidationReport.java

@@ -0,0 +1,41 @@
+package cz.senslog.analytics.utils.validator.domain;
+
+import org.apache.logging.log4j.util.Strings;
+
+public record ValidationReport(String message, ValidationResult[] validationResults) {
+
+    public static ValidationReport empty() {
+        return new ValidationReport(null, new ValidationResult[0]);
+    }
+
+    public boolean isEmpty() {
+        return validationResults.length == 0 && Strings.isEmpty(message);
+    }
+
+    public ValidationReport join(ValidationReport validationReport) {
+        if (validationReport == null || validationReport.isEmpty()) {
+            return this;
+        }
+
+        if (this.isEmpty() && !validationReport.isEmpty()) {
+            return validationReport;
+        }
+
+        ValidationResult[] newResults = new ValidationResult[this.validationResults.length + validationReport.validationResults.length];
+        System.arraycopy(this.validationResults, 0, newResults, 0, this.validationResults.length);
+        System.arraycopy(validationReport.validationResults, 0, newResults, this.validationResults.length, validationReport.validationResults.length);
+
+        String newMessage;
+        if (this.message == null && validationReport.message == null) {
+            newMessage = null;
+        } else if (this.message == null) {
+            newMessage = validationReport.message;
+        } else if (validationReport.message == null) {
+            newMessage = this.message;
+        } else {
+            newMessage = this.message + validationReport.message;
+        }
+
+        return new ValidationReport(newMessage, newResults);
+    }
+}

+ 11 - 13
src/main/java/cz/senslog/analytics/domain/ValidationResult.java → src/main/java/cz/senslog/analytics/utils/validator/domain/ValidationResult.java

@@ -1,32 +1,34 @@
-package cz.senslog.analytics.domain;
+package cz.senslog.analytics.utils.validator.domain;
+
+import cz.senslog.analytics.domain.AttributeType;
+import cz.senslog.analytics.domain.ComparisonOperator;
 
-import java.util.ArrayList;
 import java.util.List;
 
 public class ValidationResult {
 
-    public record Record(ThresholdMode mode, double thresholdValue) {}
+    public record Rule(ComparisonOperator operator, double thresholdValue) {}
 
     private final double validatedValue;
     private final AttributeType attributeType;
-    private final List<Record> records;
+    private final List<Rule> violatedRules;
 
-    public ValidationResult(AttributeType attributeType, double validatedValue) {
+    public ValidationResult(AttributeType attributeType, double validatedValue, List<Rule> violatedRules) {
         this.attributeType = attributeType;
         this.validatedValue = validatedValue;
-        this.records = new ArrayList<>();
+        this.violatedRules = violatedRules;
     }
 
     public boolean isValid() {
-        return records.isEmpty();
+        return violatedRules.isEmpty();
     }
 
     public boolean isNotValid() {
         return !isValid();
     }
 
-    public List<Record> records() {
-        return records;
+    public List<Rule> records() {
+        return violatedRules;
     }
 
     public double validatedValue() {
@@ -36,8 +38,4 @@ public class ValidationResult {
     public AttributeType attributeType() {
         return attributeType;
     }
-
-    public void addRecord(ThresholdMode mode, double thresholdValue) {
-        records.add(new Record(mode, thresholdValue));
-    }
 }

+ 6 - 0
src/main/java/cz/senslog/analytics/utils/validator/domain/ViolationReport.java

@@ -0,0 +1,6 @@
+package cz.senslog.analytics.utils.validator.domain;
+
+import java.time.OffsetDateTime;
+
+public record ViolationReport(long datasourceId, OffsetDateTime timestamp, String message, ValidationResult[] violatedData) {
+}

+ 44 - 0
src/main/resources/openAPISpec.yaml

@@ -25,8 +25,52 @@ paths:
               schema:
                 $ref: "#/components/schemas/Error"
 
+  /observations:
+    get:
+      operationId: observationsGET
+      summary: Publish JSON schema for POST data
+      responses:
+        200:
+          description: JSON Schema as an object
+          content:
+            application/json+schema:
+              schema:
+                type: object
+    post:
+      operationId: observationsPOST
+      summary: Insert observations to process
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ObservationList'
+      responses:
+        200:
+          description: Returns status of inserted observations
+          content:
+            application/json:
+              schema:
+                type: object
+                properties:
+                  message:
+                    type: string
+
 components:
   schemas:
+    ObservationList:
+      type: array
+      items:
+        type: object
+        properties:
+          unitId:
+            type: integer
+          sensorId:
+            type: integer
+          observedValue:
+            type: number
+          timestamp:
+            type: string
+            format: date-time
 
     Info:
       type: object

+ 43 - 0
src/test/java/cz/senslog/analytics/utils/AttributeRegexTest.java

@@ -0,0 +1,43 @@
+package cz.senslog.analytics.utils;
+
+import cz.senslog.analytics.domain.AttributeType;
+import org.junit.jupiter.api.Test;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class AttributeRegexTest {
+
+    @Test
+    public void regex() {
+        String attributes = Stream.of(AttributeType.values()).map(AttributeType::name).map(String::toLowerCase).collect(Collectors.joining("|"));
+        String variables = "timestamp|module|datasource";
+
+        final String attrRegex = "(?<attribute>\\$("+attributes+"))(\\((?<format>\\S*)\\))?";
+        final String varRegex = "(?<variable>\\$("+variables+"))(\\((?<format>\\S*)\\))?";
+
+
+        final String test1 = "Hodnota $val(%.2f) překročila práh $VAL >= 25 °C v čase $timestamp.";
+        final String test2 = "$val(%.2f) >= 25 & 12h <= $timestamp(HH:mm) <= 15h v čase $timestamp.";
+        final String test3 = "$timestamp | $val";
+
+        final String[] test = {test1, test2, test3};
+
+        Pattern attrPattern = Pattern.compile(attrRegex, Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL );
+        Pattern varPattern = Pattern.compile(varRegex, Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL );
+
+        for (String s : test) {
+            System.out.println(s);
+            Matcher m1 = attrPattern.matcher(s);
+            while (m1.find()) {
+                System.out.format("\tAttr: %s\tFormat: %s\n", m1.group("attribute"), m1.group("format"));
+            }
+            Matcher m2 = varPattern.matcher(s);
+            while (m2.find()) {
+                System.out.format("\tVar: %s\tFormat: %s\n", m2.group("variable"), m2.group("format"));
+            }
+        }
+    }
+}

+ 184 - 14
src/test/java/cz/senslog/analytics/utils/ThresholdCheckerTest.java

@@ -1,38 +1,204 @@
 package cz.senslog.analytics.utils;
 
 import cz.senslog.analytics.domain.*;
+import cz.senslog.analytics.domain.ComparisonOperator;
 import cz.senslog.analytics.utils.validator.NotifyTrigger;
 import cz.senslog.analytics.utils.validator.ThresholdChecker;
 import cz.senslog.analytics.utils.validator.Validator;
+import cz.senslog.analytics.utils.validator.domain.ValidationResult;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
+import static cz.senslog.analytics.domain.AttributeType.TIME;
+import static cz.senslog.analytics.domain.AttributeType.VAL;
+import static cz.senslog.analytics.domain.ComparisonOperator.*;
+import static cz.senslog.analytics.utils.validator.NotifyTrigger.Mode.INSTANT;
+import static cz.senslog.analytics.utils.validator.NotifyTrigger.Mode.ON_CHANGE;
 import static org.junit.jupiter.api.Assertions.*;
 
 class ThresholdCheckerTest {
 
     private static final Sensor SENSOR = new Sensor(100L, 1000L, 1001L);
 
+    private static Validator<Observation> validator;
+
+    @BeforeAll
+    static void setUp() {
+        validator = Validator.<Observation>create()
+                .addMapping(VAL, o -> o::value)
+                .addMapping(AttributeType.TIME, o -> () ->  (double) o.timestamp().toLocalTime().toSecondOfDay());
+    }
+
+    @Test
+    void test_1() {
+
+        final OffsetDateTime startInterval = OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2));
+        final OffsetDateTime endInterval = OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2));
+
+
+        ThresholdChecker<Observation> checker = ThresholdChecker.create(
+                List.of(
+                        Threshold.of(1, SENSOR.id(), true, false, true, null, List.of(
+                                ThresholdDimension.of(1, INSTANT, VAL, List.of(
+                                            ThresholdDimensionRule.of(GT, 25)
+                                        )
+                                ),
+                                ThresholdDimension.of(2, INSTANT, TIME, List.of(
+                                            ThresholdDimensionRule.of(GE, 43200),
+                                            ThresholdDimensionRule.of(LE, 54000)
+                                        )
+                                )
+                        ))
+                ),
+                validator,
+                (alert) -> {
+                    assertEquals(SENSOR.id(), alert.datasourceId());
+                    assertFalse(alert.timestamp().isBefore(startInterval) || alert.timestamp().isAfter(endInterval));
+                    assertEquals(2, alert.violatedData().length);
+                },
+                true
+        );
+
+        List<Observation> passedObservations = Stream.of(
+        /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 11, 59, 59, 0, ZoneOffset.ofHours(2)))),
+        /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2)))),
+        /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 1, 0, ZoneOffset.ofHours(2)))),
+        /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 14, 59, 59, 0, ZoneOffset.ofHours(2)))),
+        /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2)))),
+        /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 1, 0, ZoneOffset.ofHours(2))))
+        ).filter(checker::check).toList();
+
+        assertEquals(2, passedObservations.size());
+    }
+
+    @Test
+    void test_2() {
+
+        final OffsetDateTime startInterval = OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2));
+        final OffsetDateTime endInterval = OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2));
+
+
+        ThresholdChecker<Observation> checker = ThresholdChecker.create(
+                List.of(
+                        Threshold.of(1, SENSOR.id(), true, false, true, null, List.of(
+                                ThresholdDimension.of(1, ON_CHANGE, VAL, List.of(
+                                                ThresholdDimensionRule.of(GT, 25)
+                                        )
+                                ),
+                                ThresholdDimension.of(2, INSTANT, TIME, List.of(
+                                                ThresholdDimensionRule.of(GE, 43200),
+                                                ThresholdDimensionRule.of(LE, 54000)
+                                        )
+                                )
+                        )),
+                        Threshold.of(2, SENSOR.id(), true, false, true, null, List.of(
+                                ThresholdDimension.of(3, INSTANT, VAL, List.of(
+                                                ThresholdDimensionRule.of(GT, 25)
+                                        )
+                                ),
+                                ThresholdDimension.of(4, ON_CHANGE, TIME, List.of(
+                                                ThresholdDimensionRule.of(GE, 43200),
+                                                ThresholdDimensionRule.of(LE, 54000)
+                                        )
+                                )
+                        ))
+                ),
+                validator,
+                (alert) -> {
+                    assertEquals(SENSOR.id(), alert.datasourceId());
+                    assertTrue(alert.timestamp().isEqual(startInterval.plusSeconds(1)) || alert.timestamp().isEqual(endInterval));
+                    assertEquals(2, alert.violatedData().length);
+                },
+                true
+        );
+
+        List<Observation> passedObservations = Stream.of(
+        /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.3, OffsetDateTime.of(2024, 1, 1, 11, 59, 59, 0, ZoneOffset.ofHours(2)))),
+        /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2)))),
+        /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 1, 0, ZoneOffset.ofHours(2)))),
+        /* fail | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 14, 59, 59, 0, ZoneOffset.ofHours(2)))),
+        /* pass | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2)))),
+        /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 1, 0, ZoneOffset.ofHours(2))))
+        ).filter(checker::check).toList();
+
+        assertEquals(4, passedObservations.size());
+    }
+
+    @Test
+    void test_3() {
+
+        final OffsetDateTime startInterval = OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2));
+        final OffsetDateTime endInterval = OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2));
+
+
+        ThresholdChecker<Observation> checker = ThresholdChecker.create(
+                List.of(
+                        Threshold.of(1, SENSOR.id(), true, false, true, null, List.of(
+                                ThresholdDimension.of(1, ON_CHANGE, VAL, List.of(
+                                                ThresholdDimensionRule.of(GT, 25)
+                                        )
+                                ),
+                                ThresholdDimension.of(2, INSTANT, TIME, List.of(
+                                                ThresholdDimensionRule.of(GE, 43200),
+                                                ThresholdDimensionRule.of(LE, 54000)
+                                        )
+                                )
+                        )),
+                        Threshold.of(2, SENSOR.id(), true, false, true, null, List.of(
+                                ThresholdDimension.of(3, INSTANT, VAL, List.of(
+                                                ThresholdDimensionRule.of(GT, 25)
+                                        )
+                                ),
+                                ThresholdDimension.of(4, ON_CHANGE, TIME, List.of(
+                                                ThresholdDimensionRule.of(GE, 43200),
+                                                ThresholdDimensionRule.of(LE, 54000)
+                                        )
+                                )
+                        ))
+                ),
+                validator,
+                (alert) -> {
+                    assertEquals(SENSOR.id(), alert.datasourceId());
+                    assertTrue(alert.timestamp().isEqual(startInterval) || alert.timestamp().isEqual(endInterval));
+                    assertEquals(2, alert.violatedData().length);
+                },
+                true
+        );
+
+        List<Observation> passedObservations = Stream.of(
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 11, 59, 59, 0, ZoneOffset.ofHours(2)))),
+                /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2)))),
+                /* fail | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 1, 0, ZoneOffset.ofHours(2)))),
+                /* fail | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 14, 59, 59, 0, ZoneOffset.ofHours(2)))),
+                /* pass | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2)))),
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 1, 0, ZoneOffset.ofHours(2))))
+        ).filter(checker::check).toList();
+
+        assertEquals(3, passedObservations.size());
+    }
+
     @Test
     void check_size_1_INSTANT() {
 
         ThresholdChecker<Observation> checker = new ThresholdChecker<>(
                 List.of(
-                    new Threshold(1, SENSOR.id(), NotifyTrigger.Mode.INSTANT, AttributeType.VAL, true, false, List.of(
-                            new ThresholdRule(ThresholdMode.GT, 10.0)
+                    new Threshold(1, SENSOR.id(), true, false, true, null, List.of(
+                            ThresholdDimension.of(1, INSTANT, VAL, List.of(ThresholdDimensionRule.of(GT, 10.0)))
                     ))
                 ),
                 Validator.<Observation>create()
-                        .addMapping(AttributeType.VAL, o -> o::value),
+                        .addMapping(VAL, o -> o::value),
                 (report) -> {
                     assertEquals(SENSOR.id(), report.datasourceId());
                     assertEquals(1, report.violatedData().length);
                     ValidationResult valResult = report.violatedData()[0];
-                    assertEquals(AttributeType.VAL, valResult.attributeType());
+                    assertEquals(VAL, valResult.attributeType());
                     assertEquals(23.3, valResult.validatedValue());
                     assertEquals(1, valResult.records().size());
                 },
@@ -50,17 +216,17 @@ class ThresholdCheckerTest {
 
         ThresholdChecker<Observation> checker = new ThresholdChecker<>(
                 List.of(
-                        new Threshold(1, SENSOR.id(), NotifyTrigger.Mode.ON_CHANGE, AttributeType.VAL, true, false, List.of(
-                                new ThresholdRule(ThresholdMode.GT, 10.0)
+                        new Threshold(1, SENSOR.id(), true,false, true, null, List.of(
+                                ThresholdDimension.of(1, NotifyTrigger.Mode.ON_CHANGE, VAL, List.of(ThresholdDimensionRule.of(GT, 10.0)))
                         ))
                 ),
                 Validator.<Observation>create()
-                        .addMapping(AttributeType.VAL, o -> o::value),
+                        .addMapping(VAL, o -> o::value),
                 (report) -> {
                     assertEquals(SENSOR.id(), report.datasourceId());
                     assertEquals(1, report.violatedData().length);
                     ValidationResult valResult = report.violatedData()[0];
-                    assertEquals(AttributeType.VAL, valResult.attributeType());
+                    assertEquals(VAL, valResult.attributeType());
                     assertEquals(25.3, valResult.validatedValue());
                     assertEquals(1, valResult.records().size());
                 },
@@ -80,17 +246,17 @@ class ThresholdCheckerTest {
         AtomicInteger reportStep = new AtomicInteger(1);
         ThresholdChecker<Observation> checker = new ThresholdChecker<>(
                 List.of(
-                        new Threshold(1, SENSOR.id(), NotifyTrigger.Mode.ON_CHANGE, AttributeType.VAL, true, false, List.of(
-                                new ThresholdRule(ThresholdMode.GT, 10.0)
+                        new Threshold(1, SENSOR.id(), true,false, true, null, List.of(
+                                ThresholdDimension.of(1, NotifyTrigger.Mode.ON_CHANGE, VAL, List.of(ThresholdDimensionRule.of(GT, 10.0)))
                         ))
                 ),
                 Validator.<Observation>create()
-                        .addMapping(AttributeType.VAL, o -> o::value),
+                        .addMapping(VAL, o -> o::value),
                 (report) -> {
                     assertEquals(SENSOR.id(), report.datasourceId());
                     assertEquals(1, report.violatedData().length);
                     ValidationResult valResult = report.violatedData()[0];
-                    assertEquals(AttributeType.VAL, valResult.attributeType());
+                    assertEquals(VAL, valResult.attributeType());
 
                     if (reportStep.compareAndSet(1, 2)) {
                         assertEquals(25.3, valResult.validatedValue());
@@ -121,8 +287,12 @@ class ThresholdCheckerTest {
 
         ThresholdChecker<DoubleStatistics> checker = new ThresholdChecker<>(
                 List.of(
-                        new Threshold(1, -1, NotifyTrigger.Mode.INSTANT, AttributeType.MIN, true, false, List.of(new ThresholdRule(ThresholdMode.LT, 0.0))),
-                        new Threshold(1, -1, NotifyTrigger.Mode.INSTANT, AttributeType.MAX, true, false, List.of(new ThresholdRule(ThresholdMode.GT, 5.0)))
+                        new Threshold(1, -1, true, false, true, null, List.of(
+                                ThresholdDimension.of(1, INSTANT, AttributeType.MIN, List.of(ThresholdDimensionRule.of( ComparisonOperator.LT, 0.0))))
+                        ),
+                        new Threshold(1, -1, true, false, true, null, List.of(
+                                ThresholdDimension.of(2, INSTANT, AttributeType.MAX, List.of(ThresholdDimensionRule.of(GT, 5.0))))
+                        )
                 ),
                 Validator.<DoubleStatistics>create()
                         .addMapping(AttributeType.MIN, s -> s::min)

+ 183 - 0
src/test/java/cz/senslog/analytics/utils/newvalidator/ThresholdCheckerTest.java

@@ -0,0 +1,183 @@
+package cz.senslog.analytics.utils.newvalidator;
+
+import cz.senslog.analytics.domain.*;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static cz.senslog.analytics.domain.AttributeType.TIME;
+import static cz.senslog.analytics.domain.AttributeType.VAL;
+import static cz.senslog.analytics.domain.ComparisonOperator.*;
+import static cz.senslog.analytics.utils.newvalidator.NotifyTrigger.Mode.INSTANT;
+import static cz.senslog.analytics.utils.newvalidator.NotifyTrigger.Mode.ON_CHANGE;
+import static org.junit.jupiter.api.Assertions.*;
+
+class ThresholdCheckerTest {
+
+    private static final Sensor SENSOR = new Sensor(100L, 1000L, 1001L);
+
+    private static Validator<Observation> validator;
+
+    @BeforeAll
+    static void setUp() {
+        validator = Validator.<Observation>create()
+                .addMapping(VAL, o -> o::value)
+                .addMapping(AttributeType.TIME, o -> () ->  (double) o.timestamp().toLocalTime().toSecondOfDay());
+    }
+
+    @Test
+    void test_1() {
+
+        final OffsetDateTime startInterval = OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2));
+        final OffsetDateTime endInterval = OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2));
+
+
+        ThresholdChecker<Observation> checker = ThresholdChecker.create(
+                "testModule",
+                List.of(
+                        new Threshold(1, SENSOR.id(), false, true, "$val", List.of(
+                                new Threshold.Dimension(VAL, INSTANT, List.of(
+                                                new Threshold.Dimension.Rule(GT, 25)
+                                        )
+                                ),
+                                new Threshold.Dimension(TIME, INSTANT, List.of(
+                                            new Threshold.Dimension.Rule(GE, 43200),
+                                            new Threshold.Dimension.Rule(LE, 54000)
+                                        )
+                                )
+                        ))
+                ),
+                validator,
+                (alert) -> {
+                    assertEquals(25.3, Double.parseDouble(alert.message()));
+                    assertEquals(SENSOR.id(), alert.dataSourceId());
+                    assertFalse(alert.timestamp().isBefore(startInterval) || alert.timestamp().isAfter(endInterval));
+                },
+                true
+        );
+
+        List<Observation> passedObservations = Stream.of(
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.1, OffsetDateTime.of(2024, 1, 1, 11, 59, 59, 0, ZoneOffset.ofHours(2)))),
+                /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2)))),
+                /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 1, 0, ZoneOffset.ofHours(2)))),
+                /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 14, 59, 59, 0, ZoneOffset.ofHours(2)))),
+                /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2)))),
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.1, OffsetDateTime.of(2024, 1, 1, 15, 0, 1, 0, ZoneOffset.ofHours(2))))
+        ).filter(checker::check).toList();
+
+        assertEquals(2, passedObservations.size());
+    }
+
+    @Test
+    void test_2() {
+
+        final OffsetDateTime startInterval = OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2));
+        final OffsetDateTime endInterval = OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2));
+
+
+        ThresholdChecker<Observation> checker = ThresholdChecker.create(
+                "testModule",
+                List.of(
+                        new Threshold(1, SENSOR.id(), false, true, "$val(%.2f) >= 25 & 12h <= $timestamp(HH:mm) <= 15h v čase $timestamp.", List.of(
+                                new Threshold.Dimension(VAL, ON_CHANGE, List.of(
+                                        new Threshold.Dimension.Rule(GT, 25)
+                                )
+                                ),
+                                new Threshold.Dimension(TIME, INSTANT, List.of(
+                                        new Threshold.Dimension.Rule(GE, 43200), // 12h
+                                        new Threshold.Dimension.Rule(LE, 54000) // 15h
+                                )
+                                )
+                        )),
+                        new Threshold(2, SENSOR.id(), false, true, null, List.of(
+                                new Threshold.Dimension(VAL, INSTANT, List.of(
+                                        new Threshold.Dimension.Rule(GT, 25)
+                                    )
+                                ),
+                                new Threshold.Dimension(TIME, ON_CHANGE, List.of(
+                                        new Threshold.Dimension.Rule(GE, 43200), // 12h
+                                        new Threshold.Dimension.Rule(LE, 54000) // 15h
+                                    )
+                                )
+                        ))
+                ),
+                validator,
+                (alert) -> {
+                    System.out.println(alert.message()+"\n");
+                    assertEquals(SENSOR.id(), alert.dataSourceId());
+                    assertTrue(alert.timestamp().isEqual(startInterval.plusSeconds(1)) || alert.timestamp().isEqual(endInterval));
+                },
+                true
+        );
+
+        List<Observation> passedObservations = Stream.of(
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.1, OffsetDateTime.of(2024, 1, 1, 11, 59, 59, 0, ZoneOffset.ofHours(2)))),
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.2, OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2)))),
+                /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.1, OffsetDateTime.of(2024, 1, 1, 12, 0, 1, 0, ZoneOffset.ofHours(2)))),
+                /* fail | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.2, OffsetDateTime.of(2024, 1, 1, 14, 59, 59, 0, ZoneOffset.ofHours(2)))),
+                /* pass | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2)))),
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 1, 0, ZoneOffset.ofHours(2)))),
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 15, 1, 1, 0, ZoneOffset.ofHours(2))))
+        ).filter(checker::check).toList();
+
+        assertEquals(5, passedObservations.size());
+    }
+
+    @Test
+    void test_3() {
+
+        final OffsetDateTime startInterval = OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2));
+        final OffsetDateTime endInterval = OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2));
+
+
+        ThresholdChecker<Observation> checker = ThresholdChecker.create(
+                "testModule",
+                List.of(
+                        new Threshold(1, SENSOR.id(), false, true, null, List.of(
+                                new Threshold.Dimension(VAL, ON_CHANGE, List.of(
+                                        new Threshold.Dimension.Rule(GT, 25)
+                                    )
+                                ),
+                                new Threshold.Dimension(TIME, INSTANT, List.of(
+                                        new Threshold.Dimension.Rule(GE, 43200),
+                                        new Threshold.Dimension.Rule(LE, 54000)
+                                    )
+                                )
+                        )),
+                        new Threshold(2, SENSOR.id(), false, true, null, List.of(
+                                new Threshold.Dimension(VAL, INSTANT, List.of(
+                                        new Threshold.Dimension.Rule(GT, 25)
+                                    )
+                                ),
+                                new Threshold.Dimension(TIME, ON_CHANGE, List.of(
+                                        new Threshold.Dimension.Rule(GE, 43200),
+                                        new Threshold.Dimension.Rule(LE, 54000)
+                                    )
+                                )
+                        ))
+                ),
+                validator,
+                (alert) -> {
+                    System.out.println(alert.message()+"\n");
+                    assertEquals(SENSOR.id(), alert.dataSourceId());
+                    assertTrue(alert.timestamp().isEqual(startInterval) || alert.timestamp().isEqual(endInterval));
+                },
+                true
+        );
+
+        List<Observation> passedObservations = Stream.of(
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 11, 59, 59, 0, ZoneOffset.ofHours(2)))),
+                /* fail | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.ofHours(2)))),
+                /* fail | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 12, 0, 1, 0, ZoneOffset.ofHours(2)))),
+                /* fail | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 25.3, OffsetDateTime.of(2024, 1, 1, 14, 59, 59, 0, ZoneOffset.ofHours(2)))),
+                /* pass | alert */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 0, 0, ZoneOffset.ofHours(2)))),
+                /* pass | none  */ Observation.of(SENSOR, RawObservation.of(SENSOR.unitId(), SENSOR.sensorId(), 24.3, OffsetDateTime.of(2024, 1, 1, 15, 0, 1, 0, ZoneOffset.ofHours(2))))
+        ).filter(checker::check).toList();
+
+        assertEquals(3, passedObservations.size());
+    }
+}

+ 41 - 0
src/test/java/cz/senslog/analytics/utils/validator/MessageFormatterTest.java

@@ -0,0 +1,41 @@
+package cz.senslog.analytics.utils.validator;
+
+import cz.senslog.analytics.domain.AttributeType;
+import cz.senslog.analytics.domain.ThresholdDimension;
+import cz.senslog.analytics.domain.ThresholdViolationAlert;
+import cz.senslog.analytics.utils.newvalidator.MessageFormatter;
+import cz.senslog.analytics.utils.validator.domain.ValidationResult;
+import org.junit.jupiter.api.Test;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+import static org.junit.jupiter.api.Assertions.*;
+
+class MessageFormatterTest {
+
+    @Test
+    void isValid() {
+
+        final String test1 = "V Modulu $module hodnota $val(%.2f) překročila práh $VAL >= 25 °C v čase $timestamp.";
+        final String test2 = "$val >= 25 & 12h <= $timestamp(HH:mm) <= 15h v čase $timestamp.";
+        final String test3 = "$timestamp | $val";
+
+        List<ThresholdDimension> thresholdDimensions = List.of(
+                ThresholdDimension.of(1, null, AttributeType.VAL, null),
+                ThresholdDimension.of(2, null, AttributeType.TIME, null)
+        );
+
+//        MessageFormatter formatter = MessageFormatter.create();
+//        for (String test : new String[]{test1, test2, test3}) {
+//            assertTrue(formatter.isValid(test, thresholdDimensions));
+//
+//            String message = formatter.format(ThresholdViolationAlert.of("moduleName", 1, "sourceName", test, new ValidationResult[]{
+//                    new ValidationResult(AttributeType.VAL, 25.3432, emptyList()), new ValidationResult(AttributeType.TIME, 48800, emptyList())
+//            }, OffsetDateTime.now()));
+//
+//            System.out.println(message);
+//        }
+    }
+}

+ 75 - 0
src/test/java/cz/senslog/analytics/utils/validator/OnChangeNotifyTriggerTest.java

@@ -0,0 +1,75 @@
+package cz.senslog.analytics.utils.validator;
+
+import cz.senslog.analytics.domain.*;
+import cz.senslog.analytics.domain.ComparisonOperator;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+class OnChangeNotifyTriggerTest {
+
+    @Test
+    void value_true_time_true() {
+        Threshold th = new Threshold(1, 1, true, true, true,null, List.of(
+                ThresholdDimension.of(1, NotifyTrigger.Mode.ON_CHANGE, AttributeType.VAL, List.of(
+                        ThresholdDimensionRule.of(ComparisonOperator.GE, 25)
+                )),
+                ThresholdDimension.of(2, NotifyTrigger.Mode.INSTANT, AttributeType.TIME, List.of(
+                        ThresholdDimensionRule.of(ComparisonOperator.GE, 43200),
+                        ThresholdDimensionRule.of(ComparisonOperator.LE, 54000)
+                ))
+        ));
+
+/*
+        NotifyTrigger notifyTrigger = th.notifyTriggerMode().createInstance();
+
+        ValidationResult vr1 = new ValidationResult(AttributeType.VAL, 25.1);
+        vr1.addViolatedRule(ComparisonOperator.GE, 25);
+
+        ValidationResult vr2 = new ValidationResult(AttributeType.TIME, 43200);
+        vr2.addViolatedRule(ComparisonOperator.GE, 43200);
+        vr2.addViolatedRule(ComparisonOperator.LE, 54000);
+
+        notifyTrigger.accept(th, vr1, vr2);
+
+        assertTrue(notifyTrigger.shouldNotify());
+
+        ValidationReport validationReport = notifyTrigger.resultsToNotify();
+        assertFalse(validationReport.isEmpty());
+        assertTrue(Strings.isBlank(validationReport.message()));
+
+        assertEquals(validationReport.validationResults().length, 2);
+
+ */
+    }
+
+    @Test
+    void value_false_time_true() {
+        Threshold th = new Threshold(1, 1, true, true, true, null, List.of(
+                ThresholdDimension.of(1, NotifyTrigger.Mode.ON_CHANGE, AttributeType.VAL, List.of(
+                        ThresholdDimensionRule.of(ComparisonOperator.GE, 25)
+                )),
+                ThresholdDimension.of(2, NotifyTrigger.Mode.INSTANT, AttributeType.TIME, List.of(
+                        ThresholdDimensionRule.of(ComparisonOperator.GE, 43200),
+                        ThresholdDimensionRule.of(ComparisonOperator.LE, 54000)
+                ))
+        ));
+
+
+        /*
+        NotifyTrigger notifyTrigger = th.notifyTriggerMode().createInstance();
+
+        ValidationResult vr1 = new ValidationResult(AttributeType.VAL, 24.9);
+
+        ValidationResult vr2 = new ValidationResult(AttributeType.TIME, 43200);
+        vr2.addViolatedRule(ComparisonOperator.GE, 43200);
+        vr2.addViolatedRule(ComparisonOperator.LE, 54000);
+
+        notifyTrigger.accept(th, vr1, vr2);
+
+        assertFalse(notifyTrigger.shouldNotify());
+        assertTrue(notifyTrigger.resultsToNotify().isEmpty());
+
+         */
+    }
+}