-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlocal2pg.js
More file actions
66 lines (56 loc) · 2.32 KB
/
local2pg.js
File metadata and controls
66 lines (56 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
var fs = require('fs');
var pg = require('pg');
var copyFrom = require('pg-copy-streams').from;
var through = require('through2');
var csv2 = require('csv2');
var get = require('./lib/get').localData; // get module exposes get.remoteData and get.localData
var helpers = require('./lib/helpers'),
col2Idx = helpers.gdeltCol2Idx,
dropColumns = helpers.dropColumns,
insertColumn = helpers.insertColumn;
// Run configuration. The whole object is also passed to `get` as its last
// argument when the import is started.
// NOTE(review): startDate/endDate below are not zero-padded although the
// documented format is "YYYY-MM-DD" -- confirm that `get` accepts both forms.
const settings = {
  baseurl: 'http://data.gdeltproject.org/events/',
  ext: '.export.CSV.zip',
  startDate: "2013-4-1", // format: "YYYY-MM-DD", min: "2013-04-01" -- inclusive
  endDate: "2013-4-2", // format: "YYYY-MM-DD", max: today -- inclusive
  // dataDir: 'data/',
  user: 'jamesconkling',
  db: 'gdelt',
};

// One Postgres client shared by every upload; the connection string is built
// from the user and database named in `settings` (localhost, no password).
const pgClient = new pg.Client(`postgres://${settings.user}@localhost/${settings.db}`);
/* get function signature
date: date from which to start downloading files
fileStreamHandler: callback invoked after each file has been downloaded and unzipped;
it is called with the arguments (stream, date, next)
doneHandler: callback invoked after the file for date settings.endDate has been fully downloaded and handled via its callback handler
*/
/**
 * Pipes one tab-separated file into the Postgres `events` table through COPY.
 *
 * Every parsed row is rewritten before upload: a 'POINT(<lon> <lat>)' string
 * built from the ActionGeo_Long / ActionGeo_Lat cells is handed to
 * insertColumn, the redundant columns are removed with dropColumns, and the
 * row is re-joined with tabs and terminated with a newline.
 *
 * Only the COPY stream (the last pipe stage) gets listeners here. Its 'error'
 * event is logged and nothing else happens on that path -- `next` is invoked
 * solely from the 'finish' listener.
 *
 * @param {Object} fileStream - readable stream; piped into the tab-separated parser
 * @param {*} date - used only to label the log lines
 * @param {Function} next - called without arguments once the COPY stream emits 'finish'
 */
function fileStreamHandler(fileStream, date, next){
  // The COPY query is issued before any parsing stage is created.
  const copyStream = pgClient.query(copyFrom("COPY events FROM STDIN WITH CSV DELIMITER E'\t'"));
  const rowParser = csv2({ separator: '\t' });

  // A plain function (not an arrow) is required: `this` must be the transform stream.
  const rowRewriter = through.obj(function rewriteRow(row, enc, nextRow){
    const latitude = row[col2Idx('ActionGeo_Lat')];
    const longitude = row[col2Idx('ActionGeo_Long')];

    // Add the geographic column; the point is written longitude first.
    // NOTE(review): empty lat/lon cells produce 'POINT( )' -- confirm the
    // events table accepts that value.
    insertColumn(row, 'ActionGeo_Long', `POINT(${longitude} ${latitude})`);

    // Drop redundant columns so the row matches the schema in db/eventsTable.sql.
    dropColumns(row, ['MonthYear', 'Year', 'FractionDate', 'ActionGeo_Lat', 'ActionGeo_Long']);

    this.push(`${row.join('\t')}\n`);
    nextRow();
  });

  function reportError(err){
    console.log('Error uploading file', date, 'error', err);
  }

  function reportFinish(){
    console.log('Finish uploading file', date);
    next();
  }

  fileStream
    .pipe(rowParser)
    .pipe(rowRewriter)
    .pipe(copyStream)
    .on('error', reportError)
    .on('finish', reportFinish);
}
/**
 * Completion callback passed to `get`: logs that processing is done and then
 * closes the shared Postgres client.
 */
function doneHandler() {
  console.log('done filtering data\n');
  pgClient.end();
}
// Entry point: open the Postgres connection, then start processing files from
// settings.startDate. If the connection fails, the error is reported and the
// import is not started.
pgClient.connect(function onConnected(err) {
  if (err) {
    console.error('Error opening connection to postgres', err);
    return;
  }

  get(settings.startDate, fileStreamHandler, doneHandler, settings);
});