Procházet zdrojové kódy

Added Campaign -> Units -> Sensors validation for new telemetries

Lukas Cerny před 1 rokem
rodič
revize
2637ce5da5

+ 4 - 0
src/main/java/cz/senslog/telemetry/database/domain/Sensor.java

@@ -17,6 +17,10 @@ public class Sensor {
         return of(sensorId, name, type, -1, null, null);
     }
 
+    public static Sensor of(long sensorId, String name) {
+        return of(sensorId, name, null, -1, null, null);
+    }
+
     public static Sensor of(long sensorId, String name, int ioId) {
         return of(sensorId, name, null, ioId, null, null);
     }

+ 25 - 4
src/main/java/cz/senslog/telemetry/database/repository/MapLogRepository.java

@@ -16,16 +16,14 @@ import org.apache.logging.log4j.Logger;
 
 import java.time.OffsetDateTime;
 import java.time.ZoneId;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static java.lang.String.format;
 import static java.util.Optional.of;
-import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.*;
 
 public class MapLogRepository implements SensLogRepository {
 
@@ -516,6 +514,16 @@ public class MapLogRepository implements SensLogRepository {
                 );
     }
 
+    @Override
+    public Future<Set<Long>> findUnitsIDByCampaignId(long campaignId) {
+        return client.preparedQuery("SELECT distinct unit_id FROM maplog.unit_to_campaign WHERE campaign_id = $1 ORDER BY unit_id")
+                .execute(Tuple.of(campaignId))
+                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
+                        .map((row) -> row.getLong("unit_id"))
+                        .collect(Collectors.toSet())
+                );
+    }
+
     private static final Function<Row, CampaignUnit> ROW_TO_CAMPAIGN_UNIT = (row) -> CampaignUnit.of(
             row.getLong("unit_id"),
             row.getLong("campaign_id"),
@@ -589,6 +597,19 @@ public class MapLogRepository implements SensLogRepository {
     }
 
     @Override
+    public Future<Map<Long, List<Sensor>>> findSensorsByCampaignIdGroupByUnitId(long campaignId) {
+        return client.preparedQuery("SELECT uts.unit_id, s.sensor_id, s.name AS sensor_name FROM maplog.unit_to_campaign AS utc " +
+                "JOIN maplog.unit_to_sensor AS uts ON uts.unit_id = utc.unit_id " +
+                "JOIN maplog.sensor AS s ON s.sensor_id = uts.sensor_id " +
+                "WHERE utc.campaign_id = $1")
+                .execute(Tuple.of(campaignId))
+                .map(rs -> StreamSupport.stream(rs.spliterator(), false)
+                        .collect(groupingBy(r -> r.getLong("unit_id"),
+                                mapping(r -> Sensor.of(r.getLong("sensor_id"), r.getString("sensor_name")), toList())))
+                );
+    }
+
+    @Override
     public Future<List<Campaign>> allCampaigns() {
         return client.query("SELECT id, description, from_time, to_time FROM maplog.campaign ORDER BY id")
                 .execute()

+ 11 - 0
src/main/java/cz/senslog/telemetry/database/repository/MockMapLogRepository.java

@@ -13,6 +13,7 @@ import java.time.ZoneId;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class MockMapLogRepository implements SensLogRepository {
 
@@ -152,6 +153,11 @@ public class MockMapLogRepository implements SensLogRepository {
     }
 
     @Override
+    public Future<Map<Long, List<Sensor>>> findSensorsByCampaignIdGroupByUnitId(long campaignId) {
+        return Future.succeededFuture(Collections.emptyMap());
+    }
+
+    @Override
     public Future<List<Sensor>> findSensorsByUnitId(long unitId) {
         return Future.succeededFuture(Collections.emptyList());
     }
@@ -242,6 +248,11 @@ public class MockMapLogRepository implements SensLogRepository {
     }
 
     @Override
+    public Future<Set<Long>> findUnitsIDByCampaignId(long campaignId) {
+        return Future.succeededFuture(Collections.emptySet());
+    }
+
+    @Override
     public Future<CampaignUnit> findUnitByIdAndCampaignId(long unitId, long campaignId) {
         return Future.succeededFuture(CampaignUnit.of(unitId, campaignId, "mock(name)", "mock(description)", OffsetDateTime.now(), OffsetDateTime.now().plusDays(1)));
     }

+ 3 - 0
src/main/java/cz/senslog/telemetry/database/repository/SensLogRepository.java

@@ -10,6 +10,7 @@ import java.time.OffsetDateTime;
 import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public interface SensLogRepository {
 
@@ -26,6 +27,7 @@ public interface SensLogRepository {
     Future<Unit> findUnitByIMEI(String imei);
     Future<List<Unit>> findUnitsBySensorId(long sensorId);
     Future<List<CampaignUnit>> findUnitsByCampaignId(long campaignId);
+    Future<Set<Long>> findUnitsIDByCampaignId(long campaignId);
     Future<CampaignUnit> findUnitByIdAndCampaignId(long unitId, long campaignId);
     Future<Unit> findUnitByIdAndDriverId(long unitId, int driverId);
     Future<Unit> findUnitByIdAndDriverIdAndActionId(long unitId, int driverId, int actionId);
@@ -36,6 +38,7 @@ public interface SensLogRepository {
     Future<Sensor> findSensorById(long sensorId);
     Future<Sensor> findSensorByIOAndUnitId(int ioID, long unitId);
     Future<Map<Long, Sensor>> findSensorsByUnitIdGroupById(long unitId);
+    Future<Map<Long, List<Sensor>>> findSensorsByCampaignIdGroupByUnitId(long campaignId);
     Future<List<Sensor>> findSensorsByUnitId(long unitId);
     Future<List<Sensor>> findSensorsByPhenomenonId(long phenomenonId);
     Future<List<Sensor>> findSensorsByCampaignIdAndUnitId(long campaignId, long unitId);

+ 51 - 3
src/main/java/cz/senslog/telemetry/database/validation/UnitTelemetryValidation.java

@@ -1,14 +1,17 @@
 package cz.senslog.telemetry.database.validation;
 
 import cz.senslog.telemetry.database.domain.Campaign;
+import cz.senslog.telemetry.database.domain.Sensor;
 import cz.senslog.telemetry.database.domain.UnitTelemetry;
 import cz.senslog.telemetry.utils.ComparisonOperators;
+import cz.senslog.telemetry.utils.Tuple;
+import io.vertx.core.json.JsonObject;
 
 import java.time.OffsetDateTime;
-import java.util.List;
+import java.util.*;
 
 import static cz.senslog.telemetry.utils.ComparisonOperators.ofBoundary;
-import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.*;
 
 public class UnitTelemetryValidation {
 
@@ -27,4 +30,49 @@ public class UnitTelemetryValidation {
 
         return telemetries.stream().filter(v -> campBound.isWithinEqual(v.getTimestamp())).collect(toList());
     }
-}
+
+    public static List<UnitTelemetry> telemetriesAccToSensors(Map<Long, List<Sensor>> sensorsByUnitId, List<UnitTelemetry> telemetries) {
+        List<UnitTelemetry> newTelemetries = new ArrayList<>(telemetries.size());
+        Map<Long, Tuple<Set<String>, Map<String, String>>> sensorSetByUnitId = new HashMap<>(sensorsByUnitId.size());
+        for (Map.Entry<Long, List<Sensor>> sensorsEntry : sensorsByUnitId.entrySet()) {
+            int sensorsCount = sensorsEntry.getValue().size();
+            Set<String> sensorsId = new HashSet<>(sensorsCount);
+            Map<String, String> sensorsName = new HashMap<>(sensorsCount);
+            for (Sensor sensor : sensorsEntry.getValue()) {
+                String sensorId = Long.toString(sensor.getSensorId());
+                sensorsId.add(sensorId);
+                sensorsName.put(sensor.getName(), sensorId);
+            }
+            sensorSetByUnitId.put(sensorsEntry.getKey(), Tuple.of(sensorsId, sensorsName));
+        }
+
+        for (UnitTelemetry tel : telemetries) {
+            Tuple<Set<String>, Map<String, String>> unitSensors = sensorSetByUnitId.get(tel.getUnitId());
+            if (unitSensors == null) continue;
+
+            Set<String> sensorIDs = unitSensors.item1();
+            Map<String, String> sensorNames = unitSensors.item2();
+
+            JsonObject newObservedValues = JsonObject.of();
+            for (Map.Entry<String, Object> observedValue : tel.getObservedValues()) {
+                String key = observedValue.getKey();
+                if (sensorIDs.contains(key)) {
+                    newObservedValues.put(key, observedValue.getValue());
+                } else if (sensorNames.containsKey(key)) {
+                    newObservedValues.put(sensorNames.get(key), observedValue.getValue());
+                }
+            }
+
+            if (!newObservedValues.isEmpty()) {
+                newTelemetries.add(UnitTelemetry.of(
+                        tel.getUnitId(),
+                        tel.getTimestamp(),
+                        tel.getLocation(),
+                        tel.getSpeed(),
+                        newObservedValues
+                ));
+            }
+        }
+        return newTelemetries;
+    }
+}

+ 34 - 39
src/main/java/cz/senslog/telemetry/server/ws/OpenAPIHandler.java

@@ -7,8 +7,6 @@ import cz.senslog.telemetry.database.domain.UnitLocation;
 import cz.senslog.telemetry.database.domain.UnitTelemetry;
 import cz.senslog.telemetry.database.repository.SensLogRepository;
 import cz.senslog.telemetry.database.domain.Filter;
-import cz.senslog.telemetry.database.validation.UnitTelemetryValidation;
-import cz.senslog.telemetry.utils.FluentInvoke;
 import cz.senslog.telemetry.utils.TernaryCondition;
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.json.JsonArray;
@@ -22,10 +20,9 @@ import java.util.*;
 import java.util.function.*;
 import java.util.stream.Collectors;
 
-import static cz.senslog.telemetry.database.validation.UnitTelemetryValidation.telemetriesWithinCampaign;
+import static cz.senslog.telemetry.database.validation.UnitTelemetryValidation.*;
 import static cz.senslog.telemetry.server.ws.ContentType.GEOJSON;
 import static cz.senslog.telemetry.server.ws.ContentType.JSON;
-import static cz.senslog.telemetry.utils.ComparisonOperators.*;
 import static cz.senslog.telemetry.utils.FluentInvoke.fluentlyOf;
 import static cz.senslog.telemetry.utils.TernaryCondition.ternaryIf;
 import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
@@ -1523,54 +1520,52 @@ public class OpenAPIHandler {
     public void campaignIdUnitsObservationsPOST(RoutingContext rc) {
         long campaignId = Long.parseLong(rc.pathParam("campaignId"));
 
+        final Consumer<List<UnitTelemetry>> validateAndSave = originalTlms -> repo.findCampaignById(campaignId)
+                .onSuccess(campaign -> fluentlyOf(telemetriesWithinCampaign(campaign, originalTlms))
+                        .then(tlmsWithinCamp -> repo.findSensorsByCampaignIdGroupByUnitId(campaign.getId())
+                                .onSuccess(campUnitsID -> fluentlyOf(telemetriesAccToSensors(campUnitsID, tlmsWithinCamp))
+                                        .then(tlmsToSave -> repo.saveAllTelemetry(tlmsToSave)
+                                                .onSuccess(inserted -> rc.response()
+                                                        .end(JsonObject.of(
+                                                                "saved", inserted,
+                                                                "errors", originalTlms.size() - inserted
+                                                        ).encode()))
+                                                .onFailure(rc::fail)
+                                        ))
+                                .onFailure(rc::fail)
+                        ))
+                .onFailure(rc::fail);
+
         switch (ContentType.ofType(rc.request().getHeader(CONTENT_TYPE))) {
             case JSON -> Optional.of(rc.body().asJsonArray())
                     .map(jsonArray -> jsonArray.stream().filter(JsonObject.class::isInstance).map(JsonObject.class::cast)
-                            .map(f -> UnitTelemetry.of(
-                                    f.getLong("unitId"),
-                                    OffsetDateTime.parse(f.getString("timestamp")),
-                                    Location.of(
-                                            f.getJsonObject("location").getFloat("longitude"),
-                                            f.getJsonObject("location").getFloat("latitude"),
-                                            f.getJsonObject("location").getFloat("altitude")
-                                    ),
-                                    f.getInteger("speed"),
-                                    f.getJsonObject("observedValues")
+                            .map(tel -> UnitTelemetry.of(
+                                    tel.getLong("unitId"),
+                                    OffsetDateTime.parse(tel.getString("timestamp")),
+                                    Optional.of(tel.getJsonObject("location")).map(location -> Location.of(
+                                            location.getFloat("longitude"),
+                                            location.getFloat("latitude"),
+                                            location.getFloat("altitude")
+                                    )).get(),
+                                    tel.getInteger("speed"),
+                                    tel.getJsonObject("observedValues")
                             )).sorted(comparing(UnitTelemetry::getTimestamp)).collect(toList()))
-                    .ifPresent(orgTels -> repo.findCampaignById(campaignId)
-                            .onSuccess(campaign -> fluentlyOf(telemetriesWithinCampaign(campaign, orgTels))
-                                        .then(flTels -> repo.saveAllTelemetry(flTels)
-                                                .onSuccess(count -> rc.response().end(JsonObject.of(
-                                                        "saved", count,
-                                                        "errors", orgTels.size() - count
-                                                ).encode()))
-                                                .onFailure(rc::fail))
-                            )
-                    .onFailure(rc::fail));
+                    .ifPresent(validateAndSave);
 
             case GEOJSON -> Optional.of(rc.body().asJsonObject()).map(j -> j.getJsonArray("features"))
                     .map(jsonArray -> jsonArray.stream().filter(JsonObject.class::isInstance).map(JsonObject.class::cast)
                             .map(feature -> UnitTelemetry.of(
                                     feature.getJsonObject("properties").getLong("unitId"),
                                     OffsetDateTime.parse(feature.getJsonObject("properties").getString("timestamp")),
-                                    Location.of(
-                                            feature.getJsonObject("geometry").getJsonArray("coordinates").getFloat(0),
-                                            feature.getJsonObject("geometry").getJsonArray("coordinates").getFloat(1),
-                                            feature.getJsonObject("geometry").getJsonArray("coordinates").getFloat(2)
-                                    ),
+                                    Optional.of(feature.getJsonObject("location")).map(location -> Location.of(
+                                            location.getFloat("longitude"),
+                                            location.getFloat("latitude"),
+                                            location.getFloat("altitude")
+                                    )).get(),
                                     feature.getJsonObject("properties").getInteger("speed"),
                                     feature.getJsonObject("observedValues")
                             )).sorted(comparing(UnitTelemetry::getTimestamp)).collect(toList()))
-                    .ifPresent(orgTels -> repo.findCampaignById(campaignId)
-                            .onSuccess(campaign -> fluentlyOf(telemetriesWithinCampaign(campaign, orgTels))
-                                    .then(flTels -> repo.saveAllTelemetry(flTels)
-                                            .onSuccess(count -> rc.response().end(JsonObject.of(
-                                                    "saved", count,
-                                                    "errors", orgTels.size() - count
-                                            ).encode()))
-                                            .onFailure(rc::fail))
-                            )
-                            .onFailure(rc::fail));
+                    .ifPresent(validateAndSave);
         }
     }
 }

+ 1 - 17
src/main/java/cz/senslog/telemetry/utils/Tuple.java

@@ -2,28 +2,12 @@ package cz.senslog.telemetry.utils;
 
 import java.util.Objects;
 
-public final class Tuple<A, B> {
-
-    private final A item1;
-    private final B item2;
-
-    private Tuple(A item1, B item2) {
-        this.item1 = item1;
-        this.item2 = item2;
-    }
+public record Tuple<A, B>(A item1, B item2) {
 
     public static <A, B> Tuple<A,B> of(A item1, B item2) {
         return new Tuple<>(item1, item2);
     }
 
-    public A getItem1() {
-        return item1;
-    }
-
-    public B getItem2() {
-        return item2;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;