|
|
@@ -8,6 +8,7 @@ import cz.senslog.telemetry.database.domain.*;
|
|
|
import cz.senslog.telemetry.database.repository.SensLogRepository;
|
|
|
import cz.senslog.telemetry.module.EventBusModulePaths;
|
|
|
import cz.senslog.telemetry.utils.CascadeCondition;
|
|
|
+import cz.senslog.telemetry.utils.Tuple;
|
|
|
import io.vertx.core.Future;
|
|
|
import io.vertx.core.Vertx;
|
|
|
import io.vertx.core.eventbus.DeliveryOptions;
|
|
|
@@ -23,10 +24,7 @@ import org.apache.logging.log4j.Logger;
|
|
|
|
|
|
import java.text.ParseException;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
-import java.time.OffsetDateTime;
|
|
|
-import java.time.ZoneId;
|
|
|
-import java.time.ZoneOffset;
|
|
|
-import java.time.ZonedDateTime;
|
|
|
+import java.time.*;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
import java.util.function.*;
|
|
|
@@ -47,8 +45,10 @@ import static io.vertx.core.http.HttpHeaders.ACCEPT;
|
|
|
import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
|
|
|
import static java.time.OffsetDateTime.ofInstant;
|
|
|
import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
|
|
|
+import static java.time.format.DateTimeFormatter.ofPattern;
|
|
|
import static java.util.Collections.emptyList;
|
|
|
import static java.util.Comparator.comparing;
|
|
|
+import static java.util.Optional.ofNullable;
|
|
|
import static java.util.stream.Collectors.*;
|
|
|
|
|
|
public class OpenAPIHandler {
|
|
|
@@ -1437,22 +1437,20 @@ public class OpenAPIHandler {
|
|
|
tel.getInteger("speed"),
|
|
|
tel.getJsonObject("observedValues")
|
|
|
)).sorted(comparing(UnitTelemetry::getTimestamp)).collect(toList()))
|
|
|
- .ifPresent(validateAndSave);
|
|
|
+ .ifPresentOrElse(validateAndSave, () -> {throw new IllegalArgumentException("Invalid input data."); } );
|
|
|
|
|
|
case GEOJSON -> Optional.of(rc.body().asJsonObject()).map(j -> j.getJsonArray("features"))
|
|
|
.map(jsonArray -> jsonArray.stream().filter(JsonObject.class::isInstance).map(JsonObject.class::cast)
|
|
|
.map(feature -> UnitTelemetry.of(
|
|
|
feature.getJsonObject("properties").getLong("unitId"),
|
|
|
OffsetDateTime.parse(feature.getJsonObject("properties").getString("timestamp")),
|
|
|
- Optional.of(feature.getJsonObject("location")).map(location -> Location.of(
|
|
|
- location.getFloat("longitude"),
|
|
|
- location.getFloat("latitude"),
|
|
|
- location.getFloat("altitude")
|
|
|
+ Optional.of(feature.getJsonObject("geometry").getJsonArray("coordinates")).map(location -> Location.of(
|
|
|
+ location.getFloat(0), location.getFloat(1), location.getFloat(2), 0
|
|
|
)).get(),
|
|
|
feature.getJsonObject("properties").getInteger("speed"),
|
|
|
feature.getJsonObject("observedValues")
|
|
|
)).sorted(comparing(UnitTelemetry::getTimestamp)).collect(toList()))
|
|
|
- .ifPresent(validateAndSave);
|
|
|
+ .ifPresentOrElse(validateAndSave, () -> {throw new IllegalArgumentException("Invalid input data."); } );
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1781,7 +1779,7 @@ public class OpenAPIHandler {
|
|
|
value = Double.parseDouble(rc.queryParam("value").get(0));
|
|
|
unitId = Long.parseLong(rc.queryParam("unit_id").get(0));
|
|
|
sensorIdStr = rc.queryParam("sensor_id").get(0);
|
|
|
- timestamp = OffsetDateTime.parse(rc.queryParam("date").get(0), ISO_OFFSET_DATE_TIME);
|
|
|
+ timestamp = ZonedDateTime.of(LocalDateTime.parse(rc.queryParam("date").get(0), ofPattern("yyyy-MM-dd[+][ ]HH:mm:ss")), ZoneOffset.systemDefault()).toOffsetDateTime();
|
|
|
} catch (Exception e) {
|
|
|
logger.catching(e);
|
|
|
rc.response().end("false"); return;
|
|
|
@@ -1791,17 +1789,69 @@ public class OpenAPIHandler {
|
|
|
|
|
|
repo.findMobileUnitByStaticUnitIdAndTimestamp(unitId, timestamp)
|
|
|
.compose(u -> repo
|
|
|
- .findObservationsByCampaignIdAndUnitId(u.getCampaignId(), u.getMobileUnitId(), timestamp.minusMinutes(minuteDiff), timestamp.plusMinutes(minuteDiff), 0, 1, DESC, emptyList()))
|
|
|
- .compose(mobileTelemetry -> {
|
|
|
- if (mobileTelemetry.isEmpty()) {
|
|
|
- // TODO no unit location within -30 - +30
|
|
|
- return Future.succeededFuture(false);
|
|
|
- } else {
|
|
|
- UnitTelemetry lastUnitTelemetry = mobileTelemetry.get(0);
|
|
|
- return repo.saveTelemetry(UnitTelemetry.of(unitId, timestamp, lastUnitTelemetry.getLocation(), lastUnitTelemetry.getSpeed(), JsonObject.of(sensorIdStr, value))).map(r -> r.getId() > 0);
|
|
|
- }
|
|
|
- })
|
|
|
+ .findNearestLocationToTimestampByCampaignIdAndUnitId(u.getCampaignId(), u.getMobileUnitId(), timestamp, timestamp.minusMinutes(minuteDiff), timestamp.plusMinutes(minuteDiff))
|
|
|
+ .map(t -> t.map(UnitLocation::getLocation).orElseGet(u::getDefaultLocation)))
|
|
|
+ .compose(l -> repo.saveTelemetry(UnitTelemetry.of(unitId, timestamp, l, 0, JsonObject.of(sensorIdStr, value))).map(r -> r.getId() > 0))
|
|
|
.onSuccess(res -> rc.response().end(res.toString()))
|
|
|
.onFailure(rc::fail);
|
|
|
}
|
|
|
+
|
|
|
+ public void campaignIdUnitsObservationsLocationsPUT(RoutingContext rc) {
|
|
|
+ AuthBearerUser user = AuthBearerUser.of(rc.user());
|
|
|
+ WSParameters params = WSParameters.wrap(rc.queryParams(), rc.pathParams());
|
|
|
+
|
|
|
+ long campaignId = params.pathParams().campaignId();
|
|
|
+
|
|
|
+ final Consumer<List<UnitTelemetry>> validateAndUpdate = tlsOrig -> repo.findCampaignById(campaignId)
|
|
|
+ .onSuccess(campaign -> Future.all(telemetriesWithinCampaign(campaign, tlsOrig).stream()
|
|
|
+ .collect(Collectors.groupingBy(UnitTelemetry::getUnitId, Collectors.mapping(Function.identity(), Collectors.toList())))
|
|
|
+ .entrySet().stream().map(entry -> {
|
|
|
+ long unitId = entry.getKey();
|
|
|
+ List<UnitTelemetry> tlsToUpdate = entry.getValue();
|
|
|
+ return repo.updateLocations(campaignId, unitId, tlsToUpdate);
|
|
|
+ }).toList())
|
|
|
+ .onSuccess(f -> rc.response().end(JsonObject.of(
|
|
|
+ "total", f.list().stream().mapToLong(Long.class::cast).sum()
|
|
|
+ ).encode()))
|
|
|
+ .onFailure(rc::fail)
|
|
|
+ )
|
|
|
+ .onFailure(rc::fail);
|
|
|
+
|
|
|
+ switch (ContentType.ofType(rc.request().getHeader(CONTENT_TYPE))) {
|
|
|
+ case JSON -> ofNullable(rc.body().asJsonArray())
|
|
|
+ .map(arr -> arr.stream()
|
|
|
+ .filter(JsonObject.class::isInstance).map(JsonObject.class::cast)
|
|
|
+ .map(tel -> UnitTelemetry.of(
|
|
|
+ tel.getLong("unitId"),
|
|
|
+ OffsetDateTime.parse(tel.getString("timestamp")),
|
|
|
+ Optional.of(tel.getJsonObject("location")).map(location -> Location.of(
|
|
|
+ location.getFloat("longitude"),
|
|
|
+ location.getFloat("latitude"),
|
|
|
+ location.getFloat("altitude")
|
|
|
+ )).get(),
|
|
|
+ 0, JsonObject.of()
|
|
|
+ )).sorted(comparing(UnitTelemetry::getTimestamp)).collect(toList()))
|
|
|
+ .ifPresentOrElse(validateAndUpdate, () -> {throw new IllegalArgumentException("Invalid input data."); } );
|
|
|
+
|
|
|
+ case GEOJSON -> Optional.of(rc.body().asJsonObject()).map(j -> j.getJsonArray("features", JsonArray.of()))
|
|
|
+ .map(jsonArray -> jsonArray.stream().filter(JsonObject.class::isInstance).map(JsonObject.class::cast)
|
|
|
+ .map(feature -> UnitTelemetry.of(
|
|
|
+ feature.getJsonObject("properties").getLong("unitId"),
|
|
|
+ OffsetDateTime.parse(feature.getJsonObject("properties").getString("timestamp")),
|
|
|
+ Optional.of(feature.getJsonObject("geometry").getJsonArray("coordinates")).map(location -> Location.of(
|
|
|
+ location.getFloat(0), location.getFloat(1), location.getFloat(2), 0
|
|
|
+ )).get(),
|
|
|
+ 0, JsonObject.of()
|
|
|
+ )).sorted(comparing(UnitTelemetry::getTimestamp)).collect(toList()))
|
|
|
+ .ifPresentOrElse(validateAndUpdate, () -> {throw new IllegalArgumentException("Invalid input data."); } );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // ok 1. default location for mobile unit if the real one does not exist yet (or at all)
|
|
|
+ // 2. add POST endpoint pro /observations/location
|
|
|
+ // 2.1 mapping loc(A) <--> loc(B) --> change all telemetry observations between
|
|
|
+
|
|
|
+ // 4. connector PPL ->> telemetry/location
|
|
|
+ // 3. prepare example of endpoint MarketPlace ->>> telemetry
|
|
|
+
|
|
|
}
|