Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: python
install:
- pip install tox==1.8.1
- pip install tox
- pip install coveralls codecov
script:
- tox
Expand All @@ -9,3 +9,5 @@ after_success:
codecov
env:
- TOXENV=py27
- TOXENV=py35
- TOXENV=py36-dev
7 changes: 5 additions & 2 deletions cornet/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
def get_connector(source):
driver = source['driver']
if driver == 'mysql':
from mysql_connector import MySqlConnector
from .mysql_connector import MySqlConnector
return MySqlConnector(source)
elif driver == 'postgresql':
from postgresql_connector import PostgreSqlConnector
from .postgresql_connector import PostgreSqlConnector
return PostgreSqlConnector(source)
elif driver == 'mssql':
from .mssql_connector import MSSqlConnector
return MSSqlConnector(source)
else:
raise LookupError("Driver {0} not supported".format(driver))
37 changes: 37 additions & 0 deletions cornet/connectors/mssql_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import pymssql # yes I know, would be nice to use JDBC connector instead
import sys
from .base_connector import BaseConnector
from cornet.connectors import Table, Column


class MSSqlConnector(BaseConnector):

jdbc_url_prefix = 'jdbc:sqlserver'

if sys.version_info[0] < 3:
raise("Sorry, MSSQL isn't yet happy on Python 2.")

def _get_db_conn(self):
source = self.source
return pymssql.connect(
host=source['host'],
user=source['user'],
password=self._get_password(),
database=source['db'])

def get_tables(self):
sql = """
select table_name, table_type
from information_schema.tables
where table_name NOT IN ('sysdiagrams'); """
res = self.query(sql)
return list(map(Table._make, res))

def get_columns(self, table):
sql = """
select column_name, upper(data_type)
from information_schema.columns
where table_catalog = '{0}'
and table_name = '{1}'; """
res = self.query(sql.format(self.source['db'], table.name))
return list(map(Column._make, res))
6 changes: 3 additions & 3 deletions cornet/connectors/mysql_connector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import MySQLdb
from base_connector import BaseConnector
from .base_connector import BaseConnector
from cornet.connectors import Table, Column


Expand All @@ -18,7 +18,7 @@ def _get_db_conn(self):

def get_tables(self):
res = self.query("show full tables")
return map(Table._make, res)
return list(map(Table._make, res))

def get_columns(self, table):
sql = """
Expand All @@ -27,4 +27,4 @@ def get_columns(self, table):
where table_schema = '{0}'
and table_name = '{1}' """
res = self.query(sql.format(self.source['db'], table.name))
return map(Column._make, res)
return list(map(Column._make, res))
6 changes: 3 additions & 3 deletions cornet/connectors/postgresql_connector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import psycopg2
from base_connector import BaseConnector
from .base_connector import BaseConnector
from cornet.connectors import Table, Column


Expand All @@ -22,7 +22,7 @@ def get_tables(self):
from information_schema.tables
where table_schema NOT IN ('pg_catalog', 'information_schema'); """
res = self.query(sql)
return map(Table._make, res)
return list(map(Table._make, res))

def get_columns(self, table):
sql = """
Expand All @@ -31,4 +31,4 @@ def get_columns(self, table):
where table_catalog = '{0}'
and table_name = '{1}'; """
res = self.query(sql.format(self.source['db'], table.name))
return map(Column._make, res)
return list(map(Column._make, res))
14 changes: 7 additions & 7 deletions cornet/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import click
from connectors import get_connector
from task_config import TaskConfig
from sqoop_cmd import SqoopCmd
from utils import match_any
from .connectors import get_connector
from .task_config import TaskConfig
from .sqoop_cmd import SqoopCmd
from .utils import match_any


def print_sqoop_cmds(task):
Expand All @@ -11,7 +11,7 @@ def print_sqoop_cmds(task):
for table in sorted(to_import, key=lambda tbl: tbl.name):
columns = conn.get_columns(table)
cmd = SqoopCmd(task, table, columns)
print cmd.as_string()
print(cmd.as_string())


def get_tables_to_import(conn, task):
Expand All @@ -26,10 +26,10 @@ def print_schema(task):
with get_connector(task.source) as conn:
to_import = get_tables_to_import(conn, task)
for table in to_import:
print '\n=== {0}.{1} ==='.format(task.source['db'], table[0])
print('\n=== {0}.{1} ==='.format(task.source['db'], table[0]))
columns = conn.get_columns(table)
for c in columns:
print '{0}: {1}'.format(c.name, c.type)
print('{0}: {1}'.format(c.name, c.type))


@click.command()
Expand Down
4 changes: 2 additions & 2 deletions cornet/sqoop_cmd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from utils import merge_dict
from .utils import merge_dict


class SqoopCmd:
Expand Down Expand Up @@ -56,7 +56,7 @@ def _arg2str(k, v):
def as_string(self):
args = {
SqoopCmd._arg2str(k, v)
for k, v in self.args().iteritems()
for k, v in self.args().items()
if v}
name = 'sqoop import'
return ' \\\n '.join([name] + sorted(args)) + '\n'
2 changes: 1 addition & 1 deletion cornet/task_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import yaml
from jinja2 import Environment, FileSystemLoader
from utils import merge_dict, dict_without_key
from .utils import merge_dict, dict_without_key
import os.path


Expand Down
6 changes: 3 additions & 3 deletions cornet/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ def merge_dict(a, b):
assert isinstance(b, dict), "Second arg not a dict, but {0} ".format(b)

merged = {}
for key in set(a.keys() + b.keys()):
if key not in b.keys():
for key in set(list(a.keys()) + list(b.keys())):
if key not in list(b.keys()):
merged[key] = a[key]
elif key not in a.keys():
elif key not in list(a.keys()):
merged[key] = b[key]
elif isinstance(a[key], dict) and isinstance(b[key], dict):
merged[key] = merge_dict(a[key], b[key])
Expand Down
4 changes: 4 additions & 0 deletions doc/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
all: diagram.pdf

diagram.pdf: diagram.gv
dot diagram.gv -Tpdf -odiagram.pdf
19 changes: 19 additions & 0 deletions doc/diagram.gv
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
digraph cornet {
pg [label="PostgreSQL"];
my [label="MySQL"];
co [label="Cornet"];
sq [label="Sqoop"];
h [label="Hadoop"];
ms [label="SQL Server"; color=gray;];

// to be added
ms->co [style=dotted; color=forestgreen;];

// schema flows
pg->co [color=forestgreen;];
my->co [color=forestgreen;];
co -> sq [label="Commands"; fontcolor=slategray; color=slategray;];
ms -> sq;
my -> sq;
pg -> sq -> h;
}
Binary file added doc/diagram.pdf
Binary file not shown.
28 changes: 28 additions & 0 deletions doc/mssql.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
global:
source:
host: 192.168.122.107
hive:
db: test
map_types:
java:
UUID: String
hive:
VARBINARY: String
LONGBLOB: String
UUID: String
sqoop_args:
m: 2
direct: true
data-warehouse: /user/sqoop
skip_tables:
- schema_version

tasks:
- source:
db: test
user: WORKGROUP\Administrator
password_file: /home/fms/.password
port: 1143
driver: mssql
hive:
table_prefix: mssql_
12 changes: 12 additions & 0 deletions doc/postgres.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
BEGIN;
CREATE TABLE people (
id SERIAL PRIMARY KEY NOT NULL,
name TEXT
);
CREATE TABLE addresses (
id SERIAL PRIMARY KEY NOT NULL,
person INTEGER NOT NULL REFERENCES people(id),
address TEXT NOT NULL
);
-- CREATE VIEW people_addresses AS
COMMIT;
28 changes: 28 additions & 0 deletions doc/postgres.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
global:
source:
host: localhost
hive:
db: fms
map_types:
java:
UUID: String
hive:
VARBINARY: String
LONGBLOB: String
UUID: String
sqoop_args:
m: 2
direct: true
data-warehouse: /user/sqoop
skip_tables:
- schema_version

tasks:
- source:
db: fms
port: 5432
user: fms
password_file: /home/fms/.password
driver: postgresql
hive:
table_prefix: b_