From b3a4bcdfa13a4da34bbb90dc626be715ae4303a9 Mon Sep 17 00:00:00 2001 From: Collin Beczak Date: Sat, 7 Feb 2026 10:51:12 -0600 Subject: [PATCH] Implement tile aggregation system for efficient map display - Introduced a new `TileAggregate` model to represent pre-computed tile data, including task counts and centroid coordinates. - Added `TileAggregateRepository` for database interactions, enabling retrieval and management of tile aggregates. - Developed `TileAggregateService` to handle tile data processing, including filtering by difficulty and global status. - Enhanced `TaskController` with a new endpoint to fetch task tiles, returning either clusters or individual task markers based on task count. - Implemented a scheduled job for refreshing tile aggregates, ensuring up-to-date data for map displays. - Updated configuration to include tile refresh interval and batch size settings. - Created SQL migrations for the new tile aggregation tables and functions, supporting efficient data management. 
--- app/org/maproulette/Config.scala | 3 + .../framework/controller/TaskController.scala | 54 ++ .../framework/model/TileAggregate.scala | 150 ++++++ .../repository/TileAggregateRepository.scala | 491 +++++++++++++++++ .../framework/service/ServiceManager.scala | 5 +- .../service/TileAggregateService.scala | 493 ++++++++++++++++++ app/org/maproulette/jobs/Scheduler.scala | 7 + app/org/maproulette/jobs/SchedulerActor.scala | 26 + conf/application.conf | 4 + conf/evolutions/default/107.sql | 326 ++++++++++++ conf/v2_route/task.api | 62 +++ test/org/maproulette/utils/TestSpec.scala | 4 +- 12 files changed, 1623 insertions(+), 2 deletions(-) create mode 100644 app/org/maproulette/framework/model/TileAggregate.scala create mode 100644 app/org/maproulette/framework/repository/TileAggregateRepository.scala create mode 100644 app/org/maproulette/framework/service/TileAggregateService.scala create mode 100644 conf/evolutions/default/107.sql diff --git a/app/org/maproulette/Config.scala b/app/org/maproulette/Config.scala index 47fb8d769..af4a23640 100644 --- a/app/org/maproulette/Config.scala +++ b/app/org/maproulette/Config.scala @@ -348,6 +348,8 @@ object Config { val KEY_SCHEDULER_SNAPSHOT_CHALLENGES_INTERVAL = s"$SUB_GROUP_SCHEDULER.challengesSnapshot.interval" val KEY_SCHEDULER_SNAPSHOT_CHALLENGES_START = s"$SUB_GROUP_SCHEDULER.challengesSnapshot.startTime" + val KEY_SCHEDULER_TILE_REFRESH_INTERVAL = s"$SUB_GROUP_SCHEDULER.tileRefresh.interval" + val KEY_SCHEDULER_TILE_REFRESH_BATCH_SIZE = s"$SUB_GROUP_SCHEDULER.tileRefresh.batchSize" val KEY_MAPROULETTE_FRONTEND = s"$GROUP_MAPROULETTE.frontend" val SUB_GROUP_MAPILLARY = s"$GROUP_MAPROULETTE.mapillary" @@ -392,6 +394,7 @@ object Config { val DEFAULT_MR3_HOST = "/external" val DEFAULT_VIRTUAL_CHALLENGE_LIMIT = 100 val DEFAULT_VIRTUAL_CHALLENGE_BATCH_SIZE = 500 + val DEFAULT_TILE_REFRESH_BATCH_SIZE = 1000 val DEFAULT_VIRTUAL_CHALLENGE_EXPIRY = "6 hours" val DEFAULT_CHANGESET_HOUR_LIMIT = "1 hour" val 
DEFAULT_CHANGESET_ENABLED = false diff --git a/app/org/maproulette/framework/controller/TaskController.scala b/app/org/maproulette/framework/controller/TaskController.scala index d3f2397fb..d62629d0e 100644 --- a/app/org/maproulette/framework/controller/TaskController.scala +++ b/app/org/maproulette/framework/controller/TaskController.scala @@ -327,6 +327,60 @@ class TaskController @Inject() ( } } + /** + * Gets task data using pre-computed tile aggregates for efficient map display at scale. + * Uses a tile pyramid system with pre-computed counts broken down by difficulty × global. + * + * Behavior by filter: + * - difficulty & global: Filtered from pre-computed tile data (fast) + * - location_id: Recursive tile drilling until within polygon or < 2000 tasks + * - keywords: Falls back to dynamic query (challenge-level filter, not pre-computed) + * + * All fetched data is re-clustered into ~80 clusters for display. + * When total tasks < 2000, returns individual task markers instead of clusters. 
+ * + * @param z Zoom level (0-14 for pre-computed tiles) + * @param bounds Comma-separated bounding box: left,bottom,right,top + * @param global Whether to include global challenges + * @param location_id Optional Nominatim place_id for polygon filtering + * @param keywords Optional keywords filter (triggers fallback to dynamic query) + * @param difficulty Optional difficulty filter (1=Easy, 2=Normal, 3=Expert) + * @return TaskMarkerResponse with totalCount and either clusters or tasks (with overlaps) + */ + def getTaskTiles( + z: Int, + bounds: String, + global: Boolean, + location_id: Option[Long], + keywords: Option[String], + difficulty: Option[Int] + ): Action[AnyContent] = Action.async { implicit request => + this.sessionManager.userAwareRequest { implicit user => + val boundingBox = bounds.split(",").map(_.trim.toDouble).toList match { + case List(left, bottom, right, top) => + SearchLocation(left, bottom, right, top) + case _ => + SearchLocation(-180.0, -90.0, 180.0, 90.0) + } + + // Delegate all logic to the TileAggregateService + // The service handles: + // - difficulty & global filtering from pre-computed tile breakdowns + // - location_id filtering with recursive tile drilling + // - keywords filtering via fallback to dynamic query + // - re-clustering into ~80 clusters for display + val response = this.serviceManager.tileAggregate.getTileData( + z, + boundingBox, + difficulty, + global, + location_id, + keywords + ) + Ok(Json.toJson(response)) + } + } + // for getting more detailed task marker data on individual markers // def getTaskMarkerData(id: Long): Action[AnyContent] = Action.async { implicit request => // this.sessionManager.userAwareRequest { implicit user => diff --git a/app/org/maproulette/framework/model/TileAggregate.scala b/app/org/maproulette/framework/model/TileAggregate.scala new file mode 100644 index 000000000..3e8b2d43b --- /dev/null +++ b/app/org/maproulette/framework/model/TileAggregate.scala @@ -0,0 +1,150 @@ +/* + * Copyright 
(C) 2020 MapRoulette contributors (see CONTRIBUTORS.md). + * Licensed under the Apache License, Version 2.0 (see LICENSE). + */ +package org.maproulette.framework.model + +import play.api.libs.json._ + +/** + * Represents a cluster point (centroid with count) for map display + * + * @param lat Cluster centroid latitude + * @param lng Cluster centroid longitude + * @param count Number of tasks in this cluster + */ +case class ClusterPoint( + lat: Double, + lng: Double, + count: Int +) + +object ClusterPoint { + implicit val clusterPointFormat: Format[ClusterPoint] = Json.format[ClusterPoint] +} + +/** + * Counts broken down by difficulty × global filter combinations. + * Keys are like "d1_gf" = difficulty 1, global false + * + * @param d1_gf Difficulty 1 (Easy), global false + * @param d1_gt Difficulty 1 (Easy), global true + * @param d2_gf Difficulty 2 (Normal), global false + * @param d2_gt Difficulty 2 (Normal), global true + * @param d3_gf Difficulty 3 (Expert), global false + * @param d3_gt Difficulty 3 (Expert), global true + * @param d0_gf Difficulty not set, global false + * @param d0_gt Difficulty not set, global true + */ +case class FilterCounts( + d1_gf: Int = 0, + d1_gt: Int = 0, + d2_gf: Int = 0, + d2_gt: Int = 0, + d3_gf: Int = 0, + d3_gt: Int = 0, + d0_gf: Int = 0, + d0_gt: Int = 0 +) { + + /** + * Get count for specific difficulty and global filters + * + * @param difficulty Optional difficulty filter (1, 2, 3) + * @param global Whether to include global challenges (true = all, false = non-global only) + * @return Filtered count + */ + def getFilteredCount(difficulty: Option[Int], global: Boolean): Int = { + // global=true means "include global challenges" → return ALL tasks + // global=false means "exclude global challenges" → return only non-global (*_gf) + difficulty match { + case Some(1) => if (global) d1_gf + d1_gt else d1_gf + case Some(2) => if (global) d2_gf + d2_gt else d2_gf + case Some(3) => if (global) d3_gf + d3_gt else d3_gf + case 
None => + // No difficulty filter - sum all difficulties + if (global) d1_gf + d1_gt + d2_gf + d2_gt + d3_gf + d3_gt + d0_gf + d0_gt + else d1_gf + d2_gf + d3_gf + d0_gf + case _ => + // Unknown difficulty - treat as "other" + if (global) d0_gf + d0_gt else d0_gf + } + } + + /** + * Get total count (all combinations) + */ + def total: Int = d1_gf + d1_gt + d2_gf + d2_gt + d3_gf + d3_gt + d0_gf + d0_gt +} + +object FilterCounts { + implicit val filterCountsFormat: Format[FilterCounts] = Json.format[FilterCounts] + + def fromJson(json: JsValue): FilterCounts = { + FilterCounts( + d1_gf = (json \ "d1_gf").asOpt[Int].getOrElse(0), + d1_gt = (json \ "d1_gt").asOpt[Int].getOrElse(0), + d2_gf = (json \ "d2_gf").asOpt[Int].getOrElse(0), + d2_gt = (json \ "d2_gt").asOpt[Int].getOrElse(0), + d3_gf = (json \ "d3_gf").asOpt[Int].getOrElse(0), + d3_gt = (json \ "d3_gt").asOpt[Int].getOrElse(0), + d0_gf = (json \ "d0_gf").asOpt[Int].getOrElse(0), + d0_gt = (json \ "d0_gt").asOpt[Int].getOrElse(0) + ) + } +} + +/** + * Represents a pre-computed tile aggregate for efficient map display at scale. + * Uses Web Mercator tile coordinates (z/x/y). + * Only tracks tasks with status 0, 3, or 6. 
+ * + * @param z Zoom level (0-14 for pre-computed) + * @param x Tile X coordinate + * @param y Tile Y coordinate + * @param taskCount Total tasks in this tile (all filters) + * @param countsByFilter Counts broken down by difficulty × global + * @param centroidLat Centroid latitude of all tasks in tile + * @param centroidLng Centroid longitude of all tasks in tile + */ +case class TileAggregate( + z: Int, + x: Int, + y: Int, + taskCount: Int, + countsByFilter: FilterCounts, + centroidLat: Double, + centroidLng: Double +) { + + /** + * Get the filtered count for this tile based on difficulty and global filters + */ + def getFilteredCount(difficulty: Option[Int], global: Boolean): Int = { + countsByFilter.getFilteredCount(difficulty, global) + } +} + +object TileAggregate { + implicit val tileAggregateWrites: Writes[TileAggregate] = Json.writes[TileAggregate] + implicit val tileAggregateReads: Reads[TileAggregate] = Json.reads[TileAggregate] +} + +/** + * Response for tile-based queries with combined clusters. + * All data from tiles is combined and re-clustered into ~80 clusters. 
+ * + * @param totalCount Total tasks matching the filter + * @param clusters Combined clusters (target ~80) + * @param tasks Individual task markers (when total < threshold) + */ +case class TileDataResponse( + totalCount: Int, + clusters: Option[List[ClusterPoint]] = None, + tasks: Option[List[TaskMarker]] = None +) + +object TileDataResponse { + implicit val tileDataResponseWrites: Writes[TileDataResponse] = Json.writes[TileDataResponse] + implicit val tileDataResponseReads: Reads[TileDataResponse] = Json.reads[TileDataResponse] +} diff --git a/app/org/maproulette/framework/repository/TileAggregateRepository.scala b/app/org/maproulette/framework/repository/TileAggregateRepository.scala new file mode 100644 index 000000000..af4767e0d --- /dev/null +++ b/app/org/maproulette/framework/repository/TileAggregateRepository.scala @@ -0,0 +1,491 @@ +/* + * Copyright (C) 2020 MapRoulette contributors (see CONTRIBUTORS.md). + * Licensed under the Apache License, Version 2.0 (see LICENSE). + */ + +package org.maproulette.framework.repository + +import java.sql.Connection + +import anorm._ +import anorm.SqlParser.{get, int, double} +import javax.inject.{Inject, Singleton} +import org.maproulette.framework.model.{TileAggregate, FilterCounts, TaskMarker, TaskMarkerLocation} +import org.maproulette.session.SearchLocation +import play.api.db.Database +import play.api.libs.json.Json + +/** + * Repository for accessing pre-computed tile aggregates. + * Tiles track tasks with status 0, 3, or 6, with breakdowns by difficulty and global. 
+ */ +@Singleton +class TileAggregateRepository @Inject() (override val db: Database) extends RepositoryMixin { + implicit val baseTable: String = "tile_aggregates" + + /** + * Parser for converting database rows to TileAggregate objects + */ + private val tileAggregateParser: RowParser[TileAggregate] = { + get[Int]("z") ~ + get[Int]("x") ~ + get[Int]("y") ~ + get[Int]("task_count") ~ + get[Option[String]]("counts_by_filter") ~ + get[Option[Double]]("centroid_lat") ~ + get[Option[Double]]("centroid_lng") map { + case z ~ x ~ y ~ taskCount ~ countsJson ~ centroidLat ~ centroidLng => + val filterCounts = countsJson + .map { json => + try { + FilterCounts.fromJson(Json.parse(json)) + } catch { + case _: Exception => FilterCounts() + } + } + .getOrElse(FilterCounts()) + + TileAggregate( + z, + x, + y, + taskCount, + filterCounts, + centroidLat.getOrElse(0.0), + centroidLng.getOrElse(0.0) + ) + } + } + + /** + * Get tiles for a bounding box at a specific zoom level + */ + def getTilesInBounds( + zoom: Int, + bounds: SearchLocation + )(implicit c: Option[Connection] = None): List[TileAggregate] = { + this.withMRConnection { implicit c => + val minX = lngToTileX(bounds.left, zoom) + val maxX = lngToTileX(bounds.right, zoom) + val minY = latToTileY(bounds.top, zoom) + val maxY = latToTileY(bounds.bottom, zoom) + + SQL""" + SELECT z, x, y, task_count, counts_by_filter::text as counts_by_filter, + centroid_lat, centroid_lng + FROM tile_aggregates + WHERE z = $zoom + AND x >= $minX AND x <= $maxX + AND y >= $minY AND y <= $maxY + AND task_count > 0 + """.as(tileAggregateParser.*) + } + } + + /** + * Get a single tile by coordinates + */ + def getTile(z: Int, x: Int, y: Int)( + implicit c: Option[Connection] = None + ): Option[TileAggregate] = { + this.withMRConnection { implicit c => + SQL""" + SELECT z, x, y, task_count, counts_by_filter::text as counts_by_filter, + centroid_lat, centroid_lng + FROM tile_aggregates + WHERE z = $z AND x = $x AND y = $y + 
""".as(tileAggregateParser.singleOpt) + } + } + + /** + * Get child tiles for recursive drilling (zoom level z+1) + */ + def getChildTiles(z: Int, x: Int, y: Int)( + implicit c: Option[Connection] = None + ): List[TileAggregate] = { + this.withMRConnection { implicit c => + // Child tiles at z+1: (x*2, y*2), (x*2+1, y*2), (x*2, y*2+1), (x*2+1, y*2+1) + val childZ = z + 1 + val childXMin = x * 2 + val childXMax = x * 2 + 1 + val childYMin = y * 2 + val childYMax = y * 2 + 1 + + SQL""" + SELECT z, x, y, task_count, counts_by_filter::text as counts_by_filter, + centroid_lat, centroid_lng + FROM tile_aggregates + WHERE z = $childZ + AND x >= $childXMin AND x <= $childXMax + AND y >= $childYMin AND y <= $childYMax + AND task_count > 0 + """.as(tileAggregateParser.*) + } + } + + /** + * Get tile bounds as (west, south, east, north) + */ + def getTileBounds(z: Int, x: Int, y: Int): (Double, Double, Double, Double) = { + val west = tileToLng(x, z) + val east = tileToLng(x + 1, z) + val north = tileToLat(y, z) + val south = tileToLat(y + 1, z) + (west, south, east, north) + } + + /** + * Get tile bounds as SearchLocation + */ + def getTileBoundsAsSearchLocation(z: Int, x: Int, y: Int): SearchLocation = { + val (west, south, east, north) = getTileBounds(z, x, y) + SearchLocation(west, south, east, north) + } + + /** + * Fetch all task markers in a bounding box with a single query. + * Much more efficient than querying per-tile when total count is low. 
+ */ + def getTaskMarkersInBounds( + bounds: SearchLocation, + difficulty: Option[Int] = None, + global: Boolean = false + )(implicit c: Option[Connection] = None): List[TaskMarker] = { + this.withMRConnection { implicit c => + val boundsGeom = + s"ST_MakeEnvelope(${bounds.left}, ${bounds.bottom}, ${bounds.right}, ${bounds.top}, 4326)" + + var query = s""" + SELECT DISTINCT tasks.id, ST_Y(tasks.location) as lat, ST_X(tasks.location) as lng, + tasks.status, tasks.priority, tasks.bundle_id, l.user_id as locked_by + FROM tasks + INNER JOIN challenges c ON c.id = tasks.parent_id + INNER JOIN projects p ON p.id = c.parent_id + LEFT JOIN locked l ON l.item_id = tasks.id AND l.item_type = 2 + WHERE tasks.location && $boundsGeom + AND ST_Intersects(tasks.location, $boundsGeom) + AND tasks.status IN (0, 3, 6) + AND c.deleted = false AND c.enabled = true AND c.is_archived = false + AND p.deleted = false AND p.enabled = true + """ + + if (!global) { + query += " AND c.is_global = false" + } + + difficulty.foreach { d => + query += s" AND c.difficulty = $d" + } + + SQL(query).as(taskMarkerParser.*) + } + } + + /** + * Fetch actual task markers for a tile (for low-count tiles) + * Filters by status 0, 3, 6 and optionally by difficulty and global + */ + def getTaskMarkersForTile( + z: Int, + x: Int, + y: Int, + difficulty: Option[Int] = None, + global: Boolean = false + )(implicit c: Option[Connection] = None): List[TaskMarker] = { + this.withMRConnection { implicit c => + val (west, south, east, north) = getTileBounds(z, x, y) + val tileGeom = s"ST_MakeEnvelope($west, $south, $east, $north, 4326)" + + var query = s""" + SELECT DISTINCT tasks.id, ST_Y(tasks.location) as lat, ST_X(tasks.location) as lng, + tasks.status, tasks.priority, tasks.bundle_id, l.user_id as locked_by + FROM tasks + INNER JOIN challenges c ON c.id = tasks.parent_id + INNER JOIN projects p ON p.id = c.parent_id + LEFT JOIN locked l ON l.item_id = tasks.id AND l.item_type = 2 + WHERE tasks.location && 
$tileGeom + AND ST_Intersects(tasks.location, $tileGeom) + AND tasks.status IN (0, 3, 6) + AND c.deleted = false AND c.enabled = true AND c.is_archived = false + AND p.deleted = false AND p.enabled = true + """ + + if (!global) { + query += " AND c.is_global = false" + } + + difficulty.foreach { d => + query += s" AND c.difficulty = $d" + } + + SQL(query).as(taskMarkerParser.*) + } + } + + /** + * Count tasks within a polygon (for deciding fetch vs cluster strategy) + */ + def countTasksInPolygon( + polygonWkt: String, + difficulty: Option[Int] = None, + global: Boolean = false + )(implicit c: Option[Connection] = None): Int = { + this.withMRConnection { implicit c => + // Use bounding box of polygon for index acceleration + var query = s""" + SELECT COUNT(DISTINCT tasks.id)::int as count + FROM tasks + INNER JOIN challenges c ON c.id = tasks.parent_id + INNER JOIN projects p ON p.id = c.parent_id + WHERE tasks.location && ST_GeomFromText('$polygonWkt', 4326) + AND ST_Intersects(tasks.location, ST_GeomFromText('$polygonWkt', 4326)) + AND tasks.status IN (0, 3, 6) + AND c.deleted = false AND c.enabled = true AND c.is_archived = false + AND p.deleted = false AND p.enabled = true + """ + + if (!global) { + query += " AND c.is_global = false" + } + + difficulty.foreach { d => + query += s" AND c.difficulty = $d" + } + + SQL(query).as(SqlParser.int("count").single) + } + } + + /** + * Fetch task markers within a polygon (for location_id filtering) + * Uses bounding box operator for index acceleration. 
+ */ + def getTaskMarkersInPolygon( + polygonWkt: String, + difficulty: Option[Int] = None, + global: Boolean = false, + limit: Option[Int] = None + )(implicit c: Option[Connection] = None): List[TaskMarker] = { + this.withMRConnection { implicit c => + // Use && operator with polygon for index acceleration + var query = s""" + SELECT DISTINCT tasks.id, ST_Y(tasks.location) as lat, ST_X(tasks.location) as lng, + tasks.status, tasks.priority, tasks.bundle_id, l.user_id as locked_by + FROM tasks + INNER JOIN challenges c ON c.id = tasks.parent_id + INNER JOIN projects p ON p.id = c.parent_id + LEFT JOIN locked l ON l.item_id = tasks.id AND l.item_type = 2 + WHERE tasks.location && ST_GeomFromText('$polygonWkt', 4326) + AND ST_Intersects(tasks.location, ST_GeomFromText('$polygonWkt', 4326)) + AND tasks.status IN (0, 3, 6) + AND c.deleted = false AND c.enabled = true AND c.is_archived = false + AND p.deleted = false AND p.enabled = true + """ + + if (!global) { + query += " AND c.is_global = false" + } + + difficulty.foreach { d => + query += s" AND c.difficulty = $d" + } + + limit.foreach { l => + query += s" LIMIT $l" + } + + SQL(query).as(taskMarkerParser.*) + } + } + + /** + * Get clustered task markers within a polygon using PostGIS kmeans. + * Returns cluster centroids with counts. 
+ */ + def getClusteredTasksInPolygon( + polygonWkt: String, + difficulty: Option[Int] = None, + global: Boolean = false, + numClusters: Int = 80 + )(implicit c: Option[Connection] = None): List[ClusterPoint] = { + this.withMRConnection { implicit c => + var filterClause = s""" + tasks.location && ST_GeomFromText('$polygonWkt', 4326) + AND ST_Intersects(tasks.location, ST_GeomFromText('$polygonWkt', 4326)) + AND tasks.status IN (0, 3, 6) + AND c.deleted = false AND c.enabled = true AND c.is_archived = false + AND p.deleted = false AND p.enabled = true + """ + + if (!global) { + filterClause += " AND c.is_global = false" + } + + difficulty.foreach { d => + filterClause += s" AND c.difficulty = $d" + } + + val query = s""" + WITH task_points AS ( + SELECT tasks.location + FROM tasks + INNER JOIN challenges c ON c.id = tasks.parent_id + INNER JOIN projects p ON p.id = c.parent_id + WHERE $filterClause + ), + clustered AS ( + SELECT ST_ClusterKMeans(location, $numClusters) OVER() as cluster_id, location + FROM task_points + ) + SELECT + AVG(ST_Y(location)) as lat, + AVG(ST_X(location)) as lng, + COUNT(*)::int as count + FROM clustered + GROUP BY cluster_id + HAVING COUNT(*) > 0 + """ + + SQL(query).as(clusterPointParser.*) + } + } + + private val clusterPointParser: RowParser[ClusterPoint] = { + get[Double]("lat") ~ + get[Double]("lng") ~ + get[Int]("count") map { + case lat ~ lng ~ count => + ClusterPoint(lat, lng, count) + } + } + + /** + * Fetch task markers for a tile that are also within a polygon. + * Uses both tile bounds and polygon intersection for filtering. 
+ */ + def getTaskMarkersForTileInPolygon( + z: Int, + x: Int, + y: Int, + polygonWkt: String, + difficulty: Option[Int] = None, + global: Boolean = false + )(implicit c: Option[Connection] = None): List[TaskMarker] = { + this.withMRConnection { implicit c => + val (west, south, east, north) = getTileBounds(z, x, y) + val tileGeom = s"ST_MakeEnvelope($west, $south, $east, $north, 4326)" + + var query = s""" + SELECT DISTINCT tasks.id, ST_Y(tasks.location) as lat, ST_X(tasks.location) as lng, + tasks.status, tasks.priority, tasks.bundle_id, l.user_id as locked_by + FROM tasks + INNER JOIN challenges c ON c.id = tasks.parent_id + INNER JOIN projects p ON p.id = c.parent_id + LEFT JOIN locked l ON l.item_id = tasks.id AND l.item_type = 2 + WHERE tasks.location && $tileGeom + AND ST_Intersects(tasks.location, $tileGeom) + AND ST_Intersects(tasks.location, ST_GeomFromText('$polygonWkt', 4326)) + AND tasks.status IN (0, 3, 6) + AND c.deleted = false AND c.enabled = true AND c.is_archived = false + AND p.deleted = false AND p.enabled = true + """ + + if (!global) { + query += " AND c.is_global = false" + } + + difficulty.foreach { d => + query += s" AND c.difficulty = $d" + } + + SQL(query).as(taskMarkerParser.*) + } + } + + private val taskMarkerParser: RowParser[TaskMarker] = { + get[Long]("id") ~ + get[Double]("lat") ~ + get[Double]("lng") ~ + get[Int]("status") ~ + get[Int]("priority") ~ + get[Option[Long]]("bundle_id") ~ + get[Option[Long]]("locked_by") map { + case id ~ lat ~ lng ~ status ~ priority ~ bundleId ~ lockedBy => + TaskMarker(id, TaskMarkerLocation(lat, lng), status, priority, bundleId, lockedBy) + } + } + + /** + * Process the tile refresh queue + */ + def processRefreshQueue(batchSize: Int = 1000)(implicit c: Option[Connection] = None): Int = { + this.withMRTransaction { implicit c => + SQL"SELECT process_tile_refresh_queue($batchSize)" + .as(SqlParser.int("process_tile_refresh_queue").single) + } + } + + /** + * Get count of queued tiles awaiting 
refresh + */ + def getQueueSize()(implicit c: Option[Connection] = None): Int = { + this.withMRConnection { implicit c => + SQL"SELECT COUNT(*)::int as count FROM tile_refresh_queue" + .as(SqlParser.int("count").single) + } + } + + /** + * Rebuild all tiles for a specific zoom level + */ + def rebuildZoomLevel(zoom: Int)(implicit c: Option[Connection] = None): Int = { + this.withMRTransaction { implicit c => + SQL"SELECT rebuild_zoom_level($zoom)" + .as(SqlParser.int("rebuild_zoom_level").single) + } + } + + /** + * Clear the refresh queue + */ + def clearRefreshQueue()(implicit c: Option[Connection] = None): Int = { + this.withMRTransaction { implicit c => + SQL"DELETE FROM tile_refresh_queue".executeUpdate() + } + } + + /** + * Get total count of pre-computed tiles + */ + def getTotalTileCount()(implicit c: Option[Connection] = None): Int = { + this.withMRConnection { implicit c => + SQL"SELECT COUNT(*)::int as count FROM tile_aggregates" + .as(SqlParser.int("count").single) + } + } + + // Web Mercator coordinate conversion functions + def lngToTileX(lng: Double, zoom: Int): Int = { + math.floor((lng + 180.0) / 360.0 * (1 << zoom)).toInt + } + + def latToTileY(lat: Double, zoom: Int): Int = { + val latClamped = math.max(-85.0511, math.min(85.0511, lat)) + val latRad = math.toRadians(latClamped) + math + .floor( + (1.0 - math.log(math.tan(latRad) + 1.0 / math.cos(latRad)) / math.Pi) / 2.0 * (1 << zoom) + ) + .toInt + } + + def tileToLng(x: Int, zoom: Int): Double = { + x.toDouble / (1 << zoom) * 360.0 - 180.0 + } + + def tileToLat(y: Int, zoom: Int): Double = { + val n = math.Pi - 2.0 * math.Pi * y.toDouble / (1 << zoom) + math.toDegrees(math.atan(math.sinh(n))) + } +} diff --git a/app/org/maproulette/framework/service/ServiceManager.scala b/app/org/maproulette/framework/service/ServiceManager.scala index 55ad2fab8..291fc7b35 100644 --- a/app/org/maproulette/framework/service/ServiceManager.scala +++ b/app/org/maproulette/framework/service/ServiceManager.scala @@ 
-39,7 +39,8 @@ class ServiceManager @Inject() ( notificationService: Provider[NotificationService], leaderboardService: Provider[LeaderboardService], taskHistoryService: Provider[TaskHistoryService], - nominatimService: Provider[NominatimService] + nominatimService: Provider[NominatimService], + tileAggregateService: Provider[TileAggregateService] ) { def comment: CommentService = commentService.get() @@ -98,4 +99,6 @@ class ServiceManager @Inject() ( def leaderboard: LeaderboardService = leaderboardService.get() def nominatim: NominatimService = nominatimService.get() + + def tileAggregate: TileAggregateService = tileAggregateService.get() } diff --git a/app/org/maproulette/framework/service/TileAggregateService.scala b/app/org/maproulette/framework/service/TileAggregateService.scala new file mode 100644 index 000000000..df1147a88 --- /dev/null +++ b/app/org/maproulette/framework/service/TileAggregateService.scala @@ -0,0 +1,493 @@ +/* + * Copyright (C) 2020 MapRoulette contributors (see CONTRIBUTORS.md). + * Licensed under the Apache License, Version 2.0 (see LICENSE). + */ + +package org.maproulette.framework.service + +import javax.inject.{Inject, Singleton} +import org.maproulette.framework.model.{ + TaskMarker, + TileAggregate, + ClusterPoint, + TaskMarkerResponse, + TaskClusterSummary, + OverlappingTaskMarker, + Point +} +import org.maproulette.framework.repository.{TileAggregateRepository, TaskClusterRepository} +import org.maproulette.session.SearchLocation +import org.slf4j.LoggerFactory +import play.api.libs.json.Json + +import scala.collection.mutable.ListBuffer + +/** + * Service layer for tile-based task aggregation. + * Provides efficient map display for large datasets by using pre-computed tiles + * with filtering by difficulty and global, recursive drilling for location_id, + * and re-clustering into ~80 clusters. 
+ */ +@Singleton +class TileAggregateService @Inject() ( + repository: TileAggregateRepository, + taskClusterRepository: TaskClusterRepository, + nominatimService: NominatimService +) { + private val logger = LoggerFactory.getLogger(this.getClass) + + // Threshold for switching from clusters to individual tasks + val CLUSTER_THRESHOLD = 2000 + + // Maximum pre-computed zoom level + val MAX_PRECOMPUTED_ZOOM = 14 + + // Minimum zoom level to query - ensures we get enough tiles for good clustering + // Zoom 10 = 1024x1024 tiles globally, giving fine-grained data for k-means + // At wide view, this yields many tiles, enabling up to 80 well-distributed clusters + val MIN_QUERY_ZOOM = 10 + + // Target number of clusters for final output + val TARGET_CLUSTERS = 80 + + /** + * Get tile data for a bounding box with filtering. + * Supports difficulty, global, location_id, and keywords filters. + * Returns TaskMarkerResponse with clusters, tasks, and overlapping tasks. + * + * @param zoom Zoom level + * @param bounds Bounding box + * @param difficulty Optional difficulty filter + * @param global Include global challenges + * @param locationId Optional Nominatim place_id for polygon filtering + * @param keywords Optional keywords (triggers fallback to dynamic query) + * @return TaskMarkerResponse with clusters or tasks (including overlaps) + */ + def getTileData( + zoom: Int, + bounds: SearchLocation, + difficulty: Option[Int] = None, + global: Boolean = false, + locationId: Option[Long] = None, + keywords: Option[String] = None + ): TaskMarkerResponse = { + + // Keywords filter requires fallback (challenge-level filter, not pre-computed) + if (keywords.exists(_.trim.nonEmpty)) { + return getFallbackData(bounds, difficulty, global, locationId, keywords) + } + + // Use minimum query zoom to ensure enough tiles for good clustering + // At zoom 10, we get fine-grained tiles for better cluster distribution + val effectiveZoom = math.max(zoom, MIN_QUERY_ZOOM) + + // Collect all 
data points (either tile centroids or actual tasks) + val collectedPoints = ListBuffer[ClusterPoint]() + val collectedTasks = ListBuffer[TaskMarker]() + + if (locationId.isDefined) { + // Location ID filtering with recursive drilling + processLocationFiltering( + effectiveZoom, + bounds, + difficulty, + global, + locationId.get, + collectedPoints, + collectedTasks + ) + } else { + // Standard tile-based processing + processTiles(effectiveZoom, bounds, difficulty, global, collectedPoints, collectedTasks) + } + + // Calculate total count + val totalCount = collectedPoints.map(_.count).sum + collectedTasks.size + + // If we have few enough tasks, return them with overlap detection + if (totalCount < CLUSTER_THRESHOLD && collectedTasks.nonEmpty) { + val (singleMarkers, overlappingMarkers) = detectOverlaps(collectedTasks.toList) + return TaskMarkerResponse( + totalCount = totalCount, + tasks = Some(singleMarkers), + overlappingTasks = if (overlappingMarkers.nonEmpty) Some(overlappingMarkers) else None, + clusters = None + ) + } + + // Combine tile centroids with task locations for clustering + val allPoints = collectedPoints.toList ++ collectedTasks.map { task => + ClusterPoint(task.location.lat, task.location.lng, 1) + } + + if (allPoints.isEmpty) { + return TaskMarkerResponse(totalCount = 0) + } + + // Re-cluster into ~80 clusters, then merge nearby clusters to prevent visual overlap + val initialClusters = kMeansClustering(allPoints, TARGET_CLUSTERS) + val clusters = mergeNearbyClusters(initialClusters, zoom) + + // Convert ClusterPoints to TaskClusterSummary + val clusterSummaries = clusters.zipWithIndex.map { + case (cp, idx) => + TaskClusterSummary( + clusterId = idx, + numberOfPoints = cp.count, + taskId = None, + taskStatus = None, + point = Point(cp.lat, cp.lng), + bounding = Json.toJson("{}") + ) + } + + TaskMarkerResponse( + totalCount = totalCount, + tasks = None, + overlappingTasks = None, + clusters = Some(clusterSummaries) + ) + } + + /** + * Merge 
clusters that are too close together to prevent visual overlap. + * Minimum distance is calculated based on viewport zoom level. + * At lower zoom levels, clusters need to be farther apart in degrees. + * Iterates until no more merges are possible. + */ + private def mergeNearbyClusters( + clusters: List[ClusterPoint], + viewportZoom: Int + ): List[ClusterPoint] = { + if (clusters.size <= 1) return clusters + + // Calculate minimum distance in degrees based on zoom level + // At zoom 0, world is ~360 degrees wide displayed in ~256 pixels + // We want clusters to be at least ~25 pixels apart visually + // degrees_per_pixel = 360 / (256 * 2^zoom) + // min_distance = pixels * degrees_per_pixel + val pixelBuffer = 25.0 + val minDistanceDeg = pixelBuffer * 360.0 / (256.0 * math.pow(2, viewportZoom)) + + var current = clusters + var changed = true + var maxIters = 50 // Prevent infinite loops + + while (changed && maxIters > 0) { + changed = false + maxIters -= 1 + + val result = ListBuffer[ClusterPoint]() + val used = Array.fill(current.size)(false) + + for (i <- current.indices if !used(i)) { + var merged = current(i) + used(i) = true + + // Find all unused clusters within minimum distance and merge them + for (j <- (i + 1) until current.size if !used(j)) { + val other = current(j) + val dist = distance(merged.lat, merged.lng, other.lat, other.lng) + + if (dist < minDistanceDeg) { + // Merge: weighted average of positions, sum of counts + val totalCount = merged.count + other.count + val newLat = (merged.lat * merged.count + other.lat * other.count) / totalCount + val newLng = (merged.lng * merged.count + other.lng * other.count) / totalCount + merged = ClusterPoint(newLat, newLng, totalCount) + used(j) = true + changed = true + } + } + + result += merged + } + + current = result.toList + } + + current + } + + /** + * Detect overlapping tasks (tasks at the same location within ~0.1 meters) + * Groups tasks by location and separates single vs overlapping markers + */ + 
private def detectOverlaps( + tasks: List[TaskMarker] + ): (List[TaskMarker], List[OverlappingTaskMarker]) = { + // Group tasks by rounded location (precision of ~0.1 meters = 0.000001 degrees) + val precision = 1000000.0 + val grouped = tasks.groupBy { task => + ( + math.round(task.location.lat * precision), + math.round(task.location.lng * precision) + ) + } + + val singleMarkers = ListBuffer[TaskMarker]() + val overlappingMarkers = ListBuffer[OverlappingTaskMarker]() + + grouped.values.foreach { groupTasks => + if (groupTasks.size == 1) { + singleMarkers += groupTasks.head + } else { + // Use the first task's location as the representative location + val location = groupTasks.head.location + overlappingMarkers += OverlappingTaskMarker(location, groupTasks) + } + } + + (singleMarkers.toList, overlappingMarkers.toList) + } + + /** + * Process tiles in the bounding box using tile centroids. + * Uses a two-phase approach: + * 1. First pass: collect all tile centroids (single query, fast) + * 2. 
If total count is low enough, fetch actual tasks in a single batched query + */ + private def processTiles( + zoom: Int, + bounds: SearchLocation, + difficulty: Option[Int], + global: Boolean, + collectedPoints: ListBuffer[ClusterPoint], + collectedTasks: ListBuffer[TaskMarker] + ): Unit = { + val tiles = repository.getTilesInBounds(zoom, bounds) + + // First pass: calculate total count and collect tile info + var totalCount = 0 + val tilesWithCounts = tiles.flatMap { tile => + val filteredCount = tile.getFilteredCount(difficulty, global) + if (filteredCount > 0) { + totalCount += filteredCount + Some((tile, filteredCount)) + } else { + None + } + } + + // If total count is low enough, fetch actual tasks in a single batched query + if (totalCount < CLUSTER_THRESHOLD && tilesWithCounts.nonEmpty) { + // Fetch all tasks in the bounding box with a single query (much faster than per-tile) + val tasks = repository.getTaskMarkersInBounds(bounds, difficulty, global) + collectedTasks ++= tasks + } else { + // Use tile centroids for clustering (no additional queries needed) + tilesWithCounts.foreach { + case (tile, filteredCount) => + collectedPoints += ClusterPoint(tile.centroidLat, tile.centroidLng, filteredCount) + } + } + } + + /** + * Process with location_id filtering. + * First checks count, then either fetches tasks or uses clustering. 
+ */ + private def processLocationFiltering( + startZoom: Int, + bounds: SearchLocation, + difficulty: Option[Int], + global: Boolean, + locationId: Long, + collectedPoints: ListBuffer[ClusterPoint], + collectedTasks: ListBuffer[TaskMarker] + ): Unit = { + // Get the location polygon from Nominatim + val locationPolygon = nominatimService.getPolygonByPlaceId(locationId) + + locationPolygon match { + case Some(polygonWkt) => + // First get count to decide strategy + val count = repository.countTasksInPolygon(polygonWkt, difficulty, global) + + if (count < CLUSTER_THRESHOLD) { + // Safe to fetch all tasks + val tasks = + repository.getTaskMarkersInPolygon(polygonWkt, difficulty, global, Some(CLUSTER_THRESHOLD)) + collectedTasks ++= tasks + } else { + // Too many tasks - use clustering via the polygon + val clusters = + repository.getClusteredTasksInPolygon(polygonWkt, difficulty, global, TARGET_CLUSTERS) + collectedPoints ++= clusters + } + + case None => + // Location not found, fall back to standard processing + logger.warn(s"Location polygon not found for place_id: $locationId") + processTiles(startZoom, bounds, difficulty, global, collectedPoints, collectedTasks) + } + } + + /** + * Fallback to dynamic query for keywords filtering + */ + private def getFallbackData( + bounds: SearchLocation, + difficulty: Option[Int], + global: Boolean, + locationId: Option[Long], + keywords: Option[String] + ): TaskMarkerResponse = { + val boundingBox = bounds + + val statusList = List(0, 3, 6) + + val taskCount = taskClusterRepository.queryCountTaskMarkers( + statusList, + global, + boundingBox, + locationId, + keywords, + difficulty + ) + + if (taskCount > 5000) { + TaskMarkerResponse(totalCount = taskCount) + } else if (taskCount >= 100) { + val clusters = taskClusterRepository.queryTaskMarkersClustered( + statusList, + global, + boundingBox, + locationId, + keywords, + difficulty + ) + TaskMarkerResponse(totalCount = taskCount, clusters = Some(clusters)) + } else { + val 
(singleMarkers, overlappingMarkers) = taskClusterRepository.queryTaskMarkersWithOverlaps( + statusList, + global, + boundingBox, + locationId, + keywords, + difficulty + ) + TaskMarkerResponse( + totalCount = taskCount, + tasks = Some(singleMarkers), + overlappingTasks = if (overlappingMarkers.nonEmpty) Some(overlappingMarkers) else None + ) + } + } + + /** + * Simple k-means clustering implementation. + * Groups points into k clusters and returns cluster centroids with counts. + */ + private def kMeansClustering(points: List[ClusterPoint], k: Int): List[ClusterPoint] = { + if (points.isEmpty) return List.empty + if (points.size <= k) return points + + val numClusters = math.min(k, points.size) + + // Initialize centroids using k-means++ style selection + var centroids = initializeCentroids(points, numClusters) + + // Run k-means iterations + val maxIterations = 20 + var iteration = 0 + var changed = true + + while (iteration < maxIterations && changed) { + // Assign points to nearest centroid + val assignments = points.map { point => + val nearest = centroids.zipWithIndex.minBy { + case (centroid, _) => + distance(point.lat, point.lng, centroid._1, centroid._2) + }._2 + (point, nearest) + } + + // Recalculate centroids + val newCentroids = (0 until numClusters).map { i => + val clusterPoints = assignments.filter(_._2 == i).map(_._1) + if (clusterPoints.isEmpty) { + centroids(i) + } else { + val totalWeight = clusterPoints.map(_.count).sum.toDouble + val avgLat = clusterPoints.map(p => p.lat * p.count).sum / totalWeight + val avgLng = clusterPoints.map(p => p.lng * p.count).sum / totalWeight + (avgLat, avgLng) + } + }.toList + + changed = !newCentroids.equals(centroids) + centroids = newCentroids + iteration += 1 + } + + // Calculate final clusters with counts + val assignments = points.map { point => + val nearest = centroids.zipWithIndex.minBy { + case (centroid, _) => + distance(point.lat, point.lng, centroid._1, centroid._2) + }._2 + (point, nearest) + } + + 
centroids.zipWithIndex + .map { + case ((lat, lng), i) => + val count = assignments.filter(_._2 == i).map(_._1.count).sum + ClusterPoint(lat, lng, count) + } + .filter(_.count > 0) + } + + private def initializeCentroids(points: List[ClusterPoint], k: Int): List[(Double, Double)] = { + // Simple initialization: spread evenly across the points + val step = points.size / k + (0 until k).map { i => + val point = points(math.min(i * step, points.size - 1)) + (point.lat, point.lng) + }.toList + } + + private def distance(lat1: Double, lng1: Double, lat2: Double, lng2: Double): Double = { + // Simple Euclidean distance (sufficient for clustering purposes) + val dLat = lat2 - lat1 + val dLng = lng2 - lng1 + math.sqrt(dLat * dLat + dLng * dLng) + } + + /** + * Process the tile refresh queue (called by scheduler) + */ + def processRefreshQueue(batchSize: Int = 1000): Int = { + val processed = repository.processRefreshQueue(batchSize) + if (processed > 0) { + logger.info(s"Processed $processed tiles from refresh queue") + } + processed + } + + /** + * Get the current size of the refresh queue + */ + def getQueueSize(): Int = repository.getQueueSize() + + /** + * Full rebuild of a specific zoom level (admin operation) + */ + def rebuildZoomLevel(zoom: Int): Int = { + logger.info(s"Starting full rebuild of zoom level $zoom") + val tilesCreated = repository.rebuildZoomLevel(zoom) + logger.info(s"Completed rebuild of zoom level $zoom: $tilesCreated tiles created") + tilesCreated + } + + /** + * Get statistics about the tile system + */ + def getStats(): Map[String, Int] = { + Map( + "totalTiles" -> repository.getTotalTileCount(), + "queueSize" -> repository.getQueueSize() + ) + } +} diff --git a/app/org/maproulette/jobs/Scheduler.scala b/app/org/maproulette/jobs/Scheduler.scala index b2e324153..617af9945 100644 --- a/app/org/maproulette/jobs/Scheduler.scala +++ b/app/org/maproulette/jobs/Scheduler.scala @@ -115,6 +115,13 @@ class Scheduler @Inject() ( 
Config.KEY_SCHEDULER_UPDATE_CHALLENGE_COMPLETION_INTERVAL ) + schedule( + "refreshTileAggregates", + "Processing tile refresh queue", + 30.seconds, + Config.KEY_SCHEDULER_TILE_REFRESH_INTERVAL + ) + scheduleAtTime( "sendCountNotificationDailyEmails", "Sending Count Notification Daily Emails", diff --git a/app/org/maproulette/jobs/SchedulerActor.scala b/app/org/maproulette/jobs/SchedulerActor.scala index 28ff6eeee..06be27aea 100644 --- a/app/org/maproulette/jobs/SchedulerActor.scala +++ b/app/org/maproulette/jobs/SchedulerActor.scala @@ -92,6 +92,8 @@ class SchedulerActor @Inject() ( this.handleArchiveChallenges(action) case RunJob("updateChallengeCompletionMetrics", action) => this.handleUpdateChallengeCompletionMetrics(action) + case RunJob("refreshTileAggregates", action) => + this.refreshTileAggregates(action) } /** @@ -876,6 +878,30 @@ class SchedulerActor @Inject() ( logger.warn(s"The KeepRight challenge creation failed. ${f.getMessage}") } } + + /** + * Processes the tile refresh queue to update pre-computed tile aggregates. + * Tiles are used for efficient map display of tasks at scale. + * + * @param action - action string + */ + def refreshTileAggregates(action: String): Unit = { + val start = System.currentTimeMillis + logger.info(s"Scheduled Task '$action': Starting run") + + val batchSize = appConfig + .getOptional[Int](Config.KEY_SCHEDULER_TILE_REFRESH_BATCH_SIZE) + .getOrElse(Config.DEFAULT_TILE_REFRESH_BATCH_SIZE) + + val processed = serviceManager.tileAggregate.processRefreshQueue(batchSize) + val queueRemaining = serviceManager.tileAggregate.getQueueSize() + + val totalTime = System.currentTimeMillis - start + logger.info( + s"Scheduled Task '$action': Finished run. Time spent: ${String.format("%1d", totalTime)}ms. " + + s"Tiles processed: $processed. 
Queue remaining: $queueRemaining" + ) + } } object SchedulerActor { diff --git a/conf/application.conf b/conf/application.conf index 34a3f34bd..c576c552b 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -292,6 +292,10 @@ maproulette { interval = "7 days" startTime = "1:00:00" # Snapshot every week at 1am local time } + tileRefresh { + interval = "30 seconds" + batchSize = 1000 + } } mapillary { host = "a.mapillary.com" diff --git a/conf/evolutions/default/107.sql b/conf/evolutions/default/107.sql new file mode 100644 index 000000000..7df8e34f7 --- /dev/null +++ b/conf/evolutions/default/107.sql @@ -0,0 +1,326 @@ +# --- MapRoulette Scheme + +# --- !Ups + +-- ============================================================================= +-- TILE PYRAMID AGGREGATION SYSTEM +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS tile_aggregates ( + id SERIAL PRIMARY KEY, + z SMALLINT NOT NULL, + x INTEGER NOT NULL, + y INTEGER NOT NULL, + task_count INTEGER DEFAULT 0, + counts_by_filter JSONB DEFAULT '{}'::jsonb, + centroid_lat DOUBLE PRECISION, + centroid_lng DOUBLE PRECISION, + last_updated TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW(), + CONSTRAINT tile_aggregates_unique UNIQUE (z, x, y) +);; + +SELECT create_index_if_not_exists('tile_aggregates', 'zxy', '(z, x, y)');; +CREATE INDEX IF NOT EXISTS idx_tile_aggregates_count ON tile_aggregates (task_count) WHERE task_count > 0;; +CREATE INDEX IF NOT EXISTS idx_tile_aggregates_counts_gin ON tile_aggregates USING GIN (counts_by_filter);; + +CREATE TABLE IF NOT EXISTS tile_refresh_queue ( + id SERIAL PRIMARY KEY, + z SMALLINT NOT NULL, + x INTEGER NOT NULL, + y INTEGER NOT NULL, + queued_at TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW(), + CONSTRAINT tile_refresh_queue_unique UNIQUE (z, x, y) +);; + +SELECT create_index_if_not_exists('tile_refresh_queue', 'z', '(z)');; + +-- Tile coordinate conversion functions +CREATE OR REPLACE FUNCTION lng_to_tile_x(lng 
DOUBLE PRECISION, zoom INTEGER) RETURNS INTEGER as $$ +BEGIN + RETURN FLOOR((lng + 180.0) / 360.0 * (1 << zoom))::INTEGER;; +END +$$ +LANGUAGE plpgsql IMMUTABLE;; + +CREATE OR REPLACE FUNCTION lat_to_tile_y(lat DOUBLE PRECISION, zoom INTEGER) RETURNS INTEGER as $$ +DECLARE + lat_rad DOUBLE PRECISION;; + lat_clamped DOUBLE PRECISION;; +BEGIN + lat_clamped := GREATEST(-85.0511, LEAST(85.0511, lat));; + lat_rad := RADIANS(lat_clamped);; + RETURN FLOOR((1.0 - LN(TAN(lat_rad) + 1.0 / COS(lat_rad)) / PI()) / 2.0 * (1 << zoom))::INTEGER;; +END +$$ +LANGUAGE plpgsql IMMUTABLE;; + +CREATE OR REPLACE FUNCTION tile_to_lng(x INTEGER, zoom INTEGER) RETURNS DOUBLE PRECISION as $$ +BEGIN + RETURN x::DOUBLE PRECISION / (1 << zoom) * 360.0 - 180.0;; +END +$$ +LANGUAGE plpgsql IMMUTABLE;; + +CREATE OR REPLACE FUNCTION tile_to_lat(y INTEGER, zoom INTEGER) RETURNS DOUBLE PRECISION as $$ +DECLARE + n DOUBLE PRECISION;; +BEGIN + n := PI() - 2.0 * PI() * y::DOUBLE PRECISION / (1 << zoom);; + RETURN DEGREES(ATAN(SINH(n)));; +END +$$ +LANGUAGE plpgsql IMMUTABLE;; + +CREATE OR REPLACE FUNCTION tile_bounds(p_z INTEGER, p_x INTEGER, p_y INTEGER) RETURNS geometry as $$ +DECLARE + west DOUBLE PRECISION;; + east DOUBLE PRECISION;; + north DOUBLE PRECISION;; + south DOUBLE PRECISION;; +BEGIN + west := tile_to_lng(p_x, p_z);; + east := tile_to_lng(p_x + 1, p_z);; + north := tile_to_lat(p_y, p_z);; + south := tile_to_lat(p_y + 1, p_z);; + RETURN ST_MakeEnvelope(west, south, east, north, 4326);; +END +$$ +LANGUAGE plpgsql IMMUTABLE;; + +-- Trigger function to queue tiles for refresh +CREATE OR REPLACE FUNCTION queue_tile_refresh() RETURNS TRIGGER as $$ +DECLARE + task_lng DOUBLE PRECISION;; + task_lat DOUBLE PRECISION;; + old_lng DOUBLE PRECISION;; + old_lat DOUBLE PRECISION;; + zoom_level INTEGER;; + should_process_new BOOLEAN := FALSE;; + should_process_old BOOLEAN := FALSE;; +BEGIN + IF TG_OP != 'DELETE' AND NEW.location IS NOT NULL THEN + IF NEW.status IN (0, 3, 6) THEN + should_process_new := 
TRUE;; + task_lng := ST_X(NEW.location);; + task_lat := ST_Y(NEW.location);; + END IF;; + END IF;; + + IF TG_OP != 'INSERT' AND OLD.location IS NOT NULL THEN + IF OLD.status IN (0, 3, 6) THEN + should_process_old := TRUE;; + old_lng := ST_X(OLD.location);; + old_lat := ST_Y(OLD.location);; + END IF;; + END IF;; + + IF should_process_new THEN + FOR zoom_level IN 0..14 LOOP + INSERT INTO tile_refresh_queue (z, x, y) + VALUES (zoom_level, lng_to_tile_x(task_lng, zoom_level), lat_to_tile_y(task_lat, zoom_level)) + ON CONFLICT (z, x, y) DO NOTHING;; + END LOOP;; + END IF;; + + IF should_process_old THEN + IF NOT should_process_new OR (should_process_new AND NOT ST_Equals(OLD.location, NEW.location)) THEN + FOR zoom_level IN 0..14 LOOP + INSERT INTO tile_refresh_queue (z, x, y) + VALUES (zoom_level, lng_to_tile_x(old_lng, zoom_level), lat_to_tile_y(old_lat, zoom_level)) + ON CONFLICT (z, x, y) DO NOTHING;; + END LOOP;; + END IF;; + END IF;; + + IF TG_OP = 'DELETE' THEN + RETURN OLD;; + ELSE + RETURN NEW;; + END IF;; +END +$$ +LANGUAGE plpgsql;; + +DROP TRIGGER IF EXISTS task_tile_refresh_insert ON tasks;; +CREATE TRIGGER task_tile_refresh_insert + AFTER INSERT ON tasks + FOR EACH ROW + EXECUTE FUNCTION queue_tile_refresh();; + +DROP TRIGGER IF EXISTS task_tile_refresh_update ON tasks;; +CREATE TRIGGER task_tile_refresh_update + AFTER UPDATE OF location, status ON tasks + FOR EACH ROW + EXECUTE FUNCTION queue_tile_refresh();; + +DROP TRIGGER IF EXISTS task_tile_refresh_delete ON tasks;; +CREATE TRIGGER task_tile_refresh_delete + AFTER DELETE ON tasks + FOR EACH ROW + EXECUTE FUNCTION queue_tile_refresh();; + +-- Function to rebuild a single tile +CREATE OR REPLACE FUNCTION rebuild_tile_aggregate(p_z INTEGER, p_x INTEGER, p_y INTEGER) RETURNS VOID as $$ +DECLARE + tile_geom geometry;; + task_count_val INTEGER;; + counts_json JSONB;; + centroid_lat_val DOUBLE PRECISION;; + centroid_lng_val DOUBLE PRECISION;; +BEGIN + tile_geom := tile_bounds(p_z, p_x, p_y);; + + WITH 
task_data AS ( + SELECT + t.location, + COALESCE(c.difficulty, 0) as difficulty, + COALESCE(c.is_global, false) as is_global + FROM tasks t + INNER JOIN challenges c ON c.id = t.parent_id + INNER JOIN projects p ON p.id = c.parent_id + WHERE t.location && tile_geom + AND ST_Intersects(t.location, tile_geom) + AND t.status IN (0, 3, 6) + AND c.deleted = FALSE + AND c.enabled = TRUE + AND c.is_archived = FALSE + AND p.deleted = FALSE + AND p.enabled = TRUE + ), + aggregated AS ( + SELECT + COUNT(*) as total, + AVG(ST_Y(location)) as avg_lat, + AVG(ST_X(location)) as avg_lng, + COUNT(*) FILTER (WHERE difficulty = 1 AND NOT is_global) as d1_gf, + COUNT(*) FILTER (WHERE difficulty = 1 AND is_global) as d1_gt, + COUNT(*) FILTER (WHERE difficulty = 2 AND NOT is_global) as d2_gf, + COUNT(*) FILTER (WHERE difficulty = 2 AND is_global) as d2_gt, + COUNT(*) FILTER (WHERE difficulty = 3 AND NOT is_global) as d3_gf, + COUNT(*) FILTER (WHERE difficulty = 3 AND is_global) as d3_gt, + COUNT(*) FILTER (WHERE difficulty NOT IN (1,2,3) AND NOT is_global) as d0_gf, + COUNT(*) FILTER (WHERE difficulty NOT IN (1,2,3) AND is_global) as d0_gt + FROM task_data + ) + SELECT + total, + jsonb_build_object( + 'd1_gf', d1_gf, 'd1_gt', d1_gt, + 'd2_gf', d2_gf, 'd2_gt', d2_gt, + 'd3_gf', d3_gf, 'd3_gt', d3_gt, + 'd0_gf', d0_gf, 'd0_gt', d0_gt + ), + avg_lat, + avg_lng + INTO task_count_val, counts_json, centroid_lat_val, centroid_lng_val + FROM aggregated;; + + IF task_count_val IS NULL THEN + task_count_val := 0;; + counts_json := '{}'::jsonb;; + END IF;; + + IF task_count_val > 0 THEN + INSERT INTO tile_aggregates (z, x, y, task_count, counts_by_filter, centroid_lat, centroid_lng, last_updated) + VALUES (p_z, p_x, p_y, task_count_val, counts_json, centroid_lat_val, centroid_lng_val, NOW()) + ON CONFLICT (z, x, y) DO UPDATE SET + task_count = EXCLUDED.task_count, + counts_by_filter = EXCLUDED.counts_by_filter, + centroid_lat = EXCLUDED.centroid_lat, + centroid_lng = EXCLUDED.centroid_lng, + 
last_updated = NOW();; + ELSE + DELETE FROM tile_aggregates WHERE z = p_z AND x = p_x AND y = p_y;; + END IF;; +END +$$ +LANGUAGE plpgsql;; + +-- Function to process queued tiles +CREATE OR REPLACE FUNCTION process_tile_refresh_queue(batch_size INTEGER DEFAULT 1000) RETURNS INTEGER as $$ +DECLARE + processed INTEGER := 0;; + tile_rec RECORD;; +BEGIN + FOR tile_rec IN + SELECT z, x, y + FROM tile_refresh_queue + ORDER BY z ASC, queued_at ASC + LIMIT batch_size + FOR UPDATE SKIP LOCKED + LOOP + PERFORM rebuild_tile_aggregate(tile_rec.z, tile_rec.x, tile_rec.y);; + DELETE FROM tile_refresh_queue WHERE z = tile_rec.z AND x = tile_rec.x AND y = tile_rec.y;; + processed := processed + 1;; + END LOOP;; + + RETURN processed;; +END +$$ +LANGUAGE plpgsql;; + +-- Function to rebuild all tiles for a zoom level +CREATE OR REPLACE FUNCTION rebuild_zoom_level(p_zoom INTEGER) RETURNS INTEGER as $$ +DECLARE + tiles_created INTEGER := 0;; + tile_rec RECORD;; +BEGIN + DELETE FROM tile_aggregates WHERE z = p_zoom;; + + FOR tile_rec IN + SELECT DISTINCT + lng_to_tile_x(ST_X(t.location), p_zoom) AS tx, + lat_to_tile_y(ST_Y(t.location), p_zoom) AS ty + FROM tasks t + INNER JOIN challenges c ON c.id = t.parent_id + INNER JOIN projects p ON p.id = c.parent_id + WHERE t.location IS NOT NULL + AND t.status IN (0, 3, 6) + AND c.deleted = FALSE + AND c.enabled = TRUE + AND c.is_archived = FALSE + AND p.deleted = FALSE + AND p.enabled = TRUE + LOOP + PERFORM rebuild_tile_aggregate(p_zoom, tile_rec.tx, tile_rec.ty);; + tiles_created := tiles_created + 1;; + END LOOP;; + + RETURN tiles_created;; +END +$$ +LANGUAGE plpgsql;; + +-- Function to get child tile coordinates +CREATE OR REPLACE FUNCTION get_child_tile_coords(p_z INTEGER, p_x INTEGER, p_y INTEGER) +RETURNS TABLE(child_z INTEGER, child_x INTEGER, child_y INTEGER) as $$ +BEGIN + RETURN QUERY + SELECT p_z + 1, p_x * 2, p_y * 2 + UNION ALL + SELECT p_z + 1, p_x * 2 + 1, p_y * 2 + UNION ALL + SELECT p_z + 1, p_x * 2, p_y * 2 + 1 + UNION ALL + 
SELECT p_z + 1, p_x * 2 + 1, p_y * 2 + 1;; +END +$$ +LANGUAGE plpgsql IMMUTABLE;; + +# --- !Downs + +DROP TRIGGER IF EXISTS task_tile_refresh_insert ON tasks;; +DROP TRIGGER IF EXISTS task_tile_refresh_update ON tasks;; +DROP TRIGGER IF EXISTS task_tile_refresh_delete ON tasks;; +DROP FUNCTION IF EXISTS queue_tile_refresh();; +DROP FUNCTION IF EXISTS process_tile_refresh_queue(INTEGER);; +DROP FUNCTION IF EXISTS rebuild_tile_aggregate(INTEGER, INTEGER, INTEGER);; +DROP FUNCTION IF EXISTS rebuild_zoom_level(INTEGER);; +DROP FUNCTION IF EXISTS get_child_tile_coords(INTEGER, INTEGER, INTEGER);; +DROP FUNCTION IF EXISTS tile_bounds(INTEGER, INTEGER, INTEGER);; +DROP FUNCTION IF EXISTS tile_to_lat(INTEGER, INTEGER);; +DROP FUNCTION IF EXISTS tile_to_lng(INTEGER, INTEGER);; +DROP FUNCTION IF EXISTS lat_to_tile_y(DOUBLE PRECISION, INTEGER);; +DROP FUNCTION IF EXISTS lng_to_tile_x(DOUBLE PRECISION, INTEGER);; +DROP TABLE IF EXISTS tile_refresh_queue;; +DROP TABLE IF EXISTS tile_aggregates;; diff --git a/conf/v2_route/task.api b/conf/v2_route/task.api index 9d587ff53..400088855 100644 --- a/conf/v2_route/task.api +++ b/conf/v2_route/task.api @@ -678,6 +678,68 @@ PUT /markers/box/:left/:bottom/:right/:top @org.maproulette.framework GET /taskMarkers @org.maproulette.framework.controller.TaskController.getTaskMarkers(statuses: String, global:Boolean ?= false, cluster:Boolean ?= false, bounds: Option[String], location_id: Option[Long] ?= None, keywords:Option[String] ?= None, difficulty:Option[Int] ?= None) ### # tags: [ Task ] +# operationId: task_get_task_tiles +# summary: Get Task Tiles +# description: | +# Returns pre-computed tile aggregates for efficient map display at scale. +# Only includes tasks with status 0 (Created), 3 (Skipped), or 6 (Too Hard). +# For tiles with >= 2000 tasks, returns pre-computed clusters. +# For tiles with < 2000 tasks, returns individual task markers. 
+# When keywords filtering is applied, falls back to a dynamic query. Difficulty and global
+# filters are served from pre-computed per-tile counts; location_id uses polygon-based queries.
diff --git a/test/org/maproulette/utils/TestSpec.scala b/test/org/maproulette/utils/TestSpec.scala index 77dab0cc3..d0f568f34 100644 --- a/test/org/maproulette/utils/TestSpec.scala +++ b/test/org/maproulette/utils/TestSpec.scala @@ -147,6 +147,7 @@ trait TestSpec extends PlaySpec with MockitoSugar { val leaderboardService = mock[LeaderboardService] val taskHistoryService = mock[TaskHistoryService] val nominatimService = mock[NominatimService] + val tileAggregateService = mock[TileAggregateService] val serviceManager = new ServiceManager( Providers.of[ProjectService](projectService), Providers.of[GrantService](grantService), @@ -171,7 +172,8 @@ trait TestSpec extends PlaySpec with MockitoSugar { Providers.of[NotificationService](notificationService), Providers.of[LeaderboardService](leaderboardService), Providers.of[TaskHistoryService](taskHistoryService), - Providers.of[NominatimService](nominatimService) + Providers.of[NominatimService](nominatimService), + Providers.of[TileAggregateService](tileAggregateService) ) val permission = new Permission(Providers.of[DALManager](dalManager), serviceManager, new Config())