Explorar el Código

Implemented supergroups and merging reports

Lukas Cerny hace 4 años
padre
commit
bf97ed3c4d
Se han modificado 19 ficheros con 451 adiciones y 203 borrados
  1. 90 0
      config/super-test-foodie.yaml
  2. 37 11
      src/main/java/cz/senslog/watchdog/app/Application.java
  3. 23 11
      src/main/java/cz/senslog/watchdog/config/Configuration.java
  4. 49 35
      src/main/java/cz/senslog/watchdog/config/ExecutableGroup.java
  5. 14 0
      src/main/java/cz/senslog/watchdog/config/ExecutableSuperGroup.java
  6. 8 1
      src/main/java/cz/senslog/watchdog/config/GroupConfig.java
  7. 10 0
      src/main/java/cz/senslog/watchdog/config/PropertyConfig.java
  8. 0 10
      src/main/java/cz/senslog/watchdog/config/SuperGroup.java
  9. 1 1
      src/main/java/cz/senslog/watchdog/config/SuperGroupConfig.java
  10. 3 3
      src/main/java/cz/senslog/watchdog/core/Watcher.java
  11. 12 3
      src/main/java/cz/senslog/watchdog/core/connection/SensLogWSConnection.java
  12. 9 11
      src/main/java/cz/senslog/watchdog/messagebroker/MessageBroker.java
  13. 10 3
      src/main/java/cz/senslog/watchdog/messagebroker/MessageBrokerManager.java
  14. 49 36
      src/main/java/cz/senslog/watchdog/messagebroker/MessageStatus.java
  15. 13 0
      src/main/java/cz/senslog/watchdog/messagebroker/MultiMessageBroker.java
  16. 50 0
      src/main/java/cz/senslog/watchdog/messagebroker/broker/AccumulatorMessageBroker.java
  17. 35 37
      src/main/java/cz/senslog/watchdog/messagebroker/broker/ConsoleMessageBroker.java
  18. 19 15
      src/main/java/cz/senslog/watchdog/messagebroker/broker/EmailMessageBroker.java
  19. 19 26
      src/main/java/cz/senslog/watchdog/messagebroker/broker/SignalMessageBroker.java

+ 90 - 0
config/super-test-foodie.yaml

@@ -0,0 +1,90 @@
+general:
+  firstStartAt: "00:20:00" # hh:mm:ss
+
+emailServers:
+  lspEmail:
+    smtpHost: "mail.lesprojekt.cz"
+    smtpPort: 465
+    authUsername: "watchdog@senslog.org"
+    authPassword: "5jspdD"
+
+senslogServers:
+  lspSenslog15:
+    baseUrl: "http://sensor.lesprojekt.cz/senslog15"
+    auth:
+      username: "watchdog"
+      password: "HAFhaf"
+
+messageBrokers:
+  emailToAll:
+    type: EMAIL
+    config:
+      server: lspEmail
+      senderEmail: "watchdog@senslog.org"
+      recipientEmail:
+        - "luccerny@ntis.zcu.cz"
+      subject: "[Watchdog] Test Report SensLog (CZ)"
+
+  console:
+    type: CONSOLE
+    config:
+      messagePattern: "No actual data from: $unit_id/$sensor_id"  # available props: $unit_id, $sensor_id, $timestamp, $value
+      level: INFO
+
+dataProviders:
+  wsSensLogKynsperk:
+    type: WEB_SERVICE
+    config:
+      server: lspSenslog15
+      groupName: "kynsperk"
+
+  wsSensLogRostenice:
+    type: WEB_SERVICE
+    config:
+      server: lspSenslog15
+      groupName: "rostenice_pudni"
+
+  wsSensLogZcu:
+    type: WEB_SERVICE
+    config:
+      server: lspSenslog15
+      groupName: "zcu"
+
+groups:
+  kynsperk:
+    name: "Kynsperk"
+    active: false
+    dataProvider: wsSensLogKynsperk
+    messageBroker: console
+    resultType: FAIL
+    period: 86400
+
+  rostenice:
+    name: "Rostenice"
+    active: true
+    dataProvider: wsSensLogRostenice
+    messageBroker: emailToAll
+    resultType: FAIL
+    period: 86400
+
+  zcu:
+    name: "ZCU Robcice"
+    active: false
+    dataProvider: wsSensLogZcu
+    messageBroker: console
+    resultType: FAIL
+    period: 86400
+
+superGroups:
+  sumarizeError:
+    name: "Kynsperk & ZCU Robcice"
+    messageBroker: emailToAll
+    resultType: FAIL
+    period: 86400
+    groups: [ kynsperk, zcu ]
+
+monitoredObjects:
+
+  1305167562258386:
+    period: 86400
+    groups: [ zcu ]

+ 37 - 11
src/main/java/cz/senslog/watchdog/app/Application.java

@@ -2,9 +2,12 @@ package cz.senslog.watchdog.app;
 
 import cz.senslog.watchdog.config.Configuration;
 import cz.senslog.watchdog.config.ExecutableGroup;
+import cz.senslog.watchdog.config.ExecutableSuperGroup;
 import cz.senslog.watchdog.core.Watcher;
 import cz.senslog.watchdog.messagebroker.MessageBroker;
 import cz.senslog.watchdog.messagebroker.MessageBrokerManager;
+import cz.senslog.watchdog.messagebroker.MultiMessageBroker;
+import cz.senslog.watchdog.messagebroker.broker.AccumulatorMessageBroker;
 import cz.senslog.watchdog.provider.DataProvider;
 import cz.senslog.watchdog.provider.DataProviderManager;
 import cz.senslog.watchdog.util.schedule.Scheduler;
@@ -15,9 +18,8 @@ import java.io.IOException;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
-
-import static cz.senslog.watchdog.util.DateTrunc.trunc;
-
+import java.util.ArrayList;
+import java.util.List;
 
 public class Application extends Thread {
 
@@ -72,18 +74,20 @@ public class Application extends Thread {
         LocalDateTime startAt = LocalDateTime.of(startAtDate, startAtTime);
 
         Scheduler.SchedulerBuilder schedulerBuilder = Scheduler.createBuilder();
-        for (ExecutableGroup exeGroup : config.getExecutableGroups()) {
+        for (ExecutableGroup execGroup : config.getExecGroups()) {
+            if (!execGroup.getConfig().isActive()) { continue; }
 
-            MessageBroker msgBroker = messageBrokerManager.getInstance(exeGroup.getConfig().getMessageBrokerId());
-            DataProvider dataProvider = dataProviderManager.getInstance(exeGroup.getConfig().getDataProviderId());
-            Watcher watcher = Watcher.create(exeGroup, dataProvider, msgBroker);
+            String msgBrokerId = execGroup.getConfig().getMessageBrokerId();
+            MessageBroker msgBroker = messageBrokerManager.getInstance(msgBrokerId, MessageBroker.class);
+            DataProvider dataProvider = dataProviderManager.getInstance(execGroup.getConfig().getDataProviderId());
+            Watcher watcher = Watcher.create(execGroup, dataProvider, msgBroker);
 
-            Integer period = exeGroup.getConfig().getPeriod();
+            Integer period = execGroup.getConfig().getPeriod();
             if (period == null) {
-                throw new IllegalStateException("Period for the group '"+exeGroup.getConfig().getId()+"' is not specified.");
+                throw new IllegalStateException("Period for the group '"+execGroup.getConfig().getId()+"' is not specified.");
             }
 
-            String taskName = exeGroup.getConfig().getId();
+            String taskName = execGroup.getConfig().getId();
             if (params.isExecuteImmediately()) {
                 schedulerBuilder.addTask(taskName, watcher::check);
             } else {
@@ -92,7 +96,29 @@ public class Application extends Thread {
             }
         }
 
-        // TODO super groups
+        for (ExecutableSuperGroup execSuperGroup : config.getSuperGroupsConfig()) {
+            int subGroupCount = execSuperGroup.getKeys().size();
+
+            String messageBrokerId = execSuperGroup.getConfig().getMessageBrokerId();
+            MultiMessageBroker msgBroker = messageBrokerManager.getInstance(messageBrokerId, MultiMessageBroker.class);
+            MessageBroker accumulatorMsgBroker = new AccumulatorMessageBroker(msgBroker, subGroupCount);
+
+            List<Watcher> watchers = new ArrayList<>(subGroupCount);
+            for (String execGroupId : execSuperGroup.getKeys()) {
+                ExecutableGroup execGroup = execSuperGroup.getNode(execGroupId).getValue();
+                DataProvider dataProvider = dataProviderManager.getInstance(execGroup.getConfig().getDataProviderId());
+                watchers.add(Watcher.create(execGroup, dataProvider, accumulatorMsgBroker));
+            }
+
+            String taskName = execSuperGroup.getConfig().getName();
+            Runnable runnableTask = () -> watchers.forEach(Watcher::check);
+
+            if (params.isExecuteImmediately()) {
+                schedulerBuilder.addTask(taskName, runnableTask);
+            } else {
+                schedulerBuilder.addTask(taskName, runnableTask, execSuperGroup.getConfig().getPeriod(), startAt);
+            }
+        }
 
         Scheduler scheduler = schedulerBuilder.build();
 

+ 23 - 11
src/main/java/cz/senslog/watchdog/config/Configuration.java

@@ -21,7 +21,8 @@ public class Configuration {
     private static final Logger logger = LogManager.getLogger(Configuration.class);
 
     private final GeneralConfig generalConfig;
-    private final Collection<ExecutableGroup> executableGroups;
+    private final Collection<ExecutableGroup> execGroups;
+    private final Collection<ExecutableSuperGroup> execSuperGroups;
     private final Collection<MessageBrokerConfig> messageBrokerConfigs;
     private final Collection<EmailServerConfig> emailServerConfigs;
     private final Collection<SensLogServerConfig> sensLogServerConfigs;
@@ -30,13 +31,15 @@ public class Configuration {
     private Configuration(
             GeneralConfig generalConfig,
             Collection<ExecutableGroup> executableGroups,
+            Collection<ExecutableSuperGroup> execSGroups,
             Collection<EmailServerConfig> emailServerConfigs,
             Collection<MessageBrokerConfig> messageBrokerConfigs,
             Collection<SensLogServerConfig> sensLogServerConfigs,
             Collection<DataProviderConfig> dataProviderConfigs
     ){
         this.generalConfig = generalConfig;
-        this.executableGroups = executableGroups;
+        this.execGroups = executableGroups;
+        this.execSuperGroups = execSGroups;
         this.messageBrokerConfigs = messageBrokerConfigs;
         this.emailServerConfigs = emailServerConfigs;
         this.sensLogServerConfigs = sensLogServerConfigs;
@@ -118,7 +121,7 @@ public class Configuration {
             }
         }
 
-        Map<String, SuperGroup> executableSGroupsMap = new HashMap<>();
+        Map<String, ExecutableSuperGroup> executableSGroupsMap = new HashMap<>();
         for (SuperGroupConfig sGConfig : superGroups.values()) {
             if (sGConfig.getPeriod() == null) {
                 throw new IllegalStateException(
@@ -126,26 +129,31 @@ public class Configuration {
                 );
             }
 
-            SuperGroup superGroup = new SuperGroup(sGConfig);
+            ExecutableSuperGroup executableSuperGroup = new ExecutableSuperGroup(sGConfig);
             for (String groupId : sGConfig.getGroups()) {
                 ExecutableGroup group = executableGroupsMap.get(groupId);
                 if (group == null) {
                     throw new IllegalStateException("Assigned group '"+groupId+"' does not exist.");
                 }
-                superGroup.addNode(groupId, group);
+                GroupConfig gConfig = group.getConfig();
+                GroupConfig newGConfig = new GroupConfig(gConfig.getId(), gConfig.getName(), true,
+                        gConfig.getDataProviderId(), gConfig.getMessageBrokerId(),
+                        sGConfig.getResultType(), sGConfig.getPeriod()
+                );
+
+                executableSuperGroup.addNode(groupId, ExecutableGroup.copy(group, newGConfig));
             }
-            executableSGroupsMap.put(sGConfig.getId(), superGroup);
+            executableSGroupsMap.put(sGConfig.getId(), executableSuperGroup);
         }
 
-        // TODO return
         Collection<ExecutableGroup> execGroups = executableGroupsMap.values();
-        Collection<SuperGroup> execSGroups = executableSGroupsMap.values();
+        Collection<ExecutableSuperGroup> execSGroups = executableSGroupsMap.values();
         Collection<EmailServerConfig> emailServerConfigs = emailServers.values();
         Collection<SensLogServerConfig> sensLogServerConfigs = sensLogServers.values();
         Collection<DataProviderConfig> dataProviderConfigs = dataProviders.values();
         Collection<MessageBrokerConfig> messageBrokerConfigs = messageBrokers.values();
 
-        return new Configuration(generalConfig, execGroups,
+        return new Configuration(generalConfig, execGroups, execSGroups,
                 emailServerConfigs, messageBrokerConfigs,
                 sensLogServerConfigs, dataProviderConfigs
         );
@@ -340,14 +348,18 @@ public class Configuration {
         return dataProviderConfigs;
     }
 
-    public Collection<ExecutableGroup> getExecutableGroups() {
-        return executableGroups;
+    public Collection<ExecutableGroup> getExecGroups() {
+        return execGroups;
     }
 
     public GeneralConfig getGeneralConfig() {
         return generalConfig;
     }
 
+    public Collection<ExecutableSuperGroup> getSuperGroupsConfig() {
+        return execSuperGroups;
+    }
+
     private static class TempUnitConfig {
         private final String id;
         private final Integer period;

+ 49 - 35
src/main/java/cz/senslog/watchdog/config/ExecutableGroup.java

@@ -1,35 +1,49 @@
-package cz.senslog.watchdog.config;
-
-import cz.senslog.watchdog.core.adt.Graph;
-
-public class ExecutableGroup extends Graph<String, MonitoredObject> {
-
-    private final MonitoredObject root;
-    private final GroupConfig config;
-
-    private final boolean enableSwap;
-
-    public ExecutableGroup(GroupConfig config) {
-        this(config, new MonitoredObject(config.getId(), config.getPeriod()));
-    }
-
-    private ExecutableGroup(GroupConfig config, MonitoredObject monitoredObject) {
-        super(monitoredObject.getId(), monitoredObject);
-        this.root = monitoredObject;
-        this.config = config;
-        this.enableSwap = config.getPeriod() == null;
-        this.root.setPeriod(enableSwap ? Integer.MAX_VALUE : config.getPeriod());
-    }
-
-    public void swapPeriod(Integer newPeriod) {
-        if (enableSwap && newPeriod != null) {
-            root.setPeriod(Math.min(newPeriod, root.getPeriod()));
-        }
-    }
-
-    public GroupConfig getConfig() {
-        return new GroupConfig(config.getId(), config.getName(), config.getDataProviderId(),
-                config.getMessageBrokerId(), config.getResultType(), root.getPeriod()
-        );
-    }
-}
+package cz.senslog.watchdog.config;
+
+import cz.senslog.watchdog.core.adt.Graph;
+import cz.senslog.watchdog.core.adt.Node;
+
+public class ExecutableGroup extends Graph<String, MonitoredObject> {
+
+    private final MonitoredObject root;
+    private final GroupConfig config;
+
+    private final boolean enableSwap;
+
+    // this is not a deep copy
+    public static ExecutableGroup copy(ExecutableGroup oldGroup, GroupConfig newGroupConfig) {
+        return new ExecutableGroup(newGroupConfig, oldGroup.getRoot());
+    }
+
+    private ExecutableGroup(GroupConfig config, Node<String, MonitoredObject> rootNode) {
+        super(rootNode);
+        this.root = rootNode.getValue();
+        this.config = config;
+        this.enableSwap = config.getPeriod() == null;
+        this.root.setPeriod(enableSwap ? Integer.MAX_VALUE : config.getPeriod());
+    }
+
+    public ExecutableGroup(GroupConfig config) {
+        this(config, new MonitoredObject(config.getId(), config.getPeriod()));
+    }
+
+    private ExecutableGroup(GroupConfig config, MonitoredObject monitoredObject) {
+        super(monitoredObject.getId(), monitoredObject);
+        this.root = monitoredObject;
+        this.config = config;
+        this.enableSwap = config.getPeriod() == null;
+        this.root.setPeriod(enableSwap ? Integer.MAX_VALUE : config.getPeriod());
+    }
+
+    public void swapPeriod(Integer newPeriod) {
+        if (enableSwap && newPeriod != null) {
+            root.setPeriod(Math.min(newPeriod, root.getPeriod()));
+        }
+    }
+
+    public GroupConfig getConfig() {
+        return new GroupConfig(config.getId(), config.getName(), config.isActive(), config.getDataProviderId(),
+                config.getMessageBrokerId(), config.getResultType(), root.getPeriod()
+        );
+    }
+}

+ 14 - 0
src/main/java/cz/senslog/watchdog/config/ExecutableSuperGroup.java

@@ -0,0 +1,14 @@
+package cz.senslog.watchdog.config;
+
+import cz.senslog.watchdog.core.adt.Graph;
+
+public class ExecutableSuperGroup extends Graph<String, ExecutableGroup> {
+
+    public ExecutableSuperGroup(GroupConfig config) {
+        super(config.getId(), new ExecutableGroup(config));
+    }
+
+    public GroupConfig getConfig() {
+        return getRoot().getValue().getConfig();
+    }
+}

+ 8 - 1
src/main/java/cz/senslog/watchdog/config/GroupConfig.java

@@ -6,6 +6,7 @@ public class GroupConfig {
 
     private final String id;
     private final String name;
+    private final boolean active;
     private final String dataProviderId;
     private final String messageBrokerId;
     private final ResultType resultType;
@@ -14,6 +15,7 @@ public class GroupConfig {
     public static GroupConfig create(PropertyConfig config) {
         return new GroupConfig(config.getId(),
                 config.getStringProperty("name"),
+                config.getBooleanProperty("active"),
                 config.getStringProperty("dataProvider"),
                 config.getStringProperty("messageBroker"),
                 ResultType.of(config.getStringProperty("resultType")),
@@ -22,9 +24,10 @@ public class GroupConfig {
         );
     }
 
-    public GroupConfig(String id, String name, String dataProviderId, String messageBrokerId, ResultType resultType, Integer period) {
+    public GroupConfig(String id, String name, boolean active, String dataProviderId, String messageBrokerId, ResultType resultType, Integer period) {
         this.id = id;
         this.name = name;
+        this.active = active;
         this.dataProviderId = dataProviderId;
         this.messageBrokerId = messageBrokerId;
         this.resultType = resultType;
@@ -55,6 +58,10 @@ public class GroupConfig {
         return name;
     }
 
+    public boolean isActive() {
+        return active;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;

+ 10 - 0
src/main/java/cz/senslog/watchdog/config/PropertyConfig.java

@@ -122,6 +122,16 @@ public class PropertyConfig {
         ));
     }
 
+    public boolean getBooleanProperty(String name) {
+        Object value = getProperty(name);
+        if (value instanceof Boolean) {
+            return (Boolean)value;
+        }
+        throw new ClassCastException(format(
+                "Value '%s' can not be cast to Boolean", value
+        ));
+    }
+
     /**
      * Returns property as an Integer.
      * @param name - name of property.

+ 0 - 10
src/main/java/cz/senslog/watchdog/config/SuperGroup.java

@@ -1,10 +0,0 @@
-package cz.senslog.watchdog.config;
-
-import cz.senslog.watchdog.core.adt.Graph;
-
-public class SuperGroup extends Graph<String, ExecutableGroup> {
-
-    public SuperGroup(GroupConfig config) {
-        super(config.getId(), new ExecutableGroup(config));
-    }
-}

+ 1 - 1
src/main/java/cz/senslog/watchdog/config/SuperGroupConfig.java

@@ -7,7 +7,7 @@ public class SuperGroupConfig extends GroupConfig {
     private final Set<String> groups;
 
     public SuperGroupConfig(String id, String name, String messageBrokerId, ResultType resultType, Integer period, Set<String> groups) {
-        super(id, name,null, messageBrokerId, resultType, period);
+        super(id, name, true, null, messageBrokerId, resultType, period);
         this.groups = groups;
     }
 

+ 3 - 3
src/main/java/cz/senslog/watchdog/core/Watcher.java

@@ -117,14 +117,14 @@ public class Watcher {
         operationProperties.put("Group name", group.getConfig().getName());
 
         messageBroker.send(new Report(ofInstant(now, ZoneId.systemDefault()), reports, reportedMessages, operationProperties), status -> {
-            String brokerType = messageBroker.getType().name().toLowerCase();
+            String brokerType = status.getBrokerName();
             if (status.isSuccess()) {
                 logger.info("The report at '{}' was send via '{}' broker successfully for the group's name '{}'.",
-                        status.getReport().getCreated(), brokerType, group.getConfig().getName()
+                        status.getCreatedReport(), brokerType, group.getConfig().getName()
                 );
             } else {
                 logger.error("Can not send a message '{}' via '{}' broker because of '{}'.",
-                        status.getReport().getCreated(), brokerType, status.getError()
+                        status.getCreatedReport(), brokerType, status.getError()
                 );
             }
         });

+ 12 - 3
src/main/java/cz/senslog/watchdog/core/connection/SensLogWSConnection.java

@@ -23,11 +23,13 @@ public class SensLogWSConnection {
     private final SensLogServerConfig config;
 
     private Tuple<Boolean, HttpCookie> authCookie;
+    private int authError;
 
     public SensLogWSConnection(SensLogServerConfig config) {
         this.config = config;
         this.httpClient = HttpClient.newHttpClient();
         this.authCookie = Tuple.of(false, null);
+        this.authError = 0;
     }
 
     private synchronized HttpCookie getAuthCookie() {
@@ -71,7 +73,7 @@ public class SensLogWSConnection {
         return cookie;
     }
 
-    private List<Map<String, Object>> loadLastObservation(HttpRequest.Builder reqBuilder) {
+    private List<Map<String, Object>> callRequest(HttpRequest.Builder reqBuilder) {
         HttpCookie authCookie = getAuthCookie();
         if (!authCookie.isSecure()) {
             logger.warn("Auth cookie is not valid to be used.");
@@ -84,17 +86,24 @@ public class SensLogWSConnection {
         HttpResponse response = httpClient.send(request);
         logger.info("Received data with the status '{}' from the server {}.", response.getStatus(), request.getUrl());
 
+        if (response.getStatus() == HttpCode.UNAUTHORIZED && authError <= 2) {
+            this.authCookie = Tuple.of(false, null);
+            this.authError++;
+            return callRequest(reqBuilder);
+        }
+
         if (response.isError()) {
             logger.error("code: {}, message: {}", response.getStatus(), response.getBody());
             throw new IllegalStateException(response.getBody());
         }
 
+        this.authError = 0;
         final Type lastObsType = new TypeToken<Collection<Map<String, Object>>>() {}.getType();
         return jsonToObject(response.getBody(), lastObsType);
     }
 
     public List<Map<String, Object>> loadLastObservation(String [] unitIds) {
-        return loadLastObservation(HttpRequest.newBuilder().GET()
+        return callRequest(HttpRequest.newBuilder().GET()
                 .url(URLBuilder.newBuilder(config.getBaseUrl(), "/rest/watchdog/unit")
                         .addParam("unit_id", String.join(",", unitIds))
                         .build())
@@ -102,7 +111,7 @@ public class SensLogWSConnection {
     }
 
     public List<Map<String, Object>> loadLastObservation(String groupName) {
-        return loadLastObservation(HttpRequest.newBuilder().GET()
+        return callRequest(HttpRequest.newBuilder().GET()
                 .url(URLBuilder.newBuilder(config.getBaseUrl(), "/rest/watchdog/group")
                         .addParam("group_name", groupName)
                         .build())

+ 9 - 11
src/main/java/cz/senslog/watchdog/messagebroker/MessageBroker.java

@@ -1,11 +1,9 @@
-package cz.senslog.watchdog.messagebroker;
-
-import cz.senslog.watchdog.config.MessageBrokerType;
-import cz.senslog.watchdog.domain.Report;
-
-public interface MessageBroker {
-
-    void send(Report report, MessageBrokerHandler status);
-
-    MessageBrokerType getType();
-}
+package cz.senslog.watchdog.messagebroker;
+
+import cz.senslog.watchdog.domain.Report;
+
+public interface MessageBroker {
+
+    void send(Report report, MessageBrokerHandler status);
+
+}

+ 10 - 3
src/main/java/cz/senslog/watchdog/messagebroker/MessageBrokerManager.java

@@ -12,7 +12,7 @@ import java.util.Map;
 
 public class MessageBrokerManager {
 
-    private final Map<String, MessageBroker> brokerInstances;
+    private final Map<String, ? extends MessageBroker> brokerInstances;
 
     public static MessageBrokerManager createInstance(
             Collection<EmailServerConfig> emailServerConfigs,
@@ -53,10 +53,17 @@ public class MessageBrokerManager {
         this.brokerInstances = brokerInstances;
     }
 
-    public MessageBroker getInstance(String messageBrokerId) {
+    @SuppressWarnings("unchecked")
+    public <T extends MessageBroker> T getInstance(String messageBrokerId, Class<T> brokerClass) {
         MessageBroker instance = brokerInstances.get(messageBrokerId);
         if (instance != null) {
-            return instance;
+            try {
+                return (T) instance;
+            } catch (ClassCastException e) {
+                throw new IllegalStateException(String.format("Can not cast the '%s' to the '%s' for the id '%s'.",
+                        instance.getClass(), brokerClass, messageBrokerId)
+                );
+            }
         }
         throw new IllegalStateException("Message broker '"+messageBrokerId+"' does not exist.");
     }

+ 49 - 36
src/main/java/cz/senslog/watchdog/messagebroker/MessageStatus.java

@@ -1,36 +1,49 @@
-package cz.senslog.watchdog.messagebroker;
-
-import cz.senslog.watchdog.domain.Report;
-
-public class MessageStatus {
-
-    private final Report report;
-    private final String error;
-
-    public MessageStatus(Report report) {
-        this.report = report;
-        this.error = null;
-    }
-
-    public MessageStatus(Report report, String error) {
-        this.report = report;
-        this.error = error;
-    }
-
-    public Report getReport() {
-        return report;
-    }
-
-    public boolean isSuccess() {
-        return error == null;
-    }
-
-    public boolean isError() {
-        return !isSuccess();
-    }
-
-
-    public String getError() {
-        return error;
-    }
-}
+package cz.senslog.watchdog.messagebroker;
+
+import java.time.LocalDateTime;
+
+public class MessageStatus {
+
+    private final LocalDateTime createdReport;
+    private final String error;
+    private final String brokerName;
+
+    public static MessageStatus success(LocalDateTime createdReport) {
+        return success(createdReport, "unknown");
+    }
+
+    public static MessageStatus success(LocalDateTime createdReport, String brokerName) {
+        return new MessageStatus(createdReport, null, brokerName);
+    }
+
+    public static MessageStatus error(LocalDateTime createdReport, String errorMessage, String brokerName) {
+        return new MessageStatus(createdReport, errorMessage, brokerName);
+    }
+
+    public MessageStatus(LocalDateTime createdReport, String error, String brokerName) {
+        this.createdReport = createdReport;
+        this.error = error;
+        this.brokerName = brokerName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public LocalDateTime getCreatedReport() {
+        return createdReport;
+    }
+
+    public boolean isSuccess() {
+        return error == null;
+    }
+
+    public boolean isError() {
+        return !isSuccess();
+    }
+
+
+    public String getError() {
+        return error;
+    }
+}

+ 13 - 0
src/main/java/cz/senslog/watchdog/messagebroker/MultiMessageBroker.java

@@ -0,0 +1,13 @@
+package cz.senslog.watchdog.messagebroker;
+
+import cz.senslog.watchdog.domain.Report;
+
+public abstract class MultiMessageBroker implements MessageBroker {
+
+    @Override
+    public void send(Report report, MessageBrokerHandler status) {
+        send(new Report[]{report}, status);
+    }
+
+    public abstract void send(Report[] reports, MessageBrokerHandler status);
+}

+ 50 - 0
src/main/java/cz/senslog/watchdog/messagebroker/broker/AccumulatorMessageBroker.java

@@ -0,0 +1,50 @@
+package cz.senslog.watchdog.messagebroker.broker;
+
+import cz.senslog.watchdog.domain.Report;
+import cz.senslog.watchdog.messagebroker.MessageBroker;
+import cz.senslog.watchdog.messagebroker.MessageBrokerHandler;
+import cz.senslog.watchdog.messagebroker.MessageStatus;
+import cz.senslog.watchdog.messagebroker.MultiMessageBroker;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class AccumulatorMessageBroker implements MessageBroker {
+
+    private static final Logger logger = LogManager.getLogger(AccumulatorMessageBroker.class);
+
+    private final MultiMessageBroker messageBroker;
+    private final Report [] accumulatedReports;
+    private int currentAmount;
+
+    public AccumulatorMessageBroker(MultiMessageBroker messageBroker, int resultCount) {
+        this.messageBroker = messageBroker;
+        this.accumulatedReports = new Report[resultCount];
+        this.currentAmount = 0;
+    }
+
+    @Override
+    public synchronized void send(Report report, MessageBrokerHandler status) {
+        int newSize = currentAmount + 1;
+
+        if (newSize > accumulatedReports.length) {
+            throw new IllegalStateException(String.format(
+                    "The accumulator received more reports (%d) than is allowed to handle (%d).", newSize, accumulatedReports.length
+            ));
+        }
+
+        accumulatedReports[currentAmount++] = report;
+        status.handle(MessageStatus.success(report.getCreated(), "accumulatorMessageBroker"));
+
+        if (currentAmount == accumulatedReports.length) {
+            messageBroker.send(accumulatedReports, st -> {
+                String brokerType = st.getBrokerName();
+                if (st.isSuccess()) {
+                    logger.info("The report at '{}' was send via '{}' broker successfully.", st.getCreatedReport(), brokerType);
+                    currentAmount = 0;
+                } else {
+                    logger.error("Can not send a message '{}' via '{}' broker because of '{}'.", st.getCreatedReport(), brokerType, st.getError());
+                }
+            });
+        }
+    }
+}

+ 35 - 37
src/main/java/cz/senslog/watchdog/messagebroker/broker/ConsoleMessageBroker.java

@@ -1,38 +1,36 @@
-package cz.senslog.watchdog.messagebroker.broker;
-
-import cz.senslog.watchdog.config.ConsoleMessageBrokerConfig;
-import cz.senslog.watchdog.config.MessageBrokerType;
-import cz.senslog.watchdog.domain.Report;
-import cz.senslog.watchdog.domain.SimpleReport;
-import cz.senslog.watchdog.messagebroker.MessageBroker;
-import cz.senslog.watchdog.messagebroker.MessageBrokerHandler;
-import cz.senslog.watchdog.messagebroker.MessageStatus;
-import cz.senslog.watchdog.messagebroker.writer.DelimiterTableWriter;
-import cz.senslog.watchdog.messagebroker.writer.TableWriter;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class ConsoleMessageBroker implements MessageBroker {
-
-    private static final Logger logger = LogManager.getLogger(ConsoleMessageBroker.class);
-
-    private final ConsoleMessageBrokerConfig config;
-
-    public ConsoleMessageBroker(ConsoleMessageBrokerConfig config) {
-        this.config = config;
-    }
-
-    @Override
-    public void send(Report report, MessageBrokerHandler status) {
-        TableWriter tableWriter = DelimiterTableWriter.create(";")
-                .cell("unitId").cell("sensorId").cell("lastObservation").end();
-        report.getReports().stream().map(SimpleReport::getRecord).forEach(r -> r.writeRow(tableWriter.row()));
-        logger.info("Sending a report created at {} to the console.\n{}", report.getCreated(), tableWriter.table());
-        status.handle(new MessageStatus(report));
-    }
-
-    @Override
-    public MessageBrokerType getType() {
-        return MessageBrokerType.CONSOLE;
-    }
+package cz.senslog.watchdog.messagebroker.broker;
+
+import cz.senslog.watchdog.config.ConsoleMessageBrokerConfig;
+import cz.senslog.watchdog.config.MessageBrokerType;
+import cz.senslog.watchdog.domain.Report;
+import cz.senslog.watchdog.domain.SimpleReport;
+import cz.senslog.watchdog.messagebroker.MessageBroker;
+import cz.senslog.watchdog.messagebroker.MessageBrokerHandler;
+import cz.senslog.watchdog.messagebroker.MessageStatus;
+import cz.senslog.watchdog.messagebroker.MultiMessageBroker;
+import cz.senslog.watchdog.messagebroker.writer.DelimiterTableWriter;
+import cz.senslog.watchdog.messagebroker.writer.TableWriter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ConsoleMessageBroker extends MultiMessageBroker {
+
+    private static final Logger logger = LogManager.getLogger(ConsoleMessageBroker.class);
+
+    private final ConsoleMessageBrokerConfig config;
+
+    public ConsoleMessageBroker(ConsoleMessageBrokerConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void send(Report[] reports, MessageBrokerHandler status) {
+        for (Report report : reports) {
+            TableWriter tableWriter = DelimiterTableWriter.create(";")
+                    .cell("unitId").cell("sensorId").cell("lastObservation").end();
+            report.getReports().stream().map(SimpleReport::getRecord).forEach(r -> r.writeRow(tableWriter.row()));
+            logger.info("Sending a report created at {} to the console.\n{}", report.getCreated(), tableWriter.table());
+            status.handle(MessageStatus.success(report.getCreated(), MessageBrokerType.EMAIL.name()));
+        }
+    }
 }

+ 19 - 15
src/main/java/cz/senslog/watchdog/messagebroker/broker/EmailMessageBroker.java

@@ -1,25 +1,27 @@
 package cz.senslog.watchdog.messagebroker.broker;
 
 import cz.senslog.watchdog.config.EmailMessageBrokerConfig;
-import cz.senslog.watchdog.config.MessageBrokerType;
 import cz.senslog.watchdog.core.connection.EmailServerConnection;
 import cz.senslog.watchdog.domain.*;
-import cz.senslog.watchdog.messagebroker.MessageBroker;
 import cz.senslog.watchdog.messagebroker.MessageBrokerHandler;
 import cz.senslog.watchdog.messagebroker.MessageStatus;
+import cz.senslog.watchdog.messagebroker.MultiMessageBroker;
 import cz.senslog.watchdog.messagebroker.writer.HtmlTableWriter;
 import cz.senslog.watchdog.messagebroker.writer.TableWriter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import javax.mail.*;
+import java.time.LocalDateTime;
 import java.util.*;
 
+import static cz.senslog.watchdog.config.MessageBrokerType.EMAIL;
 import static java.time.format.DateTimeFormatter.ofPattern;
 
-public class EmailMessageBroker implements MessageBroker {
+public class EmailMessageBroker extends MultiMessageBroker {
 
     private static final String BREAK_LINE = "<br />";
+    private static final String HORIZONTAL_SEPARATOR = "<hr>";
 
     private static final Logger logger = LogManager.getLogger(EmailMessageBroker.class);
 
@@ -99,26 +101,28 @@ public class EmailMessageBroker implements MessageBroker {
         return content.toString();
     }
 
+
     @Override
-    public void send(Report report, MessageBrokerHandler status) {
-        if (report == null) {
+    public void send(Report[] reports, MessageBrokerHandler status) {
+        if (reports == null || reports.length <= 0) {
             logger.info("Nothing to send. The receive report is null.");
-            status.handle(new MessageStatus(null, "No report to send.")); return;
+            status.handle(new MessageStatus(null, "No report to send.", EMAIL.name())); return;
         }
 
         try {
+            StringBuilder reportMessage = new StringBuilder();
+            for (Report report : reports) {
+                reportMessage.append(createMessage(report))
+                        .append(HORIZONTAL_SEPARATOR).append(HORIZONTAL_SEPARATOR)
+                        .append(BREAK_LINE);
+            }
             logger.info("Sending a message via email.");
-            serverConnection.send(createMessage(report), messageConfig);
+            serverConnection.send(reportMessage.toString(), messageConfig);
             logger.info("The message was send successfully.");
-            status.handle(new MessageStatus(report));
+            status.handle(MessageStatus.success(LocalDateTime.now(), EMAIL.name()));
         } catch (MessagingException e) {
             logger.catching(e);
-            status.handle(new MessageStatus(report, e.getMessage()));
+            status.handle(MessageStatus.error(LocalDateTime.now(), e.getMessage(), EMAIL.name()));
         }
     }
-
-    @Override
-    public MessageBrokerType getType() {
-        return MessageBrokerType.EMAIL;
-    }
-}
+}

+ 19 - 26
src/main/java/cz/senslog/watchdog/messagebroker/broker/SignalMessageBroker.java

@@ -1,26 +1,19 @@
-package cz.senslog.watchdog.messagebroker.broker;
-
-import cz.senslog.watchdog.config.MessageBrokerType;
-import cz.senslog.watchdog.config.SignalMessageBrokerConfig;
-import cz.senslog.watchdog.domain.Report;
-import cz.senslog.watchdog.messagebroker.MessageBroker;
-import cz.senslog.watchdog.messagebroker.MessageBrokerHandler;
-
-public class SignalMessageBroker implements MessageBroker {
-
-    private final SignalMessageBrokerConfig config;
-
-    public SignalMessageBroker(SignalMessageBrokerConfig config) {
-        this.config = config;
-    }
-
-    @Override
-    public void send(Report report, MessageBrokerHandler status) {
-
-    }
-
-    @Override
-    public MessageBrokerType getType() {
-        return MessageBrokerType.SIGNAL;
-    }
-}
+package cz.senslog.watchdog.messagebroker.broker;
+
+import cz.senslog.watchdog.config.SignalMessageBrokerConfig;
+import cz.senslog.watchdog.domain.Report;
+import cz.senslog.watchdog.messagebroker.MessageBrokerHandler;
+import cz.senslog.watchdog.messagebroker.MultiMessageBroker;
+
+public class SignalMessageBroker extends MultiMessageBroker {
+
+    private final SignalMessageBrokerConfig config;
+
+    public SignalMessageBroker(SignalMessageBrokerConfig config) {
+        this.config = config;
+    }
+    @Override
+    public void send(Report[] reports, MessageBrokerHandler status) {
+
+    }
+}