-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdump2csv.py
More file actions
374 lines (323 loc) · 12.1 KB
/
dump2csv.py
File metadata and controls
374 lines (323 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
#!/usr/bin/env python
# encoding: utf-8
'''
Usage:
dump2csv.py -s SID -u REDIS_URL -d DIR [-m COUNT] [-l DIR] [-v] [<table>...] [-g GSTORAGE]
dump2csv.py -c CONFIG_FILE [-v] [<table>...]
dump2csv.py (-h | --help | --version)
Arguments:
<table> Specify tables to dump like: "testdb.testtable"
It will dump all tables if specify nothing
Options:
-h --help Show this help message and exit
--version Show version and exit
-c --config_file=CONFIG_FILE Specify config file
-v --verbose Print the running status message
-s --server_id=SID Specify mysql server id
-u --cache_url=REDIS_URL Specify the redis cache url like:
"redis://host:port/db"
-d --dump_dir=DIR Specify the dir of dump result
-l --log_dir=DIR Specify the dir of logging
-m --max_rows=COUNT Specify max rows of one csv file [default: 1000000]
-g --gs_url=GSTORAGE Specify the gs url for storing dump files
'''
import csv
import os
import time
from functools import partial
from docopt import docopt
from collections import defaultdict
import json
from Queue import Queue, Empty
import threading
import commands
import rcache
import mwlogger
from datetime import datetime
__version__ = "Version0.1"
# Cross-thread work queues; a None item is the shutdown sentinel on both.
rqueue = Queue()  # queue for uploading to google storage (csv file paths)
bqueue = Queue()  # queue for loading to bigquery (date directory paths)
# Module-wide logger; stays None until main() calls create_logger().
glogger = None
def group_by_field(rows):
    '''
    Group rows by their set of column names.

    Rows may carry different field sets if the table was ALTERed
    between binlog events, so bucket them by their sorted field names
    before each bucket is written to its own csv file.

    :param rows: iterable of dict rows
    :return: {(field1, field2, ...): [row1, row2, ...], ...} where each
             key tuple is sorted alphabetically
    '''
    g_rows = defaultdict(list)
    for row in rows:
        # sorted() over the dict yields sorted keys on both py2 and
        # py3; the original row.keys().sort() only works on py2 lists
        g_rows[tuple(sorted(row))].append(row)
    return g_rows
def save2csv(dump_dir, table, trows, gs_url):
    """
    Write a table's rows to csv file(s) named
    '<table>.<timestamp>.csv' under <dump_dir>/<YYYYMMDD>/.

    Rows are bucketed by field set first; more than one bucket means
    the table was probably ALTERed mid-dump, in which case files get a
    '.tmp' suffix instead of '.csv' (so the uploader's '*.csv' glob
    skips them).  When gs_url is truthy each finished file is queued
    on rqueue for the uploader thread.

    :param dump_dir: base directory for dump output
    :param table: table name like 'db.table'
    :param trows: list of dict rows to write
    :param gs_url: google-storage url; truthy -> enqueue for upload
    :return: None
    """
    try:
        if not trows:
            glogger.info("table[{}] has no rows to dump".format(table))
            return
        grouped = group_by_field(trows)
        altered = len(grouped) > 1
        if altered:
            glogger.warn("table[{}] maybe altered.".format(table))
        suffix = "tmp" if altered else "csv"
        for fieldnames, rows in grouped.items():
            today = datetime.strftime(datetime.today(), "%Y%m%d")
            save_dir = os.path.join(dump_dir, today)
            if not os.path.exists(save_dir):
                os.makedirs(save_dir)
            csv_file = os.path.join(
                save_dir, "{}.{:.6f}.{}".format(table, time.time(), suffix))
            glogger.info("dump to {}, rows:{}".format(csv_file, len(rows)))
            write_header = not os.path.exists(csv_file)
            with open(csv_file, 'ab+') as fp:
                writer = csv.DictWriter(fp, fieldnames=fieldnames)
                if write_header:
                    writer.writeheader()
                writer.writerows(rows)
            glogger.info("{} dump Done.".format(csv_file))
            if gs_url:
                glogger.info("dispatch {} to rqueue".format(csv_file))
                rqueue.put(csv_file)
                time.sleep(2)
        glogger.info("table:{}, rows:{} dump OK!".format(table, len(trows)))
    except:
        glogger.error("{} dump Error".format(table), exc_info=True)
        raise
def create_logger(log_dir, verbose):
    '''
    Build the module logger.

    verbose -> DEBUG with no log file (console); otherwise INFO into
    dump.log inside log_dir, or the current directory when log_dir is
    unset.

    :param log_dir: directory for dump.log, or falsy
    :param verbose: truthy -> debug-to-console mode
    :return: configured mwlogger.MwLogger instance
    '''
    if verbose:
        return mwlogger.MwLogger("dump", None, log_level="DEBUG")
    log_file = os.path.join(log_dir, "dump.log") if log_dir else "dump.log"
    return mwlogger.MwLogger("dump", log_file, log_level="INFO")
def _upload_by_date(csv_file, gs_url):
    '''
    Upload every .csv in the given file's date directory to
    <gs_url>/<date>/ via gsutil, retrying up to 3 times.

    gsutil flags:
    -m: multi-thread
    -n: skip files exist in gstorage
    -L: record the file uploaded info

    :param csv_file: path like <dump_dir>/<YYYYMMDD>/<db.tb>.<ts>.csv
    :param gs_url: destination gs:// base url
    :return: (exit_code, output) of the gsutil command
    '''
    csv_pdir = os.path.dirname(csv_file)
    date = os.path.basename(csv_pdir)
    cmd = "gsutil -m cp -n -L {log} -r {src} {dst}".format(
        log=os.path.join(csv_pdir, "upload.info"),
        src=os.path.join(csv_pdir, "*.csv"),
        dst=os.path.join(gs_url, date)
    )
    # reuse the shared retry helper instead of duplicating its
    # retry/sleep loop here (previously an inline copy of the same logic)
    return _run_cmd_retry(cmd, 3)
def async_upload2gstorage_ex(gs_url):
    '''
    Uploader thread body (one-file-at-a-time variant): take csv paths
    off rqueue and upload each path's date directory; a None item is
    the shutdown sentinel.

    :param gs_url: destination gs:// base url
    '''
    while True:
        csv_file = rqueue.get()
        if csv_file is None:
            glogger.info("all csv_files upload ok, thread exit!")
            break
        ret, output = _upload_by_date(csv_file, gs_url)
        if ret:
            glogger.error("upload failed, return code:{}, out:{}".format(ret, output))
        else:
            glogger.info("upload ok!")
def group_lst(csvs):
    '''
    Generator that batches csv paths for upload.

    Yields lists of up to 8 paths that share the same date directory
    (one gsutil invocation handles one date dir).  After consuming a
    batch the caller must send() back a list of paths to retry
    (usually []); the bare `yield` after every batch-yield absorbs
    that send so the caller's for-loop and send() calls interleave.

    :param csvs: iterable of csv paths like <dir>/<YYYYMMDD>/<name>.csv
    '''
    to_ups = []
    pre_date = cur_date = None
    for csv_f in csvs:
        csv_pdir = os.path.dirname(csv_f)
        cur_date = os.path.basename(csv_pdir)
        if pre_date is None or pre_date == cur_date:
            to_ups.append(csv_f)
            if len(to_ups) >= 8:
                to_ups = yield to_ups
                yield  # absorb the consumer's send()
        else:
            # date changed: flush the previous date's batch first
            to_ups = yield to_ups
            yield  # absorb the consumer's send()
            # bug fix: append the current path csv_f; the original
            # appended `csv` -- the csv *module* -- losing the file
            # that triggered the date change
            to_ups.append(csv_f)
        pre_date = cur_date
    to_ups = yield to_ups
    yield  # absorb the consumer's send()
def upload_csvs(gs_url, csvs):
    '''
    Upload local csv files to google storage in per-date batches.

    Drives the group_lst generator: run one gsutil command per batch;
    on failure parse gsutil's upload.info log to find which files DID
    transfer and send() the remainder back for retry (at most 3
    failure loops to avoid spinning forever).  The completed date
    directory is queued on bqueue for the bigquery loader thread.

    :param gs_url: gs:// base url
    :param csvs: non-empty list of local csv paths (caller guarantees
                 non-empty; csv_pdir below is unbound otherwise)
    '''
    loop_times = 0
    gen = group_lst(csvs)
    for gcsvs in gen:
        if len(gcsvs) == 0:
            break
        glogger.info("start uploading {} to gstorage".format(str(gcsvs)))
        csv_pdir = os.path.dirname(gcsvs[0])
        date = os.path.basename(csv_pdir)
        cmd = "gsutil -m cp -n -L {log} {src} {dst}/".format(
            log=os.path.join(csv_pdir, "upload.info"),
            src=' '.join(gcsvs),
            dst=os.path.join(gs_url, date)
        )
        ret, out = _run_cmd_retry(cmd, 3)
        if ret == 0:
            glogger.info("upload successfully, files count:{}".format(len(gcsvs)))
            gen.send([])
        else:
            # should check and upload failed files to google cloud storage manually
            glogger.error("{} run error. ret:{}, out:{}".format(
                cmd, ret, out
            ))
            # parse success from log_file, upload.info's schema:
            # Source,Destination,Start,End,Md5,UploadId,Source Size,Bytes Transferred,Result,Description
            loop_times += 1
            if loop_times < 3:  # avoid endless loop
                log = os.path.join(csv_pdir, "upload.info")
                with open(log) as fp:
                    _ups = list(csv.DictReader(fp))
                # bug fix: str.strip("file://") treats its argument as
                # a *character set* and also eats leading '/', 'f',
                # 'i', 'l', 'e' from the path itself; slice the scheme
                # prefix off instead
                sources = []
                for up in _ups:
                    src = up['Source']
                    if src.startswith("file://"):
                        src = src[len("file://"):]
                    sources.append(src)
                # retry in next loop
                gen.send(list(set(gcsvs) - set(sources)))
            else:
                gen.send([])
    glogger.info("start load gstorage csv files to bigquery......")
    bqueue.put(csv_pdir)
    #load2bq(csv_pdir)
def load2bq(upload_dir):
    '''
    Load freshly uploaded gs:// csv files into bigquery.

    Reads gsutil's upload.info from upload_dir to learn which files
    reached google storage, skips urls already recorded in
    bqload.info, and `bq load`s the rest.  Each successful load is
    appended to bqload.info so reruns are idempotent.

    gs url layout assumed (TODO confirm against uploader config):
    gs://<bucket>/<system>/<sid>/<date>/<db.tb.ts.csv>

    :param upload_dir: local date directory containing upload.info
    '''
    upload_log = os.path.join(upload_dir, "upload.info")
    bq_log = os.path.join(upload_dir, "bqload.info")
    # note: the original `to_loads = loadeds = []` aliased both names
    # to one list; both were rebound before use, so split for clarity
    loadeds = []
    if os.path.exists(bq_log):
        with open(bq_log, 'r') as fp:
            loadeds = [load.strip() for load in fp.readlines()]
    with open(upload_log, 'r') as fp:
        _ups = list(csv.DictReader(fp))
    gs_urls = [up['Destination'] for up in _ups]
    to_loads = list(set(gs_urls) - set(loadeds))
    # load all uploaded files to bigquery
    with open(bq_log, 'a') as fp:
        for gs_url in to_loads:
            glogger.debug(gs_url)
            # bug fix: str.strip("gs://") strips the chars g/s/:/ from
            # both ends (char set, not prefix) and can eat the leading
            # letters of the bucket name; slice the scheme off instead
            path = gs_url[len("gs://"):] if gs_url.startswith("gs://") else gs_url
            [_, system, sid, _, csv_file] = path.split('/')
            db = csv_file.split('.')[0]
            tb = csv_file.split('.')[1]
            schema = os.path.join("bq_schema",
                                  system,
                                  sid,
                                  db,
                                  tb)
            #bqDataset = "{}:{}:{}".format(system, sid, db)
            bqDataset = db  # Not support the same database name from different systems
            ret, out = _run_cmd_retry("bq mk {}".format(bqDataset), 3)
            glogger.debug("cmd:{}, ret={}, out={}".format("bq mk {}".format(bqDataset), ret, out))
            # ret==1 with "already exists" means the dataset was there; fine
            if not (ret == 0 or ret == 1 and "already exists" in out):
                glogger.error("Dataset[{}] may not exists and create it failed".format(bqDataset))
            if not os.path.exists(schema):
                # no schema file: let bq autodetect from the csv header
                glogger.warn("Not found schema: {}. Ignore it".format(schema))
                cmd = "bq load --skip_leading_rows=1 --allow_quoted_newlines" \
                      " {}.{} {}".format(bqDataset, tb, gs_url)
            else:
                cmd = "bq load --schema={} --skip_leading_rows=1 --allow_quoted_newlines" \
                      " {}.{} {}".format(schema, bqDataset, tb, gs_url)
            glogger.debug("load to bigqeury command: {}".format(cmd))
            ret, out = _run_cmd_retry(cmd, 3)
            if ret == 0:
                glogger.info("load {} to bigquery successfully".format(gs_url))
                fp.write(gs_url + '\n')
            else:
                # should check and load failed files to bigquery manually
                glogger.error("load {} to bigquery failed. msg is {} "
                              "Please check command ['{}'] manually".format(gs_url, out, cmd))
def _run_cmd_retry(cmd, tries=1):
    '''
    Run a shell command, retrying until it exits 0 or tries run out,
    sleeping 1s between attempts.

    :param cmd: shell command string
    :param tries: maximum number of attempts
    :return: (exit_code, output) -- the raw wait status from
             commands.getstatusoutput is shifted right 8 bits to get
             the process exit code
    '''
    for attempt in range(tries):
        ret, out = commands.getstatusoutput(cmd)
        if ret == 0:
            break
        if attempt < tries - 1:
            time.sleep(1)
    return ret >> 8, out
def async_upload2gstorage(gs_url):
    '''
    Uploader thread body: drain rqueue into batches and hand each
    batch to upload_csvs.  A None item on rqueue means dumping is
    finished; any files collected before the sentinel are uploaded,
    then None is pushed onto bqueue to stop the bigquery loader.

    :param gs_url: gs:// base url
    '''
    pending = []
    finished = False
    while not finished:
        # grab everything currently queued without blocking
        while not rqueue.empty():
            pending.append(rqueue.get_nowait())
        if not pending:
            time.sleep(0.1)
            continue
        if pending[-1] is None:
            pending.pop()
            finished = True
        else:
            upload_csvs(gs_url, pending)
            pending = []
    if pending:
        upload_csvs(gs_url, pending)
    bqueue.put(None)
def async_load2bigquery():
    '''
    Loader thread body: process date directories from bqueue until a
    falsy sentinel (None) arrives.
    '''
    while True:
        csv_dir = bqueue.get()
        if not csv_dir:
            break
        load2bq(csv_dir)
def main():
    '''
    Entry point.

    Resolves settings from a json config file (-c) or from the command
    line, starts the gstorage uploader and bigquery loader threads
    when a gs url is configured, then dumps tables from the redis
    cache to csv files via save2csv.

    Example resolved docopt options:
    {'--cache_url': 'redis://127.0.0.1/1',
     '--config_file': False,
     '--dump_out_put': '/tmp/dumps',
     '--help': False,
     '--log_output': None,
     '--max_rows': '1000000',
     '--server_id': '1',
     '--version': False,
     '-v': False,
     'CONFIG_FILE': None}
    '''
    options = docopt(__doc__, version=__version__)
    config_file = options['--config_file']
    verbose = options['--verbose']
    if config_file:
        cfg = json.load(file(config_file))
        cache_url = cfg['cache_url']
        server_id = cfg['server_id']
        max_rows = cfg['max_rows']
        log_dir = cfg.get('log_dir', None)
        dump_dir = cfg['dump_dir']
        gs_url = cfg.get('gs_url', None)
    else:
        cache_url = options['--cache_url']
        server_id = options['--server_id']
        max_rows = options['--max_rows']
        log_dir = options['--log_dir']
        dump_dir = options['--dump_dir']
        gs_url = options['--gs_url']
    # bug fix: docopt always yields strings, while config gives an int;
    # normalize so the cache layer's row-count comparisons are numeric
    max_rows = int(max_rows)
    dump_tables = options['<table>']
    cache = rcache.Rcache(cache_url, server_id)
    global glogger
    glogger = create_logger(log_dir, verbose)
    if gs_url:
        # keep each mysql server's files under its own gs subdirectory
        gs_url = os.path.join(gs_url, str(server_id))
        upload_thr = threading.Thread(target=async_upload2gstorage, args=(gs_url,))
        upload_thr.setDaemon(True)
        upload_thr.start()
        glogger.info("upload csv files to {} thread running...".format(gs_url))
        load_thr = threading.Thread(target=async_load2bigquery)
        load_thr.setDaemon(True)
        load_thr.start()
        glogger.info("load to bigquery threading running....")
    glogger.info("start dump from cache to csv files")
    callback = partial(save2csv, dump_dir, gs_url=gs_url)
    cache.dump_t(callback, max_rows, dump_tables)
    glogger.info("dump complete!")
    if gs_url:
        glogger.info("wait uploading to gstorage and loading to bigquery threads completed......")
        # sentinel: uploader drains rqueue, then stops the loader via bqueue
        rqueue.put(None)
        upload_thr.join()
        load_thr.join()


if __name__ == "__main__":
    main()