diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/ElasticSearchIndexService.java b/indexer/src/main/java/au/org/aodn/esindexer/service/ElasticSearchIndexService.java index 484967d9..d6a2a24c 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/service/ElasticSearchIndexService.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/service/ElasticSearchIndexService.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Set; @@ -91,6 +92,66 @@ public void createIndexFromMappingJSONFile(String indexMappingFile, String index } } + public List getAllIndicesWithPrefix(String baseIndexName) { + try { + GetIndexRequest getIndexRequest = GetIndexRequest.of(b -> b.index(baseIndexName + "*")); + GetIndexResponse getIndexResponse = portalElasticsearchClient.indices().get(getIndexRequest); + Set indexNames = getIndexResponse.result().keySet(); + return indexNames.stream().toList(); + } catch (ElasticsearchException | IOException e) { + throw new IndexNotFoundException("Failed to get indices with prefix: " + baseIndexName + " | " + e.getMessage()); + } + } + + /** + * index name is in this format: baseIndexName_vX where X is the version number + * @param existingIndices list of existing index names + * @return next available version number + */ + public int getAvailableVersionNumber(List existingIndices) { + int maxVersion = 0; + for (String indexName : existingIndices) { + String[] parts = indexName.split("_v"); + if (parts.length != 2) { + log.warn("Index name: {} does not follow the expected format", indexName); + continue; + } + try { + int version = Integer.parseInt(parts[1]); + if (version > maxVersion) { + maxVersion = version; + } + } catch (NumberFormatException e) { + log.warn("Index name: {} has invalid version format", indexName); + } + } + return maxVersion + 1; + } + + public String getAvailableVersionedIndexName(String baseIndexName) { + List existingIndices = this.getAllIndicesWithPrefix(baseIndexName); + int nextVersion = this.getAvailableVersionNumber(existingIndices); + return baseIndexName + "_v" + nextVersion; + } + + public void switchAliasToNewIndex(String alias, String newIndexName) { + try { + log.info("Switching alias: {} to point to new index: {}", alias, newIndexName); + portalElasticsearchClient.indices().updateAliases(ua -> ua + .actions(a -> a + .remove(r -> r.alias(alias).index("*")) + ) + .actions(a -> a + .add(ad -> ad.alias(alias).index(newIndexName)) + ) + ); + log.info("Alias: {} now points to index: {}", alias, newIndexName); + } catch (ElasticsearchException | IOException e) { + throw new IndexNotFoundException("Failed to switch alias: " + alias + " to new index: " + newIndexName + " | " + e.getMessage()); + } + } + + public long getDocumentsCount(String indexName) { try { return portalElasticsearchClient.count(s -> s diff --git a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataServiceImpl.java b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataServiceImpl.java index b06bf5e1..edf22bb7 100644 --- a/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataServiceImpl.java +++ b/indexer/src/main/java/au/org/aodn/esindexer/service/IndexerMetadataServiceImpl.java @@ -1,10 +1,7 @@ package au.org.aodn.esindexer.service; import au.org.aodn.ardcvocabs.model.VocabModel; -import au.org.aodn.ardcvocabs.service.ArdcVocabService; -import au.org.aodn.cloudoptimized.service.DataAccessService; import au.org.aodn.datadiscoveryai.service.DataDiscoveryAiService; -import au.org.aodn.datadiscoveryai.model.AiEnhancementResponse; import au.org.aodn.esindexer.configuration.AppConstants; import au.org.aodn.esindexer.exception.*; import au.org.aodn.esindexer.utils.CommonUtils; @@ -42,7 +39,6 @@ import org.springframework.http.ResponseEntity; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import org.springframework.web.client.HttpServerErrorException; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -157,9 +153,13 @@ public boolean isMetadataPublished(String uuid) { } protected Set extractTokensFromDescription(String description) throws IOException { + return extractTokensFromDescription(description, indexName); + } + + protected Set extractTokensFromDescription(String description, String targetIndexName) throws IOException { Set results = new HashSet<>(); - AnalyzeRequest request = AnalyzeRequest.of(ar -> ar.index(indexName).analyzer(tokensAnalyserName).text(description)); + AnalyzeRequest request = AnalyzeRequest.of(ar -> ar.index(targetIndexName).analyzer(tokensAnalyserName).text(description)); AnalyzeResponse response = portalElasticsearchClient.indices().analyze(request); for (AnalyzeToken token : response.tokens()) { @@ -174,6 +174,10 @@ protected Set extractTokensFromDescription(String description) throws IO } protected StacCollectionModel getMappedMetadataValues(String metadataValues) throws IOException, FactoryException, TransformException, JAXBException { + return getMappedMetadataValues(metadataValues, indexName); + } + + protected StacCollectionModel getMappedMetadataValues(String metadataValues, String targetIndexName) throws IOException, FactoryException, TransformException, JAXBException { MDMetadataType metadataType = jaxbUtils.unmarshal(metadataValues); // Step 1: Pure mapping (XML -> STAC) @@ -241,7 +245,7 @@ protected StacCollectionModel getMappedMetadataValues(String metadataValues) thr // search_as_you_type enabled fields can be extended SearchSuggestionsModel searchSuggestionsModel = SearchSuggestionsModel.builder() - .abstractPhrases(this.extractTokensFromDescription(stacCollectionModel.getDescription())) + .abstractPhrases(this.extractTokensFromDescription(stacCollectionModel.getDescription(), targetIndexName)) .parameterVocabs(stacCollectionModel.getSummaries().getParameterVocabs()) .platformVocabs(stacCollectionModel.getSummaries().getPlatformVocabs()) .organisationVocabs(stacCollectionModel.getSummaries().getOrganisationVocabs()) @@ -366,6 +370,8 @@ public ResponseEntity deleteDocumentByUUID(String uuid) throws IOExcepti public List indexAllMetadataRecordsFromGeoNetwork( String beginWithUuid, boolean confirm, final Callback callback) { + final String versionedIndexName = beginWithUuid != null? indexName : elasticSearchIndexService.getAvailableVersionedIndexName(indexName); + if (!confirm) { throw new IndexAllRequestNotConfirmedException("Please confirm that you want to index all metadata records from GeoNetwork"); } @@ -373,7 +379,7 @@ public List indexAllMetadataRecordsFromGeoNetwork( if(beginWithUuid == null) { log.info("Indexing all metadata records from GeoNetwork"); // recreate index from mapping JSON file - elasticSearchIndexService.createIndexFromMappingJSONFile(AppConstants.PORTAL_RECORDS_MAPPING_JSON_FILE, indexName); + elasticSearchIndexService.createIndexFromMappingJSONFile(AppConstants.PORTAL_RECORDS_MAPPING_JSON_FILE, versionedIndexName); } else { log.info("Resume indexing records from GeoNetwork at {}", beginWithUuid); @@ -383,7 +389,8 @@ public List indexAllMetadataRecordsFromGeoNetwork( { try { return Optional.of(this.getMappedMetadataValues( - geoNetworkResourceService.searchRecordBy(item.id()) + geoNetworkResourceService.searchRecordBy(item.id()), + versionedIndexName )); } catch (IOException | FactoryException | TransformException | JAXBException e) { return Optional.empty(); @@ -391,7 +398,7 @@ public List indexAllMetadataRecordsFromGeoNetwork( }; List results = new ArrayList<>(); - BulkRequestProcessor bulkRequestProcessor = new BulkRequestProcessor<>(indexName, mapper, self, callback); + BulkRequestProcessor bulkRequestProcessor = new BulkRequestProcessor<>(versionedIndexName, mapper, self, callback); // We need to keep sending messages to client to avoid timeout on long processing ExecutorService executor = Executors.newFixedThreadPool(2); @@ -403,7 +410,7 @@ public List indexAllMetadataRecordsFromGeoNetwork( final CountDownLatch countDown = new CountDownLatch(1); Callable task = () -> { try { - return this.getMappedMetadataValues(metadataRecord); + return this.getMappedMetadataValues(metadataRecord, versionedIndexName); } catch (FactoryException | JAXBException | TransformException | NullPointerException e) { /* @@ -462,7 +469,7 @@ public List indexAllMetadataRecordsFromGeoNetwork( // TODO now processing for record_suggestions index - log.info("Finished execute bulk indexing records to index: {}",indexName); + log.info("Finished execute bulk indexing records to index: {}",versionedIndexName); } catch(Exception e) { log.error("Failed", e); @@ -478,6 +485,55 @@ public List indexAllMetadataRecordsFromGeoNetwork( finally { executor.shutdown(); } + + //After indexing, if there is a same-name index, delete it + checkAndDelete(indexName, versionedIndexName); + + // switch alias to point to the new index + if (beginWithUuid == null) { + elasticSearchIndexService.switchAliasToNewIndex(indexName, versionedIndexName); + log.info("Alias: {} switched to point to index: {}", indexName, versionedIndexName); + } + return results; } + + /** + * This method only for smoothly swapping from non-alias index to alias-based index. + * If alias already working properly in all edge, staging and prod, this method is not needed and can be removed later. + * @param alias + * @param versionedIndexName + */ + public void checkAndDelete(String alias ,String versionedIndexName) { + try { + // First determine if the provided name is an alias. If it is an alias, do NOT delete. + boolean isAlias = false; + try { + var getAliasResp = portalElasticsearchClient.indices().getAlias(g -> g.name(alias)); + if (getAliasResp.result() != null && !getAliasResp.result().isEmpty()) { + isAlias = true; + log.info("Provided name '{}' resolves to an alias pointing to concrete indices: {}. Skipping delete.", alias, getAliasResp.result().keySet()); + } + } catch (ElasticsearchException | IOException e) { + // If alias lookup fails or returns not found, treat as not an alias and continue to index existence check. + log.debug("Alias lookup for '{}' failed or not found: {}", alias, e.getMessage()); + } + + if (isAlias) { + return; // do not attempt to delete when the name is an alias + } + + // Fallback: check if a concrete index with the same name exists, and delete it if appropriate + var hasSameNameIndex = portalElasticsearchClient.indices() + .exists( e -> e.index(alias)) + .value(); + + if (hasSameNameIndex && !alias.equals(versionedIndexName)) { + elasticSearchIndexService.deleteIndexStore(alias); + } + + } catch (ElasticsearchException | IOException e) { + log.error("Failed to check/delete same name index: {} | {}", alias, e.getMessage()); + } + } } diff --git a/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceIT.java b/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceIT.java index 2acb1259..1b607a3e 100644 --- a/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceIT.java +++ b/indexer/src/test/java/au/org/aodn/esindexer/service/IndexerServiceIT.java @@ -172,6 +172,37 @@ public void verifyGetDocumentByUUID() throws IOException { } } + @Test + public void verifyIndexAlias() throws IOException { + String uuid = "7709f541-fc0c-4318-b5b9-9053aa474e0e"; + try { + + insertMetadataRecords(uuid, "classpath:canned/sample4.xml"); + + indexerService.indexAllMetadataRecordsFromGeoNetwork(null, true, null); + + + var indices = elasticSearchIndexService.getAllIndicesWithPrefix(INDEX_NAME); + + //assert there is an index whose name is INDEX_NAME + "-v1" + Assertions.assertTrue( + indices.stream().anyMatch( + indexName -> indexName.equals(INDEX_NAME + "_v1") + ) + ); + + // assert querying via alias works + Hit objectNodeHit = indexerService.getDocumentByUUID(uuid, INDEX_NAME); + String test = String.valueOf(Objects.requireNonNull(objectNodeHit.source())); + + + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + deleteRecord(uuid); + } + } + @Test public void verifyAssociatedRecordIndexer() throws IOException{ diff --git a/indexer/src/test/java/au/org/aodn/esindexer/service/StacCollectionMapperServiceTest.java b/indexer/src/test/java/au/org/aodn/esindexer/service/StacCollectionMapperServiceTest.java index eb72a0fe..7c1e7bd4 100644 --- a/indexer/src/test/java/au/org/aodn/esindexer/service/StacCollectionMapperServiceTest.java +++ b/indexer/src/test/java/au/org/aodn/esindexer/service/StacCollectionMapperServiceTest.java @@ -140,6 +140,7 @@ public void createIndexerService() throws IOException { // Mock the protected method to return an empty string list doReturn(Collections.emptySet()).when(indexerService).extractTokensFromDescription(anyString()); + doReturn(Collections.emptySet()).when(indexerService).extractTokensFromDescription(anyString(), anyString()); doNothing() .when(elasticSearchIndexService)