forked from wwindcloud/pg-subscription-stream
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpg-subscription-stream.js
More file actions
110 lines (97 loc) · 3.25 KB
/
pg-subscription-stream.js
File metadata and controls
110 lines (97 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
const {Transform} = require('stream')
const {both} = require('pg-copy-streams')
const now = () => BigInt(Date.now()) - 946684800000n
const invalid_lsn = 0n
class PgSubscriptionStream extends Transform {
constructor(options) {
super()
this.options = options || {}
this.output_written_lsn = invalid_lsn
this.flush_written_lsn = invalid_lsn
this.last_feedback_time = now()
const {slotName, feedbackInterval = 20000, startPos = 0n, pluginOptions = {
proto_version: 1,
publication_names: slotName
}} = this.options
const lsn = typeof(startPos) === 'bigint' ? startPos.toString(16).padStart(9, '0').replace(/.{8}$/, '\/$&') : startPos
const query = `START_REPLICATION SLOT ${slotName} LOGICAL ${lsn} (${Object.entries(pluginOptions).map(([k, v]) => `"${k}" '${v}'`).join(',')})`
this.copyBoth = new both(query, {
alignOnCopyDataFrame: true
})
this.copyBoth.pipe(this)
this.copyBoth.on('error', err => this.emit('error', err))
this.interval = setInterval(() => {
this.sendFeedback()
}, feedbackInterval)
this.on('end', () => {
clearInterval(this.interval)
this.copyBoth.end()
})
}
sendFeedback(force, cb) {
if (this.flush_written_lsn === invalid_lsn) {
cb && cb();
return;
}
const current_time = now()
const {feedbackInterval = 20000} = this.options
if (force || current_time - this.last_feedback_time > feedbackInterval) {
this.last_feedback_time = current_time
const response = new DataView(new ArrayBuffer(1 + 8 + 8 + 8 + 8 + 1))
response.setUint8(0, 'r'.charCodeAt(0))
response.setBigUint64(1, this.output_written_lsn)
response.setBigUint64(1 + 8, this.flush_written_lsn)
response.setBigUint64(1 + 8 + 8, invalid_lsn)
response.setBigUint64(1 + 8 + 8 + 8, current_time)
response.setUint8(1 + 8 + 8 + 8 + 8, 0)
if (cb) {
this.copyBoth.write(Buffer.from(response.buffer), cb)
} else {
this.copyBoth.write(Buffer.from(response.buffer))
}
} else {
cb && cb();
}
}
_transform(chunk, encoding, callback) {
const {autoConfirmLSN = true} = this.options
const [header] = chunk
if (header === 0x77) {
const lsn = chunk.readBigUInt64BE(1)
this.push(chunk)
this.output_written_lsn = this.output_written_lsn > lsn ? this.output_written_lsn : lsn
this.flush_written_lsn = autoConfirmLSN ? this.output_written_lsn : this.flush_written_lsn
} else if (header === 0x6b) {
const lsn = chunk.readBigUInt64BE(1)
const shouldRespond = chunk.readInt8(1 + 8 + 8)
this.output_written_lsn = this.output_written_lsn > lsn ? this.output_written_lsn : lsn
if (autoConfirmLSN || this.flush_written_lsn === invalid_lsn) {
this.flush_written_lsn = this.output_written_lsn;
}
this.sendFeedback(shouldRespond > 0)
} else {
callback(new Error(`Unknown Message: ${chunk}`))
return
}
process.nextTick(callback)
}
submit(connection) {
this.copyBoth.submit(connection)
}
confirmLSN(lsn) {
this.flush_written_lsn = lsn > this.flush_written_lsn ? lsn : this.flush_written_lsn
}
handleError(e) {
this.copyBoth.handleError(e)
}
handleCopyData(chunk) {
this.copyBoth.handleCopyData(chunk)
}
handleCommandComplete() {
this.copyBoth.handleCommandComplete()
}
handleReadyForQuery() {
this.copyBoth.handleReadyForQuery()
}
}
module.exports = PgSubscriptionStream