Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 45 additions & 44 deletions Classes/Aspect/QueuingAspect.php
Original file line number Diff line number Diff line change
@@ -1,65 +1,66 @@
<?php
namespace FormatD\Mailer\QueueAdaptor\Aspect;

/* *
* This script belongs to the Flow package "FormatD.Mailer.QueueAdaptor". *
* */
namespace FormatD\Mailer\QueueAdaptor\Aspect;

use FormatD\Mailer\QueueAdaptor\Job\Context;
use FormatD\Mailer\QueueAdaptor\Job\MailJob;
use Flowpack\JobQueue\Common\Job\JobManager;
use FormatD\Mailer\QueueAdaptor\Service\MailQueue;
use FormatD\Mailer\QueueAdaptor\Transport\QueuingTransport;
use Neos\Flow\Annotations as Flow;
use Neos\SwiftMailer\Message;

use Neos\Flow\Aop\JoinPointInterface;
use ReflectionException;
use ReflectionObject;
use Symfony\Component\Mailer\Envelope;
use Symfony\Component\Mailer\Mailer;
use Symfony\Component\Mailer\SentMessage;
use Symfony\Component\Mime\Email;
use Symfony\Component\Mime\RawMessage;

/**
* @Flow\Aspect
* @Flow\Introduce("class(Neos\SwiftMailer\Message)", traitName="FormatD\Mailer\QueueAdaptor\Traits\QueueNameTrait")
*/
class QueuingAspect {
class QueuingAspect
{
#[Flow\InjectConfiguration]
protected array $settings;

/**
* @var JobManager
* @Flow\Inject
*/
protected $jobManager;

/**
* @Flow\Inject
* @var Context
*/
protected $jobContext;
#[Flow\Inject]
protected MailQueue $mailQueue;

/**
* @Flow\InjectConfiguration
* @var array
* When FormatD.Mailer is **not** installed, we augment the Symfony Mailer instance directly
*
* @Flow\Around("method(Neos\SymfonyMailer\Service\MailerService->getMailer())")
* @throws ReflectionException
*/
protected $settings;
public function decorateTransport(JoinPointInterface $joinPoint): Mailer
{
/** @var Mailer $mailer */
$mailer = $joinPoint->getAdviceChain()->proceed($joinPoint);
$transportProperty = (new ReflectionObject($mailer))->getProperty('transport');
$transportProperty->setValue($mailer, new QueuingTransport($transportProperty->getValue($mailer)));
return $mailer;
}

/**
* Intercept all emails or add bcc according to package configuration
* When FormatD.Mailer is installed, the above `decorateMailer()` advice is not always called depending
* on configuration. In that case, we intercept the special transport object's `send()` method instead.
*
* @param \Neos\Flow\Aop\JoinPointInterface $joinPoint
* @Flow\Around("setting(FormatD.Mailer.QueueAdaptor.enableAsynchronousMails) && method(Neos\SwiftMailer\Message->send())")
* @return void
* @Flow\Around("method(FormatD\Mailer\Transport\FdMailerTransport->send()) || method(FormatD\Mailer\Transport\InterceptingTransport->send())")
*/
public function queueEmails(\Neos\Flow\Aop\JoinPointInterface $joinPoint) {
public function transportSend(JoinPointInterface $joinPoint): ?SentMessage
{
/** @var RawMessage $message */
$message = $joinPoint->getMethodArgument('message');

if ($this->jobContext->isMailQueueingDisabled()) {
return $joinPoint->getAdviceChain()->proceed($joinPoint);
}
if ($message instanceof Email && !$this->mailQueue->isMailQueuingDisabled()) {
/** @var ?Envelope $envelope */
$envelope = $joinPoint->getMethodArgument('envelope');

/** @var Message $email */
$email = $joinPoint->getProxy();
$job = new MailJob($email);
$this->jobManager->queue($email->getQueueName() ? $email->getQueueName() : 'fdmailer-mail-queue', $job);
// Queue the mail before interception, i.e. before rewrite of the mail headers (To, Bcc, etc.)
// When the queued mail is released, the mail is intercepted again.
$this->mailQueue->enqueueMessage($message, $envelope);
return null;
}

// Neos\SwiftMailer\Message->send() should return the number of recipients who were accepted for delivery
// We dont know that until mail is execured by queue so we assume every recipient was accepted
// @todo: read recipient count and return that
return 1;
return $joinPoint->getAdviceChain()->proceed($joinPoint);
}

}

?>
52 changes: 0 additions & 52 deletions Classes/Job/Context.php

This file was deleted.

137 changes: 67 additions & 70 deletions Classes/Job/MailJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,107 +2,104 @@

namespace FormatD\Mailer\QueueAdaptor\Job;

use Neos\Cache\Frontend\StringFrontend;
use Neos\Flow\Annotations as Flow;
use Exception;
use Flowpack\JobQueue\Common\Job\JobInterface;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Flowpack\JobQueue\Common\Queue\Message;

class MailJob implements JobInterface {

/**
* @Flow\InjectConfiguration(type="Settings", package="FormatD.Mailer.QueueAdaptor", path="serializationCache")
* @var array
*/
protected $serializationCacheSettings;

/**
* @Flow\Inject
* @var Context
*/
protected $jobContext;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use FormatD\Mailer\QueueAdaptor\Service\MailQueue;
use Neos\Cache\Exception as NeosCacheException;
use Neos\Cache\Exception\InvalidDataException;
use Neos\Cache\Frontend\StringFrontend;
use Neos\Flow\Annotations as Flow;
use Neos\SymfonyMailer\Service\MailerService;
use Ramsey\Uuid\Uuid;
use Symfony\Component\Mailer\Envelope;
use Symfony\Component\Mime\Email;

/**
* @Flow\Scope("prototype")
*/
class MailJob implements JobInterface
{
#[Flow\InjectConfiguration(path: 'serializationCache', package: 'FormatD.Mailer.QueueAdaptor')]
protected array $serializationCacheSettings;

#[Flow\Inject]
protected MailQueue $mailQueue;

/**
* @Flow\Inject
* Factory-backed objects (like cache) are **always** proxified by the Flow Object Manager, regardless if they're lazy or not.
* Such proxy objects do not extend the original class, thus a direct property type declaration gives a TypeError.
* @var StringFrontend
*/
protected $mailSerializationCache;
#[Flow\Inject]
protected mixed $mailDataCache = null;

/**
* @var \Neos\SwiftMailer\Message
*/
protected $email = null;
#[Flow\Inject]
protected MailerService $mailerService;

/**
* @var string
*/
protected $emailSerializationCacheIdentifier = null;
protected ?string $emailCacheIdentifier = null;

/**
* MailJob constructor.
* @param \Neos\SwiftMailer\Message $email
*/
public function __construct(\Neos\SwiftMailer\Message $email) {
$this->email = $email;
public function __construct(
protected Email $email,
protected ?Envelope $envelope = null
)
{
}

/**
* Execute the job
*
* A job should finish itself after successful execution using the queue methods.
*
* @param QueueInterface $queue
* @param Message $message The original message
* @return bool TRUE if the job was executed successfully and the message should be finished
* @throws Exception
*/
public function execute(QueueInterface $queue, Message $message): bool {

$message = $this->getEmail();
public function execute(QueueInterface $queue, Message $message): bool
{
$this->tryRestoreDataFromCache();

$this->jobContext->withoutMailQueuing(function () use ($message) {
$message->send();
});
if ($this->email) {
$this->mailQueue->withoutQueuing(function () {
$this->mailerService->getMailer()->send($this->email, $this->envelope);
});
return true;
}

return TRUE;
// In case the mail job is executed after the email data has already been evicted from cache, we obviously cannot proceed.
throw new Exception('Email data is no longer available in cache, so email cannot be sent.');
}

/**
* Get a readable label for the job
*
* @return string A label for the job
*/
public function getLabel(): string {
return $this->getEmail()->getSubject();
public function getLabel(): string
{
return $this->email->getSubject();
}

/**
* Serialize the email to a file because it can get really big with attachments
* Serialize the email to (file) cache so we don't need to store big email data incl. attachments in the queue
*
* @return string[]
* @throws \Neos\Cache\Exception
* @throws \Neos\Cache\Exception\InvalidDataException
* @throws NeosCacheException|InvalidDataException
*/
public function __sleep()
public function __sleep(): array
{
if ($this->serializationCacheSettings['enabled']) {
$this->emailSerializationCacheIdentifier = uniqid('email-');
$this->mailSerializationCache->set($this->emailSerializationCacheIdentifier, serialize($this->email), [], 172800); // 48 Std. lifetime
return array('emailSerializationCacheIdentifier');
if (($this->serializationCacheSettings['enabled'] ?? false) && $this->mailDataCache) {
$this->emailCacheIdentifier = sprintf('email-%s', Uuid::uuid4());
$data = [
'email' => $this->email,
'envelope' => $this->envelope,
];
$this->mailDataCache->set($this->emailCacheIdentifier, serialize($data), [], 172800); // 48h lifetime
return ['emailCacheIdentifier'];
}

return array('email');
return ['email', 'envelope'];
}

/**
* Restores the serialized email if cached in file
*
* @return \Neos\SwiftMailer\Message
*/
protected function getEmail() {
if (!$this->email && $this->emailSerializationCacheIdentifier && $this->mailSerializationCache->has($this->emailSerializationCacheIdentifier)) {
$this->email = unserialize($this->mailSerializationCache->get($this->emailSerializationCacheIdentifier));
protected function tryRestoreDataFromCache(): void
{
if ($this->emailCacheIdentifier && ($serializedData = $this->mailDataCache?->get($this->emailCacheIdentifier))) {
$data = unserialize($serializedData);
$this->email = $data['email'];
$this->envelope = $data['envelope'];
}
return $this->email;
}

}
23 changes: 23 additions & 0 deletions Classes/QueueableEmail.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace FormatD\Mailer\QueueAdaptor;

use Symfony\Component\Mime\Email;

/**
* Optional decorator for mail queue classification
*/
class QueueableEmail extends Email
{
protected ?string $queueName = null;

public function setQueueName(string $queueName): void
{
$this->queueName = $queueName;
}

public function getQueueName(): ?string
{
return $this->queueName;
}
}
Loading