|
@@ -4,67 +4,60 @@ import cz.senslog.analyzer.core.api.BlockingHandler;
|
|
|
import cz.senslog.analyzer.core.api.DataFinisher;
|
|
import cz.senslog.analyzer.core.api.DataFinisher;
|
|
|
import cz.senslog.analyzer.core.api.HandlerContext;
|
|
import cz.senslog.analyzer.core.api.HandlerContext;
|
|
|
import cz.senslog.analyzer.domain.*;
|
|
import cz.senslog.analyzer.domain.*;
|
|
|
|
|
+import cz.senslog.analyzer.storage.inmemory.CollectedStatisticsStorage;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
import org.apache.logging.log4j.Logger;
|
|
|
|
|
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
-import java.util.function.Consumer;
|
|
|
|
|
|
|
+import java.util.function.Function;
|
|
|
|
|
|
|
|
-import static cz.senslog.common.json.BasicJson.objectToJson;
|
|
|
|
|
-import static java.util.Collections.singletonList;
|
|
|
|
|
|
|
|
|
|
public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHandler<I, DoubleStatistics> {
|
|
public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHandler<I, DoubleStatistics> {
|
|
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(CollectorHandler.class);
|
|
private static final Logger logger = LogManager.getLogger(CollectorHandler.class);
|
|
|
|
|
|
|
|
- private static class CollectedStatistics {
|
|
|
|
|
- private final Timestamp startTime;
|
|
|
|
|
- private final Timestamp endTime;
|
|
|
|
|
- private final DoubleStatistics summaryStatistics;
|
|
|
|
|
-
|
|
|
|
|
- private CollectedStatistics(Group group, Timestamp startTime) {
|
|
|
|
|
- this.startTime = startTime;
|
|
|
|
|
- this.endTime = Timestamp.of(startTime.get().plusSeconds(group.getInterval()));
|
|
|
|
|
- this.summaryStatistics = DoubleStatistics.init(group, startTime);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /** Map of saved statistics group by group_id and interval (Map<group_id, Statistics>).*/
|
|
|
|
|
- private final Map<Group, List<CollectedStatistics>> collectedStatistics;
|
|
|
|
|
-
|
|
|
|
|
/** Map of saved groups (Map<group_id, Group>). */
|
|
/** Map of saved groups (Map<group_id, Group>). */
|
|
|
private Map<Long, Group> groupsGroupById;
|
|
private Map<Long, Group> groupsGroupById;
|
|
|
|
|
|
|
|
- public CollectorHandler() {
|
|
|
|
|
- this.collectedStatistics = new HashMap<>();
|
|
|
|
|
|
|
+ private Map<Group, List<CollectedStatistics>> collectedStatistics;
|
|
|
|
|
+
|
|
|
|
|
+ private final CollectedStatisticsStorage storage;
|
|
|
|
|
+
|
|
|
|
|
+ public CollectorHandler(CollectedStatisticsStorage storage) {
|
|
|
|
|
+ this.storage = storage;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
protected abstract List<Group> loadGroups();
|
|
protected abstract List<Group> loadGroups();
|
|
|
- protected abstract Consumer<I> collectData(DoubleStatistics statistics);
|
|
|
|
|
|
|
+ protected abstract Function<I, Boolean> collectData(DoubleStatistics statistics);
|
|
|
protected abstract long getGroupId(I data);
|
|
protected abstract long getGroupId(I data);
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public void init() {
|
|
public void init() {
|
|
|
List<Group> groups = loadGroups();
|
|
List<Group> groups = loadGroups();
|
|
|
groupsGroupById = new HashMap<>(groups.size());
|
|
groupsGroupById = new HashMap<>(groups.size());
|
|
|
|
|
+ collectedStatistics = new HashMap<>(groups.size());
|
|
|
for (Group group : groups) {
|
|
for (Group group : groups) {
|
|
|
groupsGroupById.put(group.getId(), group);
|
|
groupsGroupById.put(group.getId(), group);
|
|
|
|
|
+ collectedStatistics.put(group, storage.restore(group));
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public void finish(DataFinisher<DoubleStatistics> finisher, Timestamp edgeDateTime) {
|
|
public void finish(DataFinisher<DoubleStatistics> finisher, Timestamp edgeDateTime) {
|
|
|
List<DoubleStatistics> finishedData = new ArrayList<>();
|
|
List<DoubleStatistics> finishedData = new ArrayList<>();
|
|
|
- for (List<CollectedStatistics> statistics : collectedStatistics.values()) {
|
|
|
|
|
|
|
+ for (Group group : groupsGroupById.values()) {
|
|
|
|
|
+ List<CollectedStatistics> statistics = getCollectedStatisticsByGroup(group);
|
|
|
Iterator<CollectedStatistics> statisticsIterator = statistics.iterator();
|
|
Iterator<CollectedStatistics> statisticsIterator = statistics.iterator();
|
|
|
while (statisticsIterator.hasNext()) {
|
|
while (statisticsIterator.hasNext()) {
|
|
|
- CollectedStatistics statistic = statisticsIterator.next();
|
|
|
|
|
- if (statistic.endTime.isBefore(edgeDateTime)) {
|
|
|
|
|
- finishedData.add(statistic.summaryStatistics);
|
|
|
|
|
|
|
+ CollectedStatistics st = statisticsIterator.next();
|
|
|
|
|
+ if (st.getEndTime().isBefore(edgeDateTime)) {
|
|
|
|
|
+ finishedData.add(st.getStatistics());
|
|
|
statisticsIterator.remove();
|
|
statisticsIterator.remove();
|
|
|
|
|
+ storage.remove(st);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ storage.commit();
|
|
|
finisher.finish(finishedData);
|
|
finisher.finish(finishedData);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -81,26 +74,26 @@ public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHan
|
|
|
|
|
|
|
|
boolean newDataAccepted = false;
|
|
boolean newDataAccepted = false;
|
|
|
for (CollectedStatistics st : groupStatistics) {
|
|
for (CollectedStatistics st : groupStatistics) {
|
|
|
- if (timestamp.isEqual(st.startTime) ||
|
|
|
|
|
- (timestamp.isAfter(st.startTime) && timestamp.isBefore(st.endTime))
|
|
|
|
|
|
|
+ if (timestamp.isEqual(st.getStartTime()) ||
|
|
|
|
|
+ (timestamp.isAfter(st.getStartTime()) && timestamp.isBefore(st.getEndTime()))
|
|
|
) {
|
|
) {
|
|
|
- collectData(st.summaryStatistics).accept(data);
|
|
|
|
|
|
|
+ collectData(st.getStatistics()).apply(data);
|
|
|
newDataAccepted = true;
|
|
newDataAccepted = true;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if (!newDataAccepted) {
|
|
|
|
|
- CollectedStatistics st = new CollectedStatistics(group, timestamp);
|
|
|
|
|
- collectData(st.summaryStatistics).accept(data);
|
|
|
|
|
- groupStatistics.add(st);
|
|
|
|
|
|
|
+ if (!newDataAccepted) { // register a new statistics
|
|
|
|
|
+ CollectedStatistics newSt = new CollectedStatistics(group, timestamp);
|
|
|
|
|
+ collectData(newSt.getStatistics()).apply(data);
|
|
|
|
|
+ groupStatistics.add(storage.watch(newSt));
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private List<CollectedStatistics> getCollectedStatisticsByGroup(Group group) {
|
|
|
|
|
- return collectedStatistics.computeIfAbsent(group, k -> new ArrayList<>());
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
private Group getGroupByGroupId(long groupId) {
|
|
private Group getGroupByGroupId(long groupId) {
|
|
|
return groupsGroupById.getOrDefault(groupId, Group.empty());
|
|
return groupsGroupById.getOrDefault(groupId, Group.empty());
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ private List<CollectedStatistics> getCollectedStatisticsByGroup(Group group) {
|
|
|
|
|
+ return collectedStatistics.computeIfAbsent(group, g -> new ArrayList<>());
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|