|
@@ -0,0 +1,223 @@
|
|
|
|
|
+package cz.senslog.connector.fetch.soilscount;
|
|
|
|
|
+
|
|
|
|
|
+import cz.senslog.connector.fetch.api.ConnectorFetcher;
|
|
|
|
|
+import cz.senslog.connector.model.soilscount.Device;
|
|
|
|
|
+import cz.senslog.connector.model.soilscount.Measurement;
|
|
|
|
|
+import cz.senslog.connector.model.soilscount.SoilscountModel;
|
|
|
|
|
+import cz.senslog.connector.tools.http.*;
|
|
|
|
|
+import cz.senslog.connector.tools.json.BasicJson;
|
|
|
|
|
+import cz.senslog.connector.tools.jwt.JWTDecoder;
|
|
|
|
|
+import cz.senslog.connector.tools.jwt.JWToken;
|
|
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
|
|
+import org.apache.logging.log4j.Logger;
|
|
|
|
|
+
|
|
|
|
|
+import java.time.*;
|
|
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
|
|
+import java.util.*;
|
|
|
|
|
+
|
|
|
|
|
+import static cz.senslog.connector.tools.http.HttpContentType.APPLICATION_JSON;
|
|
|
|
|
+import static cz.senslog.connector.tools.json.BasicJson.jsonToObject;
|
|
|
|
|
+import static cz.senslog.connector.tools.json.BasicJson.objectToJson;
|
|
|
|
|
+import static java.lang.String.format;
|
|
|
|
|
+import static java.time.Instant.ofEpochSecond;
|
|
|
|
|
+
|
|
|
|
|
+public class SoilScountFetcher implements ConnectorFetcher<SessionModel, SoilscountModel> {
|
|
|
|
|
+
|
|
|
|
|
+ private static final Logger logger = LogManager.getLogger(SoilScountFetcher.class);
|
|
|
|
|
+
|
|
|
|
|
+ private final SoilScountConfig config;
|
|
|
|
|
+ private final HttpClient httpClient;
|
|
|
|
|
+
|
|
|
|
|
+ private AuthTokens authTokens;
|
|
|
|
|
+ private List<Device> devices;
|
|
|
|
|
+ private String devicesAsParam;
|
|
|
|
|
+
|
|
|
|
|
+ private static class AuthTokens {
|
|
|
|
|
+ private final JWToken access;
|
|
|
|
|
+ private final String refresh;
|
|
|
|
|
+
|
|
|
|
|
+ public AuthTokens(JWToken access, String refresh) {
|
|
|
|
|
+ this.access = access;
|
|
|
|
|
+ this.refresh = refresh;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public SoilScountFetcher() { this(null, null); }
|
|
|
|
|
+
|
|
|
|
|
+ public SoilScountFetcher(SoilScountConfig config, HttpClient httpClient) {
|
|
|
|
|
+ this.config = config;
|
|
|
|
|
+ this.httpClient = httpClient;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void init() {
|
|
|
|
|
+ {
|
|
|
|
|
+ HttpRequest request = HttpRequest.newBuilder().POST()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(config.getAuthUrl()).build())
|
|
|
|
|
+ .header(HttpHeader.ACCEPT, APPLICATION_JSON)
|
|
|
|
|
+ .contentType(APPLICATION_JSON)
|
|
|
|
|
+ .body(objectToJson(config.getAuthConfig()))
|
|
|
|
|
+ .build();
|
|
|
|
|
+
|
|
|
|
|
+ HttpResponse response = httpClient.send(request);
|
|
|
|
|
+
|
|
|
|
|
+ if (response.isError()) {
|
|
|
|
|
+ throw logger.throwing(new IllegalStateException(format(
|
|
|
|
|
+ "Can not login. %s", response.getBody()
|
|
|
|
|
+ )));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Map<String, String> tokensMap = BasicJson.<Map<String, String>>jsonToObject(response.getBody(), Map.class);
|
|
|
|
|
+ authTokens = new AuthTokens(JWTDecoder.decodeJWT(tokensMap.get("access")), tokensMap.get("refresh"));
|
|
|
|
|
+
|
|
|
|
|
+ if (authTokens.access == null) {
|
|
|
|
|
+ throw logger.throwing(new IllegalStateException("Authentication failed. No valid access token found."));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ {
|
|
|
|
|
+ String accessToken = getAccessToken();
|
|
|
|
|
+ HttpRequest request = HttpRequest.newBuilder().GET()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(config.getDevicesUrl()).build())
|
|
|
|
|
+ .header(HttpHeader.ACCEPT, APPLICATION_JSON)
|
|
|
|
|
+ .header(HttpHeader.AUTHORIZATION, String.format("Bearer %s", accessToken))
|
|
|
|
|
+ .build();
|
|
|
|
|
+
|
|
|
|
|
+ HttpResponse response = httpClient.send(request);
|
|
|
|
|
+
|
|
|
|
|
+ if (response.isError()) {
|
|
|
|
|
+ throw logger.throwing(new IllegalStateException(format(
|
|
|
|
|
+ "Can not get devices. %s", response.getBody()
|
|
|
|
|
+ )));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ List<?> devicesMap = jsonToObject(response.getBody(), List.class);
|
|
|
|
|
+ devices = new ArrayList<>(devicesMap.size());
|
|
|
|
|
+ for (Object d : devicesMap) {
|
|
|
|
|
+ if (d instanceof Map) {
|
|
|
|
|
+ Map<?, ?> device = (Map<?, ?>) d;
|
|
|
|
|
+ int deviceId = ((Double) device.get("id")).intValue();
|
|
|
|
|
+ if (config.getAllowedDevices().contains(deviceId)) {
|
|
|
|
|
+ devices.add(new Device(deviceId,
|
|
|
|
|
+ ((Double) device.get("serial_number")).longValue(),
|
|
|
|
|
+ (String) device.get("name")
|
|
|
|
|
+ ));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ List<String> devAsStr = new ArrayList<>(devices.size());
|
|
|
|
|
+ for (Device d : devices) {
|
|
|
|
|
+ devAsStr.add(Long.toString(d.getId()));
|
|
|
|
|
+ }
|
|
|
|
|
+ devicesAsParam = String.join(",", devAsStr);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private String getAccessToken() {
|
|
|
|
|
+
|
|
|
|
|
+ if (ofEpochSecond(authTokens.access.getPayload().getExp()).isAfter(Instant.now())) {
|
|
|
|
|
+ return authTokens.access.getRaw();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Map<String, String> body = new HashMap<>();
|
|
|
|
|
+ body.put("refresh", authTokens.refresh);
|
|
|
|
|
+
|
|
|
|
|
+ HttpRequest request = HttpRequest.newBuilder().POST()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(config.getRefreshUrl()).build())
|
|
|
|
|
+ .header(HttpHeader.ACCEPT, APPLICATION_JSON)
|
|
|
|
|
+ .contentType(APPLICATION_JSON)
|
|
|
|
|
+ .body(objectToJson(body))
|
|
|
|
|
+ .build();
|
|
|
|
|
+
|
|
|
|
|
+ HttpResponse response = httpClient.send(request);
|
|
|
|
|
+
|
|
|
|
|
+ if (response.isError()) {
|
|
|
|
|
+ logger.error("Can not refresh access token.");
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Map<String, String> tokensMap = BasicJson.<Map<String, String>>jsonToObject(response.getBody(), Map.class);
|
|
|
|
|
+ authTokens = new AuthTokens(JWTDecoder.decodeJWT(tokensMap.get("access")), tokensMap.get("refresh"));
|
|
|
|
|
+
|
|
|
|
|
+ if (authTokens.access == null) {
|
|
|
|
|
+ logger.error("Refreshing token failed. No valid access token.");
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return authTokens.access.getRaw();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public SoilscountModel fetch(Optional<SessionModel> sessionOpt) {
|
|
|
|
|
+ SessionModel session = sessionOpt.orElse(new SessionModel(false));
|
|
|
|
|
+
|
|
|
|
|
+ OffsetDateTime startAt = OffsetDateTime.of(config.getStartDate(), ZoneOffset.UTC);
|
|
|
|
|
+ if (session.getStartAt() != null && startAt.isBefore(session.getStartAt())) {
|
|
|
|
|
+ startAt = session.getStartAt();
|
|
|
|
|
+ }
|
|
|
|
|
+ OffsetDateTime endAt = startAt.plusHours(config.getPeriod());
|
|
|
|
|
+
|
|
|
|
|
+ String accessToken = getAccessToken();
|
|
|
|
|
+ HttpRequest request = HttpRequest.newBuilder().GET()
|
|
|
|
|
+ .url(URLBuilder.newBuilder(config.getMeasurementsUrl())
|
|
|
|
|
+ .addParam("since", startAt.format(DateTimeFormatter.ISO_DATE_TIME))
|
|
|
|
|
+ .addParam("until", endAt.format(DateTimeFormatter.ISO_DATE_TIME))
|
|
|
|
|
+ .addParam("device", devicesAsParam)
|
|
|
|
|
+ .build())
|
|
|
|
|
+ .header(HttpHeader.ACCEPT, APPLICATION_JSON)
|
|
|
|
|
+ .header(HttpHeader.AUTHORIZATION, String.format("Bearer %s", accessToken))
|
|
|
|
|
+ .build();
|
|
|
|
|
+
|
|
|
|
|
+ HttpResponse response = httpClient.send(request);
|
|
|
|
|
+
|
|
|
|
|
+ if (response.isError()) {
|
|
|
|
|
+ throw logger.throwing(new IllegalStateException(format(
|
|
|
|
|
+ "Can not get new measurements. %s", response.getBody()
|
|
|
|
|
+ )));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Map<?, ?> measureMap = jsonToObject(response.getBody(), Map.class);
|
|
|
|
|
+ boolean hasNext = measureMap.get("next") != null;
|
|
|
|
|
+ List<?> results = (List<?>) measureMap.get("results");
|
|
|
|
|
+ List<Measurement> measurements = new ArrayList<>(results.size());
|
|
|
|
|
+
|
|
|
|
|
+ OffsetDateTime minTimestamp = OffsetDateTime.MAX;
|
|
|
|
|
+ OffsetDateTime maxTimestamp = OffsetDateTime.MIN;
|
|
|
|
|
+
|
|
|
|
|
+ for (Object r : results) {
|
|
|
|
|
+ if (r instanceof Map) {
|
|
|
|
|
+ Map<?, ?> m = (Map<?, ?>) r;
|
|
|
|
|
+ Measurement measurement = (new Measurement(
|
|
|
|
|
+ OffsetDateTime.parse((String) m.get("timestamp")),
|
|
|
|
|
+ ((Double) m.get("device")).longValue(),
|
|
|
|
|
+ (Double)m.get("temperature"),
|
|
|
|
|
+ (Double)m.get("moisture"),
|
|
|
|
|
+ (Double)m.get("conductivity"),
|
|
|
|
|
+ (Double)m.get("dielectricity"),
|
|
|
|
|
+ ((Double)m.get("site")).intValue(),
|
|
|
|
|
+ (Double)m.get("salinity"),
|
|
|
|
|
+ (Double)m.get("field_capacity"),
|
|
|
|
|
+ (Double)m.get("wilting_point"),
|
|
|
|
|
+ (Double)m.get("water_balance"),
|
|
|
|
|
+ (Double)m.get("oxygen")
|
|
|
|
|
+ ));
|
|
|
|
|
+ measurements.add(measurement);
|
|
|
|
|
+
|
|
|
|
|
+ if (measurement.getTimestamp().isAfter(maxTimestamp)) {
|
|
|
|
|
+ maxTimestamp = measurement.getTimestamp();
|
|
|
|
|
+ } else if (measurement.getTimestamp().isBefore(minTimestamp)) {
|
|
|
|
|
+ minTimestamp = measurement.getTimestamp();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ OffsetDateTime now = OffsetDateTime.now();
|
|
|
|
|
+ if (hasNext || endAt.isBefore(now)) {
|
|
|
|
|
+ session.setStartAt(endAt);
|
|
|
|
|
+ } else if (endAt.isAfter(now)) {
|
|
|
|
|
+ session.setStartAt(maxTimestamp);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return new SoilscountModel(devices, measurements, minTimestamp, maxTimestamp);
|
|
|
|
|
+ }
|
|
|
|
|
+}
|