Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public function getConfigTreeBuilder()
->arrayNode('redis')
->addDefaultsIfNotSet()
->children()
->scalarNode('driver')
->defaultValue('phpredis')
->validate()
->ifNotInArray(['phpredis','predis'])
->thenInvalid("'driver' parameter must be 'phpredis' or 'predis'")
->end()
->end()
->scalarNode('host')
->defaultValue('127.0.0.1')
->end()
Expand Down
15 changes: 12 additions & 3 deletions DependencyInjection/RSQueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

namespace Mmoreram\RSQueueBundle\DependencyInjection;

use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Loader;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\Config\FileLocator;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
use Symfony\Component\DependencyInjection\Loader;

/**
* This is the class that loads and manages your bundle configuration
Expand Down Expand Up @@ -38,6 +38,16 @@ public function load(array $configs, ContainerBuilder $container)
$config['server']['redis']
);

$rsQueueRedisClass = '\Mmoreram\RSQueueBundle\Redis\RedisAdapter';

if($config['server']['redis']['driver'] === 'predis') {
$rsQueueRedisClass = '\Mmoreram\RSQueueBundle\Redis\PredisClientAdapter';
}
$container->setParameter(
'rs_queue.redis.class',
$rsQueueRedisClass
);

$loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader->load('services.yml');

Expand All @@ -49,7 +59,6 @@ public function load(array $configs, ContainerBuilder $container)
$definition->setFactoryService('rs_queue.serializer.factory');
$definition->setFactoryMethod('get');
}

// BC sf < 2.6
$definition = $container->getDefinition('rs_queue.redis');
if (method_exists($definition, 'setFactory')) {
Expand Down
2 changes: 1 addition & 1 deletion Event/Abstracts/AbstractRSChannelEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class AbstractRSChannelEvent extends AbstractRSEvent
* @param String $payloadSerialized Payload serialized
* @param string $channelAlias Channel alias
* @param string $channelName Channel name
* @param Redis $redis Redis instance
* @param \Redis|\Predis\Client $redis Redis instance
*/
public function __construct($payload, $payloadSerialized, $channelAlias, $channelName, $redis)
{
Expand Down
9 changes: 4 additions & 5 deletions Event/Abstracts/AbstractRSEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
namespace Mmoreram\RSQueueBundle\Event\Abstracts;

use Symfony\Component\EventDispatcher\Event;
use Redis as RedisClient;

/**
* Abstract event
Expand All @@ -32,7 +31,7 @@ abstract class AbstractRSEvent extends Event
protected $payloadSerialized;

/**
* @var Redis
* @var \Redis|\Predis\Client
*
* Redis instance
*/
Expand All @@ -43,9 +42,9 @@ abstract class AbstractRSEvent extends Event
*
* @param Mixed $payload Payload
* @param String $payloadSerialized Payload serialized
* @param Redis $redis Redis instance
* @param \Redis|\Predis\Client $redis Redis instance
*/
public function __construct($payload, $payloadSerialized, RedisClient $redis)
public function __construct($payload, $payloadSerialized, $redis)
{
$this->payload = $payload;
$this->payloadSerialized = $payloadSerialized;
Expand Down Expand Up @@ -75,7 +74,7 @@ public function getPayloadSerialized()
/**
* Return redis instance
*
* @return Redis
* @return \Redis|\Predis\Client
*/
public function getRedis()
{
Expand Down
2 changes: 1 addition & 1 deletion Event/Abstracts/AbstractRSQueueEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class AbstractRSQueueEvent extends AbstractRSEvent
* @param String $payloadSerialized Payload serialized
* @param string $queueAlias Queue alias
* @param string $queueName Queue name
* @param Redis $redis Redis instance
* @param \Redis|\Predis\Client $redis Redis instance
*/
public function __construct($payload, $payloadSerialized, $queueAlias, $queueName, $redis)
{
Expand Down
38 changes: 30 additions & 8 deletions Factory/RedisFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

namespace Mmoreram\RSQueueBundle\Factory;

use Mmoreram\RSQueueBundle\Redis\AdapterInterface;
use Mmoreram\RSQueueBundle\Redis\PredisClientAdapter;
use Mmoreram\RSQueueBundle\Redis\RedisAdapter;
use Predis\Client;
use Redis;

/**
Expand All @@ -30,19 +34,37 @@ public function __construct(array $config)
}

/**
* Generate new Predis instance
* Generate a AdapterInterface instance
*
* @return \Redis instance
* @return AdapterInterface
*/
public function get()
{
$redis = new Redis;
$redis->connect($this->config['host'], $this->config['port']);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($this->config['database']) {
$redis->select($this->config['database']);
if ($this->config['driver'] === 'predis') {

$connectionParameters = array(
'scheme' => 'tcp',
'host' => $this->config['host'],
'port' => $this->config['port'],
'read_write_timeout' => -1
);
if ($this->config['database']) {
$connectionParameters['database'] = $this->config['database'];
}
$redis = new Client($connectionParameters);

return new PredisClientAdapter($redis);
} else {
$redis = new Redis;
$redis->connect($this->config['host'], $this->config['port']);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($this->config['database']) {
$redis->select($this->config['database']);
}

return new RedisAdapter($redis);
}

return $redis;

}
}
48 changes: 48 additions & 0 deletions Redis/AdapterInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php
/**
* Created by PhpStorm.
* User: pilou
* Date: 04/12/16
* Time: 12:14
*/

namespace Mmoreram\RSQueueBundle\Redis;


interface AdapterInterface
{

/**
* @param $queues
* @param $timeout
* @return mixed
*/
public function blPop($queues, $timeout);

/**
* @param $key
* @param array $messages
* @return mixed
*/
public function rPush($key, array $messages);

/**
* @param $channels
* @param $callback
* @return mixed
*/
public function subscribe($channels, $callback);

/**
* @param $channel
* @param $message
* @return mixed
*/
public function publish($channel, $message);

/**
* @return \Redis|\Predis\Client
*/
public function getClient();

}
89 changes: 89 additions & 0 deletions Redis/PredisClientAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php
/**
* Created by PhpStorm.
* User: pilou
* Date: 04/12/16
* Time: 12:23
*/

namespace Mmoreram\RSQueueBundle\Redis;

/**
* Class PredisClientAdapter
* @package Mmoreram\RSQueueBundle\Redis
*/
class PredisClientAdapter implements AdapterInterface
{

/**
* @var \Predis\Client
*/
protected $client;

/**
* PredisClientAdapter constructor.
* @param \Predis\Client $client
*/
public function __construct(\Predis\Client $client)
{
$this->client = $client;
}

/**
* @param $queues
* @param $timeout
* @return array
*/
public function blPop($queues, $timeout)
{
return $this->client->blpop($queues, $timeout);
}

/**
* @param $key
* @param array $messages
* @return int
*/
public function rPush($key, array $messages)
{
return $this->client->rpush($key, $messages);
}

/**
* @param $channels
* @param $callback
*/
public function subscribe($channels, $callback)
{
$pubSub = $this->client->pubSubLoop();
$pubSub->subscribe($channels);
foreach ($pubSub as $message) {
switch ($message->kind) {
case 'message':
$callback($pubSub->getClient(), $message->channel, $message->payload);
break;
}

}
unset($pubSub);

}

/**
* @param $channel
* @param $message
* @return int
*/
public function publish($channel, $message)
{
return $this->client->publish($channel, $message);
}

/**
* @return \Predis\Client
*/
public function getClient()
{
return $this->client;
}
}
79 changes: 79 additions & 0 deletions Redis/RedisAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php
/**
* Created by PhpStorm.
* User: pilou
* Date: 04/12/16
* Time: 12:14
*/

namespace Mmoreram\RSQueueBundle\Redis;


/**
* Class RedisAdapter
* @package Mmoreram\RSQueueBundle\Redis
*/
class RedisAdapter implements AdapterInterface
{
/**
* @var \Redis
*/
protected $client;

/**
* RedisAdapter constructor.
* @param \Redis $redis
*/
public function __construct(\Redis $redis)
{
$this->client = $redis;
}

/**
* @param $queues
* @param $timeout
* @return array
*/
public function blPop($queues, $timeout)
{
return $this->client->blPop($queues, $timeout);
}

/**
* @param $key
* @param array $messages
* @return int
*/
public function rPush($key, array $messages)
{
return $this->client->rPush($key, $messages);
}

/**
* @param $channels
* @param $callback
* @return mixed|void
*/
public function subscribe($channels, $callback)
{
return $this->client->subscribe($channels, $callback);
}

/**
* @param $channel
* @param $message
* @return int|mixed
*/
public function publish($channel, $message)
{
return $this->client->publish($channel, $message);
}

/**
* @return \Redis
*/
public function getClient()
{
return $this->client;
}
}
2 changes: 1 addition & 1 deletion Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ services:
redis: %rs_queue.server.redis%

rs_queue.redis:
class: Redis
class: %rs_queue.redis.class%

rs_queue.resolver.queuealias:
class: %rs_queue.resolver.queuealias.class%
Expand Down
Loading