-
Notifications
You must be signed in to change notification settings - Fork 1
Feature/7802 elasticsearch alias #236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4d5df6e
87abec4
a6fdc9a
4da64c1
10d3b57
f7edb3c
989f2d7
69c574b
0624f6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
|
|
||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
|
|
||
|
|
||
|
|
@@ -91,6 +92,66 @@ public void createIndexFromMappingJSONFile(String indexMappingFile, String index | |
| } | ||
| } | ||
|
|
||
| public List<String> getAllIndicesWithPrefix(String baseIndexName) { | ||
| try { | ||
| GetIndexRequest getIndexRequest = GetIndexRequest.of(b -> b.index(baseIndexName + "*")); | ||
| GetIndexResponse getIndexResponse = portalElasticsearchClient.indices().get(getIndexRequest); | ||
| Set<String> indexNames = getIndexResponse.result().keySet(); | ||
| return indexNames.stream().toList(); | ||
| } catch (ElasticsearchException | IOException e) { | ||
| throw new IndexNotFoundException("Failed to get indices with prefix: " + baseIndexName + " | " + e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * index name is in this format: baseIndexName_vX where X is the version number | ||
| * @param existingIndices list of existing index names | ||
| * @return next available version number | ||
| */ | ||
| public int getAvailableVersionNumber(List<String> existingIndices) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| int maxVersion = 0; | ||
| for (String indexName : existingIndices) { | ||
| String[] parts = indexName.split("_v"); | ||
| if (parts.length != 2) { | ||
| log.warn("Index name: {} does not follow the expected format", indexName); | ||
| continue; | ||
| } | ||
| try { | ||
| int version = Integer.parseInt(parts[1]); | ||
| if (version > maxVersion) { | ||
| maxVersion = version; | ||
| } | ||
| } catch (NumberFormatException e) { | ||
| log.warn("Index name: {} has invalid version format", indexName); | ||
| } | ||
| } | ||
| return maxVersion + 1; | ||
| } | ||
|
|
||
| public String getAvailableVersionedIndexName(String baseIndexName) { | ||
| List<String> existingIndices = this.getAllIndicesWithPrefix(baseIndexName); | ||
| int nextVersion = this.getAvailableVersionNumber(existingIndices); | ||
| return baseIndexName + "_v" + nextVersion; | ||
| } | ||
|
|
||
| public void switchAliasToNewIndex(String alias, String newIndexName) { | ||
| try { | ||
| log.info("Switching alias: {} to point to new index: {}", alias, newIndexName); | ||
| portalElasticsearchClient.indices().updateAliases(ua -> ua | ||
| .actions(a -> a | ||
| .remove(r -> r.alias(alias).index("*")) | ||
| ) | ||
| .actions(a -> a | ||
| .add(ad -> ad.alias(alias).index(newIndexName)) | ||
| ) | ||
| ); | ||
| log.info("Alias: {} now points to index: {}", alias, newIndexName); | ||
| } catch (ElasticsearchException | IOException e) { | ||
| throw new IndexNotFoundException("Failed to switch alias: " + alias + " to new index: " + newIndexName + " | " + e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| public long getDocumentsCount(String indexName) { | ||
| try { | ||
| return portalElasticsearchClient.count(s -> s | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,7 @@ | ||
| package au.org.aodn.esindexer.service; | ||
|
|
||
| import au.org.aodn.ardcvocabs.model.VocabModel; | ||
| import au.org.aodn.ardcvocabs.service.ArdcVocabService; | ||
| import au.org.aodn.cloudoptimized.service.DataAccessService; | ||
| import au.org.aodn.datadiscoveryai.service.DataDiscoveryAiService; | ||
| import au.org.aodn.datadiscoveryai.model.AiEnhancementResponse; | ||
| import au.org.aodn.esindexer.configuration.AppConstants; | ||
| import au.org.aodn.esindexer.exception.*; | ||
| import au.org.aodn.esindexer.utils.CommonUtils; | ||
|
|
@@ -42,7 +39,6 @@ | |
| import org.springframework.http.ResponseEntity; | ||
| import org.springframework.scheduling.annotation.Async; | ||
| import org.springframework.stereotype.Service; | ||
| import org.springframework.web.client.HttpServerErrorException; | ||
|
|
||
| import java.io.ByteArrayInputStream; | ||
| import java.io.IOException; | ||
|
|
@@ -157,9 +153,13 @@ public boolean isMetadataPublished(String uuid) { | |
| } | ||
|
|
||
| protected Set<String> extractTokensFromDescription(String description) throws IOException { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function isn't in use except test, is that we can delete it and my the test case?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i added an overload function of this function, so it made this function not in use. Yes i will remove it with its tests. |
||
| return extractTokensFromDescription(description, indexName); | ||
| } | ||
|
|
||
| protected Set<String> extractTokensFromDescription(String description, String targetIndexName) throws IOException { | ||
| Set<String> results = new HashSet<>(); | ||
|
|
||
| AnalyzeRequest request = AnalyzeRequest.of(ar -> ar.index(indexName).analyzer(tokensAnalyserName).text(description)); | ||
| AnalyzeRequest request = AnalyzeRequest.of(ar -> ar.index(targetIndexName).analyzer(tokensAnalyserName).text(description)); | ||
| AnalyzeResponse response = portalElasticsearchClient.indices().analyze(request); | ||
|
|
||
| for (AnalyzeToken token : response.tokens()) { | ||
|
|
@@ -174,6 +174,10 @@ protected Set<String> extractTokensFromDescription(String description) throws IO | |
| } | ||
|
|
||
| protected StacCollectionModel getMappedMetadataValues(String metadataValues) throws IOException, FactoryException, TransformException, JAXBException { | ||
| return getMappedMetadataValues(metadataValues, indexName); | ||
| } | ||
|
|
||
| protected StacCollectionModel getMappedMetadataValues(String metadataValues, String targetIndexName) throws IOException, FactoryException, TransformException, JAXBException { | ||
| MDMetadataType metadataType = jaxbUtils.unmarshal(metadataValues); | ||
|
|
||
| // Step 1: Pure mapping (XML -> STAC) | ||
|
|
@@ -241,7 +245,7 @@ protected StacCollectionModel getMappedMetadataValues(String metadataValues) thr | |
|
|
||
| // search_as_you_type enabled fields can be extended | ||
| SearchSuggestionsModel searchSuggestionsModel = SearchSuggestionsModel.builder() | ||
| .abstractPhrases(this.extractTokensFromDescription(stacCollectionModel.getDescription())) | ||
| .abstractPhrases(this.extractTokensFromDescription(stacCollectionModel.getDescription(), targetIndexName)) | ||
| .parameterVocabs(stacCollectionModel.getSummaries().getParameterVocabs()) | ||
| .platformVocabs(stacCollectionModel.getSummaries().getPlatformVocabs()) | ||
| .organisationVocabs(stacCollectionModel.getSummaries().getOrganisationVocabs()) | ||
|
|
@@ -366,14 +370,16 @@ public ResponseEntity<String> deleteDocumentByUUID(String uuid) throws IOExcepti | |
| public List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork( | ||
| String beginWithUuid, boolean confirm, final Callback callback) { | ||
|
|
||
| final String versionedIndexName = beginWithUuid != null? indexName : elasticSearchIndexService.getAvailableVersionedIndexName(indexName); | ||
|
|
||
| if (!confirm) { | ||
| throw new IndexAllRequestNotConfirmedException("Please confirm that you want to index all metadata records from GeoNetwork"); | ||
| } | ||
|
|
||
| if(beginWithUuid == null) { | ||
| log.info("Indexing all metadata records from GeoNetwork"); | ||
| // recreate index from mapping JSON file | ||
| elasticSearchIndexService.createIndexFromMappingJSONFile(AppConstants.PORTAL_RECORDS_MAPPING_JSON_FILE, indexName); | ||
| elasticSearchIndexService.createIndexFromMappingJSONFile(AppConstants.PORTAL_RECORDS_MAPPING_JSON_FILE, versionedIndexName); | ||
| } | ||
| else { | ||
| log.info("Resume indexing records from GeoNetwork at {}", beginWithUuid); | ||
|
|
@@ -383,15 +389,16 @@ public List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork( | |
| { | ||
| try { | ||
| return Optional.of(this.getMappedMetadataValues( | ||
| geoNetworkResourceService.searchRecordBy(item.id()) | ||
| geoNetworkResourceService.searchRecordBy(item.id()), | ||
| versionedIndexName | ||
| )); | ||
| } catch (IOException | FactoryException | TransformException | JAXBException e) { | ||
| return Optional.empty(); | ||
| } | ||
| }; | ||
|
|
||
| List<BulkResponse> results = new ArrayList<>(); | ||
| BulkRequestProcessor<StacCollectionModel> bulkRequestProcessor = new BulkRequestProcessor<>(indexName, mapper, self, callback); | ||
| BulkRequestProcessor<StacCollectionModel> bulkRequestProcessor = new BulkRequestProcessor<>(versionedIndexName, mapper, self, callback); | ||
|
|
||
| // We need to keep sending messages to client to avoid timeout on long processing | ||
| ExecutorService executor = Executors.newFixedThreadPool(2); | ||
|
|
@@ -403,7 +410,7 @@ public List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork( | |
| final CountDownLatch countDown = new CountDownLatch(1); | ||
| Callable<StacCollectionModel> task = () -> { | ||
| try { | ||
| return this.getMappedMetadataValues(metadataRecord); | ||
| return this.getMappedMetadataValues(metadataRecord, versionedIndexName); | ||
| } | ||
| catch (FactoryException | JAXBException | TransformException | NullPointerException e) { | ||
| /* | ||
|
|
@@ -462,7 +469,7 @@ public List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork( | |
|
|
||
|
|
||
| // TODO now processing for record_suggestions index | ||
| log.info("Finished execute bulk indexing records to index: {}",indexName); | ||
| log.info("Finished execute bulk indexing records to index: {}",versionedIndexName); | ||
| } | ||
| catch(Exception e) { | ||
| log.error("Failed", e); | ||
|
|
@@ -478,6 +485,55 @@ public List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork( | |
| finally { | ||
| executor.shutdown(); | ||
| } | ||
|
|
||
| //After indexing, if there is a same-name index, delete it | ||
| checkAndDelete(indexName, versionedIndexName); | ||
|
|
||
| // switch alias to point to the new index | ||
| if (beginWithUuid == null) { | ||
| elasticSearchIndexService.switchAliasToNewIndex(indexName, versionedIndexName); | ||
| log.info("Alias: {} switched to point to index: {}", indexName, versionedIndexName); | ||
| } | ||
|
|
||
| return results; | ||
| } | ||
|
|
||
| /** | ||
| * This method only for smoothly swapping from non-alias index to alias-based index. | ||
| * If alias already working properly in all edge, staging and prod, this method is not needed and can be removed later. | ||
| * @param alias | ||
| * @param versionedIndexName | ||
| */ | ||
| public void checkAndDelete(String alias ,String versionedIndexName) { | ||
| try { | ||
| // First determine if the provided name is an alias. If it is an alias, do NOT delete. | ||
| boolean isAlias = false; | ||
| try { | ||
| var getAliasResp = portalElasticsearchClient.indices().getAlias(g -> g.name(alias)); | ||
| if (getAliasResp.result() != null && !getAliasResp.result().isEmpty()) { | ||
| isAlias = true; | ||
| log.info("Provided name '{}' resolves to an alias pointing to concrete indices: {}. Skipping delete.", alias, getAliasResp.result().keySet()); | ||
| } | ||
| } catch (ElasticsearchException | IOException e) { | ||
| // If alias lookup fails or returns not found, treat as not an alias and continue to index existence check. | ||
| log.debug("Alias lookup for '{}' failed or not found: {}", alias, e.getMessage()); | ||
| } | ||
|
|
||
| if (isAlias) { | ||
| return; // do not attempt to delete when the name is an alias | ||
| } | ||
|
|
||
| // Fallback: check if a concrete index with the same name exists, and delete it if appropriate | ||
| var hasSameNameIndex = portalElasticsearchClient.indices() | ||
| .exists( e -> e.index(alias)) | ||
| .value(); | ||
|
|
||
| if (hasSameNameIndex && !alias.equals(versionedIndexName)) { | ||
| elasticSearchIndexService.deleteIndexStore(alias); | ||
| } | ||
|
|
||
| } catch (ElasticsearchException | IOException e) { | ||
| log.error("Failed to check/delete same name index: {} | {}", alias, e.getMessage()); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can change to protected without issue