Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public class AzureFileSyncConfiguration {
@Value("${sync.az-file.storage.exclude.metadata.value:Exclude}")
private String storageExcludeValue;

@Value("${sync.hidden.files.use.squashing:false}")
private boolean useSquashingForHiddenFiles;

@Value("${sync.hidden.files.squashed.name}")
private String squashedFilesName;

@Bean
public ObjectStorageFileManager azFileManager() {
return new AzureBlobManager();
Expand All @@ -78,6 +84,9 @@ public ObjectStorageIndex azFileSynchronizer(
DataStorageType.AZ,
SearchDocumentType.AZ_BLOB_FILE,
tagDelimiter, false,
storageExcludeKey, storageExcludeValue);
storageExcludeKey,
storageExcludeValue,
useSquashingForHiddenFiles,
squashedFilesName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public class GSFileSyncConfiguration {
@Value("${sync.gs-file.storage.exclude.metadata.value:Exclude}")
private String storageExcludeValue;

@Value("${sync.hidden.files.use.squashing:false}")
private boolean useSquashingForHiddenFiles;

@Value("${sync.hidden.files.squashed.name}")
private String squashedFilesName;

@Bean
public ObjectStorageFileManager gsFileManager() {
return new GsBucketFileManager();
Expand All @@ -74,7 +80,10 @@ public ObjectStorageIndex gsFileSynchronizer(
DataStorageType.GS,
SearchDocumentType.GS_FILE,
tagDelimiter, false,
storageExcludeKey, storageExcludeValue);
storageExcludeKey,
storageExcludeValue,
useSquashingForHiddenFiles,
squashedFilesName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public class S3FileSyncConfiguration {
private String storageExcludeKey;
@Value("${sync.s3-file.storage.exclude.metadata.value:Exclude}")
private String storageExcludeValue;
@Value("${sync.hidden.files.use.squashing:false}")
private boolean useSquashingForHiddenFiles;

@Value("${sync.hidden.files.squashed.name}")
private String squashedFilesName;

@Bean
public ObjectStorageFileManager s3FileManager() {
Expand All @@ -83,7 +88,10 @@ public ObjectStorageIndex s3FileSynchronizer(
SearchDocumentType.S3_FILE,
tagDelimiter,
includeVersions,
storageExcludeKey, storageExcludeValue);
storageExcludeKey,
storageExcludeValue,
useSquashingForHiddenFiles,
squashedFilesName);
if (StringUtils.isNotBlank(storageIds)) {
service.setStorageIds(parseIds(storageIds));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,15 @@ public NFSObserverEventSynchronizer(final @Value("${sync.nfs-file.index.mapping}
final ElasticsearchServiceClient elasticsearchServiceClient,
final ElasticIndexService elasticIndexService,
final List<ObjectStorageFileManager> objectStorageFileManagers,
final NFSStorageMounter nfsMounter) {
final NFSStorageMounter nfsMounter,
@Value("${sync.hidden.files.use.squashing:false}")
final boolean useSquashingForHiddenFiles,
@Value("${sync.hidden.files.squashed.name}")
final String squashedFilesName
) {
super(indexSettingsPath, rootMountPoint, indexPrefix, indexName, bulkInsertSize, bulkLoadTagsSize,
cloudPipelineAPIClient, elasticsearchServiceClient, elasticIndexService, nfsMounter, tagDelimiter);
cloudPipelineAPIClient, elasticsearchServiceClient, elasticIndexService, nfsMounter, tagDelimiter,
useSquashingForHiddenFiles, squashedFilesName);

this.eventsFileChunkSize = eventsFileChunkSize;
final URI eventsBucketURI = URI.create(eventsBucketUriStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.epam.pipeline.elasticsearchagent.service.ElasticsearchSynchronizer;
import com.epam.pipeline.elasticsearchagent.service.impl.converter.storage.StorageFileMapper;
import com.epam.pipeline.elasticsearchagent.utils.ESConstants;
import com.epam.pipeline.elasticsearchagent.utils.HiddenFilesAggregator;
import com.epam.pipeline.entity.datastorage.AbstractDataStorage;
import com.epam.pipeline.entity.datastorage.DataStorageFile;
import com.epam.pipeline.entity.datastorage.DataStorageType;
Expand Down Expand Up @@ -82,8 +83,10 @@ public class NFSSynchronizer implements ElasticsearchSynchronizer {
private final ElasticIndexService elasticIndexService;
private final NFSStorageMounter nfsMounter;
private final String tagDelimiter;
private final StorageFileMapper fileMapper = new StorageFileMapper();
private final StorageFileMapper fileMapper;
protected final Map<Long, AbstractCloudRegion> cloudRegions;
private final boolean useSquashingForHiddenFiles;
private final String squashedFilesName;


public NFSSynchronizer(@Value("${sync.nfs-file.index.mapping}") String indexSettingsPath,
Expand All @@ -96,7 +99,10 @@ public NFSSynchronizer(@Value("${sync.nfs-file.index.mapping}") String indexSett
ElasticsearchServiceClient elasticsearchServiceClient,
ElasticIndexService elasticIndexService,
NFSStorageMounter nfsMounter,
@Value("${sync.nfs-file.tag.value.delimiter:;}") String tagDelimiter) {
@Value("${sync.nfs-file.tag.value.delimiter:;}") String tagDelimiter,
@Value("${sync.hidden.files.use.squashing:false}") final boolean useSquashingForHiddenFiles,
@Value("${sync.hidden.files.squashed.name}") final String squashedFilesName
) {
this.indexSettingsPath = indexSettingsPath;
this.rootMountPoint = rootMountPoint;
this.indexPrefix = indexPrefix;
Expand All @@ -111,6 +117,9 @@ public NFSSynchronizer(@Value("${sync.nfs-file.index.mapping}") String indexSett
this.cloudRegions = ListUtils.emptyIfNull(cloudPipelineAPIClient.loadAllRegions()).stream()
.map(r -> ImmutablePair.of(r.getId(), r))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
this.useSquashingForHiddenFiles = useSquashingForHiddenFiles;
this.squashedFilesName = squashedFilesName;
this.fileMapper = new StorageFileMapper(useSquashingForHiddenFiles, squashedFilesName);
}

@Override
Expand Down Expand Up @@ -181,10 +190,20 @@ private void createDocuments(final String indexName, final Path mountFolder,
final Stream<DataStorageFile> files = paths
.filter(path -> path.toFile().isFile())
.map(path -> convertToStorageFile(path, mountFolder));
HiddenFilesAggregator hiddenFilesAggregator = new HiddenFilesAggregator(this.squashedFilesName);
processFilesTagsInChunks(dataStorage, files)
.map(file -> createIndexRequest(file, indexName, dataStorage, regionCode, permissionsContainer,
findFileContent(dataStorage.getName(), file.getPath(), mountFolder.toString())))
.forEach(walker::add);
.forEach(file -> {
if (useSquashingForHiddenFiles && fileMapper.isHidden(dataStorage, file)) {
hiddenFilesAggregator.add(file);
} else {
walker.add(createIndexRequest(file, indexName, dataStorage, regionCode,
permissionsContainer, findFileContent(dataStorage.getName(), file.getPath(),
mountFolder.toString())));
}
});
Optional.ofNullable(hiddenFilesAggregator.getHiddenAggregatedFile())
.ifPresent(file -> walker.add(createIndexRequest(file, indexName, dataStorage, regionCode,
permissionsContainer, null)));
} catch (IOException e) {
throw new IllegalArgumentException("An error occurred during creating document.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.epam.pipeline.elasticsearchagent.service.impl.converter.storage.StorageFileMapper;
import com.epam.pipeline.elasticsearchagent.service.lock.LockService;
import com.epam.pipeline.elasticsearchagent.utils.ESConstants;
import com.epam.pipeline.elasticsearchagent.utils.HiddenFilesAggregator;
import com.epam.pipeline.entity.datastorage.AbstractDataStorage;
import com.epam.pipeline.entity.datastorage.AbstractDataStorageItem;
import com.epam.pipeline.entity.datastorage.DataStorageAction;
Expand All @@ -43,7 +44,6 @@
import com.epam.pipeline.vo.data.storage.DataStorageTagLoadBatchRequest;
import com.epam.pipeline.vo.data.storage.DataStorageTagLoadRequest;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -76,7 +76,6 @@
import static com.epam.pipeline.elasticsearchagent.utils.ESConstants.DOC_MAPPING_TYPE;
import static com.epam.pipeline.utils.PasswordGenerator.generateRandomString;

@RequiredArgsConstructor
@Slf4j
@Setter
public class ObjectStorageIndexImpl implements ObjectStorageIndex {
Expand Down Expand Up @@ -106,7 +105,49 @@ public class ObjectStorageIndexImpl implements ObjectStorageIndex {
private Set<Long> storageIds;
private Set<Long> skipStorageIds;

private final StorageFileMapper fileMapper = new StorageFileMapper();
private final Boolean useSquashingHiddenFiles;
private final String squashedHiddenFolderName;

private final StorageFileMapper fileMapper;

public ObjectStorageIndexImpl(
CloudPipelineAPIClient cloudPipelineAPIClient,
ElasticsearchServiceClient elasticsearchServiceClient,
ElasticIndexService elasticIndexService,
ObjectStorageFileManager fileManager,
LockService lockService,
String indexPrefix,
String indexMappingFile,
int bulkInsertSize,
int bulkLoadTagsSize,
DataStorageType storageType,
SearchDocumentType documentType,
String tagDelimiter,
boolean includeVersions,
String storageExcludeKey,
String storageExcludeValue,
Boolean useSquashingHiddenFiles,
String squashedHiddenFolderName) {

this.cloudPipelineAPIClient = cloudPipelineAPIClient;
this.elasticsearchServiceClient = elasticsearchServiceClient;
this.elasticIndexService = elasticIndexService;
this.fileManager = fileManager;
this.lockService = lockService;
this.indexPrefix = indexPrefix;
this.indexMappingFile = indexMappingFile;
this.bulkInsertSize = bulkInsertSize;
this.bulkLoadTagsSize = bulkLoadTagsSize;
this.storageType = storageType;
this.documentType = documentType;
this.tagDelimiter = tagDelimiter;
this.includeVersions = includeVersions;
this.storageExcludeKey = storageExcludeKey;
this.storageExcludeValue = storageExcludeValue;
this.useSquashingHiddenFiles = useSquashingHiddenFiles;
this.squashedHiddenFolderName = squashedHiddenFolderName;
this.fileMapper = new StorageFileMapper(this.useSquashingHiddenFiles, this.squashedHiddenFolderName);
}

@Override
public void synchronize(final LocalDateTime lastSyncTime, final LocalDateTime syncStart) {
Expand Down Expand Up @@ -160,11 +201,25 @@ public void indexStorage(final AbstractDataStorage dataStorage) {
final Stream<DataStorageFile> files = dataStorage.isVersioningEnabled() && includeVersions
? loadFileWithVersions(dataStorage, credentialsSupplier)
: loadFiles(dataStorage, credentialsSupplier);

HiddenFilesAggregator hiddenFilesAggregator = new HiddenFilesAggregator(squashedHiddenFolderName);

files.flatMap(file -> countRestored(restoreActions, file).stream())
.map(file -> createIndexRequest(
file, dataStorage, permissionsContainer, indexName, credentials.getRegion(),
findFileContent(dataStorage, file.getPath()))
).forEach(requestContainer::add);
.forEach(file -> {
if (useSquashingHiddenFiles && fileMapper.isHidden(dataStorage, file)) {
hiddenFilesAggregator.add(file);
} else {
IndexRequest indexRequest = createIndexRequest(file, dataStorage, permissionsContainer,
indexName, credentials.getRegion(), findFileContent(dataStorage, file.getPath()));
requestContainer.add(indexRequest);
}
});

Optional.ofNullable(hiddenFilesAggregator.getHiddenAggregatedFile())
.ifPresent(hiddenFile ->
requestContainer.add(createIndexRequest(hiddenFile, dataStorage, permissionsContainer,
indexName, credentials.getRegion(), null)));

}

finalizeIndex(alias, indexName, currentIndexName, dataStorage.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.epam.pipeline.entity.search.SearchDocumentType;
import com.epam.pipeline.entity.search.StorageFileSearchMask;
import com.epam.pipeline.utils.FileContentUtils;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -61,6 +62,21 @@ public class StorageFileMapper {
private final Map<String, Set<String>> hiddenMasks = new HashMap<>();
private final Map<String, Set<String>> indexContentMasks = new HashMap<>();

@Setter
private String hiddenFolderName;
@Setter
private boolean useSquashingHiddenFiles;

public StorageFileMapper() {
this.hiddenFolderName = null;
this.useSquashingHiddenFiles = false;
}

public StorageFileMapper(boolean useSquashingHiddenFiles, String hiddenFolderName) {
this.useSquashingHiddenFiles = useSquashingHiddenFiles;
this.hiddenFolderName = hiddenFolderName;
}

public XContentBuilder fileToDocument(final DataStorageFile dataStorageFile,
final AbstractDataStorage dataStorage,
final String region,
Expand Down Expand Up @@ -197,7 +213,11 @@ public String getFileContent(final byte[] byteContent, final String filePath, fi
return null;
}

private boolean isHidden(final AbstractDataStorage dataStorage, final DataStorageFile file) {
public boolean isHidden(final AbstractDataStorage dataStorage, final DataStorageFile file) {
if (useSquashingHiddenFiles &&
!StringUtils.isEmpty(this.hiddenFolderName) && this.hiddenFolderName.equals(file.getPath())) {
return true;
}
return isMaskMatch(dataStorage.getName(), file.getPath(), hiddenMasks);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2025 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.elasticsearchagent.utils;

import com.epam.pipeline.entity.datastorage.DataStorageFile;
import lombok.RequiredArgsConstructor;

import java.util.Optional;

@RequiredArgsConstructor
public class HiddenFilesAggregator {
private final String hiddenFileName;
private long totalSize = 0L;
private boolean hasHiddenFiles = false;
private DataStorageFile lastFile;

public void add(DataStorageFile file) {
this.lastFile = file;
this.hasHiddenFiles = true;
this.totalSize += Optional.ofNullable(file.getSize()).orElse(0L);
}

public DataStorageFile getHiddenAggregatedFile() {
if (!hasHiddenFiles) {
return null;
}
this.lastFile.setSize(totalSize);
this.lastFile.setPath(hiddenFileName);
this.lastFile.setName(hiddenFileName);
return this.lastFile;
}
}
3 changes: 3 additions & 0 deletions elasticsearch-agent/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ sync.run-configuration.index.name=run-configuration
sync.native.tags.transfer.disable=true
sync.native.tags.transfer.bulk.insert.size=1000

sync.hidden.files.use.squashing=true
sync.hidden.files.squashed.name=.hidden

#High availability
kube.master.pod.check.url=http://localhost:4040
kube.current.pod.name=${CP_ESA_CURRENT_POD_NAME:localhost}
Expand Down
Loading