@@ -1,30 +1,26 @@
package cz.senslog.analyzer.persistence.repository;

-import cz.senslog.analyzer.domain.DoubleStatistics;
-import cz.senslog.analyzer.domain.Observation;
-import cz.senslog.analyzer.domain.Sensor;
-import cz.senslog.analyzer.domain.Timestamp;
-import cz.senslog.analyzer.persistence.AttributeCode;
+import cz.senslog.analyzer.domain.*;
import cz.senslog.analyzer.persistence.Connection;
import cz.senslog.common.util.TimeRange;
import cz.senslog.common.util.Tuple;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.PreparedBatch;

import javax.inject.Inject;
import java.time.Instant;
-import java.time.LocalDateTime;
import java.util.*;
-import java.util.stream.Stream;
-
-import static cz.senslog.analyzer.persistence.AttributeCode.*;
-import static cz.senslog.analyzer.persistence.SensorIdConverter.decodeId;
-import static cz.senslog.analyzer.persistence.SensorIdConverter.encodeId;

+import static cz.senslog.analyzer.domain.AggregationType.DOUBLE;
+import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.*;

public class StatisticsRepository {

+    private static final Logger logger = LogManager.getLogger(StatisticsRepository.class);
+
    private final Jdbi jdbi;

    @Inject
@@ -33,116 +29,148 @@ public class StatisticsRepository {
    }

    public void save(DoubleStatistics statistics) {
-        save(Collections.singletonList(statistics));
+        try {
+            saveStatisticsBatch(singletonList(statistics));
+        } catch (Exception e) {
+            logger.error("Cannot persist this data: {}.", statistics, e);
+        }
    }

public void save(List<DoubleStatistics> statistics) {
-        jdbi.withHandle(h -> h.inTransaction(t -> {
-
-            PreparedBatch batch = t.prepareBatch("INSERT INTO statistics.records(sensor_attribute, value, interval, timestamp) " +
-                    "VALUES(:id, :value, :interval, :timestamp)");
-
-            statistics.forEach(st -> batch
-                    .bind("id", encodeId(st.getSensor(), MIN))
-                    .bind("value", st.getMin())
-                    .bind("interval", st.getInterval())
-                    .bind("timestamp", st.getTimestamp().get())
-                    .add()
-                    .bind("id", encodeId(st.getSensor(), MAX))
-                    .bind("value", st.getMax())
-                    .bind("interval", st.getInterval())
-                    .bind("timestamp", st.getTimestamp().get())
-                    .add()
-                    .bind("id", encodeId(st.getSensor(), SUM))
-                    .bind("value", st.getSum())
-                    .bind("interval", st.getInterval())
-                    .bind("timestamp", st.getTimestamp().get())
-                    .add()
-                    .bind("id", encodeId(st.getSensor(), COUNT))
-                    .bind("value", Long.valueOf(st.getCount()).doubleValue())
-                    .bind("interval", st.getInterval())
-                    .bind("timestamp", st.getTimestamp().get())
-                    .add()
+        try {
+            saveStatisticsBatch(statistics);
+        } catch (Exception e) {
+            logger.warn("Batch insert failed: {}", e.getMessage());
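+            // Fall back to saving record-by-record so one bad entry does not lose the whole batch.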
+            statistics.forEach(this::save);
+        }
+    }
+
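+    // Persists each statistic as four rows (MIN, MAX, SUM, COUNT) in a single transaction.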
+    private void saveStatisticsBatch(List<DoubleStatistics> statistics) throws Exception {
+        jdbi.<int[], Exception>withHandle(h -> h.inTransaction(t -> {
+            PreparedBatch batch = t.prepareBatch(
+                    "INSERT INTO statistics.records(group_id, value_attribute, record_value, time_interval, time_stamp) " +
+                    "VALUES(:group_id, :value_attribute, :recorded_value, :time_interval, :time_stamp)"
            );

-            return batch.execute();
-        }));
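+            // Only statistics whose source group is flagged for persistence are written.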
+            statistics.stream().filter(st -> st.getSource().isPersistence()).forEach(st -> {
+                long groupId = st.getSource().getId();
+                long interval = st.getSource().getInterval();
+                batch
+                        .bind("group_id", groupId)
+                        .bind("value_attribute", AttributeValue.MIN)
+                        .bind("recorded_value", st.getMin())
+                        .bind("time_interval", interval)
+                        .bind("time_stamp", st.getTimestamp().get())
+                        .add()
+                        .bind("group_id", groupId)
+                        .bind("value_attribute", AttributeValue.MAX)
+                        .bind("recorded_value", st.getMax())
+                        .bind("time_interval", interval)
+                        .bind("time_stamp", st.getTimestamp().get())
+                        .add()
+                        .bind("group_id", groupId)
+                        .bind("value_attribute", AttributeValue.SUM)
+                        .bind("recorded_value", st.getSum())
+                        .bind("time_interval", interval)
+                        .bind("time_stamp", st.getTimestamp().get())
+                        .add()
+                        .bind("group_id", groupId)
+                        .bind("value_attribute", AttributeValue.COUNT)
+                        .bind("recorded_value", Long.valueOf(st.getCount()).doubleValue())
+                        .bind("time_interval", interval)
+                        .bind("time_stamp", st.getTimestamp().get())
+                        .add();
+            });
+            return batch.execute();
+        }));
    }

-    public List<DoubleStatistics> getByTimeRange(Sensor sensor, TimeRange<Instant> timeRange) {
-        Map<Tuple<Timestamp, Integer>, List<Tuple<AttributeCode, Double>>> result =
-                jdbi.withHandle(h -> h.createQuery(
-                        "SELECT r.sensor_attribute as id, r.value as value, r.interval as interval, r.timestamp as timestamp " +
-                        "FROM statistics.records AS r " +
-                        "WHERE (r.sensor_attribute = :id_min " +
-                        "OR r.sensor_attribute = :id_max " +
-                        "OR r.sensor_attribute = :id_sum " +
-                        "OR r.sensor_attribute = :id_count) " +
-                        "AND timestamp >= :time_from " +
-                        "AND (r.timestamp + r.interval * interval '1 second') < :time_to"
+    public List<DoubleStatistics> getByTimeRange(long groupId, TimeRange<Instant> timeRange) {
+
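+        // Holder for one row of the record/group/sensor join below.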
+        class RawRecord {
+            long recordId, groupId, sensorId, unitId;
+            double value;
+            int interval;
+            AttributeValue attribute;
+            Timestamp timestamp;
+            AggregationType aggregationType;
+        }
+
+        List<RawRecord> rawRecords = jdbi.withHandle(h -> h.createQuery(
+                "SELECT r.id AS record_id, " +
+                "g.id AS group_id, " +
+                "s.sensor_id AS sensor_id, " +
+                "s.unit_id AS unit_id, " +
+                "r.value_attribute AS attribute, " +
+                "r.record_value AS value, " +
+                "r.time_interval AS interval, " +
+                "r.time_stamp AS time_stamp, " +
+                "g.aggregation_type AS aggregation_type " +
+                "FROM statistics.records AS r " +
+                "JOIN statistics.groups_interval AS g ON g.id = r.group_id " +
+                "JOIN statistics.sensor_to_group AS sg ON sg.group_id = r.group_id " +
+                "JOIN statistics.sensors AS s ON s.id = sg.sensor_id " +
+                "WHERE r.group_id = :group_id " +
+                "AND r.time_stamp >= :time_from " +
+                "AND (r.time_stamp + r.time_interval * interval '1 second') < :time_to " +
+                "AND r.created >= sg.created " +
+                "ORDER BY r.created"
        )
-                .bind("id_min", encodeId(sensor, MIN))
-                .bind("id_max", encodeId(sensor, MAX))
-                .bind("id_sum", encodeId(sensor, SUM))
-                .bind("id_count", encodeId(sensor, COUNT))
+                .bind("group_id", groupId)
                .bind("time_from", timeRange.getFrom())
                .bind("time_to", timeRange.getTo())
                .map((rs, ctx) -> {
-                    AttributeCode code = decodeId(rs.getLong("id")).getItem2();
-                    Double value = rs.getDouble("value");
-                    Tuple<AttributeCode, Double> val = Tuple.of(code, value);
+                    RawRecord r = new RawRecord();
+                    r.recordId = rs.getLong("record_id");
+                    r.groupId = rs.getLong("group_id");
+                    r.sensorId = rs.getLong("sensor_id");
+                    r.unitId = rs.getLong("unit_id");
+                    r.attribute = AttributeValue.valueOf(rs.getString("attribute"));
+                    r.value = rs.getDouble("value");
+                    r.interval = rs.getInt("interval");
+                    r.timestamp = Timestamp.parse(rs.getString("time_stamp"));
+                    r.aggregationType = AggregationType.valueOf(rs.getString("aggregation_type"));
+                    return r;
+                }).stream().filter(r -> r.aggregationType.equals(DOUBLE))
+                .collect(toList())
+        );

-                    Timestamp timestamp = Timestamp.parse(rs.getString("timestamp"));
-                    Integer interval = rs.getInt("interval");
-                    Tuple<Timestamp, Integer> key = Tuple.of(timestamp, interval);
+        Set<Sensor> sensors = new HashSet<>();

-                    return new AbstractMap.SimpleEntry<>(key, val);
-                }).collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toList()))));
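+        // The sensor join yields one row per sensor in the group, so de-duplicate records by id while collecting the group's sensors.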
+        Map<Long, RawRecord> distinctRecords = new HashMap<>();
+        for (RawRecord rawRecord : rawRecords) {
+            sensors.add(new Sensor(rawRecord.sensorId, rawRecord.unitId, rawRecord.groupId));
+            distinctRecords.put(rawRecord.recordId, rawRecord);
+        }

-        // TODO refactor
-        return result.entrySet().stream().filter(e -> e.getValue().size() == 4).flatMap(recordEntry -> {
-            Tuple<Timestamp, Integer> key = recordEntry.getKey();
-            Timestamp timestamp = key.getItem1();
-            Integer interval = key.getItem2();
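+        // Bucket the distinct rows by their (timestamp, interval) window; each complete window becomes one DoubleStatistics.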
+        Map<Tuple<Timestamp, Integer>, List<RawRecord>> groupedRecords = new HashMap<>();
+        for (RawRecord record : distinctRecords.values()) {
+            groupedRecords.computeIfAbsent(Tuple.of(record.timestamp, record.interval), k -> new ArrayList<>())
+                    .add(record);
+        }
+
+        List<DoubleStatistics> statistics = new ArrayList<>(groupedRecords.size());
+        for (Map.Entry<Tuple<Timestamp, Integer>, List<RawRecord>> stEntry : groupedRecords.entrySet()) {
+            Timestamp timestamp = stEntry.getKey().getItem1();
+            int interval = stEntry.getKey().getItem2();
+            List<RawRecord> records = stEntry.getValue();

            double min = 0, max = 0, sum = 0; long count = 0;
            boolean unknown = false;
-            for (Tuple<AttributeCode, Double> valEntry : recordEntry.getValue()) {
-                switch (valEntry.getItem1()) {
-                    case MAX: max = valEntry.getItem2(); break;
-                    case MIN: min = valEntry.getItem2(); break;
-                    case SUM: sum = valEntry.getItem2(); break;
-                    case COUNT: count = valEntry.getItem2().longValue(); break;
+            for (RawRecord r : records) {
+                switch (r.attribute) {
+                    case MAX: max = r.value; break;
+                    case MIN: min = r.value; break;
+                    case SUM: sum = r.value; break;
+                    case COUNT: count = (long) r.value; break;
                    default: unknown = true;
                }
            }
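+            // Skip windows containing an unrecognized attribute.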
+            if (!unknown) {
+                Group group = new Group(groupId, interval, true, DOUBLE, sensors);
+                statistics.add(new DoubleStatistics(group, count, min, max, sum, timestamp));
+            }
+        }

-            return unknown ? Stream.empty() : Stream.of(new DoubleStatistics(sensor, count, min, max, sum, timestamp, interval));
-        }).collect(toList());
-    }
-
-    public List<Observation> getRawByTimeRange(Sensor sensor, TimeRange<Instant> timeRange) {
-        return jdbi.withHandle(h -> h.createQuery("SELECT r.sensor_attribute as id, r.value as value, r.timestamp as timestamp " +
-                "FROM statistics.records AS r " +
-                "WHERE (r.sensor_attribute = :id_min " +
-                "OR r.sensor_attribute = :id_max " +
-                "OR r.sensor_attribute = :id_sum " +
-                "OR r.sensor_attribute = :id_count) " +
-                "AND timestamp >= :time_from " +
-                "AND (r.timestamp + r.interval * interval '1 second') < :time_to"
-        )
-                .bind("id_min", encodeId(sensor, MIN))
-                .bind("id_max", encodeId(sensor, MAX))
-                .bind("id_sum", encodeId(sensor, SUM))
-                .bind("id_count", encodeId(sensor, COUNT))
-                .bind("time_from", timeRange.getFrom())
-                .bind("time_to", timeRange.getTo())
-                .map((rs, ctx) -> new Observation(
-                        new Sensor(rs.getLong("id")),
-                        rs.getDouble("value"),
-                        Timestamp.parse(rs.getString("timestamp"))
-                )
-                ).list()
-        );
+        return statistics;
    }
}