Add tagging support for workflow search in PostgresWorkflowRegistry#201
Add tagging support for workflow search in PostgresWorkflowRegistry#201abhinavs1920 wants to merge 7 commits intobusiness4s:mainfrom
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds status, created_at, wakeup_at, and tags columns and indexes to the Postgres registry schema; extends PostgresWorkflowRegistry with tagging, search, and count APIs, tag serialization and status mapping; updates upsert/getExecutingWorkflows and test infra with Docker-availability guards. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Test/Client
participant Reg as PostgresWorkflowRegistry
participant Tagger as Tagger
participant DB as PostgreSQL
Client->>Reg: upsertInstance(templateId, instanceId, state, execMeta)
Reg->>Tagger: compute tags (templateId, state)
Tagger-->>Reg: Map[String,String]
Reg->>Reg: statusToString, tagsToJson, set created_at/updated_at/wakeup_at
Reg->>DB: INSERT ... ON CONFLICT ... UPDATE (status, created_at, updated_at, wakeup_at, tags)
DB-->>Reg: OK
Client->>Reg: search(templateId, query)
Reg->>Reg: buildFilters(query) / buildOrderBy(query)
Reg->>DB: SELECT ... WHERE ... ORDER BY ... LIMIT/OFFSET
DB-->>Reg: rows (including tags JSON)
Reg->>Reg: jsonToTags -> WorkflowSearch.Result
Reg-->>Client: List[WorkflowSearch.Result]
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala`:
- Around line 92-96: stringToStatus currently matches only
"running"/"awaiting"/"finished" and will throw MatchError for unknown DB values;
update stringToStatus (and any callers) to handle unexpected strings explicitly
— either change its return to Option[ExecutionStatus] or Either[String,
ExecutionStatus] and return None/Left for unknown inputs, or add a default case
(case _ => ExecutionStatus.Unknown or throw new
IllegalArgumentException(s"Unknown status: $s")) so the error is reported
clearly; adjust callers of stringToStatus accordingly (e.g., code that reads
from the DB) to handle the new None/Left/Unknown path.
🧹 Nitpick comments (5)
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala (2)
73-77: Manual JSON serialization may miss edge cases.The
escapeJsonfunction only handles backslash and double-quote escaping but misses other JSON special characters like newlines (\n), tabs (\t), carriage returns (\r), and other control characters. This could produce invalid JSON or allow data corruption for tags containing these characters.Consider using a proper JSON library (Circe is already in the dependency list) for serialization:
♻️ Suggested refactor using Circe
+import io.circe.syntax._ +import io.circe.parser.decode - private def tagsToJson(tags: Map[String, String]): String = - if tags.isEmpty then "{}" - else tags.map { case (k, v) => s""""${escapeJson(k)}":"${escapeJson(v)}"""" }.mkString("{", ",", "}") - - private def escapeJson(s: String): String = s.replace("\\", "\\\\").replace("\"", "\\\"") + private def tagsToJson(tags: Map[String, String]): String = tags.asJson.noSpaces
79-84: Regex-based JSON parsing doesn't unescape values and is fragile.The
jsonToTagsfunction captures escaped strings but doesn't unescape them (e.g.,\"would remain as\"in the result instead of becoming"). This creates asymmetry withtagsToJson. Additionally, regex-based JSON parsing won't handle all valid JSON edge cases.♻️ Suggested refactor using Circe
+import io.circe.parser.decode - private def jsonToTags(json: String): Map[String, String] = - if json == null || json == "{}" then Map.empty - else { - val pattern = """"([^"\\]*(?:\\.[^"\\]*)*)"\s*:\s*"([^"\\]*(?:\\.[^"\\]*)*)"""".r - pattern.findAllMatchIn(json).map(m => m.group(1) -> m.group(2)).toMap - } + private def jsonToTags(json: String): Map[String, String] = + if json == null || json == "{}" then Map.empty + else decode[Map[String, String]](json).getOrElse(Map.empty)workflows4s-doobie/src/main/resources/schema/postgres-workflow-registry-schema.sql (1)
14-15: Consider adding indexes for time-based queries.The
getExecutingWorkflowsquery filters onupdated_at, and search queries can filter/sort oncreated_atandupdated_at. Consider adding indexes for these columns to support query performance at scale.♻️ Suggested additional indexes
CREATE INDEX if not exists idx_executing_workflows_status ON executing_workflows(status); CREATE INDEX if not exists idx_executing_workflows_tags ON executing_workflows USING GIN(tags); +CREATE INDEX if not exists idx_executing_workflows_updated_at ON executing_workflows(updated_at); +CREATE INDEX if not exists idx_executing_workflows_created_at ON executing_workflows(created_at);workflows4s-doobie/src/test/scala/workflows4s/doobie/postgres/PostgresDatabaseWorkflowRegistryTest.scala (2)
123-148: Consider adding tests forInandNotEqualstag filters.The test covers
EqualsandHasKeyfilters but doesn't exerciseInorNotEqualsfilters. Consider adding coverage for completeness.♻️ Suggested additional test cases
val inResults = registry.search(templateId, WorkflowSearch.Query(tagFilters = List(WorkflowSearch.TagFilter.In("region", Set("us", "eu")))) ).unsafeRunSync() inResults.map(_.id) should contain theSameElementsAs List(id1, id3) val notEqualsResults = registry.search(templateId, WorkflowSearch.Query(tagFilters = List(WorkflowSearch.TagFilter.NotEquals("env", "prod"))) ).unsafeRunSync() notEqualsResults.map(_.id) shouldBe List(id2)
163-178: Consider adding offset test for complete pagination coverage.The test covers sorting and limit but doesn't exercise the offset parameter. A test like
offset = 1, limit = 1would verify pagination fully.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
workflows4s-doobie/src/main/resources/schema/postgres-workflow-registry-schema.sqlworkflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scalaworkflows4s-doobie/src/test/scala/workflows4s/doobie/postgres/PostgresDatabaseWorkflowRegistryTest.scala
🧰 Additional context used
🧬 Code graph analysis (2)
workflows4s-doobie/src/test/scala/workflows4s/doobie/postgres/PostgresDatabaseWorkflowRegistryTest.scala (3)
workflows4s-core/src/main/scala/workflows4s/runtime/registry/WorkflowRegistry.scala (1)
WorkflowRegistry(7-23)workflows4s-core/src/main/scala/workflows4s/runtime/registry/WorkflowSearch.scala (5)
WorkflowSearch(14-52)Query(15-29)TagFilter(33-38)Equals(35-35)HasKey(34-34)workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala (4)
PostgresWorkflowRegistry(27-169)upsertInstance(47-65)search(108-131)getTags(67-73)
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala (2)
workflows4s-core/src/main/scala/workflows4s/runtime/registry/WorkflowRegistry.scala (1)
WorkflowRegistry(7-23)workflows4s-core/src/main/scala/workflows4s/runtime/registry/WorkflowSearch.scala (3)
WorkflowSearch(14-52)Result(44-51)TagFilter(33-38)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: test
- GitHub Check: Summary
🔇 Additional comments (11)
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala (6)
67-71: LGTM - intentional type casting as per design.The
asInstanceOfcast aligns with the design decision from issue#171(option 2: "allow users to supply State => Tags and perform casting internally based on templateId"). The caller is responsible for ensuring the tagger's state type matches the workflow's state type for a given templateId.
108-131: LGTM - search implementation is well-structured.The search method properly uses Doobie fragments for SQL construction, ensuring protection against SQL injection. The result mapping correctly converts database types to domain types.
139-157: LGTM - filter building is correct and safe.The filter construction properly handles all query parameters with parameterized values. The fallback to
fr"TRUE"when no filters exist is defensive (thoughtemplateIdfilter is always present). TheInfilter correctly returnsFALSEfor empty value sets.
159-167: LGTM - ordering handles nullable columns correctly.The
NULLS LASTclause for wakeup ordering ensures consistent behavior with nullablewakeup_atvalues.
98-106: LGTM - query now correctly filters by running status.Adding the
status = 'running'filter ensures only actually running (not awaiting/finished) workflows are returned for recovery purposes.
54-62: TheWHEREclause in theON CONFLICT DO UPDATEis correct. PostgreSQL explicitly allows qualified column references in theWHEREclause ofON CONFLICT DO UPDATE(as documented in the PostgreSQL INSERT specification). The patternWHERE table_name.column_nameis standard and matches the code's usage. The extensive test coverage confirms the SQL executes successfully.workflows4s-doobie/src/main/resources/schema/postgres-workflow-registry-schema.sql (1)
1-12: LGTM - schema structure aligns with implementation.The schema additions properly support the new search functionality. The
JSONBtype with default'{}'for tags is appropriate, and the nullablewakeup_atmatches theOption[Instant]in the domain model.workflows4s-doobie/src/test/scala/workflows4s/doobie/postgres/PostgresDatabaseWorkflowRegistryTest.scala (4)
91-103: LGTM - good test coverage for templateId filtering.The test correctly verifies that search filters by templateId, returning only matching workflows.
105-121: LGTM - status filtering tests are well-designed.Tests correctly verify both single and multiple status filters, using appropriate matchers for unordered results.
150-161: LGTM - count functionality is properly tested.Tests verify both positive counts and zero counts with filters.
185-185: LGTM - test helper is appropriate.The
dummyAWhelper withnullstate works correctly since the test tagger implementation handles it explicitly.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala
Show resolved
Hide resolved
|
@abhinavs1920 Please take a look at coderabbit nitpicks. They should be easily actionable |
a478937 to
0a65e1f
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala`:
- Around line 105-112: The unescapeJson function replaces backslashes too early
which corrupts sequences like "\\n"; update unescapeJson (the method named
unescapeJson) to perform the "\\\\" → "\" replacement last (i.e., apply escapes
for \" \b \f \n \r \t first, then replace double-backslashes) so literal
backslash sequences are preserved, or replace the whole custom unescape with a
robust JSON parser approach (e.g., use io.circe.parser.decode in jsonToTags to
parse the JSON into Map[String,String] and avoid manual unescaping).
🧹 Nitpick comments (1)
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala (1)
67-91: Consider using circe for JSON serialization.The manual JSON serialization works, but since circe is already a project dependency, using
io.circe.syntax._and.asJson.noSpaceswould be more robust and handle edge cases automatically.♻️ Optional refactor using circe
+import io.circe.syntax._ - private def tagsToJson(tags: Map[String, String]): String = - if tags.isEmpty then "{}" - else tags.map { case (k, v) => s""""${escapeJson(k)}":"${escapeJson(v)}"""" }.mkString("{", ",", "}") - - private def escapeJson(s: String): String = { - val sb = new StringBuilder - s.foreach { - case '"' => sb.append("\\\"") - case '\\' => sb.append("\\\\") - case '\b' => sb.append("\\b") - case '\f' => sb.append("\\f") - case '\n' => sb.append("\\n") - case '\r' => sb.append("\\r") - case '\t' => sb.append("\\t") - case c if c < ' ' => sb.append(f"\\u${c.toInt}%04x") - case c => sb.append(c) - } - sb.toString - } + private def tagsToJson(tags: Map[String, String]): String = tags.asJson.noSpaces
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
workflows4s-doobie/src/main/resources/schema/postgres-workflow-registry-schema.sqlworkflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scalaworkflows4s-doobie/src/test/scala/workflows4s/doobie/postgres/PostgresDatabaseWorkflowRegistryTest.scala
🚧 Files skipped from review as they are similar to previous changes (1)
- workflows4s-doobie/src/main/resources/schema/postgres-workflow-registry-schema.sql
🧰 Additional context used
🧬 Code graph analysis (2)
workflows4s-doobie/src/test/scala/workflows4s/doobie/postgres/PostgresDatabaseWorkflowRegistryTest.scala (4)
workflows4s-core/src/main/scala/workflows4s/runtime/registry/WorkflowRegistry.scala (1)
WorkflowRegistry(7-23)workflows4s-core/src/main/scala/workflows4s/runtime/registry/WorkflowSearch.scala (7)
WorkflowSearch(14-52)Query(15-29)TagFilter(33-38)Equals(35-35)HasKey(34-34)In(36-36)NotEquals(37-37)workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala (5)
PostgresWorkflowRegistry(27-198)upsertInstance(47-65)search(137-160)getTags(67-73)count(162-166)workflows4s-core/src/main/scala/workflows4s/runtime/registry/InMemoryWorkflowRegistry.scala (4)
upsertInstance(41-56)search(67-88)getTags(58-63)count(90-97)
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala (3)
workflows4s-core/src/main/scala/workflows4s/runtime/WorkflowInstanceId.scala (2)
WorkflowInstanceId(3-5)toString(4-4)workflows4s-core/src/main/scala/workflows4s/runtime/registry/WorkflowRegistry.scala (1)
WorkflowRegistry(7-23)workflows4s-core/src/main/scala/workflows4s/runtime/registry/WorkflowSearch.scala (3)
WorkflowSearch(14-52)Result(44-51)TagFilter(33-38)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: test
- GitHub Check: Summary
🔇 Additional comments (11)
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala (7)
19-25: LGTM!The trait extension to include
WorkflowSearch[IO]aligns with the PR objectives and cleanly exposes the new search capabilities.
29-45: LGTM!The addition of the
taggersparameter with a default empty map maintains backward compatibility while enabling the new tagging functionality.
47-65: LGTM!The upsert logic correctly handles initial insert vs. update scenarios, preserving
created_aton updates while updatingstatus,updated_at,wakeup_at, andtags.
114-125: LGTM!Status mapping is correct, and the previous review concern about missing default case has been addressed with explicit error handling.
127-135: LGTM!The addition of
status = 'running'filter correctly ensures only actively running workflows are considered for recovery detection.
137-196: LGTM on search/count implementation.The search and count methods are well-structured with proper filter composition, pagination support, and
NULLS LASThandling for wakeup sorting.
183-184: No semantic inconsistency found—both implementations correctly exclude rows with missing keys.The
NotEqualsfilter behaves identically in both registries:
InMemoryWorkflowRegistry:d.tags.get(k).exists(_ != v)returnsfalsefor missing keysPostgresWorkflowRegistry:tags ->> $k <> $vreturnsNULLfor missing keys (excluded in WHERE clause)Both implementations exclude rows where the key is absent, making them semantically consistent.
workflows4s-doobie/src/test/scala/workflows4s/doobie/postgres/PostgresDatabaseWorkflowRegistryTest.scala (4)
20-25: LGTM!Test setup with proper cleanup via
TRUNCATEensures test isolation.
27-89: LGTM!Existing tests provide good coverage of core registry functionality including status tracking and update time filtering.
123-160: LGTM on tag search tests!Good coverage of all tag filter types. The use of
mutable.Mapfor dynamic tag assignment in tests is pragmatic for this test scenario.
162-196: LGTM!Count and pagination tests effectively verify the sorting and limiting behavior using clock advancement for deterministic timestamp ordering.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala
Outdated
Show resolved
Hide resolved
e36efd1 to
ad4c4f5
Compare
Thanks for letting me know. |
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala
Outdated
Show resolved
Hide resolved
workflows4s-doobie/src/main/scala/workflows4s/doobie/postgres/PostgresWorkflowRegistry.scala
Outdated
Show resolved
Hide resolved
ad4c4f5 to
d833e09
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
workflows4s-doobie/src/test/scala/workflows4s/doobie/postgres/PostgresDatabaseWorkflowRegistryTest.scala (1)
28-89: Avoid order-dependent assertions for unordered DB results.
getExecutingWorkflowsdoesn’t specify an ORDER BY, so list order isn’t guaranteed; the equality assertions can flake. Prefer order-insensitive checks (or add ordering in the query).✅ Safer assertions
- assert(workflows === List(id1, id4)) + workflows should contain theSameElementsAs List(id1, id4) ... - assert(workflows === List(id1, id2)) + workflows should contain theSameElementsAs List(id1, id2)
🧹 Nitpick comments (1)
workflows4s-doobie/src/main/resources/schema/postgres-workflow-registry-schema.sql (1)
14-17: Consider indexingwakeup_atif wakeup filters/sorts are common.Search supports wakeup-based filters and ordering; a (possibly partial) index can help performance under load.
🔎 Optional index
+CREATE INDEX if not exists idx_executing_workflows_wakeup_at + ON executing_workflows(wakeup_at) + WHERE wakeup_at IS NOT NULL;
Required for JSON parsing in PostgresWorkflowRegistry
Use wildcard import 'io.circe.parser.*' instead of specific 'decode' import to fix CI compilation error
2bac9f4 to
6ce5fda
Compare
|
Heyy @Krever |
Closes #171 #168
This extends PostgresWorkflowRegistry to support workflow search with tags, following the same approach as InMemoryWorkflowRegistry.
Changes:
Added status, created_at, updated_at, wakeup_at, and tags (JSONB) columns to the schema
PostgresWorkflowRegistry now implements WorkflowSearch[IO] with search() and count() methods
Accepts optional taggers parameter for generating tags from workflow state
Supports all tag filters: HasKey, Equals, In, NotEquals
Breaking change: Schema needs migration for existing tables (new columns added).
Summary by CodeRabbit
New Features
Performance
Tests
Chores