Просмотр исходного кода

Refactored events, added router...

Lukas Cerny 5 лет назад
Родитель
Сommit
6ab8c5c6de
54 измененных файлов с 730 добавлено и 420 удалено
  1. 12 16
      config/test.yaml
  2. 13 5
      connector-app/src/main/java/io/connector/app/Application.java
  3. 10 7
      connector-app/src/main/java/io/connector/app/Parameters.java
  4. 41 25
      connector-core/src/main/java/io/connector/core/AbstractGateway.java
  5. 14 8
      connector-core/src/main/java/io/connector/core/AbstractModule.java
  6. 5 11
      connector-core/src/main/java/io/connector/core/AddressPath.java
  7. 4 0
      connector-core/src/main/java/io/connector/core/DataCollection.java
  8. 68 0
      connector-core/src/main/java/io/connector/core/EventInfo.java
  9. 4 0
      connector-core/src/main/java/io/connector/core/Message.java
  10. 2 9
      connector-core/src/main/java/io/connector/core/ModuleDeployer.java
  11. 13 1
      connector-core/src/main/java/io/connector/core/ModuleInfo.java
  12. 2 3
      connector-core/src/main/java/io/connector/core/ModuleLoader.java
  13. 1 3
      connector-core/src/main/java/io/connector/core/ModuleProvider.java
  14. 38 14
      connector-core/src/main/java/io/connector/core/VertxServer.java
  15. 1 3
      connector-core/src/main/java/io/connector/core/config/PropertyConfig.java
  16. 2 9
      connector-core/src/main/java/io/connector/core/config/SchedulerConfig.java
  17. 11 0
      connector-core/src/main/java/io/connector/core/config/file/FileConfigurationServiceImpl.java
  18. 0 11
      connector-core/src/main/java/io/connector/core/module/ModuleType.java
  19. 0 22
      connector-model/src/main/java/io/connector/model/afarcloud/AFCModel.java
  20. 32 0
      connector-model/src/main/java/io/connector/model/afarcloud/Location.java
  21. 34 0
      connector-model/src/main/java/io/connector/model/afarcloud/MultiSimpleObservation.java
  22. 0 6
      connector-model/src/main/java/io/connector/model/afarcloud/Observation.java
  23. 58 0
      connector-model/src/main/java/io/connector/model/afarcloud/SimpleObservation.java
  24. 22 0
      connector-model/src/main/java/io/connector/model/senslog1/Sensor.java
  25. 12 12
      connector-model/src/main/java/io/connector/model/senslog1/SensorData.java
  26. 22 0
      connector-model/src/main/java/io/connector/model/senslog1/Unit.java
  27. 10 10
      connector-model/src/main/java/io/connector/model/senslog1/UnitData.java
  28. 17 4
      connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCClient.java
  29. 6 0
      connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCConfig.java
  30. 6 8
      connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCModule.java
  31. 7 4
      connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCModuleProvider.java
  32. 3 10
      connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/gateway/AFCGateway.java
  33. 38 45
      connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/gateway/SensLog1Gateway.java
  34. 0 0
      connector-module-afarcloud/src/main/resources/META-INF/services/io.connector.core.ModuleProvider
  35. 0 21
      connector-module-ima/src/main/java/io/connector/module/ima/AFCConverter.java
  36. 0 26
      connector-module-ima/src/main/java/io/connector/module/ima/IMAModule.java
  37. 0 18
      connector-module-ima/src/main/java/io/connector/module/ima/IMAModuleProvider.java
  38. 3 3
      connector-module-ima/src/main/java/io/connector/module/ima/ImaClient.java
  39. 2 2
      connector-module-ima/src/main/java/io/connector/module/ima/ImaConfig.java
  40. 25 0
      connector-module-ima/src/main/java/io/connector/module/ima/ImaModule.java
  41. 18 0
      connector-module-ima/src/main/java/io/connector/module/ima/ImaModuleProvider.java
  42. 1 0
      connector-module-ima/src/main/resources/META-INF/services/io.connector.core.ModuleProvider
  43. 0 1
      connector-module-ima/src/main/resources/META-INF/services/io.connector.core.module.ModuleProvider
  44. 6 11
      connector-module-ogc-sensorthings/src/main/java/io/connector/module/ogc/sensorthings/SensorThingsModule.java
  45. 3 3
      connector-module-ogc-sensorthings/src/main/java/io/connector/module/ogc/sensorthings/SensorThingsModuleProvider.java
  46. 4 7
      connector-module-ogc-sensorthings/src/main/java/io/connector/module/ogc/sensorthings/gateway/SensorThingsGateway.java
  47. 0 0
      connector-module-ogc-sensorthings/src/main/resources/META-INF/services/io.connector.core.ModuleProvider
  48. 63 6
      connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1Client.java
  49. 9 13
      connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1Module.java
  50. 3 3
      connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1ModuleProvider.java
  51. 18 0
      connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1SchedulerConfig.java
  52. 2 7
      connector-module-senslog1/src/main/java/io/connector/module/senslog1/gateway/AFCGateway.java
  53. 65 53
      connector-module-senslog1/src/main/java/io/connector/module/senslog1/gateway/SensLog1Gateway.java
  54. 0 0
      connector-module-senslog1/src/main/resources/META-INF/services/io.connector.core.ModuleProvider

+ 12 - 16
config/test.yaml

@@ -4,13 +4,13 @@ services:
 
   senslog1:
       api: &apiDomain
-        domain: "http://51.15.45.95:8080/senslog1"
+        domain: "http://foodie.lesprojekt.cz:8080/MapLogOT/"
 
-      name: "SensLog V1 Latvia"
+      name: "Foodie SensLog V1"
       provider: "io.connector.module.senslog1.SensLog1ModuleProvider"
 
-      user: "vilcini"
-      group: "vilcini"
+      user: "afarcloud"
+      group: "afc"
 
       sensorServiceHost:
         <<: *apiDomain
@@ -38,16 +38,12 @@ services:
         domain: "https://iotlorawan.azurewebsites.net"
 
       name: "IoT LoraWan"
-      provider: "io.connector.module.ima.IMAModuleProvider"
-
-      host:
-        <<: *apiDomain
-        path: "<path>"
+      provider: "io.connector.module.ima.ImaModuleProvider"
 
 
   AFC:
       api: &apiDomain
-        domain: "https://rest.afarcloud.smartarch.cz/storage/rest/registry"
+        domain: "https://rest.afarcloud.smartarch.cz/storage/rest"
 
       name: "AFarCloud"
       provider: "io.connector.module.afarcloud.AFCModuleProvider"
@@ -57,9 +53,9 @@ services:
         path: "<path>"
 
 scheduler:
-#  senslog1:
-#      period: 15
-#      consumer: "schedule-observations"
-#      config:
-#        allowedStations:
-#          345345: [1, 2, 3]
+  senslog1:
+      period: 30
+      consumer: "schedule-observations"
+      config:
+        allowedStations:
+          10002222: [1, 2]

+ 13 - 5
connector-app/src/main/java/io/connector/app/Application.java

@@ -3,6 +3,7 @@ package io.connector.app;
 import cz.senslog.common.util.StringUtils;
 import io.connector.core.config.ConfigurationService;
 import io.connector.core.config.file.FileConfigurationService;
+import io.connector.core.AbstractModule;
 import io.vertx.core.DeploymentOptions;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
@@ -10,13 +11,14 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
+import java.util.List;
 
 import static io.connector.core.ModuleDeployer.deploy;
-import static io.connector.core.module.ModuleLoader.loadModules;
+import static io.connector.core.ModuleLoader.loadModules;
 
 public class Application extends Thread {
 
-    private static Logger logger = LogManager.getLogger(Application.class);
+    private static final Logger logger = LogManager.getLogger(Application.class);
 
     private final AppConfig appConfig;
     private final Parameters params;
@@ -62,10 +64,16 @@ public class Application extends Thread {
             System.exit(1);
         }
 
-        DeploymentOptions options = new DeploymentOptions()
-                .setConfig(new JsonObject().put("http.server.port", params.getPort()));
+        JsonObject config = new JsonObject();
+        config.put("http.server.port", params.getPort());
 
-        Vertx.vertx().deployVerticle(deploy(loadModules(configService)), options, res -> {
+        if (params.getDomainPrefix() != null) {
+            config.put("http.server.prefix", params.getDomainPrefix());
+        }
+
+        DeploymentOptions options = new DeploymentOptions().setConfig(config);
+        List<AbstractModule> modules = loadModules(configService);
+        Vertx.vertx().deployVerticle(deploy(modules), options, res -> {
             if(res.succeeded()) {
                 logger.info("Deployment id is: {}", res.result());
             } else {

+ 10 - 7
connector-app/src/main/java/io/connector/app/Parameters.java

@@ -24,7 +24,7 @@ import static java.nio.file.Paths.get;
  */
 final class Parameters {
 
-    private static Logger logger = LogManager.getLogger(Parameters.class);
+    private static final Logger logger = LogManager.getLogger(Parameters.class);
 
     private JCommander jCommander;
 
@@ -59,16 +59,15 @@ final class Parameters {
     @Parameter(names = {"-h", "-help"}, help = true)
     private boolean help = false;
 
-    @Parameter(names = {"-cf", "-config-file"}, description = "Configuration file in .yaml format.")
+    @Parameter(names = {"-cf", "-config-file"}, description = "Configuration file in .yaml format.", required = false)
     private String configFileName;
 
-    @Parameter(names = {"-p", "-port"}, description = "Access port for HTTP server", required = true)
+    @Parameter(names = {"-p", "-port"}, description = "Access port for the HTTP server", required = true)
     private int port;
 
-    /**
-     * Returns name of the configuration file.
-     * @return string name.
-     */
+    @Parameter(names = {"-prefix"}, description = "Domain prefix for the HTTP server.", required = false)
+    private String domainPrefix;
+
     public String getConfigFileName() {
         return configFileName;
     }
@@ -77,6 +76,10 @@ final class Parameters {
         return port;
     }
 
+    public String getDomainPrefix() {
+        return domainPrefix;
+    }
+
     public boolean isHelp() {
         return help;
     }

+ 41 - 25
connector-core/src/main/java/io/connector/core/AbstractGateway.java

@@ -1,6 +1,5 @@
 package io.connector.core;
 
-import io.connector.core.module.ModuleType;
 import io.vertx.core.eventbus.DeliveryOptions;
 import io.vertx.core.eventbus.EventBus;
 import io.vertx.ext.web.Router;
@@ -14,7 +13,7 @@ import java.util.Set;
 import java.util.function.Consumer;
 
 import static io.connector.core.AddressPath.*;
-import static io.connector.core.AddressPath.Creator.create;
+import static io.connector.core.AddressPath.Creator.createNormalized;
 import static io.connector.core.MessageHeader.*;
 import static java.lang.String.format;
 
@@ -22,38 +21,53 @@ public abstract class AbstractGateway {
 
     private final static Logger logger = LogManager.getLogger(AbstractGateway.class);
 
-    private final EventBus eventBus;
-    private final Event event;
-    private final Router router;
-    private final ModuleType moduleType;
-    protected final ModuleType gatewayType;
+    private final boolean isDefault;
+    private EventBus eventBus;
+    private Router router;
+    private Event event;
+    private String moduleId;
+    protected String gatewayId;
     private final Set<String> registeredConsumers;
     private final Map<String, String> schedulerMapping;
 
-    protected AbstractGateway(EventBus eventBus, Router router, ModuleType moduleType, ModuleType gatewayType) {
-        this.eventBus = eventBus;
-        this.event = new EventImpl(eventBus);
-        this.router = router;
-        this.moduleType = moduleType;
-        this.gatewayType = gatewayType;
+    protected AbstractGateway(String id, boolean isDefault) {
+        this.gatewayId = id;
+        this.isDefault = isDefault;
         this.registeredConsumers = new HashSet<>();
         this.schedulerMapping = new HashMap<>();
     }
 
+    protected AbstractGateway(String id) {
+        this(id, false);
+    }
+
     protected abstract void run();
 
     public String id() {
-        return gatewayType.name().toLowerCase();
+        return gatewayId;
     }
 
-    public final Router router() {
+    protected final Router router() {
         return router;
     }
 
-    public final Event event() {
+    protected final Event event() {
         return event;
     }
 
+    protected void setModuleId(String moduleId) {
+        this.moduleId = moduleId;
+    }
+
+    protected void setEventBus(EventBus eventBus) {
+        this.eventBus = eventBus;
+        this.event = new EventImpl(eventBus);
+    }
+
+    protected void setRouter(Router router) {
+        this.router = router;
+    }
+
     public final void start() {
         run();
         registerSchedulerConsumers();
@@ -63,11 +77,10 @@ public abstract class AbstractGateway {
         if (!registeredConsumers.contains(consumerName)) {
             throw logger.throwing(new RuntimeException(
                     format("Consumer '%s' in module %s and gateway %s is not registered.",
-                            consumerName, moduleType.name().toLowerCase(), gatewayType.name().toLowerCase()
-                    )
+                            consumerName, moduleId, gatewayId)
             ));
         }
-        return create(moduleType, gatewayType, consumerName);
+        return createNormalized(moduleId, gatewayId, consumerName);
     }
 
     public interface SchedulerMapping {
@@ -84,14 +97,13 @@ public abstract class AbstractGateway {
     }
 
     private void registerSchedulerConsumers() {
-        final boolean isDefault = moduleType.equals(gatewayType);
         for (Map.Entry<String, String> mappingEntry : schedulerMapping.entrySet()) {
             if (isDefault) {
                 Consumer<String> check = name -> {
                     if (!registeredConsumers.contains(name)) {
                         throw logger.throwing(new RuntimeException(
                                 format("Consumer '%s' in module %s and gateway %s is not registered.",
-                                        name, moduleType.name().toLowerCase(), gatewayType.name().toLowerCase()
+                                        name, moduleId, gatewayId
                                 )
                         ));
                     }
@@ -102,7 +114,7 @@ public abstract class AbstractGateway {
         }
 
         String baseAddr = isDefault ? SCHEDULER_PROVIDER : SCHEDULER_CONSUMER;
-        eventBus.consumer(create(baseAddr, gatewayType.name()), message -> {
+        eventBus.consumer(createNormalized(baseAddr, gatewayId), message -> {
             if (isDefault) {
                 String proxyConsumerName = message.headers().get(ADDRESS);
                 // this is a provider and must request a local consumer
@@ -172,7 +184,11 @@ public abstract class AbstractGateway {
             registeredConsumers.add(address);
             eventBus.<T>consumer(createAddress(address), message -> {
                 Message<T> msg = new Message<>(message);
-                handler.accept(msg);
+                try {
+                    handler.accept(msg);
+                } catch (Exception e) {
+                    msg.fail(e);
+                }
                 if (msg.isFail()) {
                     Fail fail = msg.cause();
                     message.fail(fail.getCode(), fail.getMessage());
@@ -180,8 +196,8 @@ public abstract class AbstractGateway {
                     Reply<Object> reply = msg.reply;
                     if (reply != null) {
                         reply.options()
-                                .addHeader(MODULE_TYPE, moduleType.name())
-                                .addHeader(GATEWAY_TYPE, gatewayType.name())
+                                .addHeader(MODULE_TYPE, moduleId)
+                                .addHeader(GATEWAY_TYPE, gatewayId)
                                 .addHeader(ADDRESS, address);
                         message.reply(reply.data, reply.options());
                     } else {

+ 14 - 8
connector-core/src/main/java/io/connector/core/module/AbstractModule.java → connector-core/src/main/java/io/connector/core/AbstractModule.java

@@ -1,10 +1,9 @@
-package io.connector.core.module;
+package io.connector.core;
 
-import io.connector.core.AbstractGateway;
-import io.connector.core.ModuleDescriptor;
 import io.vertx.core.AbstractVerticle;
 import io.vertx.ext.web.Router;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -14,18 +13,21 @@ import static io.connector.core.AddressPath.INFO;
 public abstract class AbstractModule extends AbstractVerticle {
 
     protected final ModuleDescriptor descriptor;
-    public final ModuleType type;
+    public final String moduleId;
     private final Map<String, AbstractGateway> gateways;
 
-    protected AbstractModule(ModuleDescriptor descriptor, ModuleType type) {
+    protected AbstractModule(String id, ModuleDescriptor descriptor) {
         this.descriptor = descriptor;
-        this.type = type;
+        this.moduleId = id;
         this.gateways = new HashMap<>();
     }
 
 
     protected void registerGateway(AbstractGateway gateway) {
         if (!gateways.containsKey(gateway.id())) {
+            gateway.setEventBus(vertx.eventBus());
+            gateway.setRouter(Router.router(vertx));
+            gateway.setModuleId(moduleId);
             gateways.put(gateway.id(), gateway);
         }
     }
@@ -35,7 +37,7 @@ public abstract class AbstractModule extends AbstractVerticle {
     public abstract ModuleInfo info();
 
     public String id() {
-        return type.name().toLowerCase();
+        return moduleId;
     }
 
     public ModuleDescriptor descriptor() {
@@ -54,6 +56,10 @@ public abstract class AbstractModule extends AbstractVerticle {
     public void start() throws Exception {
         run();
         gateways.values().forEach(AbstractGateway::start);
-        vertx.eventBus().consumer(create(type, INFO), msg -> msg.reply(info()));
+        vertx.eventBus().consumer(create(moduleId, INFO), msg -> {
+            ModuleInfo info = info();
+            info.setGateways(new ArrayList<>(gateways.keySet()));
+            msg.reply(info);
+        });
     }
 }

+ 5 - 11
connector-core/src/main/java/io/connector/core/AddressPath.java

@@ -1,7 +1,5 @@
 package io.connector.core;
 
-import io.connector.core.module.ModuleType;
-
 import static io.connector.core.AddressPath.Creator.create;
 
 public final class AddressPath {
@@ -16,20 +14,16 @@ public final class AddressPath {
 
         private final static char DELIMITER = '/';
 
-        public static String create(ModuleType moduleType, ModuleType gatewayType, String address) {
-            return create(moduleType.name(), gatewayType.name(), address);
-        }
-
-        public static String create(ModuleType moduleType, String address) {
-            return create(moduleType.name(), address);
-        }
-
         public static String create(String ...parts) {
             StringBuilder builder = new StringBuilder();
             for (String part : parts) {
                 builder.append(part.charAt(0) == DELIMITER ? part : DELIMITER + part);
             }
-            return builder.toString().toLowerCase();
+            return builder.toString();
+        }
+
+        public static String createNormalized(String... parts) {
+            return create(parts).toLowerCase();
         }
     }
 }

+ 4 - 0
connector-core/src/main/java/io/connector/core/DataCollection.java

@@ -16,6 +16,10 @@ public class DataCollection<T> {
         this.list = new ArrayList<>(list);
     }
 
+    public DataCollection(int size) {
+        this.list = new ArrayList<>(size);
+    }
+
     @SafeVarargs
     public static <T> DataCollection<T> of(T... dataList) {
         return new DataCollection<>(Arrays.asList(dataList));

+ 68 - 0
connector-core/src/main/java/io/connector/core/EventInfo.java

@@ -0,0 +1,68 @@
+package io.connector.core;
+
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+
+import java.util.List;
+import java.util.Map;
+
+public class EventInfo {
+
+    private String module;
+    private String gateway;
+    private String address;
+    private Object data;
+
+    public String getModule() {
+        return module;
+    }
+
+    public void setModule(String module) {
+        this.module = module;
+    }
+
+    public String getGateway() {
+        return gateway;
+    }
+
+    public void setGateway(String gateway) {
+        this.gateway = gateway;
+    }
+
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public Object getData() {
+        return data;
+    }
+
+    public void setData(Object data) {
+        if (data instanceof Map) {
+            this.data = JsonObject.mapFrom(data);
+            /*
+            Map<?, ?> dataMap = (Map<?, ?>)data;
+            JsonObject body = new JsonObject();
+            for (Map.Entry<?, ?> dataEntry : dataMap.entrySet()) {
+                if (dataEntry.getKey() instanceof String) {
+                    body.put((String)dataEntry.getKey(), dataEntry.getValue());
+                }
+            }
+            this.data = body;
+             */
+        } else if (data instanceof List) {
+            List<?> dataList = (List<?>)data;
+            DataCollection<JsonObject> body = new DataCollection<>(dataList.size());
+            for (Object value : dataList) {
+                body.add(JsonObject.mapFrom(value));
+            }
+            this.data = body;
+        } else {
+            this.data = new JsonObject();
+        }
+    }
+}

+ 4 - 0
connector-core/src/main/java/io/connector/core/Message.java

@@ -39,6 +39,10 @@ public class Message<T> {
         fail = new Fail(code, message);
     }
 
+    public void fail(Throwable throwable) {
+        fail = new Fail(400, throwable.getMessage());
+    }
+
     public boolean success() {
         return fail == null;
     }

+ 2 - 9
connector-core/src/main/java/io/connector/core/ModuleDeployer.java

@@ -1,6 +1,5 @@
 package io.connector.core;
 
-import io.connector.core.module.AbstractModule;
 import io.vertx.core.*;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -34,14 +33,8 @@ public class ModuleDeployer extends AbstractVerticle {
 
         CompositeFuture.all(futureModules).onComplete(result -> {
             if(result.succeeded()) {
-                DeploymentOptions serverOpt = new DeploymentOptions().setConfig(config());
-                vertx.deployVerticle(new VertxServer(modules), serverOpt, res -> {
-                    if (res.succeeded()) {
-                        startPromise.complete();
-                    } else {
-                        startPromise.fail(res.cause());
-                    }
-                });
+                deployHelper(vertx, new DeploymentOptions().setConfig(config()), new VertxServer(modules))
+                        .onSuccess(startPromise::complete).onFailure(startPromise::fail);
             } else {
                 startPromise.fail(result.cause());
             }

+ 13 - 1
connector-core/src/main/java/io/connector/core/module/ModuleInfo.java → connector-core/src/main/java/io/connector/core/ModuleInfo.java

@@ -1,13 +1,17 @@
-package io.connector.core.module;
+package io.connector.core;
 
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.eventbus.MessageCodec;
 import io.vertx.core.json.JsonObject;
 
+import java.util.List;
+
 public class ModuleInfo {
 
     private final String id;
 
+    private List<String> gateways;
+
     public ModuleInfo(String id) {
         this.id = id;
     }
@@ -16,6 +20,14 @@ public class ModuleInfo {
         return id;
     }
 
+    public void setGateways(List<String> gateways) {
+        this.gateways = gateways;
+    }
+
+    public List<String> getGateways() {
+        return gateways;
+    }
+
     public static MessageCodec<ModuleInfo, ModuleInfo> createCodec() {
         return new MessageCodec<>() {
             @Override

+ 2 - 3
connector-core/src/main/java/io/connector/core/module/ModuleLoader.java → connector-core/src/main/java/io/connector/core/ModuleLoader.java

@@ -1,6 +1,5 @@
-package io.connector.core.module;
+package io.connector.core;
 
-import io.connector.core.ModuleDescriptor;
 import io.connector.core.config.ConfigurationService;
 import io.connector.core.config.DefaultConfig;
 import org.apache.logging.log4j.LogManager;
@@ -10,7 +9,7 @@ import java.util.*;
 
 public final class ModuleLoader {
 
-    private static Logger logger = LogManager.getLogger(ModuleLoader.class);
+    private static final Logger logger = LogManager.getLogger(ModuleLoader.class);
 
     public static List<AbstractModule> loadModules(ConfigurationService configService) {
 

+ 1 - 3
connector-core/src/main/java/io/connector/core/module/ModuleProvider.java → connector-core/src/main/java/io/connector/core/ModuleProvider.java

@@ -1,6 +1,4 @@
-package io.connector.core.module;
-
-import io.connector.core.ModuleDescriptor;
+package io.connector.core;
 
 public interface ModuleProvider {
 

+ 38 - 14
connector-core/src/main/java/io/connector/core/VertxServer.java

@@ -4,8 +4,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.connector.core.config.SchedulerConfig;
 import io.connector.core.json.JsonAttributeFormatter;
-import io.connector.core.module.AbstractModule;
-import io.connector.core.module.ModuleInfo;
 import io.vertx.core.*;
 import io.vertx.core.eventbus.DeliveryOptions;
 import io.vertx.core.eventbus.EventBus;
@@ -15,6 +13,7 @@ import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonObject;
 import io.vertx.core.json.jackson.DatabindCodec;
 import io.vertx.ext.web.Router;
+import io.vertx.ext.web.handler.BodyHandler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -26,12 +25,16 @@ import static cz.senslog.common.http.HttpHeader.CONTENT_TYPE;
 import static io.connector.core.AddressPath.*;
 import static io.connector.core.AddressPath.Creator.*;
 import static io.connector.core.MessageHeader.*;
+import static io.connector.core.MessageHeader.Resource.HTTP_SERVER;
+import static io.connector.core.MessageHeader.Resource.SCHEDULER;
 import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 
 public class VertxServer extends AbstractVerticle {
 
     private final static Logger logger = LogManager.getLogger(VertxServer.class);
 
+    private final static String DEFAULT_DOMAIN_PREFIX  = "/api";
+
     private final List<AbstractModule> modules;
 
     public VertxServer(List<AbstractModule> modules) {
@@ -40,7 +43,7 @@ public class VertxServer extends AbstractVerticle {
 
     @Override
     public void start(Promise<Void> startPromise) {
-        String domainPrefix = "/api";
+        String domainPrefix = config().getString("http.server.prefix", DEFAULT_DOMAIN_PREFIX);
 
         EventBus eventBus = vertx.eventBus();
         Router router = Router.router(vertx);
@@ -59,15 +62,13 @@ public class VertxServer extends AbstractVerticle {
             if (config != null && config.getPeriodSecond() > 0) {
                 vertx.setPeriodic(config.getPeriodMillisecond(), ctx -> {
                     DeliveryOptions fetcherOpt = new DeliveryOptions()
-                            .addHeader(RESOURCE, MessageHeader.Resource.SCHEDULER)
+                            .addHeader(RESOURCE, SCHEDULER)
                             .addHeader(ADDRESS, config.getConsumer());
-
-                    // TODO config.getId() must the same as the enum ModuleType -> change the idea
-                    String address = create(SCHEDULER_PROVIDER, config.getId());
-                    eventBus.request(address, new JsonObject(), fetcherOpt, reply -> {
+                    String providerAddress = createNormalized(SCHEDULER_PROVIDER, module.id());
+                    eventBus.request(providerAddress, new JsonObject(), fetcherOpt, reply -> {
                             if (reply.succeeded()) {
                                 Message<Object> result = reply.result();
-                                String source = result.headers().get(MODULE_TYPE); // TODO check if MODULE_TYPE of GATEWAY_TYPE
+                                String source = result.headers().get(MODULE_TYPE);
                                 DeliveryOptions pusherOpt = new DeliveryOptions();
                                 MultiMap headers = result.headers().addAll(fetcherOpt.getHeaders());
                                 for (Map.Entry<String, String> entryHeader : headers.entries()) {
@@ -75,7 +76,8 @@ public class VertxServer extends AbstractVerticle {
                                         pusherOpt.addHeader(entryHeader.getKey(), entryHeader.getValue());
                                     }
                                 }
-                                eventBus.publish(create(SCHEDULER_CONSUMER, source), result.body(), pusherOpt);
+                                String consumerAddress = createNormalized(SCHEDULER_CONSUMER, source);
+                                eventBus.publish(consumerAddress, result.body(), pusherOpt);
                             } else {
                                 logger.catching(reply.cause());
                             }
@@ -97,12 +99,34 @@ public class VertxServer extends AbstractVerticle {
             response.setStatusCode(code).end(error.encode());
         });
 
-        router.get(create(domainPrefix, EVENT)).handler(ctx -> {
+
+        router.get(create(domainPrefix, EVENT)).handler(BodyHandler.create()).handler(ctx -> {
+            /* trigger event:
+                    1. the same functionality as scheduler
+                        trigger event, fetch data a publish
+                        result will contain just OK (very time consuming)
+
+                    2. will be added to an event queue
+                        result will contain 'added to the event queue'
+             */
+
             HttpServerResponse response = ctx.response();
             response.putHeader(CONTENT_TYPE, APPLICATION_JSON);
-            response.end(new JsonObject()
-                    .put("message", "Events not implemented yet.")
-                    .encode());
+
+            EventInfo info = Json.decodeValue(ctx.getBody(), EventInfo.class);
+            String address = createNormalized(info.getModule(), info.getGateway(), info.getAddress());
+            DeliveryOptions options = new DeliveryOptions().addHeader(RESOURCE, HTTP_SERVER);
+
+//            eventBus.request(address, info.getData(), options, reply -> {
+//               if (reply.succeeded()) {
+//                   response.end(Json.encode(reply.result().body()));
+//               } else {
+//                   ctx.fail(reply.cause());
+//               }
+//            });
+
+            JsonObject responseBody = new JsonObject().put("message", "Events are not implemented yet.");
+            response.end(responseBody.encode());
         });
 
         router.get(create(domainPrefix, INFO)).handler(ctx -> {

+ 1 - 3
connector-core/src/main/java/io/connector/core/config/PropertyConfig.java

@@ -160,9 +160,7 @@ public class PropertyConfig {
             Map<?, ?> properties = (Map<?, ?>) property;
             for (Map.Entry<?, ?> propertyEntry : properties.entrySet()) {
                 Object propertyName = propertyEntry.getKey();
-                if (propertyName instanceof String) {
-                    config.setProperty((String)propertyName, propertyEntry.getValue());
-                }
+                config.setProperty(propertyName.toString(), propertyEntry.getValue());
             }
         }
 

+ 2 - 9
connector-core/src/main/java/io/connector/core/config/SchedulerConfig.java

@@ -1,23 +1,16 @@
 package io.connector.core.config;
 
-public final class SchedulerConfig {
-
-    private final String id;
+public final class SchedulerConfig extends PropertyConfig {
 
     private final Integer period;
-
     private final String consumer;
 
     public SchedulerConfig(String id, Integer period, String consumer) {
-        this.id = id;
+        super(id);
         this.period = period;
         this.consumer = consumer;
     }
 
-    public String getId() {
-        return id;
-    }
-
     public Integer getPeriodMillisecond() {
         return period * 1000;
     }

+ 11 - 0
connector-core/src/main/java/io/connector/core/config/file/FileConfigurationServiceImpl.java

@@ -4,6 +4,7 @@ import cz.senslog.common.exception.UnsupportedFileException;
 import cz.senslog.common.util.StringUtils;
 import io.connector.core.ModuleDescriptor;
 import io.connector.core.config.DefaultConfig;
+import io.connector.core.config.PropertyConfig;
 import io.connector.core.config.SchedulerConfig;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -173,6 +174,16 @@ class FileConfigurationServiceImpl  implements FileConfigurationService {
 
                 SchedulerConfig schedulerConfig = new SchedulerConfig(serviceId, period, consumer);
 
+                Object configObject = scheduleSettMap.get("config");
+                assert configObject instanceof Map;
+                Map<?, ?> configMap = (Map<?, ?>)configObject;
+                for (Map.Entry<?, ?> configEntry : configMap.entrySet()) {
+                    Object name = configEntry.getKey();
+                    if (name instanceof String) {
+                        schedulerConfig.setProperty((String)name, configEntry.getValue());
+                    }
+                }
+
                 schedulerSettings.put(serviceId, schedulerConfig);
             }
 

+ 0 - 11
connector-core/src/main/java/io/connector/core/module/ModuleType.java

@@ -1,11 +0,0 @@
-package io.connector.core.module;
-
-public enum ModuleType {
-
-    SENSLOG1,
-    AFARCLOUD,
-    IMA,
-    OGC_SENSOR_THINGS,
-
-    ;
-}

+ 0 - 22
connector-model/src/main/java/io/connector/model/afarcloud/AFCModel.java

@@ -1,22 +0,0 @@
-package io.connector.model.afarcloud;
-
-import io.vertx.core.json.JsonObject;
-
-public class AFCModel {
-
-    private final String data;
-
-    public static AFCModel parse(JsonObject jsonObject) {
-        // TODO check by schema
-        String data = jsonObject.getString("data");
-        return new AFCModel(data);
-    }
-
-    public AFCModel(String data) {
-        this.data = data;
-    }
-
-    public String getData() {
-        return data;
-    }
-}

+ 32 - 0
connector-model/src/main/java/io/connector/model/afarcloud/Location.java

@@ -0,0 +1,32 @@
+package io.connector.model.afarcloud;
+
+public class Location {
+
+    private Double latitude;
+    private Double longitude;
+    private Double altitude;
+
+    public Double getLatitude() {
+        return latitude;
+    }
+
+    public void setLatitude(Double latitude) {
+        this.latitude = latitude;
+    }
+
+    public Double getLongitude() {
+        return longitude;
+    }
+
+    public void setLongitude(Double longitude) {
+        this.longitude = longitude;
+    }
+
+    public Double getAltitude() {
+        return altitude;
+    }
+
+    public void setAltitude(Double altitude) {
+        this.altitude = altitude;
+    }
+}

+ 34 - 0
connector-model/src/main/java/io/connector/model/afarcloud/MultiSimpleObservation.java

@@ -0,0 +1,34 @@
+package io.connector.model.afarcloud;
+
+import java.util.List;
+
+public class MultiSimpleObservation {
+
+    private String resourceId;
+    private Location location;
+    private List<SimpleObservation> observations;
+
+    public String getResourceId() {
+        return resourceId;
+    }
+
+    public void setResourceId(String resourceId) {
+        this.resourceId = resourceId;
+    }
+
+    public Location getLocation() {
+        return location;
+    }
+
+    public void setLocation(Location location) {
+        this.location = location;
+    }
+
+    public List<SimpleObservation> getObservations() {
+        return observations;
+    }
+
+    public void setObservations(List<SimpleObservation> observations) {
+        this.observations = observations;
+    }
+}

+ 0 - 6
connector-model/src/main/java/io/connector/model/afarcloud/Observation.java

@@ -1,6 +0,0 @@
-package io.connector.model.afarcloud;
-
-public class Observation {
-
-
-}

+ 58 - 0
connector-model/src/main/java/io/connector/model/afarcloud/SimpleObservation.java

@@ -0,0 +1,58 @@
+package io.connector.model.afarcloud;
+
+public class SimpleObservation {
+
+    private static class Result {
+        private final Double value;
+        private String uom;
+
+        Result(Double value, String uom) {
+            this.value = value;
+            this.uom = uom;
+        }
+
+        Result(Double value) {
+            this.value = value;
+        }
+
+        public Double getValue() {
+            return value;
+        }
+
+        public String getUom() {
+            return uom;
+        }
+    }
+
+    private String observedProperty;
+    private Long resultTime;
+    private Result result;
+
+    public String getObservedProperty() {
+        return observedProperty;
+    }
+
+    public void setObservedProperty(String observedProperty) {
+        this.observedProperty = observedProperty;
+    }
+
+    public Long getResultTime() {
+        return resultTime;
+    }
+
+    public void setResultTime(Long resultTime) {
+        this.resultTime = resultTime;
+    }
+
+    public Result getResult() {
+        return result;
+    }
+
+    public void setResult(Double value, String uom) {
+        this.result = new Result(value, uom);
+    }
+
+    public void setResult(Double value) {
+        this.result = new Result(value);
+    }
+}

+ 22 - 0
connector-model/src/main/java/io/connector/model/senslog1/Sensor.java

@@ -0,0 +1,22 @@
+package io.connector.model.senslog1;
+
+import java.util.List;
+
+public class Sensor {
+
+    private final SensorInfo info;
+    private final List<Observation> observations;
+
+    public Sensor(SensorInfo info, List<Observation> observations) {
+        this.info = info;
+        this.observations = observations;
+    }
+
+    public SensorInfo getInfo() {
+        return info;
+    }
+
+    public List<Observation> getObservations() {
+        return observations;
+    }
+}

+ 12 - 12
connector-model/src/main/java/io/connector/model/senslog1/SensorObservation.java → connector-model/src/main/java/io/connector/model/senslog1/SensorData.java

@@ -10,12 +10,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class SensorObservation {
+public class SensorData {
 
     private final long id;
     private final List<Observation> observations;
 
-    public static SensorObservation parse(JsonObject jsonObject) {
+    public static SensorData parse(JsonObject jsonObject) {
         // TODO check by schema
         long id = jsonObject.getLong("id");
         JsonArray observationsJson = jsonObject.getJsonArray("observations");
@@ -25,14 +25,14 @@ public class SensorObservation {
                 observations.add(Observation.parse((JsonObject)observation));
             }
         }
-        return new SensorObservation(id, observations);
+        return new SensorData(id, observations);
     }
 
-    public SensorObservation(long id) {
+    public SensorData(long id) {
         this(id, new ArrayList<>());
     }
 
-    public SensorObservation(long id, List<Observation> observations) {
+    public SensorData(long id, List<Observation> observations) {
         Objects.requireNonNull(observations);
         this.id = id;
         this.observations = observations;
@@ -54,8 +54,8 @@ public class SensorObservation {
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        SensorObservation sensorObservation = (SensorObservation) o;
-        return Objects.equals(getId(), sensorObservation.getId());
+        SensorData sensorData = (SensorData) o;
+        return Objects.equals(getId(), sensorData.getId());
     }
 
     @Override
@@ -63,11 +63,11 @@ public class SensorObservation {
         return Objects.hash(getId());
     }
 
-    public static MessageCodec<SensorObservation, SensorObservation> createCodec() {
+    public static MessageCodec<SensorData, SensorData> createCodec() {
         return new MessageCodec<>() {
 
             @Override
-            public void encodeToWire(Buffer buffer, SensorObservation sensor) {
+            public void encodeToWire(Buffer buffer, SensorData sensor) {
                 JsonObject jsonToEncode = new JsonObject();
                 jsonToEncode.put("sensorId", sensor.getId());
                 jsonToEncode.put("observations", Json.encode(sensor.getObservations()));
@@ -80,7 +80,7 @@ public class SensorObservation {
             }
 
             @Override
-            public SensorObservation decodeFromWire(int i, Buffer buffer) {
+            public SensorData decodeFromWire(int i, Buffer buffer) {
                 int _pos = i;
                 int length = buffer.getInt(_pos);
 
@@ -98,11 +98,11 @@ public class SensorObservation {
                 List<Observation> observations = Json.decodeValue(jsonObservationStr, ref);
                 */
 
-                return new SensorObservation(sensorId, new ArrayList<>());
+                return new SensorData(sensorId, new ArrayList<>());
             }
 
             @Override
-            public SensorObservation transform(SensorObservation sensor) {
+            public SensorData transform(SensorData sensor) {
                 return sensor;
             }
 

+ 22 - 0
connector-model/src/main/java/io/connector/model/senslog1/Unit.java

@@ -0,0 +1,22 @@
+package io.connector.model.senslog1;
+
+import java.util.List;
+
+public class Unit {
+
+    private final UnitInfo info;
+    private final List<Sensor> sensors;
+
+    public Unit(UnitInfo info, List<Sensor> sensors) {
+        this.info = info;
+        this.sensors = sensors;
+    }
+
+    public UnitInfo getInfo() {
+        return info;
+    }
+
+    public List<Sensor> getSensors() {
+        return sensors;
+    }
+}

+ 10 - 10
connector-model/src/main/java/io/connector/model/senslog1/UnitData.java

@@ -12,7 +12,7 @@ public class UnitData {
 
     private final long id;
 
-    private Map<Long, SensorObservation> sensorCache;
+    private Map<Long, SensorData> sensorCache;
     private List<Position> positions;
 
     public static UnitData parse(JsonObject jsonObject) {
@@ -26,10 +26,10 @@ public class UnitData {
             }
         }
         JsonArray sensorsJson = jsonObject.getJsonArray("sensors");
-        List<SensorObservation> sensors = new ArrayList<>(sensorsJson.size());
+        List<SensorData> sensors = new ArrayList<>(sensorsJson.size());
         for (Object sensor : sensorsJson) {
             if (sensor instanceof JsonObject) {
-                sensors.add(SensorObservation.parse((JsonObject)sensor));
+                sensors.add(SensorData.parse((JsonObject)sensor));
             }
         }
         return new UnitData(id, sensors, positions);
@@ -39,7 +39,7 @@ public class UnitData {
         this(id, new ArrayList<>(), new ArrayList<>());
     }
 
-    public UnitData(long id, List<SensorObservation> sensors, List<Position> positions) {
+    public UnitData(long id, List<SensorData> sensors, List<Position> positions) {
         this.id = id;
         setSensors(sensors);
         setPositions(positions);
@@ -49,26 +49,26 @@ public class UnitData {
         return id;
     }
 
-    public void setSensors(Collection<SensorObservation> sensors) {
+    public void setSensors(Collection<SensorData> sensors) {
         this.sensorCache = new HashMap<>(sensors.size());
-        for (SensorObservation sensor : sensors) {
+        for (SensorData sensor : sensors) {
             sensorCache.put(sensor.getId(), sensor);
         }
     }
 
-    public void addSensor(SensorObservation sensor) {
+    public void addSensor(SensorData sensor) {
         Objects.requireNonNull(sensor);
         sensorCache.put(sensor.getId(), sensor);
     }
 
-    public SensorObservation getSensor(long sensorId) {
+    public SensorData getSensor(long sensorId) {
         if (sensorCache.containsKey(sensorId)) {
             return sensorCache.get(sensorId);
         }
         return null;
     }
 
-    public Collection<SensorObservation> getSensors() {
+    public Collection<SensorData> getSensors() {
         return sensorCache.values();
     }
 
@@ -88,7 +88,7 @@ public class UnitData {
     public final void mergeIn(UnitData unitData) {
         if (this.id != unitData.getId()) { return; }
 
-        Collection<SensorObservation> sensors = unitData.getSensors();
+        Collection<SensorData> sensors = unitData.getSensors();
         if (this.sensorCache.isEmpty() && !sensors.isEmpty()) {
             setSensors(sensors);
         }

+ 17 - 4
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCClient.java

@@ -1,12 +1,14 @@
 package io.connector.module.afarcloud;
 
-import io.connector.model.afarcloud.AFCModel;
+import io.connector.core.config.HostConfig;
+import io.connector.model.afarcloud.MultiSimpleObservation;
 import io.connector.module.afarcloud.gateway.SensLog1Gateway;
-import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonObject;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.net.http.HttpClient;
 import java.util.List;
 
 public class AFCClient {
@@ -15,11 +17,22 @@ public class AFCClient {
 
     private final AFCConfig config;
 
-    public AFCClient(AFCConfig config) {
+    private final HttpClient httpClient;
+
+    public AFCClient(AFCConfig config, HttpClient httpClient) {
         this.config = config;
+        this.httpClient = httpClient;
     }
 
-    public void uploadObservations(List<AFCModel> observations) {
+    public void uploadAggregatedMeasurements(List<MultiSimpleObservation> observations) {
+
+        HostConfig host = new HostConfig(config.getDomain(), "telemetry");
+
+        if (observations.size() > 0) {
+            JsonObject json = JsonObject.mapFrom(observations.get(0));
+            System.out.println(json.encodePrettily());
+        }
+        
         logger.info("Converted {} observations.", observations.size());
     }
 }

+ 6 - 0
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCConfig.java

@@ -4,7 +4,13 @@ import io.connector.core.config.DefaultConfig;
 
 public class AFCConfig {
 
+    private final String domain;
+
     AFCConfig(DefaultConfig defaultConfig) {
+        this.domain = defaultConfig.getPropertyConfig("api").getStringProperty("domain");
+    }
 
+    public String getDomain() {
+        return domain;
     }
 }

+ 6 - 8
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCModule.java

@@ -1,27 +1,25 @@
 package io.connector.module.afarcloud;
 
 import io.connector.core.ModuleDescriptor;
-import io.connector.core.module.AbstractModule;
-import io.connector.core.module.ModuleInfo;
-import io.connector.core.module.ModuleType;
+import io.connector.core.AbstractModule;
+import io.connector.core.ModuleInfo;
 import io.connector.module.afarcloud.gateway.AFCGateway;
 import io.connector.module.afarcloud.gateway.SensLog1Gateway;
-import io.vertx.ext.web.Router;
 
 
 public class AFCModule extends AbstractModule {
 
     private final AFCClient client;
 
-    public AFCModule(ModuleDescriptor descriptor, AFCClient client) {
-        super(descriptor, ModuleType.AFARCLOUD);
+    public AFCModule(String id, ModuleDescriptor descriptor, AFCClient client) {
+        super(id, descriptor);
         this.client = client;
     }
 
     @Override
     public void run() {
-        registerGateway(new AFCGateway(vertx.eventBus(), Router.router(vertx), type, client));
-        registerGateway(new SensLog1Gateway(vertx.eventBus(), Router.router(vertx), type, client));
+        registerGateway(new AFCGateway("AFarCloud", client));
+        registerGateway(new SensLog1Gateway("SensLogV1", client));
     }
 
     @Override

+ 7 - 4
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/AFCModuleProvider.java

@@ -1,8 +1,10 @@
 package io.connector.module.afarcloud;
 
 import io.connector.core.ModuleDescriptor;
-import io.connector.core.module.AbstractModule;
-import io.connector.core.module.ModuleProvider;
+import io.connector.core.AbstractModule;
+import io.connector.core.ModuleProvider;
+
+import java.net.http.HttpClient;
 
 public final class AFCModuleProvider implements ModuleProvider {
 
@@ -10,8 +12,9 @@ public final class AFCModuleProvider implements ModuleProvider {
     public AbstractModule createModule(ModuleDescriptor descriptor) {
 
         AFCConfig config = new AFCConfig(descriptor.getServiceConfig());
-        AFCClient client = new AFCClient(config);
+        AFCClient client = new AFCClient(config, HttpClient.newHttpClient());
+        AFCModule module = new AFCModule("AFarCloud", descriptor, client);
 
-        return new AFCModule(descriptor, client);
+        return module;
     }
 }

+ 3 - 10
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/gateway/AFCGateway.java

@@ -1,24 +1,17 @@
 package io.connector.module.afarcloud.gateway;
 
 import io.connector.core.AbstractGateway;
-import io.connector.core.module.ModuleType;
 import io.connector.module.afarcloud.AFCClient;
-import io.vertx.core.eventbus.EventBus;
-import io.vertx.ext.web.Router;
-
-
-import static io.connector.core.module.ModuleType.AFARCLOUD;
 
 public class AFCGateway extends AbstractGateway {
 
     private final AFCClient client;
 
-    public AFCGateway(EventBus eventBus, Router router, ModuleType moduleType, AFCClient client) {
-        super(eventBus, router, moduleType, AFARCLOUD);
+    public AFCGateway(String id, AFCClient client) {
+        super(id, true);
         this.client = client;
     }
 
     @Override
-    public void run() {
-    }
+    public void run() {}
 }

+ 38 - 45
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/gateway/SensLog1Gateway.java

@@ -2,17 +2,11 @@ package io.connector.module.afarcloud.gateway;
 
 import io.connector.core.AbstractGateway;
 import io.connector.core.DataCollection;
-import io.connector.core.MessageHeader;
-import io.connector.core.module.ModuleType;
-import io.connector.model.afarcloud.AFCModel;
-import io.connector.model.senslog1.Observation;
-import io.connector.model.senslog1.SensorObservation;
-import io.connector.model.senslog1.UnitData;
+import io.connector.model.afarcloud.MultiSimpleObservation;
+import io.connector.model.afarcloud.SimpleObservation;
+import io.connector.model.senslog1.*;
 import io.connector.module.afarcloud.AFCClient;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.eventbus.EventBus;
 import io.vertx.core.json.JsonObject;
-import io.vertx.ext.web.Router;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -20,7 +14,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import static io.connector.core.module.ModuleType.SENSLOG1;
+import static java.lang.String.format;
+
 
 public class SensLog1Gateway extends AbstractGateway {
 
@@ -28,8 +23,8 @@ public class SensLog1Gateway extends AbstractGateway {
 
     private final AFCClient client;
 
-    public SensLog1Gateway(EventBus eventBus, Router router, ModuleType moduleType, AFCClient client) {
-        super(eventBus, router, moduleType, SENSLOG1);
+    public SensLog1Gateway(String id, AFCClient client) {
+        super(id);
         this.client = client;
     }
 
@@ -38,43 +33,34 @@ public class SensLog1Gateway extends AbstractGateway {
     public void run() {
 
         schedulerMapping()
-                .addMapping("observations", "add-observations");
+                .addMapping("observations-with-info", "add-observations");
 
         event().consume("add-observations", message -> {
-            String resource = message.headers().get(MessageHeader.RESOURCE);
 
             Iterator<Object> dataIterator;
-            switch (resource) {
-                case MessageHeader.Resource.HTTP_SERVER: {
-                    dataIterator = (((Buffer)message.body()).toJsonArray()).iterator();
-                } break;
-                case MessageHeader.Resource.SCHEDULER: {
-                    dataIterator = ((DataCollection<?>)message.body()).iterator();
-                } break;
-                default: {
-                    message.fail(400, "Unknown resource of data."); return;
-                }
+            if (message.body() instanceof DataCollection) {
+                dataIterator = ((DataCollection<?>)message.body()).iterator();
+            } else {
+                message.fail(400, "Unknown resource of data."); return;
             }
 
-            List<AFCModel> afcObservations = new ArrayList<>();
+            List<MultiSimpleObservation> afcObservations = new ArrayList<>();
             while (dataIterator.hasNext()) {
                 Object data = dataIterator.next();
-                UnitData unitData = null;
-                if (data instanceof JsonObject) {
-                    unitData = UnitData.parse((JsonObject)data);
-                } else if (data instanceof UnitData) {
-                    unitData = (UnitData)data;
+                Unit sensLogUnit;
+                if (data instanceof Unit) {
+                    sensLogUnit = (Unit) data;
+                } else {
+                    message.fail(400, "Unknown resource of data."); return;
                 }
 
-                if (unitData != null) {
-                    afcObservations.addAll(Converter.convertObservation(unitData));
-                }
+                afcObservations.add(Converter.convertObservation(sensLogUnit));
             }
 
-            client.uploadObservations(afcObservations);
+            client.uploadAggregatedMeasurements(afcObservations);
 
             message.reply(new JsonObject().put("message",
-                    String.format("Added %s observations.", afcObservations.size())
+                    format("Added %s observations.", afcObservations.size())
             ));
         });
 
@@ -82,18 +68,25 @@ public class SensLog1Gateway extends AbstractGateway {
 
     private static class Converter {
 
-        public static List<AFCModel> convertObservation(UnitData unitObservations) {
-            List<AFCModel> afcObservations = new ArrayList<>();
-
-            for (SensorObservation sensor : unitObservations.getSensors()) {
-                for (Observation observation : sensor.getObservations()) {
-                    afcObservations.add(new AFCModel(
-                            String.format("%s_%s_%s", unitObservations.getId(), sensor.getId(), observation.getValue()))
-                    );
+        static MultiSimpleObservation convertObservation(Unit sensLogUnit) {
+            MultiSimpleObservation afcResult = new MultiSimpleObservation();
+
+//            String resourceId = format("urn:afc:AS07:environmental:UWB:agronode:%s", unitObservations.getId());
+            String resourceId = Long.toString(sensLogUnit.getInfo().getId());
+            afcResult.setResourceId(resourceId);
+            afcResult.setObservations(new ArrayList<>());
+
+            for (Sensor sensor : sensLogUnit.getSensors()) {
+                for (Observation sensLogObservation : sensor.getObservations()) {
+                    SimpleObservation afcObservation = new SimpleObservation();
+                    String observedProperty = sensor.getInfo().getPhenomenon().getName()+"_"+sensor.getInfo().getName();
+                    afcObservation.setObservedProperty(observedProperty.replaceAll("\\s+", "_"));
+                    afcObservation.setResult(sensLogObservation.getValue());
+                    afcObservation.setResultTime(sensLogObservation.getTimestamp().toEpochSecond());
+                    afcResult.getObservations().add(afcObservation);
                 }
             }
-
-            return afcObservations;
+            return afcResult;
         }
 
     }

+ 0 - 0
connector-module-afarcloud/src/main/resources/META-INF/services/io.connector.core.module.ModuleProvider → connector-module-afarcloud/src/main/resources/META-INF/services/io.connector.core.ModuleProvider


+ 0 - 21
connector-module-ima/src/main/java/io/connector/module/ima/AFCConverter.java

@@ -1,21 +0,0 @@
-package io.connector.module.ima;
-
-import io.connector.model.afarcloud.AFCModel;
-import io.connector.model.ima.IMAModel;
-import io.vertx.core.json.JsonObject;
-
-public final class AFCConverter {
-
-    public static boolean validBySchema(JsonObject afcModel) {
-        String stringClass = afcModel.getString("__class");
-        if (stringClass == null || stringClass.isBlank()) {
-            return false;
-        }
-
-        return stringClass.equals(AFCModel.class.getName());
-    }
-
-    public static IMAModel convert(AFCModel afcModel) {
-        return new IMAModel("Converted: " + afcModel.getData());
-    }
-}

+ 0 - 26
connector-module-ima/src/main/java/io/connector/module/ima/IMAModule.java

@@ -1,26 +0,0 @@
-package io.connector.module.ima;
-
-import io.connector.core.ModuleDescriptor;
-import io.connector.core.module.AbstractModule;
-import io.connector.core.module.ModuleInfo;
-import io.connector.core.module.ModuleType;
-
-public class IMAModule extends AbstractModule {
-
-    private final IMAClient client;
-
-    public IMAModule(ModuleDescriptor descriptor, IMAClient client) {
-        super(descriptor, ModuleType.IMA);
-        this.client = client;
-    }
-
-    @Override
-    public void run() {
-
-    }
-
-    @Override
-    public ModuleInfo info() {
-        return new ModuleInfo(id());
-    }
-}

+ 0 - 18
connector-module-ima/src/main/java/io/connector/module/ima/IMAModuleProvider.java

@@ -1,18 +0,0 @@
-package io.connector.module.ima;
-
-import io.connector.core.ModuleDescriptor;
-import io.connector.core.module.AbstractModule;
-import io.connector.core.module.ModuleProvider;
-
-public final class IMAModuleProvider implements ModuleProvider {
-
-    @Override
-    public AbstractModule createModule(ModuleDescriptor descriptor) {
-
-        IMAConfig config = new IMAConfig(descriptor.getServiceConfig());
-
-        IMAClient client = new IMAClient(config);
-
-        return new IMAModule(descriptor, client);
-    }
-}

+ 3 - 3
connector-module-ima/src/main/java/io/connector/module/ima/IMAClient.java → connector-module-ima/src/main/java/io/connector/module/ima/ImaClient.java

@@ -3,11 +3,11 @@ package io.connector.module.ima;
 import io.connector.model.ima.IMAModel;
 import io.vertx.core.json.JsonObject;
 
-public class IMAClient {
+public class ImaClient {
 
-    private final IMAConfig config;
+    private final ImaConfig config;
 
-    public IMAClient(IMAConfig config) {
+    public ImaClient(ImaConfig config) {
         this.config = config;
     }
 

+ 2 - 2
connector-module-ima/src/main/java/io/connector/module/ima/IMAConfig.java → connector-module-ima/src/main/java/io/connector/module/ima/ImaConfig.java

@@ -2,9 +2,9 @@ package io.connector.module.ima;
 
 import io.connector.core.config.DefaultConfig;
 
-public class IMAConfig {
+public class ImaConfig {
 
-    IMAConfig(DefaultConfig defaultConfig) {
+    ImaConfig(DefaultConfig defaultConfig) {
 
     }
 }

+ 25 - 0
connector-module-ima/src/main/java/io/connector/module/ima/ImaModule.java

@@ -0,0 +1,25 @@
+package io.connector.module.ima;
+
+import io.connector.core.ModuleDescriptor;
+import io.connector.core.AbstractModule;
+import io.connector.core.ModuleInfo;
+
+public class ImaModule extends AbstractModule {
+
+    private final ImaClient client;
+
+    public ImaModule(String id, ModuleDescriptor descriptor, ImaClient client) {
+        super(id, descriptor);
+        this.client = client;
+    }
+
+    @Override
+    public void run() {
+
+    }
+
+    @Override
+    public ModuleInfo info() {
+        return new ModuleInfo(id());
+    }
+}

+ 18 - 0
connector-module-ima/src/main/java/io/connector/module/ima/ImaModuleProvider.java

@@ -0,0 +1,18 @@
+package io.connector.module.ima;
+
+import io.connector.core.ModuleDescriptor;
+import io.connector.core.AbstractModule;
+import io.connector.core.ModuleProvider;
+
+public final class ImaModuleProvider implements ModuleProvider {
+
+    @Override
+    public AbstractModule createModule(ModuleDescriptor descriptor) {
+
+        ImaConfig config = new ImaConfig(descriptor.getServiceConfig());
+        ImaClient client = new ImaClient(config);
+        ImaModule module = new ImaModule("ima", descriptor, client);
+
+        return module;
+    }
+}

+ 1 - 0
connector-module-ima/src/main/resources/META-INF/services/io.connector.core.ModuleProvider

@@ -0,0 +1 @@
+io.connector.module.ima.ImaModuleProvider

+ 0 - 1
connector-module-ima/src/main/resources/META-INF/services/io.connector.core.module.ModuleProvider

@@ -1 +0,0 @@
-io.connector.module.ima.IMAModuleProvider

+ 6 - 11
connector-module-ogc-sensorthings/src/main/java/io/connector/module/ogc/sensorthings/SensorThingsModule.java

@@ -1,27 +1,22 @@
 package io.connector.module.ogc.sensorthings;
 
 import io.connector.core.ModuleDescriptor;
-import io.connector.core.module.AbstractModule;
-import io.connector.core.module.ModuleInfo;
+import io.connector.core.AbstractModule;
+import io.connector.core.ModuleInfo;
 import io.connector.module.ogc.sensorthings.gateway.SensorThingsGateway;
-import io.vertx.ext.web.Router;
-
-import static io.connector.core.module.ModuleType.OGC_SENSOR_THINGS;
 
 public class SensorThingsModule extends AbstractModule {
 
     private final SensorThingsClient client;
 
-    SensorThingsModule(ModuleDescriptor descriptor, SensorThingsClient client) {
-        super(descriptor, OGC_SENSOR_THINGS);
+    SensorThingsModule(String id, ModuleDescriptor descriptor, SensorThingsClient client) {
+        super(id, descriptor);
         this.client = client;
     }
 
     @Override
-    public void run() throws Exception {
-        registerGateway(
-                new SensorThingsGateway(vertx.eventBus(), Router.router(vertx), type, client)
-        );
+    public void run() {
+        registerGateway(new SensorThingsGateway("OGCSensorThings", client));
     }
 
     @Override

+ 3 - 3
connector-module-ogc-sensorthings/src/main/java/io/connector/module/ogc/sensorthings/SensorThingsModuleProvider.java

@@ -1,8 +1,8 @@
 package io.connector.module.ogc.sensorthings;
 
 import io.connector.core.ModuleDescriptor;
-import io.connector.core.module.AbstractModule;
-import io.connector.core.module.ModuleProvider;
+import io.connector.core.AbstractModule;
+import io.connector.core.ModuleProvider;
 
 import static cz.senslog.common.http.HttpClient.newHttpClient;
 
@@ -14,7 +14,7 @@ public class SensorThingsModuleProvider implements ModuleProvider {
 
         SensorThingsConfig config = new SensorThingsConfig(descriptor.getServiceConfig());
         SensorThingsClient client = new SensorThingsClient(config, newHttpClient());
-        SensorThingsModule module = new SensorThingsModule(descriptor, client);
+        SensorThingsModule module = new SensorThingsModule("OGCSensorThings", descriptor, client);
 
         return module;
     }

+ 4 - 7
connector-module-ogc-sensorthings/src/main/java/io/connector/module/ogc/sensorthings/gateway/SensorThingsGateway.java

@@ -1,30 +1,27 @@
 package io.connector.module.ogc.sensorthings.gateway;
 
 import io.connector.core.AbstractGateway;
-import io.connector.core.module.ModuleType;
 import io.connector.module.ogc.sensorthings.SensorThingsClient;
 import io.vertx.core.MultiMap;
 import io.vertx.core.eventbus.DeliveryOptions;
-import io.vertx.core.eventbus.EventBus;
 import io.vertx.core.json.JsonObject;
-import io.vertx.ext.web.Router;
 
 import static io.connector.core.Handler.replyToHttpContext;
-import static io.connector.core.module.ModuleType.OGC_SENSOR_THINGS;
+import static io.connector.core.AddressPath.Creator.create;
 
 public class SensorThingsGateway extends AbstractGateway {
 
     private final SensorThingsClient client;
 
-    public SensorThingsGateway(EventBus eventBus, Router router, ModuleType moduleType, SensorThingsClient client) {
-        super(eventBus, router, moduleType, OGC_SENSOR_THINGS);
+    public SensorThingsGateway(String id, SensorThingsClient client) {
+        super(id, true);
         this.client = client;
     }
 
     @Override
     protected void run() {
 
-        router().get("/Things(:id)").handler(ctx -> {
+        router().get(create("Things(:id)")).handler(ctx -> {
 
             DeliveryOptions options = new DeliveryOptions()
                     .addHeader("id", ctx.pathParam("id"));

+ 0 - 0
connector-module-ogc-sensorthings/src/main/resources/META-INF/services/io.connector.core.module.ModuleProvider → connector-module-ogc-sensorthings/src/main/resources/META-INF/services/io.connector.core.ModuleProvider


+ 63 - 6
connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1Client.java

@@ -16,6 +16,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.*;
 
 import static cz.senslog.common.http.HttpContentType.TEXT_PLAIN;
+import static java.lang.String.format;
 import static java.time.format.DateTimeFormatter.ofPattern;
 
 public class SensLog1Client {
@@ -77,9 +78,9 @@ public class SensLog1Client {
                     JsonObject jsonObservations = (JsonObject)jsonObject;
                     long unitId = jsonObservations.getLong("unitId");
                     long sensorId = jsonObservations.getLong("sensorId");
-                    SensorObservation sensor = units.computeIfAbsent(unitId, UnitData::new).getSensor(sensorId);
+                    SensorData sensor = units.computeIfAbsent(unitId, UnitData::new).getSensor(sensorId);
                     if (sensor == null) {
-                        sensor = new SensorObservation(sensorId);
+                        sensor = new SensorData(sensorId);
                         units.get(unitId).addSensor(sensor);
                     }
 
@@ -139,7 +140,7 @@ public class SensLog1Client {
 
         Map<Long, UnitData> badObservations = new HashMap<>();
         for (UnitData unit : unitData) {
-            for (SensorObservation sensor : unit.getSensors()) {
+            for (SensorData sensor : unit.getSensors()) {
                 for (Observation observation : sensor.getObservations()) {
 
 
@@ -234,6 +235,62 @@ public class SensLog1Client {
         }
     }
 
+    public List<Unit> observationsWithInfo(OffsetDateTime fromDate, OffsetDateTime toDate) {
+        Collection<UnitInfo> unitInfos = getUnitInfos().values();
+        List<Unit> units = new ArrayList<>(unitInfos.size());
+        for (UnitInfo unitInfo : unitInfos) {
+            UnitData unitData = new UnitData(unitInfo.getId());
+            Collection<SensorInfo> sensorInfos = unitInfo.getSensors();
+            List<Sensor> sensors = new ArrayList<>(sensorInfos.size());
+            for (SensorInfo sensorInfo : sensorInfos) {
+                SensorData sensorObs = new SensorData(sensorInfo.getId());
+                observations(unitData, sensorObs, fromDate, toDate);
+                sensors.add(new Sensor(sensorInfo, sensorObs.getObservations()));
+            }
+            units.add(new Unit(unitInfo, sensors));
+        }
+        return units;
+    }
+
+    public List<Unit> observationsWithInfo(long unitId, OffsetDateTime fromDate, OffsetDateTime toDate) {
+        UnitInfo unitInfo = getUnitInfos().get(unitId);
+        if (unitInfo == null) {
+            throw new RuntimeException(format("Unknown unit with id %s.", unitId));
+        }
+
+        UnitData unitData = new UnitData(unitInfo.getId());
+        Collection<SensorInfo> sensorInfos = unitInfo.getSensors();
+        List<Sensor> sensors = new ArrayList<>(sensorInfos.size());
+        for (SensorInfo sensorInfo : sensorInfos) {
+            SensorData sensorObs = new SensorData(sensorInfo.getId());
+            observations(unitData, sensorObs, fromDate, toDate);
+            sensors.add(new Sensor(sensorInfo, sensorObs.getObservations()));
+        }
+        List<Unit> units = new ArrayList<>(1);
+        units.add(new Unit(unitInfo, sensors));
+        return units;
+    }
+
+    public List<Unit> observationsWithInfo(long unitId, long sensorId, OffsetDateTime fromDate, OffsetDateTime toDate) {
+        UnitInfo unitInfo = getUnitInfos().get(unitId);
+        if (unitInfo == null) {
+            throw new RuntimeException(format("Unknown unit with id %s.", unitId));
+        }
+        SensorInfo sensorInfo = unitInfo.getSensor(sensorId);
+        if (sensorInfo == null) {
+            throw new RuntimeException(format("Unknown sensor with id %s for the unit %s.", sensorId, unitId));
+        }
+        UnitData unitData = new UnitData(unitInfo.getId());
+        SensorData sensorObs = new SensorData(sensorInfo.getId());
+        observations(unitData, sensorObs, fromDate, toDate);
+        List<Sensor> sensors = new ArrayList<>(1);
+        sensors.add(new Sensor(sensorInfo, sensorObs.getObservations()));
+
+        List<Unit> units = new ArrayList<>(1);
+        units.add(new Unit(unitInfo, sensors));
+        return units;
+    }
+
     public List<UnitData> observations(OffsetDateTime fromDate, OffsetDateTime toDate) {
         List<UnitData> units = new ArrayList<>();
         for (UnitInfo unit : getUnitInfos().values()) {
@@ -247,7 +304,7 @@ public class SensLog1Client {
         if (unitInfo != null) {
             UnitData unit = new UnitData(unitInfo.getId());
             for (SensorInfo sensorInfo : unitInfo.getSensors()) {
-                SensorObservation sensor = new SensorObservation(sensorInfo.getId());
+                SensorData sensor = new SensorData(sensorInfo.getId());
                 unit.addSensor(sensor);
                 observations(unit, sensor, fromDate, toDate);
             }
@@ -258,13 +315,13 @@ public class SensLog1Client {
 
     public List<UnitData> observations(long unitId, long sensorId, OffsetDateTime fromDate, OffsetDateTime toDate) {
         UnitData unit = new UnitData(unitId);
-        SensorObservation sensor = new SensorObservation(sensorId);
+        SensorData sensor = new SensorData(sensorId);
         unit.addSensor(sensor);
         observations(unit, sensor, fromDate, toDate);
         return Arrays.asList(unit);
     }
 
-    private void observations(UnitData unit, SensorObservation sensor, OffsetDateTime fromDate, OffsetDateTime toDate) {
+    private void observations(UnitData unit, SensorData sensor, OffsetDateTime fromDate, OffsetDateTime toDate) {
 
         HostConfig host = config.getSensorServiceHost();
         logger.info("Getting observations from {}.", host.getDomain());

+ 9 - 13
connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1Module.java

@@ -2,37 +2,33 @@ package io.connector.module.senslog1;
 
 import io.connector.core.ModuleDescriptor;
 import io.connector.core.config.SchedulerConfig;
-import io.connector.core.module.AbstractModule;
-import io.connector.core.module.ModuleInfo;
-import io.connector.core.module.ModuleType;
+import io.connector.core.AbstractModule;
+import io.connector.core.ModuleInfo;
 import io.connector.module.senslog1.gateway.AFCGateway;
 import io.connector.module.senslog1.gateway.SensLog1Gateway;
 import io.vertx.core.json.JsonObject;
-import io.vertx.ext.web.Router;
 
 public class SensLog1Module extends AbstractModule {
 
     private final SensLog1Client client;
 
-    protected SensLog1Module(ModuleDescriptor descriptor, SensLog1Client client) {
-        super(descriptor, ModuleType.SENSLOG1);
+    protected SensLog1Module(String id, ModuleDescriptor descriptor, SensLog1Client client) {
+        super(id, descriptor);
         this.client = client;
     }
 
     @Override
     public void run() {
-        registerGateway(new AFCGateway(vertx.eventBus(), Router.router(vertx), type, client));
-        registerGateway(new SensLog1Gateway(vertx.eventBus(), Router.router(vertx), type, client));
+        registerGateway(new AFCGateway("AFarCloud", client));
+
+        SensLog1SchedulerConfig schedulerConfig = new SensLog1SchedulerConfig(descriptor.getSchedulerConfig());
+        registerGateway(new SensLog1Gateway("SensLogV1", schedulerConfig, client));
     }
 
     @Override
     public ModuleInfo info() {
-        JsonObject jsonInfo = new JsonObject()
-                .put("id", id());
-
-
-        jsonInfo.put("gateways", new JsonObject());
 
+        JsonObject jsonInfo = new JsonObject();
         if (descriptor().getSchedulerConfig() != null) {
             SchedulerConfig config = descriptor().getSchedulerConfig();
             jsonInfo.put("scheduling", new JsonObject()

+ 3 - 3
connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1ModuleProvider.java

@@ -2,8 +2,8 @@ package io.connector.module.senslog1;
 
 import cz.senslog.common.http.HttpClient;
 import io.connector.core.ModuleDescriptor;
-import io.connector.core.module.AbstractModule;
-import io.connector.core.module.ModuleProvider;
+import io.connector.core.AbstractModule;
+import io.connector.core.ModuleProvider;
 
 public final class SensLog1ModuleProvider implements ModuleProvider {
 
@@ -12,7 +12,7 @@ public final class SensLog1ModuleProvider implements ModuleProvider {
 
         SensLog1Config config = new SensLog1Config(descriptor.getServiceConfig());
         SensLog1Client client = new SensLog1Client(config, HttpClient.newHttpClient());
-        SensLog1Module module = new SensLog1Module(descriptor, client);
+        SensLog1Module module = new SensLog1Module("SensLogV1", descriptor, client);
 
         return module;
     }

+ 18 - 0
connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1SchedulerConfig.java

@@ -0,0 +1,18 @@
+package io.connector.module.senslog1;
+
+import io.connector.core.config.SchedulerConfig;
+
+import java.util.Set;
+
+public class SensLog1SchedulerConfig {
+
+    private final Set<String> allowedStations;
+
+    SensLog1SchedulerConfig(SchedulerConfig config) {
+        this.allowedStations = config.getPropertyConfig("allowedStations").getAttributes();
+    }
+
+    public Set<String> getAllowedStations() {
+        return allowedStations;
+    }
+}

+ 2 - 7
connector-module-senslog1/src/main/java/io/connector/module/senslog1/gateway/AFCGateway.java

@@ -1,20 +1,15 @@
 package io.connector.module.senslog1.gateway;
 
 import io.connector.core.AbstractGateway;
-import io.connector.core.module.ModuleType;
 import io.connector.module.senslog1.SensLog1Client;
-import io.vertx.core.eventbus.EventBus;
-import io.vertx.ext.web.Router;
 
 
-import static io.connector.core.module.ModuleType.AFARCLOUD;
-
 public class AFCGateway extends AbstractGateway {
 
     private final SensLog1Client client;
 
-    public AFCGateway(EventBus eventBus, Router router, ModuleType moduleType, SensLog1Client client) {
-        super(eventBus, router, moduleType, AFARCLOUD);
+    public AFCGateway(String id, SensLog1Client client) {
+        super(id);
         this.client = client;
     }
 

+ 65 - 53
connector-module-senslog1/src/main/java/io/connector/module/senslog1/gateway/SensLog1Gateway.java

@@ -1,24 +1,20 @@
 package io.connector.module.senslog1.gateway;
 
+import cz.senslog.common.util.Tuple;
 import io.connector.core.AbstractGateway;
 import io.connector.core.DataCollection;
+import io.connector.core.Message;
 import io.connector.core.MessageHeader;
-import io.connector.core.module.ModuleType;
-import io.connector.model.senslog1.Observation;
-import io.connector.model.senslog1.SensorObservation;
-import io.connector.model.senslog1.UnitData;
-import io.connector.model.senslog1.UnitInfo;
+import io.connector.model.senslog1.*;
 import io.connector.module.senslog1.SensLog1Client;
+import io.connector.module.senslog1.SensLog1SchedulerConfig;
 import io.vertx.core.MultiMap;
 import io.vertx.core.buffer.Buffer;
-import io.vertx.core.eventbus.EventBus;
 import io.vertx.core.json.JsonObject;
-import io.vertx.ext.web.Router;
 
 import java.time.OffsetDateTime;
 import java.util.*;
 
-import static io.connector.core.module.ModuleType.SENSLOG1;
 import static java.time.OffsetDateTime.MAX;
 import static java.time.OffsetDateTime.MIN;
 import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
@@ -26,10 +22,12 @@ import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 public class SensLog1Gateway extends AbstractGateway {
 
     private final SensLog1Client client;
+    private final SensLog1SchedulerConfig schedulerConfig;
 
-    public SensLog1Gateway(EventBus eventBus, Router router, ModuleType moduleType, SensLog1Client client) {
-        super(eventBus, router, moduleType, SENSLOG1);
+    public SensLog1Gateway(String id, SensLog1SchedulerConfig schedulerConfig, SensLog1Client client) {
+        super(id, true);
         this.client = client;
+        this.schedulerConfig = schedulerConfig;
     }
 
     @Override
@@ -37,13 +35,13 @@ public class SensLog1Gateway extends AbstractGateway {
 
         // TODO rename to a general name
         schedulerMapping()
-                .addMapping("schedule-observations", "observations");
+                .addMapping("schedule-observations", "observations-with-info");
 
         event().consume("schedule-observations", message -> {
             List<UnitData> unitData = client.lastObservations();
             OffsetDateTime from = MAX, to = MIN;
             for (UnitData unit : unitData) {
-                for (SensorObservation sensor : unit.getSensors()) {
+                for (SensorData sensor : unit.getSensors()) {
                     for (Observation observation : sensor.getObservations()) {
                         if (observation.getTimestamp().isBefore(from)) {
                             from = observation.getTimestamp();
@@ -56,10 +54,12 @@ public class SensLog1Gateway extends AbstractGateway {
             }
 
             // TODO add scheduler config to the body
+            String allowedUnits = String.join(",", schedulerConfig.getAllowedStations());
 
             message.reply(message.body()).options()
-                    .addHeader("fromDate", from.format(ISO_OFFSET_DATE_TIME))
-                    .addHeader("toDate", from.plusMinutes(30).format(ISO_OFFSET_DATE_TIME));
+                    .addHeader("fromDate", "2020-02-10T06:30:00+01:00") // from.format(ISO_OFFSET_DATE_TIME))
+                    .addHeader("toDate", "2020-02-10T07:30:00+01:00") // to.format(ISO_OFFSET_DATE_TIME))
+                    .addHeader("unitId", allowedUnits);
         });
 
         event().consume("units", message -> {
@@ -96,22 +96,12 @@ public class SensLog1Gateway extends AbstractGateway {
             message.reply(new DataCollection<>(unsentObservations));
         });
 
-        event().consume("unit-data", message -> {
+        event().consume("observations-with-positions", message -> {
             MultiMap params = message.headers();
 
-            OffsetDateTime fromDate;
-            if (params.contains("fromDate")) {
-                fromDate = OffsetDateTime.parse(params.get("fromDate"), ISO_OFFSET_DATE_TIME);
-            } else {
-                message.fail(400, "Attribute 'fromDate' is required."); return;
-            }
-
-            OffsetDateTime toDate;
-            if (params.contains("toDate")) {
-                toDate = OffsetDateTime.parse(params.get("toDate"), ISO_OFFSET_DATE_TIME);
-            } else {
-                message.fail(400, "Attribute 'toDate' is required."); return;
-            }
+            Tuple<OffsetDateTime, OffsetDateTime> timeRange = getTimeRangeFromParam(message);
+            if (timeRange == null) { return; }
+            OffsetDateTime fromDate = timeRange.getItem1(), toDate = timeRange.getItem2();
 
             List<UnitData> positions, observations;
             if (params.contains("unitId")) {
@@ -130,19 +120,9 @@ public class SensLog1Gateway extends AbstractGateway {
         event().consume("positions", message -> {
             MultiMap params = message.headers();
 
-            OffsetDateTime fromDate;
-            if (params.contains("fromDate")) {
-                fromDate = OffsetDateTime.parse(params.get("fromDate"), ISO_OFFSET_DATE_TIME);
-            } else {
-                message.fail(400, "Attribute 'fromDate' is required."); return;
-            }
-
-            OffsetDateTime toDate;
-            if (params.contains("toDate")) {
-                toDate = OffsetDateTime.parse(params.get("toDate"), ISO_OFFSET_DATE_TIME);
-            } else {
-                message.fail(400, "Attribute 'toDate' is required."); return;
-            }
+            Tuple<OffsetDateTime, OffsetDateTime> timeRange = getTimeRangeFromParam(message);
+            if (timeRange == null) { return; }
+            OffsetDateTime fromDate = timeRange.getItem1(), toDate = timeRange.getItem2();
 
             List<UnitData> positions;
             if (params.contains("unitId")) {
@@ -155,24 +135,36 @@ public class SensLog1Gateway extends AbstractGateway {
             message.reply(new DataCollection<>(positions));
         });
 
-        event().consume("observations", message -> {
-            // TODO get filter/config from message.body()
+        event().consume("observations-with-info", message -> {
 
             MultiMap params = message.headers();
+            Tuple<OffsetDateTime, OffsetDateTime> timeRange = getTimeRangeFromParam(message);
+            if (timeRange == null) { return; }
+            OffsetDateTime fromDate = timeRange.getItem1(), toDate = timeRange.getItem2();
 
-            OffsetDateTime fromDate;
-            if (params.contains("fromDate")) {
-                fromDate = OffsetDateTime.parse(params.get("fromDate"), ISO_OFFSET_DATE_TIME);
+            List<Unit> unitData;
+            if (params.contains("unitId")) {
+                long unitId = Long.parseLong(params.get("unitId"));
+                if (params.contains("sensorId")) {
+                    long sensorId = Long.parseLong(params.get("sensorId"));
+                    unitData = client.observationsWithInfo(unitId, sensorId, fromDate, toDate);
+                } else {
+                    unitData = client.observationsWithInfo(unitId, fromDate, toDate);
+                }
             } else {
-                message.fail(400, "Attribute 'fromDate' is required."); return;
+                unitData = client.observationsWithInfo(fromDate, toDate);
             }
 
-            OffsetDateTime toDate;
-            if (params.contains("toDate")) {
-                toDate = OffsetDateTime.parse(params.get("toDate"), ISO_OFFSET_DATE_TIME);
-            } else {
-                message.fail(400, "Attribute 'toDate' is required."); return;
-            }
+            message.reply(new DataCollection<>(unitData));
+        });
+
+        event().consume("observations", message -> {
+            // TODO get filter/config from message.body()
+
+            MultiMap params = message.headers();
+            Tuple<OffsetDateTime, OffsetDateTime> timeRange = getTimeRangeFromParam(message);
+            if (timeRange == null) { return; }
+            OffsetDateTime fromDate = timeRange.getItem1(), toDate = timeRange.getItem2();
 
             List<UnitData> observations;
             if (params.contains("unitId")) {
@@ -191,6 +183,26 @@ public class SensLog1Gateway extends AbstractGateway {
         });
     }
 
+    private static <T> Tuple<OffsetDateTime, OffsetDateTime> getTimeRangeFromParam(Message<T> message) {
+        MultiMap params = message.headers();
+
+        OffsetDateTime fromDate;
+        if (params.contains("fromDate")) {
+            fromDate = OffsetDateTime.parse(params.get("fromDate"), ISO_OFFSET_DATE_TIME);
+        } else {
+            message.fail(400, "Attribute 'fromDate' is required."); return null;
+        }
+
+        OffsetDateTime toDate;
+        if (params.contains("toDate")) {
+            toDate = OffsetDateTime.parse(params.get("toDate"), ISO_OFFSET_DATE_TIME);
+        } else {
+            message.fail(400, "Attribute 'toDate' is required."); return null;
+        }
+
+        return Tuple.of(fromDate, toDate);
+    }
+
     @SafeVarargs
     private static List<UnitData> mergeUnitDate(List<UnitData> ...unitsToMerge) {
         int len = unitsToMerge.length;

+ 0 - 0
connector-module-senslog1/src/main/resources/META-INF/services/io.connector.core.module.ModuleProvider → connector-module-senslog1/src/main/resources/META-INF/services/io.connector.core.ModuleProvider