|
@@ -2,6 +2,7 @@ package cz.senslog.telemetry.database.repository;
|
|
|
|
|
|
|
|
import cz.senslog.telemetry.database.DataNotFoundException;
|
|
import cz.senslog.telemetry.database.DataNotFoundException;
|
|
|
import cz.senslog.telemetry.database.PagingRetrieve;
|
|
import cz.senslog.telemetry.database.PagingRetrieve;
|
|
|
|
|
+import cz.senslog.telemetry.database.SortType;
|
|
|
import cz.senslog.telemetry.database.domain.*;
|
|
import cz.senslog.telemetry.database.domain.*;
|
|
|
import io.vertx.core.Future;
|
|
import io.vertx.core.Future;
|
|
|
import io.vertx.core.json.JsonObject;
|
|
import io.vertx.core.json.JsonObject;
|
|
@@ -17,6 +18,7 @@ import java.time.OffsetDateTime;
|
|
|
import java.time.ZoneId;
|
|
import java.time.ZoneId;
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
+import java.util.Objects;
|
|
|
import java.util.Optional;
|
|
import java.util.Optional;
|
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
@@ -59,8 +61,8 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public Future<Integer> saveTelemetry(ObsTelemetry data) {
|
|
public Future<Integer> saveTelemetry(ObsTelemetry data) {
|
|
|
- return client.preparedQuery("INSERT INTO maplog.obs_telemetry(time_stamp, unit_id, observed_values, the_geom) " +
|
|
|
|
|
- "VALUES ($1, $2, $3::json, ST_SetSRID(ST_MakePoint($4, $5, $6, $7), 4326)) RETURNING (obs_id)")
|
|
|
|
|
|
|
+ return client.preparedQuery("INSERT INTO maplog.obs_telemetry(time_stamp, unit_id, observed_values, the_geom, speed) " +
|
|
|
|
|
+ "VALUES ($1, $2, $3::json, ST_SetSRID(ST_MakePoint($4, $5, $6), 4326), $7) RETURNING (obs_id)")
|
|
|
.execute(Tuple.of(
|
|
.execute(Tuple.of(
|
|
|
data.getTimestamp(),
|
|
data.getTimestamp(),
|
|
|
data.getUnitId(),
|
|
data.getUnitId(),
|
|
@@ -93,12 +95,21 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
d.getSpeed()
|
|
d.getSpeed()
|
|
|
)).collect(toList());
|
|
)).collect(toList());
|
|
|
return client.preparedQuery("INSERT INTO maplog.obs_telemetry(time_stamp, unit_id, observed_values, the_geom, speed) " +
|
|
return client.preparedQuery("INSERT INTO maplog.obs_telemetry(time_stamp, unit_id, observed_values, the_geom, speed) " +
|
|
|
- "VALUES ($1, $2, $3::json, ST_SetSRID(ST_MakePoint($4, $5, $6, $7), 4326))")
|
|
|
|
|
|
|
+ "VALUES ($1, $2, $3::json, ST_SetSRID(ST_MakePoint($4, $5, $6), 4326), $7)")
|
|
|
.executeBatch(tuples)
|
|
.executeBatch(tuples)
|
|
|
.map(SqlResult::rowCount);
|
|
.map(SqlResult::rowCount);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
|
|
+ public Future<Boolean> createSensor(Sensor sensor, long unitId) {
|
|
|
|
|
+ return client.preparedQuery("WITH rows AS " +
|
|
|
|
|
+ "(INSERT INTO maplog.sensor(sensor_id, io_id, sensor_name, phenomenon_id) VALUES ($1, $2, $3, $4) RETURNING sensor_id) " +
|
|
|
|
|
+ "INSERT INTO maplog.unit_to_sensor(sensor_id, unit_id) SELECT sensor_id, $5 FROM rows RETURNING sensor_id")
|
|
|
|
|
+ .execute(Tuple.of(sensor.getSensorId(), sensor.getIoID(), sensor.getName(), 1, unitId))
|
|
|
|
|
+ .map(r -> r.iterator().next().getInteger(0) > 0);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
public Future<List<Unit>> loadAllUnits() {
|
|
public Future<List<Unit>> loadAllUnits() {
|
|
|
return client.query("SELECT unit_id FROM maplog.unit")
|
|
return client.query("SELECT unit_id FROM maplog.unit")
|
|
|
.execute()
|
|
.execute()
|
|
@@ -124,14 +135,14 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
private static final Function<Row, Sensor> ROW_TO_SENSOR = (row) -> Sensor.of(
|
|
private static final Function<Row, Sensor> ROW_TO_SENSOR = (row) -> Sensor.of(
|
|
|
row.getLong("sensor_id"),
|
|
row.getLong("sensor_id"),
|
|
|
row.getString("sensor_name"),
|
|
row.getString("sensor_name"),
|
|
|
- row.getInteger("sensor_type_id")
|
|
|
|
|
|
|
+ row.getInteger("io_id")
|
|
|
);
|
|
);
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public Future<Sensor> findSensorByIOAndUnitId(int ioID, long unitId) {
|
|
public Future<Sensor> findSensorByIOAndUnitId(int ioID, long unitId) {
|
|
|
- return client.preparedQuery("SELECT s.sensor_id, s.sensor_name, s.sensor_type_id FROM maplog.sensor AS s " +
|
|
|
|
|
|
|
+ return client.preparedQuery("SELECT s.sensor_id, s.sensor_name, s.io_id FROM maplog.sensor AS s " +
|
|
|
"JOIN maplog.unit_to_sensor uts ON s.sensor_id = uts.sensor_id " +
|
|
"JOIN maplog.unit_to_sensor uts ON s.sensor_id = uts.sensor_id " +
|
|
|
- "WHERE s.sensor_type_id = $1 AND uts.unit_id = $2")
|
|
|
|
|
|
|
+ "WHERE s.io_id = $1 AND uts.unit_id = $2")
|
|
|
.execute(Tuple.of(ioID, unitId))
|
|
.execute(Tuple.of(ioID, unitId))
|
|
|
.map(RowSet::iterator)
|
|
.map(RowSet::iterator)
|
|
|
.map(iterator -> iterator.hasNext() ? ROW_TO_SENSOR.apply(iterator.next()) : null);
|
|
.map(iterator -> iterator.hasNext() ? ROW_TO_SENSOR.apply(iterator.next()) : null);
|
|
@@ -139,7 +150,7 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public Future<List<Sensor>> findSensorsByUnitId(long unitId) {
|
|
public Future<List<Sensor>> findSensorsByUnitId(long unitId) {
|
|
|
- return client.preparedQuery("SELECT s.sensor_id, s.sensor_name, s.sensor_type_id FROM maplog.unit_to_sensor AS uts " +
|
|
|
|
|
|
|
+ return client.preparedQuery("SELECT s.sensor_id, s.sensor_name, s.io_id FROM maplog.unit_to_sensor AS uts " +
|
|
|
"JOIN maplog.sensor s on s.sensor_id = uts.sensor_id " +
|
|
"JOIN maplog.sensor s on s.sensor_id = uts.sensor_id " +
|
|
|
"WHERE UTS.unit_id = $1")
|
|
"WHERE UTS.unit_id = $1")
|
|
|
.execute(Tuple.of(unitId))
|
|
.execute(Tuple.of(unitId))
|
|
@@ -216,11 +227,10 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
@Override
|
|
@Override
|
|
|
public Future<List<ObsTelemetry>> findObservationsByCampaignId(long campaignId) {
|
|
public Future<List<ObsTelemetry>> findObservationsByCampaignId(long campaignId) {
|
|
|
return client.preparedQuery(
|
|
return client.preparedQuery(
|
|
|
- "SELECT obs.obs_id, obs.time_stamp, obs.unit_id, obs.observed_values, " +
|
|
|
|
|
|
|
+ "SELECT obs.obs_id, obs.time_stamp, obs.unit_id, obs.observed_values, obs.speed " +
|
|
|
"ST_X (ST_Transform (obs.the_geom, 4326)) AS long, " +
|
|
"ST_X (ST_Transform (obs.the_geom, 4326)) AS long, " +
|
|
|
"ST_Y (ST_Transform (obs.the_geom, 4326)) AS lat " +
|
|
"ST_Y (ST_Transform (obs.the_geom, 4326)) AS lat " +
|
|
|
"ST_Z (ST_Transform (obs.the_geom, 4326)) AS alt " +
|
|
"ST_Z (ST_Transform (obs.the_geom, 4326)) AS alt " +
|
|
|
- "ST_M (obs.the_geom) AS speed " +
|
|
|
|
|
"FROM maplog.obs_telemetry AS obs " +
|
|
"FROM maplog.obs_telemetry AS obs " +
|
|
|
"JOIN maplog.unit AS u ON u.unit_id = obs.unit_id " +
|
|
"JOIN maplog.unit AS u ON u.unit_id = obs.unit_id " +
|
|
|
"JOIN maplog.unit_to_campaign AS u_cam ON u_cam.unit_id = u.unit_id " +
|
|
"JOIN maplog.unit_to_campaign AS u_cam ON u_cam.unit_id = u.unit_id " +
|
|
@@ -247,9 +257,8 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
- public Future<List<ObsTelemetry>> findObservationsByCampaignIdAndUnitId(long campaignId, long unitId,
|
|
|
|
|
- OffsetDateTime from, OffsetDateTime to, ZoneId zone,
|
|
|
|
|
- int offset, int limit
|
|
|
|
|
|
|
+ public Future<List<ObsTelemetry>> findObservationsByCampaignIdAndUnitId(
|
|
|
|
|
+ long campaignId, long unitId, OffsetDateTime from, OffsetDateTime to, ZoneId zone, int offset, int limit
|
|
|
) {
|
|
) {
|
|
|
String whereTimestampClause;
|
|
String whereTimestampClause;
|
|
|
Tuple tupleParams;
|
|
Tuple tupleParams;
|
|
@@ -267,12 +276,11 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
tupleParams = Tuple.of(campaignId, unitId, offset, limit, zone.getId());
|
|
tupleParams = Tuple.of(campaignId, unitId, offset, limit, zone.getId());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- String sql = "SELECT tel.obs_id, tel.unit_id, tel.observed_values::json, " +
|
|
|
|
|
|
|
+ String sql = "SELECT tel.obs_id, tel.unit_id, tel.observed_values::json, tel.speed " +
|
|
|
"tel.time_stamp, $5 AS zone_id, " + // ::timestamp with time zone at time zone $5 AS time_stamp
|
|
"tel.time_stamp, $5 AS zone_id, " + // ::timestamp with time zone at time zone $5 AS time_stamp
|
|
|
"ST_X (ST_Transform (tel.the_geom, 4326)) AS long, " +
|
|
"ST_X (ST_Transform (tel.the_geom, 4326)) AS long, " +
|
|
|
"ST_Y (ST_Transform (tel.the_geom, 4326)) AS lat, " +
|
|
"ST_Y (ST_Transform (tel.the_geom, 4326)) AS lat, " +
|
|
|
"ST_Z (ST_Transform (tel.the_geom, 4326)) AS alt " +
|
|
"ST_Z (ST_Transform (tel.the_geom, 4326)) AS alt " +
|
|
|
- "ST_M (tel.the_geom) AS speed " +
|
|
|
|
|
"FROM maplog.obs_telemetry AS tel " +
|
|
"FROM maplog.obs_telemetry AS tel " +
|
|
|
"JOIN maplog.unit u on u.unit_id = tel.unit_id " +
|
|
"JOIN maplog.unit u on u.unit_id = tel.unit_id " +
|
|
|
"JOIN maplog.unit_to_campaign utc on tel.unit_id = utc.unit_id " +
|
|
"JOIN maplog.unit_to_campaign utc on tel.unit_id = utc.unit_id " +
|
|
@@ -297,7 +305,9 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
- public Future<PagingRetrieve<List<ObsTelemetry>>> findObservationsByCampaignIdAndUnitIdWithPaging(long campaignId, long unitId, OffsetDateTime from, OffsetDateTime to, ZoneId zone, int offset, int limit) {
|
|
|
|
|
|
|
+ public Future<PagingRetrieve<List<ObsTelemetry>>> findObservationsByCampaignIdAndUnitIdWithPaging(
|
|
|
|
|
+ long campaignId, long unitId, OffsetDateTime from, OffsetDateTime to, ZoneId zone, int offset, int limit
|
|
|
|
|
+ ) {
|
|
|
return findObservationsByCampaignIdAndUnitId(campaignId, unitId, from, to, zone, offset, limit+1)
|
|
return findObservationsByCampaignIdAndUnitId(campaignId, unitId, from, to, zone, offset, limit+1)
|
|
|
.map(data -> {
|
|
.map(data -> {
|
|
|
boolean hasNext = data.size() > limit;
|
|
boolean hasNext = data.size() > limit;
|
|
@@ -307,4 +317,108 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
return new PagingRetrieve<>(hasNext, data.size(), data);
|
|
return new PagingRetrieve<>(hasNext, data.size(), data);
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public Future<List<UnitLocation>> findLocationsByCampaignIdAndUnitId(
|
|
|
|
|
+ long campaignId, long unitId, OffsetDateTime from, OffsetDateTime to, ZoneId zone, int offset, int limit
|
|
|
|
|
+ ) {
|
|
|
|
|
+ String whereTimestampClause;
|
|
|
|
|
+ Tuple tupleParams;
|
|
|
|
|
+ if (from != null && to != null) {
|
|
|
|
|
+ whereTimestampClause = "tel.time_stamp <= (CASE WHEN $6 > utc.to_time THEN utc.to_time ELSE $6 END) AND tel.time_stamp >= (CASE WHEN $7 > utc.from_time THEN utc.from_time ELSE $7 END)";
|
|
|
|
|
+ tupleParams = Tuple.of(campaignId, unitId, offset, limit, zone.getId(), from, to);
|
|
|
|
|
+ } else if (from != null) {
|
|
|
|
|
+ whereTimestampClause = "tel.time_stamp >= (CASE WHEN $6 > utc.from_time THEN utc.from_time ELSE $6 END) AND tel.time_stamp <= utc.to_time";
|
|
|
|
|
+ tupleParams = Tuple.of(campaignId, unitId, offset, limit, zone.getId(), from);
|
|
|
|
|
+ } else if (to != null) {
|
|
|
|
|
+ whereTimestampClause = "tel.time_stamp >= utc.from_time AND tel.time_stamp <= (CASE WHEN $6 > utc.to_time THEN utc.to_time ELSE $6 END)";
|
|
|
|
|
+ tupleParams = Tuple.of(campaignId, unitId, offset, limit, zone.getId(), to);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ whereTimestampClause = "tel.time_stamp >= utc.from_time AND tel.time_stamp <= utc.to_time";
|
|
|
|
|
+ tupleParams = Tuple.of(campaignId, unitId, offset, limit, zone.getId());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String sql = "SELECT tel.unit_id, tel.time_stamp, $5 AS zone_id, " + // ::timestamp with time zone at time zone $5 AS time_stamp
|
|
|
|
|
+ "ST_X (ST_Transform (tel.the_geom, 4326)) AS long, " +
|
|
|
|
|
+ "ST_Y (ST_Transform (tel.the_geom, 4326)) AS lat, " +
|
|
|
|
|
+ "ST_Z (ST_Transform (tel.the_geom, 4326)) AS alt " +
|
|
|
|
|
+ "FROM maplog.obs_telemetry AS tel " +
|
|
|
|
|
+ "JOIN maplog.unit u on u.unit_id = tel.unit_id " +
|
|
|
|
|
+ "JOIN maplog.unit_to_campaign utc on tel.unit_id = utc.unit_id " +
|
|
|
|
|
+ "WHERE utc.camp_id = $1 AND utc.unit_id = $2 AND " + whereTimestampClause + " " +
|
|
|
|
|
+ "ORDER BY tel.time_stamp OFFSET $3 LIMIT $4;";
|
|
|
|
|
+
|
|
|
|
|
+ return client.preparedQuery(sql)
|
|
|
|
|
+ .execute(tupleParams)
|
|
|
|
|
+ .map(rs -> StreamSupport.stream(rs.spliterator(), false)
|
|
|
|
|
+ .map(r -> UnitLocation.of(
|
|
|
|
|
+ r.getLong("unit_id"),
|
|
|
|
|
+ r.getOffsetDateTime("time_stamp"),
|
|
|
|
|
+ new float[] {
|
|
|
|
|
+ r.getFloat("long"),
|
|
|
|
|
+ r.getFloat("lat"),
|
|
|
|
|
+ r.getFloat("alt")
|
|
|
|
|
+ })
|
|
|
|
|
+ )
|
|
|
|
|
+ .collect(Collectors.toList()))
|
|
|
|
|
+ .onFailure(logger::catching);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public Future<PagingRetrieve<List<UnitLocation>>> findLocationsByCampaignIdAndUnitIdWithPaging(
|
|
|
|
|
+ long campaignId, long unitId, OffsetDateTime from, OffsetDateTime to, ZoneId zone, int offset, int limit
|
|
|
|
|
+ ) {
|
|
|
|
|
+ return findLocationsByCampaignIdAndUnitId(campaignId, unitId, from, to, zone, offset, limit+1)
|
|
|
|
|
+ .map(data -> {
|
|
|
|
|
+ boolean hasNext = data.size() > limit;
|
|
|
|
|
+ if (hasNext) {
|
|
|
|
|
+ data.remove(data.size() - 1);
|
|
|
|
|
+ }
|
|
|
|
|
+ return new PagingRetrieve<>(hasNext, data.size(), data);
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public Future<List<UnitLocation>> findUnitsLocationsByCampaignId(
|
|
|
|
|
+ long campaignId, int limitPerUnit, OffsetDateTime from, OffsetDateTime to, ZoneId zone, SortType sort
|
|
|
|
|
+ ) {
|
|
|
|
|
+ String whereTimestampClause;
|
|
|
|
|
+ Tuple tupleParams;
|
|
|
|
|
+ if (from != null && to != null) {
|
|
|
|
|
+ whereTimestampClause = "WHERE utc.camp_id = $1 AND time_stamp >= $4 AND time_stamp < $5";
|
|
|
|
|
+ tupleParams = Tuple.of(campaignId, limitPerUnit, zone.getId(), from, to);
|
|
|
|
|
+ } else if (from != null) {
|
|
|
|
|
+ whereTimestampClause = "WHERE utc.camp_id = $1 AND time_stamp >= $4";
|
|
|
|
|
+ tupleParams = Tuple.of(campaignId, limitPerUnit, zone.getId(), from);
|
|
|
|
|
+ } else if (to != null) {
|
|
|
|
|
+ whereTimestampClause = "WHERE utc.camp_id = $1 AND time_stamp < $4";
|
|
|
|
|
+ tupleParams = Tuple.of(campaignId, limitPerUnit, zone.getId(), to);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ whereTimestampClause = "WHERE utc.camp_id = $1";
|
|
|
|
|
+ tupleParams = Tuple.of(campaignId, limitPerUnit, zone.getId());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String sql = "SELECT unit_id, time_stamp, $3 AS zone_id, " + // ::timestamp with time zone at time zone $5 AS time_stamp
|
|
|
|
|
+ "ST_X (ST_Transform (the_geom, 4326)) AS long, " +
|
|
|
|
|
+ "ST_Y (ST_Transform (the_geom, 4326)) AS lat, " +
|
|
|
|
|
+ "ST_Z (ST_Transform (the_geom, 4326)) AS alt " +
|
|
|
|
|
+ "FROM (SELECT *, row_number() OVER (PARTITION BY unit_id ORDER BY time_stamp "+ sort.name() +" ) AS rn " +
|
|
|
|
|
+ "FROM (SELECT obs.* FROM maplog.obs_telemetry obs JOIN maplog.unit_to_campaign utc ON obs.unit_id = utc.unit_id "+whereTimestampClause+") AS data) AS g " +
|
|
|
|
|
+ "WHERE rn <= $2";
|
|
|
|
|
+
|
|
|
|
|
+ return client.preparedQuery(sql)
|
|
|
|
|
+ .execute(tupleParams)
|
|
|
|
|
+ .map(rs -> StreamSupport.stream(rs.spliterator(), false)
|
|
|
|
|
+ .map(r -> UnitLocation.of(
|
|
|
|
|
+ r.getLong("unit_id"),
|
|
|
|
|
+ r.getOffsetDateTime("time_stamp"),
|
|
|
|
|
+ new float[] {
|
|
|
|
|
+ r.getFloat("long"),
|
|
|
|
|
+ r.getFloat("lat"),
|
|
|
|
|
+ r.getFloat("alt")
|
|
|
|
|
+ }
|
|
|
|
|
+ ))
|
|
|
|
|
+ .collect(toList()))
|
|
|
|
|
+ .onFailure(logger::catching);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|