|
|
@@ -61,7 +61,7 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
@Override
|
|
|
public Future<Integer> saveTelemetry(UnitTelemetry data) {
|
|
|
return client.preparedQuery("INSERT INTO maplog.obs_telemetry(unit_id, time_stamp, the_geom, speed, observed_values) " +
|
|
|
- "VALUES ($1, $2, ST_SetSRID(ST_MakePoint($3, $4, $5, $6), 4326), $7, $8::json) RETURNING (obs_id)")
|
|
|
+ "VALUES ($1, $2, ST_SetSRID(ST_MakePoint($3, $4, $5, $6), 4326), $7, $8::jsonb) RETURNING (obs_id)")
|
|
|
.execute(Tuple.of(
|
|
|
data.getUnitId(),
|
|
|
data.getTimestamp(),
|
|
|
@@ -70,7 +70,7 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
data.getLocation().getAltitude(),
|
|
|
data.getLocation().getAngle(),
|
|
|
data.getSpeed(),
|
|
|
- data.getObservedValues().toString()
|
|
|
+ data.getObservedValues()
|
|
|
))
|
|
|
.onComplete(res -> {
|
|
|
if (res.succeeded()) {
|
|
|
@@ -93,10 +93,10 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
d.getLocation().getAltitude(),
|
|
|
d.getLocation().getAngle(),
|
|
|
d.getSpeed(),
|
|
|
- d.getObservedValues().toString()
|
|
|
+ d.getObservedValues()
|
|
|
)).collect(toList());
|
|
|
return client.preparedQuery("INSERT INTO maplog.obs_telemetry(unit_id, time_stamp, the_geom, speed, observed_values) " +
|
|
|
- "VALUES ($1, $2, ST_SetSRID(ST_MakePoint($3, $4, $5, $6), 4326), $7, $8::json)")
|
|
|
+ "VALUES ($1, $2, ST_SetSRID(ST_MakePoint($3, $4, $5, $6), 4326), $7, $8::jsonb)")
|
|
|
.executeBatch(tuples)
|
|
|
.map(SqlResult::rowCount);
|
|
|
}
|
|
|
@@ -466,7 +466,7 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
r.getFloat("alt"),
|
|
|
r.getFloat("angle")),
|
|
|
r.getFloat("speed"),
|
|
|
- new JsonObject(r.getString("observed_values"))
|
|
|
+ r.getJsonObject("observed_values")
|
|
|
))
|
|
|
.collect(Collectors.toList()))
|
|
|
.onFailure(logger::catching);
|
|
|
@@ -513,7 +513,6 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
"ST_Z (ST_Transform (tel.the_geom, 4326)) AS alt, " +
|
|
|
"ST_M (tel.the_geom) AS angle " +
|
|
|
"FROM maplog.obs_telemetry AS tel " +
|
|
|
- "JOIN maplog.unit u on u.unit_id = tel.unit_id " +
|
|
|
"JOIN maplog.unit_to_campaign utc on tel.unit_id = utc.unit_id " +
|
|
|
"WHERE utc.camp_id = $1 AND utc.unit_id = $2 AND " + whereTimestampClause + " " +
|
|
|
"ORDER BY tel.time_stamp OFFSET $3 LIMIT $4;";
|
|
|
@@ -531,7 +530,7 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
r.getFloat("alt"),
|
|
|
r.getFloat("angle")),
|
|
|
r.getFloat("speed"),
|
|
|
- new JsonObject(r.getString("observed_values"))))
|
|
|
+ r.getJsonObject("observed_values")))
|
|
|
.collect(Collectors.toList()))
|
|
|
.onFailure(logger::catching);
|
|
|
}
|
|
|
@@ -551,6 +550,69 @@ public class MapLogRepository implements SensLogRepository {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
+ public Future<List<SensorTelemetry>> findObservationsByCampaignIdAndUnitIdAndSensorId(
|
|
|
+ long campaignId, long unitId, long sensorId, OffsetDateTime from, OffsetDateTime to, ZoneId zone, int offset, int limit
|
|
|
+ ) {
|
|
|
+ String whereTimestampClause;
|
|
|
+ Tuple tupleParams;
|
|
|
+ if (from != null && to != null) {
|
|
|
+ whereTimestampClause = "tel.time_stamp <= (CASE WHEN $7 > utc.to_time THEN utc.to_time ELSE $7 END) AND tel.time_stamp >= (CASE WHEN $8 > utc.from_time THEN utc.from_time ELSE $8 END)";
|
|
|
+ tupleParams = Tuple.of(campaignId, unitId, sensorId, offset, limit, zone.getId(), from, to);
|
|
|
+ } else if (from != null) {
|
|
|
+ whereTimestampClause = "tel.time_stamp >= (CASE WHEN $7 > utc.from_time THEN utc.from_time ELSE $7 END) AND tel.time_stamp <= utc.to_time";
|
|
|
+ tupleParams = Tuple.of(campaignId, unitId, sensorId, offset, limit, zone.getId(), from);
|
|
|
+ } else if (to != null) {
|
|
|
+ whereTimestampClause = "tel.time_stamp >= utc.from_time AND tel.time_stamp <= (CASE WHEN $7 > utc.to_time THEN utc.to_time ELSE $7 END)";
|
|
|
+ tupleParams = Tuple.of(campaignId, unitId, sensorId, offset, limit, zone.getId(), to);
|
|
|
+ } else {
|
|
|
+ whereTimestampClause = "tel.time_stamp >= utc.from_time AND tel.time_stamp <= utc.to_time";
|
|
|
+ tupleParams = Tuple.of(campaignId, unitId, sensorId, offset, limit, zone.getId());
|
|
|
+ }
|
|
|
+
|
|
|
+ String sql = "SELECT tel.obs_id, (tel.observed_values::jsonb ->> $3::bigint::text::varchar)::integer AS value, tel.speed, " +
|
|
|
+ "tel.time_stamp, $6 AS zone_id, " + // ::timestamp with time zone at time zone $6 AS time_stamp
|
|
|
+ "ST_X (ST_Transform (tel.the_geom, 4326)) AS long, " +
|
|
|
+ "ST_Y (ST_Transform (tel.the_geom, 4326)) AS lat, " +
|
|
|
+ "ST_Z (ST_Transform (tel.the_geom, 4326)) AS alt, " +
|
|
|
+ "ST_M (tel.the_geom) AS angle " +
|
|
|
+ "FROM maplog.obs_telemetry AS tel " +
|
|
|
+ "JOIN maplog.unit_to_sensor uts on tel.unit_id = uts.unit_id " +
|
|
|
+ "JOIN maplog.unit_to_campaign utc on tel.unit_id = utc.unit_id " +
|
|
|
+ "WHERE utc.camp_id = $1 AND utc.unit_id = $2 AND uts.sensor_id = $3 AND observed_values::jsonb -> $3::bigint::text::varchar IS NOT NULL " +
|
|
|
+ "AND " + whereTimestampClause + " ORDER BY tel.time_stamp OFFSET $4 LIMIT $5";
|
|
|
+
|
|
|
+ return client.preparedQuery(sql)
|
|
|
+ .execute(tupleParams)
|
|
|
+ .map(rs -> StreamSupport.stream(rs.spliterator(), false)
|
|
|
+ .map(r -> SensorTelemetry.of(
|
|
|
+ r.getLong("obs_id"),
|
|
|
+ r.getLong("value"),
|
|
|
+ r.getOffsetDateTime("time_stamp"),
|
|
|
+ Location.of(
|
|
|
+ r.getFloat("long"),
|
|
|
+ r.getFloat("lat"),
|
|
|
+ r.getFloat("alt"),
|
|
|
+ r.getFloat("angle")),
|
|
|
+ r.getFloat("speed")))
|
|
|
+ .collect(Collectors.toList()))
|
|
|
+ .onFailure(logger::catching);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Future<PagingRetrieve<List<SensorTelemetry>>> findObservationsByCampaignIdAndUnitIdAndSensorIdWithPaging(
|
|
|
+ long campaignId, long unitId, long sensorId, OffsetDateTime from, OffsetDateTime to, ZoneId zone, int offset, int limit
|
|
|
+ ) {
|
|
|
+ return findObservationsByCampaignIdAndUnitIdAndSensorId(campaignId, unitId, sensorId, from, to, zone, offset, limit+1)
|
|
|
+ .map(data -> {
|
|
|
+ boolean hasNext = data.size() > limit;
|
|
|
+ if (hasNext) {
|
|
|
+ data.remove(data.size() - 1);
|
|
|
+ }
|
|
|
+ return new PagingRetrieve<>(hasNext, data.size(), data);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
public Future<List<UnitLocation>> findLocationsByCampaignIdAndUnitId(
|
|
|
long campaignId, long unitId, OffsetDateTime from, OffsetDateTime to, ZoneId zone, int offset, int limit
|
|
|
) {
|