Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
import warnings
from collections import defaultdict
from importlib import import_module
from urllib.parse import urlencode, urlparse

import requests
from clickhouse_sqlalchemy import engines
from sqlalchemy import Boolean, MetaData, Table, create_engine
from sqlalchemy.schema import AddConstraint, DropConstraint, ForeignKeyConstraint

CLICKHOUSE_HTTP_PORT = 8123


def warninger(message, category, filename, lineno, line=None):
    """Render a warning as one ``<file>:<lineno>: <Category>: <message>`` line.

    The parameter list has the same shape as ``warnings.formatwarning``
    (presumably this function is installed as its replacement -- confirm at
    the call site, which is not visible here).

    Args:
        message: The warning message (anything that can be interpolated).
        category: The warning class; only its ``__name__`` is used.
        filename: Path of the file that triggered the warning.
        lineno: Line number that triggered the warning.
        line: Accepted for signature compatibility; not used.

    Returns:
        The formatted text, terminated by a newline.
    """
    location = f"{filename}:{lineno}"
    kind = category.__name__
    return f"{location}: {kind}: {message}\n"
Expand Down Expand Up @@ -172,6 +177,18 @@ def warninger(message, category, filename, lineno, line=None):
engine.execute(f"CREATE SCHEMA IF NOT EXISTS {table.schema}")
elif engine.name == "sqlite":
table.name = f"{args.schema_prefix}{module_name}_{table.name}"
elif engine.name == "clickhouse":
first_col = table.columns[0].name
tbl_engine = engines.MergeTree(order_by=first_col)
table = Table(
table.name,
table.metadata,
*table.columns,
tbl_engine,
extend_existing=True,
)
table.schema = f"{args.schema_prefix}{module_name}"
engine.execute(f"CREATE DATABASE IF NOT EXISTS {table.schema};")

if args.drop_first:
table.drop(engine, checkfirst=True)
Expand Down Expand Up @@ -223,6 +240,29 @@ def warninger(message, category, filename, lineno, line=None):
if len(buffer) > 0:
conn.executemany(query, buffer)
conn.commit()
elif engine.name == "clickhouse":
full_table_name = f"{table.schema}.{table.name}"
engine.execute(f"TRUNCATE TABLE {full_table_name}")

# what follows is a fairly crude bulk load into the database
hostname = urlparse(str(engine.url)).netloc.rpartition("@")[-1]
querystr = urlencode(
{
"query": f"INSERT INTO {full_table_name} FORMAT CSVWithNames",
"date_time_input_format": "best_effort",
# TODO: we need nullable fields to be stored as NULL, not as
# defaults. This is probably `input_format_null_as_default`,
# but for some reason none of our columns are nullable (a bug
# in clickhouse-sqlalchemy?)
# TODO: we may also disable header checking, because it
# will fail at least for the "zakazky" data
}
)
url = f"http://{hostname}:{CLICKHOUSE_HTTP_PORT}/?{querystr}"
for filename in files:
with open(filename, "rb") as f:
r = requests.post(url, data=f)
assert r.ok, r.text
else:
raise IOError(f"{engine.name} not supported yet")

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ tqdm
xlrd
cssselect
sqlalchemy
clickhouse-sqlalchemy