diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e63eef..5e8db49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +# Version 5.4.0 +* Add possibility to save exceptions in file + # Version 5.3.1 * Fix interface naming diff --git a/README.md b/README.md index 85a4aa5..84638b5 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,20 @@ framework: CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async ``` +### Exceptions mode +Dataflow can save exceptions in any filesystem you want. +This allows dataflow to save exceptions in filesystem instead of the database +You have to install `league/flysystem`. + +To enable exceptions mode: + +```yaml +code_rhapsodie_dataflow: + exceptions_mode: + type: 'file' + flysystem_service: 'app.filesystem' #The name of the \League\Flysystem\Filesystem service +``` + ## Define a dataflow type This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of diff --git a/Tests/Processor/JobProcessorTest.php b/Tests/Processor/JobProcessorTest.php index f1879ee..bd37342 100644 --- a/Tests/Processor/JobProcessorTest.php +++ b/Tests/Processor/JobProcessorTest.php @@ -7,6 +7,7 @@ use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Event\Events; use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent; +use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; use CodeRhapsodie\DataflowBundle\Processor\JobProcessor; use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; use CodeRhapsodie\DataflowBundle\Repository\JobRepository; @@ -20,14 +21,16 @@ class JobProcessorTest extends TestCase private JobRepository|MockObject $repository; private DataflowTypeRegistryInterface|MockObject $registry; private EventDispatcherInterface|MockObject $dispatcher; + private JobGateway|MockObject $jobGateway; protected function setUp(): void { $this->repository = $this->createMock(JobRepository::class); $this->registry = $this->createMock(DataflowTypeRegistryInterface::class); $this->dispatcher = $this->createMock(EventDispatcherInterface::class); + $this->jobGateway = $this->createMock(JobGateway::class); - $this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher); + $this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher, $this->jobGateway); } public function testProcess() @@ -72,7 +75,7 @@ public function testProcess() ->willReturn($result) ; - $this->repository + $this->jobGateway ->expects($this->exactly(2)) ->method('save') ; diff --git a/src/Command/JobShowCommand.php b/src/Command/JobShowCommand.php index 67e5412..2863b94 100644 --- a/src/Command/JobShowCommand.php +++ b/src/Command/JobShowCommand.php @@ -6,7 +6,7 @@ use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory; -use CodeRhapsodie\DataflowBundle\Repository\JobRepository; +use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; @@ -26,7 +26,7 @@ class JobShowCommand extends Command Job::STATUS_COMPLETED => 'Completed', ]; - public function __construct(private JobRepository $jobRepository, private ConnectionFactory $connectionFactory) + public function __construct(private JobGateway $jobGateway, private ConnectionFactory $connectionFactory) { parent::__construct(); } @@ -58,9 +58,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int } if ($scheduleId) { - $job = $this->jobRepository->findLastForDataflowId($scheduleId); + $job = $this->jobGateway->findLastForDataflowId($scheduleId); } elseif ($jobId) { - $job = $this->jobRepository->find($jobId); + $job = $this->jobGateway->find($jobId); } else { $io->error('You must pass `job-id` or `schedule-id` option.'); diff --git a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php index 7c00a85..ca4bb9c 100644 --- a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php +++ b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php @@ -15,7 +15,7 @@ */ class CodeRhapsodieDataflowExtension extends Extension { - public function load(array $configs, ContainerBuilder $container) + public function load(array $configs, ContainerBuilder $container): void { $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); $loader->load('services.yaml'); @@ -29,6 +29,12 @@ public function load(array $configs, ContainerBuilder $container) $container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']); $container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']); + $container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']); + + if ($config['exceptions_mode']['type'] === 'file') { + $container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']); + $loader->load('exceptions_services.yaml'); + } if ($config['messenger_mode']['enabled']) { $container->setParameter('coderhapsodie.dataflow.bus', $config['messenger_mode']['bus']); diff --git a/src/DependencyInjection/Compiler/BusCompilerPass.php b/src/DependencyInjection/Compiler/BusCompilerPass.php index 5a8114f..9c95169 100644 --- a/src/DependencyInjection/Compiler/BusCompilerPass.php +++ b/src/DependencyInjection/Compiler/BusCompilerPass.php @@ -12,7 +12,7 @@ class BusCompilerPass implements CompilerPassInterface { - public function process(ContainerBuilder $container) + public function process(ContainerBuilder $container): void { if (!$container->hasParameter('coderhapsodie.dataflow.bus')) { return; diff --git a/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php index 2c0ba24..d2ad94d 100644 --- a/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php +++ b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php @@ -16,7 +16,7 @@ */ class DataflowTypeCompilerPass implements CompilerPassInterface { - public function process(ContainerBuilder $container) + public function process(ContainerBuilder $container): void { if (!$container->has(DataflowTypeRegistry::class)) { return; diff --git a/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php b/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php index e3c3a41..fe5fdfb 100644 --- a/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php +++ b/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php @@ -12,7 +12,7 @@ class DefaultLoggerCompilerPass implements CompilerPassInterface { - public function process(ContainerBuilder $container) + public function process(ContainerBuilder $container): void { $defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger'); if (!$container->has($defaultLogger)) { diff --git a/src/DependencyInjection/Compiler/ExceptionCompilerPass.php b/src/DependencyInjection/Compiler/ExceptionCompilerPass.php new file mode 100644 index 0000000..105f56f --- /dev/null +++ b/src/DependencyInjection/Compiler/ExceptionCompilerPass.php @@ -0,0 +1,29 @@ +hasParameter('coderhapsodie.dataflow.flysystem_service')) { + return; + } + + $flysystem = $container->getParameter('coderhapsodie.dataflow.flysystem_service'); + if (!$container->has($flysystem)) { + throw new InvalidArgumentException(\sprintf('Service "%s" not found', $flysystem)); + } + + $definition = $container->findDefinition(FilesystemExceptionHandler::class); + $definition->setArgument('$filesystem', new Reference($flysystem)); + } +} diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index a4bf333..bd0a67b 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -38,6 +38,20 @@ public function getConfigTreeBuilder(): TreeBuilder ->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.') ->end() ->end() + ->arrayNode('exceptions_mode') + ->addDefaultsIfNotSet() + ->children() + ->scalarNode('type') + ->defaultValue('database') + ->end() + ->scalarNode('flysystem_service') + ->end() + ->validate() + ->ifTrue(static fn ($v): bool => $v['type'] === 'file' && !interface_exists('\League\Flysystem\Filesystem')) + ->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.') + ->end() + ->end() + ->end() ->end() ; diff --git a/src/ExceptionsHandler/ExceptionHandlerInterface.php b/src/ExceptionsHandler/ExceptionHandlerInterface.php new file mode 100644 index 0000000..a950c85 --- /dev/null +++ b/src/ExceptionsHandler/ExceptionHandlerInterface.php @@ -0,0 +1,12 @@ +filesystem->write(\sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions)); + } + + public function find(int $jobId): ?array + { + try { + if (!$this->filesystem->has(\sprintf('dataflow-job-%s.log', $jobId))) { + return []; + } + + return json_decode($this->filesystem->read(\sprintf('dataflow-job-%s.log', $jobId)), true); + } catch (FilesystemException) { + return []; + } + } +} diff --git a/src/ExceptionsHandler/NullExceptionHandler.php b/src/ExceptionsHandler/NullExceptionHandler.php new file mode 100644 index 0000000..c3a0d2d --- /dev/null +++ b/src/ExceptionsHandler/NullExceptionHandler.php @@ -0,0 +1,17 @@ +repository->find($jobId); + + return $this->loadExceptions($job); + } + + public function save(Job $job): void + { + if (!$this->exceptionHandler instanceof NullExceptionHandler) { + $this->exceptionHandler->save($job->getId(), $job->getExceptions()); + $job->setExceptions([]); + } + + $this->repository->save($job); + } + + public function findLastForDataflowId(int $scheduleId): ?Job + { + $job = $this->repository->findLastForDataflowId($scheduleId); + + return $this->loadExceptions($job); + } + + private function loadExceptions(?Job $job): ?Job + { + if ($job === null || $this->exceptionHandler instanceof NullExceptionHandler) { + return $job; + } + + $this->exceptionHandler->save($job->getId(), $job->getExceptions()); + + return $job->setExceptions($this->exceptionHandler->find($job->getId())); + } +} diff --git a/src/Processor/JobProcessor.php b/src/Processor/JobProcessor.php index b59d995..e7019db 100644 --- a/src/Processor/JobProcessor.php +++ b/src/Processor/JobProcessor.php @@ -9,6 +9,7 @@ use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Event\Events; use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent; +use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; use CodeRhapsodie\DataflowBundle\Logger\BufferHandler; use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger; use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; @@ -22,8 +23,12 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface { use LoggerAwareTrait; - public function __construct(private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher) - { + public function __construct( + private JobRepository $repository, + private DataflowTypeRegistryInterface $registry, + private EventDispatcherInterface $dispatcher, + private JobGateway $jobGateway, + ) { } public function process(Job $job): void @@ -64,7 +69,7 @@ private function beforeProcessing(Job $job): void ->setStatus(Job::STATUS_RUNNING) ->setStartTime(new \DateTime()) ; - $this->repository->save($job); + $this->jobGateway->save($job); } private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void @@ -75,7 +80,8 @@ private function afterProcessing(Job $job, Result $result, BufferHandler $buffer ->setCount($result->getSuccessCount()) ->setExceptions($bufferLogger->clearBuffer()) ; - $this->repository->save($job); + + $this->jobGateway->save($job); $this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING); } diff --git a/src/Resources/config/exceptions_services.yaml b/src/Resources/config/exceptions_services.yaml new file mode 100644 index 0000000..44b142a --- /dev/null +++ b/src/Resources/config/exceptions_services.yaml @@ -0,0 +1,5 @@ +services: + CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler' + CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler: + arguments: + $filesystem: ~ # Filled in compiler pass diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index 3233846..dce5b62 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -28,7 +28,7 @@ services: CodeRhapsodie\DataflowBundle\Command\JobShowCommand: arguments: - $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' + $jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway' $connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory' tags: ['console.command'] @@ -93,3 +93,10 @@ services: $repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' $registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface' $dispatcher: '@event_dispatcher' + $jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway' + CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler' + CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler: + CodeRhapsodie\DataflowBundle\Gateway\JobGateway: + arguments: + $repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' + $exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface'