package cz.senslog.analyzer.analysis.module;

import cz.senslog.analyzer.core.api.BlockingHandler;
import cz.senslog.analyzer.core.api.DataFinisher;
import cz.senslog.analyzer.core.api.HandlerContext;
import cz.senslog.analyzer.domain.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;
import java.util.function.Consumer;

import static cz.senslog.common.json.BasicJson.objectToJson;
import static java.util.Collections.singletonList;

/**
 * Base handler that accumulates incoming data into per-group time windows and
 * releases the statistics of windows that have ended.
 *
 * <p>Each incoming item is mapped to a {@link Group} through {@link #getGroupId}.
 * For every group the handler keeps a list of open windows; a window starts at
 * the timestamp of the first item that did not fit any existing window and lasts
 * {@code group.getInterval()} seconds. {@link #finish} hands over the statistics
 * of every window whose end lies before the supplied edge time.
 *
 * <p>Subclasses supply the groups ({@link #loadGroups}), the group id of an item
 * ({@link #getGroupId}) and the way an item is folded into a window's statistics
 * ({@link #collectData}).
 */
public abstract class CollectorHandler> extends BlockingHandler {

    private static final Logger logger = LogManager.getLogger(CollectorHandler.class);

    /**
     * One open time window of a group: its start, its end and the statistics
     * collected for it so far.
     */
    private static class CollectedStatistics {

        /** Start of the window; data whose timestamp equals it is accepted. */
        private final Timestamp startTime;

        /** End of the window; data must be strictly before it to be accepted. */
        private final Timestamp endTime;

        /** Statistics accumulated for this window. */
        private final DoubleStatistics summaryStatistics;

        /**
         * Opens a window for {@code group} beginning at {@code startTime}.
         *
         * @param group     group the window belongs to; its interval is added to the start as seconds
         * @param startTime start of the window
         */
        private CollectedStatistics(Group group, Timestamp startTime) {
            this.startTime = startTime;
            // The window length is the group's interval, applied via plusSeconds.
            this.endTime = Timestamp.of(startTime.get().plusSeconds(group.getInterval()));
            this.summaryStatistics = DoubleStatistics.init(group, startTime);
        }
    }

    /** Open windows per group: the key is the group, the value is the list of its not yet finished windows. */
    private final Map> collectedStatistics;

    /** Known groups indexed by group id; assigned in {@link #init()}. */
    private Map groupsGroupById;

    /** Creates the handler with no open windows; groups are loaded later by {@link #init()}. */
    public CollectorHandler() {
        this.collectedStatistics = new HashMap<>();
    }

    /**
     * Supplies the groups this handler works with. Called from {@link #init()}.
     *
     * @return groups to index by their id
     */
    protected abstract List loadGroups();

    /**
     * Provides the operation that adds one data item to the given statistics.
     * The returned consumer is invoked immediately with the item being handled.
     *
     * @param statistics statistics of the window the item falls into
     * @return consumer that receives the item
     */
    protected abstract Consumer collectData(DoubleStatistics statistics);

    /**
     * Determines which group a data item belongs to.
     *
     * @param data incoming item
     * @return id used to look the group up among the loaded groups
     */
    protected abstract long getGroupId(I data);

    /**
     * Loads the groups through {@link #loadGroups()} and rebuilds the id index
     * from scratch.
     */
    @Override
    public void init() {
        List groups = loadGroups();
        groupsGroupById = new HashMap<>(groups.size());
        for (Group group : groups) {
            groupsGroupById.put(group.getId(), group);
        }
    }

    /**
     * Releases every window that ended before {@code edgeDateTime}.
     *
     * <p>Finished windows are removed from the open windows and their
     * statistics are passed to the finisher in a single call. The finisher is
     * called even when no window has finished, in which case it receives an
     * empty list.
     *
     * @param finisher     receiver of the finished statistics
     * @param edgeDateTime windows whose end is strictly before this time are finished
     */
    @Override
    public void finish(DataFinisher finisher, Timestamp edgeDateTime) {
        List finishedData = new ArrayList<>();
        for (List statistics : collectedStatistics.values()) {
            // Explicit iterator so finished windows can be removed while iterating.
            Iterator statisticsIterator = statistics.iterator();
            while (statisticsIterator.hasNext()) {
                CollectedStatistics statistic = statisticsIterator.next();
                if (statistic.endTime.isBefore(edgeDateTime)) {
                    finishedData.add(statistic.summaryStatistics);
                    statisticsIterator.remove();
                }
            }
        }
        finisher.finish(finishedData);
    }

    /**
     * Adds the item carried by {@code context} to the matching window(s) of its
     * group, opening a new window when none matches.
     *
     * <p>Items whose group has an interval of zero or less are ignored. Ids
     * with no loaded group resolve to {@code Group.empty()}.
     *
     * <p>NOTE(review): the group index is only assigned in {@link #init()}, so
     * calling this method before {@code init()} fails with a
     * {@link NullPointerException}.
     *
     * @param context context providing the incoming item
     */
    @Override
    public void handle(HandlerContext context) {
        I data = context.data();
        long groupId = getGroupId(context.data());
        Timestamp timestamp = data.getTimestamp();

        Group group = getGroupByGroupId(groupId);
        // Nothing can be windowed without a positive interval.
        if (group.getInterval() <= 0) {
            return;
        }

        List groupStatistics = getCollectedStatisticsByGroup(group);
        boolean newDataAccepted = false;
        // There is no break: the item is collected into every window whose
        // half-open range [startTime, endTime) contains its timestamp.
        for (CollectedStatistics st : groupStatistics) {
            if (timestamp.isEqual(st.startTime) ||
                    (timestamp.isAfter(st.startTime) && timestamp.isBefore(st.endTime))
            ) {
                collectData(st.summaryStatistics).accept(data);
                newDataAccepted = true;
            }
        }

        // No window matched: open one that starts at this item's timestamp.
        if (!newDataAccepted) {
            CollectedStatistics st = new CollectedStatistics(group, timestamp);
            collectData(st.summaryStatistics).accept(data);
            groupStatistics.add(st);
        }
    }

    /**
     * Returns the list of open windows of {@code group}, creating and
     * registering an empty list on first use.
     *
     * @param group group whose windows are requested
     * @return mutable list stored in the open-window map
     */
    private List getCollectedStatisticsByGroup(Group group) {
        return collectedStatistics.computeIfAbsent(group, k -> new ArrayList<>());
    }

    /**
     * Looks a group up by id.
     *
     * @param groupId id of the group
     * @return the loaded group, or {@code Group.empty()} when the id is unknown
     */
    private Group getGroupByGroupId(long groupId) {
        return groupsGroupById.getOrDefault(groupId, Group.empty());
    }
}