diff --git a/.travis.yml b/.travis.yml index d4c07ce..20abaae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: python install: - - pip install tox==1.8.1 + - pip install tox - pip install coveralls codecov script: - tox @@ -9,3 +9,5 @@ after_success: codecov env: - TOXENV=py27 + - TOXENV=py35 + - TOXENV=py36-dev diff --git a/cornet/connectors/__init__.py b/cornet/connectors/__init__.py index a758e25..e5a9291 100644 --- a/cornet/connectors/__init__.py +++ b/cornet/connectors/__init__.py @@ -7,10 +7,13 @@ def get_connector(source): driver = source['driver'] if driver == 'mysql': - from mysql_connector import MySqlConnector + from .mysql_connector import MySqlConnector return MySqlConnector(source) elif driver == 'postgresql': - from postgresql_connector import PostgreSqlConnector + from .postgresql_connector import PostgreSqlConnector return PostgreSqlConnector(source) + elif driver == 'mssql': + from .mssql_connector import MSSqlConnector + return MSSqlConnector(source) else: raise LookupError("Driver {0} not supported".format(driver)) diff --git a/cornet/connectors/mssql_connector.py b/cornet/connectors/mssql_connector.py new file mode 100644 index 0000000..f756a0a --- /dev/null +++ b/cornet/connectors/mssql_connector.py @@ -0,0 +1,37 @@ +import pymssql # yes I know, would be nice to use JDBC connector instead +import sys +from .base_connector import BaseConnector +from cornet.connectors import Table, Column + + +class MSSqlConnector(BaseConnector): + + jdbc_url_prefix = 'jdbc:sqlserver' + + if sys.version_info[0] < 3: + raise RuntimeError("Sorry, MSSQL isn't yet happy on Python 2.") + + def _get_db_conn(self): + source = self.source + return pymssql.connect( + host=source['host'], + user=source['user'], + password=self._get_password(), + database=source['db']) + + def get_tables(self): + sql = """ + select table_name, table_type + from information_schema.tables + where table_name NOT IN ('sysdiagrams'); """ + res = self.query(sql) + return 
list(map(Table._make, res)) + + def get_columns(self, table): + sql = """ + select column_name, upper(data_type) + from information_schema.columns + where table_catalog = '{0}' + and table_name = '{1}'; """ + res = self.query(sql.format(self.source['db'], table.name)) + return list(map(Column._make, res)) diff --git a/cornet/connectors/mysql_connector.py b/cornet/connectors/mysql_connector.py index 842dff9..0b9696c 100644 --- a/cornet/connectors/mysql_connector.py +++ b/cornet/connectors/mysql_connector.py @@ -1,5 +1,5 @@ import MySQLdb -from base_connector import BaseConnector +from .base_connector import BaseConnector from cornet.connectors import Table, Column @@ -18,7 +18,7 @@ def _get_db_conn(self): def get_tables(self): res = self.query("show full tables") - return map(Table._make, res) + return list(map(Table._make, res)) def get_columns(self, table): sql = """ @@ -27,4 +27,4 @@ def get_columns(self, table): where table_schema = '{0}' and table_name = '{1}' """ res = self.query(sql.format(self.source['db'], table.name)) - return map(Column._make, res) + return list(map(Column._make, res)) diff --git a/cornet/connectors/postgresql_connector.py b/cornet/connectors/postgresql_connector.py index d95b872..df09f2d 100644 --- a/cornet/connectors/postgresql_connector.py +++ b/cornet/connectors/postgresql_connector.py @@ -1,5 +1,5 @@ import psycopg2 -from base_connector import BaseConnector +from .base_connector import BaseConnector from cornet.connectors import Table, Column @@ -22,7 +22,7 @@ def get_tables(self): from information_schema.tables where table_schema NOT IN ('pg_catalog', 'information_schema'); """ res = self.query(sql) - return map(Table._make, res) + return list(map(Table._make, res)) def get_columns(self, table): sql = """ @@ -31,4 +31,4 @@ def get_columns(self, table): where table_catalog = '{0}' and table_name = '{1}'; """ res = self.query(sql.format(self.source['db'], table.name)) - return map(Column._make, res) + return list(map(Column._make, 
res)) diff --git a/cornet/main.py b/cornet/main.py index 0477bb7..20d8e81 100644 --- a/cornet/main.py +++ b/cornet/main.py @@ -1,8 +1,8 @@ import click -from connectors import get_connector -from task_config import TaskConfig -from sqoop_cmd import SqoopCmd -from utils import match_any +from .connectors import get_connector +from .task_config import TaskConfig +from .sqoop_cmd import SqoopCmd +from .utils import match_any def print_sqoop_cmds(task): @@ -11,7 +11,7 @@ def print_sqoop_cmds(task): for table in sorted(to_import, key=lambda tbl: tbl.name): columns = conn.get_columns(table) cmd = SqoopCmd(task, table, columns) - print cmd.as_string() + print(cmd.as_string()) def get_tables_to_import(conn, task): @@ -26,10 +26,10 @@ def print_schema(task): with get_connector(task.source) as conn: to_import = get_tables_to_import(conn, task) for table in to_import: - print '\n=== {0}.{1} ==='.format(task.source['db'], table[0]) + print('\n=== {0}.{1} ==='.format(task.source['db'], table[0])) columns = conn.get_columns(table) for c in columns: - print '{0}: {1}'.format(c.name, c.type) + print('{0}: {1}'.format(c.name, c.type)) @click.command() diff --git a/cornet/sqoop_cmd.py b/cornet/sqoop_cmd.py index c2800cf..3835317 100644 --- a/cornet/sqoop_cmd.py +++ b/cornet/sqoop_cmd.py @@ -1,4 +1,4 @@ -from utils import merge_dict +from .utils import merge_dict class SqoopCmd: @@ -56,7 +56,7 @@ def _arg2str(k, v): def as_string(self): args = { SqoopCmd._arg2str(k, v) - for k, v in self.args().iteritems() + for k, v in self.args().items() if v} name = 'sqoop import' return ' \\\n '.join([name] + sorted(args)) + '\n' diff --git a/cornet/task_config.py b/cornet/task_config.py index b900a55..c27ea1b 100644 --- a/cornet/task_config.py +++ b/cornet/task_config.py @@ -1,6 +1,6 @@ import yaml from jinja2 import Environment, FileSystemLoader -from utils import merge_dict, dict_without_key +from .utils import merge_dict, dict_without_key import os.path diff --git a/cornet/utils.py 
b/cornet/utils.py index 2aec2fe..a3d9702 100644 --- a/cornet/utils.py +++ b/cornet/utils.py @@ -12,10 +12,10 @@ def merge_dict(a, b): assert isinstance(b, dict), "Second arg not a dict, but {0} ".format(b) merged = {} - for key in set(a.keys() + b.keys()): - if key not in b.keys(): + for key in set(list(a.keys()) + list(b.keys())): + if key not in list(b.keys()): merged[key] = a[key] - elif key not in a.keys(): + elif key not in list(a.keys()): merged[key] = b[key] elif isinstance(a[key], dict) and isinstance(b[key], dict): merged[key] = merge_dict(a[key], b[key]) diff --git a/doc/Makefile b/doc/Makefile new file mode 100644 index 0000000..906fb22 --- /dev/null +++ b/doc/Makefile @@ -0,0 +1,4 @@ +all: diagram.pdf + +diagram.pdf: diagram.gv + dot diagram.gv -Tpdf -odiagram.pdf diff --git a/doc/diagram.gv b/doc/diagram.gv new file mode 100644 index 0000000..1ada956 --- /dev/null +++ b/doc/diagram.gv @@ -0,0 +1,19 @@ +digraph cornet { + pg [label="PostgreSQL"]; + my [label="MySQL"]; + co [label="Cornet"]; + sq [label="Sqoop"]; + h [label="Hadoop"]; + ms [label="SQL Server"; color=gray;]; + + // to be added + ms->co [style=dotted; color=forestgreen;]; + + // schema flows + pg->co [color=forestgreen;]; + my->co [color=forestgreen;]; + co -> sq [label="Commands"; fontcolor=slategray; color=slategray;]; + ms -> sq; + my -> sq; + pg -> sq -> h; +} diff --git a/doc/diagram.pdf b/doc/diagram.pdf new file mode 100644 index 0000000..ed3a329 Binary files /dev/null and b/doc/diagram.pdf differ diff --git a/doc/mssql.yaml b/doc/mssql.yaml new file mode 100644 index 0000000..06f76df --- /dev/null +++ b/doc/mssql.yaml @@ -0,0 +1,28 @@ +global: + source: + host: 192.168.122.107 + hive: + db: test + map_types: + java: + UUID: String + hive: + VARBINARY: String + LONGBLOB: String + UUID: String + sqoop_args: + m: 2 + direct: true + data-warehouse: /user/sqoop + skip_tables: + - schema_version + +tasks: + - source: + db: test + user: WORKGROUP\Administrator + password_file: 
/home/fms/.password + port: 1143 + driver: mssql + hive: + table_prefix: mssql_ diff --git a/doc/postgres.sql b/doc/postgres.sql new file mode 100644 index 0000000..a413593 --- /dev/null +++ b/doc/postgres.sql @@ -0,0 +1,12 @@ +BEGIN; +CREATE TABLE people ( + id SERIAL PRIMARY KEY NOT NULL, + name TEXT +); +CREATE TABLE addresses ( + id SERIAL PRIMARY KEY NOT NULL, + person INTEGER NOT NULL REFERENCES people(id), + address TEXT NOT NULL +); +-- CREATE VIEW people_addresses AS +COMMIT; diff --git a/doc/postgres.yaml b/doc/postgres.yaml new file mode 100644 index 0000000..d9359ed --- /dev/null +++ b/doc/postgres.yaml @@ -0,0 +1,28 @@ +global: + source: + host: localhost + hive: + db: fms + map_types: + java: + UUID: String + hive: + VARBINARY: String + LONGBLOB: String + UUID: String + sqoop_args: + m: 2 + direct: true + data-warehouse: /user/sqoop + skip_tables: + - schema_version + +tasks: + - source: + db: fms + port: 5432 + user: fms + password_file: /home/fms/.password + driver: postgresql + hive: + table_prefix: b_