-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
203 lines (180 loc) · 6.75 KB
/
main.go
File metadata and controls
203 lines (180 loc) · 6.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package main
import (
"context"
"os"
"strconv"
"sync"
"time"
ValueStore "github.com/diadata-org/fair-value/contracts/valuestore"
"github.com/diadata-org/fair-value/metrics"
"github.com/diadata-org/fair-value/models"
"github.com/diadata-org/fair-value/onchain"
"github.com/diadata-org/fair-value/scrapers"
"github.com/diadata-org/fair-value/utils"
"github.com/ethereum/go-ethereum/common"
log "github.com/sirupsen/logrus"
)
// Package-level runtime settings and feed configuration state.
var (
	// configUpdateSeconds is the interval, in seconds, between two refreshes
	// of the feed configuration. It is set in init from CONFIG_UPDATE_SECONDS.
	configUpdateSeconds int

	// writeTickerSeconds is the interval, in seconds, between two oracle
	// update runs. It is set in init from WRITE_TICKER_SECONDS.
	writeTickerSeconds int

	// decimalsOracleValue is set in init from DECIMALS_ORACLE_VALUE and is
	// handed to the oracle update executor in main.
	decimalsOracleValue int

	// feedConfigs is the feed configuration that the periodic refresh in main
	// diffs against.
	feedConfigs []models.FeedConfig

	// feedConfigsNew is the feed configuration most recently fetched by the
	// periodic refresh in main.
	feedConfigsNew []models.FeedConfig
)
// init reads the numeric runtime settings from the environment.
//
// Every setting falls back to its default when the environment value is not a
// valid integer. The two ticker intervals additionally fall back when they are
// not strictly positive: main passes them to time.NewTicker, which panics on a
// non-positive duration.
func init() {
	const (
		defaultConfigUpdateSeconds = 86400
		defaultWriteTickerSeconds  = 300
		defaultDecimalsOracleValue = 18
	)

	var err error

	configUpdateSeconds, err = strconv.Atoi(utils.Getenv("CONFIG_UPDATE_SECONDS", strconv.Itoa(defaultConfigUpdateSeconds)))
	switch {
	case err != nil:
		log.Errorf("parse CONFIG_UPDATE_SECONDS: %v", err)
		configUpdateSeconds = defaultConfigUpdateSeconds
	case configUpdateSeconds <= 0:
		log.Errorf("CONFIG_UPDATE_SECONDS must be positive, got %d", configUpdateSeconds)
		configUpdateSeconds = defaultConfigUpdateSeconds
	}

	writeTickerSeconds, err = strconv.Atoi(utils.Getenv("WRITE_TICKER_SECONDS", strconv.Itoa(defaultWriteTickerSeconds)))
	switch {
	case err != nil:
		log.Errorf("parse WRITE_TICKER_SECONDS: %v", err)
		writeTickerSeconds = defaultWriteTickerSeconds
	case writeTickerSeconds <= 0:
		log.Errorf("WRITE_TICKER_SECONDS must be positive, got %d", writeTickerSeconds)
		writeTickerSeconds = defaultWriteTickerSeconds
	}

	decimalsOracleValue, err = strconv.Atoi(utils.Getenv("DECIMALS_ORACLE_VALUE", strconv.Itoa(defaultDecimalsOracleValue)))
	if err != nil {
		log.Errorf("parse DECIMALS_ORACLE_VALUE: %v", err)
		decimalsOracleValue = defaultDecimalsOracleValue
	}
	log.Infof("Using DECIMALS_ORACLE_VALUE: %d", decimalsOracleValue)
}
// main wires up the fair-value feeder:
//
//  1. It sets up the on-chain connections and deploys or binds the oracle
//     contract.
//  2. It starts metrics collection.
//  3. It loads the feed configuration and starts one scraper plus one data
//     handler goroutine per configured feed.
//  4. It starts a goroutine that periodically reloads the configuration and
//     starts/stops scrapers for added/removed feeds.
//  5. It starts a goroutine that periodically writes the collected data to the
//     oracle, and one that collects the latest data point per feed.
//
// main then blocks in the deferred wg.Wait until every data handler has
// returned.
func main() {
	// On-chain setup
	deployedContract := utils.Getenv("DEPLOYED_CONTRACT", "")
	chainId, err := strconv.ParseInt(utils.Getenv("CHAIN_ID", "100640"), 10, 64)
	if err != nil {
		log.Fatalf("Failed to parse chainId: %v", err)
	}

	auth, conn, connBackup, privateKey, err := utils.SetupOnchain(
		utils.Getenv("BLOCKCHAIN_NODE", ""),
		utils.Getenv("BACKUP_NODE", ""),
		utils.Getenv("PRIVATE_KEY", ""),
		chainId,
	)
	if err != nil {
		log.Fatal("SetupOnchain: ", err)
	}

	var contract *ValueStore.ValueStore
	var contractBackup *ValueStore.ValueStore
	err = onchain.DeployOrBindContract(deployedContract, conn, connBackup, auth, &contract, &contractBackup)
	if err != nil {
		log.Fatalf("Failed to Deploy or Bind primary and backup contract: %v", err)
	}

	var metacontractData models.MetacontractData
	metacontractData.Address = common.HexToAddress(utils.Getenv("METACONTRACT_ADDRESS", ""))
	metacontractData.Precision, err = strconv.Atoi(utils.Getenv("METACONTRACT_PRECISION", ""))
	if err != nil {
		log.Error("parse METACONTRACT_PRECISION: ", err)
		metacontractData.Precision = 8
	}
	// NOTE(review): the same METACONTRACT_NODE value is passed for both
	// arguments of MakeEthClient — confirm whether the second one was meant to
	// be a separate backup node setting.
	metacontractData.Client, err = utils.MakeEthClient(utils.Getenv("METACONTRACT_NODE", ""), utils.Getenv("METACONTRACT_NODE", ""))
	if err != nil {
		log.Fatalf("MakeEthClient for metacontract connection: %v", err)
	}

	// Start collecting and pushing metrics.
	metrics.StartMetrics(
		conn,
		privateKey,
		deployedContract,
		chainId,
		os.Getenv("PUSHGATEWAY_URL"),
		os.Getenv("PUSHGATEWAY_USER"),
		os.Getenv("PUSHGATEWAY_PASSWORD"),
		utils.Getenv("ENABLE_METRICS_SERVER", "false"),
		utils.Getenv("NODE_OPERATOR_NAME", ""),
		utils.Getenv("METRICS_PORT", "9090"),
	)

	// Setting up feeders.
	// Fetch configuration from local filesystem or remote github repository (default).
	remoteConfig, err := strconv.ParseBool(utils.Getenv("REMOTE_CONFIG", "true"))
	if err != nil {
		log.Error("parse REMOTE_CONFIG: ", err)
		remoteConfig = true
	}
	// fetch configuration from master branch per default.
	branchConfig := utils.Getenv("BRANCH_CONFIG", "")

	feedConfigs, err = models.GetFeedsFromConfig("fair-value-feeds.json", remoteConfig, branchConfig)
	if err != nil {
		log.Fatal("GetFeedsFromConfig: ", err)
	}
	for _, fc := range feedConfigs {
		log.Infof("loaded %s from config. %s -- %s", fc.Symbol, fc.Blockchain, fc.Address)
	}

	var wg sync.WaitGroup
	// Keep main alive until all data handlers have returned.
	defer wg.Wait()
	collectorChannel := make(chan models.FairValueData)

	// Create scrapers for all assets available in config file.
	// After this loop, allIscrapers is only touched by the config refresh
	// goroutine below, so it needs no lock.
	allIscrapers := make(map[string]scrapers.IScraper)
	for _, config := range feedConfigs {
		ctx, cancel := context.WithCancel(context.Background())
		scraper := scrapers.NewIScraper(cancel, config, metacontractData)
		allIscrapers[config.FeedConfigIdentifier()] = scraper
		wg.Add(1)
		go handleData(ctx, scraper, &wg, collectorChannel)
	}

	// Routine that periodically fetches the configs and compares to deployed config.
	// Whenever something is added to or removed from the config, either deploy or close it.
	// For now, assume that configs are either added or removed, i.e. existing ones cannot be changed.
	go func() {
		configTicker := time.NewTicker(time.Duration(configUpdateSeconds) * time.Second)
		for range configTicker.C {
			// Use a goroutine-local error so that main's err is not written
			// from a second goroutine.
			var fetchErr error
			feedConfigsNew, fetchErr = models.GetFeedsFromConfig("fair-value-feeds.json", remoteConfig, branchConfig)
			if fetchErr != nil {
				// Do not diff against a failed fetch: the result cannot be
				// trusted and could make every running feed look removed.
				log.Error("GetFeedsFromConfig: ", fetchErr)
				continue
			}

			plus, minus := models.GetDiffConfig(feedConfigs, feedConfigsNew)
			log.Info("REMOVING the following feeds...")
			for _, m := range minus {
				log.Infof("Symbol -- Address: %s -- %s", m.Symbol, m.Address)
			}
			log.Info("ADDING the following feeds...")
			for _, m := range plus {
				log.Infof("Symbol -- Address: %s -- %s", m.Symbol, m.Address)
			}

			// Close scrapers for removed configs.
			for _, config := range minus {
				id := config.FeedConfigIdentifier()
				if scraper, ok := allIscrapers[id]; ok {
					scraper.Close() <- true
					delete(allIscrapers, id)
				}
			}

			// Add scraper for added configs.
			for _, config := range plus {
				id := config.FeedConfigIdentifier()
				if _, running := allIscrapers[id]; running {
					// Never replace a live scraper: the old one would keep
					// running with no handle left to close it.
					log.Warnf("scraper for %s is already running, skip", config.Symbol)
					continue
				}
				ctx, cancel := context.WithCancel(context.Background())
				scraper := scrapers.NewIScraper(cancel, config, metacontractData)
				allIscrapers[id] = scraper
				wg.Add(1)
				go handleData(ctx, scraper, &wg, collectorChannel)
			}

			// The fetched config is now the deployed one. Without this the next
			// tick would diff against the stale config and apply the same
			// additions again.
			feedConfigs = feedConfigsNew
		}
	}()

	// collectedData is shared between the writer and the collector goroutine
	// below; collectedMu guards every access to it.
	collectedData := make(map[string]models.FairValueData)
	var collectedMu sync.Mutex

	// Routine writing collected data to the oracle.
	go func() {
		writeTicker := time.NewTicker(time.Duration(writeTickerSeconds) * time.Second)
		for range writeTicker.C {
			func() {
				// Hold the lock for the whole update so the executor works on
				// the very same map as before, without concurrent writes.
				collectedMu.Lock()
				defer collectedMu.Unlock()
				log.Info("collectedData:----------------------------------- ", collectedData)
				onchain.OracleUpdateExecutor(auth, contract, contractBackup, conn, connBackup, collectedData, deployedContract, decimalsOracleValue)
			}()
		}
	}()

	go func() {
		for d := range collectorChannel {
			// Only store the latest data point.
			collectedMu.Lock()
			collectedData[d.FairValueDataIdentifier()] = d
			collectedMu.Unlock()
		}
	}()
}
// handleData forwards every data point produced by scraper to
// collectorChannel and logs it on the way.
//
// It returns, and marks itself done on wg, when ctx is cancelled or when the
// scraper's data channel is closed. Cancellation is also honoured while the
// handler is waiting to hand a data point to collectorChannel; in that case the
// pending data point is dropped.
func handleData(ctx context.Context, scraper scrapers.IScraper, wg *sync.WaitGroup, collectorChannel chan models.FairValueData) {
	defer wg.Done()
	for {
		select {
		case d, ok := <-scraper.DataChannel():
			if !ok {
				// A closed channel would otherwise deliver zero values in a
				// tight loop.
				log.Warn("data channel closed for scraper ", scraper.GetConfig().Symbol)
				return
			}
			log.Infof("symbol -- fairValueNative -- priceUSD: %s -- %v -- %v", d.Symbol, d.FairValueNative, d.PriceUSD)
			log.Infof("symbol -- numerator -- denominator: %s -- %v -- %v", d.Symbol, d.Numerator.String(), d.Denominator.String())
			// Do not block on the collector past cancellation.
			select {
			case collectorChannel <- d:
			case <-ctx.Done():
				log.Warn("close data handler for scraper ", scraper.GetConfig().Symbol)
				return
			}
		case <-ctx.Done():
			log.Warn("close data handler for scraper ", scraper.GetConfig().Symbol)
			return
		}
	}
}