diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 1d1b09e..67dfe02 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -42,6 +42,13 @@ public function getConfigTreeBuilder() ->arrayNode('redis') ->addDefaultsIfNotSet() ->children() + ->scalarNode('driver') + ->defaultValue('phpredis') + ->validate() + ->ifNotInArray(['phpredis','predis']) + ->thenInvalid("'driver' parameter must be 'phpredis' or 'predis'") + ->end() + ->end() ->scalarNode('host') ->defaultValue('127.0.0.1') ->end() diff --git a/DependencyInjection/RSQueueExtension.php b/DependencyInjection/RSQueueExtension.php index 6c4c26f..ea46884 100644 --- a/DependencyInjection/RSQueueExtension.php +++ b/DependencyInjection/RSQueueExtension.php @@ -2,11 +2,11 @@ namespace Mmoreram\RSQueueBundle\DependencyInjection; +use Symfony\Component\Config\FileLocator; use Symfony\Component\DependencyInjection\ContainerBuilder; +use Symfony\Component\DependencyInjection\Loader; use Symfony\Component\DependencyInjection\Reference; -use Symfony\Component\Config\FileLocator; use Symfony\Component\HttpKernel\DependencyInjection\Extension; -use Symfony\Component\DependencyInjection\Loader; /** * This is the class that loads and manages your bundle configuration @@ -38,6 +38,16 @@ public function load(array $configs, ContainerBuilder $container) $config['server']['redis'] ); + $rsQueueRedisClass = '\Mmoreram\RSQueueBundle\Redis\RedisAdapter'; + + if($config['server']['redis']['driver'] === 'predis') { + $rsQueueRedisClass = '\Mmoreram\RSQueueBundle\Redis\PredisClientAdapter'; + } + $container->setParameter( + 'rs_queue.redis.class', + $rsQueueRedisClass + ); + $loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); $loader->load('services.yml'); @@ -49,7 +59,6 @@ public function load(array $configs, ContainerBuilder $container) $definition->setFactoryService('rs_queue.serializer.factory'); $definition->setFactoryMethod('get'); } - // BC sf < 2.6 $definition = $container->getDefinition('rs_queue.redis'); if (method_exists($definition, 'setFactory')) { diff --git a/Event/Abstracts/AbstractRSChannelEvent.php b/Event/Abstracts/AbstractRSChannelEvent.php index 4da6f8c..6c0a7b0 100644 --- a/Event/Abstracts/AbstractRSChannelEvent.php +++ b/Event/Abstracts/AbstractRSChannelEvent.php @@ -35,7 +35,7 @@ abstract class AbstractRSChannelEvent extends AbstractRSEvent * @param String $payloadSerialized Payload serialized * @param string $channelAlias Channel alias * @param string $channelName Channel name - * @param Redis $redis Redis instance + * @param \Redis|\Predis\Client $redis Redis instance */ public function __construct($payload, $payloadSerialized, $channelAlias, $channelName, $redis) { diff --git a/Event/Abstracts/AbstractRSEvent.php b/Event/Abstracts/AbstractRSEvent.php index c2e240f..3ef8c9c 100644 --- a/Event/Abstracts/AbstractRSEvent.php +++ b/Event/Abstracts/AbstractRSEvent.php @@ -9,7 +9,6 @@ namespace Mmoreram\RSQueueBundle\Event\Abstracts; use Symfony\Component\EventDispatcher\Event; -use Redis as RedisClient; /** * Abstract event @@ -32,7 +31,7 @@ abstract class AbstractRSEvent extends Event protected $payloadSerialized; /** - * @var Redis + * @var \Redis|\Predis\Client * * Redis instance */ @@ -43,9 +42,9 @@ abstract class AbstractRSEvent extends Event * * @param Mixed $payload Payload * @param String $payloadSerialized Payload serialized - * @param Redis $redis Redis instance + * @param \Redis|\Predis\Client $redis Redis instance */ - public function __construct($payload, $payloadSerialized, RedisClient $redis) + public function __construct($payload, $payloadSerialized, $redis) { $this->payload = $payload; $this->payloadSerialized = $payloadSerialized; @@ -75,7 +74,7 @@ public function getPayloadSerialized() /** * Return redis instance * - * @return Redis + * @return \Redis|\Predis\Client */ public function getRedis() { diff --git a/Event/Abstracts/AbstractRSQueueEvent.php b/Event/Abstracts/AbstractRSQueueEvent.php index de61ff7..a7b1760 100644 --- a/Event/Abstracts/AbstractRSQueueEvent.php +++ b/Event/Abstracts/AbstractRSQueueEvent.php @@ -35,7 +35,7 @@ abstract class AbstractRSQueueEvent extends AbstractRSEvent * @param String $payloadSerialized Payload serialized * @param string $queueAlias Queue alias * @param string $queueName Queue name - * @param Redis $redis Redis instance + * @param \Redis|\Predis\Client $redis Redis instance */ public function __construct($payload, $payloadSerialized, $queueAlias, $queueName, $redis) { diff --git a/Factory/RedisFactory.php b/Factory/RedisFactory.php index e6ac037..ea8053c 100644 --- a/Factory/RedisFactory.php +++ b/Factory/RedisFactory.php @@ -8,6 +8,10 @@ namespace Mmoreram\RSQueueBundle\Factory; +use Mmoreram\RSQueueBundle\Redis\AdapterInterface; +use Mmoreram\RSQueueBundle\Redis\PredisClientAdapter; +use Mmoreram\RSQueueBundle\Redis\RedisAdapter; +use Predis\Client; use Redis; /** @@ -30,19 +34,37 @@ public function __construct(array $config) } /** - * Generate new Predis instance + * Generate a AdapterInterface instance * - * @return \Redis instance + * @return AdapterInterface */ public function get() { - $redis = new Redis; - $redis->connect($this->config['host'], $this->config['port']); - $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); - if ($this->config['database']) { - $redis->select($this->config['database']); + if ($this->config['driver'] === 'predis') { + + $connectionParameters = array( + 'scheme' => 'tcp', + 'host' => $this->config['host'], + 'port' => $this->config['port'], + 'read_write_timeout' => -1 + ); + if ($this->config['database']) { + $connectionParameters['database'] = $this->config['database']; + } + $redis = new Client($connectionParameters); + + return new PredisClientAdapter($redis); + } else { + $redis = new Redis; + $redis->connect($this->config['host'], $this->config['port']); + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); + if ($this->config['database']) { + $redis->select($this->config['database']); + } + + return new RedisAdapter($redis); } - return $redis; + } } diff --git a/Redis/AdapterInterface.php b/Redis/AdapterInterface.php new file mode 100644 index 0000000..7877b06 --- /dev/null +++ b/Redis/AdapterInterface.php @@ -0,0 +1,48 @@ +client = $client; + } + + /** + * @param $queues + * @param $timeout + * @return array + */ + public function blPop($queues, $timeout) + { + return $this->client->blpop($queues, $timeout); + } + + /** + * @param $key + * @param array $messages + * @return int + */ + public function rPush($key, array $messages) + { + return $this->client->rpush($key, $messages); + } + + /** + * @param $channels + * @param $callback + */ + public function subscribe($channels, $callback) + { + $pubSub = $this->client->pubSubLoop(); + $pubSub->subscribe($channels); + foreach ($pubSub as $message) { + switch ($message->kind) { + case 'message': + $callback($pubSub->getClient(), $message->channel, $message->payload); + break; + } + + } + unset($pubSub); + + } + + /** + * @param $channel + * @param $message + * @return int + */ + public function publish($channel, $message) + { + return $this->client->publish($channel, $message); + } + + /** + * @return \Predis\Client + */ + public function getClient() + { + return $this->client; + } +} \ No newline at end of file diff --git a/Redis/RedisAdapter.php b/Redis/RedisAdapter.php new file mode 100644 index 0000000..f8eaacb --- /dev/null +++ b/Redis/RedisAdapter.php @@ -0,0 +1,79 @@ +client = $redis; + } + + /** + * @param $queues + * @param $timeout + * @return array + */ + public function blPop($queues, $timeout) + { + return $this->client->blPop($queues, $timeout); + } + + /** + * @param $key + * @param array $messages + * @return int + */ + public function rPush($key, array $messages) + { + return $this->client->rPush($key, $messages); + } + + /** + * @param $channels + * @param $callback + * @return mixed|void + */ + public function subscribe($channels, $callback) + { + return $this->client->subscribe($channels, $callback); + } + + /** + * @param $channel + * @param $message + * @return int|mixed + */ + public function publish($channel, $message) + { + return $this->client->publish($channel, $message); + } + + /** + * @return \Redis + */ + public function getClient() + { + return $this->client; + } +} \ No newline at end of file diff --git a/Resources/config/services.yml b/Resources/config/services.yml index f51ad63..a1feaf0 100644 --- a/Resources/config/services.yml +++ b/Resources/config/services.yml @@ -20,7 +20,7 @@ services: redis: %rs_queue.server.redis% rs_queue.redis: - class: Redis + class: %rs_queue.redis.class% rs_queue.resolver.queuealias: class: %rs_queue.resolver.queuealias.class% diff --git a/Services/Abstracts/AbstractService.php b/Services/Abstracts/AbstractService.php index 590c6e0..818adf2 100644 --- a/Services/Abstracts/AbstractService.php +++ b/Services/Abstracts/AbstractService.php @@ -8,8 +8,8 @@ namespace Mmoreram\RSQueueBundle\Services\Abstracts; +use Mmoreram\RSQueueBundle\Redis\AdapterInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface; -use Redis as RedisClient; use Mmoreram\RSQueueBundle\Serializer\Interfaces\SerializerInterface; use Mmoreram\RSQueueBundle\Resolver\QueueAliasResolver; @@ -28,11 +28,11 @@ class AbstractService protected $eventDispatcher; /** - * @var Predis\Client + * @var AdapterInterface * * Redis client used to interact with redis service */ - protected $redis; + protected $redisAdapter; /** * @var QueueAliasResolver @@ -49,17 +49,17 @@ class AbstractService protected $serializer; /** - * @param EventDispatcher $eventDispatcher EventDispatcher instance - * @param Predis\Client $redis Redis instance - * @param QueueAliasResolver $queueAliasResolver Resolver for queue alias - * @param SerializerInterface $serializer Serializer instance + * @param EventDispatcherInterface $eventDispatcher EventDispatcher instance + * @param AdapterInterface $redisAdapter Redis adapter instance + * @param QueueAliasResolver $queueAliasResolver Resolver for queue alias + * @param SerializerInterface $serializer Serializer instance * * Construct method */ - public function __construct(EventDispatcherInterface $eventDispatcher, RedisClient $redis, QueueAliasResolver $queueAliasResolver, SerializerInterface $serializer) + public function __construct(EventDispatcherInterface $eventDispatcher,AdapterInterface $redisAdapter, QueueAliasResolver $queueAliasResolver, SerializerInterface $serializer) { $this->eventDispatcher = $eventDispatcher; - $this->redis = $redis; + $this->redisAdapter = $redisAdapter; $this->queueAliasResolver = $queueAliasResolver; $this->serializer = $serializer; } diff --git a/Services/Consumer.php b/Services/Consumer.php index 5667cec..51aa0bb 100644 --- a/Services/Consumer.php +++ b/Services/Consumer.php @@ -8,6 +8,7 @@ namespace Mmoreram\RSQueueBundle\Services; +use Mmoreram\RSQueueBundle\Exception\InvalidAliasException; use Mmoreram\RSQueueBundle\Services\Abstracts\AbstractService; use Mmoreram\RSQueueBundle\RSQueueEvents; use Mmoreram\RSQueueBundle\Event\RSQueueConsumerEvent; @@ -41,7 +42,7 @@ public function consume($queueAlias, $timeout = 0) ? $this->queueAliasResolver->getQueues($queueAlias) : $this->queueAliasResolver->getQueue($queueAlias); - $payloadArray = $this->redis->blpop($queues, $timeout); + $payloadArray = $this->redisAdapter->blPop($queues, $timeout); if (empty($payloadArray)) { return array(); @@ -54,7 +55,7 @@ public function consume($queueAlias, $timeout = 0) /** * Dispatching consumer event... */ - $consumerEvent = new RSQueueConsumerEvent($payload, $payloadSerialized, $givenQueueAlias, $givenQueue, $this->redis); + $consumerEvent = new RSQueueConsumerEvent($payload, $payloadSerialized, $givenQueueAlias, $givenQueue, $this->redisAdapter->getClient()); $this->eventDispatcher->dispatch(RSQueueEvents::RSQUEUE_CONSUMER, $consumerEvent); return array($givenQueueAlias, $payload); diff --git a/Services/Producer.php b/Services/Producer.php index 454bf33..c4bf3e2 100644 --- a/Services/Producer.php +++ b/Services/Producer.php @@ -8,6 +8,7 @@ namespace Mmoreram\RSQueueBundle\Services; +use Mmoreram\RSQueueBundle\Exception\InvalidAliasException; use Mmoreram\RSQueueBundle\Services\Abstracts\AbstractService; use Mmoreram\RSQueueBundle\RSQueueEvents; use Mmoreram\RSQueueBundle\Event\RSQueueProducerEvent; @@ -32,15 +33,15 @@ public function produce($queueAlias, $payload) $queue = $this->queueAliasResolver->getQueue($queueAlias); $payloadSerialized = $this->serializer->apply($payload); - $this->redis->rpush( + $this->redisAdapter->rPush( $queue, - $payloadSerialized + array($payloadSerialized) ); /** * Dispatching producer event... */ - $producerEvent = new RSQueueProducerEvent($payload, $payloadSerialized, $queueAlias, $queue, $this->redis); + $producerEvent = new RSQueueProducerEvent($payload, $payloadSerialized, $queueAlias, $queue, $this->redisAdapter->getClient()); $this->eventDispatcher->dispatch(RSQueueEvents::RSQUEUE_PRODUCER, $producerEvent); return $this; diff --git a/Services/Publisher.php b/Services/Publisher.php index dc6e0d9..6737340 100644 --- a/Services/Publisher.php +++ b/Services/Publisher.php @@ -8,9 +8,10 @@ namespace Mmoreram\RSQueueBundle\Services; -use Mmoreram\RSQueueBundle\Services\Abstracts\AbstractService; -use Mmoreram\RSQueueBundle\RSQueueEvents; use Mmoreram\RSQueueBundle\Event\RSQueuePublisherEvent; +use Mmoreram\RSQueueBundle\Exception\InvalidAliasException; +use Mmoreram\RSQueueBundle\RSQueueEvents; +use Mmoreram\RSQueueBundle\Services\Abstracts\AbstractService; /** * Publisher class @@ -23,7 +24,7 @@ class Publisher extends AbstractService * @param String $channelAlias Name of channel to publish payload * @param Mixed $payload Data to publish * - * @return Producer self Object + * @return Publisher self Object * * @throws InvalidAliasException If any alias is not defined */ @@ -32,7 +33,7 @@ public function publish($channelAlias, $payload) $channel = $this->queueAliasResolver->getQueue($channelAlias); $payloadSerialized = $this->serializer->apply($payload); - $this->redis->publish( + $this->redisAdapter->publish( $channel, $payloadSerialized ); @@ -40,7 +41,7 @@ public function publish($channelAlias, $payload) /** * Dispatching publisher event... */ - $publisherEvent = new RSQueuePublisherEvent($payload, $payloadSerialized, $channelAlias, $channel, $this->redis); + $publisherEvent = new RSQueuePublisherEvent($payload, $payloadSerialized, $channelAlias, $channel, $this->redisAdapter->getClient()); $this->eventDispatcher->dispatch(RSQueueEvents::RSQUEUE_PUBLISHER, $publisherEvent); return $this; diff --git a/Tests/Services/ConsumerTest.php b/Tests/Services/ConsumerTest.php index 4541c1a..2a7f502 100644 --- a/Tests/Services/ConsumerTest.php +++ b/Tests/Services/ConsumerTest.php @@ -27,7 +27,7 @@ public function testConsume() $payload = array('engonga'); $redis = $this - ->getMock('\Redis', array('blPop')); + ->getMock('Mmoreram\RSQueueBundle\Redis\AdapterInterface'); $redis ->expects($this->once()) diff --git a/composer.json b/composer.json index e1a7eeb..640bfe8 100644 --- a/composer.json +++ b/composer.json @@ -24,6 +24,9 @@ "require-dev": { "symfony/phpunit-bridge": "~2.7" }, + "suggest": { + "predis/predis": "Another Redis driver" + }, "autoload": { "psr-4": { "Mmoreram\\RSQueueBundle\\": "" } },