CollectorHandler.java 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package cz.senslog.analyzer.analysis.module;
  2. import cz.senslog.analyzer.core.api.BlockingHandler;
  3. import cz.senslog.analyzer.core.api.DataFinisher;
  4. import cz.senslog.analyzer.core.api.HandlerContext;
  5. import cz.senslog.analyzer.domain.*;
  6. import org.apache.logging.log4j.LogManager;
  7. import org.apache.logging.log4j.Logger;
  8. import java.util.*;
  9. import java.util.function.Consumer;
  10. import static cz.senslog.common.json.BasicJson.objectToJson;
  11. import static java.util.Collections.singletonList;
  12. public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHandler<I, DoubleStatistics> {
  13. private static final Logger logger = LogManager.getLogger(CollectorHandler.class);
  14. private static class CollectedStatistics {
  15. private final Timestamp startTime;
  16. private final Timestamp endTime;
  17. private final DoubleStatistics summaryStatistics;
  18. private CollectedStatistics(Group group, Timestamp startTime) {
  19. this.startTime = startTime;
  20. this.endTime = Timestamp.of(startTime.get().plusSeconds(group.getInterval()));
  21. this.summaryStatistics = DoubleStatistics.init(group, startTime);
  22. }
  23. }
  24. /** Map of saved statistics group by group_id and interval (Map<group_id, Statistics>).*/
  25. private final Map<Group, List<CollectedStatistics>> collectedStatistics;
  26. /** Map of saved groups (Map<group_id, Group>). */
  27. private Map<Long, Group> groupsGroupById;
  28. public CollectorHandler() {
  29. this.collectedStatistics = new HashMap<>();
  30. }
  31. protected abstract List<Group> loadGroups();
  32. protected abstract Consumer<I> collectData(DoubleStatistics statistics);
  33. protected abstract long getGroupId(I data);
  34. @Override
  35. public void init() {
  36. List<Group> groups = loadGroups();
  37. groupsGroupById = new HashMap<>(groups.size());
  38. for (Group group : groups) {
  39. groupsGroupById.put(group.getId(), group);
  40. }
  41. }
  42. @Override
  43. public void finish(DataFinisher<DoubleStatistics> finisher, Timestamp edgeDateTime) {
  44. List<DoubleStatistics> finishedData = new ArrayList<>();
  45. for (List<CollectedStatistics> statistics : collectedStatistics.values()) {
  46. Iterator<CollectedStatistics> statisticsIterator = statistics.iterator();
  47. while (statisticsIterator.hasNext()) {
  48. CollectedStatistics statistic = statisticsIterator.next();
  49. if (statistic.endTime.isBefore(edgeDateTime)) {
  50. finishedData.add(statistic.summaryStatistics);
  51. statisticsIterator.remove();
  52. }
  53. }
  54. }
  55. finisher.finish(finishedData);
  56. }
  57. @Override
  58. public void handle(HandlerContext<I, DoubleStatistics> context) {
  59. I data = context.data();
  60. long groupId = getGroupId(context.data());
  61. Timestamp timestamp = data.getTimestamp();
  62. Group group = getGroupByGroupId(groupId);
  63. if (group.getInterval() <= 0) { return; }
  64. List<CollectedStatistics> groupStatistics = getCollectedStatisticsByGroup(group);
  65. boolean newDataAccepted = false;
  66. for (CollectedStatistics st : groupStatistics) {
  67. if (timestamp.isEqual(st.startTime) ||
  68. (timestamp.isAfter(st.startTime) && timestamp.isBefore(st.endTime))
  69. ) {
  70. collectData(st.summaryStatistics).accept(data);
  71. newDataAccepted = true;
  72. }
  73. }
  74. if (!newDataAccepted) {
  75. CollectedStatistics st = new CollectedStatistics(group, timestamp);
  76. collectData(st.summaryStatistics).accept(data);
  77. groupStatistics.add(st);
  78. }
  79. }
  80. private List<CollectedStatistics> getCollectedStatisticsByGroup(Group group) {
  81. return collectedStatistics.computeIfAbsent(group, k -> new ArrayList<>());
  82. }
  83. private Group getGroupByGroupId(long groupId) {
  84. return groupsGroupById.getOrDefault(groupId, Group.empty());
  85. }
  86. }