|
|
@@ -1,9 +1,11 @@
|
|
|
-package cz.senslog.fetch.senslog.telemetry;
|
|
|
+package cz.senslog.connector.fetch.senslog.telemetry;
|
|
|
|
|
|
import com.google.gson.JsonArray;
|
|
|
import com.google.gson.JsonObject;
|
|
|
+import com.google.gson.reflect.TypeToken;
|
|
|
import cz.senslog.connector.fetch.api.ConnectorFetcher;
|
|
|
import cz.senslog.connector.model.api.GeoJsonModel;
|
|
|
+import cz.senslog.connector.model.api.VoidSession;
|
|
|
import cz.senslog.connector.tools.http.HttpClient;
|
|
|
import cz.senslog.connector.tools.http.HttpRequest;
|
|
|
import cz.senslog.connector.tools.http.HttpResponse;
|
|
|
@@ -12,18 +14,19 @@ import cz.senslog.connector.tools.json.BasicJson;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
|
|
|
+import java.lang.reflect.Type;
|
|
|
import java.time.Duration;
|
|
|
import java.time.Instant;
|
|
|
import java.time.OffsetDateTime;
|
|
|
import java.time.ZoneOffset;
|
|
|
-import java.time.format.DateTimeFormatter;
|
|
|
-import java.util.Optional;
|
|
|
+import java.util.*;
|
|
|
|
|
|
import static cz.senslog.connector.tools.http.HttpContentType.APPLICATION_GEO_JSON;
|
|
|
import static cz.senslog.connector.tools.http.HttpContentType.APPLICATION_JSON;
|
|
|
import static cz.senslog.connector.tools.http.HttpHeader.*;
|
|
|
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
|
|
|
|
|
|
-public class SensLogTelemetryFetcher implements ConnectorFetcher<SensLogTelemetrySession, GeoJsonModel> {
|
|
|
+public class SensLogTelemetryFetcher implements ConnectorFetcher<VoidSession, GeoJsonModel> {
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(SensLogTelemetryFetcher.class);
|
|
|
|
|
|
@@ -33,11 +36,16 @@ public class SensLogTelemetryFetcher implements ConnectorFetcher<SensLogTelemetr
|
|
|
OffsetDateTime fromTime, toTime;
|
|
|
}
|
|
|
|
|
|
+ private static final ZoneOffset DEFAULT_TIME_OFFSET = ZoneOffset.UTC;
|
|
|
+
|
|
|
private final SensLogTelemetryConfig config;
|
|
|
|
|
|
private final HttpClient httpClient;
|
|
|
|
|
|
- private CampaignInfo campaignInfo;
|
|
|
+ private OffsetDateTime startFrom;
|
|
|
+
|
|
|
+ private final Map<Long, SensLogTelemetrySession> multiEventSession;
|
|
|
+
|
|
|
|
|
|
public SensLogTelemetryFetcher() {
|
|
|
this(null, null);
|
|
|
@@ -46,12 +54,16 @@ public class SensLogTelemetryFetcher implements ConnectorFetcher<SensLogTelemetr
|
|
|
public SensLogTelemetryFetcher(SensLogTelemetryConfig config, HttpClient httpClient) {
|
|
|
this.config = config;
|
|
|
this.httpClient = httpClient;
|
|
|
+ this.multiEventSession = new HashMap<>();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void init() throws Exception {
|
|
|
assert config != null; assert httpClient != null;
|
|
|
|
|
|
+ startFrom = config.getStartAt().atOffset(DEFAULT_TIME_OFFSET);
|
|
|
+
|
|
|
+ /*
|
|
|
HttpRequest request = HttpRequest.newBuilder().GET()
|
|
|
.url(URLBuilder.newBuilder(config.getBaseUrl(), String.format("campaigns/%d", config.getCampaignId()))
|
|
|
.addParam("zone", "UTC")
|
|
|
@@ -73,58 +85,100 @@ public class SensLogTelemetryFetcher implements ConnectorFetcher<SensLogTelemetr
|
|
|
}
|
|
|
|
|
|
logger.info("Initialized fetching telemetries from the campaign '{}'", campaignInfo.name);
|
|
|
+ */
|
|
|
+
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public GeoJsonModel fetch(Optional<SensLogTelemetrySession> sessionOpt) {
|
|
|
-
|
|
|
- final OffsetDateTime now = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC);
|
|
|
-
|
|
|
- SensLogTelemetrySession session = sessionOpt.orElse(null);
|
|
|
- assert session != null;
|
|
|
-
|
|
|
- if (session.hasNotNext()) {
|
|
|
- if (session.getFromTime() == null) {
|
|
|
- if (config.getStartAt() != null) {
|
|
|
- session.setFromTime(OffsetDateTime.of(config.getStartAt(), ZoneOffset.UTC));
|
|
|
+ public GeoJsonModel fetch(Optional<VoidSession> sessionOpt) {
|
|
|
+
|
|
|
+ final OffsetDateTime now = OffsetDateTime.ofInstant(Instant.now(), DEFAULT_TIME_OFFSET);
|
|
|
+ {
|
|
|
+ for (long unitId : config.getAllowedStations()) {
|
|
|
+ logger.info("Events for Unit {} from {}.", unitId, startFrom.format(ISO_OFFSET_DATE_TIME));
|
|
|
+ HttpRequest request = HttpRequest.newBuilder().GET()
|
|
|
+ .url(URLBuilder.newBuilder(config.getBaseUrl(), String.format("/entities/%d/units/%d/actions/%d/events",
|
|
|
+ config.getEntityId(), unitId, config.getActionId()))
|
|
|
+ .addParam("from", startFrom.format(ISO_OFFSET_DATE_TIME))
|
|
|
+ .addParam("zone", "UTC")
|
|
|
+ .addParam("navigationLinks", "false")
|
|
|
+ .build())
|
|
|
+ .header(ACCEPT, APPLICATION_JSON)
|
|
|
+ .header(AUTHORIZATION, "Bearer " + config.getBearerToken())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ HttpResponse response = httpClient.send(request);
|
|
|
+
|
|
|
+ if (response.isOk()) {
|
|
|
+ Type eventInfoListType = new TypeToken<Collection<EventInfo>>(){}.getType();
|
|
|
+ List<EventInfo> eventInfoList = BasicJson.jsonToObject(response.getBody(), eventInfoListType);
|
|
|
+ for (EventInfo eventInfo : eventInfoList) {
|
|
|
+ if (multiEventSession.containsKey(eventInfo.id)) {
|
|
|
+ EventInfo sessionEvent = multiEventSession.get(eventInfo.id).getEventInfo();
|
|
|
+ sessionEvent.fromTime = eventInfo.fromTime;
|
|
|
+ sessionEvent.toTime = eventInfo.toTime;
|
|
|
+ sessionEvent.status = eventInfo.status;
|
|
|
+ } else {
|
|
|
+ logger.info("Events for Unit {} from {}.", unitId, startFrom.format(ISO_OFFSET_DATE_TIME));
|
|
|
+ HttpResponse orderStatusRes = httpClient.send(HttpRequest.newBuilder().GET()
|
|
|
+ .url(URLBuilder.newBuilder(config.getBaseUrl(), "/integration/tracking/status")
|
|
|
+ .addParam("unit_id", unitId)
|
|
|
+ .build())
|
|
|
+ .header(ACCEPT, APPLICATION_JSON)
|
|
|
+ .header(AUTHORIZATION, "Bearer " + config.getBearerToken())
|
|
|
+ .build());
|
|
|
+
|
|
|
+ if (orderStatusRes.isOk()) {
|
|
|
+ Map<?, ?> orderStatusMap = BasicJson.jsonToObject(orderStatusRes.getBody(), Map.class);
|
|
|
+ if (orderStatusMap.containsKey("orderId")) {
|
|
|
+ eventInfo.orderId = ((Double) orderStatusMap.get("orderId")).longValue();
|
|
|
+ multiEventSession.put(eventInfo.id, new SensLogTelemetrySession(eventInfo, startFrom, config.getInterval()));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ logger.error(orderStatusRes.getBody());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
} else {
|
|
|
- session.setFromTime(campaignInfo.fromTime);
|
|
|
+ logger.error(response.getBody());
|
|
|
}
|
|
|
- session.setToTime(session.getFromTime().plusHours(config.getInterval()));
|
|
|
}
|
|
|
+ }
|
|
|
+ startFrom = now;
|
|
|
|
|
|
- if (session.getToTime().isAfter(now)) {
|
|
|
- logger.info("To early for the new data, the next interval is in {} minutes.", Duration.between(now, session.getToTime()).toMinutes());
|
|
|
- return GeoJsonModel.empty();
|
|
|
- }
|
|
|
+ if (multiEventSession.isEmpty()) {
|
|
|
+ return GeoJsonModel.empty();
|
|
|
}
|
|
|
|
|
|
- HttpRequest request;
|
|
|
- if (config.getAllowedStations().isEmpty()) {
|
|
|
+ JsonArray featureCollections = new JsonArray();
|
|
|
+ OffsetDateTime globalTimestampFirst = OffsetDateTime.MAX;
|
|
|
+ OffsetDateTime globalTimestampLast = OffsetDateTime.MIN;
|
|
|
|
|
|
- request = HttpRequest.newBuilder().GET()
|
|
|
- .url(URLBuilder.newBuilder(config.getBaseUrl(), String.format("campaigns/%d/units/observations", config.getCampaignId()))
|
|
|
- .addParam("limit", config.getLimit())
|
|
|
- .addParam("format", "geojson")
|
|
|
- .addParam("from", session.getFromTime().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
|
|
|
- .addParam("to", session.getToTime().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
|
|
|
- .addParam("zone", "UTC")
|
|
|
- .addParam("offset", session.getOffset())
|
|
|
- .addParam("navigationLinks", "false")
|
|
|
- .build())
|
|
|
- .header(ACCEPT, APPLICATION_GEO_JSON)
|
|
|
- .header(AUTHORIZATION, "Bearer " + config.getBearerToken())
|
|
|
- .build();
|
|
|
- } else {
|
|
|
+ for (SensLogTelemetrySession session : multiEventSession.values()) {
|
|
|
+ EventInfo event = session.getEventInfo();
|
|
|
+
|
|
|
+ if (session.hasNotNext()) {
|
|
|
+ if (session.getEventInfo().toTime != null) {
|
|
|
+ session.setToTime(session.getEventInfo().toTime);
|
|
|
+ } else if (session.getToTime().isAfter(now)) {
|
|
|
+ session.setToTime(now);
|
|
|
+ }
|
|
|
|
|
|
- long unitId = config.getAllowedStations().iterator().next();
|
|
|
+// if (session.getToTime().isAfter(now)) {
|
|
|
+// logger.info("To early for the new data, the next interval is in {} minutes.", Duration.between(now, session.getToTime()).toMinutes());
|
|
|
+// return GeoJsonModel.empty();
|
|
|
+// }
|
|
|
+ }
|
|
|
|
|
|
- request = HttpRequest.newBuilder().GET()
|
|
|
- .url(URLBuilder.newBuilder(config.getBaseUrl(), String.format("campaigns/%d/units/%d/observations", config.getCampaignId(), unitId))
|
|
|
+ String fromParam = session.getFromTime().format(ISO_OFFSET_DATE_TIME);
|
|
|
+ String toParam = session.getToTime().format(ISO_OFFSET_DATE_TIME);
|
|
|
+ logger.info("Getting observations within: {} - {}.", fromParam, toParam);
|
|
|
+ HttpRequest request = HttpRequest.newBuilder().GET()
|
|
|
+ .url(URLBuilder.newBuilder(config.getBaseUrl(), String.format("/events/%d/observations", event.id))
|
|
|
.addParam("limit", config.getLimit())
|
|
|
.addParam("format", "geojson")
|
|
|
- .addParam("from", session.getFromTime().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
|
|
|
- .addParam("to", session.getToTime().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
|
|
|
+ .addParam("from", fromParam)
|
|
|
+ .addParam("to", toParam)
|
|
|
.addParam("zone", "UTC")
|
|
|
.addParam("offset", session.getOffset())
|
|
|
.addParam("navigationLinks", "false")
|
|
|
@@ -132,42 +186,64 @@ public class SensLogTelemetryFetcher implements ConnectorFetcher<SensLogTelemetr
|
|
|
.header(ACCEPT, APPLICATION_GEO_JSON)
|
|
|
.header(AUTHORIZATION, "Bearer " + config.getBearerToken())
|
|
|
.build();
|
|
|
- }
|
|
|
|
|
|
- HttpResponse response = httpClient.send(request);
|
|
|
+ HttpResponse response = httpClient.send(request);
|
|
|
|
|
|
- if (response.isError()) {
|
|
|
- logger.error(response.getBody());
|
|
|
- return GeoJsonModel.empty();
|
|
|
- }
|
|
|
+ if (response.isError()) {
|
|
|
+ logger.error(response.getBody());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- JsonObject geoJson = BasicJson.jsonToObject(response.getBody(), JsonObject.class);
|
|
|
+ JsonObject geoJson = BasicJson.jsonToObject(response.getBody(), JsonObject.class);
|
|
|
|
|
|
- JsonObject metadata = geoJson.remove("metadata").getAsJsonObject();
|
|
|
- int geoSize = metadata.get("size").getAsInt();
|
|
|
- session.setOffset(geoSize);
|
|
|
- session.hasNext(metadata.get("hasNext").getAsBoolean());
|
|
|
+ JsonObject metadata = geoJson.getAsJsonObject("metadata");
|
|
|
+ int geoSize = metadata.get("size").getAsInt();
|
|
|
+ session.setOffset(geoSize);
|
|
|
+ session.hasNext(metadata.get("hasNext").getAsBoolean());
|
|
|
|
|
|
- if (session.hasNotNext()) {
|
|
|
- session.setFromTime(session.getToTime());
|
|
|
- session.setToTime(session.getToTime().plusHours(config.getInterval()));
|
|
|
- session.setOffset(0);
|
|
|
- }
|
|
|
+ if (session.hasNotNext()) {
|
|
|
+ session.setFromTime(session.getToTime());
|
|
|
+ session.setToTime(session.getToTime().plusHours(config.getInterval()));
|
|
|
+ session.setOffset(0);
|
|
|
+ }
|
|
|
|
|
|
- JsonArray features = geoJson.getAsJsonArray("features");
|
|
|
- if (features.size() != geoSize) {
|
|
|
- logger.warn("Retrieved metadata size of features differs to " + geoSize + " from " + features.size());
|
|
|
- geoSize = features.size();
|
|
|
- }
|
|
|
+ JsonArray features = geoJson.getAsJsonArray("features");
|
|
|
+ if (features.size() != geoSize) {
|
|
|
+ logger.warn("Retrieved metadata size of features differs to " + geoSize + " from " + features.size());
|
|
|
+ geoSize = features.size();
|
|
|
+ }
|
|
|
|
|
|
- if (geoSize <= 0) {
|
|
|
- logger.warn("Retrieved zero data within the interval of " + session.getFromTime() + " to " + session.getToTime());
|
|
|
- return GeoJsonModel.empty();
|
|
|
+ if (geoSize <= 0) {
|
|
|
+ logger.warn("Retrieved zero data within the interval of " + session.getFromTime() + " to " + session.getToTime());
|
|
|
+ return GeoJsonModel.empty();
|
|
|
+ }
|
|
|
+
|
|
|
+ OffsetDateTime locTmFirst = OffsetDateTime.parse(features.get(0).getAsJsonObject().getAsJsonObject("properties").get("timestamp").getAsString());
|
|
|
+ OffsetDateTime locTmLast = OffsetDateTime.parse(features.get(geoSize - 1).getAsJsonObject().getAsJsonObject("properties").get("timestamp").getAsString());
|
|
|
+
|
|
|
+ features.asList().forEach(feature -> feature.getAsJsonObject().get("properties").getAsJsonObject()
|
|
|
+ .addProperty("orderId", Long.toString(event.orderId))
|
|
|
+ );
|
|
|
+
|
|
|
+ featureCollections.addAll(features);
|
|
|
+ if (locTmFirst.isBefore(globalTimestampFirst)) {
|
|
|
+ globalTimestampFirst = locTmFirst;
|
|
|
+ }
|
|
|
+ if (locTmLast.isAfter(globalTimestampLast)) {
|
|
|
+ globalTimestampLast = locTmLast;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (event.status.equals(EventInfo.EventStatus.FINISHED) && session.hasNotNext()) {
|
|
|
+ multiEventSession.remove(event.id);
|
|
|
+ } else {
|
|
|
+ event.status = EventInfo.EventStatus.FINISHED;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- OffsetDateTime fTimestampFirst = OffsetDateTime.parse(features.get(0).getAsJsonObject().getAsJsonObject("properties").get("timestamp").getAsString());
|
|
|
- OffsetDateTime fTimestampLast = OffsetDateTime.parse(features.get(geoSize-1).getAsJsonObject().getAsJsonObject("properties").get("timestamp").getAsString());
|
|
|
+ JsonObject geoJson = new JsonObject();
|
|
|
+ geoJson.addProperty("type", "FeatureCollection");
|
|
|
+ geoJson.add("features", featureCollections);
|
|
|
|
|
|
- return new GeoJsonModel(geoJson, fTimestampFirst, fTimestampLast);
|
|
|
+ return new GeoJsonModel(geoJson, globalTimestampFirst, globalTimestampLast);
|
|
|
}
|
|
|
}
|