Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
286af14
Add Ione::Stream and Ione::Stream::PushStream
iconara Nov 8, 2014
8d6b0d5
Use PushStream for the data listening in *Connection
iconara Nov 8, 2014
123245b
Add *Connection#to_stream
iconara Nov 8, 2014
dc72914
Rewrite the Redis client example with streams
iconara Nov 8, 2014
d5554dc
Rename a variable in the Redis example
iconara Nov 8, 2014
9676b9e
Write some documentation for the Redis protocol implementation
iconara Nov 8, 2014
1113d87
Refactor the Redis example to better show the three stream transforma…
iconara Nov 8, 2014
1803fc2
Add RedisError to the Redis example
iconara Nov 8, 2014
c157eb2
Remove some unnecessary code from the Redis protocol
iconara Nov 8, 2014
5361c2b
Mark internal methods as private in the Redis example
iconara Nov 8, 2014
c8b8580
Move the is-Redis-running check to a before block
iconara Nov 8, 2014
ca75e1a
Add a clarifying test to the Redis example
iconara Nov 8, 2014
ec1b833
Add a little something to the readme about streams
iconara Nov 8, 2014
097f5ee
Add some more API docs to the stream combinators
iconara Nov 8, 2014
6a8eaef
Add Stream#take and #drop
iconara Nov 8, 2014
0cdf4c6
Rename #each and @listeners #subscribe and @subscribers
iconara Nov 8, 2014
b9cf0a4
Reimplement Acceptor#on_accept as a stream
iconara Nov 8, 2014
74b7666
Rename {Stream,}Combinators
iconara Nov 8, 2014
a0bd9e1
Change the description of some of the Stream tests
iconara Nov 9, 2014
8395bba
Remove bad API docs from Stream#take and #drop
iconara Nov 9, 2014
25c46ad
Make some small changes to the HTTP client example
iconara Nov 9, 2014
9bcb739
Use streams in the HTTP client example
iconara Nov 9, 2014
9f76e1f
Change things around to get closer to the reactive streams spec
iconara Dec 30, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Ione is a framework for reactive programming in Ruby. It is based on the reactiv

At the core of Ione is a futures API. Futures make it easy to compose asynchronous operations.

## Streams

Streams are a powerful abstraction for building for example composable protocol parsers.

## Evented IO

A key piece of the framework is an IO reactor with which you can easily build network clients and servers.
Expand Down
35 changes: 21 additions & 14 deletions examples/http_client/lib/ione/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ def get(url, headers={})
ctx.cert_store = @cert_store
options[:ssl] = ctx
end
f = @reactor.connect(uri.host, uri.port, options) { |connection| HttpProtocolHandler.new(connection) }
f = @reactor.connect(uri.host, uri.port, options) do |connection|
HttpProtocolHandler.new(connection)
end
f.flat_map do |handler|
handler.send_get(uri.path, uri.query, headers)
end
Expand All @@ -44,24 +46,28 @@ def get(url, headers={})
class HttpProtocolHandler
def initialize(connection)
@connection = connection
@connection.on_data(&method(:process_data))
@connection.to_stream.subscribe(method(:process_data))
@http_parser = Http::Parser.new(self)
@promises = []
end

def send_get(path, query, headers)
message = 'GET '
message << path
message << '?' << query if query && !query.empty?
message << " HTTP/1.1\r\n"
headers.each do |key, value|
message << key
message << ': '
message << value
message << "\r\n"
@connection.write do |buffer|
buffer << 'GET '
buffer << path
if query && !query.empty?
buffer << '?'
buffer << query
end
buffer << " HTTP/1.1\r\n"
headers.each do |key, value|
buffer << key
buffer << ':'
buffer << value
buffer << "\r\n"
end
buffer << "\r\n"
end
message << "\r\n"
@connection.write(message)
@promises << Promise.new
@promises.last.future
end
Expand All @@ -84,7 +90,8 @@ def on_body(chunk)
end

def on_message_complete
@promises.shift.fulfill(HttpResponse.new(@http_parser.status_code, @headers, @body))
response = HttpResponse.new(@http_parser.status_code, @headers, @body)
@promises.shift.fulfill(response)
end
end

Expand Down
221 changes: 119 additions & 102 deletions examples/redis_client/lib/ione/redis_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@


module Ione
RedisError = Class.new(StandardError)

class RedisClient
def self.connect(host, port)
new(host, port).connect
Expand All @@ -13,148 +15,163 @@ def initialize(host, port)
@host = host
@port = port
@reactor = Ione::Io::IoReactor.new
@responses = []
end

def connect
f = @reactor.start
f = f.flat_map { @reactor.connect(@host, @port) }
f = f.map { |connection| RedisProtocolHandler.new(connection) }
f.on_value { |protocol_handler| @protocol_handler = protocol_handler }
f.on_value do |connection|
@connection = connection
process_data_chunks(connection.to_stream)
end
f.map(self)
end

def method_missing(name, *args)
@protocol_handler.send_request(name, *args)
end
end

class LineProtocolHandler
def initialize(connection)
@connection = connection
@connection.on_data(&method(:process_data))
@buffer = Ione::ByteBuffer.new
@requests = []
end

def on_line(&listener)
@line_listener = listener
end

def write(command_string)
@connection.write(command_string)
def process_data_chunks(data_chunk_stream)
line_stream = data_chunk_stream.aggregate(Ione::ByteBuffer.new) do |data, downstream, buffer|
buffer << data
while (newline_index = buffer.index("\r\n"))
line = buffer.read(newline_index + 2)
line.chomp!
downstream << line
end
buffer
end
process_lines(line_stream)
end

def process_data(new_data)
lines = []
@buffer << new_data
while (newline_index = @buffer.index("\r\n"))
line = @buffer.read(newline_index + 2)
line.chomp!
lines << line
end
lines.each do |line|
@line_listener.call(line) if @line_listener
def process_lines(line_stream)
response_stream = line_stream.aggregate(RedisProtocol::BaseState.new) do |line, downstream, state|
state = state.feed_line(line)
if state.response?
downstream << [state.response, state.error?]
end
state
end
process_responses(response_stream)
end
end

class RedisProtocolHandler
def initialize(connection)
@line_protocol = LineProtocolHandler.new(connection)
@line_protocol.on_line(&method(:handle_line))
@responses = []
@state = BaseState.new(method(:handle_response))
def process_responses(response_stream)
response_stream.each do |response, error|
promise = @responses.shift
if error
promise.fail(RedisError.new(response))
else
promise.fulfill(response)
end
end
end

def send_request(*args)
def method_missing(*args)
promise = Ione::Promise.new
@responses << promise
request = "*#{args.size}\r\n"
args.each do |arg|
arg_str = arg.to_s
request << "$#{arg_str.bytesize}\r\n#{arg_str}\r\n"
end
@line_protocol.write(request)
@connection.write(request)
promise.future
end

def handle_response(result, error=false)
promise = @responses.shift
if error
promise.fail(StandardError.new(result))
else
promise.fulfill(result)
end
end

def handle_line(line)
@state = @state.handle_line(line)
end
# This is an implementation of the Redis protocol as a state machine.
#
# You start with {BaseState} and call {#feed_line} with a line received
# from Redis. {#feed_line} will return a new state object, on which you
# should call {#feed_line} with the next line from Redis.
#
# The state objects returned by {#feed_line} represent either a complete or
# a partial response. You can check if a state represents a complete
# response by calling {#response?}. If this method returns true you can call
# {#response} to get the response (which is either a string or an array of
# strings). In some cases the response is an error, in which case {#error?}
# will return true, and {#response} will return the error message.
#
# The state that represents a complete response also works like the base
# state so you can feed it a line to start processing the next response.
module RedisProtocol
class State
def response?
false
end

class State
def initialize(result_handler)
@result_handler = result_handler
end
private

def complete(result)
@result_handler.call(result)
end
def continue(next_state=self)
next_state
end

def fail(message)
@result_handler.call(message, true)
def complete(response, error=false)
CompleteState.new(response, error)
end
end
end

class BulkState < State
def handle_line(line)
complete(line)
BaseState.new(@result_handler)
class BulkState < State
def feed_line(line)
complete(line)
end
end
end

class MultiBulkState < State
def initialize(result_handler, expected_elements)
super(result_handler)
@expected_elements = expected_elements
@elements = []
end
class MultiBulkState < State
def initialize(size)
@size = size
@elements = []
end

def handle_line(line)
if line.start_with?('$')
line.slice!(0, 1)
if line.to_i == -1
@elements << nil
def feed_line(line)
if line.start_with?('$')
line.slice!(0, 1)
if line.to_i == -1
@elements << nil
end
else
@elements << line
end
if @elements.size == @size
complete(@elements)
else
continue
end
else
@elements << line
end
if @elements.size == @expected_elements
complete(@elements)
BaseState.new(@result_handler)
else
self
end
end
end

class BaseState < State
def handle_line(line)
next_state = self
first_char = line.slice!(0, 1)
case first_char
when '+' then complete(line)
when ':' then complete(line.to_i)
when '-' then fail(line)
when '$'
if line.to_i == -1
complete(nil)
class BaseState < State
def feed_line(line)
first_char = line.slice!(0, 1)
case first_char
when '+' then complete(line)
when ':' then complete(line.to_i)
when '-' then complete(line, true)
when '$'
if line.to_i == -1
complete(nil)
else
continue(BulkState.new)
end
when '*'
continue(MultiBulkState.new(line.to_i))
else
next_state = BulkState.new(@result_handler)
continue
end
when '*'
next_state = MultiBulkState.new(@result_handler, line.to_i)
end
next_state
end

class CompleteState < BaseState
attr_reader :response

def initialize(response, error=false)
@response = response
@error = error
end

def response?
true
end

def error?
@error
end
end
end
end
Expand Down
Loading