diff --git a/composer.json b/composer.json index 3cef51a..c3dcbe2 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,8 @@ }, "require": { "php": ">=5.3", - "ptrofimov/tinyredisclient": "1.1.1" + "ptrofimov/tinyredisclient": "1.1.1", + "rybakit/msgpack": "0.9.1" }, "autoload": { "psr-4": { diff --git a/src/Emitter.php b/src/Emitter.php index 7df0255..b9b6b8e 100644 --- a/src/Emitter.php +++ b/src/Emitter.php @@ -5,11 +5,11 @@ define('EVENT', 2); define('BINARY_EVENT', 5); -if (!function_exists('msgpack_pack')) { - require(__DIR__ . '/msgpack_pack.php'); -} - class Emitter { + const UID = "4N8LaD"; + const DEFAULT_KEY = 'socket.io'; + const DEFAULT_NSP = '/'; + public function __construct($redis = FALSE, $opts = array()) { if (is_array($redis)) { $opts = $redis; @@ -41,10 +41,12 @@ public function __construct($redis = FALSE, $opts = array()) { } $this->redis = $redis; - $this->key = (isset($opts['key']) ? $opts['key'] : 'socket.io') . '#emitter'; + $this->key = (isset($opts['key']) ? $opts['key'] : self::DEFAULT_KEY); $this->_rooms = array(); - $this->_flags = array(); + $this->setDefaultFlags(); + $this->_exceptRooms = array(); + $this->_packer = new \MessagePack\Packer(); } /* @@ -60,6 +62,10 @@ private function readFlag($flag) { return isset($this->_flags[$flag]) ? $this->_flags[$flag] : false; } + private function setDefaultFlags() { + $this->_flags = array('nsp' => self::DEFAULT_NSP); + } + /* * Broadcasting */ @@ -72,6 +78,14 @@ public function in($room) { return $this; } + public function except($room) { + if (!in_array($room, $this->_exceptRooms)) { + $this->_exceptRooms[] = $room; + } + + return $this; + } + // Alias for in public function to($room) { return $this->in($room); @@ -83,6 +97,7 @@ public function to($room) { public function of($nsp) { $this->_flags['nsp'] = $nsp; + return $this; } @@ -109,18 +124,18 @@ public function emit() { $packet['data'] = $args; // set namespace - if (isset($this->_flags['nsp'])) { - $packet['nsp'] = $this->_flags['nsp']; - unset($this->_flags['nsp']); - } else { - $packet['nsp'] = '/'; - } + $nsp = $this->_flags['nsp']; + $packet['nsp'] = $nsp; + unset($this->_flags['nsp']); // publish - $packed = msgpack_pack(array($packet, array( - 'rooms' => $this->_rooms, - 'flags' => $this->_flags - ))); + $opts = array( + 'rooms' => $this->_rooms, + 'flags' => $this->_flags, + 'except' => $this->_exceptRooms, + ); + + $packed = $this->_packer->pack(array(self::UID, $packet, $opts)); // hack buffer extensions for msgpack with binary if ($packet['type'] == BINARY_EVENT) { @@ -128,14 +143,14 @@ public function emit() { $packed = str_replace(pack('c', 0xdb), pack('c', 0xd9), $packed); } - $this->redis->publish($this->key, $packed); + $prefix = $this->key.'#'.$nsp.'#'; + $this->redis->publish($prefix, $packed); // reset state $this->_rooms = array(); - $this->_flags = array(); + $this->_exceptRooms = array(); + $this->setDefaultFlags(); return $this; } } - - diff --git a/src/msgpack_pack.php b/src/msgpack_pack.php deleted file mode 100644 index 21d3c1f..0000000 --- a/src/msgpack_pack.php +++ /dev/null @@ -1,113 +0,0 @@ -= -32) return pack('c', $input); - // uint8 - if ($input > 0 && $input <= 0xFF) return pack('CC', 0xCC, $input); - // uint16 - if ($input > 0 && $input <= 0xFFFF) return pack('Cn', 0xCD, $input); - // uint32 - if ($input > 0 && $input <= 0xFFFFFFFF) return pack('CN', 0xCE, $input); - // uint64 - if ($input > 0 && $input <= 0xFFFFFFFFFFFFFFFF) { - // pack() does not support 64-bit ints, so pack into two 32-bits - $h = ($input & 0xFFFFFFFF00000000) >> 32; - $l = $input & 0xFFFFFFFF; - return $bigendian ? pack('CNN', 0xCF, $l, $h) : pack('CNN', 0xCF, $h, $l); - } - // int8 - if ($input < 0 && $input >= -0x80) return pack('Cc', 0xD0, $input); - // int16 - if ($input < 0 && $input >= -0x8000) { - $p = pack('s', $input); - return pack('Ca2', 0xD1, $bigendian ? $p : strrev($p)); - } - // int32 - if ($input < 0 && $input >= -0x80000000) { - $p = pack('l', $input); - return pack('Ca4', 0xD2, $bigendian ? $p : strrev($p)); - } - // int64 - if ($input < 0 && $input >= -0x8000000000000000) { - // pack() does not support 64-bit ints either so pack into two 32-bits - $p1 = pack('l', $input & 0xFFFFFFFF); - $p2 = pack('l', ($input >> 32) & 0xFFFFFFFF); - return $bigendian ? pack('Ca4a4', 0xD3, $p1, $p2) : pack('Ca4a4', 0xD3, strrev($p2), strrev($p1)); - } - throw new \InvalidArgumentException('Invalid integer: ' . $input); - } - - // Floats - if (is_float($input)) { - // Just pack into a double, don't take any chances with single precision - return pack('C', 0xCB) . ($bigendian ? pack('d', $input) : strrev(pack('d', $input))); - } - - // Strings/Raw - if (is_string($input)) { - $len = strlen($input); - if ($len < 32) { - return pack('Ca*', 0xA0 | $len, $input); - } else if ($len <= 0xFFFF) { - return pack('Cna*', 0xDA, $len, $input); - } else if ($len <= 0xFFFFFFFF) { - return pack('CNa*', 0xDB, $len, $input); - } else { - throw new \InvalidArgumentException('Input overflows (2^32)-1 byte max'); - } - } - - // Arrays & Maps - if (is_array($input)) { - $keys = array_keys($input); - $len = count($input); - - // Is this an associative array? - $isMap = false; - foreach ($keys as $key) { - if (!is_int($key)) { - $isMap = true; - break; - } - } - - $buf = ''; - if ($len < 16) { - $buf .= pack('C', ($isMap ? 0x80 : 0x90) | $len); - } else if ($len <= 0xFFFF) { - $buf .= pack('Cn', ($isMap ? 0xDE : 0xDC), $len); - } else if ($len <= 0xFFFFFFFF) { - $buf .= pack('CN', ($isMap ? 0xDF : 0xDD), $len); - } else { - throw new \InvalidArgumentException('Input overflows (2^32)-1 max elements'); - } - - foreach ($input as $key => $elm) { - if ($isMap) $buf .= msgpack_pack($key); - $buf .= msgpack_pack($elm); - } - return $buf; - - } - - throw new \InvalidArgumentException('Not able to pack/serialize input type: ' . gettype($input)); -} -?> diff --git a/test/test.php b/test/test.php index 0fe65ec..0c5606d 100644 --- a/test/test.php +++ b/test/test.php @@ -69,6 +69,7 @@ public function testPublishContainsExpectedAttributes() { $this->assertTrue(strpos($contents, 'yo') !== FALSE); $this->assertTrue(strpos($contents, 'rooms') !== FALSE); $this->assertTrue(strpos($contents, 'flags') !== FALSE); + $this->assertTrue(strpos($contents, 'except') !== FALSE); // Should not broadcast by default $this->assertFalse(strpos($contents, 'broadcast') !== FALSE); // Should have the default namespace @@ -92,6 +93,7 @@ public function testPublishContainsBroadcastWhenBroadcasting() { $this->assertTrue(strpos($contents, 'rooms') !== FALSE); $this->assertTrue(strpos($contents, 'flags') !== FALSE); $this->assertTrue(strpos($contents, 'broadcast') !== FALSE); + $this->assertTrue(strpos($contents, 'except') !== FALSE); } public function testPublishContainsExpectedDataWhenEmittingBinary() {