Skip to content

Commit 9c1bec5

Browse files
author
Qiu Jian
committed
fix: clickhouse sync table with order by changes
1 parent 29939ab commit 9c1bec5

2 files changed

Lines changed: 53 additions & 18 deletions

File tree

backends/clickhouse/clickhouse.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,17 @@ func (click *SClickhouseBackend) GetCreateSQLs(ts sqlchemy.ITableSpec) []string
123123
var ttlCol IClickhouseColumnSpec
124124
for _, c := range ts.Columns() {
125125
cols = append(cols, c.DefinitionString())
126-
if c.IsPrimary() {
127-
primaries = append(primaries, fmt.Sprintf("`%s`", c.Name()))
128-
}
129126
if cc, ok := c.(IClickhouseColumnSpec); ok {
130-
if cc.IsOrderBy() {
131-
orderbys = append(orderbys, fmt.Sprintf("`%s`", c.Name()))
132-
}
133127
partition := cc.PartitionBy()
134128
if len(partition) > 0 && !utils.IsInStringArray(partition, partitions) {
135129
partitions = append(partitions, partition)
136130
}
131+
if c.IsPrimary() && len(partition) == 0 {
132+
primaries = append(primaries, fmt.Sprintf("`%s`", c.Name()))
133+
}
134+
if cc.IsOrderBy() && len(partition) == 0 {
135+
orderbys = append(orderbys, fmt.Sprintf("`%s`", c.Name()))
136+
}
137137
ttlC, ttlU := cc.GetTTL()
138138
if ttlC > 0 && len(ttlU) > 0 {
139139
ttlCol = cc
@@ -183,7 +183,7 @@ func (click *SClickhouseBackend) GetCreateSQLs(ts sqlchemy.ITableSpec) []string
183183
createSql += fmt.Sprintf("\nTTL `%s` + INTERVAL %d %s", ttlCol.Name(), ttlCount, ttlUnit)
184184
}
185185
// set default time zone of table to UTC
186-
createSql += "\nSETTINGS index_granularity=8192"
186+
createSql += "\nSETTINGS index_granularity=8192, allow_nullable_key=1"
187187
}
188188
return []string{
189189
createSql,

backends/clickhouse/sync.go

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,18 @@ func findTtlColumn(cols []sqlchemy.IColumnSpec) sColumnTTL {
4747
return ret
4848
}
4949

50+
func findOrderByColumns(cols []sqlchemy.IColumnSpec) []string {
51+
var ret []string
52+
for _, col := range cols {
53+
if clickCol, ok := col.(IClickhouseColumnSpec); ok {
54+
if (clickCol.IsOrderBy() || clickCol.IsPrimary()) && len(clickCol.PartitionBy()) == 0 {
55+
ret = append(ret, clickCol.Name())
56+
}
57+
}
58+
}
59+
return ret
60+
}
61+
5062
func findPartitions(cols []sqlchemy.IColumnSpec) []string {
5163
parts := make([]string, 0)
5264
for i := range cols {
@@ -158,18 +170,41 @@ func (clickhouse *SClickhouseBackend) CommitTableChangeSQL(ts sqlchemy.ITableSpe
158170
}*/
159171

160172
// check TTL
161-
oldTtlSpec := findTtlColumn(changes.OldColumns)
162-
newTtlSpec := findTtlColumn(ts.Columns())
163-
log.Debugf("old: %s new: %s", jsonutils.Marshal(oldTtlSpec), jsonutils.Marshal(newTtlSpec))
164-
if oldTtlSpec != newTtlSpec {
165-
if oldTtlSpec.Count > 0 && newTtlSpec.Count == 0 {
166-
// remove
167-
sql := "REMOVE TTL"
168-
alters = append(alters, sql)
169-
} else {
173+
{
174+
oldTtlSpec := findTtlColumn(changes.OldColumns)
175+
newTtlSpec := findTtlColumn(ts.Columns())
176+
log.Debugf("old: %s new: %s", jsonutils.Marshal(oldTtlSpec), jsonutils.Marshal(newTtlSpec))
177+
if oldTtlSpec != newTtlSpec {
178+
if oldTtlSpec.Count > 0 && newTtlSpec.Count == 0 {
179+
// remove
180+
sql := fmt.Sprintf("REMOVE TTL")
181+
alters = append(alters, sql)
182+
} else {
183+
// alter
184+
sql := fmt.Sprintf("MODIFY TTL `%s` + INTERVAL %d %s", newTtlSpec.ColName, newTtlSpec.Count, newTtlSpec.Unit)
185+
alters = append(alters, sql)
186+
}
187+
}
188+
}
189+
190+
// check order by
191+
{
192+
oldOrderBys := findOrderByColumns(changes.OldColumns)
193+
newOrderBys := findOrderByColumns(ts.Columns())
194+
log.Debugf("old: %s new: %s", jsonutils.Marshal(oldOrderBys), jsonutils.Marshal(newOrderBys))
195+
if jsonutils.Marshal(oldOrderBys).String() != jsonutils.Marshal(newOrderBys).String() {
170196
// alter
171-
sql := fmt.Sprintf("MODIFY TTL `%s` + INTERVAL %d %s", newTtlSpec.ColName, newTtlSpec.Count, newTtlSpec.Unit)
172-
alters = append(alters, sql)
197+
altered := false
198+
for i := range newOrderBys {
199+
if !utils.IsInStringArray(newOrderBys[i], oldOrderBys) {
200+
oldOrderBys = append(oldOrderBys, newOrderBys[i])
201+
altered = true
202+
}
203+
}
204+
if altered {
205+
sql := fmt.Sprintf("MODIFY ORDER BY (%s)", strings.Join(oldOrderBys, ", "))
206+
alters = append(alters, sql)
207+
}
173208
}
174209
}
175210

0 commit comments

Comments
 (0)