浏览代码

refactored Scheduler, edited AFC

Lukas Cerny 5 年之前
父节点
当前提交
f797252705

+ 8 - 5
config/test.yaml

@@ -42,15 +42,18 @@ services:
 
 
   AFC:
-      api: &apiDomain
-        domain: "https://rest.afarcloud.smartarch.cz/storage/rest"
+      retrievalApi:
+        domain: "http://torcos.etsist.upm.es:9219/"
+
+      telemetryApi:
+        domain: "https://torcos.etsist.upm.es:9207"
+
+      infoApi:
+        domain: "https://storage07-afarcloud.qa.pdmfc.com/storage/rest/"
 
       name: "AFarCloud"
       provider: "io.connector.module.afarcloud.AFCModuleProvider"
 
-      host:
-        <<: *apiDomain
-        path: "<path>"
 
 scheduler:
 #  senslog1:

+ 3 - 10
connector-core/src/main/java/io/connector/core/AbstractModule.java

@@ -1,6 +1,5 @@
 package io.connector.core;
 
-import io.vertx.core.AbstractVerticle;
 import io.vertx.ext.web.Router;
 
 import java.util.ArrayList;
@@ -10,15 +9,14 @@ import java.util.Map;
 import static io.connector.core.AddressPath.Creator.create;
 import static io.connector.core.AddressPath.INFO;
 
-public abstract class AbstractModule extends AbstractVerticle {
+public abstract class AbstractModule extends Module {
 
     protected final ModuleDescriptor descriptor;
-    public final String moduleId;
     private final Map<String, AbstractGateway> gateways;
 
     protected AbstractModule(String id, ModuleDescriptor descriptor) {
+        super(id);
         this.descriptor = descriptor;
-        this.moduleId = id;
         this.gateways = new HashMap<>();
     }
 
@@ -32,18 +30,13 @@ public abstract class AbstractModule extends AbstractVerticle {
         }
     }
 
-    public abstract void run() throws Exception;
-
     public abstract ModuleInfo info();
 
-    public String id() {
-        return moduleId;
-    }
-
     public ModuleDescriptor descriptor() {
         return descriptor;
     }
 
+    @Override
     public Router router() {
         Router moduleRouter = Router.router(vertx);
         for (AbstractGateway gateway : gateways.values()) {

+ 31 - 0
connector-core/src/main/java/io/connector/core/Module.java

@@ -0,0 +1,31 @@
+package io.connector.core;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.ext.web.Router;
+
+public abstract class Module extends AbstractVerticle {
+
+    public final String moduleId;
+
+    private Router router;
+
+    public Module(String moduleId) {
+        this.moduleId = moduleId;
+    }
+
+    public abstract void run() throws Exception;
+
+    public String id() {
+        return moduleId;
+    }
+
+    public Router router() {
+        return this.router;
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.router = Router.router(vertx);
+        run();
+    }
+}

+ 13 - 6
connector-core/src/main/java/io/connector/core/ModuleDeployer.java

@@ -23,20 +23,27 @@ public class ModuleDeployer extends AbstractVerticle {
 
     @Override
     public void start(Promise<Void> startPromise) {
-        List<Future> futureModules = new ArrayList<>(modules.size());
+        final int finalCountOfModules = modules.size() + 1;
+
+        List<Future> futureModules = new ArrayList<>(finalCountOfModules);
+        List<Module> extendedModules = new ArrayList<>(finalCountOfModules);
 
         for (AbstractModule module : modules) {
-            DeploymentOptions options = new DeploymentOptions()
-                    .setWorker(true);
+            DeploymentOptions options = new DeploymentOptions().setWorker(true);
             futureModules.add(deployHelper(vertx, options, module));
+            extendedModules.add(module);
         }
 
+        VertxScheduler vertxScheduler = new VertxScheduler(modules);
+        DeploymentOptions schedulerOpt = new DeploymentOptions().setWorker(true);
+        futureModules.add(deployHelper(vertx, schedulerOpt, vertxScheduler));
+        extendedModules.add(vertxScheduler);
+
         CompositeFuture.all(futureModules).onComplete(result -> {
             if(result.succeeded()) {
                 DeploymentOptions options = new DeploymentOptions()
-                        .setWorker(true)
-                        .setConfig(config());
-                deployHelper(vertx, options, new VertxServer(modules))
+                        .setWorker(true).setConfig(config());
+                deployHelper(vertx, options, new VertxHttpServer(extendedModules))
                         .onSuccess(startPromise::complete).onFailure(startPromise::fail);
             } else {
                 startPromise.fail(result.cause());

+ 17 - 41
connector-core/src/main/java/io/connector/core/VertxServer.java → connector-core/src/main/java/io/connector/core/VertxHttpServer.java

@@ -2,12 +2,13 @@ package io.connector.core;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import io.connector.core.config.SchedulerConfig;
 import io.connector.core.json.JsonAttributeFormatter;
-import io.vertx.core.*;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.eventbus.DeliveryOptions;
 import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.eventbus.Message;
 import io.vertx.core.http.HttpServerResponse;
 import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonObject;
@@ -18,26 +19,28 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.time.OffsetDateTime;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 
 import static cz.senslog.common.http.HttpContentType.APPLICATION_JSON;
 import static cz.senslog.common.http.HttpHeader.CONTENT_TYPE;
-import static io.connector.core.AddressPath.*;
-import static io.connector.core.AddressPath.Creator.*;
-import static io.connector.core.MessageHeader.*;
+import static io.connector.core.AddressPath.Creator.create;
+import static io.connector.core.AddressPath.Creator.createNormalized;
+import static io.connector.core.AddressPath.EVENT;
+import static io.connector.core.AddressPath.INFO;
+import static io.connector.core.MessageHeader.RESOURCE;
 import static io.connector.core.MessageHeader.Resource.HTTP_SERVER;
-import static io.connector.core.MessageHeader.Resource.SCHEDULER;
 import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 
-public class VertxServer extends AbstractVerticle {
+public class VertxHttpServer extends AbstractVerticle {
 
-    private final static Logger logger = LogManager.getLogger(VertxServer.class);
+    private final static Logger logger = LogManager.getLogger(VertxHttpServer.class);
 
     private final static String DEFAULT_DOMAIN_PREFIX  = "/api";
 
-    private final List<AbstractModule> modules;
+    private final List<Module> modules;
 
-    public VertxServer(List<AbstractModule> modules) {
+    public VertxHttpServer(List<Module> modules) {
         this.modules = modules;
     }
 
@@ -57,34 +60,7 @@ public class VertxServer extends AbstractVerticle {
               .getModule()
         );
 
-        for (AbstractModule module : modules) {
-            SchedulerConfig config = module.descriptor().getSchedulerConfig();
-            if (config != null && config.getPeriodSecond() > 0) {
-                vertx.setPeriodic(config.getPeriodMillisecond(), ctx -> {
-                    DeliveryOptions fetcherOpt = new DeliveryOptions()
-                            .addHeader(RESOURCE, SCHEDULER)
-                            .addHeader(ADDRESS, config.getConsumer());
-                    String providerAddress = createNormalized(SCHEDULER_PROVIDER, module.id());
-                    eventBus.request(providerAddress, new JsonObject(), fetcherOpt, reply -> {
-                            if (reply.succeeded()) {
-                                Message<Object> result = reply.result();
-                                String source = result.headers().get(MODULE_TYPE);
-                                DeliveryOptions pusherOpt = new DeliveryOptions();
-                                MultiMap headers = result.headers().addAll(fetcherOpt.getHeaders());
-                                for (Map.Entry<String, String> entryHeader : headers.entries()) {
-                                    if (entryHeader.getKey().startsWith(MessageHeader.getPrefix())) {
-                                        pusherOpt.addHeader(entryHeader.getKey(), entryHeader.getValue());
-                                    }
-                                }
-                                String consumerAddress = createNormalized(SCHEDULER_CONSUMER, source);
-                                eventBus.publish(consumerAddress, result.body(), pusherOpt);
-                            } else {
-                                logger.catching(reply.cause());
-                            }
-                        });
-                });
-            }
-
+        for (Module module : modules) {
             router.mountSubRouter(create(domainPrefix, module.id()), module.router());
         }
 
@@ -131,7 +107,7 @@ public class VertxServer extends AbstractVerticle {
 
         router.get(create(domainPrefix, INFO)).handler(ctx -> {
             List<Future> futures = new ArrayList<>(modules.size());
-            for (AbstractModule module : modules) {
+            for (Module module : modules) {
                 final Promise<ModuleInfo> promise = Promise.promise();
                 eventBus.<ModuleInfo>request(create(module.id(), INFO), new JsonObject(), reply -> {
                     if (reply.succeeded()) {

+ 107 - 0
connector-core/src/main/java/io/connector/core/VertxScheduler.java

@@ -0,0 +1,107 @@
+package io.connector.core;
+
+import io.connector.core.config.SchedulerConfig;
+import io.vertx.core.MultiMap;
+import io.vertx.core.eventbus.DeliveryOptions;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.handler.BodyHandler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static io.connector.core.AddressPath.Creator.create;
+import static io.connector.core.AddressPath.Creator.createNormalized;
+import static io.connector.core.AddressPath.SCHEDULER_CONSUMER;
+import static io.connector.core.AddressPath.SCHEDULER_PROVIDER;
+import static io.connector.core.MessageHeader.*;
+import static io.connector.core.MessageHeader.Resource.SCHEDULER;
+import static java.lang.String.format;
+
+class VertxScheduler extends Module {
+
+    private final static Logger logger = LogManager.getLogger(VertxScheduler.class);
+
+    private final Map<String, SchedulerConfig> schedulerConfigs;
+    private final Map<String, Long> scheduledModules;
+
+    VertxScheduler(List<AbstractModule> modules) {
+        super("scheduler");
+        this.schedulerConfigs = new HashMap<>();
+        for (AbstractModule module : modules) {
+            String moduleId = module.id();
+            ModuleDescriptor descriptor = module.descriptor();
+            if (descriptor != null && descriptor.getSchedulerConfig() != null) {
+                SchedulerConfig config = descriptor.getSchedulerConfig();
+                schedulerConfigs.put(moduleId, config);
+            }
+        }
+        this.scheduledModules = new HashMap<>(schedulerConfigs.size());
+    }
+
+    @Override
+    public void run() {
+
+        EventBus eventBus = vertx.eventBus();
+        for (Map.Entry<String, SchedulerConfig> configEntry : schedulerConfigs.entrySet()) {
+            String moduleId = configEntry.getKey();
+            SchedulerConfig config = configEntry.getValue();
+
+            if (config != null && config.getPeriodSecond() > 0) {
+                long schedulerId = vertx.setPeriodic(config.getPeriodMillisecond(), ctx -> {
+                    DeliveryOptions fetcherOpt = new DeliveryOptions()
+                            .addHeader(RESOURCE, SCHEDULER)
+                            .addHeader(ADDRESS, config.getConsumer());
+                    String providerAddress = createNormalized(SCHEDULER_PROVIDER, moduleId);
+                    // TODO instance of new JsonObject(), put config as a json
+                    eventBus.request(providerAddress, new JsonObject(), fetcherOpt, reply -> {
+                        if (reply.succeeded()) {
+                            Message<Object> result = reply.result();
+                            String source = result.headers().get(MODULE_TYPE);
+                            DeliveryOptions pusherOpt = new DeliveryOptions();
+                            MultiMap headers = result.headers().addAll(fetcherOpt.getHeaders());
+                            for (Map.Entry<String, String> entryHeader : headers.entries()) {
+                                if (entryHeader.getKey().startsWith(MessageHeader.getPrefix())) {
+                                    pusherOpt.addHeader(entryHeader.getKey(), entryHeader.getValue());
+                                }
+                            }
+                            String consumerAddress = createNormalized(SCHEDULER_CONSUMER, source);
+                            eventBus.publish(consumerAddress, result.body(), pusherOpt);
+                        } else {
+                            logger.catching(reply.cause());
+                        }
+                    });
+                });
+                scheduledModules.put(moduleId, schedulerId);
+            }
+        }
+
+        router().get(create("rules")).handler(BodyHandler.create()).handler(ctx -> {
+            JsonArray rules = new JsonArray();
+            for (Map.Entry<String, Long> entry : scheduledModules.entrySet()) {
+                rules.add(new JsonObject()
+                        .put("id", entry.getValue())
+                        .put("module", entry.getKey())
+                        .put("config", new JsonObject())
+                );
+            }
+            ctx.response().end(rules.encode());
+        });
+
+        router().delete(create("rule/:id")).handler(ctx -> {
+            String idString = ctx.pathParam("id");
+            long id;
+            try {
+                id = Long.parseLong(idString);
+            } catch (NumberFormatException e) {
+                throw new RuntimeException(format("Param '%s' can not be converted to integer.", idString));
+            }
+            ctx.response().end("Deleted rule " + id);
+        });
+    }
+}

+ 145 - 0
connector-model/src/main/java/io/connector/model/afarcloud/Unit.java

@@ -0,0 +1,145 @@
+package io.connector.model.afarcloud;
+
+import java.util.List;
+
+public class Unit {
+
+    private String resourceId;
+    private String resourceType;
+    private String resourceUrn;
+    private Double latitude;
+    private Double longitude;
+    private Double altitude;
+    private Boolean preprocessing;
+    private String pythonScript;
+    private List<SensorSchema> observations;
+
+    public String getResourceId() {
+        return resourceId;
+    }
+
+    public void setResourceId(String resourceId) {
+        this.resourceId = resourceId;
+    }
+
+    public String getResourceType() {
+        return resourceType;
+    }
+
+    public void setResourceType(String resourceType) {
+        this.resourceType = resourceType;
+    }
+
+    public String getResourceUrn() {
+        return resourceUrn;
+    }
+
+    public void setResourceUrn(String resourceUrn) {
+        this.resourceUrn = resourceUrn;
+    }
+
+    public Double getLatitude() {
+        return latitude;
+    }
+
+    public void setLatitude(Double latitude) {
+        this.latitude = latitude;
+    }
+
+    public Double getLongitude() {
+        return longitude;
+    }
+
+    public void setLongitude(Double longitude) {
+        this.longitude = longitude;
+    }
+
+    public Double getAltitude() {
+        return altitude;
+    }
+
+    public void setAltitude(Double altitude) {
+        this.altitude = altitude;
+    }
+
+    public Boolean getPreprocessing() {
+        return preprocessing;
+    }
+
+    public void setPreprocessing(Boolean preprocessing) {
+        this.preprocessing = preprocessing;
+    }
+
+    public String getPythonScript() {
+        return pythonScript;
+    }
+
+    public void setPythonScript(String pythonScript) {
+        this.pythonScript = pythonScript;
+    }
+
+    public List<SensorSchema> getObservations() {
+        return observations;
+    }
+
+    public void setObservations(List<SensorSchema> observations) {
+        this.observations = observations;
+    }
+
+    public static class SensorSchema {
+        private String observedProperty;
+        private String uom;
+        private Double accuracy;
+        private Integer propertyId;
+        private Double min_value;
+        private Double max_value;
+
+        public String getObservedProperty() {
+            return observedProperty;
+        }
+
+        public void setObservedProperty(String observedProperty) {
+            this.observedProperty = observedProperty;
+        }
+
+        public String getUom() {
+            return uom;
+        }
+
+        public void setUom(String uom) {
+            this.uom = uom;
+        }
+
+        public Double getAccuracy() {
+            return accuracy;
+        }
+
+        public void setAccuracy(Double accuracy) {
+            this.accuracy = accuracy;
+        }
+
+        public Integer getPropertyId() {
+            return propertyId;
+        }
+
+        public void setPropertyId(Integer propertyId) {
+            this.propertyId = propertyId;
+        }
+
+        public Double getMin_value() {
+            return min_value;
+        }
+
+        public void setMin_value(Double min_value) {
+            this.min_value = min_value;
+        }
+
+        public Double getMax_value() {
+            return max_value;
+        }
+
+        public void setMax_value(Double max_value) {
+            this.max_value = max_value;
+        }
+    }
+}

+ 76 - 1
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCClient.java

@@ -1,15 +1,26 @@
 package io.connector.module.afarcloud;
 
+import com.google.gson.reflect.TypeToken;
 import cz.senslog.common.http.HttpClient;
+import cz.senslog.common.http.HttpRequest;
+import cz.senslog.common.http.HttpResponse;
+import cz.senslog.common.http.URLBuilder;
 import io.connector.core.config.HostConfig;
 import io.connector.model.afarcloud.MultiSimpleObservation;
+import io.connector.model.afarcloud.Unit;
 import io.connector.module.afarcloud.gateway.SensLog1Gateway;
 import io.vertx.core.json.JsonObject;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.lang.reflect.Type;
+import java.util.Collection;
 import java.util.List;
 
+import static cz.senslog.common.http.HttpContentType.TEXT_PLAIN;
+import static cz.senslog.common.json.BasicJson.jsonToObject;
+import static java.lang.String.format;
+
 public class AFCClient {
 
     private final static Logger logger = LogManager.getLogger(SensLog1Gateway.class);
@@ -25,7 +36,7 @@ public class AFCClient {
 
     public void uploadAggregatedMeasurements(List<MultiSimpleObservation> observations) {
 
-        HostConfig host = new HostConfig(config.getDomain(), "telemetry");
+        HostConfig host = new HostConfig(config.getTelemetryDomain(), "telemetry");
 
         if (observations.size() > 0) {
             JsonObject json = JsonObject.mapFrom(observations.get(0));
@@ -34,4 +45,68 @@ public class AFCClient {
         
         logger.info("Converted {} observations.", observations.size());
     }
+
+    public List<Unit> getAllSensors() {
+
+        HostConfig host = new HostConfig(config.getInfoDomain(), "registry/getAllSensors");
+
+        HttpRequest request = HttpRequest.newBuilder()
+                .contentType(TEXT_PLAIN)
+                .url(URLBuilder.newBuilder(host.getDomain(), host.getPath()).build())
+                .GET().build();
+
+        HttpResponse response = httpClient.send(request);
+
+        if (response.isError()) {
+            throw new RuntimeException(response.getBody());
+        }
+
+        final Type unitType = new TypeToken<Collection<Unit>>() {}.getType();
+        return jsonToObject(response.getBody(), unitType);
+
+    }
+
+    public Unit getSensor(String id) {
+
+        HostConfig host = new HostConfig(config.getInfoDomain(), format("registry/getSensor/%s", id));
+
+        HttpRequest request = HttpRequest.newBuilder()
+                .contentType(TEXT_PLAIN)
+                .url(URLBuilder.newBuilder(host.getDomain(), host.getPath()).build())
+                .GET().build();
+
+        HttpResponse response = httpClient.send(request);
+
+        if (response.isError()) {
+            throw new RuntimeException(response.getBody());
+        }
+
+        return jsonToObject(response.getBody(), Unit.class);
+
+        /*
+
+
+        Unit unit = new Unit();
+        unit.setResourceId(id);
+        unit.setResourceType("AFC 8CF95740000008AE");
+        unit.setResourceUrn("urn:afc:AS07:enviro:IMA:air_quality:10002222");
+        unit.setLatitude(50.0382589);
+        unit.setLongitude(14.6112164);
+        unit.setAltitude(280.0);
+        unit.setPreprocessing(false);
+        unit.setPythonScript("");
+
+        Unit.SensorSchema sensorSchema = new Unit.SensorSchema();
+        sensorSchema.setObservedProperty("air_temperature");
+        sensorSchema.setUom("http://qudt.org/vocab/unit/DEG_C");
+        sensorSchema.setAccuracy(0.1);
+        sensorSchema.setPropertyId(456);
+        sensorSchema.setMin_value(-40.0);
+        sensorSchema.setMax_value(60.0);
+        unit.setObservations(singletonList(sensorSchema));
+
+        return unit;
+
+         */
+    }
 }

+ 17 - 4
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCConfig.java

@@ -4,13 +4,26 @@ import io.connector.core.config.DefaultConfig;
 
 public class AFCConfig {
 
-    private final String domain;
+    private final String telemetryDomain;
+    private final String retrievalDomain;
+    private final String infoDomain;
 
     AFCConfig(DefaultConfig defaultConfig) {
-        this.domain = defaultConfig.getPropertyConfig("api").getStringProperty("domain");
+        this.telemetryDomain = defaultConfig.getPropertyConfig("telemetryApi").getStringProperty("domain");
+        this.retrievalDomain = defaultConfig.getPropertyConfig("telemetryApi").getStringProperty("domain");
+        this.infoDomain = defaultConfig.getPropertyConfig("infoApi").getStringProperty("domain");
+
+    }
+
+    public String getTelemetryDomain() {
+        return telemetryDomain;
+    }
+
+    public String getRetrievalDomain() {
+        return retrievalDomain;
     }
 
-    public String getDomain() {
-        return domain;
+    public String getInfoDomain() {
+        return infoDomain;
     }
 }

+ 1 - 1
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCModule.java

@@ -21,7 +21,7 @@ public class AFCModule extends AbstractModule {
     public void run() {
         registerGateway(new AFCGateway("AFarCloud", client));
         registerGateway(new SensLog1Gateway("SensLogV1", client));
-        registerGateway(new OGCSensorThingsGateway("OGCSensorThings"));
+        registerGateway(new OGCSensorThingsGateway("OGCSensorThings", client));
     }
 
     @Override

+ 94 - 25
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/gateway/OGCSensorThingsGateway.java

@@ -2,7 +2,9 @@ package io.connector.module.afarcloud.gateway;
 
 import io.connector.core.AbstractGateway;
 import io.connector.core.http.RequestUriComponent;
+import io.connector.model.afarcloud.Unit;
 import io.connector.model.sensorthings.*;
+import io.connector.module.afarcloud.AFCClient;
 import io.vertx.core.json.JsonObject;
 
 import java.util.ArrayList;
@@ -11,13 +13,17 @@ import java.util.List;
 import static cz.senslog.common.http.HttpContentType.APPLICATION_JSON;
 import static cz.senslog.common.http.HttpHeader.CONTENT_TYPE;
 import static io.connector.core.AddressPath.Creator.create;
+import static io.vertx.core.json.Json.encode;
 import static java.lang.String.format;
 import static java.util.Arrays.asList;
 
 public class OGCSensorThingsGateway extends AbstractGateway {
 
-    public OGCSensorThingsGateway(String id) {
+    private final AFCClient client;
+
+    public OGCSensorThingsGateway(String id, AFCClient client) {
         super(id);
+        this.client = client;
     }
 
     @Override
@@ -45,27 +51,48 @@ public class OGCSensorThingsGateway extends AbstractGateway {
 
         router().get(create("Locations(:id)")).handler(ctx -> {
             RequestUriComponent uriComponent = parseUriToComponents(ctx.request());
-            int id = Integer.parseInt(ctx.pathParam("id"));
+            String id = ctx.pathParam("id");
 
-            Location location = new Location();
-            location.setId(id);
-            location.setSelfLink(format("%s/Locations(%s)", uriComponent.getGatewayUri(), id));
-            location.setHistoricalLocationsNavigationLink(format("Locations(%s)/HistoricalLocations", id));
-            location.setEncodingType("application/vnd.geo+json");
+            Unit afcUnit = client.getSensor(id);
 
-            location.setName("CCIT");
-            location.setDescription("Calgary Center for Innvative Technologies");
+            Location ogcLocation = new Location();
+            ogcLocation.setId(Integer.parseInt(id));
+            ogcLocation.setSelfLink(format("%s/Locations(%s)", uriComponent.getGatewayUri(), id));
+            ogcLocation.setHistoricalLocationsNavigationLink("unknown");
+            ogcLocation.setName(afcUnit.getResourceType());
+            ogcLocation.setDescription(afcUnit.getResourceUrn());
+            ogcLocation.setEncodingType("application/vnd.geo+json");
 
             Location.Info info = new Location.Info();
             info.setType("Feature");
-            location.setLocation(info);
+            ogcLocation.setLocation(info);
 
             Geometry geometry = new Geometry();
             geometry.setType("Point");
-            geometry.setCoordinates(asList(-114.06,51.05));
+            geometry.setCoordinates(asList(afcUnit.getLatitude(), afcUnit.getLongitude()));
             info.setGeometry(geometry);
 
-            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(location.encode());
+            /*
+            Location example = new Location();
+            example.setId(Integer.parseInt(id));
+            example.setSelfLink(format("%s/Locations(%s)", uriComponent.getGatewayUri(), id));
+            example.setHistoricalLocationsNavigationLink(format("Locations(%s)/HistoricalLocations", id));
+            example.setEncodingType("application/vnd.geo+json");
+
+            example.setName("CCIT");
+            example.setDescription("Calgary Center for Innvative Technologies");
+
+            Location.Info exampleInfo = new Location.Info();
+            exampleInfo.setType("Feature");
+            example.setLocation(exampleInfo);
+
+            Geometry exampleGeometry = new Geometry();
+            exampleGeometry.setType("Point");
+            exampleGeometry.setCoordinates(asList(-114.06,51.05));
+            exampleInfo.setGeometry(exampleGeometry);
+            */
+
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(ogcLocation.encode());
         });
 
         router().get(create("Datastreams(:id)")).handler(ctx -> {
@@ -107,7 +134,7 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(datastream.encode());
         });
 
-        router().get(create("Things(:id)/HistoricalLocations")).handler(ctx -> {
+        router().get(create("HistoricalLocations(:id)")).handler(ctx -> {
             RequestUriComponent uriComponent = parseUriToComponents(ctx.request());
             int id = Integer.parseInt(ctx.pathParam("id"));
 
@@ -129,20 +156,31 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(historicalLocation.encode());
         });
 
-        router().get(create("Sensors(:id)")).handler(ctx -> {
+        router().get(create("Sensors")).handler(ctx -> {
             RequestUriComponent uriComponent = parseUriToComponents(ctx.request());
-            int id = Integer.parseInt(ctx.pathParam("id"));
-
-            Sensor sensor = new Sensor();
-            sensor.setId(id);
-            sensor.setSelfLink(format("%s/Sensors(%s)", uriComponent.getGatewayUri(), id));
-            sensor.setDataStreamNavigationLink(format("Sensors(%s)/Datastreams", id));
-            sensor.setName("TMP36");
-            sensor.setDescription("TMP36 - Analog Temperature sensor");
-            sensor.setEncodingType("application/pdf");
-            sensor.setMetadata("http://example.org/TMP35_36_37.pdf");
+            List<Unit> afcAllSensors = client.getAllSensors();
+            List<Sensor> ogcSensors = Converter.convertSensors(afcAllSensors, uriComponent);
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(encode(ogcSensors));
+        });
 
-            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(sensor.encode());
+        router().get(create("Sensors(:id)")).handler(ctx -> {
+            RequestUriComponent uriComponent = parseUriToComponents(ctx.request());
+            String id = ctx.pathParam("id");
+            Unit afcUnit = client.getSensor(id);
+            Sensor ogcSensor = Converter.convertSensor(afcUnit, uriComponent);
+
+            /*
+            Sensor example = new Sensor();
+            example.setId(Integer.parseInt(id));
+            example.setSelfLink(format("%s/Sensors(%s)", uriComponent.getGatewayUri(), id));
+            example.setDataStreamNavigationLink(format("Sensors(%s)/Datastreams", id));
+            example.setName("TMP36");
+            example.setDescription("TMP36 - Analog Temperature sensor");
+            example.setEncodingType("application/pdf");
+            example.setMetadata("http://example.org/TMP35_36_37.pdf");
+            */
+
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(ogcSensor.encode());
         });
 
         router().get(create("ObservedProperties(:id)")).handler(ctx -> {
@@ -200,4 +238,35 @@ public class OGCSensorThingsGateway extends AbstractGateway {
             ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(featureOfInterest.encode());
         });
     }
+
+    private static class Converter {
+
+        static Sensor convertSensor(Unit afcUnit, RequestUriComponent uriComponent) {
+            Sensor ogcSensor = new Sensor();
+
+            int ogcId;
+            try {
+                ogcId = Integer.parseInt(afcUnit.getResourceId());
+            } catch (NumberFormatException e) {
+                throw new NumberFormatException(format("Number %s can not be converted as integer.", afcUnit.getResourceId()));
+            }
+
+            ogcSensor.setId(ogcId);
+            ogcSensor.setSelfLink(format("%s/Sensors(%s)", uriComponent.getGatewayUri(), afcUnit.getResourceId()));
+            ogcSensor.setDataStreamNavigationLink("unknown"); // TODO add Datastream navigation link
+            ogcSensor.setName(afcUnit.getResourceType());
+            ogcSensor.setDescription(afcUnit.getResourceUrn());
+            ogcSensor.setEncodingType("unknown");
+            ogcSensor.setMetadata("none");
+            return ogcSensor;
+        }
+
+        static List<Sensor> convertSensors(List<Unit> afcUnits, RequestUriComponent uriComponent) {
+            List<Sensor> ogcSensors = new ArrayList<>(afcUnits.size());
+            for (Unit afcUnit : afcUnits) {
+                ogcSensors.add(convertSensor(afcUnit, uriComponent));
+            }
+            return ogcSensors;
+        }
+    }
 }

+ 68 - 26
connector-module-ogc-sensorthings/src/main/java/io/connector/module/ogc/sensorthings/gateway/AFarCloudGateway.java

@@ -1,13 +1,11 @@
 package io.connector.module.ogc.sensorthings.gateway;
 
 import io.connector.core.AbstractGateway;
-import io.connector.model.afarcloud.Observation;
-import io.connector.model.afarcloud.ResourceMeasurement;
-import io.connector.model.afarcloud.ResponseModel;
-import io.connector.model.afarcloud.SensorTelemetry;
+import io.connector.model.afarcloud.*;
 import io.connector.module.ogc.sensorthings.SensorThingsClient;
 import io.vertx.core.MultiMap;
 import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.JsonObject;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -51,9 +49,9 @@ public class AFarCloudGateway extends AbstractGateway {
         return params.contains(paramName) ? params.get(paramName) : defValue;
     }
 
-    private static List<String> getSplittedParam(MultiMap params, String paramName, String splitter) {
+    private static List<String> getSplittedParam(MultiMap params, String paramName) {
         String param = getParam(params, paramName);
-        return param != null ? asList(param.split(splitter)) : emptyList();
+        return param != null ? asList(param.split(",")) : emptyList();
     }
 
     @Override
@@ -66,12 +64,12 @@ public class AFarCloudGateway extends AbstractGateway {
             MultiMap params = req.params();
 
             final int limit = parseInt(getRequiredParam(params, "limit"));
-            final List<String> entityNames = getSplittedParam(params, "entityNames", ",");
-            final List<String> devices = getSplittedParam(params, "devices", ",");
-            final List<String> services = getSplittedParam(params, "services", ",");
-            final List<String> types = getSplittedParam(params, "types", ",");
-            final List<String> providers = getSplittedParam(params, "providers", ",");
-            final List<String> measurements = getSplittedParam(params, "measurements", ",");
+            final List<String> entityNames = getSplittedParam(params, "entityNames");
+            final List<String> devices = getSplittedParam(params, "devices");
+            final List<String> services = getSplittedParam(params, "services");
+            final List<String> types = getSplittedParam(params, "types");
+            final List<String> providers = getSplittedParam(params, "providers");
+            final List<String> measurements = getSplittedParam(params, "measurements");
             final Integer minAltitude = params.contains("altitude") ? parseInt(params.get("altitude")) : null; // TODO
             final String order = getParam(params, "order", "DESC").toUpperCase();
 
@@ -123,12 +121,12 @@ public class AFarCloudGateway extends AbstractGateway {
             MultiMap params = req.params();
 
             final int limit = parseInt(getRequiredParam(params, "limit"));
-            final List<String> entityNames = getSplittedParam(params, "entityNames", ",");
-            final List<String> devices = getSplittedParam(params, "devices", ",");
-            final List<String> services = getSplittedParam(params, "services", ",");
-            final List<String> types = getSplittedParam(params, "types", ",");
-            final List<String> providers = getSplittedParam(params, "providers", ",");
-            final List<String> measurements = getSplittedParam(params, "measurements", ",");
+            final List<String> entityNames = getSplittedParam(params, "entityNames");
+            final List<String> devices = getSplittedParam(params, "devices");
+            final List<String> services = getSplittedParam(params, "services");
+            final List<String> types = getSplittedParam(params, "types");
+            final List<String> providers = getSplittedParam(params, "providers");
+            final List<String> measurements = getSplittedParam(params, "measurements");
             final String order = getParam(params, "order", "DESC").toUpperCase();
             final Integer minAltitude = params.contains("altitude") ? parseInt(params.get("altitude")) : null; // TODO
             final Double centrLong = params.contains("centr_long") ? parseDouble(params.get("centr_long")) : null; // TODO
@@ -195,7 +193,8 @@ public class AFarCloudGateway extends AbstractGateway {
             final String startTime = getRequiredParam(params, "start_time");
             final String endTime = getParam(params, "end_time");
 
-            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(encode(null));
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
         });
 
         router().get(create("getObservationsBySensor/historic")).handler(ctx -> {
@@ -207,40 +206,83 @@ public class AFarCloudGateway extends AbstractGateway {
             final String startTime = getRequiredParam(params, "start_time");
             final String endTime = getParam(params, "end_time");
 
-            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(encode(null));
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
         });
 
         // Query region telemetry
         router().get(create("getRegionTelemetry/latest")).handler(ctx -> {
-
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
         });
 
         // Query region telemetry interval
         router().get(create("getRegionTelemetry/historic")).handler(ctx -> {
-
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
         });
 
         // Query collar telemetry
         router().get(create("getCollarTelemetry/latest")).handler(ctx -> {
-
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
         });
 
         router().get(create("getObservationsByCollar/latest")).handler(ctx -> {
-
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
         });
 
         router().get(create("getObservationsByCollar/historic")).handler(ctx -> {
-
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
         });
 
         // Query collar telemetry interval
         router().get(create("getCollarTelemetry/historic")).handler(ctx -> {
-
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
         });
 
         // Schema-measurement
         router().get(create("getMeasurements")).handler(ctx -> {
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
+        });
+
+        // Registry
+        router().get(create("registry/getSensor/:id")).handler(ctx -> {
+            // domain is different
+
+            String resourceId = ctx.pathParam("id");
+
+            Unit unit = new Unit();
+            unit.setResourceId(resourceId);
+            unit.setResourceType("AFC 8CF95740000008AE");
+            unit.setResourceUrn("urn:afc:AS07:enviro:IMA:air_quality:10002222");
+            unit.setLatitude(50.0382589);
+            unit.setLongitude(14.6112164);
+            unit.setAltitude(280.0);
+            unit.setPreprocessing(false);
+            unit.setPythonScript("");
+
+            Unit.SensorSchema sensorSchema = new Unit.SensorSchema();
+            sensorSchema.setObservedProperty("air_temperature");
+            sensorSchema.setUom("http://qudt.org/vocab/unit/DEG_C");
+            sensorSchema.setAccuracy(0.1);
+            sensorSchema.setPropertyId(456);
+            sensorSchema.setMin_value(-40.0);
+            sensorSchema.setMax_value(60.0);
+            unit.setObservations(singletonList(sensorSchema));
+
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(encode(unit));
+        });
+
+        router().get(create("registry/getCollar/:id")).handler(ctx -> {
+            // domain is different
 
+            ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON).end(
+                    new JsonObject().put("message", "not implemented yet").encode());
         });
     }
 }