Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions lib/manifold/api/workspace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def generate(path)
end

# Encapsulates a single manifold.
# rubocop:disable Metrics/ClassLength
class Workspace
attr_reader :name, :template_path, :logger

Expand Down Expand Up @@ -59,6 +60,7 @@ def generate(with_terraform: false)

write_manifold_merge_sql
write_dimensions_merge_sql
write_metrics_merge_sql
generate_terraform
logger.info("Generated Terraform configuration for workspace '#{name}'.")
end
Expand Down Expand Up @@ -108,6 +110,16 @@ def write_dimensions_merge_sql
write_dimensions_merge_sql_file(sql)
end

# Writes one `merge_<group>.sql` routine file per metrics group declared in
# the manifold YAML.
#
# Does nothing when there is no manifold file or when the YAML has no
# "metrics" section. The YAML is read through the `manifold_yaml` reader
# (the same one handed to the SQL builder) instead of touching
# `@manifold_yaml` directly, so both lookups always see the same value.
def write_metrics_merge_sql
  return unless manifold_file

  metric_groups = manifold_yaml&.dig("metrics")
  return unless metric_groups

  # The builder depends only on the workspace, so one instance serves all groups.
  sql_builder = Terraform::SQLBuilder.new(name, manifold_yaml)
  metric_groups.each_key do |group_name|
    sql = sql_builder.build_metric_merge_sql(group_name)
    routines_directory.join("merge_#{group_name}.sql").write(sql)
  end
end

def valid_dimensions_config?
return false unless manifold_yaml

Expand Down Expand Up @@ -152,5 +164,6 @@ def generate_terraform
terraform_generator.generate(terraform_main_path)
end
end
# rubocop:enable Metrics/ClassLength
end
end
1 change: 1 addition & 0 deletions lib/manifold/templates/workspace_template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ metrics:
field: context.sequence

filter: timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY)
source: my_project.events
111 changes: 111 additions & 0 deletions lib/manifold/terraform/workspace_configuration.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# frozen_string_literal: true

# rubocop:disable Metrics/MethodLength, Metrics/AbcSize

module Manifold
module Terraform
# rubocop:disable Metrics/ClassLength
# Handles building SQL for manifold routines
class SQLBuilder
def initialize(name, manifold_config)
Expand Down Expand Up @@ -33,6 +36,31 @@ def build_dimensions_merge_sql(source_sql)
SQL
end

# Builds the MERGE statement that refreshes one metrics group's table.
#
# Returns an empty string when the configuration is not valid or the group
# is not defined under "metrics". Otherwise the statement aggregates the
# group's source (optionally filtered) per id and truncated timestamp, then
# upserts the resulting `metrics` struct into the group's metrics table.
#
# NOTE(review): the select list aliases the truncated value as `timestamp`
# and GROUP BY then names `timestamp`; when the configured timestamp field
# is itself called `timestamp`, that name is shared with the source column.
# Confirm BigQuery resolves it as intended.
def build_metric_merge_sql(group_name)
  group = valid_config? && @manifold_config["metrics"][group_name]
  return "" unless group

  bucket = "TIMESTAMP_TRUNC(#{timestamp_field}, #{@manifold_config.dig("timestamp", "interval")})"
  where = group["filter"] ? " WHERE #{group["filter"]}" : ""
  from = "#{group["source"]}#{where}"
  struct_fields = build_group_metrics_struct(group_name, group)
  target_table = "#{@name}.#{metrics_table_name(group_name)}"

  <<~SQL
    MERGE #{target_table} AS target
    USING (
    SELECT
    id,
    #{bucket} AS timestamp,
    STRUCT(#{struct_fields}) AS metrics
    FROM #{from}
    GROUP BY id, timestamp
    ) AS source
    ON source.id = target.id AND source.timestamp = target.timestamp
    WHEN MATCHED THEN UPDATE SET metrics = source.metrics
    WHEN NOT MATCHED THEN INSERT ROW;
  SQL
end

private

def valid_config?
Expand Down Expand Up @@ -106,7 +134,69 @@ def build_metric_joins

"#{first}\n #{joins.map { |table| "FULL OUTER JOIN #{table} USING (id, timestamp)" }.join("\n ")}"
end

# Builds the comma-separated inner struct fields for a metrics group.
#
# Every condition, followed by every cross-breakout intersection of
# conditions, becomes one `STRUCT(<aggregates>) AS <field>` entry. The
# aggregates are the group's optional `countif` counter and one conditional
# sum per `sumif` entry.
#
# Intersection fields have no entry of their own under "conditions", so
# their expression is the AND of the member conditions' expressions instead
# of a lookup by the synthesized name (which would hit nil and raise).
#
# @param _group_name [String] unused; kept for interface compatibility
# @param config [Hash] the group's configuration
# @return [String] the struct fields joined by ", "
def build_group_metrics_struct(_group_name, config)
  aggregations = config["aggregations"] || {}
  count_name = aggregations["countif"]
  sum_fields = aggregations["sumif"] || {}

  parts = group_condition_fields(config).map do |name, expr|
    aggregates = []
    aggregates << "COUNTIF(#{expr}) AS #{count_name}" if count_name
    sum_fields.each do |metric, sum_cfg|
      aggregates << "SUM(IF(#{expr}, #{sum_cfg["field"]}, 0)) AS #{metric}"
    end
    "STRUCT(#{aggregates.join(", ")}) AS #{name}"
  end
  parts.join(", ")
end

# Pairs each struct field name with its SQL condition expression: plain
# conditions first, then intersections.
def group_condition_fields(config)
  plain = (config["conditions"] || {}).keys.map do |name|
    [name, build_condition_expression(name, config)]
  end
  intersections = intersection_condition_sets(config).map do |conds|
    expr = conds.map { |cond| "(#{build_condition_expression(cond, config)})" }.join(" AND ")
    [format_intersection_name(conds), expr]
  end
  plain + intersections
end

# Lists the member conditions of every intersection, in the same order in
# which build_intersection_names emits the intersection names.
def intersection_condition_sets(config)
  breakouts = config["breakouts"] || {}
  groups = breakouts.keys
  return [] if groups.size <= 1

  (2..groups.size).flat_map do |size|
    groups.combination(size).flat_map do |combo|
      first, *rest = combo.map { |group| breakouts[group] }
      first.product(*rest)
    end
  end
end

# Resolves the SQL expression for a named condition of a metrics group.
#
# A condition that declares arguments is rendered as a call to its scalar
# function, `is<Name>(arg, ...)`, where <Name> is `name.capitalize` (which
# also lower-cases the rest of the name). A condition without arguments is
# rendered as its raw "body".
def build_condition_expression(name, config)
  condition = config["conditions"][name]
  arg_names = condition["args"]&.keys
  return condition["body"] if arg_names.nil? || arg_names.empty?

  "is#{name.capitalize}(#{arg_names.join(", ")})"
end

# Derives the names of all cross-breakout condition intersections.
#
# For every combination of two or more breakouts, each way of picking one
# condition from each chosen breakout yields one name, formatted by
# format_intersection_name. Returns [] when fewer than two breakouts exist.
def build_intersection_names(config)
  conditions_by_breakout = config["breakouts"] || {}
  breakout_names = conditions_by_breakout.keys
  return [] unless breakout_names.size > 1

  2.upto(breakout_names.size).flat_map do |width|
    breakout_names.combination(width).flat_map do |chosen|
      head, *tail = conditions_by_breakout.values_at(*chosen)
      head.product(*tail).map { |conds| format_intersection_name(conds) }
    end
  end
end
# rubocop:enable Metrics/AbcSize, Metrics/MethodLength

# Joins condition names into a single intersection name: the first name is
# kept as given and each following name is appended after `capitalize`
# (first letter upper-cased, the rest lower-cased).
def format_intersection_name(conds)
  conds[1..].reduce(conds.first) { |joined, cond| joined + cond.capitalize }
end
end
# rubocop:enable Metrics/ClassLength

# Handles building table configurations
class TableConfigBuilder
Expand Down Expand Up @@ -223,16 +313,24 @@ def dataset_config
}
end

# Collects every routine for the workspace, keyed by routine id.
#
# Order: the dimensions and manifold merge routines (each left out when its
# attributes are nil), then one `merge_<group>` routine per metrics group,
# then the user-defined condition routines. Returns nil when nothing was
# collected.
def routine_config
  collected = {
    "merge_dimensions" => dimensions_routine_attributes,
    "merge_manifold" => manifold_routine_attributes
  }.compact

  (@manifold_config&.dig("metrics") || {}).each_key do |group|
    collected["merge_#{group}"] = metric_routine_attributes(group)
  end

  collected.update(build_condition_routines)
  collected unless collected.empty?
end

def dimensions_routine_attributes
return nil if @vectors.empty? || @dimensions_config.nil?
Expand Down Expand Up @@ -260,6 +358,19 @@ def manifold_routine_attributes
}
end

# Builds the routine attributes for one metrics group's merge procedure.
#
# The routine is a SQL PROCEDURE named `merge_<group_name>` whose body is
# read from `routines/merge_<group_name>.sql` under the Terraform module
# path, and it depends on the workspace's BigQuery dataset.
def metric_routine_attributes(group_name)
  routine_id = "merge_#{group_name}"
  sql_file = "${path.module}/routines/#{routine_id}.sql"

  {
    "dataset_id" => name,
    "project" => "${var.project_id}",
    "routine_id" => routine_id,
    "routine_type" => "PROCEDURE",
    "language" => "SQL",
    "definition_body" => "${file(\"#{sql_file}\")}",
    "depends_on" => ["google_bigquery_dataset.#{name}"]
  }
end

# generate scalar function routines for defined conditions
def build_condition_routines
return {} unless @manifold_config.is_a?(Hash) && @manifold_config["metrics"].is_a?(Hash)
Expand Down
9 changes: 9 additions & 0 deletions spec/manifold/api/workspace_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,15 @@ def parse_metrics_schema(group_name)
expect(sql).to include("MERGE #{workspace.name}.Manifold AS target")
end

# A merge routine file is written for the "taps" metrics group.
it "generates metric merge SQL file for each metric group" do
  merge_file = workspace.routines_directory.join("merge_taps.sql")
  expect(merge_file).to be_file
end

# The written routine merges into the group's metrics table.
it "includes the metric merge SQL in the generated file" do
  merge_file = workspace.routines_directory.join("merge_taps.sql")
  expect(merge_file.read).to include("MERGE #{workspace.name}.TapsMetrics AS target")
end

def configure_vector_service
allow(Manifold::Services::VectorService).to receive(:new).and_return(vector_service)
configure_vector_schema
Expand Down
55 changes: 55 additions & 0 deletions spec/manifold/terraform/sql_builder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,59 @@
expect(sql).to include("WHEN NOT MATCHED THEN INSERT ROW")
end
end

# Covers the MERGE statement built for a "renders" metrics group that has one
# argument-taking condition plus countif and sumif aggregations.
describe "#build_metric_merge_sql" do
  subject(:sql) { builder.build_metric_merge_sql("renders") }

  let(:manifold_config) do
    renders_group = {
      "source" => "my_project.events",
      "filter" => "timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)",
      "conditions" => {
        "mobile" => { "args" => { "device" => "STRING" }, "body" => "device = 'mobile'" }
      },
      "aggregations" => {
        "countif" => "renderCount",
        "sumif" => { "seqSum" => { "field" => "context.seq" } }
      }
    }

    {
      "timestamp" => { "field" => "timestamp", "interval" => "DAY" },
      "metrics" => { "renders" => renders_group }
    }
  end

  it "merges into the renders metrics table" do
    merge_target = "MERGE analytics.RendersMetrics AS target"
    expect(sql).to include(merge_target)
  end

  it "truncates the timestamp field by interval" do
    truncated = "TIMESTAMP_TRUNC(timestamp, DAY) AS timestamp"
    expect(sql).to include(truncated)
  end

  it "uses the configured source" do
    from_clause = "FROM my_project.events"
    expect(sql).to include(from_clause)
  end

  it "uses the configured filter" do
    where_clause = "WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)"
    expect(sql).to include(where_clause)
  end

  it "groups by id and timestamp" do
    expect(sql).to include("GROUP BY id, timestamp")
  end

  # Conditions that declare args are referenced through their scalar function.
  it "includes countif aggregation with condition expression" do
    countif = "COUNTIF(isMobile(device)) AS renderCount"
    expect(sql).to include(countif)
  end

  it "includes sumif aggregation with IF expression" do
    sumif = "SUM(IF(isMobile(device), context.seq, 0)) AS seqSum"
    expect(sql).to include(sumif)
  end

  it "updates on match and inserts on not match" do
    expect(sql).to include("WHEN MATCHED THEN", "WHEN NOT MATCHED THEN INSERT ROW")
  end
end
end
15 changes: 15 additions & 0 deletions spec/manifold/terraform/workspace_configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,26 @@
config.manifold_config = manifold_config
end

# The generated routine entry for the "taps" metrics group.
let(:taps_routine) { json["resource"]["google_bigquery_routine"]["merge_taps"] }

it "includes metrics table configurations" do
  tables = json["resource"]["google_bigquery_table"]
  expect(tables).to include("tapsmetrics" => expected_metrics_table("taps"))
end

it "includes the merge_taps routine" do
  expect(json["resource"]["google_bigquery_routine"]).to include("merge_taps")
end

# The routine body is loaded from the SQL file written next to the module.
it "sets the correct definition_body for merge_taps routine" do
  body_reference = "${file(\"${path.module}/routines/merge_taps.sql\")}"
  expect(taps_routine["definition_body"]).to eq(body_reference)
end

it "sets the routine_type for merge_taps routine" do
  routine_type = taps_routine["routine_type"]
  expect(routine_type).to eq("PROCEDURE")
end
end

context "when metric conditions are present" do
Expand Down
Loading