|
@@ -4,15 +4,11 @@
|
|
|
package cz.senslog.connector.fetch.senslog.v1;
|
|
package cz.senslog.connector.fetch.senslog.v1;
|
|
|
|
|
|
|
|
import com.google.gson.reflect.TypeToken;
|
|
import com.google.gson.reflect.TypeToken;
|
|
|
|
|
+import cz.senslog.connector.fetch.api.ConnectorFetcher;
|
|
|
import cz.senslog.connector.model.v1.Record;
|
|
import cz.senslog.connector.model.v1.Record;
|
|
|
-import cz.senslog.connector.tools.http.HttpClient;
|
|
|
|
|
-import cz.senslog.connector.tools.http.HttpRequest;
|
|
|
|
|
-import cz.senslog.connector.tools.http.HttpResponse;
|
|
|
|
|
-import cz.senslog.connector.tools.http.URLBuilder;
|
|
|
|
|
|
|
+import cz.senslog.connector.tools.http.*;
|
|
|
import cz.senslog.connector.tools.util.Tuple;
|
|
import cz.senslog.connector.tools.util.Tuple;
|
|
|
-import cz.senslog.connector.fetch.api.ConnectorFetcher;
|
|
|
|
|
import cz.senslog.connector.model.api.ProxySessionModel;
|
|
import cz.senslog.connector.model.api.ProxySessionModel;
|
|
|
-import cz.senslog.connector.model.config.HostConfig;
|
|
|
|
|
import cz.senslog.connector.model.v1.*;
|
|
import cz.senslog.connector.model.v1.*;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
import org.apache.logging.log4j.Logger;
|
|
@@ -21,9 +17,10 @@ import java.lang.reflect.Type;
|
|
|
import java.time.*;
|
|
import java.time.*;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
|
|
import static cz.senslog.connector.tools.json.BasicJson.jsonToObject;
|
|
import static cz.senslog.connector.tools.json.BasicJson.jsonToObject;
|
|
|
-import static java.time.OffsetDateTime.now;
|
|
|
|
|
import static java.time.OffsetDateTime.parse;
|
|
import static java.time.OffsetDateTime.parse;
|
|
|
import static java.time.format.DateTimeFormatter.ofPattern;
|
|
import static java.time.format.DateTimeFormatter.ofPattern;
|
|
|
|
|
|
|
@@ -31,17 +28,24 @@ public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV
|
|
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(SenslogFetcher.class);
|
|
private static final Logger logger = LogManager.getLogger(SenslogFetcher.class);
|
|
|
|
|
|
|
|
|
|
+ private static final int MAX_AUTH_ERRORS = 3;
|
|
|
|
|
+
|
|
|
private final SenslogConfig config;
|
|
private final SenslogConfig config;
|
|
|
private final HttpClient httpClient;
|
|
private final HttpClient httpClient;
|
|
|
|
|
|
|
|
private final SensLogSession localSession;
|
|
private final SensLogSession localSession;
|
|
|
private final Map<Long, UnitInfo> units;
|
|
private final Map<Long, UnitInfo> units;
|
|
|
|
|
+ private Tuple<Boolean, HttpCookie> authCookie;
|
|
|
|
|
+ private final AtomicInteger authErrors;
|
|
|
|
|
+
|
|
|
|
|
|
|
|
SenslogFetcher(SenslogConfig config, HttpClient httpClient) {
|
|
SenslogFetcher(SenslogConfig config, HttpClient httpClient) {
|
|
|
this.config = config;
|
|
this.config = config;
|
|
|
this.httpClient = httpClient;
|
|
this.httpClient = httpClient;
|
|
|
this.localSession = SensLogSession.emptySession();
|
|
this.localSession = SensLogSession.emptySession();
|
|
|
this.units = new HashMap<>();
|
|
this.units = new HashMap<>();
|
|
|
|
|
+ this.authCookie = Tuple.of(false, null);
|
|
|
|
|
+ this.authErrors = new AtomicInteger(0);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
SenslogFetcher() { this(null, null); }
|
|
SenslogFetcher() { this(null, null); }
|
|
@@ -49,54 +53,117 @@ public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV
|
|
|
|
|
|
|
|
protected static class ObservationInfo { Float value; OffsetDateTime time; }
|
|
protected static class ObservationInfo { Float value; OffsetDateTime time; }
|
|
|
|
|
|
|
|
- @Override
|
|
|
|
|
- public void init() {
|
|
|
|
|
|
|
+ private synchronized HttpCookie getAuthCookie() {
|
|
|
|
|
+ /*
|
|
|
|
|
+ if (authCookie.getItem1()) {
|
|
|
|
|
+ return authCookie.getItem2();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (config.getAuth() == null) {
|
|
|
|
|
+ authCookie = Tuple.of(true, null);
|
|
|
|
|
+ return authCookie.getItem2();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ */
|
|
|
|
|
+
|
|
|
|
|
+ HttpRequest request = HttpRequest.newBuilder().GET()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(config.getBaseUrl(), "/ControllerServlet")
|
|
|
|
|
+ .addParam("username", config.getAuth().getUsername())
|
|
|
|
|
+ .addParam("password", config.getAuth().getPassword())
|
|
|
|
|
+ .build()
|
|
|
|
|
+ ).build();
|
|
|
|
|
+ logger.info("Getting new auth cookie from the server: {}.", config.getBaseUrl());
|
|
|
|
|
+
|
|
|
|
|
+ HttpResponse response = httpClient.send(request);
|
|
|
|
|
+ logger.info("Received new auth token with the status '{}' from the server {}.", response.getStatus(), config.getBaseUrl());
|
|
|
|
|
+
|
|
|
|
|
+ if (response.isError()) {
|
|
|
|
|
+ logger.warn("Authorization failed. Error code {} with the reason '{}'.", response.getStatus(), response.getBody());
|
|
|
|
|
+ HttpCookie cookie = HttpCookie.empty();
|
|
|
|
|
+ authCookie = Tuple.of(false, cookie);
|
|
|
|
|
+ return cookie;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ final Type lastObsType = new TypeToken<Map<String, Object>>() {}.getType();
|
|
|
|
|
+ Map<String, Object> jsonResponse = jsonToObject(response.getBody(), lastObsType);
|
|
|
|
|
+
|
|
|
|
|
+ if (!jsonResponse.containsKey("sessionid")) {
|
|
|
|
|
+ logger.error("Authorization failed. JSON does not contain session id. {}", response.getBody());
|
|
|
|
|
+ HttpCookie cookie = HttpCookie.empty();
|
|
|
|
|
+ authCookie = Tuple.of(false, cookie);
|
|
|
|
|
+ return cookie;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String sessionId = (String) jsonResponse.get("sessionid");
|
|
|
|
|
+ String domain = config.getBaseURI().getHost();
|
|
|
|
|
+ String path = config.getBaseURI().getPath();
|
|
|
|
|
+ HttpCookie cookie = new HttpCookie("JSESSIONID", sessionId, domain, path);
|
|
|
|
|
+ authCookie = Tuple.of(true, cookie);
|
|
|
|
|
+ return cookie;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- HostConfig dataServiceHost = config.getDataServiceHost();
|
|
|
|
|
- logger.info("Getting last observations from {}.", dataServiceHost.getDomain());
|
|
|
|
|
|
|
+ private Optional<String> callRequest(HttpRequest.Builder reqBuilder) {
|
|
|
|
|
+ HttpCookie authCookie = getAuthCookie();
|
|
|
|
|
+ if (!authCookie.isSecure()) {
|
|
|
|
|
+ logger.warn("Auth cookie is not valid to be used.");
|
|
|
|
|
+ return Optional.empty();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ HttpRequest request = reqBuilder.addCookie(authCookie).build();
|
|
|
|
|
+ logger.info("Getting new data from the server: {}.", request.getUrl());
|
|
|
|
|
+
|
|
|
|
|
+ HttpResponse response = httpClient.send(request);
|
|
|
|
|
+ logger.info("Received new data with the status '{}' from the server {}.", response.getStatus(), request.getUrl());
|
|
|
|
|
+
|
|
|
|
|
+ if (response.getStatus() == HttpCode.UNAUTHORIZED && authErrors.get() <= MAX_AUTH_ERRORS) {
|
|
|
|
|
+ this.authCookie = Tuple.of(false, null);
|
|
|
|
|
+ authErrors.incrementAndGet();
|
|
|
|
|
+ return callRequest(reqBuilder);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (response.isError()) {
|
|
|
|
|
+ logger.error("code: {}, message: {}", response.getStatus(), response.getBody());
|
|
|
|
|
+ throw new IllegalStateException(response.getBody());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ authErrors.set(0);
|
|
|
|
|
+ return Optional.of(response.getBody());
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- HttpRequest unitsRequest = HttpRequest.newBuilder().GET()
|
|
|
|
|
- .url(URLBuilder.newBuilder(dataServiceHost.getDomain(), dataServiceHost.getPath())
|
|
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void init() throws Exception {
|
|
|
|
|
+
|
|
|
|
|
+ HttpRequest.Builder reqUnitsBuilder = HttpRequest.newBuilder().GET()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(config.getBaseUrl(), "/DataService")
|
|
|
.addParam("Operation", "GetUnitsList")
|
|
.addParam("Operation", "GetUnitsList")
|
|
|
.addParam("user", config.getUser())
|
|
.addParam("user", config.getUser())
|
|
|
- .build())
|
|
|
|
|
- .build();
|
|
|
|
|
- logger.info("Creating a http request to {}.", unitsRequest);
|
|
|
|
|
|
|
+ .build());
|
|
|
|
|
|
|
|
- HttpResponse unitsResponse = httpClient.send(unitsRequest);
|
|
|
|
|
- logger.info("Received a response with a status: {} for the domain {}.", unitsResponse.getStatus(), dataServiceHost.getDomain());
|
|
|
|
|
|
|
+ Optional<String> resUnitsBody = callRequest(reqUnitsBuilder);
|
|
|
|
|
|
|
|
- if (unitsResponse.isOk()) {
|
|
|
|
|
- logger.debug("Parsing body of the response to the list of class {}.", UnitInfo.class);
|
|
|
|
|
|
|
+ if (resUnitsBody.isPresent()) {
|
|
|
Type unitInfoType = new TypeToken<Collection<UnitInfo>>() {}.getType();
|
|
Type unitInfoType = new TypeToken<Collection<UnitInfo>>() {}.getType();
|
|
|
- List<UnitInfo> unitInfos = jsonToObject(unitsResponse.getBody(), unitInfoType);
|
|
|
|
|
|
|
+ List<UnitInfo> unitInfos = jsonToObject(resUnitsBody.get(), unitInfoType);
|
|
|
|
|
|
|
|
for (UnitInfo unit : unitInfos) {
|
|
for (UnitInfo unit : unitInfos) {
|
|
|
if (!config.getAllowedStation().isAllowed(unit.getUnitId())) {
|
|
if (!config.getAllowedStation().isAllowed(unit.getUnitId())) {
|
|
|
- logger.info("Unit {} is not allowd in configuration.", unit.getUnitId());continue;
|
|
|
|
|
|
|
+ logger.info("Unit {} is not allowed in configuration.", unit.getUnitId());continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
units.put(unit.getUnitId(), unit);
|
|
units.put(unit.getUnitId(), unit);
|
|
|
|
|
|
|
|
- HostConfig sensorServiceHost = config.getSensorServiceHost();
|
|
|
|
|
- logger.info("Getting last observations from {}.", sensorServiceHost.getDomain());
|
|
|
|
|
-
|
|
|
|
|
- HttpRequest sensorRequest = HttpRequest.newBuilder().GET()
|
|
|
|
|
- .url(URLBuilder.newBuilder(sensorServiceHost.getDomain(), sensorServiceHost.getPath())
|
|
|
|
|
|
|
+ HttpRequest.Builder reqSensorsBuilder = HttpRequest.newBuilder().GET()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(config.getBaseUrl(), "/SensorService")
|
|
|
.addParam("Operation", "GetSensors")
|
|
.addParam("Operation", "GetSensors")
|
|
|
.addParam("user", config.getUser())
|
|
.addParam("user", config.getUser())
|
|
|
.addParam("unit_id", unit.getUnitId())
|
|
.addParam("unit_id", unit.getUnitId())
|
|
|
- .build())
|
|
|
|
|
- .build();
|
|
|
|
|
- logger.info("Creating a http request to {}.", sensorRequest);
|
|
|
|
|
|
|
+ .build());
|
|
|
|
|
|
|
|
- HttpResponse sensorResponse = httpClient.send(sensorRequest);
|
|
|
|
|
- logger.info("Received a response with a status: {} for the domain {}.", sensorResponse.getStatus(), sensorServiceHost.getDomain());
|
|
|
|
|
|
|
+ Optional<String> resSensorsBody = callRequest(reqSensorsBuilder);
|
|
|
|
|
|
|
|
- if (sensorResponse.isOk()) {
|
|
|
|
|
- logger.debug("Parsing body of the response to the list of class {}.", SensorInfo.class);
|
|
|
|
|
|
|
+ if (resSensorsBody.isPresent()) {
|
|
|
Type sensorInfoType = new TypeToken<Collection<SensorInfo>>() {}.getType();
|
|
Type sensorInfoType = new TypeToken<Collection<SensorInfo>>() {}.getType();
|
|
|
- List<SensorInfo> sensors = jsonToObject(sensorResponse.getBody(), sensorInfoType,
|
|
|
|
|
|
|
+ List<SensorInfo> sensors = jsonToObject(resSensorsBody.get(), sensorInfoType,
|
|
|
Tuple.of(OffsetDateTime.class, el -> parse(el + "00", ofPattern("yyyy-MM-dd HH:mm:ssZ"))));
|
|
Tuple.of(OffsetDateTime.class, el -> parse(el + "00", ofPattern("yyyy-MM-dd HH:mm:ssZ"))));
|
|
|
|
|
|
|
|
logger.info("For the unit {} was added {} sensors.", unit.getUnitId(), sensors.size());
|
|
logger.info("For the unit {} was added {} sensors.", unit.getUnitId(), sensors.size());
|
|
@@ -107,19 +174,17 @@ public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
unit.setSensors(allowedSensors);
|
|
unit.setSensors(allowedSensors);
|
|
|
- } else {
|
|
|
|
|
- logger.error("Can not get data from the server {}. Error {} {}",
|
|
|
|
|
- sensorServiceHost.getDomain(), sensorResponse.getStatus(), sensorResponse.getBody());
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- } else {
|
|
|
|
|
- logger.error("Can not get data from the server {}. Error {} {}",
|
|
|
|
|
- dataServiceHost.getDomain(), unitsResponse.getStatus(), unitsResponse.getBody());
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public SenslogV1Model fetch(Optional<SensLogSession> persistenceSession) {
|
|
public SenslogV1Model fetch(Optional<SensLogSession> persistenceSession) {
|
|
|
|
|
+ HttpCookie authCookie = getAuthCookie();
|
|
|
|
|
+
|
|
|
|
|
+ OffsetDateTime now = ZonedDateTime.ofInstant(Instant.now(), config.getTimeZone().toZoneId()).toOffsetDateTime();
|
|
|
|
|
+ OffsetDateTime startDate = ZonedDateTime.of(config.getStartDate(), config.getTimeZone().toZoneId()).toOffsetDateTime();
|
|
|
|
|
|
|
|
SensLogSession session = persistenceSession.filter(ProxySessionModel::isActive).orElse(localSession);
|
|
SensLogSession session = persistenceSession.filter(ProxySessionModel::isActive).orElse(localSession);
|
|
|
|
|
|
|
@@ -128,26 +193,22 @@ public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV
|
|
|
for (UnitInfo unit : units.values()) {
|
|
for (UnitInfo unit : units.values()) {
|
|
|
for (SensorInfo sensor : unit.getSensors()) {
|
|
for (SensorInfo sensor : unit.getSensors()) {
|
|
|
|
|
|
|
|
- ZoneOffset offset = sensor.getFirstObservationTime().getOffset();
|
|
|
|
|
-
|
|
|
|
|
OffsetDateTime firstValueDate = sensor.getFirstObservationTime();
|
|
OffsetDateTime firstValueDate = sensor.getFirstObservationTime();
|
|
|
- OffsetDateTime startDate = config.getStartDate().atOffset(offset);
|
|
|
|
|
OffsetDateTime lastFetch = session.getLiveInfo(unit.getUnitId(), sensor.getSensorId()).lastFetch;
|
|
OffsetDateTime lastFetch = session.getLiveInfo(unit.getUnitId(), sensor.getSensorId()).lastFetch;
|
|
|
|
|
|
|
|
- long fromDateEpoch = Math.max(startDate.toEpochSecond(), Math.max(firstValueDate.toEpochSecond(), lastFetch.toEpochSecond()));
|
|
|
|
|
- OffsetDateTime fromDate = OffsetDateTime.ofInstant(Instant.ofEpochSecond(fromDateEpoch), offset);
|
|
|
|
|
- OffsetDateTime toDate = fromDate.plusHours(config.getInterval());
|
|
|
|
|
|
|
+ OffsetDateTime fromDate = Stream.of(startDate, firstValueDate, lastFetch).max(Comparator.comparing(OffsetDateTime::toEpochSecond)).get();
|
|
|
|
|
+ OffsetDateTime toDate = fromDate.plusMinutes(config.getInterval());
|
|
|
|
|
|
|
|
- if (toDate.isAfter(now())) {
|
|
|
|
|
- continue;
|
|
|
|
|
|
|
+ if (fromDate.isBefore(now) && toDate.isAfter(now)) {
|
|
|
|
|
+ toDate = now;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
logger.info("Getting new observations for the unit {} and the sensor {} from {} to {}.", unit.getUnitId(), sensor.getSensorId(), fromDate, toDate);
|
|
logger.info("Getting new observations for the unit {} and the sensor {} from {} to {}.", unit.getUnitId(), sensor.getSensorId(), fromDate, toDate);
|
|
|
|
|
|
|
|
- final HostConfig sensorServiceHost = config.getSensorServiceHost();
|
|
|
|
|
final DateTimeFormatter pattern = ofPattern("yyyy-MM-dd HH:mm:ssZ");
|
|
final DateTimeFormatter pattern = ofPattern("yyyy-MM-dd HH:mm:ssZ");
|
|
|
HttpRequest observationRequest = HttpRequest.newBuilder().GET()
|
|
HttpRequest observationRequest = HttpRequest.newBuilder().GET()
|
|
|
- .url(URLBuilder.newBuilder(sensorServiceHost.getDomain(), sensorServiceHost.getPath())
|
|
|
|
|
|
|
+ .addCookie(authCookie)
|
|
|
|
|
+ .url(URLBuilder.newBuilder(config.getBaseUrl(), "/SensorService")
|
|
|
.addParam("Operation", "GetObservations")
|
|
.addParam("Operation", "GetObservations")
|
|
|
.addParam("user", config.getUser())
|
|
.addParam("user", config.getUser())
|
|
|
.addParam("unit_id", unit.getUnitId())
|
|
.addParam("unit_id", unit.getUnitId())
|
|
@@ -156,10 +217,8 @@ public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV
|
|
|
.addParam("to", toDate.format(pattern))
|
|
.addParam("to", toDate.format(pattern))
|
|
|
.build())
|
|
.build())
|
|
|
.build();
|
|
.build();
|
|
|
- logger.info("Creating a http request to {}.", observationRequest);
|
|
|
|
|
|
|
|
|
|
HttpResponse observationResponse = httpClient.send(observationRequest);
|
|
HttpResponse observationResponse = httpClient.send(observationRequest);
|
|
|
- logger.info("Received a response with a status: {} for the domain {}.", observationResponse.getStatus(), sensorServiceHost.getDomain());
|
|
|
|
|
|
|
|
|
|
if (observationResponse.isOk()) {
|
|
if (observationResponse.isOk()) {
|
|
|
Type observationType = new TypeToken<Collection<ObservationInfo>>() {}.getType();
|
|
Type observationType = new TypeToken<Collection<ObservationInfo>>() {}.getType();
|
|
@@ -183,6 +242,8 @@ public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
session.updateLastFetch(unit.getUnitId(), sensor.getSensorId(), lastTimeStamp);
|
|
session.updateLastFetch(unit.getUnitId(), sensor.getSensorId(), lastTimeStamp);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ logger.error("Request railed: {}", observationResponse.getBody());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|