Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion lib/bas/bot/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class Base
attr_accessor :read_response, :process_response, :write_response

def initialize(options, shared_storage_reader, shared_storage_writer = nil)
@process_options = options || {}
default_options = { close_connections_after_process: true }
@process_options = default_options.merge(options || {})
@shared_storage_reader = shared_storage_reader
@shared_storage_writer = shared_storage_writer || shared_storage_reader
end
Expand All @@ -31,6 +32,8 @@ def execute
@shared_storage_reader.set_processed

@write_response = write

close_connections if @process_options[:close_connections_after_process].eql?(true)
end

protected
Expand Down Expand Up @@ -60,6 +63,11 @@ def unprocessable_response

read_data.nil? || read_data == {} || read_data.any? { |_key, value| [[], "", nil].include?(value) }
end

def close_connections
@shared_storage_reader.close_connections if @shared_storage_reader.respond_to?(:close_connections)
@shared_storage_writer.close_connections if @shared_storage_writer.respond_to?(:close_connections)
end
end
end
end
2 changes: 2 additions & 0 deletions lib/bas/shared_storage/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def set_in_process; end

def set_processed; end

def close_connections; end

protected

def read
Expand Down
31 changes: 24 additions & 7 deletions lib/bas/shared_storage/postgres.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

require_relative "base"
require_relative "types/read"
require_relative "../utils/postgres/request"
require_relative "../utils/postgres/connection"
require_relative "../version"

require "json"
Expand All @@ -17,17 +17,25 @@ class Postgres < Bas::SharedStorage::Base
TABLE_PARAMS = "data, tag, archived, stage, status, error_message, version"

def read
params = { connection: read_options[:connection], query: read_query }
establish_connection(:read)

first_result = Utils::Postgres::Request.execute(params).first || {}
first_result = @read_connection.query(read_query).first || {}

@read_response = Bas::SharedStorage::Types::Read.new(first_result[:id], first_result[:data],
first_result[:inserted_at])
end

def write(data)
params = { connection: write_options[:connection], query: write_query(data) }
@write_response = Utils::Postgres::Request.execute(params)
establish_connection(:write)

@write_response = @write_connection.query(write_query(data))
end

def close_connections
@read_connection&.finish
@write_connection&.finish
@read_connection = nil
@write_connection = nil
end

def set_in_process
Expand All @@ -44,6 +52,15 @@ def set_processed

private

def establish_connection(action)
case action
when :read
@read_connection ||= Utils::Postgres::Connection.new(read_options[:connection])
when :write
@write_connection ||= Utils::Postgres::Connection.new(write_options[:connection])
end
end

def read_query
query = "SELECT id, data, inserted_at FROM #{read_options[:db_table]} WHERE status='success' AND #{where}"

Expand Down Expand Up @@ -78,9 +95,9 @@ def write_params(data)
end

def update_stage(id, stage)
params = { connection: read_options[:connection], query: update_query(id, stage) }
establish_connection(:read)

Utils::Postgres::Request.execute(params)
@read_connection.query(update_query(id, stage))
end

def update_query(id, stage)
Expand Down
42 changes: 42 additions & 0 deletions lib/bas/utils/postgres/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

require "pg"

module Utils
module Postgres
# This module is a PostgresDB utility to establish connections to a Postgres database
# and execute raw or parameterized queries.
#
class Connection
def initialize(params)
@connection = PG::Connection.new(params)
end

def query(query)
results = if query.is_a? String
@connection.exec(query)
else
validate_query(query)

sentence, params = query
@connection.exec_params(sentence, params)
end

results.map { |result| result.transform_keys(&:to_sym) }
end

def finish
@connection&.finish
@connection = nil
end

private

def validate_query(query)
return if query.is_a?(Array) && query.size == 2 && query[0].is_a?(String) && query[1].is_a?(Array)

raise ArgumentError, "Parameterized query must be an array of [sentence (String), params (Array)]"
end
end
end
end
40 changes: 37 additions & 3 deletions spec/bas/bot/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

RSpec.describe Bas::Bot::Base do
before do
@options = {}
@shared_storage_reader = double(:shared_storage_reader, set_in_process: "in-process", set_processed: "processed")
@shared_storage_writer = double(:shared_storage_writer)
@options = { close_connections_after_process: true }
@shared_storage_reader = double(:shared_storage_reader, set_in_process: "in-process", set_processed: "processed",
close_connections: true)
@shared_storage_writer = double(:shared_storage_writer, close_connections: true)

@bot = described_class.new(@options, @shared_storage_reader, @shared_storage_writer)
end
Expand Down Expand Up @@ -72,6 +73,39 @@
expect(@bot.process_response).to eql({})
expect(@bot.write_response).to eql({})
end

it "closes the connections when close_connections_after_process is true" do
allow(@shared_storage_reader).to receive(:read).and_return(read_response)
allow(@shared_storage_writer).to receive(:write).and_return({})
allow(@shared_storage_reader).to receive(:close_connections).and_return(true)
allow(@shared_storage_writer).to receive(:close_connections).and_return(true)
allow_any_instance_of(described_class).to receive(:process).and_return({ success: "ok" })
allow(@shared_storage_reader).to receive(:respond_to?).with(:close_connections).and_return(true)
allow(@shared_storage_writer).to receive(:respond_to?).with(:close_connections).and_return(true)

bot = described_class.new({ close_connections_after_process: true }, @shared_storage_reader,
@shared_storage_writer)
bot.execute

expect(@shared_storage_reader).to have_received(:close_connections)
expect(@shared_storage_writer).to have_received(:close_connections)
end

it "does not close the connections when close_connections_after_process is false" do
allow(@shared_storage_reader).to receive(:read).and_return(read_response)
allow(@shared_storage_writer).to receive(:write).and_return({})
allow(@shared_storage_reader).to receive(:close_connections).and_return(true)
allow(@shared_storage_writer).to receive(:close_connections).and_return(true)
allow_any_instance_of(described_class).to receive(:process).and_return({ success: "ok" })

options = { close_connections_after_process: false }
bot = described_class.new(options, @shared_storage_reader, @shared_storage_writer)

bot.execute

expect(@shared_storage_reader).not_to have_received(:close_connections)
expect(@shared_storage_writer).not_to have_received(:close_connections)
end
end

describe ".unprocessable_response" do
Expand Down
Loading
Loading