From 9b8dfca58a294ecfd432914d34c48e592100e87b Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Thu, 30 Jan 2020 13:48:06 -0800 Subject: [PATCH 1/6] Make select Faster --- lore/io/connection.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/lore/io/connection.py b/lore/io/connection.py index 1f9f811..ebb3547 100644 --- a/lore/io/connection.py +++ b/lore/io/connection.py @@ -78,13 +78,17 @@ class Connection(object): UNLOAD_PREFIX = os.path.join(lore.env.NAME, 'unloads') IAM_ROLE = os.environ.get('IAM_ROLE', None) - def __init__(self, url, name='connection', watermark=True, **kwargs): + def __init__(self, url, name='connection', watermark=True, allow_raw_adapter_queries=True, **kwargs): if not sqlalchemy: raise lore.env.ModuleNotFoundError('No module named sqlalchemy. Please add it to requirements.txt.') parsed = lore.env.parse_url(url) self.adapter = parsed.scheme + self.use_psycopg2 = False + if self.adapter == 'postgres' and allow_raw_adapter_queries: + self._use_psycopg2 = True + if self.adapter == 'postgres': require(lore.dependencies.POSTGRES) if self.adapter == 'snowflake': @@ -107,6 +111,7 @@ def __init__(self, url, name='connection', watermark=True, **kwargs): self.name = name self._transactions = [] self.__thread_local = threading.local() + self._raw_conn_pool = self._connection.engine.raw_connection() @event.listens_for(self._engine, "before_cursor_execute", retval=True) def comment_sql_calls(conn, cursor, statement, parameters, context, executemany): @@ -272,7 +277,22 @@ def select(self, sql=None, extract=None, filename=None, **kwargs): @query_cached def _select(self, sql, bindings): - return self.__execute(sql, bindings).fetchall() + if self._use_psycopg2: + try: + with self._raw_conn_pool.connection: + with self._raw_conn_pool.connection.cursor() as cursor: + cursor.execute(sql, bindings) + return cursor.fetchall() + except Psycopg2OperationalError as e: + logger.warning('Reconnect and retry due to invalid connection') + self.close() + self._raw_conn_pool = self._connection.engine.raw_connection() + with self._raw_conn_pool.connection: + with self._raw_conn_pool.connection.cursor() as cursor: + cursor.execute(sql, bindings) + return cursor.fetchall() + else: + return self.__execute(sql, bindings).fetchall() def unload(self, sql=None, extract=None, filename=None, **kwargs): cache = kwargs.pop('cache', False) From 2bd342d343cfa000c629bcefd2296420d9cd5382 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Thu, 30 Jan 2020 14:10:08 -0800 Subject: [PATCH 2/6] Fixes. --- lore/io/connection.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lore/io/connection.py b/lore/io/connection.py index ebb3547..18260da 100644 --- a/lore/io/connection.py +++ b/lore/io/connection.py @@ -85,7 +85,7 @@ def __init__(self, url, name='connection', watermark=True, allow_raw_adapter_que parsed = lore.env.parse_url(url) self.adapter = parsed.scheme - self.use_psycopg2 = False + self._use_psycopg2 = False if self.adapter == 'postgres' and allow_raw_adapter_queries: self._use_psycopg2 = True @@ -279,16 +279,17 @@ def select(self, sql=None, extract=None, filename=None, **kwargs): def _select(self, sql, bindings): if self._use_psycopg2: try: - with self._raw_conn_pool.connection: - with self._raw_conn_pool.connection.cursor() as cursor: + conn = self._connection.engine.raw_connection().connection + with conn: + with conn.cursor() as cursor: cursor.execute(sql, bindings) return cursor.fetchall() except Psycopg2OperationalError as e: logger.warning('Reconnect and retry due to invalid connection') self.close() - self._raw_conn_pool = self._connection.engine.raw_connection() - with self._raw_conn_pool.connection: - with self._raw_conn_pool.connection.cursor() as cursor: + conn = self._connection.engine.raw_connection().connection + with conn: + with conn.cursor() as cursor: cursor.execute(sql, bindings) return cursor.fetchall() else: From 70547735c1bd311cdb6ec984e7f6ec8a6485b020 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Thu, 30 Jan 2020 16:08:03 -0800 Subject: [PATCH 3/6] Simplify. --- lore/io/connection.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lore/io/connection.py b/lore/io/connection.py index 18260da..89a55a3 100644 --- a/lore/io/connection.py +++ b/lore/io/connection.py @@ -111,7 +111,6 @@ def __init__(self, url, name='connection', watermark=True, allow_raw_adapter_que self.name = name self._transactions = [] self.__thread_local = threading.local() - self._raw_conn_pool = self._connection.engine.raw_connection() @event.listens_for(self._engine, "before_cursor_execute", retval=True) def comment_sql_calls(conn, cursor, statement, parameters, context, executemany): @@ -279,16 +278,14 @@ def select(self, sql=None, extract=None, filename=None, **kwargs): def _select(self, sql, bindings): if self._use_psycopg2: try: - conn = self._connection.engine.raw_connection().connection - with conn: + with self._connection.engine.raw_connection().connection as conn: with conn.cursor() as cursor: cursor.execute(sql, bindings) return cursor.fetchall() except Psycopg2OperationalError as e: logger.warning('Reconnect and retry due to invalid connection') self.close() - conn = self._connection.engine.raw_connection().connection - with conn: + with self._connection.engine.raw_connection().connection as conn: with conn.cursor() as cursor: cursor.execute(sql, bindings) return cursor.fetchall() From 55cd147fb48f463e6608e1b8746b74c0955a95b0 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Thu, 30 Jan 2020 18:16:22 -0800 Subject: [PATCH 4/6] Fix test. --- tests/unit/io/test_connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/io/test_connection.py b/tests/unit/io/test_connection.py index 4071bcd..c36421a 100644 --- a/tests/unit/io/test_connection.py +++ b/tests/unit/io/test_connection.py @@ -163,7 +163,7 @@ def test_close(self): lore.io.main.close() reopened = lore.io.main.select(sql='select 1') self.assertEquals(reopened, [(1,)]) - with self.assertRaises(sqlalchemy.exc.ProgrammingError): + with self.assertRaises(psycopg2.ProgrammingError): lore.io.main.select(sql='select count(*) from tests_close') def test_reconnect_and_retry(self): From 3fd26571f0e92ca4ece377ce7b2644e7a691de58 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Mon, 3 Feb 2020 17:22:41 -0800 Subject: [PATCH 5/6] WIP attempt to fix tests. --- lore/io/connection.py | 43 ++++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/lore/io/connection.py b/lore/io/connection.py index 89a55a3..306b680 100644 --- a/lore/io/connection.py +++ b/lore/io/connection.py @@ -11,6 +11,7 @@ import sys import tempfile import threading +import psycopg2 from datetime import datetime @@ -74,6 +75,14 @@ def after_replace(func): sqlalchemy_logger.setLevel(log_levels.get(lore.env.NAME, logging.WARN)) +class ResultWrapper(object): + # Used to make psycopg2 results compatible + # with the interface provided by Connection.execute + def __init__(self, results): + self.results = results + def fetchall(self): + return self.results + class Connection(object): UNLOAD_PREFIX = os.path.join(lore.env.NAME, 'unloads') IAM_ROLE = os.environ.get('IAM_ROLE', None) @@ -276,21 +285,7 @@ def select(self, sql=None, extract=None, filename=None, **kwargs): @query_cached def _select(self, sql, bindings): - if self._use_psycopg2: - try: - with self._connection.engine.raw_connection().connection as conn: - with conn.cursor() as cursor: - cursor.execute(sql, bindings) - return cursor.fetchall() - except Psycopg2OperationalError as e: - logger.warning('Reconnect and retry due to invalid connection') - self.close() - with self._connection.engine.raw_connection().connection as conn: - with conn.cursor() as cursor: - cursor.execute(sql, bindings) - return cursor.fetchall() - else: - return self.__execute(sql, bindings).fetchall() + return self.__execute(sql, bindings).fetchall() def unload(self, sql=None, extract=None, filename=None, **kwargs): cache = kwargs.pop('cache', False) @@ -443,14 +438,28 @@ def __prepare(self, sql=None, extract=None, filename=None, **kwargs): return sql + def __connection_execute(self, sql, bindings): + if self._use_psycopg2: + with self._connection.engine.raw_connection().connection as conn: + with conn.cursor() as cursor: + cursor.execute(sql, bindings) + try: + return ResultWrapper(cursor.fetchall()) + except psycopg2.ProgrammingError as e: + if 'no results to fetch' in str(y): + return None + raise e + else: + return self._connection.execute(sql, bindings) + def __execute(self, sql, bindings): try: - return self._connection.execute(sql, bindings) + return self.__connection_execute(sql, bindings) except (sqlalchemy.exc.DBAPIError, Psycopg2OperationalError, SnowflakeProgrammingError) as e: if not self._transactions and (isinstance(e, Psycopg2OperationalError) or e.connection_invalidated): logger.warning('Reconnect and retry due to invalid connection') self.close() - return self._connection.execute(sql, bindings) + return self.__connection_execute(sql, bindings) elif not self._transactions and (isinstance(e, SnowflakeProgrammingError) or e.connection_invalidated): if hasattr(e, 'msg') and e.msg and "authenticate" in e.msg.lower(): logger.warning('Reconnect and retry due to unauthenticated connection') From 3c712085ab30eb25d134b4cef0bec13e3f210c42 Mon Sep 17 00:00:00 2001 From: Marco Montagna Date: Mon, 3 Feb 2020 19:49:53 -0800 Subject: [PATCH 6/6] Partial fixes --- lore/io/connection.py | 8 ++++---- tests/unit/io/test_connection.py | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lore/io/connection.py b/lore/io/connection.py index 306b680..8a8c7bb 100644 --- a/lore/io/connection.py +++ b/lore/io/connection.py @@ -438,7 +438,7 @@ def __prepare(self, sql=None, extract=None, filename=None, **kwargs): return sql - def __connection_execute(self, sql, bindings): + def _connection_execute(self, sql, bindings): if self._use_psycopg2: with self._connection.engine.raw_connection().connection as conn: with conn.cursor() as cursor: @@ -446,7 +446,7 @@ def __connection_execute(self, sql, bindings): try: return ResultWrapper(cursor.fetchall()) except psycopg2.ProgrammingError as e: - if 'no results to fetch' in str(y): + if 'no results to fetch' in str(e): return None raise e else: @@ -454,12 +454,12 @@ def __connection_execute(self, sql, bindings): def __execute(self, sql, bindings): try: - return self.__connection_execute(sql, bindings) + return self._connection_execute(sql, bindings) except (sqlalchemy.exc.DBAPIError, Psycopg2OperationalError, SnowflakeProgrammingError) as e: if not self._transactions and (isinstance(e, Psycopg2OperationalError) or e.connection_invalidated): logger.warning('Reconnect and retry due to invalid connection') self.close() - return self.__connection_execute(sql, bindings) + return self._connection_execute(sql, bindings) elif not self._transactions and (isinstance(e, SnowflakeProgrammingError) or e.connection_invalidated): if hasattr(e, 'msg') and e.msg and "authenticate" in e.msg.lower(): logger.warning('Reconnect and retry due to unauthenticated connection') diff --git a/tests/unit/io/test_connection.py b/tests/unit/io/test_connection.py index c36421a..acbdd87 100644 --- a/tests/unit/io/test_connection.py +++ b/tests/unit/io/test_connection.py @@ -12,7 +12,7 @@ from sqlalchemy import event from sqlalchemy.engine import Engine import pandas - +import psycopg2 import lore @@ -137,7 +137,7 @@ def insert(delay=0): lore.io.main.execute(sql='insert into tests_autocommit values (1), (2), (3)') posts.append(lore.io.main.select(sql='select count(*) from tests_autocommit')[0][0]) time.sleep(delay) - except sqlalchemy.exc.IntegrityError as ex: + except psycopg2.IntegrityError as ex: thrown.append(True) slow = Thread(target=insert, args=(1,)) @@ -167,17 +167,17 @@ def test_close(self): lore.io.main.select(sql='select count(*) from tests_close') def test_reconnect_and_retry(self): - original_execute = lore.io.main._connection.execute + original_execute = lore.io.main._connection_execute def raise_dbapi_error_on_first_call(sql, bindings): - lore.io.main._connection.execute = original_execute + lore.io.main._connection_execute = original_execute e = lore.io.connection.Psycopg2OperationalError('server closed the connection unexpectedly. This probably means the server terminated abnormally before or while processing the request.') raise sqlalchemy.exc.DBAPIError('select 1', [], e, True) exceptions = lore.env.STDOUT_EXCEPTIONS lore.env.STDOUT_EXCEPTIONS = False connection = lore.io.main._connection - lore.io.main._connection.execute = raise_dbapi_error_on_first_call + lore.io.main._connection_execute = raise_dbapi_error_on_first_call result = lore.io.main.select(sql='select 1') lore.env.STDOUT_EXCEPTIONS = exceptions @@ -192,17 +192,17 @@ def test_tuple_interpolation(self): self.assertEqual(len(temps), 3) def test_reconnect_and_retry_on_expired_connection(self): - original_execute = lore.io.main._connection.execute + original_execute = lore.io.main._connection_execute def raise_snowflake_programming_error_on_first_call(sql, bindings): - lore.io.main._connection.execute = original_execute + lore.io.main._connection_execute = original_execute e = lore.io.connection.SnowflakeProgrammingError('Authentication token has expired. The user must authenticate again') raise sqlalchemy.exc.DBAPIError('select 1', [], e, True) exceptions = lore.env.STDOUT_EXCEPTIONS lore.env.STDOUT_EXCEPTIONS = False connection = lore.io.main._connection - lore.io.main._connection.execute = raise_snowflake_programming_error_on_first_call + lore.io.main._connection_execute = raise_snowflake_programming_error_on_first_call result = lore.io.main.select(sql='select 1') lore.env.STDOUT_EXCEPTIONS = exceptions