Note: line breaks and indentation were reconstructed from a whitespace-collapsed
copy of this patch. Context-line whitespace is a best guess, so apply with
`git apply --ignore-whitespace` (or `patch -l`); index hashes predate the edits.

diff --git a/lib/manifold/api/workspace.rb b/lib/manifold/api/workspace.rb
index 5b49db3..f760c67 100644
--- a/lib/manifold/api/workspace.rb
+++ b/lib/manifold/api/workspace.rb
@@ -26,6 +26,7 @@ def generate(path)
     end
 
     # Encapsulates a single manifold.
+    # rubocop:disable Metrics/ClassLength
     class Workspace
       attr_reader :name, :template_path, :logger
 
@@ -59,6 +60,7 @@ def generate(with_terraform: false)
 
         write_manifold_merge_sql
         write_dimensions_merge_sql
+        write_metrics_merge_sql
         generate_terraform
         logger.info("Generated Terraform configuration for workspace '#{name}'.")
       end
@@ -108,6 +110,19 @@ def write_dimensions_merge_sql
         write_dimensions_merge_sql_file(sql)
       end
 
+      # Writes a merge_<group>.sql routine file for every metrics group in the
+      # manifold YAML. Returns early unless `manifold_file` is truthy.
+      def write_metrics_merge_sql
+        return unless manifold_file
+
+        config = manifold_yaml
+        sql_builder = Terraform::SQLBuilder.new(name, config)
+        config&.dig("metrics")&.each_key do |group_name|
+          sql = sql_builder.build_metric_merge_sql(group_name)
+          routines_directory.join("merge_#{group_name}.sql").write(sql)
+        end
+      end
+
       def valid_dimensions_config?
         return false unless manifold_yaml
 
@@ -152,5 +167,6 @@ def generate_terraform
         terraform_generator.generate(terraform_main_path)
       end
     end
+    # rubocop:enable Metrics/ClassLength
   end
 end
diff --git a/lib/manifold/templates/workspace_template.yml b/lib/manifold/templates/workspace_template.yml
index 0b25da6..26f8adb 100644
--- a/lib/manifold/templates/workspace_template.yml
+++ b/lib/manifold/templates/workspace_template.yml
@@ -61,3 +61,4 @@ metrics:
           field: context.sequence
 
     filter: timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY)
+    source: my_project.events
diff --git a/lib/manifold/terraform/workspace_configuration.rb b/lib/manifold/terraform/workspace_configuration.rb
index f1a772f..5311ef2 100644
--- a/lib/manifold/terraform/workspace_configuration.rb
+++ b/lib/manifold/terraform/workspace_configuration.rb
@@ -1,7 +1,10 @@
 # frozen_string_literal: true
 
+# rubocop:disable Metrics/MethodLength, Metrics/AbcSize
+
 module Manifold
   module Terraform
+    # rubocop:disable Metrics/ClassLength
     # Handles building SQL for manifold routines
     class SQLBuilder
       def initialize(name, manifold_config)
@@ -33,6 +36,34 @@ def build_dimensions_merge_sql(source_sql)
         SQL
       end
 
+      # Builds the MERGE statement that refreshes the metrics table of one
+      # metrics group from its configured source. Returns "" when the manifold
+      # config is invalid or does not define the group.
+      def build_metric_merge_sql(group_name)
+        config = @manifold_config&.dig("metrics", group_name)
+        return "" unless config && valid_config?
+
+        ts_field = timestamp_field
+        interval = @manifold_config.dig("timestamp", "interval")
+        source = config["source"]
+        filter_clause = config["filter"] ? " WHERE #{config["filter"]}" : ""
+        metrics_struct = build_group_metrics_struct(group_name, config)
+        <<~SQL
+          MERGE #{@name}.#{metrics_table_name(group_name)} AS target
+          USING (
+            SELECT
+              id,
+              TIMESTAMP_TRUNC(#{ts_field}, #{interval}) AS timestamp,
+              STRUCT(#{metrics_struct}) AS metrics
+            FROM #{source}#{filter_clause}
+            GROUP BY id, timestamp
+          ) AS source
+          ON source.id = target.id AND source.timestamp = target.timestamp
+          WHEN MATCHED THEN UPDATE SET metrics = source.metrics
+          WHEN NOT MATCHED THEN INSERT ROW;
+        SQL
+      end
+
       private
 
       def valid_config?
@@ -106,7 +137,78 @@ def build_metric_joins
 
         "#{first}\n #{joins.map { |table| "FULL OUTER JOIN #{table} USING (id, timestamp)" }.join("\n ")}"
       end
+
+      # Builds the inner struct fields for a metrics group: one nested STRUCT
+      # per condition and per breakout intersection, each holding the group's
+      # configured countif/sumif aggregates.
+      def build_group_metrics_struct(_group_name, config)
+        condition_sets = (config["conditions"] || {}).keys.zip # one-element set per condition
+        field_sets = condition_sets + build_intersection_sets(config)
+        parts = field_sets.map do |conds|
+          expr = build_intersection_expression(conds, config)
+          aggregates = []
+          if config.dig("aggregations", "countif")
+            count_name = config["aggregations"]["countif"]
+            aggregates << "COUNTIF(#{expr}) AS #{count_name}"
+          end
+          if config.dig("aggregations", "sumif")
+            config["aggregations"]["sumif"].each do |metric, sum_cfg|
+              field = sum_cfg["field"]
+              aggregates << "SUM(IF(#{expr}, #{field}, 0)) AS #{metric}"
+            end
+          end
+          "STRUCT(#{aggregates.join(", ")}) AS #{format_intersection_name(conds)}"
+        end
+        parts.join(", ")
+      end
+
+      # Determines the condition expression, using a scalar function if args exist.
+      # Raises ArgumentError when the group defines no condition called +name+.
+      def build_condition_expression(name, config)
+        cond_cfg = config.dig("conditions", name)
+        raise ArgumentError, "Unknown metric condition '#{name}'" unless cond_cfg
+
+        args = cond_cfg["args"]&.keys
+        if args && !args.empty?
+          "is#{name.capitalize}(#{args.join(", ")})"
+        else
+          cond_cfg["body"]
+        end
+      end
+
+      # Builds the expression for a set of conditions that must all hold. A
+      # single condition is returned as-is; several are parenthesized and ANDed.
+      # NOTE(review): assumes a breakout intersection means logical AND - confirm.
+      def build_intersection_expression(conds, config)
+        exprs = conds.map { |cond| build_condition_expression(cond, config) }
+        return exprs.first if exprs.size == 1
+
+        exprs.map { |expr| "(#{expr})" }.join(" AND ")
+      end
+
+      # Builds intersection condition sets from breakouts: for every combination
+      # of two or more breakout groups, the cartesian product of their conditions.
+      def build_intersection_sets(config)
+        breakouts = config["breakouts"] || {}
+        groups = breakouts.keys
+        return [] if groups.size <= 1
+
+        (2..groups.size).flat_map do |size|
+          groups.combination(size).flat_map do |combo|
+            first, *rest = combo.map { |group| breakouts[group] }
+            first.product(*rest)
+          end
+        end
+      end
+      # rubocop:enable Metrics/AbcSize, Metrics/MethodLength
+
+      # Formats an intersection name: first condition as-is, the rest capitalized
+      def format_intersection_name(conds)
+        first, *rest = conds
+        "#{first}#{rest.map(&:capitalize).join}"
+      end
     end
+    # rubocop:enable Metrics/ClassLength
 
     # Handles building table configurations
     class TableConfigBuilder
@@ -223,16 +325,20 @@ def dataset_config
         }
       end
 
       def routine_config
         routines = {
           "merge_dimensions" => dimensions_routine_attributes,
           "merge_manifold" => manifold_routine_attributes
         }.compact
+        # add metric merge routines, one per metrics group
+        @manifold_config&.dig("metrics")&.each_key do |group|
+          routines["merge_#{group}"] = metric_routine_attributes(group)
+        end
         # add user-defined condition routines, if any
         conds = build_condition_routines
         routines.merge!(conds) unless conds.empty?
         routines.empty? ? nil : routines
       end
 
       def dimensions_routine_attributes
         return nil if @vectors.empty? || @dimensions_config.nil?
@@ -260,6 +358,20 @@ def manifold_routine_attributes
         }
       end
 
+      # Builds the routine attributes for one metrics group's merge procedure.
+      # The SQL body is loaded from routines/merge_<group_name>.sql at plan time.
+      def metric_routine_attributes(group_name)
+        {
+          "dataset_id" => name,
+          "project" => "${var.project_id}",
+          "routine_id" => "merge_#{group_name}",
+          "routine_type" => "PROCEDURE",
+          "language" => "SQL",
+          "definition_body" => "${file(\"${path.module}/routines/merge_#{group_name}.sql\")}",
+          "depends_on" => ["google_bigquery_dataset.#{name}"]
+        }
+      end
+
       # generate scalar function routines for defined conditions
       def build_condition_routines
         return {} unless @manifold_config.is_a?(Hash) && @manifold_config["metrics"].is_a?(Hash)
diff --git a/spec/manifold/api/workspace_spec.rb b/spec/manifold/api/workspace_spec.rb
index 68919c8..3770d4a 100644
--- a/spec/manifold/api/workspace_spec.rb
+++ b/spec/manifold/api/workspace_spec.rb
@@ -377,6 +377,15 @@ def parse_metrics_schema(group_name)
         expect(sql).to include("MERGE #{workspace.name}.Manifold AS target")
       end
 
+      it "generates metric merge SQL file for each metric group" do
+        expect(workspace.routines_directory.join("merge_taps.sql")).to be_file
+      end
+
+      it "includes the metric merge SQL in the generated file" do
+        sql = workspace.routines_directory.join("merge_taps.sql").read
+        expect(sql).to include("MERGE #{workspace.name}.TapsMetrics AS target")
+      end
+
       def configure_vector_service
         allow(Manifold::Services::VectorService).to receive(:new).and_return(vector_service)
         configure_vector_schema
diff --git a/spec/manifold/terraform/sql_builder_spec.rb b/spec/manifold/terraform/sql_builder_spec.rb
index 7d28803..a61c9ee 100644
--- a/spec/manifold/terraform/sql_builder_spec.rb
+++ b/spec/manifold/terraform/sql_builder_spec.rb
@@ -55,4 +55,59 @@
       expect(sql).to include("WHEN NOT MATCHED THEN INSERT ROW")
     end
   end
+
+  describe "#build_metric_merge_sql" do
+    subject(:sql) { builder.build_metric_merge_sql("renders") }
+
+    let(:manifold_config) do
+      {
+        "timestamp" => { "field" => "timestamp", "interval" => "DAY" },
+        "metrics" => {
+          "renders" => {
+            "source" => "my_project.events",
+            "filter" => "timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)",
+            "conditions" => {
+              "mobile" => { "args" => { "device" => "STRING" }, "body" => "device = 'mobile'" }
+            },
+            "aggregations" => {
+              "countif" => "renderCount",
+              "sumif" => { "seqSum" => { "field" => "context.seq" } }
+            }
+          }
+        }
+      }
+    end
+
+    it "merges into the renders metrics table" do
+      expect(sql).to include("MERGE analytics.RendersMetrics AS target")
+    end
+
+    it "truncates the timestamp field by interval" do
+      expect(sql).to include("TIMESTAMP_TRUNC(timestamp, DAY) AS timestamp")
+    end
+
+    it "uses the configured source" do
+      expect(sql).to include("FROM my_project.events")
+    end
+
+    it "uses the configured filter" do
+      expect(sql).to include("WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)")
+    end
+
+    it "groups by id and timestamp" do
+      expect(sql).to match(/GROUP BY id, timestamp/)
+    end
+
+    it "includes countif aggregation with condition expression" do
+      expect(sql).to include("COUNTIF(isMobile(device)) AS renderCount")
+    end
+
+    it "includes sumif aggregation with IF expression" do
+      expect(sql).to include("SUM(IF(isMobile(device), context.seq, 0)) AS seqSum")
+    end
+
+    it "updates on match and inserts on not match" do
+      expect(sql).to include("WHEN MATCHED THEN").and include("WHEN NOT MATCHED THEN INSERT ROW")
+    end
+  end
 end
diff --git a/spec/manifold/terraform/workspace_configuration_spec.rb b/spec/manifold/terraform/workspace_configuration_spec.rb
index 07ad7f7..857d6d3 100644
--- a/spec/manifold/terraform/workspace_configuration_spec.rb
+++ b/spec/manifold/terraform/workspace_configuration_spec.rb
@@ -109,11 +109,26 @@
         config.manifold_config = manifold_config
       end
 
+      let(:taps_routine) { json["resource"]["google_bigquery_routine"]["merge_taps"] }
+
       it "includes metrics table configurations" do
         expect(json["resource"]["google_bigquery_table"]).to include(
           "tapsmetrics" => expected_metrics_table("taps")
         )
       end
+
+      it "includes the merge_taps routine" do
+        routines = json["resource"]["google_bigquery_routine"]
+        expect(routines).to include("merge_taps")
+      end
+
+      it "sets the correct definition_body for merge_taps routine" do
+        expect(taps_routine["definition_body"]).to eq("${file(\"${path.module}/routines/merge_taps.sql\")}")
+      end
+
+      it "sets the routine_type for merge_taps routine" do
+        expect(taps_routine["routine_type"]).to eq("PROCEDURE")
+      end
     end
 
     context "when metric conditions are present" do