-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathremote2pg.js
More file actions
59 lines (48 loc) · 2.03 KB
/
remote2pg.js
File metadata and controls
59 lines (48 loc) · 2.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
var pg = require('pg');
var copyFrom = require('pg-copy-streams').from;
var through = require('through2');
var csv2 = require('csv2');
var get = require('./lib/get').remoteData; // get module exposes get.remoteData and get.localData
var dropColumns = require('./lib/helpers').dropColumns
// Run configuration. The whole object is also handed to get() as its last
// argument, so key names and values are kept exactly as that module receives them.
const settings = {
  // Remote location of the daily export archives: a file name is presumably
  // built as baseurl + <date> + ext inside ./lib/get -- TODO confirm.
  baseurl: 'http://data.gdeltproject.org/events/',
  ext: '.export.CSV.zip',

  // Inclusive date range to import.
  // Documented format: "YYYY-MM-DD"; min start: "2013-04-01"; max end: today.
  // NOTE(review): the values below are not zero-padded, unlike the documented
  // format -- confirm that ./lib/get parses them as intended.
  startDate: '2013-4-1',
  endDate: '2013-4-8',

  // Currently disabled option, kept for reference:
  // dataDir: 'data/',

  // Local postgres role and database used to build the connection string.
  user: 'jamesconkling',
  db: 'gdelt',
};
/* get function signature
date: date from which to start downloading files
fileStreamHandler: callback invoked after each file has been downloaded and unzipped
given the function signature (stream, date, next)
doneHandler: callback invoked after the file for date settings.endDate has been fully downloaded and handled via its callback handler
*/
// One postgres connection is shared by every upload in this run.
const pgClient = new pg.Client(`postgres://${settings.user}@localhost/${settings.db}`);

pgClient.connect((err) => {
  if (err) {
    // Make the failure visible to the calling shell, not only in the log.
    process.exitCode = 1;
    return console.error('Error opening connection to postgres', err);
  }

  get(settings.startDate, fileStreamHandler, doneHandler, settings);

  /**
   * Uploads one downloaded file into the `events` table through COPY ... FROM STDIN.
   *
   * @param {Object} fileStream - readable stream handed over by get() for one file
   * @param {*} date - date of the file; used here only in log messages
   * @param {Function} next - called once the COPY stream has finished, so that
   *   get() can move on (presumably to the following date -- see ./lib/get)
   */
  function fileStreamHandler(fileStream, date, next) {
    console.log('Finished downloading file', date);

    // .pipe() does not forward 'error' events, so every stage gets the same
    // handler. The flag keeps one failure from being reported (and the client
    // from being ended) more than once.
    let hasFailed = false;
    const onStageError = (stageErr) => {
      if (hasFailed) {
        return;
      }
      hasFailed = true;
      console.error('Error uploading file', date, 'error', stageErr);
      process.exitCode = 1;
      // next() is deliberately not called: the run stops at the first failed
      // file. Closing the connection lets the process exit instead of hanging.
      pgClient.end();
    };

    const pgStream = pgClient.query(copyFrom("COPY events FROM STDIN WITH CSV DELIMITER E'\t'"));
    const rowParser = csv2({ separator: '\t' });
    const rowWriter = through.obj(function (line, enc, nextLine) {
      // remove redundant columns to match schema in db/eventsTable.sql
      // (the return value is unused, so dropColumns presumably edits `line` in place)
      dropColumns(line, ['MonthYear', 'Year', 'FractionDate']);
      this.push(line.join('\t') + '\n');
      nextLine();
    });

    fileStream.on('error', onStageError);
    rowParser.on('error', onStageError);
    rowWriter.on('error', onStageError);
    pgStream.on('error', onStageError);
    pgStream.on('finish', () => {
      if (hasFailed) {
        return;
      }
      console.log('Finish uploading file', date);
      next();
    });

    fileStream
      .pipe(rowParser)
      .pipe(rowWriter)
      .pipe(pgStream);
  }

  /**
   * Invoked by get() after the last file has been handled; closes the shared
   * connection so the process can exit.
   */
  function doneHandler() {
    console.log('done uploading all files');
    pgClient.end();
  }
});