Browse Source

Added support for Analytics service, prepared support for Alert service, added 'observation' to the alert detail info

Lukas Cerny 1 year ago
parent
commit
1344aaef3c
29 changed files with 725 additions and 185 deletions
  1. 9 10
      build.gradle
  2. 1 1
      docker-compose.yaml
  3. 8 3
      docker.dev.env
  4. 2 1
      init.sql
  5. 14 9
      src/main/java/cz/senslog/telemetry/app/Application.java
  6. 39 3
      src/main/java/cz/senslog/telemetry/app/PropertyConfig.java
  7. 6 4
      src/main/java/cz/senslog/telemetry/app/VertxDeployer.java
  8. 4 0
      src/main/java/cz/senslog/telemetry/database/domain/AlertStatus.java
  9. 10 4
      src/main/java/cz/senslog/telemetry/database/domain/CampaignUnitAlert.java
  10. 15 0
      src/main/java/cz/senslog/telemetry/database/domain/Location.java
  11. 51 0
      src/main/java/cz/senslog/telemetry/database/domain/SensLogObservation.java
  12. 33 0
      src/main/java/cz/senslog/telemetry/database/domain/UnitTelemetry.java
  13. 3 2
      src/main/java/cz/senslog/telemetry/database/repository/CachedMapLogRepository.java
  14. 109 31
      src/main/java/cz/senslog/telemetry/database/repository/MapLogRepository.java
  15. 4 4
      src/main/java/cz/senslog/telemetry/database/repository/SensLogRepository.java
  16. 65 0
      src/main/java/cz/senslog/telemetry/module/AlertModule.java
  17. 102 0
      src/main/java/cz/senslog/telemetry/module/AnalyticModule.java
  18. 12 0
      src/main/java/cz/senslog/telemetry/module/EventBusModulePaths.java
  19. 26 12
      src/main/java/cz/senslog/telemetry/server/Fm4exSocketHandler.java
  20. 1 1
      src/main/java/cz/senslog/telemetry/server/HttpVertxServer.java
  21. 1 1
      src/main/java/cz/senslog/telemetry/server/TCPVertxServer.java
  22. 2 1
      src/main/java/cz/senslog/telemetry/server/ws/ContentType.java
  23. 105 75
      src/main/java/cz/senslog/telemetry/server/ws/OpenAPIHandler.java
  24. 13 3
      src/main/resources/openAPISpec.yaml
  25. 10 10
      src/test/java/cz/senslog/telemetry/MockSensLogRepository.java
  26. 53 0
      src/test/java/cz/senslog/telemetry/database/repository/MapLogRepositoryTest.java
  27. 19 6
      src/test/java/cz/senslog/telemetry/server/Fm4exSocketHandlerTest.java
  28. 7 3
      src/test/java/cz/senslog/telemetry/server/ws/OpenAPIHandlerTest.java
  29. 1 1
      src/test/resources/tests.junit.env

+ 9 - 10
build.gradle

@@ -8,6 +8,7 @@ plugins {
 group projectGroup
 version projectVersion
 
+jar.archiveFileName = "telemetry.jar"
 
 application {
     mainClass = 'cz.senslog.telemetry.app.Main'
@@ -44,8 +45,6 @@ jar {
     }
 }
 
-jar.archiveFileName = "telemetry.jar"
-
 test {
     useJUnitPlatform()
 }
@@ -54,12 +53,12 @@ dependencies {
     implementation 'org.apache.logging.log4j:log4j-api:2.22.0'
     implementation 'org.apache.logging.log4j:log4j-core:2.22.0'
 
-    implementation 'io.vertx:vertx-core:4.5.3'
-    implementation 'io.vertx:vertx-web:4.5.3'
-    implementation 'io.vertx:vertx-web-openapi:4.5.3'
-    implementation 'io.vertx:vertx-auth-jwt:4.5.3'
-    implementation 'io.vertx:vertx-pg-client:4.5.3'
-    implementation 'org.postgresql:postgresql:42.7.1'
+    implementation 'io.vertx:vertx-core:4.5.7'
+    implementation 'io.vertx:vertx-web:4.5.7'
+    implementation 'io.vertx:vertx-web-openapi:4.5.7'
+    implementation 'io.vertx:vertx-auth-jwt:4.5.7'
+    implementation 'io.vertx:vertx-pg-client:4.5.7'
+    implementation 'org.postgresql:postgresql:42.7.3'
     implementation 'com.ongres.scram:client:2.1'
 
 
@@ -67,8 +66,8 @@ dependencies {
     testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2'
     testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.1.5'
     testImplementation 'org.mockito:mockito-core:5.3.1'
-    testImplementation 'io.vertx:vertx-junit5:4.5.3'
-    testImplementation 'io.vertx:vertx-unit:4.5.3'
+    testImplementation 'io.vertx:vertx-junit5:4.5.7'
+    testImplementation 'io.vertx:vertx-unit:4.5.7'
     testImplementation 'org.assertj:assertj-core:3.24.2'
     testImplementation 'org.openapi4j:openapi-parser:1.0.7'
     testImplementation 'org.openapi4j:openapi-schema-validator:1.0.7'

+ 1 - 1
docker-compose.yaml

@@ -19,7 +19,7 @@ services:
     env_file:
       - docker.dev.env
     ports:
-      - "8080:8080"
+      - "8085:8085"
       - "5005:5005"
     depends_on:
       - telemetry-db

+ 8 - 3
docker.dev.env

@@ -1,10 +1,10 @@
 # Servers
-SERVER_HTTP_PORT=8080
+SERVER_HTTP_PORT=8085
 SERVER_TCP_PORT=9999
 
 # Database properties
 DATABASE_HOST=172.17.0.1
-DATABASE_PORT=5432
+DATABASE_PORT=5433
 DATABASE_NAME=maplog
 DATABASE_USER=maplog_app
 DATABASE_PASSWORD=MAPlog
@@ -14,4 +14,9 @@ DATABASE_POOL_SIZE=5
 AUTH_DISABLED=true
 AUTH_KEYSTORE_PATH=/app/keystore.jceks
 AUTH_KEYSTORE_TYPE=PKCS12
-AUTH_KEYSTORE_PASSWORD=SENSlog
+AUTH_KEYSTORE_PASSWORD=SENSlog
+
+# Modules
+MODULE_ANALYTIC_DISABLED=false
+MODULE_ANALYTIC_HOST=172.17.0.1
+MODULE_ANALYTIC_PORT=8080

+ 2 - 1
init.sql

@@ -71,7 +71,8 @@ CREATE TABLE maplog.obs_telemetry (
     observed_values jsonb NOT NULL,
     the_geom public.geometry NOT NULL,
     speed INTEGER NOT NULL,
-    time_received TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
+    time_received TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
+    UNIQUE (unit_id, time_stamp) -- can not have multiple measurements on the same unit at the same time
 );
 
 ALTER TABLE maplog.obs_telemetry OWNER TO senslog;

+ 14 - 9
src/main/java/cz/senslog/telemetry/app/Application.java

@@ -2,6 +2,8 @@ package cz.senslog.telemetry.app;
 
 import cz.senslog.telemetry.database.repository.CachedMapLogRepository;
 import cz.senslog.telemetry.database.repository.MapLogRepository;
+import cz.senslog.telemetry.module.AlertModule;
+import cz.senslog.telemetry.module.AnalyticModule;
 import cz.senslog.telemetry.server.HttpVertxServer;
 import cz.senslog.telemetry.server.TCPVertxServer;
 import io.vertx.core.DeploymentOptions;
@@ -41,6 +43,7 @@ public final class Application {
             BUILD_VERSION = prop.getProperty("buildVersion", DEFAULT_UNKNOWN);
         } catch (IOException ex) {
             logger.catching(ex);
+            terminate(ex.getMessage());
         }
     }
 
@@ -67,17 +70,18 @@ public final class Application {
         PropertyConfig config = PropertyConfig.getInstance();
         DeploymentOptions options = new DeploymentOptions().setConfig(JsonObject.of(
                 "server", config.server(),
-                "auth", config.auth()
+                "auth", config.auth(),
+                "module", config.modules()
         ));
 
-        JsonObject dbConfig = config.db();
+        PropertyConfig.Database dbConfig = config.dbConfig();
         Pool dbPool = Pool.pool(new SqlConnectOptions()
-                .setPort(dbConfig.getInteger("port"))
-                .setHost(dbConfig.getString("host"))
-                .setDatabase(dbConfig.getString("database"))
-                .setUser(dbConfig.getString("user"))
-                .setPassword(dbConfig.getString("password")), new PoolOptions()
-                .setMaxSize(dbConfig.getInteger("pool.size")));
+                .setPort(dbConfig.getPort())
+                .setHost(dbConfig.getHost())
+                .setDatabase(dbConfig.getDatabase())
+                .setUser(dbConfig.getUser())
+                .setPassword(dbConfig.getPassword()), new PoolOptions()
+                .setMaxSize(dbConfig.getPoolSize()));
 
         dbPool.query("SELECT version()").execute().map(RowSet::iterator)
                 .map(it -> it.hasNext() ? it.next().getString(0) : null)
@@ -86,7 +90,8 @@ public final class Application {
 
         Vertx.vertx().deployVerticle(VertxDeployer.deploy(
                 new HttpVertxServer(MapLogRepository.create(dbPool)),
-                new TCPVertxServer(CachedMapLogRepository.create(dbPool))
+                new TCPVertxServer(CachedMapLogRepository.create(dbPool)),
+                new AnalyticModule(), new AlertModule()
         ), options, res -> {
             if(res.succeeded()) {
                 logger.info("Deployment id is: {}", res.result());

+ 39 - 3
src/main/java/cz/senslog/telemetry/app/PropertyConfig.java

@@ -12,8 +12,8 @@ public final class PropertyConfig {
     private final HttpServer httpServerConfig;
     private final TCPServer tcpServerConfig;
     private final Database dbConfig;
-
     private final Auth authConfig;
+    private final Modules modulesConfig;
 
     public static PropertyConfig getInstance() {
         return getInstance(System::getenv);
@@ -23,17 +23,19 @@ public final class PropertyConfig {
         if (INSTANCE == null) {
             INSTANCE = new PropertyConfig(
                     new HttpServer(getEnv), new TCPServer(getEnv),
-                    new Database(getEnv), new Auth(getEnv)
+                    new Database(getEnv), new Auth(getEnv),
+                    new Modules(getEnv)
             );
         }
         return INSTANCE;
     }
 
-    private PropertyConfig(HttpServer httpServer, TCPServer tcpServer, Database dbConfig, Auth authConfig) {
+    private PropertyConfig(HttpServer httpServer, TCPServer tcpServer, Database dbConfig, Auth authConfig, Modules modulesConfig) {
         this.httpServerConfig = httpServer;
         this.tcpServerConfig = tcpServer;
         this.dbConfig = dbConfig;
         this.authConfig = authConfig;
+        this.modulesConfig = modulesConfig;
     }
 
     public static class HttpServer {
@@ -130,6 +132,28 @@ public final class PropertyConfig {
         }
     }
 
+    public static class Modules {
+        private Modules(Function<String, String> getEnv) {
+            this.getEnv = getEnv;
+        }
+
+        private final Function<String, String> getEnv;
+
+        public String getAnalyticHost() {
+            return getEnv.apply("MODULE_ANALYTIC_HOST");
+        }
+
+        public int getAnalyticPort() {
+            String portStr = getEnv.apply("MODULE_ANALYTIC_PORT");
+            return portStr != null ? Integer.parseInt(portStr) : 80;
+        }
+
+        public boolean getAnalyticDisabled() {
+            String disabled = getEnv.apply("MODULE_ANALYTIC_DISABLED");
+            return disabled != null && Boolean.parseBoolean(getEnv.apply("MODULE_ANALYTIC_DISABLED"));
+        }
+    }
+
     public HttpServer httpServerConfig() {
         return httpServerConfig;
     }
@@ -146,6 +170,10 @@ public final class PropertyConfig {
         return authConfig;
     }
 
+    public Modules modulesConfig() {
+        return modulesConfig;
+    }
+
     public JsonObject server() {
         return JsonObject.of(
                 "http.port", httpServerConfig.getPort(),
@@ -172,4 +200,12 @@ public final class PropertyConfig {
             "pool.size", dbConfig.getPoolSize()
         );
     }
+
+    public JsonObject modules() {
+        return JsonObject.of(
+                "analytic.disabled", modulesConfig.getAnalyticDisabled(),
+                "analytic.host", modulesConfig.getAnalyticHost(),
+                "analytic.port", modulesConfig.getAnalyticPort()
+        );
+    }
 }

+ 6 - 4
src/main/java/cz/senslog/telemetry/app/VertxDeployer.java

@@ -23,14 +23,16 @@ public class VertxDeployer extends AbstractVerticle {
 
     @Override
     public void start(Promise<Void> startPromise) {
-        List<Future> futureModules = new ArrayList<>(verticles.length);
+        List<Future<Void>> futureModules = new ArrayList<>(verticles.length);
         for (AbstractVerticle v : verticles) {
             DeploymentOptions options = new DeploymentOptions()
-                    .setWorker(true).setConfig(config());
+                    .setThreadingModel(ThreadingModel.WORKER)
+                    .setConfig(config());
             futureModules.add(deployHelper(vertx, options, v));
         }
-        CompositeFuture.all(futureModules)
-                .onSuccess(v -> startPromise.complete()).onFailure(startPromise::fail);
+        Future.all(futureModules)
+                .onSuccess(v -> startPromise.complete())
+                .onFailure(startPromise::fail);
     }
 
     private static Future<Void> deployHelper(Vertx vertx, DeploymentOptions options, AbstractVerticle verticle) {

+ 4 - 0
src/main/java/cz/senslog/telemetry/database/domain/AlertStatus.java

@@ -2,4 +2,8 @@ package cz.senslog.telemetry.database.domain;
 
 public enum AlertStatus {
     CREATED, INFORMED, IN_PROCESS, SOLVED, DELETED;
+
+    public static AlertStatus of(String alertStatusString) {
+        return valueOf(alertStatusString.toUpperCase());
+    }
 }

+ 10 - 4
src/main/java/cz/senslog/telemetry/database/domain/CampaignUnitAlert.java

@@ -5,18 +5,24 @@ import java.time.OffsetDateTime;
 public class CampaignUnitAlert extends UnitAlert {
 
     private final long campaignId;
+    private final UnitTelemetry observation;
 
-    public static CampaignUnitAlert of(long id, long campaignId, long unitId, String message, OffsetDateTime timestamp, AlertStatus status) {
-        return new CampaignUnitAlert(id, campaignId, unitId, message, timestamp, status);
+    public static CampaignUnitAlert of(long id, long campaignId, long unitId, String message, OffsetDateTime timestamp, AlertStatus status, UnitTelemetry observation) {
+        return new CampaignUnitAlert(id, campaignId, unitId, message, timestamp, status, observation);
     }
 
     public static CampaignUnitAlert of(long unitId, long campaignId, String message, OffsetDateTime timestamp, AlertStatus status) {
-        return new CampaignUnitAlert(-1, campaignId, unitId, message, timestamp, status);
+        return new CampaignUnitAlert(-1, campaignId, unitId, message, timestamp, status, null);
     }
 
-    public CampaignUnitAlert(long id, long campaignId, long unitId, String message, OffsetDateTime timestamp, AlertStatus status) {
+    public CampaignUnitAlert(long id, long campaignId, long unitId, String message, OffsetDateTime timestamp, AlertStatus status, UnitTelemetry observation) {
         super(id, unitId, message, timestamp, status);
         this.campaignId = campaignId;
+        this.observation = observation;
+    }
+
+    public UnitTelemetry getObservation() {
+        return observation;
     }
 
     public long getCampaignId() {

+ 15 - 0
src/main/java/cz/senslog/telemetry/database/domain/Location.java

@@ -1,5 +1,7 @@
 package cz.senslog.telemetry.database.domain;
 
+import java.util.Objects;
+
 public class Location {
 
     private final float longitude;
@@ -43,6 +45,19 @@ public class Location {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Location location = (Location) o;
+        return Float.compare(longitude, location.longitude) == 0 && Float.compare(latitude, location.latitude) == 0 && Float.compare(altitude, location.altitude) == 0 && Float.compare(angle, location.angle) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(longitude, latitude, altitude, angle);
+    }
+
+    @Override
     public String toString() {
         return "{" +
                 "\"longitude\":" + longitude +

+ 51 - 0
src/main/java/cz/senslog/telemetry/database/domain/SensLogObservation.java

@@ -0,0 +1,51 @@
+package cz.senslog.telemetry.database.domain;
+
+import io.vertx.core.json.JsonObject;
+
+import java.time.OffsetDateTime;
+
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+
+public class SensLogObservation {
+
+    private final long unitId;
+    private final long sensorId;
+    private final OffsetDateTime timestamp;
+    private final double observationValue;
+
+    public static SensLogObservation of(long unitId, long sensorId, OffsetDateTime timestamp, double observationValue) {
+        return new SensLogObservation(unitId, sensorId, timestamp, observationValue);
+    }
+
+    private SensLogObservation(long unitId, long sensorId, OffsetDateTime timestamp, double observationValue) {
+        this.unitId = unitId;
+        this.sensorId = sensorId;
+        this.timestamp = timestamp;
+        this.observationValue = observationValue;
+    }
+
+    public long getUnitId() {
+        return unitId;
+    }
+
+    public long getSensorId() {
+        return sensorId;
+    }
+
+    public OffsetDateTime getTimestamp() {
+        return timestamp;
+    }
+
+    public double getObservationValue() {
+        return observationValue;
+    }
+
+    public JsonObject toJsonObject() {
+        return JsonObject.of(
+                "unitId", getUnitId(),
+                "sensorId", getSensorId(),
+                "timestamp", getTimestamp().format(ISO_OFFSET_DATE_TIME),
+                "observedValue", getObservationValue()
+        );
+    }
+}

+ 33 - 0
src/main/java/cz/senslog/telemetry/database/domain/UnitTelemetry.java

@@ -4,6 +4,9 @@ import io.vertx.core.json.JsonObject;
 
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
 
 public class UnitTelemetry {
 
@@ -66,4 +69,34 @@ public class UnitTelemetry {
                 ", \"speed\":" + speed +
                 '}';
     }
+
+    public static class Converter {
+
+        public static Stream<SensLogObservation> toSensLogObservationAsStream(UnitTelemetry unitTelemetry) {
+            if (unitTelemetry == null) {
+                return Stream.empty();
+            }
+            return unitTelemetry.getObservedValues().stream().map(sensorEntry -> SensLogObservation.of(
+                    unitTelemetry.getUnitId(),
+                    Integer.parseInt(sensorEntry.getKey()),
+                    unitTelemetry.getTimestamp(),
+                    sensorEntry.getValue() instanceof Double ? (Double) sensorEntry.getValue() : Double.NaN
+            ));
+        }
+
+        public static Stream<SensLogObservation> toSensLogObservationAsStream(List<UnitTelemetry> unitTelemetry) {
+            if (unitTelemetry == null) {
+                return Stream.empty();
+            }
+            return unitTelemetry.stream().flatMap(Converter::toSensLogObservationAsStream);
+        }
+
+        public static List<SensLogObservation> toSensLogObservation(UnitTelemetry unitTelemetry) {
+            return toSensLogObservationAsStream(unitTelemetry).toList();
+        }
+
+        public static List<SensLogObservation> toSensLogObservation(List<UnitTelemetry> unitTelemetryList) {
+            return toSensLogObservationAsStream(unitTelemetryList).toList();
+        }
+    }
 }

+ 3 - 2
src/main/java/cz/senslog/telemetry/database/repository/CachedMapLogRepository.java

@@ -4,17 +4,18 @@ import cz.senslog.telemetry.database.domain.Sensor;
 import cz.senslog.telemetry.database.domain.Unit;
 import io.vertx.core.Future;
 import io.vertx.pgclient.PgPool;
+import io.vertx.sqlclient.Pool;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class CachedMapLogRepository extends MapLogRepository {
 
-    public static CachedMapLogRepository create(PgPool client) {
+    public static CachedMapLogRepository create(Pool client) {
         return new CachedMapLogRepository(client);
     }
 
-    protected CachedMapLogRepository(PgPool client) {
+    protected CachedMapLogRepository(Pool client) {
         super(client);
     }
 

+ 109 - 31
src/main/java/cz/senslog/telemetry/database/repository/MapLogRepository.java

@@ -118,10 +118,29 @@ public class MapLogRepository implements SensLogRepository {
                 .map(rs -> StreamSupport.stream(rs.spliterator(), false).map(r -> r.getLong(0)).collect(toList())));
     }
 
+    private final static Function<Row, UnitTelemetry> ROW_TO_UNIT_TELEMETRY = r -> UnitTelemetry.of(
+            r.getLong("id"),
+            r.getLong("unit_id"),
+            r.getOffsetDateTime("time_stamp"),
+            Location.of(
+                    r.getFloat("long"),
+                    r.getFloat("lat"),
+                    r.getFloat("alt"),
+                    r.getFloat("angle")
+            ),
+            r.getInteger("speed"),
+            r.getJsonObject("observed_values")
+    );
+
     @Override
-    public Future<Integer> saveTelemetry(UnitTelemetry data) {
+    public Future<UnitTelemetry> saveTelemetry(UnitTelemetry data) {
         return client.preparedQuery("INSERT INTO maplog.obs_telemetry(unit_id, time_stamp, the_geom, speed, observed_values) " +
-                        "VALUES ($1, $2, ST_SetSRID(ST_MakePoint($3, $4, $5, $6), 4326), $7, $8::jsonb) RETURNING (id)")
+                        "VALUES ($1, $2, ST_SetSRID(ST_MakePoint($3, $4, $5, $6), 4326), $7, $8::jsonb) " +
+                        "RETURNING id, unit_id, time_stamp, speed, observed_values, " +
+                                "ST_X (ST_Transform (the_geom, 4326)) AS long, " +
+                                "ST_Y (ST_Transform (the_geom, 4326)) AS lat, " +
+                                "ST_Z (ST_Transform (the_geom, 4326)) AS alt, " +
+                                "ST_M (the_geom) AS angle")
                 .execute(Tuple.of(
                         data.getUnitId(),
                         data.getTimestamp(),
@@ -132,13 +151,13 @@ public class MapLogRepository implements SensLogRepository {
                         data.getSpeed(),
                         data.getObservedValues()))
                 .map(RowSet::iterator)
-                .map(it -> it.hasNext() ? it.next().getInteger(0) : 0);
+                .map(it -> it.hasNext() ? ROW_TO_UNIT_TELEMETRY.apply(it.next()) : null);
     }
 
     @Override
-    public Future<Integer> saveAllTelemetry(List<UnitTelemetry> data) {
+    public Future<List<UnitTelemetry>> saveAllTelemetry(List<UnitTelemetry> data) {
         if (data == null || data.isEmpty()) {
-            return Future.succeededFuture(0);
+            return Future.succeededFuture(Collections.emptyList());
         }
         List<Tuple> tuples = data.stream().map(d -> Tuple.of(
                 d.getUnitId(),
@@ -151,9 +170,26 @@ public class MapLogRepository implements SensLogRepository {
                 d.getObservedValues()
         )).collect(toList());
         return client.preparedQuery("INSERT INTO maplog.obs_telemetry(unit_id, time_stamp, the_geom, speed, observed_values) " +
-                        "VALUES ($1, $2, ST_SetSRID(ST_MakePoint($3, $4, $5, $6), 4326), $7, $8::jsonb)")
+                        "VALUES ($1, $2, ST_SetSRID(ST_MakePoint($3, $4, $5, $6), 4326), $7, $8::jsonb) " +
+                        "RETURNING id, unit_id, time_stamp, speed, observed_values, " +
+                                "ST_X (ST_Transform (the_geom, 4326)) AS long, " +
+                                "ST_Y (ST_Transform (the_geom, 4326)) AS lat, " +
+                                "ST_Z (ST_Transform (the_geom, 4326)) AS alt, " +
+                                "ST_M (the_geom) AS angle")
                 .executeBatch(tuples)
-                .map(SqlResult::rowCount);
+                .map(row -> StreamSupport.stream(row.spliterator(), false).map(r -> UnitTelemetry.of(
+                        r.getLong("id"),
+                        r.getLong("unit_id"),
+                        r.getOffsetDateTime("time_stamp"),
+                        Location.of(
+                                r.getFloat("long"),
+                                r.getFloat("lat"),
+                                r.getFloat("alt"),
+                                r.getFloat("angle")
+                        ),
+                        r.getInteger("speed"),
+                        r.getJsonObject("observed_values")
+                )).toList());
     }
 
     @Override
@@ -1848,9 +1884,7 @@ public class MapLogRepository implements SensLogRepository {
                   case "array"  -> {
                       return new JsonArray(row.getString("value"));
                   }
-                  default -> {
-                      throw new IllegalArgumentException(String.format("Unsupported type <%s> of sensor's value.", type));
-                  }
+                  default -> throw new IllegalArgumentException(String.format("Unsupported type <%s> of sensor's value.", type));
               }
         };
 
@@ -1941,9 +1975,7 @@ public class MapLogRepository implements SensLogRepository {
                 case "array"  -> {
                     return new JsonArray(row.getString("value"));
                 }
-                default -> {
-                    throw new IllegalArgumentException(String.format("Unsupported type <%s> of sensor's value.", type));
-                }
+                default -> throw new IllegalArgumentException(String.format("Unsupported type <%s> of sensor's value.", type));
             }
         };
 
@@ -2436,21 +2468,41 @@ public class MapLogRepository implements SensLogRepository {
     }
 
     private static final Function<Row, CampaignUnitAlert> ROW_TO_ALERT = (row) -> CampaignUnitAlert.of(
-        row.getLong("id"),
-        row.getLong("campaign_id"),
-        row.getLong("unit_id"),
-        row.getString("message"),
-        row.getOffsetDateTime("time_stamp"),
-        AlertStatus.valueOf(row.getString("status"))
+            row.getLong("alert_id"),
+            row.getLong("campaign_id"),
+            row.getLong("unit_id"),
+            row.getString("message"),
+            row.getOffsetDateTime("time_stamp"),
+            AlertStatus.valueOf(row.getString("status")),
+            row.getLong("telemetry_id") != null ? UnitTelemetry.of(
+                    row.getLong("telemetry_id"),
+                    row.getLong("unit_id"),
+                    row.getOffsetDateTime("time_stamp"),
+                    Location.of(
+                            row.getFloat("long"),
+                            row.getFloat("lat"),
+                            row.getFloat("alt"),
+                            row.getFloat("angle")
+                    ),
+                    row.getInteger("speed"),
+                    row.getJsonObject("observed_values")
+            ) : null
     );
 
     @Override
     public Future<CampaignUnitAlert> findAlertById(long alertId) {
-        return client.preparedQuery("SELECT a.id, utc.campaign_id, a.unit_id, a.message, a.time_stamp, a.status FROM maplog.alert AS a " +
+        return client.preparedQuery("SELECT a.id as alert_id, utc.campaign_id, a.unit_id, a.message, a.time_stamp, a.status, " +
+                            "ot.id as telemetry_id, ot.speed, ot.observed_values, " +
+                            "ST_X (ST_Transform (ot.the_geom, 4326)) AS long, " +
+                            "ST_Y (ST_Transform (ot.the_geom, 4326)) AS lat, " +
+                            "ST_Z (ST_Transform (ot.the_geom, 4326)) AS alt, " +
+                            "ST_M (ot.the_geom) AS angle " +
+                        "FROM maplog.alert AS a " +
                         "JOIN maplog.unit u on u.unit_id = a.unit_id " +
                         "JOIN maplog.unit_to_campaign utc on u.unit_id = utc.unit_id " +
+                        "LEFT JOIN maplog.obs_telemetry ot on ot.unit_id = a.unit_id and ot.time_stamp = a.time_stamp " +
                         "WHERE utc.from_time <= a.time_stamp AND (CASE WHEN utc.to_time IS NULL THEN now() ELSE utc.to_time END) >= a.time_stamp " +
-                        "AND a.id = $1")
+                        "AND a.id = $1 ORDER BY ot.time_received limit 1")
                 .execute(Tuple.of(alertId))
                 .map(RowSet::iterator)
                 .map(it -> it.hasNext() ? ROW_TO_ALERT.apply(it.next()) : null)
@@ -2460,14 +2512,21 @@ public class MapLogRepository implements SensLogRepository {
 
     @Override
     public Future<CampaignUnitAlert> findAlertByIdentityAndId(String userIdentity, long alertId) {
-        return client.preparedQuery("SELECT a.id, utc.campaign_id, a.unit_id, a.message, a.time_stamp, a.status FROM maplog.alert AS a " +
+        return client.preparedQuery("SELECT a.id, utc.campaign_id, a.unit_id, a.message, a.time_stamp, a.status, " +
+                            "ot.id as telemetry_id, ot.speed, ot.observed_values, " +
+                            "ST_X (ST_Transform (ot.the_geom, 4326)) AS long, " +
+                            "ST_Y (ST_Transform (ot.the_geom, 4326)) AS lat, " +
+                            "ST_Z (ST_Transform (ot.the_geom, 4326)) AS alt, " +
+                            "ST_M (ot.the_geom) AS angle " +
+                        "FROM maplog.alert AS a " +
                         "JOIN maplog.unit u on u.unit_id = a.unit_id " +
                         "JOIN maplog.unit_to_campaign utc on u.unit_id = utc.unit_id " +
                         "JOIN maplog.event e on e.unit_id = utc.unit_id " +
                         "JOIN maplog.user_to_campaign_config uc ON uc.campaign_id = utc.campaign_id and uc.entity_id = e.entity_id " +
                         "JOIN maplog.system_user AS su ON su.id = uc.user_id " +
+                        "LEFT JOIN maplog.obs_telemetry ot on ot.unit_id = a.unit_id and ot.time_stamp = a.time_stamp " +
                         "WHERE utc.from_time <= a.time_stamp AND (CASE WHEN utc.to_time IS NULL THEN now() ELSE utc.to_time END) >= a.time_stamp " +
-                        "AND a.id = $1 AND su.identity = $2 GROUP BY a.id, utc.campaign_id")
+                        "AND a.id = $1 AND su.identity = $2 GROUP BY a.id, utc.campaign_id, ot.id limit 1")
                 .execute(Tuple.of(alertId, userIdentity))
                 .map(RowSet::iterator)
                 .map(it -> it.hasNext() ? ROW_TO_ALERT.apply(it.next()) : null)
@@ -2486,20 +2545,28 @@ public class MapLogRepository implements SensLogRepository {
         );
     }
 
+    private static final Function<Row, UnitAlert> ROW_TO_UNIT_ALERT = (row) -> UnitAlert.of(
+            row.getLong("id"),
+            row.getLong("unit_id"),
+            row.getString("message"),
+            row.getOffsetDateTime("time_stamp"),
+            AlertStatus.of(row.getString("status"))
+    );
+
     @Override
-    public Future<Long> saveAlert(UnitAlert alert) {
+    public Future<UnitAlert> saveAlert(UnitAlert alert) {
         return client.withTransaction(conn ->
-                conn.preparedQuery("INSERT INTO maplog.alert(time_stamp, unit_id, message, status) VALUES ($1, $2, $3, $4) RETURNING id")
+                conn.preparedQuery("INSERT INTO maplog.alert(time_stamp, unit_id, message, status) VALUES ($1, $2, $3, $4) RETURNING id, time_stamp, unit_id, message, status")
                 .execute(Tuple.of(alert.getTimestamp(), alert.getUnitId(), alert.getMessage(), alert.getStatus()))
                 .map(RowSet::iterator)
-                .map(it -> it.hasNext() ? it.next().getLong(0) : null)
+                .map(it -> it.hasNext() ? ROW_TO_UNIT_ALERT.apply(it.next()) : null)
                 .map(Optional::ofNullable)
                 .map(p -> p.orElseThrow(() -> new IllegalStateException("Alert was not saved successfully.")))
         );
     }
 
     @Override
-    public Future<List<Long>> saveAlerts(List<UnitAlert> alerts) {
+    public Future<List<UnitAlert>> saveAlerts(List<UnitAlert> alerts) {
         if (alerts == null || alerts.isEmpty()) {
             return Future.succeededFuture(Collections.emptyList());
         }
@@ -2509,8 +2576,17 @@ public class MapLogRepository implements SensLogRepository {
         )).toList();
 
         return client.withTransaction(conn ->
-                conn.preparedQuery("INSERT INTO maplog.alert(time_stamp, unit_id, message, status) VALUES ($1, $2, $3, $4) RETURNING id;")
-                        .executeBatch(tuples).map(rs -> StreamSupport.stream(rs.spliterator(), false).map(r -> r.getLong(0)).toList())
+                conn.preparedQuery("INSERT INTO maplog.alert(time_stamp, unit_id, message, status) VALUES ($1, $2, $3, $4) RETURNING id, time_stamp, unit_id, message, status")
+                        .executeBatch(tuples)
+                        .map(rs -> StreamSupport.stream(rs.spliterator(), false)
+                                .map(r -> UnitAlert.of(
+                                        r.getLong("id"),
+                                        r.getLong("unit_id"),
+                                        r.getString("message"),
+                                        r.getOffsetDateTime("time_stamp"),
+                                        AlertStatus.of(r.getString("status"))
+                                )).toList()
+                        )
         );
     }
 
@@ -2639,7 +2715,8 @@ public class MapLogRepository implements SensLogRepository {
                                 r.getLong("unit_id"),
                                 r.getString("message"),
                                 r.getOffsetDateTime("time_stamp"),
-                                AlertStatus.valueOf(r.getString("status"))
+                                AlertStatus.valueOf(r.getString("status")),
+                                null
                         ))
                         .collect(toList())
                 );
@@ -2689,7 +2766,8 @@ public class MapLogRepository implements SensLogRepository {
                                 r.getLong("unit_id"),
                                 r.getString("message"),
                                 r.getOffsetDateTime("time_stamp"),
-                                AlertStatus.valueOf(r.getString("status"))
+                                AlertStatus.valueOf(r.getString("status")),
+                                null
                         ))
                         .collect(toList())
                 );

+ 4 - 4
src/main/java/cz/senslog/telemetry/database/repository/SensLogRepository.java

@@ -19,8 +19,8 @@ public interface SensLogRepository {
     Future<Long> saveEvent(Event event);
     Future<List<Long>> saveEvents(List<Event> events);
 
-    Future<Integer> saveTelemetry(UnitTelemetry data);
-    Future<Integer> saveAllTelemetry(List<UnitTelemetry> data);
+    Future<UnitTelemetry> saveTelemetry(UnitTelemetry data);
+    Future<List<UnitTelemetry>> saveAllTelemetry(List<UnitTelemetry> data);
 
 
     Future<Boolean> createSensor(Sensor sensor, long unitId);
@@ -248,8 +248,8 @@ public interface SensLogRepository {
     Future<CampaignUnitAlert> findAlertById(long alertId);
     Future<CampaignUnitAlert> findAlertByIdentityAndId(String userIdentity, long alertId);
     Future<Long> updateAlert(long alertId, AlertStatus status);
-    Future<Long> saveAlert(UnitAlert alert);
-    Future<List<Long>> saveAlerts(List<UnitAlert> alerts);
+    Future<UnitAlert> saveAlert(UnitAlert alert);
+    Future<List<UnitAlert>> saveAlerts(List<UnitAlert> alerts);
     Future<List<Alert>> findAlertsByEventId(long eventId, Set<AlertStatus> statusFilter, SortType sortType);
     Future<List<Alert>> findAlertsByIdentityAndEventId(String userIdentity, long eventId, Set<AlertStatus> statusFilter, SortType sortType);
     Future<EventAlert> findAlertByIdAndEventId(long alertId, long eventId);

+ 65 - 0
src/main/java/cz/senslog/telemetry/module/AlertModule.java

@@ -0,0 +1,65 @@
+package cz.senslog.telemetry.module;
+
+import cz.senslog.telemetry.database.domain.AlertStatus;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import io.vertx.json.schema.*;
+import io.vertx.json.schema.draft7.dsl.StringFormat;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static cz.senslog.telemetry.module.EventBusModulePaths.SENSLOG_ALERTS;
+import static io.vertx.json.schema.common.dsl.Schemas.*;
+import static io.vertx.json.schema.draft7.dsl.Keywords.exclusiveMinimum;
+import static io.vertx.json.schema.draft7.dsl.Keywords.format;
+import static java.lang.Long.parseLong;
+
+public class AlertModule extends AbstractVerticle {
+
+    private static final Logger logger = LogManager.getLogger(AlertModule.class);
+
+    private static final Validator alertValidator;
+
+    static {
+        JsonObject schemaJson = arraySchema().items(objectSchema()
+                        .requiredProperty("id", intSchema().with(exclusiveMinimum(0)))
+                        .requiredProperty("message", stringSchema())
+                        .requiredProperty("status", enumSchema((Object[]) AlertStatus.values()))
+                        .requiredProperty("timestamp", stringSchema().with(format(StringFormat.DATETIME))))
+                .toJson();
+
+        alertValidator = Validator.create(JsonSchema.of(schemaJson), new JsonSchemaOptions()
+                .setBaseUri("https://vertx.io").setDraft(Draft.DRAFT7));
+    }
+
+    @Override
+    public void start(Promise<Void> startPromise) throws Exception {
+        vertx.eventBus().<JsonArray>consumer(SENSLOG_ALERTS, msg -> {
+                    OutputUnit validationResult = alertValidator.validate(msg.body());
+                    if (validationResult.getValid()) {
+                        long campaignId = parseLong(msg.headers().get("campaignId"));
+                        String userId = msg.headers().get("userId");
+                        handleAlert(userId, campaignId, msg.body())
+                                .onSuccess(msg::reply).onFailure(f -> msg.fail(400, f.getMessage()));
+                    } else {
+                        msg.fail(400, validationResult.getError());
+                    }
+                })
+                .completionHandler(res -> {
+                    if (res.succeeded()) {
+                        startPromise.complete();
+                    } else {
+                        startPromise.fail(res.cause());
+                    }
+                });
+    }
+
+    private Future<String> handleAlert(final String userId, final long campaignId, final JsonArray alerts) {
+        //  TODO load messaging configuration for the alert based on logged user
+        logger.info("Campaign <{}>\talerts {}", campaignId, alerts.toString());
+        return Future.succeededFuture(String.format("Printed <%d> alerts", alerts.size()));
+    }
+}

+ 102 - 0
src/main/java/cz/senslog/telemetry/module/AnalyticModule.java

@@ -0,0 +1,102 @@
+package cz.senslog.telemetry.module;
+
+import io.vertx.core.*;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.*;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import io.vertx.json.schema.Draft;
+import io.vertx.json.schema.JsonSchema;
+import io.vertx.json.schema.JsonSchemaOptions;
+import io.vertx.json.schema.Validator;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.function.BiFunction;
+
+import static cz.senslog.telemetry.module.EventBusModulePaths.SENSLOG_OBSERVATIONS;
+import static cz.senslog.telemetry.server.ws.ContentType.JSON;
+import static cz.senslog.telemetry.server.ws.ContentType.JSON_SCHEMA;
+
+public class AnalyticModule extends AbstractVerticle {
+
+    private static final Logger logger = LogManager.getLogger(AnalyticModule.class);
+
+    @Override
+    public void start(Promise<Void> startPromise) {
+        JsonObject moduleConfig = config().getJsonObject("module");
+
+        boolean disabled = moduleConfig.getBoolean("analytic.disabled", false);
+        if (disabled) {
+            logger.warn("The module <{}> is disabled.", AnalyticModule.class.getSimpleName());
+            startPromise.complete(); return;
+        }
+
+        String host = moduleConfig.getString("analytic.host");
+        int port = moduleConfig.getInteger("analytic.port");
+
+        if (host == null || host.isEmpty()) {
+            logger.warn("The module <{}> is disabled. It requires an analytic.host property", AnalyticModule.class.getSimpleName());
+            startPromise.complete();
+        } else {
+            final HttpClient httpClient = vertx.createHttpClient();
+
+            final Future<JsonObject> infoGETReq = httpClient.request(new RequestOptions()
+                    .setMethod(HttpMethod.GET).setPort(port).setHost(host).setURI("/info")
+                    .setHeaders(MultiMap.caseInsensitiveMultiMap()
+                            .add(HttpHeaders.ACCEPT, JSON.contentType())))
+                    .compose(req -> req.send().compose(HttpClientResponse::body).map(Buffer::toJsonObject));
+
+            final Future<JsonObject> schemaGETReq = httpClient.request(new RequestOptions()
+                    .setMethod(HttpMethod.GET).setPort(port).setHost(host).setURI("/observations")
+                    .setHeaders(MultiMap.caseInsensitiveMultiMap()
+                            .add(HttpHeaders.ACCEPT, JSON_SCHEMA.contentType())))
+                    .compose(req -> req.send().compose(HttpClientResponse::body).map(Buffer::toJsonObject));
+
+            final BiFunction<Validator, JsonArray, Future<JsonObject>> obsPOSTReq = (validator, observations) -> {
+                if (validator.validate(observations).getValid()) {
+                    logger.warn("No valid observations to be sent.");
+                    return Future.failedFuture("No valid observations to be sent.");
+                }
+                return httpClient.request(new RequestOptions()
+                    .setMethod(HttpMethod.POST).setPort(port).setHost(host).setURI("/observations")
+                    .setHeaders(MultiMap.caseInsensitiveMultiMap()
+                            .add(HttpHeaders.CONTENT_TYPE, JSON.contentType())))
+                        .compose(req -> req.send(observations.toBuffer())
+                                .compose(HttpClientResponse::body).map(Buffer::toJsonObject)
+                        );
+            };
+
+            infoGETReq.compose(infoJSON -> {
+                String alertServerInfo = String.join(" | ", infoJSON.stream().map(e -> e.getKey() + ": " + e.getValue()).toList());
+                logger.info("Connected to Analytic Server: {}", alertServerInfo);
+                return schemaGETReq;
+            }).compose(schemaJSON -> {
+                logger.info("Loaded JSON Schema for publishing observations to the Analytics server.");
+
+                final Validator validator = Validator.create(JsonSchema.of(schemaJSON), new JsonSchemaOptions()
+                        .setBaseUri("https://vertx.io").setDraft(Draft.DRAFT7));
+
+                Promise<Void> eventBusRegister = Promise.promise();
+                vertx.eventBus().<JsonArray>consumer(SENSLOG_OBSERVATIONS, msg -> obsPOSTReq.apply(validator, msg.body())
+                        .onComplete(res -> {
+                            if (res.succeeded()) {
+                                msg.reply(res.result());
+                            } else {
+                                msg.fail(400, res.cause().getMessage());
+                            }
+                        }))
+                        .completionHandler(comp -> {
+                            if (comp.succeeded()) {
+                                logger.info("Event bus registered.");
+                                eventBusRegister.complete();
+                            } else {
+                                logger.error("Event bus failed.", comp.cause());
+                                eventBusRegister.fail(comp.cause());
+                            }
+                        });
+                return eventBusRegister.future();
+            }).onSuccess(startPromise::complete).onFailure(startPromise::fail);
+        }
+    }
+}

+ 12 - 0
src/main/java/cz/senslog/telemetry/module/EventBusModulePaths.java

@@ -0,0 +1,12 @@
+package cz.senslog.telemetry.module;
+
+public final class EventBusModulePaths {
+
+    private static String prefix(String name) {
+        return "__module#"+name;
+    }
+
+    public static final String SENSLOG_OBSERVATIONS = prefix("senslog_observations");
+
+    public static final String SENSLOG_ALERTS = prefix("senslog_alerts");
+}

+ 26 - 12
src/main/java/cz/senslog/telemetry/server/Fm4exSocketHandler.java

@@ -2,10 +2,14 @@ package cz.senslog.telemetry.server;
 
 import cz.senslog.telemetry.database.domain.*;
 import cz.senslog.telemetry.database.repository.SensLogRepository;
+import cz.senslog.telemetry.module.EventBusModulePaths;
 import cz.senslog.telemetry.protocol.domain.*;
 import cz.senslog.telemetry.protocol.Fm4ex;
+import cz.senslog.telemetry.utils.Tuple;
 import io.vertx.core.Future;
+import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.JsonArray;
 import io.vertx.core.json.JsonObject;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -19,6 +23,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
+import static cz.senslog.telemetry.module.EventBusModulePaths.SENSLOG_OBSERVATIONS;
 import static java.time.ZoneOffset.UTC;
 
 public class Fm4exSocketHandler {
@@ -32,12 +37,12 @@ public class Fm4exSocketHandler {
 
     private final Function<String, LoopInvoker<Buffer, Buffer, SocketContext>> newContext;
 
-    private Fm4exSocketHandler(SensLogRepository repo) {
-        this.newContext = socId -> invoker.init(new SocketContext(socId, repo));
+    private Fm4exSocketHandler(Vertx vertx, SensLogRepository repo) {
+        this.newContext = socId -> invoker.init(new SocketContext(socId, vertx, repo));
     }
 
-    public static Fm4exSocketHandler create(SensLogRepository repo) {
-        return new Fm4exSocketHandler(repo);
+    public static Fm4exSocketHandler create(Vertx vertx, SensLogRepository repo) {
+        return new Fm4exSocketHandler(vertx, repo);
     }
 
     public Future<String> destroySocket(String socketID) {
@@ -58,15 +63,18 @@ public class Fm4exSocketHandler {
 
         private static final Buffer SUCCESS = Buffer.buffer(new byte[]{0x01}), ERROR = Buffer.buffer(new byte[]{0x00});
 
+        private final Vertx vertx;
+
         private final SensLogRepository repo;
 
         private final String socketId;
 
         private Unit contextUnit;
 
-        private SocketContext(String socketId, SensLogRepository repo) {
+        private SocketContext(String socketId, Vertx vertx, SensLogRepository repo) {
             logger.info("[{}] New socket handling.", socketId);
             this.socketId = socketId;
+            this.vertx = vertx;
             this.repo = repo;
             this.contextUnit = null;
         }
@@ -163,19 +171,25 @@ public class Fm4exSocketHandler {
                         Sensor sensor = ioToSensor.get((long)io.getId());
                         if (sensor != null) {
                             observedValues.put(Long.toString(sensor.getSensorId()), io.getValue());
-                        } else {
+                         } else {
                             logger.error("Sensor for the IO Property <{}> does not exist.", io.getId());
                         }
                     }
                     telemetries.add(UnitTelemetry.of(unitId, timestamp, location, avl.getSpeed(), observedValues));
                 }
                 return telemetries;
-            }).flatMap(telemetries -> repo.saveAllTelemetry(telemetries)
-                        .flatMap(r -> {
-                            logger.info("[{}] AVL Records <{}> was saved successfully.", socketId, r);
-                            return Future.succeededFuture(SUCCESS);
-                        })
-            );
+            }).compose(data -> repo.saveAllTelemetry(data).map(count -> Tuple.of(count, data)))
+                    .flatMap(dataTuple -> {
+                        JsonArray sensLogObsArr = new JsonArray(UnitTelemetry.Converter
+                                .toSensLogObservationAsStream(dataTuple.item2())
+                                .map(SensLogObservation::toJsonObject).toList()
+                        );
+                        vertx.eventBus().request(SENSLOG_OBSERVATIONS, sensLogObsArr)
+                                .onSuccess(v -> logger.info(v.body())).onFailure(logger::error);
+
+                        logger.info("[{}] AVL Records <{}> was saved successfully.", socketId, dataTuple.item1());
+                        return Future.succeededFuture(SUCCESS);
+                    });
         }
     }
 }

+ 1 - 1
src/main/java/cz/senslog/telemetry/server/HttpVertxServer.java

@@ -77,7 +77,7 @@ public final class HttpVertxServer extends AbstractVerticle {
                     // The order matters, so adding the body handler should happen after any PLATFORM or SECURITY_POLICY handler(s).
                     openAPIRouterBuilder.rootHandler(BodyHandler.create());
 
-                    OpenAPIHandler apiHandler = OpenAPIHandler.create(repo);
+                    OpenAPIHandler apiHandler = OpenAPIHandler.create(vertx, repo);
 
                     openAPIRouterBuilder.operation("infoGET").handler(apiHandler::info);
 

+ 1 - 1
src/main/java/cz/senslog/telemetry/server/TCPVertxServer.java

@@ -24,7 +24,7 @@ public final class TCPVertxServer extends AbstractVerticle {
         NetServerOptions serverOpt = new NetServerOptions().setRegisterWriteHandler(true);
         NetServer server = vertx.createNetServer(serverOpt);
 
-        Fm4exSocketHandler socHandler = Fm4exSocketHandler.create(repo);
+        Fm4exSocketHandler socHandler = Fm4exSocketHandler.create(vertx, repo);
         server.connectHandler(socket -> socket.handler(
                         buffer -> socHandler.process(socket.writeHandlerID(), buffer)
                                 .onSuccess(socket::write)

+ 2 - 1
src/main/java/cz/senslog/telemetry/server/ws/ContentType.java

@@ -2,7 +2,8 @@ package cz.senslog.telemetry.server.ws;
 
 public enum ContentType {
     JSON    ("application/json"),
-    GEOJSON ("application/geo+json")
+    GEOJSON ("application/geo+json"),
+    JSON_SCHEMA ("application/json+schema")
     ;
 
     private final String contentType;

+ 105 - 75
src/main/java/cz/senslog/telemetry/server/ws/OpenAPIHandler.java

@@ -6,14 +6,19 @@ import cz.senslog.telemetry.database.PagingRetrieve;
 import cz.senslog.telemetry.database.SortType;
 import cz.senslog.telemetry.database.domain.*;
 import cz.senslog.telemetry.database.repository.SensLogRepository;
+import cz.senslog.telemetry.module.EventBusModulePaths;
 import cz.senslog.telemetry.utils.CascadeCondition;
 import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.DeliveryOptions;
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonArray;
 import io.vertx.core.json.JsonObject;
 import io.vertx.ext.web.RoutingContext;
 import io.vertx.json.schema.common.JsonUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 
 import java.time.OffsetDateTime;
@@ -23,6 +28,7 @@ import java.util.function.*;
 import java.util.stream.Collectors;
 
 import static cz.senslog.telemetry.database.validation.UnitTelemetryValidation.*;
+import static cz.senslog.telemetry.module.EventBusModulePaths.SENSLOG_ALERTS;
 import static cz.senslog.telemetry.server.ws.AuthorizationScope.READ_INFRASTRUCTURE;
 import static cz.senslog.telemetry.server.ws.AuthorizationScope.READ_PERSONAL;
 import static cz.senslog.telemetry.server.ws.AuthorizationType.BEARER;
@@ -40,17 +46,24 @@ import static java.util.stream.Collectors.*;
 
 public class OpenAPIHandler {
 
-    private static final BiFunction<OffsetDateTime, ZoneId, String> DATE_TIME_FORMATTER = (dateTime, zoneId) ->
+    private static final Logger logger = LogManager.getLogger(OpenAPIHandler.class);
+
+    private static final BiFunction<OffsetDateTime, ZoneId, String> DATE_TIME_FORMATTER_AT_ZONE = (dateTime, zoneId) ->
             dateTime != null ? ofInstant(dateTime.toInstant(), zoneId).format(ISO_OFFSET_DATE_TIME) : null;
 
+    private static final Function<OffsetDateTime, String> DATE_TIME_FORMATTER = (dateTime) ->
+            dateTime != null ? dateTime.format(ISO_OFFSET_DATE_TIME) : null;
+
     private final SensLogRepository repo;
+    private final Vertx vertx;
 
-    private OpenAPIHandler(SensLogRepository repo) {
+    private OpenAPIHandler(Vertx vertx, SensLogRepository repo) {
         this.repo = repo;
+        this.vertx = vertx;
     }
 
-    public static OpenAPIHandler create(SensLogRepository repo) {
-        return new OpenAPIHandler(repo);
+    public static OpenAPIHandler create(Vertx vertx, SensLogRepository repo) {
+        return new OpenAPIHandler(vertx, repo);
     }
 
     private static String hostURLFull(HttpServerRequest req) {
@@ -94,8 +107,8 @@ public class OpenAPIHandler {
                             ) : JsonObject.of()).mergeIn(JsonObject.of(
                                     "id", c.getId(),
                                     "name", c.getName(),
-                                    "fromTime", DATE_TIME_FORMATTER.apply(c.getFromTime(), zone),
-                                    "toTime", DATE_TIME_FORMATTER.apply(c.getToTime(), zone)
+                                    "fromTime", DATE_TIME_FORMATTER_AT_ZONE.apply(c.getFromTime(), zone),
+                                    "toTime", DATE_TIME_FORMATTER_AT_ZONE.apply(c.getToTime(), zone)
                             ))).collect(toList())).encode()))
                     .onFailure(rc::fail)
                 );
@@ -122,8 +135,8 @@ public class OpenAPIHandler {
                                 "id", c.getId(),
                                 "name", c.getName(),
                                 "description", c.getDescription(),
-                                "fromTime", DATE_TIME_FORMATTER.apply(c.getFromTime(), zone),
-                                "toTime", DATE_TIME_FORMATTER.apply(c.getToTime(), zone)
+                                "fromTime", DATE_TIME_FORMATTER_AT_ZONE.apply(c.getFromTime(), zone),
+                                "toTime", DATE_TIME_FORMATTER_AT_ZONE.apply(c.getToTime(), zone)
                         )).encode()))
                 .onFailure(rc::fail));
     }
@@ -149,8 +162,8 @@ public class OpenAPIHandler {
                                 "unitId", u.getUnitId(),
                                 "name", u.getName(),
                                 "description", u.getDescription(),
-                                "fromTime", DATE_TIME_FORMATTER.apply(u.getFromTime(), zone),
-                                "toTime", DATE_TIME_FORMATTER.apply(u.getToTime(), zone)
+                                "fromTime", DATE_TIME_FORMATTER_AT_ZONE.apply(u.getFromTime(), zone),
+                                "toTime", DATE_TIME_FORMATTER_AT_ZONE.apply(u.getToTime(), zone)
                         ))).collect(toList())).encode()))
                 .onFailure(rc::fail));
     }
@@ -202,7 +215,7 @@ public class OpenAPIHandler {
                                                         "data", new JsonArray(
                                                                 paging.data().stream().map(o -> JsonObject.of(
                                                                         "unitId", o.getUnitId(),
-                                                                        "timestamp", DATE_TIME_FORMATTER.apply(o.getTimestamp(), zone),
+                                                                        "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(o.getTimestamp(), zone),
                                                                         "speed", o.getSpeed(),
                                                                         "location", JsonObject.of(
                                                                                 "longitude", o.getLocation().getLongitude(),
@@ -234,7 +247,7 @@ public class OpenAPIHandler {
                                                             ),
                                                             "properties", JsonObject.of(
                                                                     "unitId", o.getUnitId(),
-                                                                    "timestamp",  DATE_TIME_FORMATTER.apply(o.getTimestamp(), zone),
+                                                                    "timestamp",  DATE_TIME_FORMATTER_AT_ZONE.apply(o.getTimestamp(), zone),
                                                                     "speed", o.getSpeed(),
                                                                     "observedValues", o.getObservedValues()
                                                             )
@@ -291,7 +304,7 @@ public class OpenAPIHandler {
                                                 "hasNext", paging.hasNext(),
                                                 "data", new JsonArray(
                                                         paging.data().stream().map(o -> JsonObject.of(
-                                                                "timestamp", DATE_TIME_FORMATTER.apply(o.getTimestamp(), zone),
+                                                                "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(o.getTimestamp(), zone),
                                                                 "speed", o.getSpeed(),
                                                                 "location", JsonObject.of(
                                                                         "longitude", o.getLocation().getLongitude(),
@@ -320,7 +333,7 @@ public class OpenAPIHandler {
                                                             ),
                                                             "properties", JsonObject.of(
                                                                     "unitId", o.getUnitId(),
-                                                                    "timestamp", DATE_TIME_FORMATTER.apply(o.getTimestamp(), zone),
+                                                                    "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(o.getTimestamp(), zone),
                                                                     "speed", o.getSpeed(),
                                                                     "observedValues", o.getObservedValues()
                                                             )
@@ -363,7 +376,7 @@ public class OpenAPIHandler {
                                         "size", locations.size(),
                                         "data", new JsonArray(locations.stream().map(l -> JsonObject.of(
                                                 "unitId", l.getUnitId(),
-                                                "timestamp", DATE_TIME_FORMATTER.apply(l.getTimestamp(), zone),
+                                                "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(l.getTimestamp(), zone),
                                                 "location", JsonObject.of(
                                                         "longitude", l.getLocation().getLongitude(),
                                                         "latitude", l.getLocation().getLatitude(),
@@ -380,8 +393,8 @@ public class OpenAPIHandler {
                                                     "type", "Feature",
                                                     "properties", JsonObject.of(
                                                             "unitId", entry.getKey(),
-                                                            "fromTime", firstOf(entry.getValue()).map(t -> DATE_TIME_FORMATTER.apply(t.getTimestamp(), zone)).orElse(null),
-                                                            "toTime", lastOf(entry.getValue()).map(t -> DATE_TIME_FORMATTER.apply(t.getTimestamp(), zone)).orElse(null)),
+                                                            "fromTime", firstOf(entry.getValue()).map(t -> DATE_TIME_FORMATTER_AT_ZONE.apply(t.getTimestamp(), zone)).orElse(null),
+                                                            "toTime", lastOf(entry.getValue()).map(t -> DATE_TIME_FORMATTER_AT_ZONE.apply(t.getTimestamp(), zone)).orElse(null)),
                                                     "geometry", JsonObject.of(
                                                             "type", "MultiPoint",
                                                             "coordinates", new JsonArray(entry.getValue().stream().map(l -> JsonArray.of(
@@ -444,7 +457,7 @@ public class OpenAPIHandler {
                                                 "hasNext", paging.hasNext(),
                                                 "data", new JsonArray(
                                                         paging.data().stream().map(l -> JsonObject.of(
-                                                                "timestamp", DATE_TIME_FORMATTER.apply(l.getTimestamp(), zone),
+                                                                "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(l.getTimestamp(), zone),
                                                                 "location", JsonObject.of(
                                                                         "longitude", l.getLocation().getLongitude(),
                                                                         "latitude", l.getLocation().getLatitude(),
@@ -460,8 +473,8 @@ public class OpenAPIHandler {
                                         ),
                                         "properties", ofNonEmpty(paging.data()).map(locs -> JsonObject.of(
                                                 "unitId", firstOf(locs).map(UnitLocation::getUnitId).orElse(null),
-                                                "fromTime", firstOf(locs).map(t -> DATE_TIME_FORMATTER.apply(t.getTimestamp(), zone)).orElse(null),
-                                                "toTime", lastOf(locs).map(t -> DATE_TIME_FORMATTER.apply(t.getTimestamp(), zone)).orElse(null)
+                                                "fromTime", firstOf(locs).map(t -> DATE_TIME_FORMATTER_AT_ZONE.apply(t.getTimestamp(), zone)).orElse(null),
+                                                "toTime", lastOf(locs).map(t -> DATE_TIME_FORMATTER_AT_ZONE.apply(t.getTimestamp(), zone)).orElse(null)
                                         )).orElseGet(JsonObject::new),
                                         "geometry", JsonObject.of(
                                                 "type", "MultiPoint",
@@ -564,8 +577,8 @@ public class OpenAPIHandler {
                                 ) : JsonObject.of()).mergeIn(JsonObject.of(
                                         "id", c.getId(),
                                         "name", c.getName(),
-                                        "fromTime", DATE_TIME_FORMATTER.apply(c.getFromTime(), zone),
-                                        "toTime", DATE_TIME_FORMATTER.apply(c.getToTime(), zone)
+                                        "fromTime", DATE_TIME_FORMATTER_AT_ZONE.apply(c.getFromTime(), zone),
+                                        "toTime", DATE_TIME_FORMATTER_AT_ZONE.apply(c.getToTime(), zone)
                                 ))).collect(toList())).encode()))
                         .onFailure(rc::fail));
     }
@@ -731,8 +744,8 @@ public class OpenAPIHandler {
                                 "name", u.getName(),
                                 "imei", u.getImei(),
                                 "description", u.getDescription(),
-                                "fromTime", DATE_TIME_FORMATTER.apply(u.getFromTime(), zone),
-                                "toTime", DATE_TIME_FORMATTER.apply(u.getToTime(), zone)
+                                "fromTime", DATE_TIME_FORMATTER_AT_ZONE.apply(u.getFromTime(), zone),
+                                "toTime", DATE_TIME_FORMATTER_AT_ZONE.apply(u.getToTime(), zone)
                         )).encode()))
                         .onFailure(rc::fail));
     }
@@ -1149,8 +1162,8 @@ public class OpenAPIHandler {
                                 ) : JsonObject.of()).mergeIn(JsonObject.of(
                                         "id", e.getId(),
                                         "status", e.getStatus(),
-                                        "fromTime", DATE_TIME_FORMATTER.apply(e.getFromTime(), zone),
-                                        "toTime", DATE_TIME_FORMATTER.apply(e.getToTime(), zone)
+                                        "fromTime", DATE_TIME_FORMATTER_AT_ZONE.apply(e.getFromTime(), zone),
+                                        "toTime", DATE_TIME_FORMATTER_AT_ZONE.apply(e.getToTime(), zone)
                                 ))).collect(toList())).encode()))
                         .onFailure(rc::fail)
                 );
@@ -1182,8 +1195,8 @@ public class OpenAPIHandler {
                                 "actionId", e.getActionId(),
                                 "unitId", e.getUnitId(),
                                 "status", e.getStatus(),
-                                "fromTime", DATE_TIME_FORMATTER.apply(e.getFromTime(), zone),
-                                "toTime", DATE_TIME_FORMATTER.apply(e.getToTime(), zone)
+                                "fromTime", DATE_TIME_FORMATTER_AT_ZONE.apply(e.getFromTime(), zone),
+                                "toTime", DATE_TIME_FORMATTER_AT_ZONE.apply(e.getToTime(), zone)
                         )).encode()))
                         .onFailure(rc::fail)
                 );
@@ -1235,7 +1248,7 @@ public class OpenAPIHandler {
                                                 "hasNext", paging.hasNext(),
                                                 "data", new JsonArray(
                                                         paging.data().stream().map(o -> JsonObject.of(
-                                                                "timestamp", DATE_TIME_FORMATTER.apply(o.getTimestamp(), zone),
+                                                                "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(o.getTimestamp(), zone),
                                                                 "speed", o.getSpeed(),
                                                                 "location", JsonObject.of(
                                                                         "longitude", o.getLocation().getLongitude(),
@@ -1264,7 +1277,7 @@ public class OpenAPIHandler {
                                                         ),
                                                         "properties", JsonObject.of(
                                                                 "unitId", o.getUnitId(),
-                                                                "timestamp", DATE_TIME_FORMATTER.apply(o.getTimestamp(), zone),
+                                                                "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(o.getTimestamp(), zone),
                                                                 "speed", o.getSpeed(),
                                                                 "observedValues", o.getObservedValues()
                                                         )
@@ -1320,7 +1333,7 @@ public class OpenAPIHandler {
                                                 "hasNext", paging.hasNext(),
                                                 "data", new JsonArray(
                                                         paging.data().stream().map(l -> JsonObject.of(
-                                                                "timestamp", DATE_TIME_FORMATTER.apply(l.getTimestamp(), zone),
+                                                                "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(l.getTimestamp(), zone),
                                                                 "location", JsonObject.of(
                                                                         "longitude", l.getLocation().getLongitude(),
                                                                         "latitude", l.getLocation().getLatitude(),
@@ -1336,8 +1349,8 @@ public class OpenAPIHandler {
                                         ),
                                         "properties", ofNonEmpty(paging.data()).map(locs -> JsonObject.of(
                                                 "unitId", firstOf(locs).map(UnitLocation::getUnitId).orElse(null),
-                                                "fromTime", firstOf(locs).map(t -> DATE_TIME_FORMATTER.apply(t.getTimestamp(), zone)).orElse(null),
-                                                "toTime", lastOf(locs).map(t -> DATE_TIME_FORMATTER.apply(t.getTimestamp(), zone)).orElse(null)
+                                                "fromTime", firstOf(locs).map(t -> DATE_TIME_FORMATTER_AT_ZONE.apply(t.getTimestamp(), zone)).orElse(null),
+                                                "toTime", lastOf(locs).map(t -> DATE_TIME_FORMATTER_AT_ZONE.apply(t.getTimestamp(), zone)).orElse(null)
                                         )).orElseGet(JsonObject::new),
                                         "geometry", JsonObject.of(
                                                 "type", "MultiPoint",
@@ -1377,7 +1390,7 @@ public class OpenAPIHandler {
     public void campaignIdUnitsObservationsPOST(RoutingContext rc) {
         AuthBearerUser user = AuthBearerUser.of(rc.user());
         WSParameters params = WSParameters.wrap(rc.queryParams(), rc.pathParams());
-        // TODO
+
         long campaignId = params.pathParams().campaignId();
 
         final Consumer<List<UnitTelemetry>> validateAndSave = originalTlms -> repo.findCampaignById(campaignId)
@@ -1385,11 +1398,18 @@ public class OpenAPIHandler {
                         .then(tlmsWithinCamp -> repo.findSensorsByCampaignIdGroupByUnitId(campaign.getId())
                                 .onSuccess(campUnitsID -> fluentlyOf(telemetriesAccToSensors(campUnitsID, tlmsWithinCamp))
                                         .then(tlmsToSave -> repo.saveAllTelemetry(tlmsToSave)
-                                                .onSuccess(inserted -> rc.response()
-                                                        .end(JsonObject.of(
-                                                                "saved", inserted,
-                                                                "errors", originalTlms.size() - inserted
-                                                        ).encode()))
+                                                .onSuccess(savedTelemetries -> {
+                                                    JsonArray sensLogObsArr = new JsonArray(UnitTelemetry.Converter
+                                                            .toSensLogObservationAsStream(savedTelemetries)
+                                                            .map(SensLogObservation::toJsonObject).toList()
+                                                    );
+                                                    vertx.eventBus().request(EventBusModulePaths.SENSLOG_OBSERVATIONS, sensLogObsArr)
+                                                            .onSuccess(v -> logger.info(v.body())).onFailure(logger::error);
+                                                    rc.response().end(JsonObject.of(
+                                                            "saved", savedTelemetries.size(),
+                                                            "errors", originalTlms.size() - savedTelemetries.size()
+                                                    ).encode());
+                                                })
                                                 .onFailure(rc::fail)
                                         ))
                                 .onFailure(rc::fail)
@@ -1429,26 +1449,6 @@ public class OpenAPIHandler {
         }
     }
 
-    public void alertsGET(RoutingContext rc) {
-        String host =  hostURLFull(rc.request());
-        AuthBearerUser user = AuthBearerUser.of(rc.user());
-
-        JsonObject paramsJson = new JsonObject();
-        WSParameters params = WSParameters.wrap(rc.queryParams(), rc.pathParams(), paramsJson);
-
-        OffsetDateTime from = params.queryParams().from();
-        OffsetDateTime to = params.queryParams().to();
-        ZoneId zone = params.queryParams().zone();
-
-        int offset = params.queryParams().offset();
-        int limit = params.queryParams().limit();
-
-        SortType sort = params.queryParams().sort();
-        boolean navigationLinks = params.queryParams().navigationLinks();
-
-        // TODO
-    }
-
     public void alertIdGET(RoutingContext rc) {
         String host =  hostURLFull(rc.request());
         AuthBearerUser user = AuthBearerUser.of(rc.user());
@@ -1471,7 +1471,18 @@ public class OpenAPIHandler {
                                 "unitId", a.getUnitId(),
                                 "message", a.getMessage(),
                                 "status", a.getStatus(),
-                                "timestamp", DATE_TIME_FORMATTER.apply(a.getTimestamp(), zone)
+                                "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(a.getTimestamp(), zone),
+                                "observation", a.getObservation() == null ? null : JsonObject.of(
+                                        "unitId", a.getObservation().getUnitId(),
+                                        "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(a.getObservation().getTimestamp(), zone),
+                                "speed", a.getObservation().getSpeed(),
+                                "location", JsonObject.of(
+                                        "longitude", a.getObservation().getLocation().getLongitude(),
+                                        "latitude", a.getObservation().getLocation().getLatitude(),
+                                        "altitude", a.getObservation().getLocation().getAltitude()
+                                        ),
+                                "observedValues", a.getObservation().getObservedValues()
+                                )
                         )).encode()))
                         .onFailure(rc::fail)
                 );
@@ -1499,7 +1510,7 @@ public class OpenAPIHandler {
                                         "id", a.getId(),
                                         "message", a.getMessage(),
                                         "status", a.getStatus(),
-                                        "timestamp", DATE_TIME_FORMATTER.apply(a.getTimestamp(), zone)
+                                        "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(a.getTimestamp(), zone)
                                 ))).collect(toList())).encode()))
                         .onFailure(rc::fail)
                 );
@@ -1526,7 +1537,7 @@ public class OpenAPIHandler {
                                 "id", a.getId(),
                                 "message", a.getMessage(),
                                 "status", a.getStatus(),
-                                "timestamp", DATE_TIME_FORMATTER.apply(a.getTimestamp(), zone)
+                                "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(a.getTimestamp(), zone)
                         )).encode()))
                         .onFailure(rc::fail)
                 );
@@ -1557,7 +1568,7 @@ public class OpenAPIHandler {
                                         "unitId", a.getUnitId(),
                                         "message", a.getMessage(),
                                         "status", a.getStatus(),
-                                        "timestamp", DATE_TIME_FORMATTER.apply(a.getTimestamp(), zone)
+                                        "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(a.getTimestamp(), zone)
                                 ))).collect(toList())).encode()))
                         .onFailure(rc::fail)
                 );
@@ -1576,24 +1587,43 @@ public class OpenAPIHandler {
                 AlertStatus.CREATED
         );
 
-        Object reqBody = Json.decodeValue(rc.body().buffer());
+        final Object reqBody = Json.decodeValue(rc.body().buffer());
+
+        final Function<UnitAlert, JsonObject> alertToJson = a -> JsonObject.of(
+                "id", a.getId(),
+                "message", a.getMessage(),
+                "status", a.getStatus(),
+                "timestamp", DATE_TIME_FORMATTER.apply(a.getTimestamp())
+        );
+
         if (JsonUtil.isArray(reqBody)) {
             List<UnitAlert> alerts = ((JsonArray)reqBody).stream().map(JsonObject.class::cast).map(jsonToAlert).toList();
-            // TODO send notifications
             repo.saveAlerts(alerts)
-                    .onSuccess(ids -> rc.response().end(JsonObject.of(
-                        "ids", new JsonArray(ids),
-                        "message", String.format("Saved: %s/%s", ids.size(), alerts.size())
-                    ).encode()))
+                    .onSuccess(savedAlerts -> {
+                        JsonArray alertsJson = new JsonArray(savedAlerts.stream().map(alertToJson).toList());
+                        vertx.eventBus().request(SENSLOG_ALERTS, alertsJson, new DeliveryOptions()
+                                        .addHeader("campaignId", String.valueOf(campaignId))
+                                        .addHeader("userId", user != null ? user.getId() : null)
+                                , reply -> {
+                                    if (reply.succeeded()) {
+                                        logger.info(reply.result().body());
+                                    } else {
+                                        logger.error(reply.result().body());
+                                    }
+                                });
+                        rc.response().end(alertsJson.encode());
+                    })
                     .onFailure(rc::fail);
 
         } else if (JsonUtil.isObject(reqBody)) {
             UnitAlert alert = jsonToAlert.apply((JsonObject) reqBody);
-            // TODO send notifications
             repo.saveAlert(alert)
-                    .onSuccess(id -> rc.response().end(JsonObject.of(
-                            "id", 15
-                    ).encode()))
+                    .onSuccess(savedAlert -> {
+                        JsonObject alertJson = alertToJson.apply(savedAlert);
+                        vertx.eventBus().request(SENSLOG_ALERTS, JsonArray.of(alertJson))
+                                .onSuccess(msg -> logger.info(msg.body())).onFailure(logger::error);
+                        rc.response().end(alertJson.encode());
+                    })
                     .onFailure(rc::fail);
 
         } else {
@@ -1626,7 +1656,7 @@ public class OpenAPIHandler {
                                         "id", a.getId(),
                                         "message", a.getMessage(),
                                         "status", a.getStatus(),
-                                        "timestamp", DATE_TIME_FORMATTER.apply(a.getTimestamp(), zone)
+                                        "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(a.getTimestamp(), zone)
                                 ))).collect(toList())).encode()))
                         .onFailure(rc::fail)
                 );
@@ -1656,7 +1686,7 @@ public class OpenAPIHandler {
                                     "id", a.getId(),
                                     "message", a.getMessage(),
                                     "status", a.getStatus(),
-                                    "timestamp", DATE_TIME_FORMATTER.apply(a.getTimestamp(), zone)
+                                    "timestamp", DATE_TIME_FORMATTER_AT_ZONE.apply(a.getTimestamp(), zone)
                             ))).collect(toList())).encode()))
                         .onFailure(rc::fail)
                 );

+ 13 - 3
src/main/resources/openAPISpec.yaml

@@ -3,7 +3,7 @@ info:
   version: 1.0.0
   title: SensLog Telemetry
 servers:
-  - url: http://127.0.0.1:8080
+  - url: http://127.0.0.1:8085
   - url: https://theros.wirelessinfo.cz
 paths:
   /info:
@@ -1376,8 +1376,10 @@ paths:
             application/json:
               schema:
                 oneOf:
-                  - $ref: '#/components/schemas/ResponseArrayChange'
-                  - $ref: '#/components/schemas/ResponseSingleChange'
+                  - type: array
+                    items:
+                      $ref: '#/components/schemas/CampaignAlertBasicInfo'
+                  - $ref: '#/components/schemas/CampaignAlertBasicInfo'
         default:
           description: unexpected error
           content:
@@ -3629,6 +3631,8 @@ components:
           type: string
         status:
           $ref: '#/components/schemas/AlertStatus'
+        observation:
+          $ref: '#/components/schemas/UnitDataObservation'
       example:
         self@NavigationLink: "<domain>/alerts/34"
         CampaignUnit@NavigationLink: "<domain>/campaign/1/units/25"
@@ -3638,6 +3642,7 @@ components:
         timestamp: "2011-12-03T10:15:30+01:00"
         message: "Example of an alert message"
         status: "CREATED"
+        observation: {}
 
     AlertInsert:
       type: object
@@ -3725,6 +3730,7 @@ components:
         - build
         - uptime
         - uptimeMillis
+        - authType
       properties:
         name:
           type: string
@@ -3737,12 +3743,16 @@ components:
           format: int64
         uptime:
           type: string
+        authType:
+          type: string
+          enum: [BEARER, NONE]
       example:
         name: "senslog-telemetry"
         version: "1.1.0"
         build: "123456789"
         uptimeMillis: 1684862333
         uptime: "1:20:00"
+        authType: "NONE"
 
     Error:
       type: object

+ 10 - 10
src/test/java/cz/senslog/telemetry/MockSensLogRepository.java

@@ -37,13 +37,13 @@ public class MockSensLogRepository implements SensLogRepository {
     }
 
     @Override
-    public Future<Integer> saveTelemetry(UnitTelemetry data) {
-        return Future.succeededFuture(1);
+    public Future<UnitTelemetry> saveTelemetry(UnitTelemetry data) {
+        return Future.succeededFuture(UnitTelemetry.of(1L, 1L, OffsetDateTime.now(), Location.of(10f, 10f, 10f, 10f), 0, JsonObject.of()));
     }
 
     @Override
-    public Future<Integer> saveAllTelemetry(List<UnitTelemetry> data) {
-        return Future.succeededFuture(1);
+    public Future<List<UnitTelemetry>> saveAllTelemetry(List<UnitTelemetry> data) {
+        return Future.succeededFuture(List.of(UnitTelemetry.of(1L, 1L, OffsetDateTime.now(), Location.of(10f, 10f, 10f, 10f), 0, JsonObject.of())));
     }
 
     @Override
@@ -631,7 +631,7 @@ public class MockSensLogRepository implements SensLogRepository {
     @Override
     public Future<CampaignUnitAlert> findAlertById(long alertId) {
         OffsetDateTime baseTimestamp = OffsetDateTime.ofInstant(BASE_INSTANT_TIMESTAMP, ZoneOffset.UTC);
-        return Future.succeededFuture(CampaignUnitAlert.of(1, 1, 1000, "mock(message)", baseTimestamp, AlertStatus.CREATED));
+        return Future.succeededFuture(CampaignUnitAlert.of(1, 1, 1000, "mock(message)", baseTimestamp, AlertStatus.CREATED, null));
     }
 
     @Override
@@ -645,13 +645,13 @@ public class MockSensLogRepository implements SensLogRepository {
     }
 
     @Override
-    public Future<Long> saveAlert(UnitAlert alert) {
-        return Future.succeededFuture(1L);
+    public Future<UnitAlert> saveAlert(UnitAlert alert) {
+        return Future.succeededFuture(UnitAlert.of(1L, 1L, "", OffsetDateTime.now(), AlertStatus.CREATED));
     }
 
     @Override
-    public Future<List<Long>> saveAlerts(List<UnitAlert> alerts) {
-        return Future.succeededFuture(List.of(1L, 2L, 3L));
+    public Future<List<UnitAlert>> saveAlerts(List<UnitAlert> alerts) {
+        return Future.succeededFuture(List.of(UnitAlert.of(1L, 1L, "", OffsetDateTime.now(), AlertStatus.CREATED)));
     }
 
     @Override
@@ -679,7 +679,7 @@ public class MockSensLogRepository implements SensLogRepository {
     @Override
     public Future<List<CampaignUnitAlert>> findAlertsByCampaignId(long campaignId, OffsetDateTime from, OffsetDateTime to, Set<AlertStatus> statusFilter, SortType sort) {
         OffsetDateTime baseTimestamp = OffsetDateTime.ofInstant(BASE_INSTANT_TIMESTAMP, ZoneOffset.UTC);
-        return Future.succeededFuture(List.of(CampaignUnitAlert.of(1, 1, 1000, "mock(message)", baseTimestamp, AlertStatus.CREATED)));
+        return Future.succeededFuture(List.of(CampaignUnitAlert.of(1, 1, 1000, "mock(message)", baseTimestamp, AlertStatus.CREATED, null)));
     }
 
     @Override

+ 53 - 0
src/test/java/cz/senslog/telemetry/database/repository/MapLogRepositoryTest.java

@@ -3821,4 +3821,57 @@ class MapLogRepositoryTest {
             testContext.completeNow();
         })));
     }
+
+    @Test
+    @DisplayName("DB Test: saveTelemetry#1")
+    void saveTelemetry_NM1(Vertx vertx, VertxTestContext testContext) {
+
+        UnitTelemetry telemetry = UnitTelemetry.of(
+                1000L,
+                OffsetDateTime.of(2024, 5, 1, 0, 0, 0, 0, UTC),
+                Location.of(13.9f, 49.33f, 488f, 0f),
+                0,
+                JsonObject.of("1001", 11.11)
+        );
+
+        repo.saveTelemetry(telemetry).onComplete(testContext.succeeding(data -> testContext.verify(() -> {
+            assertThat(data.getId()).isGreaterThan(0);
+            assertThat(data.getUnitId()).isEqualTo(telemetry.getUnitId());
+            assertThat(data.getTimestamp()).isEqualTo(telemetry.getTimestamp());
+            assertThat(data.getLocation()).isEqualTo(telemetry.getLocation());
+            assertThat(data.getSpeed()).isEqualTo(telemetry.getSpeed());
+            assertThat(data.getObservedValues()).isEqualTo(telemetry.getObservedValues());
+
+            testContext.completeNow();
+        })));
+    }
+
+    @Test
+    @DisplayName("DB Test: saveAllTelemetry#1")
+    void saveAllTelemetry_NM1(Vertx vertx, VertxTestContext testContext) {
+
+        UnitTelemetry telemetry = UnitTelemetry.of(
+                1000L,
+                OffsetDateTime.of(2024, 5, 1, 1, 0, 0, 0, UTC),
+                Location.of(13.9f, 49.33f, 488f, 0f),
+                0,
+                JsonObject.of("1001", 11.11)
+        );
+
+        List<UnitTelemetry> telemetries = List.of(telemetry);
+
+        repo.saveAllTelemetry(telemetries).onComplete(testContext.succeeding(data -> testContext.verify(() -> {
+            assertThat(data.size()).isEqualTo(1);
+
+            UnitTelemetry t = data.get(0);
+            assertThat(t.getId()).isGreaterThan(0);
+            assertThat(t.getUnitId()).isEqualTo(telemetry.getUnitId());
+            assertThat(t.getTimestamp()).isEqualTo(telemetry.getTimestamp());
+            assertThat(t.getLocation()).isEqualTo(telemetry.getLocation());
+            assertThat(t.getSpeed()).isEqualTo(telemetry.getSpeed());
+            assertThat(t.getObservedValues()).isEqualTo(telemetry.getObservedValues());
+
+            testContext.completeNow();
+        })));
+    }
 }

+ 19 - 6
src/test/java/cz/senslog/telemetry/server/Fm4exSocketHandlerTest.java

@@ -2,18 +2,24 @@ package cz.senslog.telemetry.server;
 
 import cz.senslog.telemetry.BinaryDataSet;
 import cz.senslog.telemetry.DataSet;
+import cz.senslog.telemetry.database.domain.Location;
 import cz.senslog.telemetry.database.domain.UnitTelemetry;
 import cz.senslog.telemetry.database.domain.Sensor;
 import cz.senslog.telemetry.database.domain.Unit;
 import cz.senslog.telemetry.database.repository.MapLogRepository;
 import cz.senslog.telemetry.protocol.domain.AVLTelemetryObservation;
 import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.time.OffsetDateTime;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
+import static java.time.ZoneOffset.UTC;
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.*;
 
@@ -47,10 +53,17 @@ class Fm4exSocketHandlerTest {
                 )));
 
         when(repo.saveAllTelemetry(obsTelemetryCapture.capture()))
-                .thenReturn(Future.succeededFuture(datasetAVLData8Ex.getObject().length));
+                .thenReturn(Future.succeededFuture(Stream.of(datasetAVLData8Ex.getObject()).map(o -> UnitTelemetry.of(
+                        1L,
+                        1L,
+                        OffsetDateTime.ofInstant(o.getTimestamp(), UTC),
+                        Location.of(o.getLongitude(), o.getLatitude(), o.getAltitude(), o.getAngle()),
+                        o.getSpeed(),
+                        JsonObject.of()
+                )).toList()));
 
 
-        Fm4exSocketHandler handler = Fm4exSocketHandler.create(repo);
+        Fm4exSocketHandler handler = Fm4exSocketHandler.create(mock(Vertx.class), repo);
 
         handler.process(SOCKET_ID, datasetIMEI.getBuffer())
                 .onSuccess(ctx -> assertEquals(SUCCESS, ctx.getByte(0)))
@@ -75,7 +88,7 @@ class Fm4exSocketHandlerTest {
         when(repo.findUnitByIMEI(datasetIMEI.getObject()))
                 .thenReturn(Future.succeededFuture(null));
 
-        Fm4exSocketHandler handler = Fm4exSocketHandler.create(repo);
+        Fm4exSocketHandler handler = Fm4exSocketHandler.create(mock(Vertx.class), repo);
 
         handler.process(SOCKET_ID, datasetIMEI.getBuffer()).onComplete(ctx ->
                 assertEquals(ERROR, ctx.result().getByte(0))
@@ -93,7 +106,7 @@ class Fm4exSocketHandlerTest {
         when(repo.findUnitByIMEI(anyString()))
                 .thenReturn(Future.failedFuture(new Exception("Random database exception")));
 
-        Fm4exSocketHandler handler = Fm4exSocketHandler.create(repo);
+        Fm4exSocketHandler handler = Fm4exSocketHandler.create(mock(Vertx.class), repo);
 
         handler.process(SOCKET_ID, datasetIMEI.getBuffer()).onComplete(ctx ->
                 assertTrue(ctx.failed())
@@ -110,7 +123,7 @@ class Fm4exSocketHandlerTest {
         when(repo.findUnitByIMEI(anyString()))
                 .thenReturn(Future.succeededFuture(Unit.of(TEST_UNIT_ID, datasetIMEI.getObject())));
 
-        Fm4exSocketHandler handler = Fm4exSocketHandler.create(repo);
+        Fm4exSocketHandler handler = Fm4exSocketHandler.create(mock(Vertx.class), repo);
 
         handler.process(SOCKET_ID, datasetIMEI.getBuffer()).onComplete(ctx ->
                 assertEquals(SUCCESS, ctx.result().getByte(0))
@@ -131,7 +144,7 @@ class Fm4exSocketHandlerTest {
 
         when(repo.saveAllTelemetry(anyList())).thenReturn(Future.failedFuture(new Exception("Random database exception")));
 
-        Fm4exSocketHandler handler = Fm4exSocketHandler.create(repo);
+        Fm4exSocketHandler handler = Fm4exSocketHandler.create(mock(Vertx.class), repo);
 
         handler.process(SOCKET_ID, datasetIMEI.getBuffer()).onComplete(ctx ->
                 assertEquals(SUCCESS, ctx.result().getByte(0))

+ 7 - 3
src/test/java/cz/senslog/telemetry/server/ws/OpenAPIHandlerTest.java

@@ -23,6 +23,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.NullSource;
 import org.mockito.Mockito;
 import org.openapi4j.core.exception.EncodeException;
 import org.openapi4j.core.exception.ResolutionException;
@@ -2346,8 +2347,8 @@ class OpenAPIHandlerTest {
         return Stream.of(
                 Arguments.of(
                         Future.succeededFuture(List.of(
-                                CampaignUnitAlert.of(1, 1, 10, "mock(message)", baseTimestamp, AlertStatus.CREATED),
-                                CampaignUnitAlert.of(1, 1, 10, "mock(message)", baseTimestamp.plusHours(1), AlertStatus.CREATED)
+                                CampaignUnitAlert.of(1, 1, 10, "mock(message)", baseTimestamp, AlertStatus.CREATED, null),
+                                CampaignUnitAlert.of(1, 1, 10, "mock(message)", baseTimestamp.plusHours(1), AlertStatus.CREATED, null)
                         )),
                         SUCCESS, JSON
                 ),
@@ -2385,10 +2386,13 @@ class OpenAPIHandlerTest {
 
     private static Stream<Arguments> alertIdGET_dataSource() {
         OffsetDateTime baseTimestamp = OffsetDateTime.ofInstant(BASE_INSTANT_TIMESTAMP, ZoneOffset.UTC);
+        Location location = Location.of(10.5f, 10.f, 450f, 1);
         return Stream.of(
                 Arguments.of(
                         Future.succeededFuture(
-                                CampaignUnitAlert.of(1, 1, 10, "mock(message)", baseTimestamp, AlertStatus.CREATED)
+                                CampaignUnitAlert.of(1, 1, 10, "mock(message)", baseTimestamp, AlertStatus.CREATED,
+                                        UnitTelemetry.of(1, 10, baseTimestamp, location, 0, JsonObject.of())
+                                )
                         ), SUCCESS, JSON
                 ),
                 Arguments.of(

+ 1 - 1
src/test/resources/tests.junit.env

@@ -4,7 +4,7 @@ SERVER_TCP_PORT=9999
 
 # Database properties
 DATABASE_HOST=localhost
-DATABASE_PORT=5432
+DATABASE_PORT=5433
 DATABASE_NAME=junit
 DATABASE_USER=junit_test
 DATABASE_PASSWORD=JUnit