|
|
@@ -0,0 +1,148 @@
|
|
|
+package cz.senslog.analyzer.persistence.repository;
|
|
|
+
|
|
|
+import cz.senslog.analyzer.domain.DoubleStatistics;
|
|
|
+import cz.senslog.analyzer.domain.Observation;
|
|
|
+import cz.senslog.analyzer.domain.Sensor;
|
|
|
+import cz.senslog.analyzer.domain.Timestamp;
|
|
|
+import cz.senslog.analyzer.persistence.AttributeCode;
|
|
|
+import cz.senslog.analyzer.persistence.Connection;
|
|
|
+import cz.senslog.common.util.TimeRange;
|
|
|
+import cz.senslog.common.util.Tuple;
|
|
|
+import org.jdbi.v3.core.Jdbi;
|
|
|
+import org.jdbi.v3.core.statement.PreparedBatch;
|
|
|
+
|
|
|
+import javax.inject.Inject;
|
|
|
+import java.time.Instant;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.*;
|
|
|
+import java.util.stream.Stream;
|
|
|
+
|
|
|
+import static cz.senslog.analyzer.persistence.AttributeCode.*;
|
|
|
+import static cz.senslog.analyzer.persistence.SensorIdConverter.decodeId;
|
|
|
+import static cz.senslog.analyzer.persistence.SensorIdConverter.encodeId;
|
|
|
+
|
|
|
+import static java.util.stream.Collectors.*;
|
|
|
+
|
|
|
+public class StatisticsRepository {
|
|
|
+
|
|
|
+ private final Jdbi jdbi;
|
|
|
+
|
|
|
+ @Inject
|
|
|
+ public StatisticsRepository(Connection<Jdbi> connection) {
|
|
|
+ this.jdbi = connection.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void save(DoubleStatistics statistics) {
|
|
|
+ save(Collections.singletonList(statistics));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void save(List<DoubleStatistics> statistics) {
|
|
|
+ jdbi.withHandle(h -> h.inTransaction(t -> {
|
|
|
+
|
|
|
+ PreparedBatch batch = t.prepareBatch("INSERT INTO statistics.records(sensor_attribute, value, interval, timestamp) " +
|
|
|
+ "VALUES(:id, :value, :interval, :timestamp)");
|
|
|
+
|
|
|
+ statistics.forEach(st -> batch
|
|
|
+ .bind("id", encodeId(st.getSensor(), MIN))
|
|
|
+ .bind("value", st.getMin())
|
|
|
+ .bind("interval", st.getInterval())
|
|
|
+ .bind("timestamp", st.getTimestamp().get())
|
|
|
+ .add()
|
|
|
+ .bind("id", encodeId(st.getSensor(), MAX))
|
|
|
+ .bind("value", st.getMax())
|
|
|
+ .bind("interval", st.getInterval())
|
|
|
+ .bind("timestamp", st.getTimestamp().get())
|
|
|
+ .add()
|
|
|
+ .bind("id", encodeId(st.getSensor(), SUM))
|
|
|
+ .bind("value", st.getSum())
|
|
|
+ .bind("interval", st.getInterval())
|
|
|
+ .bind("timestamp", st.getTimestamp().get())
|
|
|
+ .add()
|
|
|
+ .bind("id", encodeId(st.getSensor(), COUNT))
|
|
|
+ .bind("value", Long.valueOf(st.getCount()).doubleValue())
|
|
|
+ .bind("interval", st.getInterval())
|
|
|
+ .bind("timestamp", st.getTimestamp().get())
|
|
|
+ .add()
|
|
|
+ );
|
|
|
+
|
|
|
+ return batch.execute();
|
|
|
+ }));
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<DoubleStatistics> getByTimeRange(Sensor sensor, TimeRange<Instant> timeRange) {
|
|
|
+ Map<Tuple<Timestamp, Integer>, List<Tuple<AttributeCode, Double>>> result =
|
|
|
+ jdbi.withHandle(h -> h.createQuery(
|
|
|
+ "SELECT r.sensor_attribute as id, r.value as value, r.interval as interval, r.timestamp as timestamp " +
|
|
|
+ "FROM statistics.records AS r " +
|
|
|
+ "WHERE (r.sensor_attribute = :id_min " +
|
|
|
+ "OR r.sensor_attribute = :id_max " +
|
|
|
+ "OR r.sensor_attribute = :id_sum " +
|
|
|
+ "OR r.sensor_attribute = :id_count) " +
|
|
|
+ "AND timestamp >= :time_from " +
|
|
|
+ "AND (r.timestamp + r.interval * interval '1 second') < :time_to"
|
|
|
+ )
|
|
|
+ .bind("id_min", encodeId(sensor, MIN))
|
|
|
+ .bind("id_max", encodeId(sensor, MAX))
|
|
|
+ .bind("id_sum", encodeId(sensor, SUM))
|
|
|
+ .bind("id_count", encodeId(sensor, COUNT))
|
|
|
+ .bind("time_from", timeRange.getFrom())
|
|
|
+ .bind("time_to", timeRange.getTo())
|
|
|
+ .map((rs, ctx) -> {
|
|
|
+ AttributeCode code = decodeId(rs.getLong("id")).getItem2();
|
|
|
+ Double value = rs.getDouble("value");
|
|
|
+ Tuple<AttributeCode, Double> val = Tuple.of(code, value);
|
|
|
+
|
|
|
+ Timestamp timestamp = Timestamp.parse(rs.getString("timestamp"));
|
|
|
+ Integer interval = rs.getInt("interval");
|
|
|
+ Tuple<Timestamp, Integer> key = Tuple.of(timestamp, interval);
|
|
|
+
|
|
|
+ return new AbstractMap.SimpleEntry<>(key, val);
|
|
|
+ }).collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toList()))));
|
|
|
+
|
|
|
+ // TODO refactor
|
|
|
+ return result.entrySet().stream().filter(e -> e.getValue().size() == 4).flatMap(recordEntry -> {
|
|
|
+ Tuple<Timestamp, Integer> key = recordEntry.getKey();
|
|
|
+ Timestamp timestamp = key.getItem1();
|
|
|
+ Integer interval = key.getItem2();
|
|
|
+
|
|
|
+ double min=0, max=0, sum=0; long count=0;
|
|
|
+ boolean unknown = false;
|
|
|
+ for (Tuple<AttributeCode, Double> valEntry : recordEntry.getValue()) {
|
|
|
+ switch (valEntry.getItem1()) {
|
|
|
+ case MAX: max = valEntry.getItem2(); break;
|
|
|
+ case MIN: min = valEntry.getItem2(); break;
|
|
|
+ case SUM: sum = valEntry.getItem2(); break;
|
|
|
+ case COUNT: count = valEntry.getItem2().longValue(); break;
|
|
|
+ default: unknown = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return unknown ? Stream.empty() : Stream.of(new DoubleStatistics(sensor, count, min, max, sum, timestamp, interval));
|
|
|
+ }).collect(toList());
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<Observation> getRawByTimeRange(Sensor sensor, TimeRange<Instant> timeRange) {
|
|
|
+ return jdbi.withHandle(h -> h.createQuery("SELECT r.sensor_attribute as id, r.value as value, r.timestamp as timestamp " +
|
|
|
+ "FROM statistics.records AS r " +
|
|
|
+ "WHERE (r.sensor_attribute = :id_min " +
|
|
|
+ "OR r.sensor_attribute = :id_max " +
|
|
|
+ "OR r.sensor_attribute = :id_sum " +
|
|
|
+ "OR r.sensor_attribute = :id_count) " +
|
|
|
+ "AND timestamp >= :time_from " +
|
|
|
+ "AND (r.timestamp + r.interval * interval '1 second') < :time_to"
|
|
|
+ )
|
|
|
+ .bind("id_min", encodeId(sensor, MIN))
|
|
|
+ .bind("id_max", encodeId(sensor, MAX))
|
|
|
+ .bind("id_sum", encodeId(sensor, SUM))
|
|
|
+ .bind("id_count", encodeId(sensor, COUNT))
|
|
|
+ .bind("time_from", timeRange.getFrom())
|
|
|
+ .bind("time_to", timeRange.getTo())
|
|
|
+ .map((rs, ctx) -> new Observation(
|
|
|
+ new Sensor(rs.getLong("id")),
|
|
|
+ rs.getDouble("value"),
|
|
|
+ Timestamp.parse(rs.getString("timestamp"))
|
|
|
+ )
|
|
|
+ ).list()
|
|
|
+ );
|
|
|
+ }
|
|
|
+}
|