Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.edifice</groupId>
<artifactId>app-parent</artifactId>
<version>1.0.1</version>
<version>1.1-SNAPSHOT</version>
</parent>

<groupId>com.opendigitaleducation</groupId>
Expand Down Expand Up @@ -76,5 +76,12 @@
<classifier>fat</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>fr.wseduc</groupId>
<artifactId>mod-postgresql</artifactId>
<version>2.0-zookeeper-SNAPSHOT</version>
<scope>runtime</scope>
<classifier>fat</classifier>
</dependency>
</dependencies>
</project>
214 changes: 106 additions & 108 deletions backend/src/main/java/com/opendigitaleducation/explorer/Explorer.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.opendigitaleducation.explorer.tasks.ExplorerTaskManager;
import com.opendigitaleducation.explorer.tasks.MigrateCronTask;
import fr.wseduc.cron.CronTrigger;
import fr.wseduc.webutils.data.FileResolver;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
Expand Down Expand Up @@ -86,136 +87,133 @@ public Future<Void> initExplorer() {
if (runjobInWroker && enablePgBus) {
IPostgresClient.initPostgresConsumer(vertx, config, poolMode);
}*/
final IPostgresClient postgresClient;
try {
postgresClient = IPostgresClient.create(vertx, config, false, poolMode);
} catch (Exception e) {
return Future.failedFuture(e);
}
//create es client
final ElasticClientManager elasticClientManager;
try {
elasticClientManager = ElasticClientManager.create(vertx, config);
} catch (Exception e) {
return Future.failedFuture(e);
}
//set skip folder
return IPostgresClient.create(vertx, config, false, poolMode).map(postgresClient -> {

//create es client
final ElasticClientManager elasticClientManager;
try {
elasticClientManager = ElasticClientManager.create(vertx, config);
} catch (Exception e) {
return Future.failedFuture(e);
}
//set skip folder
ExplorerConfig.getInstance().setSkipIndexOfTrashedFolders(config.getBoolean(DELETE_FOLDER_CONFIG, DELETE_FOLDER_CONFIG_DEFAULT));
//init indexes
ExplorerConfig.getInstance().setEsPrefix(config.getString("index-prefix", ExplorerConfig.DEFAULT_RESOURCE_INDEX));
ExplorerConfig.getInstance().setEsIndexes(config.getJsonObject("indexes", new JsonObject()));
final Set<String> apps = config.getJsonArray("applications").stream().map(Object::toString).collect(Collectors.toSet());
if (config.getBoolean("create-index", true)) {
//create elastic schema if needed
final Buffer mappingRes = vertx.fileSystem().readFileBlocking("es/mappingResource.json");
//create custom indexs
final Set<String> customApps = ExplorerConfig.getInstance().getApplications();
for (final String app : customApps) {
if (!ExplorerConfig.FOLDER_APPLICATION.equals(app)) {
final String index = ExplorerConfig.getInstance().getIndex(app);
final Future future = elasticClientManager.getClient().createMapping(index, mappingRes);
futures.add(future);
log.info("Creating ES Resource Mapping for application : " + app + " -> using index" + index);
}
//create elastic schema if needed
final Buffer mappingRes = vertx.fileSystem().readFileBlocking(FileResolver.absolutePath("es/mappingResource.json"));
//create custom indexs
final Set<String> customApps = ExplorerConfig.getInstance().getApplications();
for (final String app : customApps) {
if (!ExplorerConfig.FOLDER_APPLICATION.equals(app)) {
final String index = ExplorerConfig.getInstance().getIndex(app);
final Future future = elasticClientManager.getClient().createMapping(index, mappingRes);
futures.add(future);
log.info("Creating ES Resource Mapping for application : " + app + " -> using index" + index);
}
//create default index apps
for (final String app : apps) {
if (!customApps.contains(app) && !ExplorerConfig.FOLDER_APPLICATION.equals(app)) {
final String index = ExplorerConfig.getInstance().getIndex(app);
final Future future = elasticClientManager.getClient().createMapping(index, mappingRes);
futures.add(future);
log.info("Creating ES Resource Mapping for application : " + app + " -> using index" + index);
}
}
//create default index apps
for (final String app : apps) {
if (!customApps.contains(app) && !ExplorerConfig.FOLDER_APPLICATION.equals(app)) {
final String index = ExplorerConfig.getInstance().getIndex(app);
final Future future = elasticClientManager.getClient().createMapping(index, mappingRes);
futures.add(future);
log.info("Creating ES Resource Mapping for application : " + app + " -> using index" + index);
}
//create mapping folder
final Buffer mappingFolder = vertx.fileSystem().readFileBlocking("es/mappingFolder.json");
final String index = ExplorerConfig.getInstance().getIndex(ExplorerConfig.FOLDER_APPLICATION);
final Future future = elasticClientManager.getClient().createMapping(index, mappingFolder);
log.info("Creating ES Resource Folder using index" + index);
futures.add(future);
}
//create mapping folder
final Buffer mappingFolder = vertx.fileSystem().readFileBlocking(FileResolver.absolutePath("es/mappingFolder.json"));
final String index = ExplorerConfig.getInstance().getIndex(ExplorerConfig.FOLDER_APPLICATION);
final Future future = elasticClientManager.getClient().createMapping(index, mappingFolder);
log.info("Creating ES Resource Folder using index" + index);
futures.add(future);

futures.add(createUpsertScript(config.getString("upsert-resource-script", "explorer-upsert-ressource"), elasticClientManager));
futures.add(createUpsertScript(config.getString("upsert-resource-script", "explorer-upsert-ressource"), elasticClientManager));
}
//create resources service
return FolderExplorerPlugin.create().compose(folderPlugin -> {
final ShareTableManager shareTableManager = new DefaultShareTableManager();
final IExplorerPluginCommunication communication = folderPlugin.getCommunication();
final MuteService muteService = new DefaultMuteService(vertx, new ResourceExplorerDbSql(postgresClient));
final ResourceService resourceService = new ResourceServiceElastic(elasticClientManager, shareTableManager, communication, postgresClient, muteService);
//create folder service
final FolderService folderService = new FolderServiceElastic(elasticClientManager, folderPlugin, resourceService);
//create controller
final ExplorerController explorerController = new ExplorerController(folderService, resourceService);
addController(explorerController);
addController(new MuteController(muteService));
//configure filter
AbstractFilter.setResourceService(resourceService);
AbstractFilter.setFolderService(folderService);
//deploy ingest worker
folderPlugin.start();
if (config.getBoolean("enable-job", true)) {
final Promise<String> onWorkerDeploy = Promise.promise();
final DeploymentOptions dep = new DeploymentOptions().setConfig(config);
return FolderExplorerPlugin.create().compose(folderPlugin -> {
final ShareTableManager shareTableManager = new DefaultShareTableManager();
final IExplorerPluginCommunication communication = folderPlugin.getCommunication();
final MuteService muteService = new DefaultMuteService(vertx, new ResourceExplorerDbSql(postgresClient));
final ResourceService resourceService = new ResourceServiceElastic(elasticClientManager, shareTableManager, communication, postgresClient, muteService);
//create folder service
final FolderService folderService = new FolderServiceElastic(elasticClientManager, folderPlugin, resourceService);
//create controller
final ExplorerController explorerController = new ExplorerController(folderService, resourceService);
addController(explorerController);
addController(new MuteController(muteService));
//configure filter
AbstractFilter.setResourceService(resourceService);
AbstractFilter.setFolderService(folderService);
//deploy ingest worker
folderPlugin.start();
if (config.getBoolean("enable-job", true)) {
final Promise<String> onWorkerDeploy = Promise.promise();
final DeploymentOptions dep = new DeploymentOptions().setConfig(config);
/*if (runjobInWroker) {
dep.setWorker(true).setWorkerPoolName("ingestjob").setWorkerPoolSize(config.getInteger("pool-size", 1));
}*/
vertx.deployVerticle(new IngestJobWorker(), dep, onWorkerDeploy);
futures.add(onWorkerDeploy.future());
if (ExplorerConfig.getInstance().skipIndexOfTrashedFolders) {
try {
this.taskManager = Optional.of(new ExplorerTaskManager().start(vertx, config, postgresClient));
} catch (ParseException e) {
return Future.failedFuture(e);
vertx.deployVerticle(new IngestJobWorker(), dep, onWorkerDeploy);
futures.add(onWorkerDeploy.future());
if (ExplorerConfig.getInstance().skipIndexOfTrashedFolders) {
try {
this.taskManager = Optional.of(new ExplorerTaskManager().start(vertx, config, postgresClient));
} catch (ParseException e) {
return Future.failedFuture(e);
}
}
}
}
try {
final JsonObject migrateCron = config.getJsonObject("migrate-task", new JsonObject());
final String cron = migrateCron.getString("cron");
final boolean enabled = migrateCron.getBoolean("enabled", false);
if (enabled && cron != null) {
log.info("Migrate task is enabled using cron: "+ cron);
final boolean dropBefore = migrateCron.getBoolean("drop-before-migrate", true);
final boolean migrateOldFolder = migrateCron.getBoolean("migrate-old-folder", true);
final boolean migrateNewFolder = migrateCron.getBoolean("migrate-new-folder", true);
new CronTrigger(vertx, cron).schedule(new MigrateCronTask(vertx, resourceService, apps, dropBefore, migrateOldFolder, migrateNewFolder, emptySet()));
}else {
log.info("Migrate task is disabled");
}
} catch (ParseException e) {
log.error("Failed to start migrate cron.");
}
//call start promise
final Promise<Void> returnPromise = Promise.promise();
CompositeFuture.all(futures).onComplete(e -> {
log.info("Explorer application started -> " + e.succeeded());
if (e.failed()) {
log.error("Explorer application failed to start", e.cause());
returnPromise.tryFail(e.cause());
} else {
returnPromise.tryComplete();
}
});
vertx.eventBus().consumer("explorer.resources.details", message -> {
try {
final ExplorerResourceDetailsQuery query = ((JsonObject)message.body()).mapTo(ExplorerResourceDetailsQuery.class);
resourceService.findResourceDetails(query)
.onSuccess(details -> message.reply(JsonObject.mapFrom(details)))
.onFailure(th -> {
log.error("An error occurred while fetching resource details query", th);
message.fail(500, th.getMessage());
});
} catch (Exception e) {
log.error("An error occurred while treating a resource details query", e);
message.fail(500, e.getMessage());
final JsonObject migrateCron = config.getJsonObject("migrate-task", new JsonObject());
final String cron = migrateCron.getString("cron");
final boolean enabled = migrateCron.getBoolean("enabled", false);
if (enabled && cron != null) {
log.info("Migrate task is enabled using cron: "+ cron);
final boolean dropBefore = migrateCron.getBoolean("drop-before-migrate", true);
final boolean migrateOldFolder = migrateCron.getBoolean("migrate-old-folder", true);
final boolean migrateNewFolder = migrateCron.getBoolean("migrate-new-folder", true);
new CronTrigger(vertx, cron).schedule(new MigrateCronTask(vertx, resourceService, apps, dropBefore, migrateOldFolder, migrateNewFolder, emptySet()));
}else {
log.info("Migrate task is disabled");
}
} catch (ParseException e) {
log.error("Failed to start migrate cron.");
}
//call start promise
final Promise<Void> returnPromise = Promise.promise();
CompositeFuture.all(futures).onComplete(e -> {
log.info("Explorer application started -> " + e.succeeded());
if (e.failed()) {
log.error("Explorer application failed to start", e.cause());
returnPromise.tryFail(e.cause());
} else {
returnPromise.tryComplete();
}
});
vertx.eventBus().consumer("explorer.resources.details", message -> {
try {
final ExplorerResourceDetailsQuery query = ((JsonObject)message.body()).mapTo(ExplorerResourceDetailsQuery.class);
resourceService.findResourceDetails(query)
.onSuccess(details -> message.reply(JsonObject.mapFrom(details)))
.onFailure(th -> {
log.error("An error occurred while fetching resource details query", th);
message.fail(500, th.getMessage());
});
} catch (Exception e) {
log.error("An error occurred while treating a resource details query", e);
message.fail(500, e.getMessage());
}
});
return returnPromise.future();
});
return returnPromise.future();
});
}).mapEmpty();
}

private Future createUpsertScript(final String scriptId, final ElasticClientManager elasticClientManager) {
final Buffer upsertScript = vertx.fileSystem().readFileBlocking("es/upsertScript.json");
final Buffer upsertScript = vertx.fileSystem().readFileBlocking(FileResolver.absolutePath("es/upsertScript.json"));
final Future future = elasticClientManager.getClient().storeScript(scriptId, upsertScript);
log.info("Creating Resource upsert script " + scriptId);
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import java.util.ArrayList;
import java.util.List;

import static io.vertx.core.Future.succeededFuture;

public class IngestJobWorker extends AbstractVerticle {
static Logger log = LoggerFactory.getLogger(Explorer.class);
private IngestJob job;
Expand All @@ -26,7 +24,7 @@ public void start(final Promise<Void> startPromise) throws Exception {
final Promise<Void> promise = Promise.promise();
super.start(promise);
promise.future()
.compose(init -> SharedDataHelper.getInstance().getMulti("server", "metricsOptions")
.compose(init -> SharedDataHelper.getInstance().getLocalMulti("server", "metricsOptions")
.onSuccess(ingestJobMap -> {
try {
initIngestJobWorker(startPromise, ingestJobMap);
Expand All @@ -44,21 +42,26 @@ public void initIngestJobWorker(final Promise<Void> startPromise, final java.uti
final boolean runjobInWroker = config().getBoolean("worker-job", true);
final boolean poolMode = config().getBoolean("postgres-pool-mode", true);
final boolean enablePgBus = config().getBoolean("postgres-enable-bus", true);
final IPostgresClient postgresClient = IPostgresClient.create(vertx, config(), runjobInWroker && enablePgBus, poolMode);
//create ingest job
final JsonObject ingestConfig = config().getJsonObject("ingest");
MessageReader.create(vertx, config(), ingestConfig).onSuccess(reader -> {
final IngestJobMetricsRecorder metricsRecorder = IngestJobMetricsRecorderFactory.getIngestJobMetricsRecorder();
final MessageIngester ingester = MessageIngester.elasticWithPgBackup(elasticClientManager, postgresClient, metricsRecorder, config());
log.info("Starting ingest job worker. pgBusEnabled="+enablePgBus+ " workerJobEnabled="+runjobInWroker+ " pgPoolEnabled="+poolMode);
job = new IngestJob(vertx, reader, ingester, metricsRecorder, ingestConfig);
final List<Future> futures = new ArrayList<>();
futures.add(job.start());
//call start promise
CompositeFuture.all(futures).onComplete(e->{
log.info("Ingest job started -> "+e.succeeded());
startPromise.handle(e.mapEmpty());
});
IPostgresClient.create(vertx, config(), runjobInWroker && enablePgBus, poolMode).onSuccess(postgresClient -> {
//create ingest job
final JsonObject ingestConfig = config().getJsonObject("ingest");
try {
MessageReader.create(vertx, config(), ingestConfig).onSuccess(reader -> {
final IngestJobMetricsRecorder metricsRecorder = IngestJobMetricsRecorderFactory.getIngestJobMetricsRecorder();
final MessageIngester ingester = MessageIngester.elasticWithPgBackup(elasticClientManager, postgresClient, metricsRecorder, config());
log.info("Starting ingest job worker. pgBusEnabled=" + enablePgBus + " workerJobEnabled=" + runjobInWroker + " pgPoolEnabled=" + poolMode);
job = new IngestJob(vertx, reader, ingester, metricsRecorder, ingestConfig);
final List<Future> futures = new ArrayList<>();
futures.add(job.start());
//call start promise
CompositeFuture.all(futures).onComplete(e -> {
log.info("Ingest job started -> " + e.succeeded());
startPromise.handle(e.mapEmpty());
});
}).onFailure(startPromise::fail);
} catch (Exception e) {
startPromise.fail(e);
}
}).onFailure(startPromise::fail);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.util.List;
import java.util.function.Function;

import static io.vertx.core.Future.succeededFuture;

public interface MessageReader {


Expand All @@ -22,8 +20,8 @@ static Future<MessageReader> create(final Vertx vertx, final JsonObject config,
if(config.getString("stream", "redis").equalsIgnoreCase("redis")){
return RedisClient.create(vertx, ExplorerPluginFactory.getRedisConfig()).map(redis -> redis(vertx, redis, ingestConfig));
}else{
final IPostgresClient postgres = IPostgresClient.create(vertx, ExplorerPluginFactory.getPostgresConfig(), true, false);
return succeededFuture(postgres(postgres, ingestConfig));
return IPostgresClient.create(vertx, ExplorerPluginFactory.getPostgresConfig(), true, false)
.map(postgres -> postgres(postgres, ingestConfig));
}
}

Expand Down
Loading
Loading