|
@@ -7,6 +7,7 @@ import cz.senslog.analyzer.domain.Timestamp;
|
|
|
import cz.senslog.analyzer.provider.AnalyzerTask;
|
|
import cz.senslog.analyzer.provider.AnalyzerTask;
|
|
|
import cz.senslog.analyzer.storage.inmemory.TimestampStorage;
|
|
import cz.senslog.analyzer.storage.inmemory.TimestampStorage;
|
|
|
import cz.senslog.analyzer.storage.permanent.repository.SensLogRepository;
|
|
import cz.senslog.analyzer.storage.permanent.repository.SensLogRepository;
|
|
|
|
|
+import org.apache.logging.log4j.Level;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
import org.apache.logging.log4j.Logger;
|
|
|
|
|
|
|
@@ -21,7 +22,7 @@ public class ObservationAnalyzerTask extends AnalyzerTask<Observation> {
|
|
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(ObservationAnalyzerTask.class);
|
|
private static final Logger logger = LogManager.getLogger(ObservationAnalyzerTask.class);
|
|
|
|
|
|
|
|
- private static final int MAX_OBSERVATIONS = 100;
|
|
|
|
|
|
|
+ private static final int MAX_OBSERVATIONS = 200;
|
|
|
|
|
|
|
|
private final TimestampStorage storage;
|
|
private final TimestampStorage storage;
|
|
|
private final SensLogRepository repository;
|
|
private final SensLogRepository repository;
|
|
@@ -42,20 +43,22 @@ public class ObservationAnalyzerTask extends AnalyzerTask<Observation> {
|
|
|
@Override
|
|
@Override
|
|
|
protected List<Observation> loadData() {
|
|
protected List<Observation> loadData() {
|
|
|
|
|
|
|
|
- Timestamp firstProvided = storage.get(FIRST_PROVIDED_INCLUSIVE, startDateTime);
|
|
|
|
|
- Timestamp lastProvidedExclusive = storage.get(LAST_PROVIDED_EXCLUSIVE, Timestamp.MIN);
|
|
|
|
|
- Timestamp lastProvidedInclusive = storage.get(LAST_PROVIDED_INCLUSIVE, Timestamp.MIN);
|
|
|
|
|
- Timestamp committedInclusive = storage.get(LAST_COMMITTED_INCLUSIVE, Timestamp.MIN);
|
|
|
|
|
|
|
+ Timestamp firstProvidedIn = storage.get(FIRST_PROVIDED_INCLUSIVE, startDateTime);
|
|
|
|
|
+ Timestamp lastProvidedEx = storage.get(LAST_PROVIDED_EXCLUSIVE, Timestamp.MIN);
|
|
|
|
|
+ Timestamp lastProvidedIn = storage.get(LAST_PROVIDED_INCLUSIVE, Timestamp.MIN);
|
|
|
|
|
+ Timestamp committedIn = storage.get(LAST_COMMITTED_INCLUSIVE, Timestamp.MIN);
|
|
|
|
|
+ logger.info("firstProvidedIn: {}, lastProvidedEx: {}, lastProvidedIn: {}, committedIn: {}.",
|
|
|
|
|
+ firstProvidedIn, lastProvidedEx, lastProvidedIn, committedIn);
|
|
|
|
|
|
|
|
- boolean previousIterationWasFinished = firstProvided.isBefore(committedInclusive) || firstProvided.isEqual(committedInclusive);
|
|
|
|
|
- boolean previousIterationWasOk = previousIterationWasFinished && committedInclusive.isEqual(lastProvidedInclusive);
|
|
|
|
|
|
|
+ boolean previousItrWasFinished = firstProvidedIn.isBefore(committedIn) || firstProvidedIn.isEqual(committedIn);
|
|
|
|
|
+ Level logLevel = previousItrWasFinished ? Level.INFO : Level.WARN;
|
|
|
|
|
+ logger.log(logLevel, "Previous iteration finished: {}.", previousItrWasFinished);
|
|
|
|
|
|
|
|
List<Observation> newObservations;
|
|
List<Observation> newObservations;
|
|
|
-
|
|
|
|
|
- if (previousIterationWasOk) {
|
|
|
|
|
- newObservations = repository.getObservationsFromTime(lastProvidedExclusive, true, MAX_OBSERVATIONS);
|
|
|
|
|
- } else {
|
|
|
|
|
- newObservations = repository.getObservationsFromTime(startDateTime, true, MAX_OBSERVATIONS);
|
|
|
|
|
|
|
+ if (previousItrWasFinished) {
|
|
|
|
|
+ newObservations = repository.getObservationsFromTime(lastProvidedIn, false, MAX_OBSERVATIONS);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ newObservations = repository.getObservationsFromTime(firstProvidedIn, true, MAX_OBSERVATIONS);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (newObservations.isEmpty()) {
|
|
if (newObservations.isEmpty()) {
|
|
@@ -69,6 +72,7 @@ public class ObservationAnalyzerTask extends AnalyzerTask<Observation> {
|
|
|
|
|
|
|
|
if (observations.isEmpty()) {
|
|
if (observations.isEmpty()) {
|
|
|
if (newObservations.size() < MAX_OBSERVATIONS) {
|
|
if (newObservations.size() < MAX_OBSERVATIONS) {
|
|
|
|
|
+ logger.info("No observations loaded.");
|
|
|
return Collections.emptyList();
|
|
return Collections.emptyList();
|
|
|
} else {
|
|
} else {
|
|
|
// TODO find a better solution -> a problem of more than MAX_OBSERVATIONS observations at the same timestamp
|
|
// TODO find a better solution -> a problem of more than MAX_OBSERVATIONS observations at the same timestamp
|
|
@@ -79,8 +83,8 @@ public class ObservationAnalyzerTask extends AnalyzerTask<Observation> {
|
|
|
observations.sort(Comparator.comparing(Data::getTimestamp));
|
|
observations.sort(Comparator.comparing(Data::getTimestamp));
|
|
|
Timestamp lastInclusive = observations.get(observations.size() - 1).getTimestamp();
|
|
Timestamp lastInclusive = observations.get(observations.size() - 1).getTimestamp();
|
|
|
|
|
|
|
|
|
|
+ logger.info("Loaded {} observations from {} to {}.", observations.size(), start, lastInclusive);
|
|
|
storage.update(start, FIRST_PROVIDED_INCLUSIVE);
|
|
storage.update(start, FIRST_PROVIDED_INCLUSIVE);
|
|
|
-
|
|
|
|
|
storage.update(end, LAST_PROVIDED_EXCLUSIVE);
|
|
storage.update(end, LAST_PROVIDED_EXCLUSIVE);
|
|
|
storage.update(lastInclusive, LAST_PROVIDED_INCLUSIVE);
|
|
storage.update(lastInclusive, LAST_PROVIDED_INCLUSIVE);
|
|
|
|
|
|