Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ramls/raml-storage
Submodule raml-storage updated 38 files
+0 −1 examples/mod-data-import-converter-storage/actionProfile.sample
+0 −1 examples/mod-data-import-converter-storage/actionProfileCollection.sample
+0 −1 examples/mod-data-import-converter-storage/actionProfileUpdateDto.sample
+0 −1 examples/mod-data-import-converter-storage/jobProfile.sample
+0 −10 examples/mod-data-import-converter-storage/jobProfileCollection.sample
+0 −1 examples/mod-data-import-converter-storage/jobProfileUpdateDto.sample
+0 −1 examples/mod-data-import-converter-storage/mappingProfile.sample
+0 −1 examples/mod-data-import-converter-storage/mappingProfileCollection.sample
+0 −1 examples/mod-data-import-converter-storage/mappingProfileUpdateDto.sample
+0 −1 examples/mod-data-import-converter-storage/matchProfile.sample
+0 −1 examples/mod-data-import-converter-storage/matchProfileCollection.sample
+0 −1 examples/mod-data-import-converter-storage/matchProfileUpdateDto.sample
+399 −0 examples/mod-source-record-manager/incomingRecord.sample
+3 −2 examples/mod-source-record-storage/fetchParsedRecordsBatchRequest.sample
+29 −0 examples/mod-source-record-storage/recordMatchingRqDto.sample
+13 −0 examples/mod-source-record-storage/recordsIdentifiersCollection.sample
+0 −4 examples/mod-source-record-storage/sourceRecord.sample
+0 −4 examples/mod-source-record-storage/sourceRecordCollection.sample
+6 −2 schemas/common/dataImportEventTypes.json
+6 −1 schemas/dto/fetchParsedRecordsBatchRequest.json
+0 −15 schemas/dto/parsedRecordDto.json
+125 −0 schemas/dto/recordMatchingRqDto.json
+15 −3 schemas/dto/recordProcessingLogDto.json
+27 −0 schemas/dto/recordProcessingLogDtoCollection.json
+34 −0 schemas/dto/recordsIdentifiersCollection.json
+4 −6 schemas/dto/sourceRecord.json
+2 −4 schemas/dto/sourceRecordSearchRequest.json
+2 −1 schemas/dto/strippedParsedRecord.json
+2 −6 schemas/mod-data-import-converter-storage/actionProfile.json
+0 −5 schemas/mod-data-import-converter-storage/jobProfile.json
+0 −5 schemas/mod-data-import-converter-storage/mappingProfile.json
+0 −5 schemas/mod-data-import-converter-storage/matchProfile.json
+3 −0 schemas/mod-data-import-converter-storage/profileAssociation.json
+0 −12 schemas/mod-data-import-converter-storage/profileSnapshotType.json
+2 −7 schemas/mod-data-import-converter-storage/profileSnapshotWrapper.json
+4 −0 schemas/mod-data-import/dataImportQueueItem.json
+34 −0 schemas/mod-source-record-manager/incomingRecord.json
+2 −1 schemas/mod-source-record-manager/journalRecord.json
11 changes: 7 additions & 4 deletions src/main/java/org/folio/dao/DataImportQueueItemDaoImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public class DataImportQueueItemDaoImpl implements DataImportQueueItemDao {
private static final String GET_BY_ID_SQL =
"SELECT * FROM %s.%s WHERE id = $1";
private static final String INSERT_SQL =
"INSERT INTO %s.%s (id, job_execution_id, upload_definition_id, tenant, original_size, file_path, timestamp, part_number, processing, okapi_url, data_type, okapi_token, okapi_permissions) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)";
"INSERT INTO %s.%s (id, job_execution_id, upload_definition_id, tenant, original_size, file_path, timestamp, part_number, processing, okapi_url, data_type, okapi_token, okapi_permissions, okapi_request_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)";
private static final String UPDATE_BY_ID_SQL =
"UPDATE %s.%s SET job_execution_id = $2, upload_definition_id = $3, tenant = $4, original_size = $5, file_path = $6, timestamp = $7, part_number = $8, processing = $9, okapi_url = $10, data_type = $11, okapi_token = $12, okapi_permissions = $13 WHERE id = $1";
"UPDATE %s.%s SET job_execution_id = $2, upload_definition_id = $3, tenant = $4, original_size = $5, file_path = $6, timestamp = $7, part_number = $8, processing = $9, okapi_url = $10, data_type = $11, okapi_token = $12, okapi_permissions = $13, okapi_request_id = $14 WHERE id = $1";
private static final String DELETE_BY_ID_SQL =
"DELETE FROM %s.%s WHERE id = $1";
private static final String DELETE_BY_JOB_ID_SQL =
Expand Down Expand Up @@ -200,7 +200,8 @@ public Future<String> addQueueItem(DataImportQueueItem dataImportQueueItem) {
dataImportQueueItem.getOkapiUrl(),
dataImportQueueItem.getDataType(),
dataImportQueueItem.getOkapiToken(),
dataImportQueueItem.getOkapiPermissions()
dataImportQueueItem.getOkapiPermissions(),
dataImportQueueItem.getOkapiRequestId()
),
promise
);
Expand Down Expand Up @@ -228,7 +229,8 @@ public Future<DataImportQueueItem> updateQueueItem(
dataImportQueueItem.getOkapiUrl(),
dataImportQueueItem.getDataType(),
dataImportQueueItem.getOkapiToken(),
dataImportQueueItem.getOkapiPermissions()
dataImportQueueItem.getOkapiPermissions(),
dataImportQueueItem.getOkapiRequestId()
);

return connection
Expand Down Expand Up @@ -324,6 +326,7 @@ private static DataImportQueueItem mapRowJsonToQueueItem(Row rowAsJson) {
queueItem.setDataType(rowAsJson.getString("data_type"));
queueItem.setOkapiToken(rowAsJson.getString("okapi_token"));
queueItem.setOkapiPermissions(rowAsJson.getString("okapi_permissions"));
queueItem.setOkapiRequestId(rowAsJson.getString("okapi_request_id"));
return queueItem;
}

Expand Down
27 changes: 19 additions & 8 deletions src/main/java/org/folio/service/file/S3JobRunningVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.folio.rest.jaxrs.model.StatusDto.ErrorStatus;
import org.folio.service.processing.ParallelFileChunkingProcessor;
import org.folio.service.processing.ranking.ScoreService;
import org.folio.service.processing.split.FileSplitUtilities;
import org.folio.service.s3storage.MinioStorageService;
import org.folio.service.upload.UploadDefinitionService;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -253,13 +254,19 @@ protected Future<QueueJob> downloadFromS3(QueueJob job) {
}

protected Future<File> createLocalFile(DataImportQueueItem queueItem) {
return vertx
.fileSystem()
.createTempFile(
"di-tmp-",
Path.of(queueItem.getFilePath()).getFileName().toString(),
"rwx------"
)
String prefix = "di-tmp-";
String suffix = Path.of(queueItem.getFilePath()).getFileName().toString();

Future<String> tempFileFuture;
if (FileSplitUtilities.isWindows()) {
// Windows doesn't support POSIX permissions - create without them
tempFileFuture = vertx.fileSystem().createTempFile(prefix, suffix);
} else {
// Unix/Linux/Mac - use POSIX permissions for security
tempFileFuture = vertx.fileSystem().createTempFile(prefix, suffix, "rwx------");
}

return tempFileFuture
.map(File::new)
.onSuccess(localFile ->
LOGGER.info("Created temporary file {}", localFile.toPath())
Expand All @@ -281,7 +288,9 @@ protected OkapiConnectionParams getConnectionParams(
XOkapiHeaders.TOKEN.toLowerCase(),
queueItem.getOkapiToken(),
XOkapiHeaders.PERMISSIONS.toLowerCase(),
queueItem.getOkapiPermissions()
queueItem.getOkapiPermissions(),
XOkapiHeaders.REQUEST_ID.toLowerCase(),
queueItem.getOkapiRequestId()
),
vertx
);
Expand All @@ -304,6 +313,8 @@ protected OkapiConnectionParams getConnectionParams(
queueItem.getOkapiToken(),
XOkapiHeaders.PERMISSIONS.toLowerCase(),
queueItem.getOkapiPermissions(),
XOkapiHeaders.REQUEST_ID.toLowerCase(),
queueItem.getOkapiRequestId(),
XOkapiHeaders.USER_ID.toLowerCase(),
userId
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,18 +304,10 @@ protected Future<Void> initializeChildren(
)
.withOkapiToken(params.getToken())
.withOkapiPermissions(
params
.getHeaders()
.entries()
.stream()
.filter(header ->
XOkapiHeaders.PERMISSIONS.equalsIgnoreCase(
header.getKey()
)
)
.findFirst()
.map(Map.Entry::getValue)
.orElse("")
getHeaderValue(params, XOkapiHeaders.PERMISSIONS)
)
.withOkapiRequestId(
getHeaderValue(params, XOkapiHeaders.REQUEST_ID)
)
);
})
Expand Down Expand Up @@ -634,6 +626,17 @@ protected String getUserIdFromMetadata(@CheckForNull Metadata metadata) {
}
}

/**
 * Looks up the value of the given Okapi header in the connection parameters.
 * Header names are matched case-insensitively; the first match wins.
 *
 * @param params     connection parameters carrying the request headers
 * @param headerName name of the header to look up (e.g. X-Okapi-Permissions)
 * @return the first matching header value, or the empty string when absent
 */
private String getHeaderValue(OkapiConnectionParams params, String headerName) {
  for (Map.Entry<String, String> header : params.getHeaders().entries()) {
    if (headerName.equalsIgnoreCase(header.getKey())) {
      return header.getValue();
    }
  }
  return "";
}

/**
* Hold information about a split file that will be needed for the creation
* of job executions, etc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,16 @@ public static int countTotalRecordsInFile(File file, JobProfileInfo jobProfile)
return 0;
}

SourceReader reader = SourceReaderBuilder.build(file, jobProfile);

int total = 0;
while (reader.hasNext()) {
total += reader.next().size();
try (SourceReader reader = SourceReaderBuilder.build(file, jobProfile)) {
int total = 0;
while (reader.hasNext()) {
total += reader.next().size();
}
return total;
} catch (Exception e) {
LOGGER.error("Error counting records in file", e);
throw new RecordsReaderException("Error counting records in file", e);
}

return total;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ private void handleEnd() {

LOGGER.debug("handleEnd:: End handler. Processing completed: {}", this);

closeReader();

Handler<Void> endHandler;
synchronized (this) {
handler = null;
Expand All @@ -248,11 +250,23 @@ private void handleEnd() {
}

private void handleException(Throwable t) {
  // Mark the stream as finished and release the underlying reader before
  // the failure is propagated to the registered handler.
  closed = true;

  closeReader();

  boolean delegable = exceptionHandler != null && t instanceof Exception;
  if (!delegable) {
    // No handler registered, or the throwable is an Error — just log it.
    LOGGER.warn("handleException:: Unhandled exception:", t);
    return;
  }
  exceptionHandler.handle(t);
}

// Best-effort close of the underlying reader. Failures are logged but never
// propagated: this runs from end/error handling paths where throwing would
// mask the original outcome.
private void closeReader() {
  try {
    reader.close();
    LOGGER.debug("closeReader:: Reader closed successfully");
  } catch (Exception e) {
    LOGGER.warn("closeReader:: Error closing reader", e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,12 @@ public boolean hasNext() {
/**
 * @return the content type of records produced by this reader: raw EDIFACT
 */
public RecordsMetadata.ContentType getContentType() {
return RecordsMetadata.ContentType.EDIFACT_RAW;
}

@Override
public void close() {
  // Release the underlying segment stream; the null check guards against a
  // reader whose construction failed before the stream was assigned.
  if (segmentsStream != null) {
    segmentsStream.close();
  }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,15 @@ public boolean hasNext() {
/**
 * @return the content type of records produced by this reader: MARC JSON
 */
public RecordsMetadata.ContentType getContentType() {
return RecordsMetadata.ContentType.MARC_JSON;
}

/**
 * Releases the underlying reader. Close failures are logged rather than
 * rethrown, so cleanup never masks the outcome of record processing.
 */
@Override
public void close() {
  if (reader == null) {
    // Construction failed before the reader was assigned — nothing to do.
    return;
  }
  try {
    reader.close();
  } catch (IOException e) {
    LOGGER.warn("close:: Error closing reader", e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.List;
Expand All @@ -30,14 +31,16 @@ public class MarcRawReader implements SourceReader {

private static final Charset CHARSET = Charset.forName(MODULE_SPECIFIC_ARGS.getOrDefault("file.processing.buffer.record.charset", "UTF8"));
private MarcPermissiveStreamReader reader;
private InputStream inputStream;
private int chunkSize;
private MutableInt recordsCounter;

public MarcRawReader(File file, int chunkSize) {
this.chunkSize = chunkSize;
recordsCounter = new MutableInt(0);
try {
this.reader = new MarcPermissiveStreamReader(FileUtils.openInputStream(file), true, true);
this.inputStream = FileUtils.openInputStream(file);
this.reader = new MarcPermissiveStreamReader(inputStream, true, true);
} catch (IOException e) {
String errorMessage = "Can not initialize reader. Cause: " + e.getMessage();
LOGGER.warn(errorMessage);
Expand Down Expand Up @@ -82,4 +85,15 @@ public boolean hasNext() {
/**
 * @return the content type of records produced by this reader: raw (binary) MARC
 */
public RecordsMetadata.ContentType getContentType() {
return RecordsMetadata.ContentType.MARC_RAW;
}

/**
 * Releases the input stream backing the MARC reader. Close failures are
 * logged rather than rethrown, so cleanup never masks processing errors.
 */
@Override
public void close() {
  if (inputStream == null) {
    // Construction failed before the stream was opened — nothing to release.
    return;
  }
  try {
    inputStream.close();
  } catch (IOException e) {
    LOGGER.warn("close:: Error closing input stream", e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,9 @@ public boolean hasNext() {
/**
 * @return the content type of records produced by this reader: MARC XML
 */
public RecordsMetadata.ContentType getContentType() {
return RecordsMetadata.ContentType.MARC_XML;
}

@Override
public void close() {
  // No-op: the XML document was fully parsed into memory at construction
  // time, so there is no underlying stream or handle left to release.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ public RecordsReaderException(Throwable cause) {
super(cause);
}

/**
 * Creates an exception with a descriptive message while preserving the
 * original cause for stack-trace chaining.
 *
 * @param message human-readable description of the failure
 * @param cause   the underlying exception that triggered this one
 */
public RecordsReaderException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
/**
* The root interface for traversing and partitioning elements of a source records.
*/
public interface
SourceReader {
public interface SourceReader extends AutoCloseable {

/**
* Returns the next list of source records in the iteration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,25 @@ public static int countRecordsInFile(
InputStream inStream,
JobProfileInfo profile
) throws IOException {
File tempFile = Files
.createTempFile(
Path tempFilePath;
if (isWindows()) {
// Windows doesn't support POSIX permissions - create without them
tempFilePath = Files.createTempFile(
"di-tmp-",
Path.of(filename).getFileName().toString()
);
} else {
// Unix/Linux/Mac - use POSIX permissions for security
tempFilePath = Files.createTempFile(
"di-tmp-",
// later stage requires correct file extension
Path.of(filename).getFileName().toString(),
PosixFilePermissions.asFileAttribute(
PosixFilePermissions.fromString("rwx------")
)
)
.toFile();
);
}
File tempFile = tempFilePath.toFile();

try (
InputStream autoCloseMe = inStream;
Expand All @@ -77,13 +86,25 @@ public static int countRecordsInFile(
}
}

/**
 * Detects whether the JVM is running on Windows, where POSIX file
 * permissions are unsupported and temp files/dirs must be created
 * without permission attributes.
 *
 * @return {@code true} when the {@code os.name} system property indicates Windows
 */
public static boolean isWindows() {
  // Locale.ROOT forces locale-independent case folding: under a Turkish
  // default locale, "Windows".toLowerCase() yields a dotless ı and the
  // "win" match would fail. Default "" guards against an unset property.
  return System.getProperty("os.name", "").toLowerCase(java.util.Locale.ROOT).contains("win");
}

/**
 * Creates a temporary directory for holding split-file chunks.
 *
 * <p>On POSIX-capable systems the directory is restricted to owner-only
 * access ({@code rwx------}); on Windows, which does not support POSIX
 * permissions, it is created with the platform defaults.
 *
 * @param key identifier (e.g. an S3 object key) used to name the directory;
 *            slashes are replaced since they are invalid in directory names
 * @return path of the newly created temporary directory
 * @throws IOException if the directory cannot be created
 */
public static Path createTemporaryDir(String key) throws IOException {
  // Hoisted: both branches share the same prefix construction.
  String prefix = String.format("di-split-%s", key.replace('/', '-'));

  if (isWindows()) {
    // Windows doesn't support POSIX permissions - create without them
    return Files.createTempDirectory(prefix);
  }

  // Unix/Linux/Mac - use POSIX permissions for security
  return Files.createTempDirectory(
    prefix,
    PosixFilePermissions.asFileAttribute(
      PosixFilePermissions.fromString("rwx------")
    )
  );
}

public boolean isMarcBinary(String path, JobProfileInfo profile) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-2.0.xsd">
<!-- Adds the okapi_request_id column so queued data-import items can carry
     the originating request's X-Okapi-Request-Id header through async
     processing. Nullable text: existing rows predate this header. -->
<changeSet id="2024-12-15--00-00-add-request-id-to-queue-item-table" author="serhii-nosko">
<addColumn tableName="queue_items">
<column name="okapi_request_id" type="text"
remarks="The originating request's Okapi request ID header" />
</addColumn>
</changeSet>
</databaseChangeLog>
Loading