Ver Fonte

added filtering for OGC SensorThings to AFC

Lukas Cerny há 5 anos atrás
pai
commit
4e058c507d

+ 2 - 1
connector-core/src/main/java/io/connector/core/http/RequestUriParser.java

@@ -7,7 +7,8 @@ import static io.connector.core.AddressPath.Creator.create;
 public final class RequestUriParser {
 
     public static RequestUriComponent parse(HttpServerRequest req, String moduleId, String gatewayId) {
-        String domain = req.absoluteURI().replace(req.path(), "");
+        String params = req.uri().substring(req.uri().indexOf("?")+1);
+        String domain = req.absoluteURI().replace(req.path(), "").replaceAll("\\?"+params, "");
         String prefix = req.path().substring(1, req.path().indexOf(moduleId)-1);
         String address = req.path().replace(create(prefix, moduleId, gatewayId), "").substring(1);
         return new RequestUriComponent(domain, prefix, moduleId, gatewayId, address);

+ 90 - 0
connector-model/src/main/java/io/connector/model/sensorthings/Filter.java

@@ -0,0 +1,90 @@
+package io.connector.model.sensorthings;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Filter {
+
+    private final boolean exist;
+    private final String original;
+
+    public static class Expression {
+        private final String attribute;
+        private final String operator;
+        private final String value;
+
+        public Expression(String attribute, String operator, String value) {
+            this.attribute = attribute;
+            this.operator = operator;
+            this.value = value;
+        }
+
+        public String getAttribute() {
+            return attribute;
+        }
+
+        public String getOperator() {
+            return operator;
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
+
+    public static Filter parse(String filterString) {
+        if (filterString == null || filterString.isEmpty()) {
+            return emptyFilter();
+        }
+
+        String [] cmp = filterString.split("\\s(and)\\s");
+        if (cmp.length != 2) { // supported only 'and'
+            return emptyFilter(); // TODO exception
+        }
+
+        final Pattern pattern = Pattern.compile("^(?<attribute>[a-zA-Z]+)\\s*(?<operator>(gt|lt))\\s*(?<date>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z)");
+
+        Filter filter = new Filter(true, filterString);
+        filter.expressionsAdd = new ArrayList<>(cmp.length);
+
+        for (String word : cmp) {
+            Matcher matcher = pattern.matcher(word);
+            if (matcher.find()) {
+                filter.expressionsAdd.add(new Expression(
+                        matcher.group("attribute"), matcher.group("operator"), matcher.group("date"))
+                );
+            } else {
+                // TODO exception
+                return emptyFilter();
+            }
+        }
+
+        return filter;
+    }
+
+    private List<Expression> expressionsAdd;
+
+    private static Filter emptyFilter() {
+        return new Filter(false, "");
+    }
+
+    private Filter(boolean exist, String original) {
+        this.exist = exist;
+        this.original = original;
+    }
+
+    public boolean isExists() {
+        return exist;
+    }
+
+    public List<Expression> getAddExpressions() {
+        return expressionsAdd;
+    }
+
+    @Override
+    public String toString() {
+        return original;
+    }
+}

+ 4 - 1
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCClient.java

@@ -49,7 +49,7 @@ public class AFCClient {
     }
 
     public List<ResourceMeasurement> getObservationsBySensor(Filter filter) {
-        HostConfig host = new HostConfig(config.getRetrievalDomain(), "getObservationsBySensor/latest");
+        HostConfig host = new HostConfig(config.getRetrievalDomain(), "getObservationsBySensor/historic");
 
         URLBuilder urlBuilder = URLBuilder.newBuilder(host.getDomain(), host.getPath());
 
@@ -65,6 +65,9 @@ public class AFCClient {
         if (filter.measurements != null) {
             urlBuilder.addParam("measurements", filter.measurements());
         }
+        if (filter.order != null) {
+            urlBuilder.addParam("order", filter.order());
+        }
 
         HttpRequest request = HttpRequest.newBuilder()
                 .contentType(TEXT_PLAIN).url(urlBuilder.build()).GET().build();

+ 24 - 0
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/Filter.java

@@ -1,9 +1,19 @@
 package io.connector.module.afarcloud;
 
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
 import java.util.Arrays;
 import java.util.List;
 
+import static java.time.format.DateTimeFormatter.ofPattern;
+
 public class Filter {
+
+    private static final DateTimeFormatter formatter = new DateTimeFormatterBuilder().append(ofPattern("yyyy-MM-dd HH:mm"))
+            .toFormatter().withZone(ZoneId.of("UTC"));
+
     protected int limit = 1;
     protected List<String> entityNames;
     protected List<String> devices;
@@ -80,11 +90,20 @@ public class Filter {
         return this;
     }
 
+    public String order() {
+        return order;
+    }
+
     public Filter startTime(String startTime) {
         this.startTime = startTime;
         return this;
     }
 
+    public Filter startTime(Instant startTime) {
+        this.startTime = formatter.format(startTime);
+        return this;
+    }
+
     public String startTime() {
         return this.startTime;
     }
@@ -94,6 +113,11 @@ public class Filter {
         return this;
     }
 
+    public Filter endTime(Instant endTime) {
+        this.endTime = formatter.format(endTime);
+        return this;
+    }
+
     public String endTime() {
         return this.endTime;
     }

+ 114 - 53
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/gateway/OGCSensorThingsGateway.java

@@ -8,11 +8,9 @@ import io.connector.model.afarcloud.SensorTelemetry;
 import io.connector.model.sensorthings.*;
 import io.connector.module.afarcloud.AFCClient;
 import io.connector.module.afarcloud.Filter;
+import io.vertx.core.json.JsonObject;
 
 import java.time.Instant;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
 import java.util.*;
 import java.util.function.Supplier;
 
@@ -23,7 +21,6 @@ import static io.vertx.core.json.Json.encode;
 import static java.lang.Math.sqrt;
 import static java.lang.StrictMath.pow;
 import static java.lang.String.format;
-import static java.time.format.DateTimeFormatter.ofPattern;
 import static java.util.Arrays.asList;
 import static java.util.Optional.ofNullable;
 
@@ -171,15 +168,20 @@ public class OGCSensorThingsGateway extends AbstractGateway {
         });
 
         router().get(create("Observations(:id)")).handler(ctx -> {
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON);
             RequestUriComponent uriComponent = parseUriToComponents(ctx.request());
             String id = ctx.pathParam("id"); // resource + measurement + time
             String[] idCmp = Converter.disassemblyId(id);
-            String resource = idCmp[0], measurement = idCmp[1];
+            String resource = idCmp[0], measurement = idCmp[1], time = idCmp[2];
+
+            Instant timestamp = Instant.parse(time);
 
-            List<ResourceMeasurement> measurements = client.getLatestObservationsBySensor(new Filter()
-                    .limit(1).entityNames(resource).measurements(measurement)
+            List<ResourceMeasurement> measurements = client.getObservationsBySensor(new Filter()
+                    .limit(1).order("ASC").entityNames(resource).measurements(measurement)
+                    .startTime(timestamp).endTime(timestamp.plusSeconds(60))
             );
-            final Supplier<IllegalArgumentException> exception = () -> new IllegalArgumentException("Can not find Datastream with @iot.id \"" + id + "\".");
+            final Supplier<IllegalArgumentException> exception = () -> new IllegalArgumentException(format(
+                    "Can not find Observation with @iot.id '%s'.", id));
 
             Optional<ResourceMeasurement> measurementOpt = ofNullable(measurements.size() == 1 ? measurements.get(0) : null);
             ResourceMeasurement afcMeasurement = measurementOpt.orElseThrow(exception);
@@ -187,26 +189,42 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             Optional<SensorTelemetry> telemetryOpt = ofNullable(afcMeasurement.getMeasurements().size() == 1 ? afcMeasurement.getMeasurements().get(0) : null);
             SensorTelemetry afcTelemetry = telemetryOpt.orElseThrow(exception);
 
-            Optional<io.connector.model.afarcloud.Observation> observationOpt = ofNullable(afcTelemetry.getObservations().size() == 1 ? afcTelemetry.getObservations().get(0) : null);
+            Optional<io.connector.model.afarcloud.Observation> observationOpt = ofNullable(afcTelemetry.getObservations().size() > 0 ? afcTelemetry.getObservations().get(0) : null);
             io.connector.model.afarcloud.Observation afcObservation = observationOpt.orElseThrow(exception);
 
             Observation ogcObservation = Converter.convertToObservation(afcMeasurement, afcTelemetry, afcObservation, uriComponent);
-            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(ogcObservation.encode());
+            ctx.response().end(ogcObservation.encode());
         });
 
-        // without filter
         router().get(create("Datastreams(:id)/Observations")).handler(ctx -> {
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON);
             RequestUriComponent uriComponent = parseUriToComponents(ctx.request());
             String id = ctx.pathParam("id"); // resourceUrn + observedProperty
-            String filter = ctx.request().getParam("filter"); // TODO filter
+            String filterParam = ctx.request().getParam("filter");
+            io.connector.model.sensorthings.Filter filter = io.connector.model.sensorthings.Filter.parse(filterParam);
 
             String [] idCmp = Converter.disassemblyId(id);
             String resourceUrn = idCmp[0], observedProperty = idCmp[1];
 
             MultiSensor afcMultiSensor = client.getSensorByResourceUrn(resourceUrn);
-            List<ResourceMeasurement> measurements = client.getLatestObservationsBySensor(new Filter()
-                    .limit(1).entityNames(afcMultiSensor.getResourceId()).measurements(observedProperty)
-            );
+
+            List<ResourceMeasurement> measurements;
+            if (filter.isExists()) {
+                Filter afcFilter;
+                try {
+                    afcFilter = Converter.convertFilter(filter);
+                } catch (RuntimeException e) {
+                    ctx.response().setStatusCode(501).end(new JsonObject()
+                            .put("message", e.getMessage()).encode()); return;
+                }
+                afcFilter.entityNames(afcMultiSensor.getResourceId()).measurements(observedProperty);
+                measurements = client.getObservationsBySensor(afcFilter);
+            } else {
+                measurements = client.getLatestObservationsBySensor(new Filter()
+                        .limit(1).entityNames(afcMultiSensor.getResourceId()).measurements(observedProperty)
+                );
+            }
+
             final Supplier<IllegalArgumentException> exception = () -> new IllegalArgumentException("Can not find Datastream with @iot.id \"" + id + "\".");
 
             Optional<ResourceMeasurement> measurementOpt = ofNullable(measurements.size() == 1 ? measurements.get(0) : null);
@@ -215,12 +233,8 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             Optional<SensorTelemetry> telemetryOpt = ofNullable(afcMeasurement.getMeasurements().size() == 1 ? afcMeasurement.getMeasurements().get(0) : null);
             SensorTelemetry afcTelemetry = telemetryOpt.orElseThrow(exception);
 
-            Optional<io.connector.model.afarcloud.Observation> observationOpt = ofNullable(afcTelemetry.getObservations().size() == 1 ? afcTelemetry.getObservations().get(0) : null);
-            io.connector.model.afarcloud.Observation afcObservation = observationOpt.orElseThrow(exception);
-
-            Observation ogcObservation = Converter.convertToObservation(afcMeasurement, afcTelemetry, afcObservation, uriComponent);
-            List<Observation> ogcObservations = Collections.singletonList(ogcObservation);
-            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(encode(ogcObservations));
+            List<Observation> ogcObservations = Converter.convertToObservations(afcMeasurement, afcTelemetry, uriComponent);
+            ctx.response().end(encode(ogcObservations));
         });
 
         router().get(create("FeaturesOfInterest(:id)")).handler(ctx -> {
@@ -238,15 +252,13 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             String id = ctx.pathParam("id"); // resourceUrn + observedProperty + time
             String [] idCmp = Converter.disassemblyId(id);
             String resourceUrn = idCmp[0], observedProperty = idCmp[1], time = idCmp[2];
-            final DateTimeFormatter formatter = new DateTimeFormatterBuilder().append(ofPattern("yyyy-MM-dd HH:mm"))
-                    .toFormatter().withZone(ZoneId.of("UTC"));
 
             Instant startTime = Instant.parse(time);
             Instant endTime = startTime.plusSeconds(60);
 
             MultiSensor afcMultiSensor = client.getSensorByResourceUrn(resourceUrn);
             List<ResourceMeasurement> afcMeasurements = client.getObservationsBySensor(new Filter()
-                    .startTime(formatter.format(startTime)).endTime(formatter.format(endTime))
+                    .startTime(startTime).endTime(endTime)
                     .entityNames(afcMultiSensor.getResourceId()).measurements(observedProperty)
             );
             final Supplier<IllegalArgumentException> exception = () -> new IllegalArgumentException("Can not find Location with @iot.id \"" + resourceUrn + "\".");
@@ -296,15 +308,13 @@ public class OGCSensorThingsGateway extends AbstractGateway {
 
             String [] idCmp = Converter.disassemblyId(id);
             String resourceUrn = idCmp[0], observedProperty = idCmp[1], time = idCmp[2];
-            final DateTimeFormatter formatter = new DateTimeFormatterBuilder().append(ofPattern("yyyy-MM-dd HH:mm"))
-                    .toFormatter().withZone(ZoneId.of("UTC"));
 
             Instant startTime = Instant.parse(time);
             Instant endTime = startTime.plusSeconds(60);
 
             MultiSensor afcMultiSensor = client.getSensorByResourceUrn(resourceUrn);
             List<ResourceMeasurement> afcMeasurements = client.getObservationsBySensor(new Filter()
-                    .startTime(formatter.format(startTime)).endTime(formatter.format(endTime))
+                    .startTime(startTime).endTime(endTime)
                     .entityNames(afcMultiSensor.getResourceId()).measurements(observedProperty)
             );
             final Supplier<IllegalArgumentException> exception = () -> new IllegalArgumentException("Can not find Location with @iot.id \"" + resourceUrn + "\".");
@@ -320,27 +330,25 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(ogcLocation.encode());
         });
 
-        // without filter
         router().get(create("Things(:id)/HistoricalLocations")).handler(ctx -> {
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON);
             RequestUriComponent uriComponent = parseUriToComponents(ctx.request());
             String resourceUrn = ctx.pathParam("id"); // resourceUrn
-            String filter = ctx.request().getParam("filter"); // TODO filter
-
-            final DateTimeFormatter formatter = new DateTimeFormatterBuilder().append(ofPattern("yyyy-MM-dd HH:mm"))
-                    .toFormatter().withZone(ZoneId.of("UTC"));
+            String filterParam = ctx.request().getParam("filter");
+            io.connector.model.sensorthings.Filter filter = io.connector.model.sensorthings.Filter.parse(filterParam);
 
             MultiSensor afcMultiSensor = client.getSensorByResourceUrn(resourceUrn);
             List<ResourceMeasurement> afcMeasurements;
-            if (filter != null && !filter.isEmpty()) {
-                String time = ""; // TODO parse filter
-
-                Instant startTime = Instant.parse(time);
-                Instant endTime = startTime.plusSeconds(60);
-
-                afcMeasurements = client.getObservationsBySensor(new Filter()
-                        .startTime(formatter.format(startTime)).endTime(formatter.format(endTime))
-                        .entityNames(afcMultiSensor.getResourceId())
-                );
+            if (filter.isExists()) {
+                Filter afcFilter;
+                try {
+                    afcFilter = Converter.convertFilter(filter);
+                } catch (RuntimeException e) {
+                    ctx.response().setStatusCode(501).end(new JsonObject()
+                            .put("message", e.getMessage()).encode()); return;
+                }
+                afcFilter.entityNames(afcMultiSensor.getResourceId());
+                afcMeasurements = client.getObservationsBySensor(afcFilter);
             } else {
                 afcMeasurements = client.getLatestObservationsBySensor(new Filter()
                         .limit(1).entityNames(afcMultiSensor.getResourceId())
@@ -355,31 +363,41 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             AFCLocationList afcLocations = AFCLocationUtils.sort(afcMeasurement);
 
             List<HistoricalLocation> locations = Converter.convertToHistoricalLocation(afcMultiSensor, afcLocations.getList(), uriComponent);
-            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(encode(locations));
+            ctx.response().end(encode(locations));
         });
 
-        // without filter
         router().get(create("Locations(:id)/HistoricalLocations")).handler(ctx -> {
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON);
             RequestUriComponent uriComponent = parseUriToComponents(ctx.request());
             String id = ctx.pathParam("id"); // resourceUrn + observedProperty + time
-            String filter = ctx.request().getParam("filter"); // TODO filter
+            String filterParam = ctx.request().getParam("filter");
+            io.connector.model.sensorthings.Filter filter = io.connector.model.sensorthings.Filter.parse(filterParam);
 
             String [] idCmp = Converter.disassemblyId(id);
             String resourceUrn = idCmp[0], observedProperty = idCmp[1], time = idCmp[2];
 
-            final DateTimeFormatter formatter = new DateTimeFormatterBuilder().append(ofPattern("yyyy-MM-dd HH:mm"))
-                    .toFormatter().withZone(ZoneId.of("UTC"));
-
             final Supplier<IllegalArgumentException> exception =
                     () -> new IllegalArgumentException("Can not find HistoricalLocations of the Thing with @iot.id \"" + resourceUrn + "\".");
 
             MultiSensor afcMultiSensor = client.getSensorByResourceUrn(resourceUrn);
 
-            Instant startTime = Instant.parse(time);
-            Instant endTime = startTime.plusSeconds(60*60); // 1h
+            Filter afcFilter;
+            if (filter.isExists()) {
+                try {
+                    afcFilter = Converter.convertFilter(filter);
+                } catch (RuntimeException e) {
+                    ctx.response().setStatusCode(501).end(new JsonObject()
+                            .put("message", e.getMessage()).encode()); return;
+                }
+                afcFilter.entityNames(afcMultiSensor.getResourceId());
+            } else {
+                Instant startTime = Instant.parse(time);
+                Instant endTime = startTime.plusSeconds(60*60); // 1h
 
-            List<ResourceMeasurement> afcMeasurements = client.getObservationsBySensor(new Filter()
-                    .startTime(formatter.format(startTime)).endTime(formatter.format(endTime))
+                afcFilter = new Filter().startTime(startTime).endTime(endTime);
+            }
+
+            List<ResourceMeasurement> afcMeasurements = client.getObservationsBySensor(afcFilter
                     .entityNames(afcMultiSensor.getResourceId()).measurements(observedProperty)
             );
 
@@ -389,7 +407,7 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             AFCLocationList afcLocations = AFCLocationUtils.sort(afcMeasurement);
 
             List<HistoricalLocation> locations = Converter.convertToHistoricalLocation(afcMultiSensor, afcLocations.getList(), uriComponent);
-            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(encode(locations));
+            ctx.response().end(encode(locations));
         });
     }
 
@@ -507,6 +525,41 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             return id.split(DELIMITER);
         }
 
+        static Filter convertFilter(io.connector.model.sensorthings.Filter ogcFilter) {
+
+            if (ogcFilter == null) {
+                throw new IllegalArgumentException("Unsupported filter attributes in filter expression '" + ogcFilter + "'.");
+            }
+
+            Filter afcFilter = new Filter();
+
+            Instant startTime = null, endTime = null;
+            for (io.connector.model.sensorthings.Filter.Expression expression : ogcFilter.getAddExpressions()) {
+                switch (expression.getAttribute()) {
+                    case "time":
+                    case "resultTime": {
+                        switch (expression.getOperator()) {
+                            case "lt": {
+                                endTime = Instant.parse(expression.getValue());
+                            } break;
+                            case "gt": {
+                                startTime = Instant.parse(expression.getValue());
+                            }break;
+                            default: throw new IllegalArgumentException(format(
+                                        "Unsupported operator '%s' in the filter expression '%s'.", expression.getOperator(), ogcFilter));
+                        }
+                    } break;
+                    default: throw new IllegalArgumentException(format(
+                                "Unsupported attribute '%s' in the filter expression '%s'.", expression.getAttribute(), ogcFilter));
+                }
+            }
+
+            afcFilter.startTime(startTime)
+                    .endTime(endTime);
+
+            return afcFilter;
+        }
+
         static HistoricalLocation convertToHistoricalLocation(MultiSensor afcMultiSensor, AFCAggrLocation afcLocation, RequestUriComponent uriComponent) {
             HistoricalLocation historicalLocation = new HistoricalLocation();
             String locationId = assemblyId(afcMultiSensor.getResourceUrn(), afcLocation.measurement, afcLocation.observation.getTime());
@@ -563,6 +616,14 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             return observation;
         }
 
+        static List<Observation> convertToObservations(ResourceMeasurement afcMeasurement, SensorTelemetry afcTelemetry, RequestUriComponent uriComponent) {
+            List<Observation> ogcObservations = new ArrayList<>(afcTelemetry.getObservations().size());
+            for (io.connector.model.afarcloud.Observation afcObservation : afcTelemetry.getObservations()) {
+                ogcObservations.add(convertToObservation(afcMeasurement, afcTelemetry, afcObservation, uriComponent));
+            }
+            return ogcObservations;
+        }
+
         static ObservedProperty convertToObservedProperty(MultiSensor afcMultiSensor, MultiSensor.SensorSchema afcSensor, RequestUriComponent uriComponent) {
             Objects.requireNonNull(afcMultiSensor);
             Objects.requireNonNull(afcSensor);