瀏覽代碼

added a system to continue after the app's crash

Lukas Cerny 4 年之前
父節點
當前提交
cbbdd2d7e3
共有 21 個文件被更改,包括 609 次插入84 次删除
  1. 1 1
      config/config.yaml
  2. 17 8
      src/main/java/cz/senslog/analyzer/analysis/module/CollectorHandler.java
  3. 7 3
      src/main/java/cz/senslog/analyzer/analysis/module/HandlersModule.java
  4. 1 1
      src/main/java/cz/senslog/analyzer/app/Application.java
  5. 16 5
      src/main/java/cz/senslog/analyzer/domain/Timestamp.java
  6. 5 0
      src/main/java/cz/senslog/analyzer/domain/TimestampType.java
  7. 2 0
      src/main/java/cz/senslog/analyzer/provider/AnalyzerTask.java
  8. 3 2
      src/main/java/cz/senslog/analyzer/provider/ScheduleDatabaseProviderModule.java
  9. 8 52
      src/main/java/cz/senslog/analyzer/provider/ScheduledDatabaseProvider.java
  10. 92 0
      src/main/java/cz/senslog/analyzer/provider/task/ObservationAnalyzerTask.java
  11. 7 0
      src/main/java/cz/senslog/analyzer/storage/RepositoryModule.java
  12. 16 0
      src/main/java/cz/senslog/analyzer/storage/inmemory/AbstractInMemoryRepository.java
  13. 36 0
      src/main/java/cz/senslog/analyzer/storage/inmemory/TimestampStorage.java
  14. 5 7
      src/main/java/cz/senslog/analyzer/storage/inmemory/repository/CollectedStatisticsRepository.java
  15. 80 0
      src/main/java/cz/senslog/analyzer/storage/inmemory/repository/TimestampRepository.java
  16. 6 3
      src/main/java/cz/senslog/analyzer/storage/permanent/repository/SensLogRepository.java
  17. 51 0
      src/main/java/cz/senslog/analyzer/util/ListUtils.java
  18. 11 2
      src/test/java/cz/senslog/analyzer/domain/TimestampTest.java
  19. 145 0
      src/test/java/cz/senslog/analyzer/provider/task/ObservationAnalyzerTaskTest.java
  20. 46 0
      src/test/java/cz/senslog/analyzer/storage/inmemory/repository/TimestampRepositoryTest.java
  21. 54 0
      src/test/java/cz/senslog/analyzer/util/ListUtilsTest.java

+ 1 - 1
config/config.yaml

@@ -11,7 +11,7 @@ inMemoryStorage:
 
 scheduler:
   initDate: "1970-01-01T00:00:00.00+02:00"
-  period: 2
+  period: 10
 
 server:
   port: 9090

+ 17 - 8
src/main/java/cz/senslog/analyzer/analysis/module/CollectorHandler.java

@@ -5,14 +5,17 @@ import cz.senslog.analyzer.core.api.DataFinisher;
 import cz.senslog.analyzer.core.api.HandlerContext;
 import cz.senslog.analyzer.domain.*;
 import cz.senslog.analyzer.storage.inmemory.CollectedStatisticsStorage;
+import cz.senslog.analyzer.storage.inmemory.TimestampStorage;
 import cz.senslog.common.util.DateTrunc;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.time.OffsetDateTime;
 import java.util.*;
 import java.util.function.Function;
 
+import static cz.senslog.analyzer.domain.TimestampType.LAST_ACCEPTED_INCLUSIVE;
+import static cz.senslog.analyzer.domain.TimestampType.LAST_COMMITTED_INCLUSIVE;
+
 
 public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHandler<I, DoubleStatistics> {
 
@@ -23,10 +26,12 @@ public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHan
 
     private Map<Group, List<CollectedStatistics>> collectedStatistics;
 
-    private final CollectedStatisticsStorage storage;
+    private final CollectedStatisticsStorage statisticsStorage;
+    private final TimestampStorage timestampStorage;
 
-    public CollectorHandler(CollectedStatisticsStorage storage) {
-        this.storage = storage;
+    public CollectorHandler(CollectedStatisticsStorage statisticsStorage, TimestampStorage timestampStorage) {
+        this.statisticsStorage = statisticsStorage;
+        this.timestampStorage = timestampStorage;
     }
 
     protected abstract List<Group> loadGroups();
@@ -40,7 +45,7 @@ public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHan
         collectedStatistics = new HashMap<>(groups.size());
         for (Group group : groups) {
             groupsGroupById.put(group.getId(), group);
-            collectedStatistics.put(group, storage.restore(group));
+            collectedStatistics.put(group, statisticsStorage.restore(group));
         }
     }
 
@@ -55,11 +60,13 @@ public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHan
                 if (st.getEndTime().isBefore(edgeDateTime)) {
                     finishedData.add(st.getStatistics());
                     statisticsIterator.remove();
-                    storage.remove(st);
+                    statisticsStorage.remove(st);
                 }
             }
         }
-        storage.commit();
+        if (statisticsStorage.commit()) {
+            timestampStorage.update(edgeDateTime, LAST_COMMITTED_INCLUSIVE);
+        }
         finisher.finish(finishedData);
     }
 
@@ -88,8 +95,10 @@ public abstract class CollectorHandler<I extends Data<?, ?>> extends BlockingHan
             Timestamp startOfInterval = createStartOfInterval(timestamp, group);
             CollectedStatistics newSt = new CollectedStatistics(group, startOfInterval);
             collectData(newSt.getStatistics()).apply(data);
-            groupStatistics.add(storage.watch(newSt));
+            groupStatistics.add(statisticsStorage.watch(newSt));
         }
+
+        timestampStorage.update(timestamp, LAST_ACCEPTED_INCLUSIVE);
     }
 
     private static Timestamp createStartOfInterval(Timestamp timestamp, Group group) {

+ 7 - 3
src/main/java/cz/senslog/analyzer/analysis/module/HandlersModule.java

@@ -4,7 +4,9 @@ import cz.senslog.analyzer.core.api.BlockingHandler;
 import cz.senslog.analyzer.domain.*;
 import cz.senslog.analyzer.storage.RepositoryModule;
 import cz.senslog.analyzer.storage.inmemory.CollectedStatisticsStorage;
+import cz.senslog.analyzer.storage.inmemory.TimestampStorage;
 import cz.senslog.analyzer.storage.inmemory.repository.CollectedStatisticsRepository;
+import cz.senslog.analyzer.storage.inmemory.repository.TimestampRepository;
 import cz.senslog.analyzer.storage.permanent.repository.StatisticsConfigRepository;
 import dagger.Module;
 import dagger.Provides;
@@ -53,11 +55,13 @@ public class HandlersModule {
     @Provides @Singleton @Named("aggregationCollectorHandler")
     public BlockingHandler<Observation, DoubleStatistics> provideObservationCollector(
             StatisticsConfigRepository statisticsConfigRepository,
-            CollectedStatisticsRepository collectedStatisticsRepository
+            CollectedStatisticsRepository collectedStatisticsRepository,
+            TimestampRepository timestampRepository
     ) {
         logger.info("Creating a new instance for the handler '{}'.", "aggregationCollectorHandler");
-        CollectedStatisticsStorage storage = CollectedStatisticsStorage.createContext(collectedStatisticsRepository);
-        return new CollectorHandler<Observation>(storage) {
+        CollectedStatisticsStorage statisticsStorage = CollectedStatisticsStorage.createContext(collectedStatisticsRepository);
+        TimestampStorage timestampStorage = TimestampStorage.createContext(timestampRepository);
+        return new CollectorHandler<Observation>(statisticsStorage, timestampStorage) {
             @Override protected List<Group> loadGroups() {
                 return statisticsConfigRepository.getGroupInfos();
             }

+ 1 - 1
src/main/java/cz/senslog/analyzer/app/Application.java

@@ -101,6 +101,6 @@ public class Application extends Thread {
                 .createServer();
 
         server.start(port);
-       // dataProvider.start();
+        dataProvider.start();
     }
 }

+ 16 - 5
src/main/java/cz/senslog/analyzer/domain/Timestamp.java

@@ -10,8 +10,12 @@ import java.util.Objects;
 
 public class Timestamp implements Comparable<Timestamp> {
 
+    public static final Timestamp MIN = of(OffsetDateTime.MIN);
+
+    public static final Timestamp MAX = of(OffsetDateTime.MAX);
+
     private static final DateTimeFormatter INPUT_FORMATTER = new DateTimeFormatterBuilder()
-            .append(DateTimeFormatter.ofPattern("yyyy-MM-dd[ HH:mm:ss]"))
+            .append(DateTimeFormatter.ofPattern("yyyy-MM-dd[ HH:mm:ss][.SSSSSS]"))
             .optionalStart()
             .appendOffset("+HH", "+00")
             .optionalEnd()
@@ -19,10 +23,11 @@ public class Timestamp implements Comparable<Timestamp> {
             .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
             .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
             .parseDefaulting(ChronoField.OFFSET_SECONDS, 0)
+            .parseDefaulting(ChronoField.MILLI_OF_SECOND, 0)
             .toFormatter();
 
     private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
-            .append(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+            .append(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"))
             .optionalStart().appendOffset("+HH", "+00").optionalEnd()
             .toFormatter();
 
@@ -31,11 +36,10 @@ public class Timestamp implements Comparable<Timestamp> {
             .toFormatter();
 
     private static final DateTimeFormatter TIME_FORMATTER = new DateTimeFormatterBuilder()
-            .append(DateTimeFormatter.ofPattern("HH:mm:ss"))
+            .append(DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"))
             .optionalStart().appendOffset("+HH", "+00").optionalEnd()
             .toFormatter();
 
-
     private final OffsetDateTime value;
 
     public static Timestamp parse(String value) {
@@ -43,6 +47,9 @@ public class Timestamp implements Comparable<Timestamp> {
     }
 
     public static Timestamp parse(String value, DateTimeFormatter formatter) {
+        if (value == null || value.isEmpty() || formatter == null) {
+            return null;
+        }
         return of(OffsetDateTime.parse(value, formatter));
     }
 
@@ -79,7 +86,11 @@ public class Timestamp implements Comparable<Timestamp> {
     }
 
     public String format() {
-        return value.format(DATE_TIME_FORMATTER);
+        return format(DATE_TIME_FORMATTER);
+    }
+
+    public String format(DateTimeFormatter formatter) {
+        return value.format(formatter);
     }
 
     public String timeFormat() {

+ 5 - 0
src/main/java/cz/senslog/analyzer/domain/TimestampType.java

@@ -0,0 +1,5 @@
+package cz.senslog.analyzer.domain;
+
+public enum TimestampType {
+    FIRST_PROVIDED_INCLUSIVE, LAST_PROVIDED_INCLUSIVE, LAST_PROVIDED_EXCLUSIVE, LAST_ACCEPTED_INCLUSIVE, LAST_COMMITTED_INCLUSIVE
+}

+ 2 - 0
src/main/java/cz/senslog/analyzer/provider/AnalyzerTask.java

@@ -4,12 +4,14 @@ package cz.senslog.analyzer.provider;
 import cz.senslog.analyzer.analysis.Analyzer;
 
 import java.util.List;
+import java.util.Objects;
 
 public abstract class AnalyzerTask<T> implements Runnable {
 
     private final Analyzer<T> analyzer;
 
     protected AnalyzerTask(Analyzer<T> analyzer) {
+        Objects.requireNonNull(analyzer);
         this.analyzer = analyzer;
     }
 

+ 3 - 2
src/main/java/cz/senslog/analyzer/provider/ScheduleDatabaseProviderModule.java

@@ -1,6 +1,7 @@
 package cz.senslog.analyzer.provider;
 
 import cz.senslog.analyzer.storage.RepositoryModule;
+import cz.senslog.analyzer.storage.inmemory.repository.TimestampRepository;
 import cz.senslog.analyzer.storage.permanent.repository.SensLogRepository;
 import dagger.Module;
 import dagger.Provides;
@@ -14,7 +15,7 @@ public class ScheduleDatabaseProviderModule {
     }
 
     @Provides
-    public ScheduledDatabaseProvider provideScheduleDatabaseProvider(SensLogRepository repository) {
-        return new ScheduledDatabaseProvider(repository);
+    public ScheduledDatabaseProvider provideScheduleDatabaseProvider(TimestampRepository configRepository, SensLogRepository sensLogRepository) {
+        return new ScheduledDatabaseProvider(configRepository, sensLogRepository);
     }
 }

+ 8 - 52
src/main/java/cz/senslog/analyzer/provider/ScheduledDatabaseProvider.java

@@ -1,36 +1,36 @@
 package cz.senslog.analyzer.provider;
 
-import cz.senslog.analyzer.analysis.Analyzer;
 import cz.senslog.analyzer.domain.Observation;
-import cz.senslog.analyzer.domain.Timestamp;
+import cz.senslog.analyzer.provider.task.ObservationAnalyzerTask;
+import cz.senslog.analyzer.storage.inmemory.TimestampStorage;
+import cz.senslog.analyzer.storage.inmemory.repository.TimestampRepository;
 import cz.senslog.analyzer.storage.permanent.repository.SensLogRepository;
 import cz.senslog.common.util.schedule.Scheduler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import javax.inject.Inject;
-import java.time.*;
-import java.util.ArrayList;
-import java.util.List;
 
 public class ScheduledDatabaseProvider extends DataProvider<Observation> {
 
     private static final Logger logger = LogManager.getLogger(ScheduledDatabaseProvider.class);
 
+    private final TimestampStorage storage;
     private final SensLogRepository repository;
 
     private Scheduler scheduler;
 
     @Inject
-    public ScheduledDatabaseProvider(SensLogRepository repository) {
-        this.repository = repository;
+    public ScheduledDatabaseProvider(TimestampRepository configRepository, SensLogRepository sensLogRepository) {
+        this.storage = TimestampStorage.createContext(configRepository);
+        this.repository = sensLogRepository;
     }
 
     @Override
     public void start() {
 
         scheduler = Scheduler.createBuilder()
-                .addTask(new ObservationAnalyzerTask(analyzer, repository, config.getStartDateTime()), config.getPeriod())
+                .addTask(new ObservationAnalyzerTask(analyzer, storage, repository, config.getStartDateTime()), config.getPeriod())
         .build();
 
         scheduler.start();
@@ -40,48 +40,4 @@ public class ScheduledDatabaseProvider extends DataProvider<Observation> {
     private Scheduler registerTask(AnalyzerTask<Observation> mainTask, AnalyzerTask<Observation>... tasks) {
         return null;
     }
-
-    private static class ObservationAnalyzerTask extends AnalyzerTask<Observation> {
-
-        private final SensLogRepository repository;
-
-        private OffsetDateTime timestamp;
-
-        private ObservationAnalyzerTask(Analyzer<Observation> analyzer, SensLogRepository repository, OffsetDateTime startDateTime) {
-            super(analyzer);
-            this.repository = repository;
-            this.timestamp = startDateTime;
-        }
-
-        @Override
-        protected List<Observation> loadData() {
-            List<Observation> observations = repository.getObservationsFromTime(timestamp);
-
-            if (!observations.isEmpty()) {
-                Observation observationToDelete = observations.get(observations.size() - 1);
-                Timestamp timestampToDelete = observationToDelete.getTimestamp();
-
-                int originalCount = observations.size();
-                List<Observation> deletedObservations = new ArrayList<>();
-                for (int i = observations.size(); i-- > 0;) {
-                    Observation currentObservation = observations.get(i);
-                    Timestamp currentTimestamp = currentObservation.getTimestamp();
-                    if (currentTimestamp.equals(timestampToDelete)) {
-                        deletedObservations.add(currentObservation);
-                        observations.remove(i);
-                    }
-                }
-
-                if (originalCount == deletedObservations.size()) {
-                    observations = deletedObservations;
-                }
-
-                Observation lastObservation = observations.get(observations.size() - 1);
-                OffsetDateTime lastTimestamp = lastObservation.getTimestamp().get();
-                timestamp = lastTimestamp.plusSeconds(1);
-            }
-
-            return observations;
-        }
-    }
 }

+ 92 - 0
src/main/java/cz/senslog/analyzer/provider/task/ObservationAnalyzerTask.java

@@ -0,0 +1,92 @@
+package cz.senslog.analyzer.provider.task;
+
+import cz.senslog.analyzer.analysis.Analyzer;
+import cz.senslog.analyzer.domain.Data;
+import cz.senslog.analyzer.domain.Observation;
+import cz.senslog.analyzer.domain.Timestamp;
+import cz.senslog.analyzer.provider.AnalyzerTask;
+import cz.senslog.analyzer.storage.inmemory.TimestampStorage;
+import cz.senslog.analyzer.storage.permanent.repository.SensLogRepository;
+
+import java.time.OffsetDateTime;
+import java.util.*;
+
+import static cz.senslog.analyzer.domain.TimestampType.*;
+import static cz.senslog.analyzer.domain.TimestampType.LAST_PROVIDED_EXCLUSIVE;
+import static cz.senslog.analyzer.util.ListUtils.sublistToEnd;
+
+public class ObservationAnalyzerTask extends AnalyzerTask<Observation> {
+
+    private static final int MAX_OBSERVATIONS = 100;
+
+    private final TimestampStorage storage;
+    private final SensLogRepository repository;
+    private final Timestamp startDateTime;
+
+    public ObservationAnalyzerTask(Analyzer<Observation> analyzer, TimestampStorage storage,
+                                   SensLogRepository repository, OffsetDateTime startDateTime
+    ) {
+        super(analyzer);
+        Objects.requireNonNull(storage);
+        Objects.requireNonNull(repository);
+        Objects.requireNonNull(startDateTime);
+        this.storage = storage;
+        this.repository = repository;
+        this.startDateTime = Timestamp.of(startDateTime);
+    }
+
+    @Override
+    protected List<Observation> loadData() {
+
+        Timestamp firstProvided = storage.get(FIRST_PROVIDED_INCLUSIVE, startDateTime);
+        Timestamp lastProvidedExclusive = storage.get(LAST_PROVIDED_EXCLUSIVE, Timestamp.MIN);
+        Timestamp lastProvidedInclusive = storage.get(LAST_PROVIDED_INCLUSIVE, Timestamp.MIN);
+        Timestamp acceptedInclusive = storage.get(LAST_ACCEPTED_INCLUSIVE, Timestamp.MIN);
+        Timestamp committedInclusive = storage.get(LAST_COMMITTED_INCLUSIVE, Timestamp.MIN);
+
+        boolean previousIterationWasProvided = firstProvided.isBefore(lastProvidedExclusive);
+        boolean previousIterationStarted = previousIterationWasProvided && (firstProvided.isBefore(acceptedInclusive) || firstProvided.isEqual(acceptedInclusive));
+        boolean previousIterationWasFinished = firstProvided.isBefore(committedInclusive) || firstProvided.isEqual(committedInclusive);
+        boolean previousIterationWasOk = previousIterationWasFinished && committedInclusive.isEqual(acceptedInclusive) && committedInclusive.isEqual(lastProvidedInclusive);
+        boolean previousIterationWasInProgress = previousIterationStarted && !previousIterationWasFinished;
+
+        List<Observation> newObservations;
+
+        if (previousIterationWasOk) {
+            newObservations = repository.getObservationsFromTime(lastProvidedExclusive, true, MAX_OBSERVATIONS);
+        } else if (previousIterationWasInProgress) {
+            // TODO a problem of more observations at the same timestamp
+            newObservations = repository.getObservationsFromTime(acceptedInclusive, false, MAX_OBSERVATIONS);
+        } else {
+            newObservations = repository.getObservationsFromTime(startDateTime, true, MAX_OBSERVATIONS);
+        }
+
+        if (newObservations.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        Timestamp start = newObservations.get(0).getTimestamp();
+        Timestamp end = newObservations.get(newObservations.size() - 1).getTimestamp();
+
+        List<Observation> observations = sublistToEnd(newObservations, end, false, Observation::getTimestamp);
+
+        if (observations.isEmpty()) {
+            if (newObservations.size() < MAX_OBSERVATIONS) {
+                return Collections.emptyList();
+            } else {
+                // TODO find a better solution -> a problem of more than MAX_OBSERVATIONS observations at the same timestamp
+                observations = newObservations;
+            }
+        }
+
+        observations.sort(Comparator.comparing(Data::getTimestamp));
+        Timestamp lastInclusive = observations.get(observations.size() - 1).getTimestamp();
+
+        storage.update(start, FIRST_PROVIDED_INCLUSIVE);
+
+        storage.update(end, LAST_PROVIDED_EXCLUSIVE);
+        storage.update(lastInclusive, LAST_PROVIDED_INCLUSIVE);
+
+        return observations;
+    }
+}

+ 7 - 0
src/main/java/cz/senslog/analyzer/storage/RepositoryModule.java

@@ -2,6 +2,7 @@ package cz.senslog.analyzer.storage;
 
 import cz.senslog.analyzer.storage.inmemory.InMemoryConnection;
 import cz.senslog.analyzer.storage.inmemory.repository.CollectedStatisticsRepository;
+import cz.senslog.analyzer.storage.inmemory.repository.TimestampRepository;
 import cz.senslog.analyzer.storage.permanent.PermanentConnection;
 import cz.senslog.analyzer.storage.permanent.repository.SensLogRepository;
 import cz.senslog.analyzer.storage.permanent.repository.StatisticsConfigRepository;
@@ -41,4 +42,10 @@ public class RepositoryModule {
         logger.info("Creating a new instance of {}.", CollectedStatisticsRepository.class);
         return new CollectedStatisticsRepository(connection);
     }
+
+    @Provides @Singleton
+    public TimestampRepository provideScheduledDatabaseRepository(InMemoryConnection connection) {
+        logger.info("Creating a new instance of {}.", CollectedStatisticsRepository.class);
+        return new TimestampRepository(connection);
+    }
 }

+ 16 - 0
src/main/java/cz/senslog/analyzer/storage/inmemory/AbstractInMemoryRepository.java

@@ -0,0 +1,16 @@
+package cz.senslog.analyzer.storage.inmemory;
+
+import cz.senslog.analyzer.storage.Connection;
+import org.jdbi.v3.core.Jdbi;
+
+public abstract class AbstractInMemoryRepository {
+
+    protected final Jdbi jdbi;
+
+    protected AbstractInMemoryRepository(Connection<Jdbi> connection) {
+        this.jdbi = connection.get();
+        createTable();
+    }
+
+    protected abstract void createTable();
+}

+ 36 - 0
src/main/java/cz/senslog/analyzer/storage/inmemory/TimestampStorage.java

@@ -0,0 +1,36 @@
+package cz.senslog.analyzer.storage.inmemory;
+
+import cz.senslog.analyzer.domain.Timestamp;
+import cz.senslog.analyzer.domain.TimestampType;
+import cz.senslog.analyzer.storage.inmemory.repository.TimestampRepository;
+
+import java.util.Objects;
+
+public class TimestampStorage {
+
+    private final TimestampRepository repository;
+
+    public static TimestampStorage createContext(TimestampRepository repository) {
+        return new TimestampStorage(repository);
+    }
+
+    private TimestampStorage(TimestampRepository repository) {
+        this.repository = repository;
+    }
+
+    public void update(Timestamp timestamp, TimestampType type) {
+        Objects.requireNonNull(timestamp);
+        Objects.requireNonNull(type);
+        repository.update(timestamp, type);
+    }
+
+    public Timestamp get(TimestampType type) {
+        Objects.requireNonNull(type);
+        return repository.get(type);
+    }
+
+    public Timestamp get(TimestampType type, Timestamp defaultTimestamp) {
+        Timestamp timestamp = get(type);
+        return timestamp != null ? timestamp : defaultTimestamp;
+    }
+}

+ 5 - 7
src/main/java/cz/senslog/analyzer/storage/inmemory/repository/CollectedStatisticsRepository.java

@@ -5,6 +5,7 @@ import cz.senslog.analyzer.domain.DoubleStatistics;
 import cz.senslog.analyzer.domain.Group;
 import cz.senslog.analyzer.domain.Timestamp;
 import cz.senslog.analyzer.storage.Connection;
+import cz.senslog.analyzer.storage.inmemory.AbstractInMemoryRepository;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jdbi.v3.core.Jdbi;
@@ -14,7 +15,7 @@ import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.util.List;
 
-public class CollectedStatisticsRepository {
+public class CollectedStatisticsRepository extends AbstractInMemoryRepository {
 
     private static final Logger logger = LogManager.getLogger(CollectedStatisticsRepository.class);
 
@@ -24,16 +25,13 @@ public class CollectedStatisticsRepository {
             .appendOffset("+HH", "Z")
             .toFormatter();
 
-    private final Jdbi jdbi;
-
     @Inject
     public CollectedStatisticsRepository(Connection<Jdbi> connection) {
-        this.jdbi = connection.get();
-
-        create();
+        super(connection);
     }
 
-    private void create() {
+    @Override
+    protected void createTable() {
         jdbi.withHandle(h -> h.execute(
                 "CREATE TABLE IF NOT EXISTS double_statistics (" +
                         "group_id INTEGER NOT NULL , " +

+ 80 - 0
src/main/java/cz/senslog/analyzer/storage/inmemory/repository/TimestampRepository.java

@@ -0,0 +1,80 @@
+package cz.senslog.analyzer.storage.inmemory.repository;
+
+import cz.senslog.analyzer.domain.Timestamp;
+import cz.senslog.analyzer.domain.TimestampType;
+import cz.senslog.analyzer.storage.Connection;
+import cz.senslog.analyzer.storage.inmemory.AbstractInMemoryRepository;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.core.statement.PreparedBatch;
+
+import javax.inject.Inject;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class TimestampRepository extends AbstractInMemoryRepository {
+
+    @Inject
+    public TimestampRepository(Connection<Jdbi> connection) {
+        super(connection);
+    }
+
+    @Override
+    protected void createTable() {
+        jdbi.withHandle(h -> h.execute(
+                "CREATE TABLE IF NOT EXISTS TIMESTAMP_REPOSITORY (" +
+                        "type VARCHAR(255) NOT NULL PRIMARY KEY, " +
+                        "time_stamp TIMESTAMP WITH TIME ZONE DEFAULT NULL" +
+                        ")"
+        ));
+        int count = jdbi.withHandle(h -> h.createQuery(
+                "SELECT COUNT(type) AS count FROM TIMESTAMP_REPOSITORY"
+        )
+                .map((rs, rv) -> rs.getInt("count"))
+                .findFirst().orElse(0)
+        );
+        if (count <= 0) {
+            jdbi.withHandle(h -> {
+                PreparedBatch batch = h.prepareBatch(
+                        "INSERT INTO TIMESTAMP_REPOSITORY(type) VALUES(:type)"
+                );
+                for (TimestampType type : TimestampType.values()) {
+                    batch.bind("type", type.name()).add();
+                }
+
+                return batch.execute();
+            });
+        }
+    }
+
+    public void update(Timestamp timestamp, TimestampType type) {
+        Objects.requireNonNull(timestamp);
+        jdbi.withHandle(h -> h.execute(
+                "UPDATE TIMESTAMP_REPOSITORY SET time_stamp = ? WHERE type = ?", timestamp.format(), type.name())
+        );
+    }
+
+    public Timestamp get(TimestampType type) {
+        Objects.requireNonNull(type);
+        return jdbi.withHandle(h -> h.createQuery(
+                "SELECT time_stamp FROM TIMESTAMP_REPOSITORY WHERE type = :type LIMIT 1"
+                )
+                    .bind("type", type.name())
+                .map((rs, rv) -> Timestamp.parse(rs.getString("time_stamp")))
+                .first()
+        );
+    }
+
+    public Map<TimestampType, Timestamp> getAll() {
+        return jdbi.withHandle(h -> h.createQuery(
+                "SELECT time_stamp, type FROM TIMESTAMP_REPOSITORY"
+        )
+                .map((rs, rv) -> new AbstractMap.SimpleEntry<>(
+                        TimestampType.valueOf(rs.getString("type")),
+                        Timestamp.parse(rs.getString("time_stamp"))
+                )).stream().filter(v -> v.getValue() != null)
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
+        );
+    }
+}

+ 6 - 3
src/main/java/cz/senslog/analyzer/storage/permanent/repository/SensLogRepository.java

@@ -12,6 +12,7 @@ import org.jdbi.v3.core.Jdbi;
 import javax.inject.Inject;
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.Collections;
 import java.util.List;
 
 public class SensLogRepository {
@@ -28,12 +29,14 @@ public class SensLogRepository {
         this.jdbi = connection.get();
     }
 
-    public List<Observation> getObservationsFromTime(OffsetDateTime timestamp) {
+    public List<Observation> getObservationsFromTime(Timestamp timestamp, boolean inclusive, int limit) {
+        if (limit <= 0) { return Collections.emptyList(); }
         return jdbi.withHandle(h -> h.createQuery(
                 "SELECT unit_id, sensor_id, observed_value, time_stamp FROM public.observations " +
-                        "WHERE time_stamp >= :timestamp ORDER BY time_stamp LIMIT 100"
+                        "WHERE time_stamp " + (inclusive ? ">=" : ">") + " :timestamp ORDER BY time_stamp LIMIT :limit"
                 )
-                    .bind("timestamp", timestamp)
+                    .bind("timestamp", timestamp.get())
+                    .bind("limit", limit)
                     .map((rs, ctx) -> new Observation(
                             new Sensor(
                                     rs.getLong("unit_id"),

+ 51 - 0
src/main/java/cz/senslog/analyzer/util/ListUtils.java

@@ -0,0 +1,51 @@
+package cz.senslog.analyzer.util;
+
+import cz.senslog.analyzer.domain.Timestamp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import static java.util.Collections.emptyList;
+
+public final class ListUtils {
+
+    public static <T> List<T> sublistExclusive(List<T> list, Timestamp start, Timestamp end, Function<T, Timestamp> attribute) {
+        return sublist(list, start, false, end, false, attribute);
+    }
+
+    public static <T> List<T> sublistInclusive(List<T> list, Timestamp start, Timestamp end, Function<T, Timestamp> attribute) {
+        return sublist(list, start, true, end, true, attribute);
+    }
+
+    public static <T> List<T> sublistToEnd(List<T> list, Timestamp end, boolean inclusive, Function<T, Timestamp> attribute) {
+        return sublist(list, Timestamp.MIN, true, end, inclusive, attribute);
+    }
+
+    public static <T> List<T> sublistFromStart(List<T> list, Timestamp start, boolean inclusive, Function<T, Timestamp> attribute) {
+        return sublist(list, start, inclusive, Timestamp.MAX, true, attribute);
+    }
+
+    public static <T> List<T> sublist(List<T> list, Timestamp start, boolean inclusiveStart, Timestamp end, boolean inclusiveEnd, Function<T, Timestamp> attribute) {
+        if (list == null || list.isEmpty()) {
+            return emptyList();
+        }
+
+        Objects.requireNonNull(start);
+        Objects.requireNonNull(end);
+        Objects.requireNonNull(attribute);
+
+        List<T> result = new ArrayList<>(list.size());
+        for (T item : list) {
+            Timestamp timestamp = attribute.apply(item);
+            if ((inclusiveStart && timestamp.isEqual(start))
+                    || (start.isBefore(timestamp) && end.isAfter(timestamp))
+                    || (inclusiveEnd && timestamp.isEqual(end))
+            ) {
+                result.add(item);
+            }
+        }
+        return result.isEmpty() ? emptyList() : result;
+    }
+}

+ 11 - 2
src/test/java/cz/senslog/analyzer/domain/TimestampTest.java

@@ -2,6 +2,8 @@ package cz.senslog.analyzer.domain;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.*;
+
 import static org.junit.jupiter.api.Assertions.*;
 
 class TimestampTest {
@@ -9,8 +11,15 @@ class TimestampTest {
     @Test
     void parse() {
 
-        System.out.println(Timestamp.parse("2020-12-30"));
-        System.out.println(Timestamp.parse("2020-12-30 12:30:50+02"));
+        assertEquals(LocalDate.of(2020, 12, 30), Timestamp.parse("2020-12-30").get().toLocalDate());
+
+        assertEquals(OffsetDateTime.of(LocalDateTime
+                .of(2020, 12, 30, 12, 30, 50), ZoneOffset.ofHours(2)).toInstant(),
+                Timestamp.parse("2020-12-30 12:30:50+02").toInstant());
+
 
+        assertEquals(OffsetDateTime.of(LocalDateTime
+                .of(2019, 12, 31, 23, 0, 0, 319000), ZoneOffset.ofHours(1)).toInstant(),
+                Timestamp.parse("2019-12-31 23:00:00.000319+01").toInstant());
     }
 }

+ 145 - 0
src/test/java/cz/senslog/analyzer/provider/task/ObservationAnalyzerTaskTest.java

@@ -0,0 +1,145 @@
+package cz.senslog.analyzer.provider.task;
+
+import cz.senslog.analyzer.analysis.Analyzer;
+import cz.senslog.analyzer.domain.Observation;
+import cz.senslog.analyzer.domain.Sensor;
+import cz.senslog.analyzer.domain.Timestamp;
+import cz.senslog.analyzer.provider.AnalyzerTask;
+import cz.senslog.analyzer.storage.Connection;
+import cz.senslog.analyzer.storage.inmemory.TimestampStorage;
+import cz.senslog.analyzer.storage.inmemory.repository.TimestampRepository;
+import cz.senslog.analyzer.storage.permanent.repository.SensLogRepository;
+import org.jdbi.v3.core.Jdbi;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static cz.senslog.analyzer.domain.TimestampType.*;
+import static java.time.temporal.ChronoUnit.SECONDS;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+class ObservationAnalyzerTaskTest {
+
+    @Test
+    void loadData_dataNotAccepted_true() {
+
+        LocalDateTime time = LocalDateTime.of(2020, 1, 1, 0, 0);
+        OffsetDateTime startTime = OffsetDateTime.of(time, ZoneOffset.UTC);
+
+        Analyzer<Observation> analyzer = mock(Analyzer.class);
+        SensLogRepository repository = mock(SensLogRepository.class);
+        when(repository.getObservationsFromTime(Timestamp.of(startTime), true, 100))
+                .then(answer -> {
+                    Timestamp timestamp = answer.getArgument(0);
+                    return Arrays.asList(
+                            new Observation(new Sensor(123, 456), 1.0, timestamp.plus(0, SECONDS)),
+                            new Observation(new Sensor(123, 456), 1.0, timestamp.plus(1, SECONDS)),
+                            new Observation(new Sensor(123, 456), 1.0, timestamp.plus(2, SECONDS)),
+                            new Observation(new Sensor(123, 456), 1.0, timestamp.plus(3, SECONDS))
+                    );
+                });
+
+        Connection<Jdbi> connection = new Connection<>(Jdbi.create("jdbc:h2:mem:loadData_dataNotAccepted_true;DB_CLOSE_DELAY=-1"));
+        TimestampStorage storage = TimestampStorage.createContext(new TimestampRepository(connection));
+
+
+        AnalyzerTask<Observation> task = new ObservationAnalyzerTask(analyzer, storage, repository, startTime);
+
+        task.run();
+        assertEquals(startTime.toInstant(), storage.get(FIRST_PROVIDED_INCLUSIVE).toInstant());
+        assertEquals(startTime.plusSeconds(2).toInstant(), storage.get(LAST_PROVIDED_EXCLUSIVE).toInstant());
+
+        task.run();
+        assertEquals(startTime.toInstant(), storage.get(FIRST_PROVIDED_INCLUSIVE).toInstant());
+        assertEquals(startTime.plusSeconds(2).toInstant(), storage.get(LAST_PROVIDED_EXCLUSIVE).toInstant());
+    }
+
+    @Test
+    void loadData_allDataAccepted_true() {
+
+        LocalDateTime time = LocalDateTime.of(2020, 1, 1, 0, 0);
+        OffsetDateTime startTime = OffsetDateTime.of(time, ZoneOffset.UTC);
+
+        Analyzer<Observation> analyzer = mock(Analyzer.class);
+        SensLogRepository repository = mock(SensLogRepository.class);
+
+        doAnswer(answer -> {
+            Timestamp timestamp = answer.getArgument(0);
+            return Arrays.asList(
+                    new Observation(new Sensor(123, 456), 1.0, timestamp.plus(0, SECONDS)),
+                    new Observation(new Sensor(123, 456), 1.0, timestamp.plus(1, SECONDS)),
+                    new Observation(new Sensor(123, 456), 1.0, timestamp.plus(2, SECONDS)),
+                    new Observation(new Sensor(123, 456), 1.0, timestamp.plus(3, SECONDS))
+            );
+        }).when(repository).getObservationsFromTime(any(Timestamp.class), true, 100);
+
+        Connection<Jdbi> connection = new Connection<>(Jdbi.create("jdbc:h2:mem:loadData_allDataAccepted_true;DB_CLOSE_DELAY=-1"));
+        TimestampStorage storage = TimestampStorage.createContext(new TimestampRepository(connection));
+        AnalyzerTask<Observation> task = new ObservationAnalyzerTask(analyzer, storage, repository, startTime);
+
+        Timestamp startTimestamp;
+
+        // 1. run
+        startTimestamp = Timestamp.of(startTime);
+        task.run();
+        assertEquals(startTime.toInstant(), storage.get(FIRST_PROVIDED_INCLUSIVE).toInstant());
+        assertEquals(startTime.plusSeconds(2).toInstant(), storage.get(LAST_PROVIDED_EXCLUSIVE).toInstant());
+
+        // simulate analyzer
+        storage.update(startTimestamp.plus(2, SECONDS), LAST_ACCEPTED_INCLUSIVE);
+        storage.update(startTimestamp.plus(2, SECONDS), LAST_COMMITTED_INCLUSIVE);
+
+        // 2. run
+        startTimestamp = startTimestamp.plus(3, SECONDS);
+        task.run();
+        assertEquals(startTimestamp.toInstant(), storage.get(FIRST_PROVIDED_INCLUSIVE).toInstant());
+        assertEquals(startTimestamp.plus(2, SECONDS).toInstant(), storage.get(LAST_PROVIDED_EXCLUSIVE).toInstant());
+    }
+
+    @Test
+    void loadData_processHalted_true() {
+
+        LocalDateTime time = LocalDateTime.of(2020, 1, 1, 0, 0);
+        OffsetDateTime startTime = OffsetDateTime.of(time, ZoneOffset.UTC);
+
+        Analyzer<Observation> analyzer = mock(Analyzer.class);
+        SensLogRepository repository = mock(SensLogRepository.class);
+
+        doAnswer(answer -> {
+            Timestamp timestamp = answer.getArgument(0);
+            return Arrays.asList(
+                    new Observation(new Sensor(123, 456), 1.0, timestamp.plus(0, SECONDS)),
+                    new Observation(new Sensor(123, 456), 1.0, timestamp.plus(1, SECONDS)),
+                    new Observation(new Sensor(123, 456), 1.0, timestamp.plus(2, SECONDS)),
+                    new Observation(new Sensor(123, 456), 1.0, timestamp.plus(3, SECONDS))
+            );
+        }).when(repository).getObservationsFromTime(any(Timestamp.class), true, 100);
+
+        Connection<Jdbi> connection = new Connection<>(Jdbi.create("jdbc:h2:mem:loadData_processHalted_true;DB_CLOSE_DELAY=-1"));
+        TimestampStorage storage = TimestampStorage.createContext(new TimestampRepository(connection));
+        AnalyzerTask<Observation> task = new ObservationAnalyzerTask(analyzer, storage, repository, startTime);
+
+        Timestamp startTimestamp;
+
+        // 1. run
+        task.run();
+        startTimestamp = Timestamp.of(startTime);
+        assertEquals(startTimestamp.toInstant(), storage.get(FIRST_PROVIDED_INCLUSIVE).toInstant());
+        assertEquals(startTimestamp.plus(2, SECONDS).toInstant(), storage.get(LAST_PROVIDED_EXCLUSIVE).toInstant());
+
+        // simulate analyzer
+        // data was accepted but not persisted to the database
+        storage.update(startTimestamp.plus(1, SECONDS), LAST_ACCEPTED_INCLUSIVE);
+
+        // 2. run
+        task.run();
+        startTimestamp = startTimestamp.plus(0, SECONDS);
+        assertEquals(startTimestamp.toInstant(), storage.get(FIRST_PROVIDED_INCLUSIVE).toInstant());
+        assertEquals(startTimestamp.plus(2, SECONDS).toInstant(), storage.get(LAST_PROVIDED_EXCLUSIVE).toInstant());
+    }
+}

+ 46 - 0
src/test/java/cz/senslog/analyzer/storage/inmemory/repository/TimestampRepositoryTest.java

@@ -0,0 +1,46 @@
+package cz.senslog.analyzer.storage.inmemory.repository;
+
+import cz.senslog.analyzer.domain.Timestamp;
+import cz.senslog.analyzer.domain.TimestampType;
+import cz.senslog.analyzer.storage.Connection;
+import org.jdbi.v3.core.Jdbi;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+
+import static cz.senslog.analyzer.domain.TimestampType.*;
+import static org.junit.jupiter.api.Assertions.*;
+
+class TimestampRepositoryTest {
+
+    @Test
+    void updateProviderTimestamp() {
+
+        Connection<Jdbi> connection = new Connection<>(Jdbi.create("jdbc:h2:mem:timestamp_rep_test;DB_CLOSE_DELAY=-1"));
+        TimestampRepository repository = new TimestampRepository(connection);
+
+        LocalDateTime dateTime = LocalDateTime.of(2020, 12, 31, 10, 0, 0);
+
+        Timestamp timestampProvider = Timestamp.of(OffsetDateTime.of(dateTime, ZoneOffset.UTC));
+        repository.update(timestampProvider, LAST_PROVIDED_EXCLUSIVE);
+
+        Timestamp timestampAccepted = timestampProvider.plus(1, ChronoUnit.MINUTES);
+        repository.update(timestampAccepted, TimestampType.LAST_ACCEPTED_INCLUSIVE);
+
+        Timestamp timestampCommitted = timestampProvider.plus(2, ChronoUnit.MINUTES);
+        repository.update(timestampCommitted, TimestampType.LAST_COMMITTED_INCLUSIVE);
+
+        assertEquals(timestampProvider.toInstant(), repository.get(LAST_PROVIDED_EXCLUSIVE).toInstant());
+        assertEquals(timestampAccepted.toInstant(), repository.get(LAST_ACCEPTED_INCLUSIVE).toInstant());
+        assertEquals(timestampCommitted.toInstant(), repository.get(LAST_COMMITTED_INCLUSIVE).toInstant());
+
+        Map<TimestampType, Timestamp> all = repository.getAll();
+        assertEquals(timestampProvider.toInstant(), all.get(LAST_PROVIDED_EXCLUSIVE).toInstant());
+        assertEquals(timestampAccepted.toInstant(), all.get(LAST_ACCEPTED_INCLUSIVE).toInstant());
+        assertEquals(timestampCommitted.toInstant(), all.get(LAST_COMMITTED_INCLUSIVE).toInstant());
+    }
+}

+ 54 - 0
src/test/java/cz/senslog/analyzer/util/ListUtilsTest.java

@@ -0,0 +1,54 @@
+package cz.senslog.analyzer.util;
+
+import cz.senslog.analyzer.domain.Observation;
+import cz.senslog.analyzer.domain.Sensor;
+import cz.senslog.analyzer.domain.Timestamp;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class ListUtilsTest {
+
+    @Test
+    void sublistByInterval() {
+
+        LocalDateTime startTime = LocalDateTime.of(2020, 1, 1, 0, 0, 0);
+        Timestamp start = Timestamp.of(OffsetDateTime.of(startTime, ZoneOffset.UTC));
+        Timestamp end = start.plus(1, ChronoUnit.DAYS);
+
+        List<Observation> list = Arrays.asList(
+                new Observation(null, 1.0, start.minus(1, ChronoUnit.SECONDS)),
+                new Observation(null, 1.0, start.minus(1, ChronoUnit.MILLIS)),
+                new Observation(null, 1.0, start.minus(1, ChronoUnit.MICROS)),
+                new Observation(null, 1.0, start.minus(1, ChronoUnit.NANOS)),
+                new Observation(null, 1.0, start),
+                new Observation(null, 1.0, start.plus(1, ChronoUnit.NANOS)),
+                new Observation(null, 1.0, start.plus(1, ChronoUnit.MICROS)),
+                new Observation(null, 1.0, start.plus(1, ChronoUnit.MILLIS)),
+                new Observation(null, 1.0, start.plus(1, ChronoUnit.SECONDS)),
+
+                new Observation(null, 1.0, end.minus(1, ChronoUnit.SECONDS)),
+                new Observation(null, 1.0, end.minus(1, ChronoUnit.MILLIS)),
+                new Observation(null, 1.0, end.minus(1, ChronoUnit.MICROS)),
+                new Observation(null, 1.0, end.minus(1, ChronoUnit.NANOS)),
+                new Observation(null, 1.0, end),
+                new Observation(null, 1.0, end.plus(1, ChronoUnit.NANOS)),
+                new Observation(null, 1.0, end.plus(1, ChronoUnit.MICROS)),
+                new Observation(null, 1.0, end.plus(1, ChronoUnit.MILLIS)),
+                new Observation(null, 1.0, end.plus(1, ChronoUnit.SECONDS))
+        );
+
+        List<Observation> subListInclusive = ListUtils.sublistInclusive(list, start, end, Observation::getTimestamp);
+        assertEquals(10, subListInclusive.size());
+
+        List<Observation> subListExclusive = ListUtils.sublistExclusive(list, start, end, Observation::getTimestamp);
+        assertEquals(8, subListExclusive.size());
+    }
+}