Procházet zdrojové kódy

Implemented push Observation and Thing

Lukas Cerny před 4 roky
rodič
revize
4ca3e66585
22 změnil soubory, kde provedl 677 přidání a 84 odebrání
  1. 2 1
      build.gradle
  2. 2 2
      connector-core/src/main/java/io/connector/core/Handler.java
  3. 2 2
      connector-core/src/main/java/io/connector/core/VertxHttpServer.java
  4. 2 2
      connector-core/src/main/java/io/connector/core/VertxScheduler.java
  5. 2 2
      connector-core/src/main/java/io/connector/core/http/HttpContentType.java
  6. 2 2
      connector-core/src/main/java/io/connector/core/http/HttpHeader.java
  7. 36 0
      connector-model/src/main/java/io/connector/model/afarcloud/MultiSensor.java
  8. 0 28
      connector-model/src/main/java/io/connector/model/sensorthings/Datastream.java
  9. 92 0
      connector-model/src/main/java/io/connector/model/sensorthings/DatastreamInsert.java
  10. 6 0
      connector-model/src/main/java/io/connector/model/sensorthings/Geometry.java
  11. 43 25
      connector-model/src/main/java/io/connector/model/sensorthings/Location.java
  12. 26 0
      connector-model/src/main/java/io/connector/model/sensorthings/LocationInfo.java
  13. 50 0
      connector-model/src/main/java/io/connector/model/sensorthings/ObservationInsert.java
  14. 7 0
      connector-model/src/main/java/io/connector/model/sensorthings/ObservedProperty.java
  15. 46 0
      connector-model/src/main/java/io/connector/model/sensorthings/SensorInsert.java
  16. 67 0
      connector-model/src/main/java/io/connector/model/sensorthings/ThingInsert.java
  17. 37 0
      connector-model/src/main/java/io/connector/model/sensorthings/UnitOfMeasurement.java
  18. 32 5
      connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCHttpClient.java
  19. 211 8
      connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/gateway/OGCSensorThingsGateway.java
  20. 7 2
      connector-module-afarcloud/src/test/java/io/connector/module/afarcloud/OGCSensorThingsOnlineIntegrationTest.java
  21. 1 1
      connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1HttpClient.java
  22. 4 4
      connector-module-senslog1/src/main/java/io/connector/module/senslog1/gateway/OGCSensorThingsGateway.java

+ 2 - 1
build.gradle

@@ -49,7 +49,6 @@ allprojects {
         compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.12.0'
         compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.12.0'
 
-//        compile fileTree(include: ['*.jar'], dir: 'drivers')
         testCompile group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.8.0-M1'
     }
 
@@ -111,6 +110,8 @@ configure(moduleNames) {
         compile project(":connector-model")
 
         compile group: 'io.vertx', name: 'vertx-core', version: '3.9.1'
+
+        testCompile group: 'io.vertx', name: 'vertx-junit5', version: '4.0.3'
     }
 }
 

+ 2 - 2
connector-core/src/main/java/io/connector/core/Handler.java

@@ -8,8 +8,8 @@ import io.vertx.ext.web.RoutingContext;
 
 import java.util.function.Consumer;
 
-import static io.connector.core.http.ContentType.APPLICATION_JSON;
-import static io.connector.core.http.Header.CONTENT_TYPE;
+import static io.connector.core.http.HttpContentType.APPLICATION_JSON;
+import static io.connector.core.http.HttpHeader.CONTENT_TYPE;
 
 /**
  *

+ 2 - 2
connector-core/src/main/java/io/connector/core/VertxHttpServer.java

@@ -26,8 +26,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
-import static io.connector.core.http.ContentType.APPLICATION_JSON;
-import static io.connector.core.http.Header.CONTENT_TYPE;
+import static io.connector.core.http.HttpContentType.APPLICATION_JSON;
+import static io.connector.core.http.HttpHeader.CONTENT_TYPE;
 import static io.connector.core.AddressPath.Creator.create;
 import static io.connector.core.AddressPath.Creator.createNormalized;
 import static io.connector.core.AddressPath.EVENT;

+ 2 - 2
connector-core/src/main/java/io/connector/core/VertxScheduler.java

@@ -19,8 +19,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static io.connector.core.http.ContentType.APPLICATION_JSON;
-import static io.connector.core.http.Header.CONTENT_TYPE;
+import static io.connector.core.http.HttpContentType.APPLICATION_JSON;
+import static io.connector.core.http.HttpHeader.CONTENT_TYPE;
 import static io.connector.core.AddressPath.Creator.create;
 import static io.connector.core.AddressPath.Creator.createNormalized;
 import static io.connector.core.AddressPath.SCHEDULER_CONSUMER;

+ 2 - 2
connector-core/src/main/java/io/connector/core/http/ContentType.java → connector-core/src/main/java/io/connector/core/http/HttpContentType.java

@@ -9,7 +9,7 @@ package io.connector.core.http;
  * @version 1.0
  * @since 1.0
  */
-public final class ContentType {
+public final class HttpContentType {
 
     public static final String APPLICATION_ATOM_XML = "application/atom+xml";
     public static final String APPLICATION_FORM_URLENCODED = "application/x-www-form-urlencoded";
@@ -24,6 +24,6 @@ public final class ContentType {
     public static final String TEXT_XML = "text/xml";
     public static final String WILDCARD = "*/*";
 
-    private ContentType() {}
+    private HttpContentType() {}
 
 }

+ 2 - 2
connector-core/src/main/java/io/connector/core/http/Header.java → connector-core/src/main/java/io/connector/core/http/HttpHeader.java

@@ -9,12 +9,12 @@ package io.connector.core.http;
  * @version 1.0
  * @since 1.0
  */
-public final class Header {
+public final class HttpHeader {
 
     public static final String AUTHORIZATION = "Authorization";
     public static final String DATE = "Date";
     public static final String ACCEPT = "Accept";
     public static final String CONTENT_TYPE = "Content-Type";
 
-    private Header() { }
+    private HttpHeader() { }
 }

+ 36 - 0
connector-model/src/main/java/io/connector/model/afarcloud/MultiSensor.java

@@ -22,6 +22,10 @@ public class MultiSensor {
     private Boolean preprocessing;
     private String pythonScript;
     private List<SensorSchema> observations;
+    private String supportedProtocol;
+    private String hardwareVersion;
+    private String softwareVersion;
+    private String firmwareVersion;
 
     public String getResourceId() {
         return resourceId;
@@ -95,6 +99,38 @@ public class MultiSensor {
         this.observations = observations;
     }
 
+    public String getSupportedProtocol() {
+        return supportedProtocol;
+    }
+
+    public void setSupportedProtocol(String supportedProtocol) {
+        this.supportedProtocol = supportedProtocol;
+    }
+
+    public String getHardwareVersion() {
+        return hardwareVersion;
+    }
+
+    public void setHardwareVersion(String hardwareVersion) {
+        this.hardwareVersion = hardwareVersion;
+    }
+
+    public String getSoftwareVersion() {
+        return softwareVersion;
+    }
+
+    public void setSoftwareVersion(String softwareVersion) {
+        this.softwareVersion = softwareVersion;
+    }
+
+    public String getFirmwareVersion() {
+        return firmwareVersion;
+    }
+
+    public void setFirmwareVersion(String firmwareVersion) {
+        this.firmwareVersion = firmwareVersion;
+    }
+
     public static class SensorSchema {
         private String observedProperty;
         private String uom;

+ 0 - 28
connector-model/src/main/java/io/connector/model/sensorthings/Datastream.java

@@ -116,32 +116,4 @@ public class Datastream extends JsonObject {
     public String getResultTime() {
         return getString("resultTime");
     }
-
-
-    public static class UnitOfMeasurement extends JsonObject {
-
-        public void setName(String name) {
-            put("name", name);
-        }
-
-        public String getName() {
-            return getString("name");
-        }
-
-        public void setSymbol(String symbol) {
-            put("symbol", symbol);
-        }
-
-        public String getSymbol() {
-            return getString("symbol");
-        }
-
-        public void setDefinition(String definition) {
-            put("definition", definition);
-        }
-
-        public String getDefinition() {
-            return getString("definition");
-        }
-    }
 }

+ 92 - 0
connector-model/src/main/java/io/connector/model/sensorthings/DatastreamInsert.java

@@ -0,0 +1,92 @@
+package io.connector.model.sensorthings;
+
+import io.vertx.core.json.JsonObject;
+
+import java.time.OffsetDateTime;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class DatastreamInsert {
+
+    private final String name;
+    private final String description;
+    private final UnitOfMeasurement unitOfMeasurement;
+    private final String observationType;
+    private final Geometry observedArea;
+    private final OffsetDateTime[] phenomenonTime;
+    private final List<ObservationInsert> observations;
+    private final ObservedProperty observedProperty;
+    private final SensorInsert sensor;
+
+    public static DatastreamInsert parse(JsonObject json) {
+        requireNonNull(json, "JSON 'Datastream' is empty.");
+        String name = requireNonNull(json.getString("name", null), "Attribute 'Datastream/name' is required.");
+        String description = requireNonNull(json.getString("description", null), "Attribute 'Datastream/description' is required.");
+        UnitOfMeasurement unitOfMeasurement = UnitOfMeasurement.parse(requireNonNull(json.getJsonObject("unitOfMeasurement", null), "Attribute 'Datastream/unitOfMeasurement' is required."));
+        String observationType = requireNonNull(json.getString("observationType", null), "Attribute 'Datastream/observationType' is required.");
+        Geometry observedArea = Geometry.parse(requireNonNull(json.getJsonObject("observedArea", null), "Attribute 'Datastream/observedArea' is required."));
+        String phenomenonTime = requireNonNull(json.getString("phenomenonTime", null), "Attribute 'Datastream/phenomenonTime' is required.");
+        String [] phenomenonTimes = phenomenonTime.split("/");
+        OffsetDateTime firstPhenomenonTime = phenomenonTimes[0].equals("<none>") ? null : OffsetDateTime.parse(phenomenonTimes[0]);
+        OffsetDateTime secondPhenomenonTime = phenomenonTimes[1].equals("<none>") ? null :  OffsetDateTime.parse(phenomenonTimes[1]);
+        requireNonNull(json.getJsonArray("Observations", null), "Attribute 'Datastream/Observations' is required.");
+        ObservedProperty observedProperty = ObservedProperty.parse(requireNonNull(json.getJsonObject("ObservedProperty", null), "Attribute 'Datastream/ObservedProperty' is required."));
+        SensorInsert sensor = SensorInsert.parse(requireNonNull(json.getJsonObject("Sensor", null), "Attribute 'Datastream/Sensor' is required."));
+        return new DatastreamInsert(name, description, unitOfMeasurement, observationType, observedArea,
+                new OffsetDateTime[]{firstPhenomenonTime, secondPhenomenonTime}, Collections.emptyList(), observedProperty, sensor);
+    }
+
+    private DatastreamInsert(String name, String description, UnitOfMeasurement unitOfMeasurement,
+                            String observationType, Geometry observedArea, OffsetDateTime[] phenomenonTime,
+                            List<ObservationInsert> observations, ObservedProperty observedProperty,
+                            SensorInsert sensor)
+    {
+        this.name = name;
+        this.description = description;
+        this.unitOfMeasurement = unitOfMeasurement;
+        this.observationType = observationType;
+        this.observedArea = observedArea;
+        this.phenomenonTime = phenomenonTime;
+        this.observations = observations;
+        this.observedProperty = observedProperty;
+        this.sensor = sensor;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public UnitOfMeasurement getUnitOfMeasurement() {
+        return unitOfMeasurement;
+    }
+
+    public String getObservationType() {
+        return observationType;
+    }
+
+    public Geometry getObservedArea() {
+        return observedArea;
+    }
+
+    public OffsetDateTime[] getPhenomenonTime() {
+        return phenomenonTime;
+    }
+
+    public List<ObservationInsert> getObservations() {
+        return observations;
+    }
+
+    public ObservedProperty getObservedProperty() {
+        return observedProperty;
+    }
+
+    public SensorInsert getSensor() {
+        return sensor;
+    }
+}

+ 6 - 0
connector-model/src/main/java/io/connector/model/sensorthings/Geometry.java

@@ -8,6 +8,7 @@ import io.vertx.core.json.JsonObject;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /**
  *
@@ -17,6 +18,11 @@ import java.util.List;
  */
 public class Geometry extends JsonObject {
 
+    public static Geometry parse(JsonObject json) {
+        Objects.requireNonNull(json, "JSON 'Geometry' is empty.");
+        return json.mapTo(Geometry.class);
+    }
+
     public void setType(String type) {
         put("type", type);
     }

+ 43 - 25
connector-model/src/main/java/io/connector/model/sensorthings/Location.java

@@ -5,6 +5,8 @@ package io.connector.model.sensorthings;
 
 import io.vertx.core.json.JsonObject;
 
+import java.util.Objects;
+
 /**
  *
  * @author Lukas Cerny
@@ -13,6 +15,41 @@ import io.vertx.core.json.JsonObject;
  */
 public class Location extends JsonObject {
 
+    public static Location parse(JsonObject json) {
+        Objects.requireNonNull(json, "JSON 'Location' is empty.");
+        Location location = json.mapTo(Location.class);
+        location.valid();
+        return location;
+    }
+
+    private void valid() {
+        try {
+            Objects.requireNonNull(getName(), "Attribute 'Location/name' is required.");
+            Objects.requireNonNull(getDescription(), "Attribute 'Location/description' is required.");
+            Objects.requireNonNull(getEncodingType(), "Attribute 'Location/encodingType' is required.");
+            Objects.requireNonNull(getLocation(), "Attribute 'Location/location' is required.");
+        } catch (NullPointerException e) {
+            throw new IllegalArgumentException(e.getMessage());
+        }
+    }
+
+    public boolean isValid() {
+        try {
+            valid(); return true;
+        } catch (IllegalArgumentException e) {
+            return false;
+        }
+    }
+
+    public boolean isValidWithLink() {
+        boolean preValid = isValid();
+        if (!preValid) { return false; }
+
+        // TODO
+
+        return true;
+    }
+
     public void setId(String id) {
         put("@iot.id", id);
     }
@@ -50,7 +87,7 @@ public class Location extends JsonObject {
     }
 
     public String getEncodingType() {
-        return getString("encodingType");
+        return getString("encodingType", null);
     }
 
     public void setName(String name) {
@@ -58,7 +95,7 @@ public class Location extends JsonObject {
     }
 
     public String getName() {
-        return getString("name");
+        return getString("name", null);
     }
 
     public void setDescription(String description) {
@@ -66,33 +103,14 @@ public class Location extends JsonObject {
     }
 
     public String getDescription(){
-        return getString("description");
+        return getString("description", null);
     }
 
-    public void setLocation(Info location) {
+    public void setLocation(LocationInfo location) {
         put("location", location);
     }
 
-    public Info getLocation() {
-        return (Info) getJsonObject("location");
-    }
-
-    public static class Info extends JsonObject {
-
-        public void setType(String type) {
-            put("type", type);
-        }
-
-        public String getType() {
-            return getString("type");
-        }
-
-        public void setGeometry(Geometry geometry) {
-            put("geometry", geometry);
-        }
-
-        public Geometry getGeometry() {
-            return (Geometry)getJsonObject("geometry");
-        }
+    public LocationInfo getLocation() {
+        return (LocationInfo) getJsonObject("location", null);
     }
 }

+ 26 - 0
connector-model/src/main/java/io/connector/model/sensorthings/LocationInfo.java

@@ -0,0 +1,26 @@
+package io.connector.model.sensorthings;
+
+import io.vertx.core.json.JsonObject;
+
+public class LocationInfo extends JsonObject {
+
+    public static LocationInfo parse(JsonObject json) {
+        return json != null ? json.mapTo(LocationInfo.class) : null;
+    }
+
+    public void setType(String type) {
+        put("type", type);
+    }
+
+    public String getType() {
+        return getString("type");
+    }
+
+    public void setGeometry(Geometry geometry) {
+        put("geometry", geometry);
+    }
+
+    public Geometry getGeometry() {
+        return (Geometry)getJsonObject("geometry");
+    }
+}

+ 50 - 0
connector-model/src/main/java/io/connector/model/sensorthings/ObservationInsert.java

@@ -0,0 +1,50 @@
+package io.connector.model.sensorthings;
+
+import io.vertx.core.json.JsonObject;
+
+import java.time.OffsetDateTime;
+import java.util.Objects;
+
+public class ObservationInsert {
+
+    private final String dataStreamId;
+    private final OffsetDateTime phenomenonTime;
+    private final Double result;
+    private final String featureOfInterestId;
+
+    private ObservationInsert(String dataStreamId, OffsetDateTime phenomenonTime, Double result, String featureOfInterestId) {
+        Objects.requireNonNull(dataStreamId, "Attribute 'Datastream' and specified '@iot.id' is required.");
+        Objects.requireNonNull(phenomenonTime, "Attribute 'phenomenonTime' is required.");
+        Objects.requireNonNull(result, "Attribute 'result' is required.");
+
+        this.dataStreamId = dataStreamId;
+        this.phenomenonTime = phenomenonTime;
+        this.result = result;
+        this.featureOfInterestId = featureOfInterestId;
+    }
+
+    public static ObservationInsert parse(JsonObject jsonObject) {
+        return new ObservationInsert(
+                jsonObject.getJsonObject("Datastream", new JsonObject()).getString("@iot.id", null),
+                OffsetDateTime.parse(jsonObject.getString("phenomenonTime", null)),
+                jsonObject.getDouble("result", null),
+                jsonObject.getJsonObject("FeatureOfInterest", new JsonObject()).getString("@iot.id", null)
+        );
+    }
+
+    public String getDatastreamId() {
+        return dataStreamId;
+    }
+
+    public OffsetDateTime getPhenomenonTime() {
+        return phenomenonTime;
+    }
+
+    public Double getResult() {
+        return result;
+    }
+
+    public String getFeatureOfInterestId() {
+        return featureOfInterestId;
+    }
+}

+ 7 - 0
connector-model/src/main/java/io/connector/model/sensorthings/ObservedProperty.java

@@ -5,6 +5,8 @@ package io.connector.model.sensorthings;
 
 import io.vertx.core.json.JsonObject;
 
+import java.util.Objects;
+
 /**
  *
  * @author Lukas Cerny
@@ -13,6 +15,11 @@ import io.vertx.core.json.JsonObject;
  */
 public class ObservedProperty extends JsonObject {
 
+    public static ObservedProperty parse(JsonObject json) {
+        Objects.requireNonNull(json,"JSON 'ObservedProperty' is empty.");
+        return json.mapTo(ObservedProperty.class);
+    }
+
     public void setId(String id) {
         put("@iot.id", id);
     }

+ 46 - 0
connector-model/src/main/java/io/connector/model/sensorthings/SensorInsert.java

@@ -0,0 +1,46 @@
+package io.connector.model.sensorthings;
+
+import io.vertx.core.json.JsonObject;
+
+
+import static java.util.Objects.requireNonNull;
+
+public class SensorInsert {
+
+    private final String name;
+    private final String description;
+    private final String encodingType;
+    private final String metadata;
+
+    public static SensorInsert parse(JsonObject json) {
+        requireNonNull(json, "JSON 'Sensor' is empty.");
+        String name = requireNonNull(json.getString("name", null), "Attribute 'Sensor/name' is required.");
+        String description = requireNonNull(json.getString("description", null), "Attribute 'Sensor/description' is required.");
+        String encodingType = requireNonNull(json.getString("encodingType", null), "Attribute 'Sensor/encodingType' is required.");
+        String metadata = requireNonNull(json.getString("metadata", null), "Attribute 'Sensor/metadata' is required.");
+        return new SensorInsert(name, description, encodingType, metadata);
+    }
+
+    private SensorInsert(String name, String description, String encodingType, String metadata) {
+        this.name = name;
+        this.description = description;
+        this.encodingType = encodingType;
+        this.metadata = metadata;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public String getEncodingType() {
+        return encodingType;
+    }
+
+    public String getMetadata() {
+        return metadata;
+    }
+}

+ 67 - 0
connector-model/src/main/java/io/connector/model/sensorthings/ThingInsert.java

@@ -0,0 +1,67 @@
+package io.connector.model.sensorthings;
+
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+
+import java.util.*;
+
+import static java.util.Objects.requireNonNull;
+
+public class ThingInsert {
+
+    private final String name;
+    private final String description;
+    private final List<Location> locations;
+    private final List<DatastreamInsert> datastreams;
+    private final Map<String, Object> properties;
+
+    public static ThingInsert parse(JsonObject json) {
+        Objects.requireNonNull(json, "JSON 'Thing' is empty.");
+        String name = requireNonNull(json.getString("name", null), "Attribute 'name' is required.");
+        String description = requireNonNull(json.getString("description", null), "Attribute 'description' is required.");
+        JsonArray locationsJson = requireNonNull(json.getJsonArray("Locations", null), "Attribute 'Location' is required.");
+        List<Location> locations = new ArrayList<>(locationsJson.size());
+        for (int i = 0; i < locationsJson.size(); i++) {
+            locations.add(Location.parse(locationsJson.getJsonObject(i)));
+        }
+        JsonArray datastreamsJson = requireNonNull(json.getJsonArray("Datastreams", null), "Attribute 'Datastreams' is required.");
+        List<DatastreamInsert> datastreams = new ArrayList<>(datastreamsJson.size());
+        for (int i = 0; i < datastreamsJson.size(); i++) {
+            datastreams.add(DatastreamInsert.parse(datastreamsJson.getJsonObject(i)));
+        }
+        Map<String, Object> properties = json.getJsonObject("properties", new JsonObject()).getMap();
+        return new ThingInsert(name, description, locations, datastreams, properties);
+    }
+
+    public ThingInsert(String name, String description, List<Location> locations, List<DatastreamInsert> datastreams, Map<String, Object> properties) {
+        this.name = name;
+        this.description = description;
+        this.locations = locations;
+        this.datastreams = datastreams;
+        this.properties = properties.isEmpty() ? null : properties;
+    }
+
+    public ThingInsert(String name, String description, List<Location> locations, List<DatastreamInsert> datastreams) {
+        this(name, description, locations, datastreams, Collections.emptyMap());
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public List<Location> getLocations() {
+        return locations;
+    }
+
+    public List<DatastreamInsert> getDatastreams() {
+        return datastreams;
+    }
+
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+}

+ 37 - 0
connector-model/src/main/java/io/connector/model/sensorthings/UnitOfMeasurement.java

@@ -0,0 +1,37 @@
+package io.connector.model.sensorthings;
+
+import io.vertx.core.json.JsonObject;
+
+import java.util.Objects;
+
+public class UnitOfMeasurement extends JsonObject {
+
+    public static UnitOfMeasurement parse(JsonObject json) {
+        Objects.requireNonNull(json, "JSON 'UnitOfMeasurement' is empty.");
+        return json.mapTo(UnitOfMeasurement.class);
+    }
+
+    public void setName(String name) {
+        put("name", name);
+    }
+
+    public String getName() {
+        return getString("name");
+    }
+
+    public void setSymbol(String symbol) {
+        put("symbol", symbol);
+    }
+
+    public String getSymbol() {
+        return getString("symbol");
+    }
+
+    public void setDefinition(String definition) {
+        put("definition", definition);
+    }
+
+    public String getDefinition() {
+        return getString("definition");
+    }
+}

+ 32 - 5
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCHttpClient.java

@@ -15,10 +15,11 @@ import io.vertx.core.json.Json;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import static io.connector.core.http.ContentType.TEXT_PLAIN;
+import static io.connector.core.http.HttpContentType.TEXT_PLAIN;
 import static java.lang.String.format;
 
 /**
@@ -68,13 +69,39 @@ public class AFCHttpClient {
         }
 
         HostConfig host = new HostConfig(config.getTelemetryDomain(), "telemetry");
-        logger.debug("Getting observations from {}.", host.getDomain());
+
+        List<MultiSimpleObservation> failedObservations = new ArrayList<>();
 
         for (MultiSimpleObservation multiSensor : observations) {
-            System.out.println(Json.encode(multiSensor)); // TODO upload to the server
+
+            HttpRequest request = HttpRequest.newBuilder().POST()
+                    .url(URLBuilder.newBuilder(host.getDomain(), host.getPath())
+                            .addParam("test", "")
+                            .build())
+                    .body(Json.encode(multiSensor))
+                    .build();
+            logger.info("Creating a http request to {}.", request);
+
+            HttpResponse response = httpClient.send(request);
+            logger.info("Received a response with a status: {} for the domain {}.", response.getStatus(), host.getDomain());
+
+            if (response.isError()) {
+                failedObservations.add(multiSensor);
+            }
+        }
+
+        logger.info("Uploaded {}/{} observations.", observations.size()-failedObservations.size(), observations.size());
+
+        if (!failedObservations.isEmpty()) {
+            StringBuilder resourceIdBuilder = new StringBuilder("[");
+            for (int i = 0; i < failedObservations.size() - 1; i++) {
+                resourceIdBuilder.append(failedObservations.get(i).getResourceId());
+            }
+            resourceIdBuilder.append(failedObservations.get(failedObservations.size()-1)).append("]");
+            throw logger.throwing(new RuntimeException(String.format(
+                    "Observations for following resourceIds can not be send: %s.", resourceIdBuilder.toString()
+            )));
         }
-        
-        logger.info("Converted {} observations.", observations.size());
     }
 
     public List<ResourceMeasurement> getObservationsBySensor(Filter filter) {

+ 211 - 8
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/gateway/OGCSensorThingsGateway.java

@@ -3,31 +3,36 @@
 
 package io.connector.module.afarcloud.gateway;
 
+import com.sun.org.apache.xpath.internal.operations.Bool;
 import io.connector.core.AbstractGateway;
 import io.connector.core.http.RequestUriComponent;
-import io.connector.model.afarcloud.MultiSensor;
-import io.connector.model.afarcloud.ResourceMeasurement;
-import io.connector.model.afarcloud.SensorTelemetry;
+import io.connector.model.afarcloud.*;
 import io.connector.model.sensorthings.*;
+import io.connector.model.sensorthings.Location;
+import io.connector.model.sensorthings.Observation;
 import io.connector.module.afarcloud.AFCHttpClient;
 import io.connector.module.afarcloud.Filter;
+import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonArray;
 import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.handler.BodyHandler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.time.Instant;
 import java.util.*;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
-import static io.connector.core.http.ContentType.APPLICATION_JSON;
-import static io.connector.core.http.Header.CONTENT_TYPE;
+import static io.connector.core.http.HttpContentType.APPLICATION_JSON;
+import static io.connector.core.http.HttpHeader.CONTENT_TYPE;
 import static io.connector.core.AddressPath.Creator.create;
 import static io.vertx.core.json.Json.encode;
 import static java.lang.Math.sqrt;
 import static java.lang.StrictMath.pow;
 import static java.lang.String.format;
 import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
 import static java.util.Optional.ofNullable;
 
 /**
@@ -56,6 +61,18 @@ public class OGCSensorThingsGateway extends AbstractGateway {
     @Override
     protected void run() {
 
+        router().post().handler(BodyHandler.create()).handler(ctx -> {
+            String contentType = ctx.request().getHeader(CONTENT_TYPE);
+            if (!contentType.equals(APPLICATION_JSON)) {
+                ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).setStatusCode(415)
+                        .end(new JsonObject().put("message", String.format(
+                                "Unsupported content type. Use one of these [%s].", APPLICATION_JSON
+                        )).encode()
+                );
+            }
+            ctx.next();
+        });
+
         router().get(create("Things")).handler(ctx -> {
             logger.info("Handling a request: {}.", ctx.request().absoluteURI());
             RequestUriComponent uriComponent = parseUriToComponents(ctx.request());
@@ -336,7 +353,7 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             AFCAggrLocation afcLastLocation = afcLastLocationOpt.orElseThrow(exception);
 
             Location ogcLocation = Converter.convertToLocation(afcMultiSensor, afcLastLocation, uriComponent);
-            List<Location> ogcLocations = Collections.singletonList(ogcLocation);
+            List<Location> ogcLocations = singletonList(ogcLocation);
             ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(encode(ogcLocations));
         });
 
@@ -480,6 +497,28 @@ public class OGCSensorThingsGateway extends AbstractGateway {
                 ctx.response().end(encode(locations));
             }
         });
+
+        router().post(create("Datastream")).handler(ctx -> {
+            logger.info("Handling a request: {}.", ctx.request().absoluteURI());
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON);
+
+            // TODO catch exception and create a wrapper with http/error code
+            ObservationInsert ogcObservation = ObservationInsert.parse(ctx.getBodyAsJson());
+            MultiSimpleObservation afcObservation = Converter.convertToMultiSimpleObservation(ogcObservation);
+            client.uploadAggregatedMeasurements(singletonList(afcObservation));
+            // TODO response http 201 (created) and link to the Datastream
+            ctx.response().end();
+        });
+
+        router().post(create("Things")).handler(ctx -> {
+            logger.info("Handling a request: {}.", ctx.request().absoluteURI());
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON);
+
+            ThingInsert ogcThing = ThingInsert.parse(ctx.getBodyAsJson());
+            MultiSensor afcMultiSensor = Converter.convertToMultiSensor(ogcThing);
+
+            ctx.response().end(Json.encode(afcMultiSensor));
+        });
     }
 
     private static class AFCAggrLocation implements Comparable<AFCAggrLocation> {
@@ -659,7 +698,7 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             location.setDescription(afcLocation.observation.getProvider());
             location.setEncodingType("application/vnd.geo+json");
 
-            Location.Info info = new Location.Info();
+            LocationInfo info = new LocationInfo();
             info.setType("Feature");
             location.setLocation(info);
 
@@ -755,7 +794,7 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             datastream.setName(afcSensor.getObservedProperty());
             datastream.setDescription(afcSensor.getObservedProperty());
 
-            Datastream.UnitOfMeasurement uom = new Datastream.UnitOfMeasurement();
+            UnitOfMeasurement uom = new UnitOfMeasurement();
             uom.setName(afcSensor.getObservedProperty());
             uom.setSymbol("");
             uom.setDefinition(afcSensor.getUom());
@@ -816,5 +855,169 @@ public class OGCSensorThingsGateway extends AbstractGateway {
 
             return sensor;
         }
+
+        static MultiSimpleObservation convertToMultiSimpleObservation(ObservationInsert ogcObservation) {
+            String [] idPartsAfc = disassemblyId(ogcObservation.getDatastreamId());
+            String resourceUrn = idPartsAfc[0], observedProperty = idPartsAfc[1];
+            String [] urnPartsAfc = resourceUrn.split(":");
+            String resourceId = urnPartsAfc[urnPartsAfc.length-1];
+
+            MultiSimpleObservation multiSimpleObservation = new MultiSimpleObservation();
+            multiSimpleObservation.setResourceId(resourceId);
+
+            SimpleObservation simpleObservation = new SimpleObservation();
+            simpleObservation.setObservedProperty(observedProperty);
+            simpleObservation.setResultTime(ogcObservation.getPhenomenonTime().toEpochSecond());
+            simpleObservation.setResult(ogcObservation.getResult());
+
+            multiSimpleObservation.setObservations(singletonList(simpleObservation));
+
+            return multiSimpleObservation;
+        }
+
+        static MultiSensor convertToMultiSensor(ThingInsert ogcThing) {
+            String afcResourceType = ogcThing.getName();
+            if (!afcResourceType.equals(ogcThing.getDescription())) {
+                throw new IllegalArgumentException("Attribute 'description' has to be equal with 'name'. Contains 'resourceType' attribute from AFC.");
+            }
+            if (ogcThing.getLocations().size() != 1) {
+                throw new IllegalArgumentException("Attribute 'Locations' has to contain one location.");
+            }
+            Location ogcLocation = ogcThing.getLocations().get(0);
+            if (!afcResourceType.equals(ogcLocation.getName())) {
+                throw new IllegalArgumentException("Attribute 'Location/name' has to be equal with 'name'. Contains 'resourceType attribute from AFC.");
+            }
+            LocationInfo ogcLocationInfo = ogcLocation.getLocation();
+            String ogcLocationInfoType = ogcLocationInfo.getType();
+            if (!ogcLocationInfoType.equals("Feature")) {
+                throw new IllegalArgumentException("Allowed only 'Feature' type of location.");
+            }
+            Geometry ogcLocationGeometry = ogcLocationInfo.getGeometry();
+            if (!ogcLocationGeometry.getType().equals("Point")) {
+                throw new IllegalArgumentException("Allowed only 'Point' type of geometry.");
+            }
+            List<Double> ogcCoordinates = ogcLocationGeometry.getCoordinates();
+            if (ogcCoordinates.size() != 3) {
+                throw new IllegalArgumentException("Coordinates of location have to be in following format [longitude, latitude, altitude].");
+            }
+
+            double afcLongitude = ogcCoordinates.get(0);
+            double afcLatitude = ogcCoordinates.get(1);
+            double afcAltitude = ogcCoordinates.get(2);
+
+            List<DatastreamInsert> ogcDatastreams = ogcThing.getDatastreams();
+            List<MultiSensor.SensorSchema> afcSensorSchemas = new ArrayList<>(ogcDatastreams.size());
+            for (DatastreamInsert ogcDatastream : ogcDatastreams) {
+                String afcObservedProperty = ogcDatastream.getName();
+                if (!afcObservedProperty.equals(ogcDatastream.getDescription())) {
+                    throw new IllegalArgumentException("Attribute 'Datastream/description' has to be equal with 'Datastream/name'. Contains 'observedProperty attribute from AFC.");
+                }
+                UnitOfMeasurement ogcUnitOfMeasurement = ogcDatastream.getUnitOfMeasurement();
+                String afcUom = ogcUnitOfMeasurement.getDefinition();
+
+                String ogcObservationType = ogcDatastream.getObservationType();
+                if (!ogcObservationType.equals("http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement")) {
+                    throw new IllegalArgumentException("For the attribute 'Datastream/observationType' is allowed following values [http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement].");
+                }
+
+                Geometry ogcObservedArea = ogcDatastream.getObservedArea();
+                if (!ogcObservedArea.getType().equals("Point")) {
+                    throw new IllegalArgumentException("Allowed only 'Point' type of observedArea for the datastream '"+afcObservedProperty+"'.");
+                }
+                List<Double> ogcObcAreaCoordinates = ogcObservedArea.getCoordinates();
+                if (ogcObcAreaCoordinates.size() != 3) {
+                    throw new IllegalArgumentException("Coordinates of location for the datastream '"+afcObservedProperty+"' have to be in following format [longitude, latitude, altitude].");
+                }
+                if (!(ogcObcAreaCoordinates.get(0).equals(afcLongitude) || ogcObcAreaCoordinates.get(1).equals(afcLatitude) || ogcObcAreaCoordinates.get(2).equals(afcAltitude))) {
+                    throw new IllegalArgumentException("Coordinates of the observedArea have to be same as the last location.");
+                }
+
+                ObservedProperty ogcObservedProperty = ogcDatastream.getObservedProperty();
+                if (!ogcObservedProperty.getName().equals(afcObservedProperty)) {
+                    throw new IllegalArgumentException("Name of ObservedProperty has to be same as the name of datastream.");
+                }
+                if (!ogcObservedProperty.getDescription().equals(afcObservedProperty)) {
+                    throw new IllegalArgumentException("Description of ObservedProperty has to be same as the name of the datastream.");
+                }
+                if (!ogcObservedProperty.getDefinition().equals(afcUom)) {
+                    throw new IllegalArgumentException("Definition of ObservedProperty has to be same as the definition of unit measurement.");
+                }
+
+                SensorInsert ogcSensor = ogcDatastream.getSensor();
+                if (!ogcSensor.getName().equals(afcResourceType)) {
+                    throw new IllegalArgumentException("Name of sensor has to be same as the name of thing.");
+                }
+                if (!ogcSensor.getDescription().equals(afcResourceType)) {
+                    throw new IllegalArgumentException("Description of sensor has to be same as the name of thing.");
+                }
+                if (!ogcSensor.getEncodingType().equals("application/json")) {
+                    throw new IllegalArgumentException("For the attribute 'datastream/sensor/encodingType is allowed only 'application/json' type.");
+                }
+
+                MultiSensor.SensorSchema afcSensor = new MultiSensor.SensorSchema();
+                afcSensor.setObservedProperty(afcObservedProperty);
+                afcSensor.setUom(afcUom);
+                afcSensor.setMin_value(Double.MIN_VALUE);   // TODO set min_value
+                afcSensor.setMax_value(Double.MAX_VALUE);   // TODO set max_value
+                afcSensor.setAccuracy(0.1);                 // TODO set accuracy
+                afcSensorSchemas.add(afcSensor);
+            }
+
+            if (ogcThing.getProperties() == null) {
+                final String[] propAttrs = new String[]{"resourceUrn","resourceId", "supportedProtocol", "hardwareVersion", "softwareVersion", "firmwareVersion", "preprocessing", "pythonScript"};
+                throw new IllegalArgumentException("Attribute 'properties' is required and has to contain following attributes " + Arrays.toString(propAttrs) + ".");
+            }
+            Map<String, Object> ogcProperties = ogcThing.getProperties();
+
+            String afcResourceId = (String) ogcProperties.get("resourceId");
+            if (afcResourceId == null) {
+                throw new IllegalArgumentException("Attribute 'properties/resourceId' is required.");
+            }
+            String afcResourceUrn = (String) ogcProperties.get("resourceUrn");
+            if (afcResourceUrn == null) {
+                throw new IllegalArgumentException("Attribute 'properties/resourceUrn' is required.");
+            }
+            String afcSupportedProtocol = (String) ogcProperties.get("supportedProtocol");
+            if (afcSupportedProtocol == null) {
+                throw new IllegalArgumentException("Attribute 'properties/supportedProtocol' is required.");
+            }
+            String afcHardwareVersion = (String) ogcProperties.get("hardwareVersion");
+            if (afcHardwareVersion == null) {
+                throw new IllegalArgumentException("Attribute 'properties/hardwareVersion' is required.");
+            }
+            String afcSoftwareVersion = (String) ogcProperties.get("softwareVersion");
+            if (afcSoftwareVersion == null) {
+                throw new IllegalArgumentException("Attribute 'properties/softwareVersion' is required.");
+            }
+            String afcFirmwareVersion = (String) ogcProperties.get("firmwareVersion");
+            if (afcFirmwareVersion == null) {
+                throw new IllegalArgumentException("Attribute 'properties/firmwareVersion' is required.");
+            }
+            String afcPythonScript = (String) ogcProperties.get("pythonScript");
+            if (afcPythonScript == null) {
+                throw new IllegalArgumentException("Attribute 'properties/pythonScript' is required.");
+            }
+            Boolean afcPreprocessing = (Boolean) ogcProperties.get("preprocessing");
+            if (afcPreprocessing == null) {
+                throw new IllegalArgumentException("Attribute 'properties/preprocessing' is required.");
+            }
+
+            MultiSensor afcMultiSensor = new MultiSensor();
+            afcMultiSensor.setResourceId(afcResourceId);
+            afcMultiSensor.setResourceType(afcResourceType);
+            afcMultiSensor.setResourceUrn(afcResourceUrn);
+            afcMultiSensor.setLongitude(afcLongitude);
+            afcMultiSensor.setLatitude(afcLatitude);
+            afcMultiSensor.setAltitude(afcAltitude);
+            afcMultiSensor.setPreprocessing(afcPreprocessing);
+            afcMultiSensor.setPythonScript(afcPythonScript);
+            afcMultiSensor.setObservations(afcSensorSchemas);
+            afcMultiSensor.setSupportedProtocol(afcSupportedProtocol);
+            afcMultiSensor.setHardwareVersion(afcHardwareVersion);
+            afcMultiSensor.setSoftwareVersion(afcSoftwareVersion);
+            afcMultiSensor.setFirmwareVersion(afcFirmwareVersion);
+
+            return afcMultiSensor;
+        }
     }
 }

+ 7 - 2
connector-module-afarcloud/src/test/java/io/connector/module/afarcloud/OGCSensorThingsOnlineIntegrationTest.java

@@ -1,14 +1,19 @@
 package io.connector.module.afarcloud;
 
 import io.connector.test.api.TestType;
+import io.vertx.core.Vertx;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import static org.junit.jupiter.api.Assertions.*;
 
 @Tag(TestType.INTEGRATION)
+@ExtendWith(VertxExtension.class)
 class OGCSensorThingsOnlineIntegrationTest {
 
     @BeforeEach
@@ -22,7 +27,7 @@ class OGCSensorThingsOnlineIntegrationTest {
     }
 
     @Test
-    void run() {
-        assertEquals("hello_AFC", "hello_AFC");
+    void run(Vertx vertx, VertxTestContext testContext) {
+
     }
 }

+ 1 - 1
connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1HttpClient.java

@@ -15,7 +15,7 @@ import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 
-import static io.connector.core.http.ContentType.TEXT_PLAIN;
+import static io.connector.core.http.HttpContentType.TEXT_PLAIN;
 import static java.lang.String.format;
 import static java.time.format.DateTimeFormatter.ofPattern;
 

+ 4 - 4
connector-module-senslog1/src/main/java/io/connector/module/senslog1/gateway/OGCSensorThingsGateway.java

@@ -13,8 +13,8 @@ import org.apache.logging.log4j.Logger;
 import java.util.*;
 import java.util.function.Supplier;
 
-import static io.connector.core.http.ContentType.APPLICATION_JSON;
-import static io.connector.core.http.Header.CONTENT_TYPE;
+import static io.connector.core.http.HttpContentType.APPLICATION_JSON;
+import static io.connector.core.http.HttpHeader.CONTENT_TYPE;
 import static io.connector.core.AddressPath.Creator.create;
 import static io.vertx.core.json.Json.encode;
 import static java.lang.Integer.parseInt;
@@ -280,7 +280,7 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             datastream.setName(sensor.getName());
             datastream.setDescription(unit.getDescription());
 
-            Datastream.UnitOfMeasurement uom = new Datastream.UnitOfMeasurement();
+            UnitOfMeasurement uom = new UnitOfMeasurement();
             uom.setName(phenomenon.getName());
             uom.setSymbol("");
             uom.setDefinition(phenomenon.getUnit());
@@ -350,7 +350,7 @@ public class OGCSensorThingsGateway extends AbstractGateway {
 //            location.setDescription(); // TODO
             location.setEncodingType("application/vnd.geo+json");
 
-            Location.Info info = new Location.Info();
+            LocationInfo info = new LocationInfo();
             info.setType("Feature");
             location.setLocation(info);