Преглед на файлове

added proxy for senslog -> afc

Lukas Cerny преди 5 години
родител
ревизия
019fb6ec0e

+ 21 - 15
config/senslog1AFarCloud.yaml

@@ -1,47 +1,53 @@
-api1: &sensLogApiDomain
-    domain: "http://foodie.lesprojekt.cz:8080/MapLogOT"
-
-api2: &afcApiDomain
-    domain: "https://torcos.etsist.upm.es:9207"
 
 settings:
     - Senslog:
         name: "SensLog v1"
         provider: "cz.senslog.connector.fetch.senslog.v1.SenslogFetchProvider"
 
-        startDate: 2019-07-14T13:50:00.000
+        api: &apiDomain
+            domain: "http://foodie.lesprojekt.cz:8080/MapLogOT"
+
+        startDate: 1960-07-14T13:50:00.000
         interval: 5 # hours
 
         user: "afarcloud"
         group: "afc"
 
         sensorServiceHost:
-            <<: *sensLogApiDomain
+            <<: *apiDomain
             path: "SensorService"
 
         dataServiceHost:
-            <<: *sensLogApiDomain
+            <<: *apiDomain
             path: "DataService"
 
         allowedStation:
             # station_id: [sensor_id]
+            "10002222": [340020000, 410010000, 560030000]
             "10002376": [340020000, 410010000, 560030000]
-#            "10002222": [340020000, 410010000, 560030000]
-#            "1305167561991327": [340380097, 410150097, 360200000, 780010097]
-#            "1305167562028072": [340380097, 410150097, 360200000, 780010097]
-#            "1305167562287832": [340420000, 410180000, 360200000, 460090000, 470160000, 470180000, 480080000, 620030000]
+            "1305167562287832": [340420000, 410180000, 360200000, 460090000, 470160000, 470180000, 480080000, 620030000]
+            "1305167561991327": [340380097, 410150097, 360200000, 780010097]
+            "1305167562028072": [340380097, 410150097, 360200000, 780010097]
+            "1305167562017824": [340380097, 410150097, 360200000, 780010097]
+
+        sessionProxy:
+            timeZone: "Europe/Prague"
+            lastObservationHost:
+                domain: "http://torcos.etsist.upm.es:9219"
+                path: "getObservationsBySensor/latest"
+
 
     - AFC:
         name: "AFarCloud"
         provider: "cz.senslog.connector.push.afarcloud.AFarCloudPushProvider"
 
         telemetryHost:
-            <<: *afcApiDomain
+            domain: "https://torcos.etsist.upm.es:9207"
             path: "/telemetry"
 
 connectors:
     - SenslogToAFC:
         fetcher: "Senslog"
         pusher: "AFC"
-        period: 30
-        initDelay: 1
+        period: 600
+        initDelay: 5

+ 15 - 3
connector-fetch-senslog-v1/src/main/java/cz/senslog/connector/fetch/senslog/v1/SensLogSession.java

@@ -30,13 +30,25 @@ public class SensLogSession extends ProxySessionModel {
         this.globalTo = OffsetDateTime.MIN;
     }
 
-    public Info getLiveInfo(String id) {
+    private String createId(long unitId, long sensorId) {
+        return String.format("%s/%s", unitId, sensorId);
+    }
+
+    public Info getLiveInfo(long unitId, long sensorId) {
+        return getLiveInfo(createId(unitId, sensorId));
+    }
+
+    private Info getLiveInfo(String id) {
         return data.computeIfAbsent(id, k -> new Info());
     }
 
 
-    public void updateLastFetch(String id, OffsetDateTime dateTime) {
-        getLiveInfo(id).lastFetch = dateTime;
+    public void updateLastFetch(long unitId, long sensorId, OffsetDateTime dateTime) {
+        Info info = getLiveInfo(createId(unitId, sensorId));
+
+        if (info.lastFetch.isBefore(dateTime)) {
+            info.lastFetch = dateTime;
+        }
 
         if (dateTime.isBefore(globalFrom)) {
             this.globalFrom = dateTime;

+ 9 - 0
connector-fetch-senslog-v1/src/main/java/cz/senslog/connector/fetch/senslog/v1/SenslogConfig.java

@@ -14,6 +14,8 @@ public class SenslogConfig {
     private final String group;
     private final Integer interval;
     private final AllowedStation allowedStation;
+    private final SenslogSessionProxyConfig sessionProxy;
+
 
     SenslogConfig(DefaultConfig defaultConfig) {
         this.startDate = defaultConfig.getLocalDateTimeProperty("startDate");
@@ -23,6 +25,9 @@ public class SenslogConfig {
         this.group = defaultConfig.getStringProperty("group");
         this.interval = defaultConfig.getIntegerProperty("interval");
         this.allowedStation = new AllowedStation(defaultConfig.getPropertyConfig("allowedStation"));
+
+        this.sessionProxy = defaultConfig.containsProperty("sessionProxy") ?
+                new SenslogSessionProxyConfig(defaultConfig.getPropertyConfig("sessionProxy")) : null;
     }
 
     public LocalDateTime getStartDate() {
@@ -52,4 +57,8 @@ public class SenslogConfig {
     public AllowedStation getAllowedStation() {
         return allowedStation;
     }
+
+    public SenslogSessionProxyConfig getSessionProxy() {
+        return sessionProxy;
+    }
 }

+ 15 - 1
connector-fetch-senslog-v1/src/main/java/cz/senslog/connector/fetch/senslog/v1/SenslogFetchProvider.java

@@ -26,6 +26,20 @@ public class SenslogFetchProvider implements ConnectorFetchProvider {
         SenslogFetcher fetcher = new SenslogFetcher(config, newHttpClient());
         logger.info("Fetcher for {} was created successfully.", SenslogFetcher.class);
 
-        return ExecutableFetcher.create(fetcher);
+        logger.debug("Getting a configuration for proxy session.");
+        SenslogSessionProxyConfig proxyConfig = config.getSessionProxy();
+
+        ExecutableFetcher<SenslogV1Model> executor;
+        if (proxyConfig != null) {
+            logger.debug("Creating a new instance of {}.", SenslogProxySession.class);
+            SenslogProxySession proxySession = new SenslogProxySession(fetcher, proxyConfig, newHttpClient());
+            logger.info("Fetcher session for {} was created successfully.", SenslogProxySession.class);
+            executor = ExecutableFetcher.createWithProxySession(proxySession);
+        } else {
+            executor = ExecutableFetcher.create(fetcher);
+        }
+        logger.info("Fetcher executor for {} was created successfully.", SenslogFetcher.class);
+
+        return executor;
     }
 }

+ 5 - 3
connector-fetch-senslog-v1/src/main/java/cz/senslog/connector/fetch/senslog/v1/SenslogFetcher.java

@@ -40,6 +40,9 @@ public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV
         this.units = new HashMap<>();
     }
 
+    SenslogFetcher() { this(null, null); }
+
+
     protected static class ObservationInfo { Float value; OffsetDateTime time; }
 
     @Override
@@ -121,12 +124,11 @@ public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV
         for (UnitInfo unit : units.values()) {
             for (SensorInfo sensor : unit.getSensors()) {
 
-                String sessionId = String.format("%s_%s", unit.getUnitId(), sensor.getSensorId());
                 ZoneOffset offset = sensor.getFirstObservationTime().getOffset();
 
                 OffsetDateTime firstValueDate = sensor.getFirstObservationTime();
                 OffsetDateTime startDate = config.getStartDate().atOffset(offset);
-                OffsetDateTime lastFetch = session.getLiveInfo(sessionId).lastFetch;
+                OffsetDateTime lastFetch = session.getLiveInfo(unit.getUnitId(), sensor.getSensorId()).lastFetch;
 
                 long fromDateEpoch = Math.max(startDate.toEpochSecond(), Math.max(firstValueDate.toEpochSecond(), lastFetch.toEpochSecond()));
                 OffsetDateTime fromDate = OffsetDateTime.ofInstant(Instant.ofEpochSecond(fromDateEpoch), offset);
@@ -176,7 +178,7 @@ public class SenslogFetcher implements ConnectorFetcher<SensLogSession, SenslogV
                         }
                     }
 
-                    session.updateLastFetch(sessionId, lastTimeStamp);
+                    session.updateLastFetch(unit.getUnitId(), sensor.getSensorId(), lastTimeStamp);
                 }
             }
         }

+ 127 - 0
connector-fetch-senslog-v1/src/main/java/cz/senslog/connector/fetch/senslog/v1/SenslogProxySession.java

@@ -0,0 +1,127 @@
+package cz.senslog.connector.fetch.senslog.v1;
+
+import cz.senslog.common.http.HttpClient;
+import cz.senslog.common.http.HttpRequest;
+import cz.senslog.common.http.HttpResponse;
+import cz.senslog.common.http.URLBuilder;
+import cz.senslog.common.util.Tuple;
+import cz.senslog.connector.fetch.api.ConnectorFetcher;
+import cz.senslog.connector.fetch.api.FetchProxySession;
+import cz.senslog.connector.model.config.HostConfig;
+import cz.senslog.connector.model.converter.AFarCloudUnitSensorConverter;
+import cz.senslog.connector.model.v1.SenslogV1Model;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Optional;
+
+import static cz.senslog.common.json.BasicJson.jsonToObject;
+import static cz.senslog.connector.fetch.senslog.v1.SensLogSession.emptySession;
+import static java.time.OffsetDateTime.ofInstant;
+
+
+public class SenslogProxySession extends FetchProxySession<SensLogSession, SenslogV1Model> {
+
+    private static Logger logger = LogManager.getLogger(SenslogProxySession.class);
+
+    private final HttpClient httpClient;
+    private final SenslogSessionProxyConfig config;
+
+    public SenslogProxySession(
+            ConnectorFetcher<SensLogSession, SenslogV1Model> instance,
+            SenslogSessionProxyConfig config,
+            HttpClient httpClient
+    ) {
+        super(instance);
+        this.config = config;
+        this.httpClient = httpClient;
+    }
+
+    @Override
+    protected SensLogSession preProcessing(Optional<SensLogSession> previousSession) {
+
+        HostConfig host = config.getLastObservationHost();
+        logger.info("Getting last observations from {}.", host.getDomain());
+
+        HttpRequest request = HttpRequest.newBuilder().GET()
+                .url(URLBuilder.newBuilder(host.getDomain(), host.getPath()).build())
+                .build();
+        logger.info("Creating a http request to {}.", request);
+
+        HttpResponse response = httpClient.send(request);
+        logger.info("Received a response with a status: {} for the domain {}.", response.getStatus(), host.getDomain());
+
+        if (response.isError()) {
+            logger.error("Can not get data from the server {}. Error {} {}",
+                    host.getDomain(), response.getStatus(), response.getBody());
+            return emptySession();
+        }
+
+        AFCResponse afcResponse = jsonToObject(response.getBody(), AFCResponse.class);
+
+        SensLogSession session = previousSession.filter(SensLogSession::isActive)
+                .orElse(new SensLogSession(true));
+        logger.debug("Created a new session of {}.", SensLogSession.class);
+
+        if (afcResponse == null || afcResponse.results == null || afcResponse.results.resources == null) {
+            return session;
+        }
+
+
+        for (AFCResource resource : afcResponse.results.resources) {
+            String afcUnitId = resource.resource;
+            for (AFCMeasurement measurement : resource.measurements) {
+                String afcSensorId = measurement.measurement;
+
+                Tuple<Long, Long> sensLogIds = AFarCloudUnitSensorConverter.afcToSensLog(afcUnitId, afcSensorId);
+                if (sensLogIds == null) { continue; }
+
+                Long unitId = sensLogIds.getItem1();
+                Long sensorId = sensLogIds.getItem2();;
+
+                for (AFCObservation observation : measurement.observations) {
+                    Instant time = Instant.parse(observation.time);
+                    final ZoneId zoneId = config.getTimeZone().toZoneId();
+                    OffsetDateTime timestamp = ofInstant(time, zoneId);
+                    session.updateLastFetch(unitId, sensorId, timestamp);
+                }
+            }
+        }
+
+        return session;
+    }
+
+    @Override
+    protected void postProcessing(Optional<SenslogV1Model> model, Optional<SensLogSession> currentSession) {
+        if (currentSession.isPresent() && currentSession.get().isActive()) {
+            SensLogSession session = currentSession.get();
+            // TODO persistence the current session
+        }
+    }
+
+    private static class AFCResponse {
+        AFCResults results;
+    }
+
+    private static class AFCResults {
+        List<AFCResource> resources;
+    }
+
+    private static class AFCResource {
+        String resource;
+        List<AFCMeasurement> measurements;
+    }
+
+    private static class AFCMeasurement {
+        String measurement;
+        List<AFCObservation> observations;
+    }
+
+    private static class AFCObservation {
+        String time;
+    }
+}

+ 25 - 0
connector-fetch-senslog-v1/src/main/java/cz/senslog/connector/fetch/senslog/v1/SenslogSessionProxyConfig.java

@@ -0,0 +1,25 @@
+package cz.senslog.connector.fetch.senslog.v1;
+
+import cz.senslog.connector.model.config.HostConfig;
+import cz.senslog.connector.model.config.PropertyConfig;
+
+import java.util.TimeZone;
+
+public class SenslogSessionProxyConfig {
+
+    private final HostConfig lastObservationHost;
+    private final TimeZone timeZone;
+
+    public SenslogSessionProxyConfig(PropertyConfig config) {
+        this.lastObservationHost = new HostConfig(config.getPropertyConfig("lastObservationHost"));
+        this.timeZone = TimeZone.getTimeZone(config.getStringProperty("timeZone"));
+    }
+
+    public HostConfig getLastObservationHost() {
+        return lastObservationHost;
+    }
+
+    public TimeZone getTimeZone() {
+        return timeZone;
+    }
+}

+ 12 - 0
connector-model/src/main/java/cz/senslog/connector/model/converter/AFarCloudUnitSensorConverter.java

@@ -96,6 +96,18 @@ public final class AFarCloudUnitSensorConverter {
         register(1305167562287832L, "1305167562287832", sensors);
     }
 
+    static {
+        // 1305167562017824
+        Mapping<Long, String> sensors = new Mapping<>();
+
+        sensors.put(340380097L, "air_temperature");
+        sensors.put(410150097L, "air_humidity");
+        sensors.put(360200000L, "battery");
+        sensors.put(780010097L, "co2");
+
+        register(1305167562017824L, "1305167562017824", sensors);
+    }
+
     public static long afcResourceIdToSensLogUnitId(String resourceId) {
         Long unitId = UNITS.getKey(resourceId);
         return unitId != null ? unitId : -1;