From 6c3d52cf02b053a992b8d0b759dff809b39e07d3 Mon Sep 17 00:00:00 2001 From: claytongentry Date: Wed, 30 Apr 2025 15:11:07 -0400 Subject: [PATCH 1/2] Metrics Merge Routines --- lib/manifold/api/workspace.rb | 10 ++ lib/manifold/templates/workspace_template.yml | 1 + .../terraform/workspace_configuration.rb | 103 ++++++++++++++++++ 3 files changed, 114 insertions(+) diff --git a/lib/manifold/api/workspace.rb b/lib/manifold/api/workspace.rb index 5b49db3..faa4b38 100644 --- a/lib/manifold/api/workspace.rb +++ b/lib/manifold/api/workspace.rb @@ -59,6 +59,7 @@ def generate(with_terraform: false) write_manifold_merge_sql write_dimensions_merge_sql + write_metrics_merge_sql generate_terraform logger.info("Generated Terraform configuration for workspace '#{name}'.") end @@ -108,6 +109,15 @@ def write_dimensions_merge_sql write_dimensions_merge_sql_file(sql) end + def write_metrics_merge_sql + return unless manifold_file + @manifold_yaml["metrics"]&.each_key do |group_name| + sql_builder = Terraform::SQLBuilder.new(name, manifold_yaml) + sql = sql_builder.build_metric_merge_sql(group_name) + routines_directory.join("merge_#{group_name}.sql").write(sql) + end + end + def valid_dimensions_config? return false unless manifold_yaml diff --git a/lib/manifold/templates/workspace_template.yml b/lib/manifold/templates/workspace_template.yml index 0b25da6..26f8adb 100644 --- a/lib/manifold/templates/workspace_template.yml +++ b/lib/manifold/templates/workspace_template.yml @@ -61,3 +61,4 @@ metrics: field: context.sequence filter: timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY) + source: my_project.events diff --git a/lib/manifold/terraform/workspace_configuration.rb b/lib/manifold/terraform/workspace_configuration.rb index f1a772f..2e8daf7 100644 --- a/lib/manifold/terraform/workspace_configuration.rb +++ b/lib/manifold/terraform/workspace_configuration.rb @@ -33,6 +33,32 @@ def build_dimensions_merge_sql(source_sql) SQL end + def build_metric_merge_sql(group_name) + return "" unless valid_config? && @manifold_config["metrics"][group_name] + config = @manifold_config["metrics"][group_name] + ts_field = timestamp_field + interval = @manifold_config.dig("timestamp", "interval") + source = config["source"] + filter_clause = config["filter"] ? " WHERE #{config["filter"]}" : "" + metrics_struct = build_group_metrics_struct(group_name, config) + <<~SQL + MERGE #{@name}.#{metrics_table_name(group_name)} AS target + USING ( + SELECT + id, + TIMESTAMP_TRUNC(#{ts_field}, #{interval}) AS timestamp, + STRUCT(#{metrics_struct}) AS metrics + FROM #{source}#{filter_clause} + GROUP BY id, timestamp + ) AS source + ON source.id = target.id AND source.timestamp = target.timestamp + WHEN MATCHED THEN + UPDATE SET metrics = source.metrics + WHEN NOT MATCHED THEN + INSERT ROW; + SQL + end + private def valid_config? @@ -106,6 +132,64 @@ def build_metric_joins "#{first}\n #{joins.map { |table| "FULL OUTER JOIN #{table} USING (id, timestamp)" }.join("\n ")}" end + + # Builds the inner struct fields for a metrics group + def build_group_metrics_struct(group_name, config) + condition_names = config["conditions"]&.keys || [] + intersection_names = build_intersection_names(config) + field_names = condition_names + intersection_names + parts = field_names.map do |name| + expr = build_condition_expression(name, config) + aggregates = [] + if config.dig("aggregations", "countif") + count_name = config["aggregations"]["countif"] + aggregates << "COUNTIF(#{expr}) AS #{count_name}" + end + if config.dig("aggregations", "sumif") + config["aggregations"]["sumif"].each do |metric, sum_cfg| + field = sum_cfg["field"] + aggregates << "SUM(IF(#{expr}, #{field}, 0)) AS #{metric}" + end + end + "STRUCT(#{aggregates.join(', ')}) AS #{name}" + end + parts.join(", ") + end + + # Determines the condition expression, using a scalar function if args exist + def build_condition_expression(name, config) + cond_cfg = config["conditions"][name] + args = cond_cfg["args"]&.keys + if args && !args.empty? + "is#{name.capitalize}(#{args.join(', ')})" + else + cond_cfg["body"] + end + end + + # Builds intersection condition names from breakouts + def build_intersection_names(config) + breakouts = config["breakouts"] || {} + groups = breakouts.keys + return [] if groups.size <= 1 + (2..groups.size).flat_map do |size| + groups.combination(size).flat_map do |combo| + condition_sets = combo.map { |g| breakouts[g] } + combos = condition_sets.first.map { |c| [c] } + combos = condition_sets[1..].reduce(combos) do |acc, set| + acc.flat_map { |prev| set.map { |c| prev + [c] } } + end + combos.map { |conds| format_intersection_name(conds) } + end + end + end + + # Formats intersection names (first lowercase, others capitalized) + def format_intersection_name(conds) + name = conds.first + conds[1..].each { |c| name += c.capitalize } + name + end end # Handles building table configurations @@ -228,6 +312,12 @@ def routine_config "merge_dimensions" => dimensions_routine_attributes, "merge_manifold" => manifold_routine_attributes }.compact + # add metric merge routines + if @manifold_config&.dig("metrics") + @manifold_config["metrics"].each_key do |group| + routines["merge_#{group}"] = metric_routine_attributes(group) + end + end # add user-defined condition routines, if any conds = build_condition_routines routines.merge!(conds) unless conds.empty? @@ -260,6 +350,19 @@ def manifold_routine_attributes } end + # Builds attributes for a metric merge routine + def metric_routine_attributes(group_name) + { + "dataset_id" => name, + "project" => "${var.project_id}", + "routine_id" => "merge_#{group_name}", + "routine_type" => "PROCEDURE", + "language" => "SQL", + "definition_body" => "${file(\"${path.module}/routines/merge_#{group_name}.sql\")}", + "depends_on" => ["google_bigquery_dataset.#{name}"] + } + end + # generate scalar function routines for defined conditions def build_condition_routines return {} unless @manifold_config.is_a?(Hash) && @manifold_config["metrics"].is_a?(Hash) From e4be1120ea77e392d9ceae3002095a446035a1c1 Mon Sep 17 00:00:00 2001 From: claytongentry Date: Wed, 30 Apr 2025 16:22:00 -0400 Subject: [PATCH 2/2] test + lint --- lib/manifold/api/workspace.rb | 3 + .../terraform/workspace_configuration.rb | 22 +++++--- spec/manifold/api/workspace_spec.rb | 9 +++ spec/manifold/terraform/sql_builder_spec.rb | 55 +++++++++++++++++++ .../terraform/workspace_configuration_spec.rb | 15 +++++ 5 files changed, 97 insertions(+), 7 deletions(-) diff --git a/lib/manifold/api/workspace.rb b/lib/manifold/api/workspace.rb index faa4b38..f760c67 100644 --- a/lib/manifold/api/workspace.rb +++ b/lib/manifold/api/workspace.rb @@ -26,6 +26,7 @@ def generate(path) end # Encapsulates a single manifold. + # rubocop:disable Metrics/ClassLength class Workspace attr_reader :name, :template_path, :logger @@ -111,6 +112,7 @@ def write_dimensions_merge_sql def write_metrics_merge_sql return unless manifold_file + @manifold_yaml["metrics"]&.each_key do |group_name| sql_builder = Terraform::SQLBuilder.new(name, manifold_yaml) sql = sql_builder.build_metric_merge_sql(group_name) @@ -162,5 +164,6 @@ def generate_terraform terraform_generator.generate(terraform_main_path) end end + # rubocop:enable Metrics/ClassLength end end diff --git a/lib/manifold/terraform/workspace_configuration.rb b/lib/manifold/terraform/workspace_configuration.rb index 2e8daf7..5311ef2 100644 --- a/lib/manifold/terraform/workspace_configuration.rb +++ b/lib/manifold/terraform/workspace_configuration.rb @@ -1,7 +1,10 @@ # frozen_string_literal: true +# rubocop:disable Metrics/MethodLength, Metrics/AbcSize + module Manifold module Terraform + # rubocop:disable Metrics/ClassLength # Handles building SQL for manifold routines class SQLBuilder def initialize(name, manifold_config) @@ -35,6 +38,7 @@ def build_dimensions_merge_sql(source_sql) def build_metric_merge_sql(group_name) return "" unless valid_config? && @manifold_config["metrics"][group_name] + config = @manifold_config["metrics"][group_name] ts_field = timestamp_field interval = @manifold_config.dig("timestamp", "interval") @@ -52,10 +56,8 @@ def build_metric_merge_sql(group_name) GROUP BY id, timestamp ) AS source ON source.id = target.id AND source.timestamp = target.timestamp - WHEN MATCHED THEN - UPDATE SET metrics = source.metrics - WHEN NOT MATCHED THEN - INSERT ROW; + WHEN MATCHED THEN UPDATE SET metrics = source.metrics + WHEN NOT MATCHED THEN INSERT ROW; SQL end @@ -134,7 +136,7 @@ def build_metric_joins end # Builds the inner struct fields for a metrics group - def build_group_metrics_struct(group_name, config) + def build_group_metrics_struct(_group_name, config) condition_names = config["conditions"]&.keys || [] intersection_names = build_intersection_names(config) field_names = condition_names + intersection_names @@ -151,7 +153,7 @@ def build_group_metrics_struct(group_name, config) aggregates << "SUM(IF(#{expr}, #{field}, 0)) AS #{metric}" end end - "STRUCT(#{aggregates.join(', ')}) AS #{name}" + "STRUCT(#{aggregates.join(", ")}) AS #{name}" end parts.join(", ") end @@ -161,17 +163,19 @@ def build_condition_expression(name, config) cond_cfg = config["conditions"][name] args = cond_cfg["args"]&.keys if args && !args.empty? - "is#{name.capitalize}(#{args.join(', ')})" + "is#{name.capitalize}(#{args.join(", ")})" else cond_cfg["body"] end end # Builds intersection condition names from breakouts + # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity def build_intersection_names(config) breakouts = config["breakouts"] || {} groups = breakouts.keys return [] if groups.size <= 1 + (2..groups.size).flat_map do |size| groups.combination(size).flat_map do |combo| condition_sets = combo.map { |g| breakouts[g] } @@ -183,6 +187,7 @@ def build_intersection_names(config) end end end + # rubocop:enable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity # Formats intersection names (first lowercase, others capitalized) def format_intersection_name(conds) @@ -191,6 +196,7 @@ def format_intersection_name(conds) name end end + # rubocop:enable Metrics/ClassLength # Handles building table configurations class TableConfigBuilder @@ -307,6 +313,7 @@ def dataset_config } end + # rubocop:disable Metrics/MethodLength def routine_config routines = { "merge_dimensions" => dimensions_routine_attributes, @@ -323,6 +330,7 @@ def routine_config routines.merge!(conds) unless conds.empty? routines.empty? ? nil : routines end + # rubocop:enable Metrics/MethodLength def dimensions_routine_attributes return nil if @vectors.empty? || @dimensions_config.nil? diff --git a/spec/manifold/api/workspace_spec.rb b/spec/manifold/api/workspace_spec.rb index 68919c8..3770d4a 100644 --- a/spec/manifold/api/workspace_spec.rb +++ b/spec/manifold/api/workspace_spec.rb @@ -377,6 +377,15 @@ def parse_metrics_schema(group_name) expect(sql).to include("MERGE #{workspace.name}.Manifold AS target") end + it "generates metric merge SQL file for each metric group" do + expect(workspace.routines_directory.join("merge_taps.sql")).to be_file + end + + it "includes the metric merge SQL in the generated file" do + sql = workspace.routines_directory.join("merge_taps.sql").read + expect(sql).to include("MERGE #{workspace.name}.TapsMetrics AS target") + end + def configure_vector_service allow(Manifold::Services::VectorService).to receive(:new).and_return(vector_service) configure_vector_schema diff --git a/spec/manifold/terraform/sql_builder_spec.rb b/spec/manifold/terraform/sql_builder_spec.rb index 7d28803..a61c9ee 100644 --- a/spec/manifold/terraform/sql_builder_spec.rb +++ b/spec/manifold/terraform/sql_builder_spec.rb @@ -55,4 +55,59 @@ expect(sql).to include("WHEN NOT MATCHED THEN INSERT ROW") end end + + describe "#build_metric_merge_sql" do + subject(:sql) { builder.build_metric_merge_sql("renders") } + + let(:manifold_config) do + { + "timestamp" => { "field" => "timestamp", "interval" => "DAY" }, + "metrics" => { + "renders" => { + "source" => "my_project.events", + "filter" => "timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)", + "conditions" => { + "mobile" => { "args" => { "device" => "STRING" }, "body" => "device = 'mobile'" } + }, + "aggregations" => { + "countif" => "renderCount", + "sumif" => { "seqSum" => { "field" => "context.seq" } } + } + } + } + } + end + + it "merges into the renders metrics table" do + expect(sql).to include("MERGE analytics.RendersMetrics AS target") + end + + it "truncates the timestamp field by interval" do + expect(sql).to include("TIMESTAMP_TRUNC(timestamp, DAY) AS timestamp") + end + + it "uses the configured source" do + expect(sql).to include("FROM my_project.events") + end + + it "uses the configured filter" do + expect(sql).to include("WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)") + end + + it "groups by id and timestamp" do + expect(sql).to match(/GROUP BY id, timestamp/) + end + + it "includes countif aggregation with condition expression" do + expect(sql).to include("COUNTIF(isMobile(device)) AS renderCount") + end + + it "includes sumif aggregation with IF expression" do + expect(sql).to include("SUM(IF(isMobile(device), context.seq, 0)) AS seqSum") + end + + it "updates on match and inserts on not match" do + expect(sql).to include("WHEN MATCHED THEN").and include("WHEN NOT MATCHED THEN INSERT ROW") + end + end end diff --git a/spec/manifold/terraform/workspace_configuration_spec.rb b/spec/manifold/terraform/workspace_configuration_spec.rb index 07ad7f7..857d6d3 100644 --- a/spec/manifold/terraform/workspace_configuration_spec.rb +++ b/spec/manifold/terraform/workspace_configuration_spec.rb @@ -109,11 +109,26 @@ config.manifold_config = manifold_config end + let(:taps_routine) { json["resource"]["google_bigquery_routine"]["merge_taps"] } + it "includes metrics table configurations" do expect(json["resource"]["google_bigquery_table"]).to include( "tapsmetrics" => expected_metrics_table("taps") ) end + + it "includes the merge_taps routine" do + routines = json["resource"]["google_bigquery_routine"] + expect(routines).to include("merge_taps") + end + + it "sets the correct definition_body for merge_taps routine" do + expect(taps_routine["definition_body"]).to eq("${file(\"${path.module}/routines/merge_taps.sql\")}") + end + + it "sets the routine_type for merge_taps routine" do + expect(taps_routine["routine_type"]).to eq("PROCEDURE") + end end context "when metric conditions are present" do