|
@@ -0,0 +1,186 @@
|
|
|
|
|
+package cz.senslog.connector.fetch.senslog.v1;
|
|
|
|
|
+
|
|
|
|
|
+import com.google.gson.reflect.TypeToken;
|
|
|
|
|
+import cz.senslog.common.http.HttpClient;
|
|
|
|
|
+import cz.senslog.common.http.HttpRequest;
|
|
|
|
|
+import cz.senslog.common.http.HttpResponse;
|
|
|
|
|
+import cz.senslog.common.http.URLBuilder;
|
|
|
|
|
+import cz.senslog.common.util.Tuple;
|
|
|
|
|
+import cz.senslog.connector.fetch.api.ConnectorFetcher;
|
|
|
|
|
+import cz.senslog.connector.model.api.ProxySessionModel;
|
|
|
|
|
+import cz.senslog.connector.model.config.HostConfig;
|
|
|
|
|
+import cz.senslog.connector.model.v1.*;
|
|
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
|
|
+import org.apache.logging.log4j.Logger;
|
|
|
|
|
+
|
|
|
|
|
+import java.lang.reflect.Type;
|
|
|
|
|
+import java.time.*;
|
|
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
|
|
+import java.util.*;
|
|
|
|
|
+
|
|
|
|
|
+import static cz.senslog.common.json.BasicJson.jsonToObject;
|
|
|
|
|
+import static java.time.OffsetDateTime.now;
|
|
|
|
|
+import static java.time.OffsetDateTime.parse;
|
|
|
|
|
+import static java.time.format.DateTimeFormatter.ofPattern;
|
|
|
|
|
+
|
|
|
|
|
+public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV1Model> {
|
|
|
|
|
+
|
|
|
|
|
+ private static Logger logger = LogManager.getLogger(SenslogFetcher.class);
|
|
|
|
|
+
|
|
|
|
|
+ private final SenslogConfig config;
|
|
|
|
|
+ private final HttpClient httpClient;
|
|
|
|
|
+
|
|
|
|
|
+ private final SensLogSession localSession;
|
|
|
|
|
+ private final Map<Long, UnitInfo> units;
|
|
|
|
|
+
|
|
|
|
|
+ SenslogFetcher(SenslogConfig config, HttpClient httpClient) {
|
|
|
|
|
+ this.config = config;
|
|
|
|
|
+ this.httpClient = httpClient;
|
|
|
|
|
+ this.localSession = SensLogSession.emptySession();
|
|
|
|
|
+ this.units = new HashMap<>();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ protected static class ObservationInfo { Float value; OffsetDateTime time; }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void init() {
|
|
|
|
|
+
|
|
|
|
|
+ HostConfig dataServiceHost = config.getDataServiceHost();
|
|
|
|
|
+ logger.info("Getting last observations from {}.", dataServiceHost.getDomain());
|
|
|
|
|
+
|
|
|
|
|
+ HttpRequest unitsRequest = HttpRequest.newBuilder().GET()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(dataServiceHost.getDomain(), dataServiceHost.getPath())
|
|
|
|
|
+ .addParam("Operation", "GetUnitsList")
|
|
|
|
|
+ .addParam("user", config.getUser())
|
|
|
|
|
+ .build())
|
|
|
|
|
+ .build();
|
|
|
|
|
+ logger.info("Creating a http request to {}.", unitsRequest);
|
|
|
|
|
+
|
|
|
|
|
+ HttpResponse unitsResponse = httpClient.send(unitsRequest);
|
|
|
|
|
+ logger.info("Received a response with a status: {} for the domain {}.", unitsResponse.getStatus(), dataServiceHost.getDomain());
|
|
|
|
|
+
|
|
|
|
|
+ if (unitsResponse.isOk()) {
|
|
|
|
|
+ logger.debug("Parsing body of the response to the list of class {}.", UnitInfo.class);
|
|
|
|
|
+ Type unitInfoType = new TypeToken<Collection<UnitInfo>>() {}.getType();
|
|
|
|
|
+ List<UnitInfo> unitInfos = jsonToObject(unitsResponse.getBody(), unitInfoType);
|
|
|
|
|
+
|
|
|
|
|
+ for (UnitInfo unit : unitInfos) {
|
|
|
|
|
+ if (!config.getAllowedStation().isAllowed(unit.getUnitId())) {
|
|
|
|
|
+ logger.info("Unit {} is not allowd in configuration.", unit.getUnitId());continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ units.put(unit.getUnitId(), unit);
|
|
|
|
|
+
|
|
|
|
|
+ HostConfig sensorServiceHost = config.getSensorServiceHost();
|
|
|
|
|
+ logger.info("Getting last observations from {}.", sensorServiceHost.getDomain());
|
|
|
|
|
+
|
|
|
|
|
+ HttpRequest sensorRequest = HttpRequest.newBuilder().GET()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(sensorServiceHost.getDomain(), sensorServiceHost.getPath())
|
|
|
|
|
+ .addParam("Operation", "GetSensors")
|
|
|
|
|
+ .addParam("user", config.getUser())
|
|
|
|
|
+ .addParam("unit_id", unit.getUnitId())
|
|
|
|
|
+ .build())
|
|
|
|
|
+ .build();
|
|
|
|
|
+ logger.info("Creating a http request to {}.", sensorRequest);
|
|
|
|
|
+
|
|
|
|
|
+ HttpResponse sensorResponse = httpClient.send(sensorRequest);
|
|
|
|
|
+ logger.info("Received a response with a status: {} for the domain {}.", sensorResponse.getStatus(), sensorServiceHost.getDomain());
|
|
|
|
|
+
|
|
|
|
|
+ if (sensorResponse.isOk()) {
|
|
|
|
|
+ logger.debug("Parsing body of the response to the list of class {}.", SensorInfo.class);
|
|
|
|
|
+ Type sensorInfoType = new TypeToken<Collection<SensorInfo>>() {}.getType();
|
|
|
|
|
+ List<SensorInfo> sensors = jsonToObject(sensorResponse.getBody(), sensorInfoType,
|
|
|
|
|
+ Tuple.of(OffsetDateTime.class, el -> parse(el + "00", ofPattern("yyyy-MM-dd HH:mm:ssZ"))));
|
|
|
|
|
+
|
|
|
|
|
+ logger.info("For the unit {} was added {} sensors.", unit.getUnitId(), sensors.size());
|
|
|
|
|
+ List<SensorInfo> allowedSensors = new ArrayList<>();
|
|
|
|
|
+ for (SensorInfo sensor : sensors) {
|
|
|
|
|
+ if (config.getAllowedStation().isAllowed(unit.getUnitId(), sensor.getSensorId())) {
|
|
|
|
|
+ allowedSensors.add(sensor);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ unit.setSensors(allowedSensors);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ logger.error("Can not get data from the server {}. Error {} {}",
|
|
|
|
|
+ sensorServiceHost.getDomain(), sensorResponse.getStatus(), sensorResponse.getBody());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ logger.error("Can not get data from the server {}. Error {} {}",
|
|
|
|
|
+ dataServiceHost.getDomain(), unitsResponse.getStatus(), unitsResponse.getBody());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public SenslogV1Model fetch(Optional<SensLogSession> persistenceSession) {
|
|
|
|
|
+
|
|
|
|
|
+ SensLogSession session = persistenceSession.filter(ProxySessionModel::isActive).orElse(localSession);
|
|
|
|
|
+
|
|
|
|
|
+ List<Record> records = new ArrayList<>();
|
|
|
|
|
+
|
|
|
|
|
+ for (UnitInfo unit : units.values()) {
|
|
|
|
|
+ for (SensorInfo sensor : unit.getSensors()) {
|
|
|
|
|
+
|
|
|
|
|
+ String sessionId = String.format("%s_%s", unit.getUnitId(), sensor.getSensorId());
|
|
|
|
|
+ ZoneOffset offset = sensor.getFirstObservationTime().getOffset();
|
|
|
|
|
+
|
|
|
|
|
+ OffsetDateTime firstValueDate = sensor.getFirstObservationTime();
|
|
|
|
|
+ OffsetDateTime startDate = config.getStartDate().atOffset(offset);
|
|
|
|
|
+ OffsetDateTime lastFetch = session.getLiveInfo(sessionId).lastFetch;
|
|
|
|
|
+
|
|
|
|
|
+ long fromDateEpoch = Math.max(startDate.toEpochSecond(), Math.max(firstValueDate.toEpochSecond(), lastFetch.toEpochSecond()));
|
|
|
|
|
+ OffsetDateTime fromDate = OffsetDateTime.ofInstant(Instant.ofEpochSecond(fromDateEpoch), offset);
|
|
|
|
|
+ OffsetDateTime toDate = fromDate.plusHours(config.getInterval());
|
|
|
|
|
+
|
|
|
|
|
+ if (toDate.isAfter(now())) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ logger.info("Getting new observations from {} to {}.", fromDate, toDate);
|
|
|
|
|
+
|
|
|
|
|
+ final HostConfig sensorServiceHost = config.getSensorServiceHost();
|
|
|
|
|
+ final DateTimeFormatter pattern = ofPattern("yyyy-MM-dd HH:mm:ssZ");
|
|
|
|
|
+ HttpRequest observationRequest = HttpRequest.newBuilder().GET()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(sensorServiceHost.getDomain(), sensorServiceHost.getPath())
|
|
|
|
|
+ .addParam("Operation", "GetObservations")
|
|
|
|
|
+ .addParam("user", config.getUser())
|
|
|
|
|
+ .addParam("unit_id", unit.getUnitId())
|
|
|
|
|
+ .addParam("sensor_id", sensor.getSensorId())
|
|
|
|
|
+ .addParam("from", fromDate.format(pattern))
|
|
|
|
|
+ .addParam("to", toDate.format(pattern))
|
|
|
|
|
+ .build())
|
|
|
|
|
+ .build();
|
|
|
|
|
+ logger.info("Creating a http request to {}.", observationRequest);
|
|
|
|
|
+
|
|
|
|
|
+ HttpResponse observationResponse = httpClient.send(observationRequest);
|
|
|
|
|
+ logger.info("Received a response with a status: {} for the domain {}.", observationResponse.getStatus(), sensorServiceHost.getDomain());
|
|
|
|
|
+
|
|
|
|
|
+ if (observationResponse.isOk()) {
|
|
|
|
|
+ Type observationType = new TypeToken<Collection<ObservationInfo>>() {}.getType();
|
|
|
|
|
+ List<ObservationInfo> observations = jsonToObject(observationResponse.getBody(), observationType,
|
|
|
|
|
+ Tuple.of(OffsetDateTime.class, el -> parse(el + "00", ofPattern("yyyy-MM-dd HH:mm:ssZ")))
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ OffsetDateTime lastTimeStamp = toDate;
|
|
|
|
|
+ for (ObservationInfo observation : observations) {
|
|
|
|
|
+ Observation obs = new Observation();
|
|
|
|
|
+ obs.setSensorId(sensor.getSensorId());
|
|
|
|
|
+ obs.setUnitId(unit.getUnitId());
|
|
|
|
|
+ obs.setValue(observation.value);
|
|
|
|
|
+ obs.setTime(observation.time.toZonedDateTime().toOffsetDateTime());
|
|
|
|
|
+
|
|
|
|
|
+ records.add(obs);
|
|
|
|
|
+
|
|
|
|
|
+ if (observation.time.isAfter(lastTimeStamp)) {
|
|
|
|
|
+ lastTimeStamp = observation.time;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ session.updateLastFetch(sessionId, lastTimeStamp);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return new SenslogV1Model(units, records, session.getGlobalFrom(), session.getGlobalTo());
|
|
|
|
|
+ }
|
|
|
|
|
+}
|