| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package cz.senslog.analyzer.analysis.module;
- import cz.senslog.analyzer.core.api.BlockingHandler;
- import cz.senslog.analyzer.core.api.DataFinisher;
- import cz.senslog.analyzer.core.api.HandlerContext;
- import cz.senslog.analyzer.domain.*;
- import org.apache.logging.log4j.LogManager;
- import org.apache.logging.log4j.Logger;
- import java.util.*;
- import java.util.function.Consumer;
- import static cz.senslog.common.json.BasicJson.objectToJson;
- import static java.util.Collections.singletonList;
- public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHandler<I, DoubleStatistics> {
- private static final Logger logger = LogManager.getLogger(CollectorHandler.class);
- private static class CollectedStatistics {
- private final Timestamp startTime;
- private final Timestamp endTime;
- private final DoubleStatistics summaryStatistics;
- private CollectedStatistics(Group group, Timestamp startTime) {
- this.startTime = startTime;
- this.endTime = Timestamp.of(startTime.get().plusSeconds(group.getInterval()));
- this.summaryStatistics = DoubleStatistics.init(group, startTime);
- }
- }
- /** Map of saved statistics group by group_id and interval (Map<group_id, Statistics>).*/
- private final Map<Group, List<CollectedStatistics>> collectedStatistics;
- /** Map of saved groups (Map<group_id, Group>). */
- private Map<Long, Group> groupsGroupById;
- public CollectorHandler() {
- this.collectedStatistics = new HashMap<>();
- }
- protected abstract List<Group> loadGroups();
- protected abstract Consumer<I> collectData(DoubleStatistics statistics);
- protected abstract long getGroupId(I data);
- @Override
- public void init() {
- List<Group> groups = loadGroups();
- groupsGroupById = new HashMap<>(groups.size());
- for (Group group : groups) {
- groupsGroupById.put(group.getId(), group);
- }
- }
- @Override
- public void finish(DataFinisher<DoubleStatistics> finisher, Timestamp edgeDateTime) {
- List<DoubleStatistics> finishedData = new ArrayList<>();
- for (List<CollectedStatistics> statistics : collectedStatistics.values()) {
- Iterator<CollectedStatistics> statisticsIterator = statistics.iterator();
- while (statisticsIterator.hasNext()) {
- CollectedStatistics statistic = statisticsIterator.next();
- if (statistic.endTime.isBefore(edgeDateTime)) {
- finishedData.add(statistic.summaryStatistics);
- statisticsIterator.remove();
- }
- }
- }
- finisher.finish(finishedData);
- }
- @Override
- public void handle(HandlerContext<I, DoubleStatistics> context) {
- I data = context.data();
- long groupId = getGroupId(context.data());
- Timestamp timestamp = data.getTimestamp();
- Group group = getGroupByGroupId(groupId);
- if (group.getInterval() <= 0) { return; }
- List<CollectedStatistics> groupStatistics = getCollectedStatisticsByGroup(group);
- boolean newDataAccepted = false;
- for (CollectedStatistics st : groupStatistics) {
- if (timestamp.isEqual(st.startTime) ||
- (timestamp.isAfter(st.startTime) && timestamp.isBefore(st.endTime))
- ) {
- collectData(st.summaryStatistics).accept(data);
- newDataAccepted = true;
- }
- }
- if (!newDataAccepted) {
- CollectedStatistics st = new CollectedStatistics(group, timestamp);
- collectData(st.summaryStatistics).accept(data);
- groupStatistics.add(st);
- }
- }
- private List<CollectedStatistics> getCollectedStatisticsByGroup(Group group) {
- return collectedStatistics.computeIfAbsent(group, k -> new ArrayList<>());
- }
- private Group getGroupByGroupId(long groupId) {
- return groupsGroupById.getOrDefault(groupId, Group.empty());
- }
- }
|