Selaa lähdekoodia

Refactor events

Lukas Cerny 5 vuotta sitten
vanhempi
commit
5cb7aade7b

+ 79 - 193
connector-core/src/main/java/io/connector/core/AbstractGateway.java

@@ -1,140 +1,58 @@
 package io.connector.core;
 
 import io.connector.core.module.ModuleType;
-import io.vertx.core.MultiMap;
 import io.vertx.core.eventbus.DeliveryOptions;
 import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.eventbus.ReplyException;
 import io.vertx.ext.web.Router;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Consumer;
 
 import static io.connector.core.AddressPath.*;
 import static io.connector.core.AddressPath.Creator.create;
 import static io.connector.core.MessageHeader.*;
-import static io.vertx.core.eventbus.ReplyFailure.NO_HANDLERS;
+import static java.lang.String.format;
 
 public abstract class AbstractGateway {
 
     private final static Logger logger = LogManager.getLogger(AbstractGateway.class);
 
-    protected static class Message<T> {
-        private final io.vertx.core.eventbus.Message<T> message;
-        private Reply<Object> reply;
-
-        private Fail fail;
-
-        private Message(io.vertx.core.eventbus.Message<T> message) {
-            this.message=  message;
-        }
-
-        private Message(Throwable throwable) {
-            this.message = null;
-
-            if (throwable instanceof ReplyException) {
-                this.fail = new Fail(((ReplyException)throwable).failureCode(), throwable.getMessage());
-            } else {
-                this.fail = new Fail(400, throwable.getMessage());
-            }
-        }
-
-        public T body() {
-            return message.body();
-        }
-
-        public MultiMap headers() {
-            return message.headers();
-        }
-
-        public String address() {
-            return message.address();
-        }
-
-        public void fail(int code, String message) {
-            fail = new Fail(code, message);
-        }
-
-        public boolean success() {
-            return fail == null;
-        }
-
-        public boolean isFail() {
-            return !success();
-        }
-
-        public Fail cause() {
-            return fail;
-        }
-
-        public Reply<Object> reply(Object data) {
-            reply = new Reply<>(data);
-            return reply;
-        }
-    }
-
-    protected static class Fail {
-        private final int code;
-        private final String message;
-
-        Fail(int code, String message) {
-            this.code = code;
-            this.message = message;
-        }
-
-        public int getCode() {
-            return code;
-        }
-
-        public String getMessage() {
-            return message;
-        }
-    }
-
-    protected static class Reply<T> {
-        private final T data;
-        private final DeliveryOptions options;
-
-        private Reply(T data) {
-            this.data = data;
-            this.options = new DeliveryOptions();
-        }
-
-        public DeliveryOptions options() {
-            return options;
-        }
-    }
-
     private final EventBus eventBus;
+    private final Event event;
     private final Router router;
     private final ModuleType moduleType;
     protected final ModuleType gatewayType;
-    private final Map<String, String> registeredConsumers;
+    private final Set<String> registeredConsumers;
     private final Map<String, String> schedulerMapping;
-    private final Map<String, Consumer<Message<Object>>> consumerHandlers;
 
     protected AbstractGateway(EventBus eventBus, Router router, ModuleType moduleType, ModuleType gatewayType) {
         this.eventBus = eventBus;
+        this.event = new EventImpl(eventBus);
         this.router = router;
         this.moduleType = moduleType;
         this.gatewayType = gatewayType;
-        this.registeredConsumers = new HashMap<>();
+        this.registeredConsumers = new HashSet<>();
         this.schedulerMapping = new HashMap<>();
-        this.consumerHandlers = new HashMap<>();
     }
 
+    protected abstract void run();
+
     public String id() {
         return gatewayType.name().toLowerCase();
     }
 
-    public Router router() {
+    public final Router router() {
         return router;
     }
 
-    protected abstract void run();
+    public final Event event() {
+        return event;
+    }
 
     public final void start() {
         run();
@@ -142,83 +60,14 @@ public abstract class AbstractGateway {
     }
 
     private String createAddress(String consumerName) {
-        if (!registeredConsumers.containsKey(consumerName)) {
+        if (!registeredConsumers.contains(consumerName)) {
             throw logger.throwing(new RuntimeException(
-                    String.format("Consumer '%s' in module %s and gateway %s is not registered.",
+                    format("Consumer '%s' in module %s and gateway %s is not registered.",
                             consumerName, moduleType.name().toLowerCase(), gatewayType.name().toLowerCase()
                     )
             ));
         }
-        String encapsulation = registeredConsumers.get(consumerName);
-        return create(encapsulation, moduleType, gatewayType, consumerName);
-    }
-
-    private <T> void privateRequest(String address, Object body, DeliveryOptions options, Consumer<Message<T>> handler) {
-        // TODO request in the same gateway
-    }
-
-    private <T> void publicRequest(String address, Object body, DeliveryOptions options, Consumer<Message<T>> handler) {
-        // TODO request to another module
-    }
-
-
-    protected final <T> void request(String address, Object body, DeliveryOptions options, Consumer<Message<T>> handler) {
-        if (!registeredConsumers.containsKey(address)) {
-            handler.accept(new Message<>(new ReplyException(NO_HANDLERS, "no handler " + address))); return;
-        }
-
-        String encapsulation = registeredConsumers.get(address);
-        if (encapsulation.equals(PUBLIC_CONSUMER)) {
-            eventBus.<T>request(createAddress(address), body, options, reply -> {
-                Message<T> message;
-                if (reply.succeeded()) {
-                    message = new Message<>(reply.result());
-                } else {
-                    message = new Message<>(reply.cause());
-                }
-                handler.accept(message);
-            });
-        } else if (encapsulation.equals(PRIVATE_CONSUMER)) {
-
-
-        }
-    }
-
-    protected final <T> void publicConsumer(String address, Consumer<Message<T>> handler) {
-        registerConsumer(address, PUBLIC_CONSUMER, handler);
-    }
-
-    // TODO change handler to map of handlers without eventBus
-    protected final <T> void privateConsumer(String address, Consumer<Message<T>> handler) {
-        registerConsumer(address, PRIVATE_CONSUMER, handler);
-    }
-
-    private <T> void registerConsumer(String address, String encapsulation, Consumer<Message<T>> handler) {
-        if (registeredConsumers.containsKey(address)) {
-            logger.throwing(new RuntimeException(
-                    String.format("Address '%s' is already registered as '%s'.", address, registeredConsumers.get(address))
-            ));
-        }
-        registeredConsumers.put(address, encapsulation);
-        eventBus.<T>consumer(createAddress(address), message -> {
-            Message<T> msg = new Message<>(message);
-            handler.accept(msg);
-            if (msg.isFail()) {
-                Fail fail = msg.fail;
-                message.fail(fail.code, fail.message);
-            } else {
-                Reply<Object> reply = msg.reply;
-                if (reply != null) {
-                    reply.options
-                            .addHeader(MODULE_TYPE, moduleType.name())
-                            .addHeader(GATEWAY_TYPE, gatewayType.name())
-                            .addHeader(ADDRESS, address);
-                    message.reply(reply.data, reply.options);
-                } else {
-                    message.fail(204, "no content");
-                }
-            }
-        });
+        return create(moduleType, gatewayType, consumerName);
     }
 
     public interface SchedulerMapping {
@@ -236,34 +85,19 @@ public abstract class AbstractGateway {
 
     private void registerSchedulerConsumers() {
         final boolean isDefault = moduleType.equals(gatewayType);
-
         for (Map.Entry<String, String> mappingEntry : schedulerMapping.entrySet()) {
             if (isDefault) {
-                String privateConsumer = registeredConsumers.get(mappingEntry.getKey());
-                if (privateConsumer == null) {
-                    throw logger.throwing(new RuntimeException(
-                            String.format("Consumer '%s' in module %s and gateway %s is not registered.",
-                                    mappingEntry.getKey(),
-                                    moduleType.name().toLowerCase(),
-                                    gatewayType.name().toLowerCase()
-                            )
-                    ));
-                }
-
-                if (!privateConsumer.equalsIgnoreCase(PRIVATE_CONSUMER)) {
-                    logger.warn("Consumer '{}' in module {} and gateway {} is not private and could be dangerous.",
-                            mappingEntry.getKey(), moduleType.name().toLowerCase(), gatewayType.name().toLowerCase());
-                }
-            }
-
-            if (!registeredConsumers.containsKey(mappingEntry.getValue())) {
-                throw logger.throwing(new RuntimeException(
-                        String.format("Consumer '%s' in module %s and gateway %s is not registered.",
-                                mappingEntry.getValue(),
-                                moduleType.name().toLowerCase(),
-                                gatewayType.name().toLowerCase()
-                        )
-                ));
+                Consumer<String> check = name -> {
+                    if (!registeredConsumers.contains(name)) {
+                        throw logger.throwing(new RuntimeException(
+                                format("Consumer '%s' in module %s and gateway %s is not registered.",
+                                        name, moduleType.name().toLowerCase(), gatewayType.name().toLowerCase()
+                                )
+                        ));
+                    }
+                };
+                check.accept(mappingEntry.getKey());
+                check.accept(mappingEntry.getValue());
             }
         }
 
@@ -318,4 +152,56 @@ public abstract class AbstractGateway {
             }
         });
     }
+
+
+    protected interface Event {
+        <T> void consume(String address, Consumer<Message<T>> handler);
+        <T> void send(String address, Object body, DeliveryOptions options, Consumer<Message<T>> handler);
+    }
+
+    private class EventImpl implements Event {
+
+        private final EventBus eventBus;
+
+        private EventImpl(EventBus eventBus) {
+            this.eventBus = eventBus;
+        }
+
+        @Override
+        public <T> void consume(String address, Consumer<Message<T>> handler) {
+            registeredConsumers.add(address);
+            eventBus.<T>consumer(createAddress(address), message -> {
+                Message<T> msg = new Message<>(message);
+                handler.accept(msg);
+                if (msg.isFail()) {
+                    Fail fail = msg.cause();
+                    message.fail(fail.getCode(), fail.getMessage());
+                } else {
+                    Reply<Object> reply = msg.reply;
+                    if (reply != null) {
+                        reply.options()
+                                .addHeader(MODULE_TYPE, moduleType.name())
+                                .addHeader(GATEWAY_TYPE, gatewayType.name())
+                                .addHeader(ADDRESS, address);
+                        message.reply(reply.data, reply.options());
+                    } else {
+                        message.fail(204, "no content");
+                    }
+                }
+            });
+        }
+
+        @Override
+        public <T> void send(String address, Object body, DeliveryOptions options, Consumer<Message<T>> handler) {
+            eventBus.<T>request(createAddress(address), body, options, reply -> {
+                Message<T> message;
+                if (reply.succeeded()) {
+                    message = new Message<>(reply.result());
+                } else {
+                    message = new Message<>(reply.cause());
+                }
+                handler.accept(message);
+            });
+        }
+    }
 }

+ 8 - 9
connector-core/src/main/java/io/connector/core/AddressPath.java

@@ -9,20 +9,19 @@ public final class AddressPath {
     public static final String SCHEDULER_CONSUMER = create("scheduler", "consumer");
     public static final String SCHEDULER_PROVIDER = create("scheduler", "provider");
 
-    public static final String PRIVATE_CONSUMER = create("private");
-    public static final String PUBLIC_CONSUMER = create("public");
-
-    public static final String EVENT = create("event");
-    public static final String INFO = create("info");
-    public static final String HTTP_SERVER = create("api");
-
+    public static final String EVENT = "event";
+    public static final String INFO = "info";
 
     public static final class Creator {
 
         private final static char DELIMITER = '/';
 
-        public static String create(String encapsulation, ModuleType moduleType, ModuleType gatewayType, String address) {
-            return create(encapsulation, moduleType.name(), gatewayType.name(), address);
+        public static String create(ModuleType moduleType, ModuleType gatewayType, String address) {
+            return create(moduleType.name(), gatewayType.name(), address);
+        }
+
+        public static String create(ModuleType moduleType, String address) {
+            return create(moduleType.name(), address);
         }
 
         public static String create(String ...parts) {

+ 20 - 0
connector-core/src/main/java/io/connector/core/Fail.java

@@ -0,0 +1,20 @@
+package io.connector.core;
+
+public class Fail {
+
+    private final int code;
+    private final String message;
+
+    Fail(int code, String message) {
+        this.code = code;
+        this.message = message;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+}

+ 2 - 2
connector-core/src/main/java/io/connector/core/Handler.java

@@ -10,13 +10,13 @@ import static cz.senslog.common.http.HttpHeader.CONTENT_TYPE;
 
 public final class Handler {
 
-    public static <T> Consumer<AbstractGateway.Message<T>> replyToHttpContext(RoutingContext httpContext) {
+    public static <T> Consumer<Message<T>> replyToHttpContext(RoutingContext httpContext) {
         return reply -> {
             if (reply.success()) {
                 httpContext.response().putHeader(CONTENT_TYPE, APPLICATION_JSON);
                 httpContext.response().end(Json.encode(reply.body()));
             } else {
-                AbstractGateway.Fail fail = reply.cause();
+                Fail fail = reply.cause();
                 httpContext.fail(fail.getCode(), new RuntimeException(fail.getMessage()));
             }
         };

+ 58 - 0
connector-core/src/main/java/io/connector/core/Message.java

@@ -0,0 +1,58 @@
+package io.connector.core;
+
+import io.vertx.core.MultiMap;
+import io.vertx.core.eventbus.ReplyException;
+
+public class Message<T> {
+
+    private final io.vertx.core.eventbus.Message<T> message;
+    protected Reply<Object> reply;
+    private Fail fail;
+
+    Message(io.vertx.core.eventbus.Message<T> message) {
+        this.message=  message;
+    }
+
+    Message(Throwable throwable) {
+        this.message = null;
+
+        if (throwable instanceof ReplyException) {
+            this.fail = new Fail(((ReplyException)throwable).failureCode(), throwable.getMessage());
+        } else {
+            this.fail = new Fail(400, throwable.getMessage());
+        }
+    }
+
+    public T body() {
+        return message.body();
+    }
+
+    public MultiMap headers() {
+        return message.headers();
+    }
+
+    public String address() {
+        return message.address();
+    }
+
+    public void fail(int code, String message) {
+        fail = new Fail(code, message);
+    }
+
+    public boolean success() {
+        return fail == null;
+    }
+
+    public boolean isFail() {
+        return !success();
+    }
+
+    public Fail cause() {
+        return fail;
+    }
+
+    public Reply<Object> reply(Object data) {
+        reply = new Reply<>(data);
+        return reply;
+    }
+}

+ 18 - 0
connector-core/src/main/java/io/connector/core/Reply.java

@@ -0,0 +1,18 @@
+package io.connector.core;
+
+import io.vertx.core.eventbus.DeliveryOptions;
+
+public class Reply<T> {
+
+    protected final T data;
+    private final DeliveryOptions options;
+
+    Reply(T data) {
+        this.data = data;
+        this.options = new DeliveryOptions();
+    }
+
+    public DeliveryOptions options() {
+        return options;
+    }
+}

+ 9 - 55
connector-core/src/main/java/io/connector/core/VertxServer.java

@@ -7,18 +7,14 @@ import io.connector.core.json.JsonAttributeFormatter;
 import io.connector.core.module.AbstractModule;
 import io.connector.core.module.ModuleInfo;
 import io.vertx.core.*;
-import io.vertx.core.buffer.Buffer;
 import io.vertx.core.eventbus.DeliveryOptions;
 import io.vertx.core.eventbus.EventBus;
 import io.vertx.core.eventbus.Message;
-import io.vertx.core.eventbus.ReplyException;
-import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.http.HttpServerResponse;
 import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonObject;
 import io.vertx.core.json.jackson.DatabindCodec;
 import io.vertx.ext.web.Router;
-import io.vertx.ext.web.handler.BodyHandler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -27,7 +23,6 @@ import java.util.*;
 
 import static cz.senslog.common.http.HttpContentType.APPLICATION_JSON;
 import static cz.senslog.common.http.HttpHeader.CONTENT_TYPE;
-import static cz.senslog.common.util.StringUtils.isBlank;
 import static io.connector.core.AddressPath.*;
 import static io.connector.core.AddressPath.Creator.*;
 import static io.connector.core.MessageHeader.*;
@@ -45,6 +40,8 @@ public class VertxServer extends AbstractVerticle {
 
     @Override
     public void start(Promise<Void> startPromise) {
+        String domainPrefix = "/api";
+
         EventBus eventBus = vertx.eventBus();
         Router router = Router.router(vertx);
 
@@ -86,7 +83,7 @@ public class VertxServer extends AbstractVerticle {
                 });
             }
 
-            router.mountSubRouter(create(HTTP_SERVER, module.id()), module.router());
+            router.mountSubRouter(create(domainPrefix, module.id()), module.router());
         }
 
         router.route().failureHandler(ctx -> {
@@ -100,57 +97,15 @@ public class VertxServer extends AbstractVerticle {
             response.setStatusCode(code).end(error.encode());
         });
 
-        router.route(create(HTTP_SERVER, "*")).handler(BodyHandler.create()).handler(ctx -> {
-
-            HttpServerRequest request = ctx.request();
+        router.get(create(domainPrefix, EVENT)).handler(ctx -> {
             HttpServerResponse response = ctx.response();
-
-            String moduleType = request.headers().get("Module-Type");
-            String gatewayType = request.headers().get("Gateway-Type");
-
-            if (isBlank(moduleType) || isBlank(gatewayType)) {
-                ctx.next(); return;
-            }
-
-            String path = ctx.normalisedPath().substring(4);
-            String address = create(PUBLIC_CONSUMER, moduleType, gatewayType, path);
-            Buffer bodyBuffer = ctx.getBody();
-            DeliveryOptions options = new DeliveryOptions().setHeaders(request.params())
-                    .addHeader(RESOURCE, MessageHeader.Resource.HTTP_SERVER);
-
-            eventBus.request(address, bodyBuffer, options, reply -> {
-                response.putHeader(CONTENT_TYPE, APPLICATION_JSON);
-                if (reply.succeeded()) {
-                    Message<Object> replyMessage = reply.result();
-
-                    for (Map.Entry<String, String> headerEntry : replyMessage.headers().entries()) {
-                        if (headerEntry.getKey().startsWith(MessageHeader.getPrefix())) { continue; }
-                        response.putHeader(headerEntry.getKey(), headerEntry.getValue());
-                    }
-
-                    String responseBody;
-                    if (replyMessage.body() instanceof DataCollection) {
-                        DataCollection<?> dataCollection = (DataCollection<?>) replyMessage.body();
-                        responseBody = Json.encode(dataCollection.getList());
-                    } else {
-                        responseBody = Json.encode(replyMessage.body());
-                    }
-
-                    response.end(responseBody);
-                } else {
-                    ctx.fail(reply.cause());
-                    Throwable throwable = reply.cause();
-                    int code = throwable instanceof ReplyException ? ((ReplyException)throwable).failureCode() : 400;
-                    ctx.fail(code, throwable);
-                }
-            });
-        });
-
-        router.get(EVENT).handler(ctx -> {
-
+            response.putHeader(CONTENT_TYPE, APPLICATION_JSON);
+            response.end(new JsonObject()
+                    .put("message", "Events not implemented yet.")
+                    .encode());
         });
 
-        router.get(INFO).handler(ctx -> {
+        router.get(create(domainPrefix, INFO)).handler(ctx -> {
             List<Future> futures = new ArrayList<>(modules.size());
             for (AbstractModule module : modules) {
                 final Promise<ModuleInfo> promise = Promise.promise();
@@ -185,6 +140,5 @@ public class VertxServer extends AbstractVerticle {
                         startPromise.fail(result.cause());
                     }
                 });
-
     }
 }

+ 2 - 2
connector-core/src/main/java/io/connector/core/module/AbstractModule.java

@@ -54,6 +54,6 @@ public abstract class AbstractModule extends AbstractVerticle {
     public void start() throws Exception {
         run();
         gateways.values().forEach(AbstractGateway::start);
-        vertx.eventBus().consumer(INFO, msg -> msg.reply(info()));
+        vertx.eventBus().consumer(create(type, INFO), msg -> msg.reply(info()));
     }
-}
+}

+ 1 - 0
connector-core/src/main/java/io/connector/core/module/ModuleLoader.java

@@ -18,6 +18,7 @@ public final class ModuleLoader {
         ServiceLoader<ModuleProvider> loader = ServiceLoader.load(ModuleProvider.class);
 
         for (ModuleProvider provider : loader) {
+            logger.info("Loading a module provided by {}.", provider.getClass());
             moduleProviders.put(provider.getClass(), provider);
         }
 

+ 1 - 2
connector-module-afarcloud/src/main/java/io/connector/module/afarcloud/gateway/SensLog1Gateway.java

@@ -40,8 +40,7 @@ public class SensLog1Gateway extends AbstractGateway {
         schedulerMapping()
                 .addMapping("observations", "add-observations");
 
-
-        publicConsumer("add-observations", message -> {
+        event().consume("add-observations", message -> {
             String resource = message.headers().get(MessageHeader.RESOURCE);
 
             Iterator<Object> dataIterator;

+ 6 - 11
connector-module-ogc-sensorthings/src/main/java/io/connector/module/ogc/sensorthings/gateway/SensorThingsGateway.java

@@ -9,42 +9,37 @@ import io.vertx.core.eventbus.EventBus;
 import io.vertx.core.json.JsonObject;
 import io.vertx.ext.web.Router;
 
-import static io.connector.core.AddressPath.Creator.create;
 import static io.connector.core.Handler.replyToHttpContext;
+import static io.connector.core.module.ModuleType.OGC_SENSOR_THINGS;
 
 public class SensorThingsGateway extends AbstractGateway {
 
     private final SensorThingsClient client;
 
     public SensorThingsGateway(EventBus eventBus, Router router, ModuleType moduleType, SensorThingsClient client) {
-        super(eventBus, router, moduleType, ModuleType.OGC_SENSOR_THINGS);
+        super(eventBus, router, moduleType, OGC_SENSOR_THINGS);
         this.client = client;
     }
 
     @Override
     protected void run() {
 
-        router().get(create("Things(:id)")).handler(ctx -> {
+        router().get("/Things(:id)").handler(ctx -> {
 
             DeliveryOptions options = new DeliveryOptions()
                     .addHeader("id", ctx.pathParam("id"));
 
-            request("things", ctx.getBody(), options, replyToHttpContext(ctx));
+            event().send("things", ctx.getBody(), options, replyToHttpContext(ctx));
         });
 
-        publicConsumer("things", message -> {
+        event().consume("things", message -> {
             MultiMap headers = message.headers();
             String id = headers.get("id");
             message.reply(new JsonObject().put("id", id));
         });
 
-        privateConsumer("location", message -> {
-
-        });
-
-        publicConsumer("test", message -> {
+        event().consume("test", message -> {
             message.reply("OK");
         });
-
     }
 }

+ 1 - 2
connector-module-senslog1/src/main/java/io/connector/module/senslog1/SensLog1Module.java

@@ -7,7 +7,6 @@ import io.connector.core.module.ModuleInfo;
 import io.connector.core.module.ModuleType;
 import io.connector.module.senslog1.gateway.AFCGateway;
 import io.connector.module.senslog1.gateway.SensLog1Gateway;
-import io.vertx.core.eventbus.EventBus;
 import io.vertx.core.json.JsonObject;
 import io.vertx.ext.web.Router;
 
@@ -21,7 +20,7 @@ public class SensLog1Module extends AbstractModule {
     }
 
     @Override
-    public void run() throws Exception {
+    public void run() {
         registerGateway(new AFCGateway(vertx.eventBus(), Router.router(vertx), type, client));
         registerGateway(new SensLog1Gateway(vertx.eventBus(), Router.router(vertx), type, client));
     }

+ 6 - 6
connector-module-senslog1/src/main/java/io/connector/module/senslog1/gateway/SensLog1Gateway.java

@@ -39,7 +39,7 @@ public class SensLog1Gateway extends AbstractGateway {
         schedulerMapping()
                 .addMapping("schedule-observations", "observations");
 
-        privateConsumer("schedule-observations", message -> {
+        event().consume("schedule-observations", message -> {
             List<UnitData> unitData = client.lastObservations();
             OffsetDateTime from = MAX, to = MIN;
             for (UnitData unit : unitData) {
@@ -62,12 +62,12 @@ public class SensLog1Gateway extends AbstractGateway {
                     .addHeader("toDate", from.plusMinutes(30).format(ISO_OFFSET_DATE_TIME));
         });
 
-        publicConsumer("units", message -> {
+        event().consume("units", message -> {
             List<UnitInfo> units = client.units();
             message.reply(new DataCollection<>(units));
         });
 
-        publicConsumer("add-observations", message -> {
+        event().consume("add-observations", message -> {
             String resource = message.headers().get(MessageHeader.RESOURCE);
 
             Iterator<Object> dataIterator;
@@ -96,7 +96,7 @@ public class SensLog1Gateway extends AbstractGateway {
             message.reply(new DataCollection<>(unsentObservations));
         });
 
-        publicConsumer("unit-data", message -> {
+        event().consume("unit-data", message -> {
             MultiMap params = message.headers();
 
             OffsetDateTime fromDate;
@@ -127,7 +127,7 @@ public class SensLog1Gateway extends AbstractGateway {
             message.reply(new DataCollection<>(mergedUnitData));
         });
 
-        publicConsumer("positions", message -> {
+        event().consume("positions", message -> {
             MultiMap params = message.headers();
 
             OffsetDateTime fromDate;
@@ -155,7 +155,7 @@ public class SensLog1Gateway extends AbstractGateway {
             message.reply(new DataCollection<>(positions));
         });
 
-        publicConsumer("observations", message -> {
+        event().consume("observations", message -> {
             // TODO get filter/config from message.body()
 
             MultiMap params = message.headers();