forked from cloudera-labs/envelope
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfix.conf
More file actions
127 lines (121 loc) · 3.32 KB
/
fix.conf
File metadata and controls
127 lines (121 loc) · 3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Application-level settings for the FIX example pipeline.
application {
  name = "FIX Envelope example"

  # Batch interval: 5000 milliseconds (5 seconds).
  batch.milliseconds = 5000

  # Executor sizing: one executor with 4 cores and 4G of memory.
  executors = 1
  executor.cores = 4
  executor.memory = "4G"
}
steps {
# Step "fix": reads string-encoded messages from the Kafka topic "fix" and
# translates each message into typed fields with the key-value-pair translator.
fix {
  input {
    type = "kafka"
    # "REPLACEME" is a placeholder host; substitute a real broker before running.
    brokers = "REPLACEME:9092"
    topic = "fix"
    encoding = "string"

    translator {
      type = "kvp"
      # Pairs are separated by U+0001 (SOH); the tag and its value by "=".
      delimiter.kvp = "\u0001"
      delimiter.field = "="
      # The two lists are positional: the Nth entry of field.types is the type
      # of the Nth FIX tag in field.names. Both lists have 17 entries.
      field.names = [
        6, 10, 11, 14, 17, 20, 21, 35, 37,
        38, 39, 40, 54, 55, 60, 150, 151
      ]
      field.types = [
        double, string, string, int, string, int, int, string, string,
        int, int, int, int, string, long, int, int
      ]
    }
  }
}
# Step "messagetypes": reads the Kudu table impala::default.fix_messagetypes.
# Other steps in this file join against it on its msgtype column to pick up
# msgtypedesc.
messagetypes {
  input {
    type = "kudu"
    # "REPLACEME" is a placeholder host; substitute a real Kudu master address.
    connection = "REPLACEME:7051"
    table.name = "impala::default.fix_messagetypes"
    # NOTE(review): hint.small is set inside the input block here. Confirm
    # against the Envelope version in use whether the hint is read from the
    # input or from the step level; if the latter, this setting has no effect.
    hint.small = true
  }
}
# Step "newordersingle": selects FIX messages whose MsgType (tag 35) is 'D'
# from the "fix" step, looks up the message type description from
# "messagetypes", and upserts the result into the Kudu table
# impala::default.fix_newordersingle.
newordersingle {
  dependencies = [fix, messagetypes]

  deriver {
    type = sql
    # The filter is applied to f.`35` (the left side of the join). Filtering on
    # the unqualified name msgtype would resolve to mt.msgtype, because SELECT
    # aliases are not visible in WHERE; that discards rows with no match in
    # messagetypes and silently turns the LEFT OUTER JOIN into an inner join.
    query.literal = """
SELECT `11` AS clordid, `35` AS msgtype, msgtypedesc, `21` AS handlinst, `55` AS symbol,
`54` AS side, `60` AS transacttime, `38` AS orderqty, `40` AS ordtype, `10` AS checksum
FROM fix f LEFT OUTER JOIN messagetypes mt ON f.`35` = mt.msgtype WHERE f.`35` = 'D'"""
  }

  planner {
    type = upsert
  }

  output {
    type = kudu
    # "REPLACEME" is a placeholder host; substitute a real Kudu master address.
    connection = "REPLACEME:7051"
    table.name = "impala::default.fix_newordersingle"
  }
}
# Step "execrpt": selects FIX messages whose MsgType (tag 35) is '8' from the
# "fix" step, looks up the message type description from "messagetypes", and
# upserts the result into the Kudu table impala::default.fix_execrpt.
execrpt {
  dependencies = [fix, messagetypes]

  deriver {
    type = sql
    # The filter is applied to f.`35` (the left side of the join). Filtering on
    # the unqualified name msgtype would resolve to mt.msgtype, because SELECT
    # aliases are not visible in WHERE; that discards rows with no match in
    # messagetypes and silently turns the LEFT OUTER JOIN into an inner join.
    query.literal = """
SELECT `17` AS execid, `35` AS msgtype, msgtypedesc, `37` AS orderid, `11` AS clordid,
`20` AS exectranstype, `150` AS exectype, `39` AS ordstatus, `55` AS symbol, `54` AS side,
`151` AS leavesqty, `14` AS cumqty, `6` AS avgpx, `60` AS transacttime, `10` AS checksum
FROM fix f LEFT OUTER JOIN messagetypes mt ON f.`35` = mt.msgtype WHERE f.`35` = '8'"""
  }

  planner {
    type = upsert
    # Name of the field the planner uses for the last-updated value.
    field.last.updated = lastupdated
  }

  output {
    type = kudu
    # "REPLACEME" is a placeholder host; substitute a real Kudu master address.
    connection = "REPLACEME:7051"
    table.name = "impala::default.fix_execrpt"
  }
}
# Step "orderhistory": projects order fields from every message of the "fix"
# step (no MsgType filter) and writes them through the history planner to the
# Kudu table impala::default.fix_orderhistory.
orderhistory {
  dependencies = ["fix"]

  deriver {
    type = "sql"
    # leavesqty falls back to the order quantity (tag 38) when tag 151 is
    # null; cumqty falls back to 0 when tag 14 is null.
    query.literal = """
SELECT `11` AS clordid, `55` AS symbol, `38` AS orderqty, NVL(`151`, `38`) AS leavesqty,
NVL(`14`, 0) AS cumqty, `6` AS avgpx, `60` AS transacttime FROM fix"""
  }

  partitioner {
    type = "uuid"
  }

  planner {
    type = "history"
    carry.forward.when.null = true

    # Key, timestamp and value fields of the history.
    fields.key = ["clordid"]
    fields.timestamp = ["transacttime"]
    # NOTE(review): avgpx is selected by the query above but is not listed
    # here -- confirm whether that omission is intentional.
    fields.values = ["symbol", "orderqty", "leavesqty", "cumqty"]

    # Names of the bookkeeping fields used by the planner.
    field.last.updated = "lastupdated"
    fields.effective.from = ["startdate"]
    fields.effective.to = ["enddate"]
    field.current.flag = "currentflag"

    time.model {
      event.type = "longmillis"
      last.updated.type = "stringdatetime"
    }
  }

  output {
    type = "kudu"
    # "REPLACEME" is a placeholder host; substitute a real Kudu master address.
    connection = "REPLACEME:7051"
    table.name = "impala::default.fix_orderhistory"
  }
}
# Step "largeorderalert": picks large orders out of the "newordersingle" step
# and appends them, comma-delimited, to the Kafka topic "largeorders".
largeorderalert {
  dependencies = [newordersingle]

  deriver {
    type = sql
    # Uses >= rather than = so that orders larger than the 9999 threshold also
    # raise an alert; for quantities up to 9999 the result is unchanged.
    query.literal = "SELECT clordid, symbol, orderqty, transacttime FROM newordersingle WHERE orderqty >= 9999"
  }

  planner {
    type = append
  }

  output {
    type = kafka
    # "REPLACEME" is a placeholder host; substitute a real broker before running.
    brokers = "REPLACEME:9092"
    topic = largeorders
    serializer {
      type = delimited
      field.delimiter = ","
    }
  }
}
}