diff --git a/ramls/raml-storage b/ramls/raml-storage index 26d2d703..811c0170 160000 --- a/ramls/raml-storage +++ b/ramls/raml-storage @@ -1 +1 @@ -Subproject commit 26d2d70312a0f07e1dabb907ac7d67d63fce64dc +Subproject commit 811c0170b212d29cbd40e5eb7d6a45949e636208 diff --git a/src/main/java/org/folio/dao/DataImportQueueItemDaoImpl.java b/src/main/java/org/folio/dao/DataImportQueueItemDaoImpl.java index 825d42fe..93d9dec4 100644 --- a/src/main/java/org/folio/dao/DataImportQueueItemDaoImpl.java +++ b/src/main/java/org/folio/dao/DataImportQueueItemDaoImpl.java @@ -41,9 +41,9 @@ public class DataImportQueueItemDaoImpl implements DataImportQueueItemDao { private static final String GET_BY_ID_SQL = "SELECT * FROM %s.%s WHERE id = $1"; private static final String INSERT_SQL = - "INSERT INTO %s.%s (id, job_execution_id, upload_definition_id, tenant, original_size, file_path, timestamp, part_number, processing, okapi_url, data_type, okapi_token, okapi_permissions) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)"; + "INSERT INTO %s.%s (id, job_execution_id, upload_definition_id, tenant, original_size, file_path, timestamp, part_number, processing, okapi_url, data_type, okapi_token, okapi_permissions, okapi_request_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)"; private static final String UPDATE_BY_ID_SQL = - "UPDATE %s.%s SET job_execution_id = $2, upload_definition_id = $3, tenant = $4, original_size = $5, file_path = $6, timestamp = $7, part_number = $8, processing = $9, okapi_url = $10, data_type = $11, okapi_token = $12, okapi_permissions = $13 WHERE id = $1"; + "UPDATE %s.%s SET job_execution_id = $2, upload_definition_id = $3, tenant = $4, original_size = $5, file_path = $6, timestamp = $7, part_number = $8, processing = $9, okapi_url = $10, data_type = $11, okapi_token = $12, okapi_permissions = $13, okapi_request_id = $14 WHERE id = $1"; private static final String DELETE_BY_ID_SQL = "DELETE FROM %s.%s WHERE id = $1"; private static final String DELETE_BY_JOB_ID_SQL = @@ -200,7 +200,8 @@ public Future addQueueItem(DataImportQueueItem dataImportQueueItem) { dataImportQueueItem.getOkapiUrl(), dataImportQueueItem.getDataType(), dataImportQueueItem.getOkapiToken(), - dataImportQueueItem.getOkapiPermissions() + dataImportQueueItem.getOkapiPermissions(), + dataImportQueueItem.getOkapiRequestId() ), promise ); @@ -228,7 +229,8 @@ public Future updateQueueItem( dataImportQueueItem.getOkapiUrl(), dataImportQueueItem.getDataType(), dataImportQueueItem.getOkapiToken(), - dataImportQueueItem.getOkapiPermissions() + dataImportQueueItem.getOkapiPermissions(), + dataImportQueueItem.getOkapiRequestId() ); return connection @@ -324,6 +326,7 @@ private static DataImportQueueItem mapRowJsonToQueueItem(Row rowAsJson) { queueItem.setDataType(rowAsJson.getString("data_type")); queueItem.setOkapiToken(rowAsJson.getString("okapi_token")); queueItem.setOkapiPermissions(rowAsJson.getString("okapi_permissions")); + queueItem.setOkapiRequestId(rowAsJson.getString("okapi_request_id")); return queueItem; } diff --git a/src/main/java/org/folio/service/file/S3JobRunningVerticle.java b/src/main/java/org/folio/service/file/S3JobRunningVerticle.java index d3e036bf..e5f32bdd 100644 --- a/src/main/java/org/folio/service/file/S3JobRunningVerticle.java +++ b/src/main/java/org/folio/service/file/S3JobRunningVerticle.java @@ -30,6 +30,7 @@ import org.folio.rest.jaxrs.model.StatusDto.ErrorStatus; import org.folio.service.processing.ParallelFileChunkingProcessor; import org.folio.service.processing.ranking.ScoreService; +import org.folio.service.processing.split.FileSplitUtilities; import org.folio.service.s3storage.MinioStorageService; import org.folio.service.upload.UploadDefinitionService; import org.springframework.beans.factory.annotation.Autowired; @@ -253,13 +254,19 @@ protected Future downloadFromS3(QueueJob job) { } protected Future createLocalFile(DataImportQueueItem queueItem) { - return vertx - .fileSystem() - .createTempFile( - "di-tmp-", - Path.of(queueItem.getFilePath()).getFileName().toString(), - "rwx------" - ) + String prefix = "di-tmp-"; + String suffix = Path.of(queueItem.getFilePath()).getFileName().toString(); + + Future tempFileFuture; + if (FileSplitUtilities.isWindows()) { + // Windows doesn't support POSIX permissions - create without them + tempFileFuture = vertx.fileSystem().createTempFile(prefix, suffix); + } else { + // Unix/Linux/Mac - use POSIX permissions for security + tempFileFuture = vertx.fileSystem().createTempFile(prefix, suffix, "rwx------"); + } + + return tempFileFuture .map(File::new) .onSuccess(localFile -> LOGGER.info("Created temporary file {}", localFile.toPath()) @@ -281,7 +288,9 @@ protected OkapiConnectionParams getConnectionParams( XOkapiHeaders.TOKEN.toLowerCase(), queueItem.getOkapiToken(), XOkapiHeaders.PERMISSIONS.toLowerCase(), - queueItem.getOkapiPermissions() + queueItem.getOkapiPermissions(), + XOkapiHeaders.REQUEST_ID.toLowerCase(), + queueItem.getOkapiRequestId() ), vertx ); @@ -304,6 +313,8 @@ protected OkapiConnectionParams getConnectionParams( queueItem.getOkapiToken(), XOkapiHeaders.PERMISSIONS.toLowerCase(), queueItem.getOkapiPermissions(), + XOkapiHeaders.REQUEST_ID.toLowerCase(), + queueItem.getOkapiRequestId(), XOkapiHeaders.USER_ID.toLowerCase(), userId ), diff --git a/src/main/java/org/folio/service/file/SplitFileProcessingService.java b/src/main/java/org/folio/service/file/SplitFileProcessingService.java index e7da1d64..c706ee18 100644 --- a/src/main/java/org/folio/service/file/SplitFileProcessingService.java +++ b/src/main/java/org/folio/service/file/SplitFileProcessingService.java @@ -304,18 +304,10 @@ protected Future initializeChildren( ) .withOkapiToken(params.getToken()) .withOkapiPermissions( - params - .getHeaders() - .entries() - .stream() - .filter(header -> - XOkapiHeaders.PERMISSIONS.equalsIgnoreCase( - header.getKey() - ) - ) - .findFirst() - .map(Map.Entry::getValue) - .orElse("") + getHeaderValue(params, XOkapiHeaders.PERMISSIONS) + ) + .withOkapiRequestId( + getHeaderValue(params, XOkapiHeaders.REQUEST_ID) ) ); }) @@ -634,6 +626,17 @@ protected String getUserIdFromMetadata(@CheckForNull Metadata metadata) { } } + private String getHeaderValue(OkapiConnectionParams params, String headerName) { + return params + .getHeaders() + .entries() + .stream() + .filter(header -> headerName.equalsIgnoreCase(header.getKey())) + .findFirst() + .map(Map.Entry::getValue) + .orElse(""); + } + /** * Hold information about a split file that will be needed for the creation * of job executions, etc diff --git a/src/main/java/org/folio/service/processing/ParallelFileChunkingProcessor.java b/src/main/java/org/folio/service/processing/ParallelFileChunkingProcessor.java index b984418b..509d995a 100644 --- a/src/main/java/org/folio/service/processing/ParallelFileChunkingProcessor.java +++ b/src/main/java/org/folio/service/processing/ParallelFileChunkingProcessor.java @@ -208,14 +208,16 @@ public static int countTotalRecordsInFile(File file, JobProfileInfo jobProfile) return 0; } - SourceReader reader = SourceReaderBuilder.build(file, jobProfile); - - int total = 0; - while (reader.hasNext()) { - total += reader.next().size(); + try (SourceReader reader = SourceReaderBuilder.build(file, jobProfile)) { + int total = 0; + while (reader.hasNext()) { + total += reader.next().size(); + } + return total; + } catch (Exception e) { + LOGGER.error("Error counting records in file", e); + throw new RecordsReaderException("Error counting records in file", e); } - - return total; } /** diff --git a/src/main/java/org/folio/service/processing/kafka/SourceReaderReadStreamWrapper.java b/src/main/java/org/folio/service/processing/kafka/SourceReaderReadStreamWrapper.java index 7c30842e..8dd83f75 100644 --- a/src/main/java/org/folio/service/processing/kafka/SourceReaderReadStreamWrapper.java +++ b/src/main/java/org/folio/service/processing/kafka/SourceReaderReadStreamWrapper.java @@ -237,6 +237,8 @@ private void handleEnd() { LOGGER.debug("handleEnd:: End handler. Processing completed: {}", this); + closeReader(); + Handler endHandler; synchronized (this) { handler = null; @@ -248,6 +250,10 @@ private void handleEnd() { } private void handleException(Throwable t) { + closed = true; + + closeReader(); + if (exceptionHandler != null && t instanceof Exception) { exceptionHandler.handle(t); } else { @@ -255,4 +261,12 @@ private void handleException(Throwable t) { } } + private void closeReader() { + try { + reader.close(); + LOGGER.debug("closeReader:: Reader closed successfully"); + } catch (Exception e) { + LOGGER.warn("closeReader:: Error closing reader", e); + } + } } diff --git a/src/main/java/org/folio/service/processing/reader/EdifactReader.java b/src/main/java/org/folio/service/processing/reader/EdifactReader.java index e91b3e12..3e8f2558 100644 --- a/src/main/java/org/folio/service/processing/reader/EdifactReader.java +++ b/src/main/java/org/folio/service/processing/reader/EdifactReader.java @@ -79,5 +79,12 @@ public boolean hasNext() { public RecordsMetadata.ContentType getContentType() { return RecordsMetadata.ContentType.EDIFACT_RAW; } + + @Override + public void close() { + if (segmentsStream != null) { + segmentsStream.close(); + } + } } diff --git a/src/main/java/org/folio/service/processing/reader/MarcJsonReader.java b/src/main/java/org/folio/service/processing/reader/MarcJsonReader.java index 73d8972a..ac713d85 100644 --- a/src/main/java/org/folio/service/processing/reader/MarcJsonReader.java +++ b/src/main/java/org/folio/service/processing/reader/MarcJsonReader.java @@ -82,4 +82,15 @@ public boolean hasNext() { public RecordsMetadata.ContentType getContentType() { return RecordsMetadata.ContentType.MARC_JSON; } + + @Override + public void close() { + try { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + LOGGER.warn("close:: Error closing reader", e); + } + } } diff --git a/src/main/java/org/folio/service/processing/reader/MarcRawReader.java b/src/main/java/org/folio/service/processing/reader/MarcRawReader.java index e10b0c1a..05ebc487 100644 --- a/src/main/java/org/folio/service/processing/reader/MarcRawReader.java +++ b/src/main/java/org/folio/service/processing/reader/MarcRawReader.java @@ -14,6 +14,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.util.List; @@ -30,6 +31,7 @@ public class MarcRawReader implements SourceReader { private static final Charset CHARSET = Charset.forName(MODULE_SPECIFIC_ARGS.getOrDefault("file.processing.buffer.record.charset", "UTF8")); private MarcPermissiveStreamReader reader; + private InputStream inputStream; private int chunkSize; private MutableInt recordsCounter; @@ -37,7 +39,8 @@ public MarcRawReader(File file, int chunkSize) { this.chunkSize = chunkSize; recordsCounter = new MutableInt(0); try { - this.reader = new MarcPermissiveStreamReader(FileUtils.openInputStream(file), true, true); + this.inputStream = FileUtils.openInputStream(file); + this.reader = new MarcPermissiveStreamReader(inputStream, true, true); } catch (IOException e) { String errorMessage = "Can not initialize reader. Cause: " + e.getMessage(); LOGGER.warn(errorMessage); @@ -82,4 +85,15 @@ public boolean hasNext() { public RecordsMetadata.ContentType getContentType() { return RecordsMetadata.ContentType.MARC_RAW; } + + @Override + public void close() { + try { + if (inputStream != null) { + inputStream.close(); + } + } catch (IOException e) { + LOGGER.warn("close:: Error closing input stream", e); + } + } } diff --git a/src/main/java/org/folio/service/processing/reader/MarcXmlReader.java b/src/main/java/org/folio/service/processing/reader/MarcXmlReader.java index 5109c0ea..f49b0a74 100644 --- a/src/main/java/org/folio/service/processing/reader/MarcXmlReader.java +++ b/src/main/java/org/folio/service/processing/reader/MarcXmlReader.java @@ -65,4 +65,9 @@ public boolean hasNext() { public RecordsMetadata.ContentType getContentType() { return RecordsMetadata.ContentType.MARC_XML; } + + @Override + public void close() { + // Document is already loaded into memory, no resources to close + } } diff --git a/src/main/java/org/folio/service/processing/reader/RecordsReaderException.java b/src/main/java/org/folio/service/processing/reader/RecordsReaderException.java index b139e990..589e508d 100644 --- a/src/main/java/org/folio/service/processing/reader/RecordsReaderException.java +++ b/src/main/java/org/folio/service/processing/reader/RecordsReaderException.java @@ -10,4 +10,7 @@ public RecordsReaderException(Throwable cause) { super(cause); } + public RecordsReaderException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/src/main/java/org/folio/service/processing/reader/SourceReader.java b/src/main/java/org/folio/service/processing/reader/SourceReader.java index f47ff62e..6c1aab21 100644 --- a/src/main/java/org/folio/service/processing/reader/SourceReader.java +++ b/src/main/java/org/folio/service/processing/reader/SourceReader.java @@ -8,8 +8,7 @@ /** * The root interface for traversing and partitioning elements of a source records. */ -public interface -SourceReader { +public interface SourceReader extends AutoCloseable { /** * Returns the next list of source records in the iteration. diff --git a/src/main/java/org/folio/service/processing/split/FileSplitUtilities.java b/src/main/java/org/folio/service/processing/split/FileSplitUtilities.java index b0f4a1dc..7bbb3d44 100644 --- a/src/main/java/org/folio/service/processing/split/FileSplitUtilities.java +++ b/src/main/java/org/folio/service/processing/split/FileSplitUtilities.java @@ -50,16 +50,25 @@ public static int countRecordsInFile( InputStream inStream, JobProfileInfo profile ) throws IOException { - File tempFile = Files - .createTempFile( + Path tempFilePath; + if (isWindows()) { + // Windows doesn't support POSIX permissions - create without them + tempFilePath = Files.createTempFile( + "di-tmp-", + Path.of(filename).getFileName().toString() + ); + } else { + // Unix/Linux/Mac - use POSIX permissions for security + tempFilePath = Files.createTempFile( "di-tmp-", // later stage requires correct file extension Path.of(filename).getFileName().toString(), PosixFilePermissions.asFileAttribute( PosixFilePermissions.fromString("rwx------") ) - ) - .toFile(); + ); + } + File tempFile = tempFilePath.toFile(); try ( InputStream autoCloseMe = inStream; @@ -77,13 +86,25 @@ public static int countRecordsInFile( } } + public static boolean isWindows() { + return System.getProperty("os.name").toLowerCase().contains("win"); + } + public static Path createTemporaryDir(String key) throws IOException { - return Files.createTempDirectory( - String.format("di-split-%s", key.replace('/', '-')), - PosixFilePermissions.asFileAttribute( - PosixFilePermissions.fromString("rwx------") - ) - ); + if (isWindows()) { + // Windows doesn't support POSIX permissions - create without them + return Files.createTempDirectory( + String.format("di-split-%s", key.replace('/', '-')) + ); + } else { + // Unix/Linux/Mac - use POSIX permissions for security + return Files.createTempDirectory( + String.format("di-split-%s", key.replace('/', '-')), + PosixFilePermissions.asFileAttribute( + PosixFilePermissions.fromString("rwx------") + ) + ); + } } public boolean isMarcBinary(String path, JobProfileInfo profile) { diff --git a/src/main/resources/liquibase/module/scripts/v-3.4.0/2024-12-15--00-00-add-request-id-to-queue-item-table.xml b/src/main/resources/liquibase/module/scripts/v-3.4.0/2024-12-15--00-00-add-request-id-to-queue-item-table.xml new file mode 100644 index 00000000..3226c572 --- /dev/null +++ b/src/main/resources/liquibase/module/scripts/v-3.4.0/2024-12-15--00-00-add-request-id-to-queue-item-table.xml @@ -0,0 +1,11 @@ + + + + + + + diff --git a/src/test/java/org/folio/dao/DataImportQueueDaoTest.java b/src/test/java/org/folio/dao/DataImportQueueDaoTest.java index ab779711..487d46d5 100644 --- a/src/test/java/org/folio/dao/DataImportQueueDaoTest.java +++ b/src/test/java/org/folio/dao/DataImportQueueDaoTest.java @@ -62,7 +62,10 @@ public void initializeVariables() { .withPartNumber(1) .withProcessing(false) .withOkapiUrl("okapi-url-1") - .withDataType("data-type-1"); + .withDataType("data-type-1") + .withOkapiToken("okapi-token-1") + .withOkapiPermissions("okapi-permissions-1") + .withOkapiRequestId("okapi-request-id-1"); WAITING_2 = new DataImportQueueItem() .withId("c7e786c6-6fd1-5633-9d09-ced97fc1ff9e") @@ -79,7 +82,10 @@ public void initializeVariables() { .withPartNumber(2) .withProcessing(false) .withOkapiUrl("okapi-url-2") - .withDataType("data-type-2"); + .withDataType("data-type-2") + .withOkapiToken("okapi-token-2") + .withOkapiPermissions("okapi-permissions-2") + .withOkapiRequestId("okapi-request-id-2"); IN_PROGRESS_1 = new DataImportQueueItem() @@ -97,7 +103,10 @@ public void initializeVariables() { .withPartNumber(1) .withProcessing(true) .withOkapiUrl("okapi-url-p-1") - .withDataType("data-type-p-1"); + .withDataType("data-type-p-1") + .withOkapiToken("okapi-token-p-1") + .withOkapiPermissions("okapi-permissions-p-1") + .withOkapiRequestId("okapi-request-id-p-1"); IN_PROGRESS_2 = new DataImportQueueItem() .withId("9a197f42-16c9-5a4b-a213-1adf4498fb02") @@ -114,7 +123,10 @@ public void initializeVariables() { .withPartNumber(2) .withProcessing(true) .withOkapiUrl("okapi-url-p-2") - .withDataType("data-type-p-2"); + .withDataType("data-type-p-2") + .withOkapiToken("okapi-token-p-2") + .withOkapiPermissions("okapi-permissions-p-2") + .withOkapiRequestId("okapi-request-id-p-2"); queueItemDao = new DataImportQueueItemDaoImpl(new PostgresClientFactory(vertx)); diff --git a/src/test/java/org/folio/rest/ProcessS3APITest.java b/src/test/java/org/folio/rest/ProcessS3APITest.java index 21a0df3c..32625954 100644 --- a/src/test/java/org/folio/rest/ProcessS3APITest.java +++ b/src/test/java/org/folio/rest/ProcessS3APITest.java @@ -20,6 +20,8 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; + +import lombok.SneakyThrows; import org.apache.commons.io.FileUtils; import org.apache.http.HttpStatus; import org.folio.rest.jaxrs.model.AssembleFileDto; @@ -44,6 +46,7 @@ public static void configureEnv() { } @Test + @SneakyThrows public void testProcessingSuccess() throws IOException { UploadDefinition uploadDefinition = RestAssured .given() @@ -97,7 +100,7 @@ public void testProcessingSuccess() throws IOException { .getResource( uploadDefinition.getFileDefinitions().get(0).getName() ) - .getFile() + .toURI() ) ) ); diff --git a/src/test/java/org/folio/rest/UploadDefinitionAPITest.java b/src/test/java/org/folio/rest/UploadDefinitionAPITest.java index fdf29148..4cf6d69b 100644 --- a/src/test/java/org/folio/rest/UploadDefinitionAPITest.java +++ b/src/test/java/org/folio/rest/UploadDefinitionAPITest.java @@ -335,7 +335,7 @@ public void fileUpload() { String uploadDefId = uploadDefinition.getId(); String fileId = uploadDefinition.getFileDefinitions().getFirst().getId(); ClassLoader classLoader = getClass().getClassLoader(); - File file = new File(Objects.requireNonNull(classLoader.getResource("CornellFOLIOExemplars_Bibs.mrc")).getFile()); + File file = new File(Objects.requireNonNull(classLoader.getResource("CornellFOLIOExemplars_Bibs.mrc")).toURI()); UploadDefinition uploadDefinition1 = RestAssured.given() .spec(specUpload) .when() @@ -371,7 +371,7 @@ public void fileUploadFailedIfSrmReturnedException() { WireMock.stubFor(WireMock.put(new UrlPathPattern(new RegexPattern("/change-manager/jobExecutions/" + id + "/status"), true)) .willReturn(WireMock.notFound())); ClassLoader classLoader = getClass().getClassLoader(); - File file = new File(Objects.requireNonNull(classLoader.getResource("CornellFOLIOExemplars_Bibs.mrc")).getFile()); + File file = new File(Objects.requireNonNull(classLoader.getResource("CornellFOLIOExemplars_Bibs.mrc")).toURI()); RestAssured.given() .spec(specUpload) .when() @@ -425,9 +425,10 @@ public void fileUploadShouldReturnFileDefinitionWithStatusErrorWhenFileUploadStr } @Test + @SneakyThrows public void fileUploadNotFound() { ClassLoader classLoader = getClass().getClassLoader(); - File file = new File(Objects.requireNonNull(classLoader.getResource("CornellFOLIOExemplars_Bibs.mrc")).getFile()); + File file = new File(Objects.requireNonNull(classLoader.getResource("CornellFOLIOExemplars_Bibs.mrc")).toURI()); RestAssured.given() .spec(specUpload) .when() diff --git a/src/test/java/org/folio/service/file/S3JobRunningVerticleUnitTest.java b/src/test/java/org/folio/service/file/S3JobRunningVerticleUnitTest.java index 040d2fe3..a54eb1f8 100644 --- a/src/test/java/org/folio/service/file/S3JobRunningVerticleUnitTest.java +++ b/src/test/java/org/folio/service/file/S3JobRunningVerticleUnitTest.java @@ -121,6 +121,7 @@ public void testConnectionParams() { .withOkapiUrl("okapi-url") .withOkapiToken("token") .withOkapiPermissions("permissions") + .withOkapiRequestId("request-id") ); assertThat(params.getTenantId(), is("tenant")); @@ -130,6 +131,10 @@ public void testConnectionParams() { params.getHeaders().get("x-okapi-permissions"), is("permissions") ); + assertThat( + params.getHeaders().get("x-okapi-request-id"), + is("request-id") + ); } @Test @@ -139,7 +144,8 @@ public void testConnectionParamsWithUserId() { .withTenant("tenant") .withOkapiUrl("okapi-url") .withOkapiToken("token") - .withOkapiPermissions("permissions"), + .withOkapiPermissions("permissions") + .withOkapiRequestId("request-id"), "user-id" ); @@ -150,6 +156,10 @@ public void testConnectionParamsWithUserId() { params.getHeaders().get("x-okapi-permissions"), is("permissions") ); + assertThat( + params.getHeaders().get("x-okapi-request-id"), + is("request-id") + ); assertThat(params.getHeaders().get("x-okapi-user-id"), is("user-id")); } @@ -218,6 +228,9 @@ public void testDownloadFromS3Failure(TestContext context) public void testCreateLocalFileSuccess(TestContext context) { File testResult = new File("result"); + // Mock both 2-parameter (Windows) and 3-parameter (Unix) versions + when(fileSystem.createTempFile(anyString(), anyString())) + .thenReturn(Future.succeededFuture(testResult.toString())); when(fileSystem.createTempFile(anyString(), anyString(), anyString())) .thenReturn(Future.succeededFuture(testResult.toString())); @@ -406,7 +419,8 @@ public void testProcessQueueItemSuccess(TestContext context) .withTenant("tenant") .withOkapiUrl("okapi-url") .withOkapiToken("token") - .withOkapiPermissions("permissions"); + .withOkapiPermissions("permissions") + .withOkapiRequestId("request-id"); doReturn(Future.succeededFuture(tempFile)) .when(verticle) @@ -474,7 +488,8 @@ public void testProcessQueueItemFailure(TestContext context) .withTenant("tenant") .withOkapiUrl("okapi-url") .withOkapiToken("token") - .withOkapiPermissions("permissions"); + .withOkapiPermissions("permissions") + .withOkapiRequestId("request-id"); doReturn(Future.succeededFuture(tempFile)) .when(verticle) @@ -544,7 +559,8 @@ public void testProcessQueueItemEarlyFailure(TestContext context) .withTenant("tenant") .withOkapiUrl("okapi-url") .withOkapiToken("token") - .withOkapiPermissions("permissions"); + .withOkapiPermissions("permissions") + .withOkapiRequestId("request-id"); doThrow(new UncheckedIOException(new IOException())) .when(verticle) diff --git a/src/test/java/org/folio/service/kafka/KafkaAdminClientServiceTest.java b/src/test/java/org/folio/service/kafka/KafkaAdminClientServiceTest.java index a9f8d56d..21c98117 100644 --- a/src/test/java/org/folio/service/kafka/KafkaAdminClientServiceTest.java +++ b/src/test/java/org/folio/service/kafka/KafkaAdminClientServiceTest.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import org.apache.kafka.common.errors.TopicExistsException; import org.folio.kafka.services.KafkaAdminClientService; +import org.folio.kafka.services.KafkaEnvironmentProperties; import org.folio.kafka.services.KafkaTopic; import org.folio.service.kafka.support.DataImportKafkaTopic; import org.junit.Before; @@ -33,11 +34,13 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.MockedStatic; @RunWith(VertxUnitRunner.class) public class KafkaAdminClientServiceTest { private final String STUB_TENANT = "foo-tenant"; + private static final String ENV_VARIABLE = "folio"; private KafkaAdminClient mockClient; private Vertx vertx; @Mock @@ -129,8 +132,10 @@ private List getTopicNames(ArgumentCaptor> createTopicsCa } private Future createKafkaTopicsAsync(KafkaAdminClient client) { - try (var mocked = mockStatic(KafkaAdminClient.class)) { - mocked.when(() -> KafkaAdminClient.create(eq(vertx), anyMap())).thenReturn(client); + try (MockedStatic mockedClient = mockStatic(KafkaAdminClient.class); + MockedStatic mockedEnv = mockStatic(KafkaEnvironmentProperties.class)) { + mockedClient.when(() -> KafkaAdminClient.create(eq(vertx), anyMap())).thenReturn(client); + mockedEnv.when(KafkaEnvironmentProperties::environment).thenReturn(ENV_VARIABLE); return new KafkaAdminClientService(vertx) .createKafkaTopics(diKafkaTopicService.createTopicObjects(), STUB_TENANT);