|
@@ -11,11 +11,10 @@ import org.apache.logging.log4j.Logger;
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
|
|
|
-import static java.util.concurrent.TimeUnit.SECONDS;
|
|
|
|
|
-
|
|
|
|
|
/**
|
|
/**
|
|
|
* The class {@code Application} represents a trigger for entire application.
|
|
* The class {@code Application} represents a trigger for entire application.
|
|
|
*
|
|
*
|
|
@@ -23,37 +22,38 @@ import static java.util.concurrent.TimeUnit.SECONDS;
|
|
|
* @version 1.0
|
|
* @version 1.0
|
|
|
* @since 1.0
|
|
* @since 1.0
|
|
|
*/
|
|
*/
|
|
|
-class Application implements Runnable {
|
|
|
|
|
|
|
+class Application extends Thread {
|
|
|
|
|
|
|
|
private static Logger logger = LogManager.getLogger(Application.class);
|
|
private static Logger logger = LogManager.getLogger(Application.class);
|
|
|
|
|
|
|
|
-
|
|
|
|
|
- /** Default initialization delay value when the scheduler starts to schedule tasks (in seconds). */
|
|
|
|
|
- private static int DEFAULT_TASK_DELAY = 2;
|
|
|
|
|
-
|
|
|
|
|
- /** Default value for scheduling tasks when the value missing in the configuration file (in seconds). */
|
|
|
|
|
- private static int DEFAULT_SCHEDULE_PERIOD = 3600; // every hour
|
|
|
|
|
-
|
|
|
|
|
/** Attribute of basic configuration values of the application. */
|
|
/** Attribute of basic configuration values of the application. */
|
|
|
private final AppConfig appConfig;
|
|
private final AppConfig appConfig;
|
|
|
|
|
|
|
|
/** Attribute of input parameters of the application. */
|
|
/** Attribute of input parameters of the application. */
|
|
|
private final Parameters params;
|
|
private final Parameters params;
|
|
|
|
|
|
|
|
|
|
+ private CountDownLatch latch;
|
|
|
|
|
+
|
|
|
|
|
+ private ScheduledExecutorService scheduler;
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* Initialization method to trigger the application.
|
|
* Initialization method to trigger the application.
|
|
|
* @param args - array of parameters.
|
|
* @param args - array of parameters.
|
|
|
* @return new thread of {@code Runnable}.
|
|
* @return new thread of {@code Runnable}.
|
|
|
* @throws IOException throws if input parameters or application configuration file can not be parsed.
|
|
* @throws IOException throws if input parameters or application configuration file can not be parsed.
|
|
|
*/
|
|
*/
|
|
|
- static Runnable init(String... args) throws IOException {
|
|
|
|
|
|
|
+ static Thread init(String... args) throws IOException {
|
|
|
AppConfig appConfig = AppConfig.load();
|
|
AppConfig appConfig = AppConfig.load();
|
|
|
Parameters parameters = Parameters.parse(appConfig, args);
|
|
Parameters parameters = Parameters.parse(appConfig, args);
|
|
|
|
|
|
|
|
if (parameters.isHelp()) {
|
|
if (parameters.isHelp()) {
|
|
|
- return parameters::printHelp;
|
|
|
|
|
|
|
+ return new Thread(parameters::printHelp);
|
|
|
}
|
|
}
|
|
|
- return new Application(appConfig, parameters);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ Application app = new Application(appConfig, parameters);
|
|
|
|
|
+ Runtime.getRuntime().addShutdownHook(new Thread(app::interrupt, "clean-app"));
|
|
|
|
|
+
|
|
|
|
|
+ return app;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -62,38 +62,73 @@ class Application implements Runnable {
|
|
|
* @param parameters parsed input parameters of the application. More info of the class {@see Parameters}.
|
|
* @param parameters parsed input parameters of the application. More info of the class {@see Parameters}.
|
|
|
*/
|
|
*/
|
|
|
private Application(AppConfig appConfig, Parameters parameters) {
|
|
private Application(AppConfig appConfig, Parameters parameters) {
|
|
|
|
|
+ super("app");
|
|
|
|
|
+
|
|
|
this.appConfig = appConfig;
|
|
this.appConfig = appConfig;
|
|
|
this.params = parameters;
|
|
this.params = parameters;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
|
|
+ public void interrupt() {
|
|
|
|
|
+ logger.info("Stopping the application {} version {}", appConfig.getName(), appConfig.getVersion());
|
|
|
|
|
+
|
|
|
|
|
+ if (latch != null) {
|
|
|
|
|
+ for (int i = 0; i < latch.getCount(); i++) {
|
|
|
|
|
+ latch.countDown();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ logger.info("Cleaning all connector's threads.");
|
|
|
|
|
+ if (scheduler != null) {
|
|
|
|
|
+ scheduler.shutdownNow();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ logger.info("The application was stopped.");
|
|
|
|
|
+ super.interrupt();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
public void run() {
|
|
public void run() {
|
|
|
- logger.info("Starting application {} version {}", appConfig.getName(), appConfig.getVersion());
|
|
|
|
|
|
|
+ logger.info("Starting the application {} version {}", appConfig.getName(), appConfig.getVersion());
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+ ConfigurationService configService;
|
|
|
try {
|
|
try {
|
|
|
- FileConfigurationService configService = ConfigurationService.newFileBuilder()
|
|
|
|
|
|
|
+ FileConfigurationService service = ConfigurationService.newFileBuilder()
|
|
|
.fileName(params.getConfigFileName()).build();
|
|
.fileName(params.getConfigFileName()).build();
|
|
|
|
|
|
|
|
- configService.load();
|
|
|
|
|
-
|
|
|
|
|
- ServiceProvider serviceProvider = new ServiceProvider(ConnectorFetch::getProvider, ConnectorPush::getProvider);
|
|
|
|
|
- ModelConverterProvider connectorProvider = new ModelConverterProvider();
|
|
|
|
|
-
|
|
|
|
|
- Set<Connector> connectors = ConnectorBuilder.init(serviceProvider, connectorProvider, configService).createConnectors();
|
|
|
|
|
- if (!connectors.isEmpty()) {
|
|
|
|
|
- logger.info("Starting a scheduler for connectors.");
|
|
|
|
|
- ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(connectors.size());
|
|
|
|
|
- connectors.forEach(conn -> {
|
|
|
|
|
- int schedulePeriod = conn.getPeriod().orElse(DEFAULT_SCHEDULE_PERIOD);
|
|
|
|
|
- int delay = conn.getInitDelay().orElse(DEFAULT_TASK_DELAY);
|
|
|
|
|
- logger.info("Scheduling the {} starts in {} with the period {} seconds.", conn.getName(), delay, schedulePeriod);
|
|
|
|
|
- scheduler.scheduleAtFixedRate(conn.getTask(), delay, schedulePeriod, SECONDS);
|
|
|
|
|
- });
|
|
|
|
|
- } else {
|
|
|
|
|
- logger.warn("No connectors were loaded.");
|
|
|
|
|
|
|
+ service.load();
|
|
|
|
|
+
|
|
|
|
|
+ configService = service;
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ logger.catching(e); return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ ServiceProvider serviceProvider = new ServiceProvider(ConnectorFetch::getProvider, ConnectorPush::getProvider);
|
|
|
|
|
+ ModelConverterProvider connectorProvider = new ModelConverterProvider();
|
|
|
|
|
+ ConnectorBuilder connectorBuilder = ConnectorBuilder.init(serviceProvider, connectorProvider, configService);
|
|
|
|
|
+ Set<Connector> connectors = connectorBuilder.createConnectors();
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ if (!connectors.isEmpty()) {
|
|
|
|
|
+ scheduler = Executors.newScheduledThreadPool(connectors.size());
|
|
|
|
|
+ latch = new CountDownLatch(connectors.size());
|
|
|
|
|
+
|
|
|
|
|
+ logger.info("Starting a scheduler for {} connector(s).", connectors.size());
|
|
|
|
|
+ connectors.forEach(c -> c.schedule(scheduler, latch));
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ logger.info("Waiting for the working threads.");
|
|
|
|
|
+ latch.await();
|
|
|
|
|
+ logger.info("All scheduled connector finished their job.");
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ logger.catching(e);
|
|
|
}
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- logger.catching(e);
|
|
|
|
|
|
|
+ } else {
|
|
|
|
|
+ logger.warn("No connectors were loaded.");
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ interrupt();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|