From 840a723aa260d629860bc2aa807f4ada697ea5c1 Mon Sep 17 00:00:00 2001 From: Federico Marcos Date: Sun, 14 Jul 2019 00:08:01 -0300 Subject: [PATCH 1/7] split intersections and crossed into separate commands --- .editorconfig | 2 +- .gitignore | 4 + .../management/commands/update_osm.py | 219 +----------------- apps/catastro/tests/test_importer.py | 18 ++ apps/commands/apps.py | 6 + apps/commands/cross_areas.py | 214 +++++++++++++++++ apps/commands/logging.py | 8 + .../management/commands/cross_areas.py | 12 + .../commands/street_intersections.py | 9 + apps/commands/street_intersections.py | 33 +++ config/settings/base.py | 1 + 11 files changed, 307 insertions(+), 219 deletions(-) create mode 100644 apps/catastro/tests/test_importer.py create mode 100644 apps/commands/apps.py create mode 100644 apps/commands/cross_areas.py create mode 100644 apps/commands/logging.py create mode 100644 apps/commands/management/commands/cross_areas.py create mode 100644 apps/commands/management/commands/street_intersections.py create mode 100644 apps/commands/street_intersections.py diff --git a/.editorconfig b/.editorconfig index 16c7dc73..134a6da7 100644 --- a/.editorconfig +++ b/.editorconfig @@ -5,7 +5,7 @@ root = true [*] charset = utf-8 end_of_line = lf -insert_final_newline = true +insert_final_newline = false trim_trailing_whitespace = true [*.{py,rst,ini}] diff --git a/.gitignore b/.gitignore index 753a1ab0..8f2560b7 100644 --- a/.gitignore +++ b/.gitignore @@ -277,3 +277,7 @@ web/media/ # pbf osm data osm/*.pbf + +# dumps +dump.sql +dump.sql.gz \ No newline at end of file diff --git a/apps/catastro/management/commands/update_osm.py b/apps/catastro/management/commands/update_osm.py index 72e5b776..cabead12 100644 --- a/apps/catastro/management/commands/update_osm.py +++ b/apps/catastro/management/commands/update_osm.py @@ -15,16 +15,14 @@ from django.contrib.gis.db.models.functions import MakeValid from django.contrib.gis.geos.error import GEOSException -from 
apps.catastro.models import Poi, Interseccion, AdministrativeArea +from apps.catastro.models import Poi, AdministrativeArea from apps.core.models import Recorrido, ImporterLog, Parada, Horario from apps.editor.models import RecorridoProposed from apps.utils.fix_way import fix_way, fix_polygon import osmium -import geopandas as gpd from datetime import datetime from urllib import request -from psycopg2.extras import execute_values class Nonegetter(): @@ -65,13 +63,6 @@ def add_arguments(self, parser): default=False, help='Build poi data from osm' ) - parser.add_argument( - '--cross', - action='store_true', - dest='cross', - default=False, - help='Build cross data from planet_osm_line' - ) parser.add_argument( '--download', action='store_true', @@ -79,13 +70,6 @@ def add_arguments(self, parser): default=False, help='Run importer/download routine from OSM' ) - parser.add_argument( - '--intersections', - action='store_true', - dest='intersections', - default=False, - help='Run intersections routine' - ) parser.add_argument( '--update_routes', action='store_true', @@ -400,180 +384,6 @@ def print_tree(node, level=0): # recorridos de osm # ####################### - if options['cross']: - - crs = {'init': 'epsg:4326'} - - self.out1('Cross osm recorridos') - self.out2('Obteniendo bus routes de osm planet_osm_line') - bus_routes = gpd.read_postgis( - """ - # esto cambiarlo para no usar mas planet_osm_line (osm2pgsql), usar osmosis para construir las bus_routes - # SELECT - # @osm_id AS osm_id, -- @=modulus operator - # name, - # ref, - # st_linemerge(st_union(way)) AS way - # FROM - # planet_osm_line - # WHERE - # route = 'bus' - # GROUP BY - # osm_id, - # name, - # ref - """, - connection, - geom_col='way', - crs=crs - ) - bus_routes.set_index('osm_id', inplace=True) - - self.out2('Creando geodataframe') - bus_routes_buffer = gpd.GeoDataFrame({ - 'osm_id': bus_routes.index, - 'way': bus_routes.way, - 'way_buffer_40': bus_routes.way.buffer(0.0004), - 
'way_buffer_40_simplify': bus_routes.way.simplify(0.0001).buffer(0.0004), - 'name': bus_routes.name - }, crs=crs).set_geometry('way_buffer_40_simplify') - - self.out2('Obteniendo recorridos de cualbondi core_recorridos') - core_recorrido = gpd.read_postgis( - """ - SELECT - cr.id, - cr.nombre, - cr.linea_id, - cr.ruta, - cl.nombre AS linea_nombre - FROM - core_recorrido cr - JOIN core_linea cl ON (cr.linea_id = cl.id) - -- JOIN catastro_ciudad_recorridos ccr ON (ccr.recorrido_id = cr.id) - --WHERE - -- ccr.ciudad_id = 1 - ; - """, - connection, - geom_col='ruta', - crs=crs - ) - core_recorrido.set_index('id', inplace=True) - - self.out2('Creando geodataframe') - core_recorrido_buffer = gpd.GeoDataFrame({ - 'id': core_recorrido.index, - 'ruta': core_recorrido.ruta.simplify(0.0001), - 'ruta_buffer_40_simplify': core_recorrido.ruta.simplify(0.0001).buffer(0.0004), - 'nombre': core_recorrido.nombre, - 'linea_id': core_recorrido.linea_id, - }, crs=crs).set_geometry('ruta') - - self.out2('Generando intersecciones') - intersections = gpd.sjoin(core_recorrido_buffer, bus_routes_buffer, how='inner', op='intersects') - - self.out2('Copiando indice, id') - intersections['id'] = intersections.index - - self.out2('Copiando indice, osm_id') - intersections['osm_id'] = intersections.index_right - - self.out2('Drop indice, osm_id') - intersections.drop('index_right', inplace=True, axis=1) - - self.out2('Generando match [id, osm_id]') - intersections = intersections[['id', 'osm_id']] - - self.out2('Generando indice de match [id, osm_id]') - intersections.index = range(len(intersections)) - - self.out2('Generando way_buffer_40_simplify') - way_buffer_40_simplify = gpd.GeoSeries( - bus_routes_buffer.loc[intersections.osm_id].way_buffer_40_simplify.values, crs=crs) - - self.out2('Generando ruta_buffer_40_simplify') - ruta_buffer_40_simplify = gpd.GeoSeries( - core_recorrido_buffer.loc[intersections.id].ruta_buffer_40_simplify.values, crs=crs) - - self.out2('Generando 
symmetric_difference') - diffs = ruta_buffer_40_simplify.symmetric_difference(way_buffer_40_simplify).area.values - - self.out2('Generando norm_factor') - norm_factor = ruta_buffer_40_simplify.area.values + way_buffer_40_simplify.area.values - - self.out2('Generando diffs') - diffs = (diffs / norm_factor).tolist() - - self.out2('Pasando osm_ids a lista') - osm_ids = intersections.osm_id.values.tolist() - - self.out2('Pasando osm_names a lista') - osm_names = bus_routes.loc[osm_ids].name.values.tolist() - # ways = bus_routes.loc[osm_ids].way.map(lambda x: x.wkb).values.tolist() - - self.out2('Pasando recorrido_ids de intersections a lista') - recorrido_ids = intersections['id'].values.tolist() - - self.out2('Pasando linea_ids a lista') - linea_ids = core_recorrido.loc[recorrido_ids].linea_id.values.tolist() - # rutas = core_recorrido.loc[recorrido_ids].ruta.map(lambda x: x.wkb).values.tolist() - - self.out2('Pasando recorrido_nombres a lista') - recorrido_nombres = core_recorrido.loc[recorrido_ids].nombre.values.tolist() - # ruta_buffer_40_simplifys = ruta_buffer_40_simplify.map(lambda x: x.wkb).values.tolist() - # way_buffer_40_simplifys = way_buffer_40_simplify.map(lambda x: x.wkb).values.tolist() - - self.out2('Pasando linea_nombres a lista') - linea_nombres = core_recorrido.loc[recorrido_ids].linea_nombre.values.tolist() - - self.out2('DROP TABLE crossed_areas') - cu.execute("DROP TABLE IF EXISTS crossed_areas;") - cu.execute('DROP INDEX IF EXISTS crossed_areas_recorrido_id;') - cu.execute('DROP INDEX IF EXISTS crossed_areas_area;') - - self.out2('CREATE TABLE crossed_areas') - cu.execute( - """ - CREATE TABLE crossed_areas ( - area FLOAT, - linea_id INTEGER, - recorrido_id INTEGER, - osm_id BIGINT, - linea_nombre VARCHAR(100), - recorrido_nombre VARCHAR(100), - osm_name TEXT - ); - """ - ) - - self.out2('Preparando lista de values') - data = list(zip(diffs, linea_ids, recorrido_ids, osm_ids, linea_nombres, recorrido_nombres, osm_names)) - - 
self.out2('Ejecutando insert query') - insert_query = """ - INSERT INTO crossed_areas ( - area, - linea_id, - recorrido_id, - osm_id, - linea_nombre, - recorrido_nombre, - osm_name - ) - VALUES %s - """ - execute_values(cu, insert_query, data) - - self.out2('Commit insert query') - connection.commit() - - self.out2('Generando indice crossed_areas_recorrido_id') - cu.execute('CREATE INDEX crossed_areas_recorrido_id ON crossed_areas (recorrido_id);') - cu.execute('CREATE INDEX crossed_areas_area ON crossed_areas (area);') - - self.out2('LISTO!') - if options['update_routes']: # TODO: consider also trains / trams / things that have fixed stops @@ -966,33 +776,6 @@ def node(self, n): self.out2('Generando catastro_poi') cu.execute('CREATE INDEX catastropoi_nomnormal_gin ON catastro_poi USING gin (nom_normal gin_trgm_ops);') - ########################## - # Intersections de osm # - ########################## - - if options['intersections']: - - self.out1('Generando Intersecciones') - cu.execute('delete from catastro_interseccion') - cu.execute(''' - SELECT - SEL1.nom || ' y ' || SEL2.nom as nom, - upper(translate(SEL1.nom || ' y ' || SEL2.nom, 'áéíóúÁÉÍÓÚäëïöüÄËÏÖÜñÑàèìòùÀÈÌÒÙ', 'AEIOUAEIOUAEIOUAEIOUNNAEIOUAEIOU')) as nom_normal, - ST_Intersection(SEL1.way, SEL2.way) as latlng - FROM - catastro_calle AS SEL1 - join catastro_calle as SEL2 on (ST_Intersects(SEL1.way, SEL2.way) and ST_GeometryType(ST_Intersection(SEL1.way, SEL2.way):: Geometry)='ST_Point' ) - ''') - self.out2('Generando slugs') - intersections = cu.fetchall() - total = len(intersections) - i = 0 - for inter in intersections: - i = i + 1 - Interseccion.objects.create(nom=inter[0], nom_normal=inter[1], latlng=inter[2]) - if (i * 100.0 / total) % 1 == 0: - self.out2('{:2.0f}%'.format(i * 100.0 / total)) - # self.out1('Eliminando tablas no usadas') # cu.execute('drop table planet_osm_roads;') # cu.execute('drop table planet_osm_polygon;') diff --git a/apps/catastro/tests/test_importer.py 
b/apps/catastro/tests/test_importer.py new file mode 100644 index 00000000..1f973c3b --- /dev/null +++ b/apps/catastro/tests/test_importer.py @@ -0,0 +1,18 @@ +from django.core.management import call_command + +from django.test import TestCase + + +class CommandsTestCase(TestCase): + def test_import_osm(self): + pass + # args = [] + # opts = { + # 'king': 'argentina', + # 'download': True, + # 'admin_areas': True, + # 'update_routes': True, + # 'add_routes': True, + # 'pois': True + # } + # call_command('update_osm', *args, **opts) diff --git a/apps/commands/apps.py b/apps/commands/apps.py new file mode 100644 index 00000000..4ec80cef --- /dev/null +++ b/apps/commands/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class CommandsConfig(AppConfig): + name = 'apps.commands' + verbose_name = "Commands" diff --git a/apps/commands/cross_areas.py b/apps/commands/cross_areas.py new file mode 100644 index 00000000..c3bb0322 --- /dev/null +++ b/apps/commands/cross_areas.py @@ -0,0 +1,214 @@ +import geopandas as gpd +from django.db import connection +from psycopg2.extras import execute_values +from .logging import logger + + +class CrossAreasTask: + def run(self): + cu = connection.cursor() + + crs = {"init": "epsg:4326"} + + logger.info("Cross osm recorridos") + logger.info("Obteniendo bus routes de osm planet_osm_line") + + bus_routes = gpd.read_postgis( + """ + # esto cambiarlo para no usar mas planet_osm_line (osm2pgsql), usar osmosis para construir las bus_routes + # SELECT + # @osm_id AS osm_id, -- @=modulus operator + # name, + # ref, + # st_linemerge(st_union(way)) AS way + # FROM + # planet_osm_line + # WHERE + # route = 'bus' + # GROUP BY + # osm_id, + # name, + # ref + """, + connection, + geom_col="way", + crs=crs, + ) + bus_routes.set_index("osm_id", inplace=True) + + logger.info("Creando geodataframe") + bus_routes_buffer = gpd.GeoDataFrame( + { + "osm_id": bus_routes.index, + "way": bus_routes.way, + "way_buffer_40": 
bus_routes.way.buffer(0.0004), + "way_buffer_40_simplify": bus_routes.way.simplify(0.0001).buffer( + 0.0004 + ), + "name": bus_routes.name, + }, + crs=crs, + ).set_geometry("way_buffer_40_simplify") + + logger.info("Obteniendo recorridos de cualbondi core_recorridos") + core_recorrido = gpd.read_postgis( + """ + SELECT + cr.id, + cr.nombre, + cr.linea_id, + cr.ruta, + cl.nombre AS linea_nombre + FROM + core_recorrido cr + JOIN core_linea cl ON (cr.linea_id = cl.id) + -- JOIN catastro_ciudad_recorridos ccr ON (ccr.recorrido_id = cr.id) + --WHERE + -- ccr.ciudad_id = 1 + ; + """, + connection, + geom_col="ruta", + crs=crs, + ) + core_recorrido.set_index("id", inplace=True) + + logger.info("Creando geodataframe para cb con buffers") + core_recorrido_buffer = gpd.GeoDataFrame( + { + "id": core_recorrido.index, + "ruta": core_recorrido.ruta.simplify(0.0001), + "ruta_buffer_40_simplify": core_recorrido.ruta.simplify(0.0001).buffer( + 0.0004 + ), + "nombre": core_recorrido.nombre, + "linea_id": core_recorrido.linea_id, + }, + crs=crs, + ).set_geometry("ruta") + + logger.info("Generando intersecciones") + intersections = gpd.sjoin( + core_recorrido_buffer, bus_routes_buffer, how="inner", op="intersects" + ) + + logger.info("Copiando indice, id") + intersections["id"] = intersections.index + + logger.info("Copiando indice, osm_id") + intersections["osm_id"] = intersections.index_right + + logger.info("Drop indice, osm_id") + intersections.drop("index_right", inplace=True, axis=1) + + logger.info("Generando match [id, osm_id]") + intersections = intersections[["id", "osm_id"]] + + logger.info("Generando indice de match [id, osm_id]") + intersections.index = range(len(intersections)) + + logger.info("Generando way_buffer_40_simplify") + way_buffer_40_simplify = gpd.GeoSeries( + bus_routes_buffer.loc[intersections.osm_id].way_buffer_40_simplify.values, + crs=crs, + ) + + logger.info("Generando ruta_buffer_40_simplify") + ruta_buffer_40_simplify = gpd.GeoSeries( + 
core_recorrido_buffer.loc[intersections.id].ruta_buffer_40_simplify.values, + crs=crs, + ) + + logger.info("Generando symmetric_difference") + diffs = ruta_buffer_40_simplify.symmetric_difference( + way_buffer_40_simplify + ).area.values + + logger.info("Generando norm_factor") + norm_factor = ( + ruta_buffer_40_simplify.area.values + way_buffer_40_simplify.area.values + ) + + logger.info("Generando diffs") + diffs = (diffs / norm_factor).tolist() + + logger.info("Pasando osm_ids a lista") + osm_ids = intersections.osm_id.values.tolist() + + logger.info("Pasando osm_names a lista") + osm_names = bus_routes.loc[osm_ids].name.values.tolist() + # ways = bus_routes.loc[osm_ids].way.map(lambda x: x.wkb).values.tolist() + + logger.info("Pasando recorrido_ids de intersections a lista") + recorrido_ids = intersections["id"].values.tolist() + + logger.info("Pasando linea_ids a lista") + linea_ids = core_recorrido.loc[recorrido_ids].linea_id.values.tolist() + # rutas = core_recorrido.loc[recorrido_ids].ruta.map(lambda x: x.wkb).values.tolist() + + logger.info("Pasando recorrido_nombres a lista") + recorrido_nombres = core_recorrido.loc[recorrido_ids].nombre.values.tolist() + # ruta_buffer_40_simplifys = ruta_buffer_40_simplify.map(lambda x: x.wkb).values.tolist() + # way_buffer_40_simplifys = way_buffer_40_simplify.map(lambda x: x.wkb).values.tolist() + + logger.info("Pasando linea_nombres a lista") + linea_nombres = core_recorrido.loc[recorrido_ids].linea_nombre.values.tolist() + + logger.info("DROP TABLE crossed_areas") + cu.execute("DROP TABLE IF EXISTS crossed_areas;") + cu.execute("DROP INDEX IF EXISTS crossed_areas_recorrido_id;") + cu.execute("DROP INDEX IF EXISTS crossed_areas_area;") + + logger.info("CREATE TABLE crossed_areas") + cu.execute( + """ + CREATE TABLE crossed_areas ( + area FLOAT, + linea_id INTEGER, + recorrido_id INTEGER, + osm_id BIGINT, + linea_nombre VARCHAR(100), + recorrido_nombre VARCHAR(100), + osm_name TEXT + ); + """ + ) + + 
logger.info("Preparando lista de values") + data = list( + zip( + diffs, + linea_ids, + recorrido_ids, + osm_ids, + linea_nombres, + recorrido_nombres, + osm_names, + ) + ) + + logger.info("Ejecutando insert query") + insert_query = """ + INSERT INTO crossed_areas ( + area, + linea_id, + recorrido_id, + osm_id, + linea_nombre, + recorrido_nombre, + osm_name + ) + VALUES %s + """ + execute_values(cu, insert_query, data) + + logger.info("Commit insert query") + connection.commit() + + logger.info("Generando indice crossed_areas_recorrido_id") + cu.execute( + "CREATE INDEX crossed_areas_recorrido_id ON crossed_areas (recorrido_id);" + ) + cu.execute("CREATE INDEX crossed_areas_area ON crossed_areas (area);") + + logger.info("LISTO!") diff --git a/apps/commands/logging.py b/apps/commands/logging.py new file mode 100644 index 00000000..704ecd07 --- /dev/null +++ b/apps/commands/logging.py @@ -0,0 +1,8 @@ +import logging + +logger = logging.getLogger() +handler = logging.StreamHandler() +formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) +logger.setLevel(logging.DEBUG) diff --git a/apps/commands/management/commands/cross_areas.py b/apps/commands/management/commands/cross_areas.py new file mode 100644 index 00000000..d01a235c --- /dev/null +++ b/apps/commands/management/commands/cross_areas.py @@ -0,0 +1,12 @@ +from django.core.management.base import BaseCommand +from apps.commands.cross_areas import CrossAreasTask + + +class Command(BaseCommand): + help = "calculate intersection areas between osm and cb to find matches" + + def add_arguments(self, parser): + pass + + def handle(self, *args, **options): + CrossAreasTask().run() diff --git a/apps/commands/management/commands/street_intersections.py b/apps/commands/management/commands/street_intersections.py new file mode 100644 index 00000000..5c1e525a --- /dev/null +++ b/apps/commands/management/commands/street_intersections.py @@ -0,0 +1,9 
@@ +from django.core.management.base import BaseCommand +from apps.commands.street_intersections import StreetIntersectionsTask + + +class Command(BaseCommand): + help = "Generate street intersections" + + def handle(self, *args, **options): + StreetIntersectionsTask().run() diff --git a/apps/commands/street_intersections.py b/apps/commands/street_intersections.py new file mode 100644 index 00000000..327c75fc --- /dev/null +++ b/apps/commands/street_intersections.py @@ -0,0 +1,33 @@ +from .logging import logger +from django.db import connection +from apps.catastro.models import Interseccion + + +class StreetIntersectionsTask: + def run(self): + cu = connection.cursor() + + logger.info("Generando Intersecciones") + cu.execute("delete from catastro_interseccion") + cu.execute( + """ + SELECT + SEL1.nom || ' y ' || SEL2.nom as nom, + upper(translate(SEL1.nom || ' y ' || SEL2.nom, 'áéíóúÁÉÍÓÚäëïöüÄËÏÖÜñÑàèìòùÀÈÌÒÙ', 'AEIOUAEIOUAEIOUAEIOUNNAEIOUAEIOU')) as nom_normal, + ST_Intersection(SEL1.way, SEL2.way) as latlng + FROM + catastro_calle AS SEL1 + join catastro_calle as SEL2 on (ST_Intersects(SEL1.way, SEL2.way) and ST_GeometryType(ST_Intersection(SEL1.way, SEL2.way):: Geometry)='ST_Point' ) + """ + ) + logger.info("Generando slugs") + intersections = cu.fetchall() + total = len(intersections) + i = 0 + for inter in intersections: + i = i + 1 + Interseccion.objects.create( + nom=inter[0], nom_normal=inter[1], latlng=inter[2] + ) + if (i * 100.0 / total) % 1 == 0: + logger.info("{:2.0f}%".format(i * 100.0 / total)) diff --git a/config/settings/base.py b/config/settings/base.py index 950048b7..8661e13b 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -92,6 +92,7 @@ 'apps.catastro.apps.CatastroConfig', 'apps.editor.apps.EditorConfig', 'apps.api3.apps.Api3Config', + 'apps.commands.apps.CommandsConfig', # DEPRECATED: 'apps.usuarios', ] From 3fa97ed782d5de539bed0cd9926d0eae3b3316ba Mon Sep 17 00:00:00 2001 From: Federico Marcos Date: Sun, 14 Jul 2019 
03:04:50 -0300 Subject: [PATCH 2/7] remove unused parameter --- apps/catastro/management/commands/update_osm.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/apps/catastro/management/commands/update_osm.py b/apps/catastro/management/commands/update_osm.py index cabead12..d7c617db 100644 --- a/apps/catastro/management/commands/update_osm.py +++ b/apps/catastro/management/commands/update_osm.py @@ -50,12 +50,6 @@ class Command(BaseCommand): help = 'update local database with osm POIs and Streets' def add_arguments(self, parser): - parser.add_argument( - '--ciudad', - action='store', - dest='ciudad', - help='Only import this ciudad slug' - ) parser.add_argument( '--pois', action='store_true', From 744378a319dc3bc38a85d4f063f8d1e696701ad2 Mon Sep 17 00:00:00 2001 From: Federico Marcos Date: Sun, 14 Jul 2019 03:05:14 -0300 Subject: [PATCH 3/7] wip download osm task --- apps/commands/download_osm.py | 53 +++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 apps/commands/download_osm.py diff --git a/apps/commands/download_osm.py b/apps/commands/download_osm.py new file mode 100644 index 00000000..f7cea33c --- /dev/null +++ b/apps/commands/download_osm.py @@ -0,0 +1,53 @@ +import luigi +from urllib import request +import os +import sys + + +class ForceableTask(luigi.Task): + + force = luigi.BoolParameter(significant=False, default=False) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # To force execution, we just remove all outputs before `complete()` is called + if self.force is True: + outputs = luigi.task.flatten(self.output()) + for out in outputs: + if out.exists(): + os.remove(self.output().path) + + +def progress(numblocks, blocksize, filesize, url): + base = os.path.basename(url) + try: + percent = min((numblocks * blocksize * 100) / filesize, 100) + except ZeroDivisionError: + percent = 100 + if numblocks != 0: + sys.stdout.write('\b' * 70) + sys.stdout.write('%-66s%3d%%' % (base, 
percent)) + + +class DownloadOSM(ForceableTask): + king = None + + @classmethod + def get_filename(cls): + return f'/tmp/osm-{cls.king["name"]}.pbf' + + def output(self): + return luigi.LocalTarget(self.get_filename()) + + def run(self): + url = self.king['url'] + request.urlretrieve(url, self.get_filename(), lambda nb, bs, fs: progress(nb, bs, fs, url)) + + +class DownloadOSMArgentina(DownloadOSM): + king = { + 'name': 'argentina', + 'url': 'http://download.geofabrik.de/south-america/argentina-latest.osm.pbf', + 'id': 286393, + 'paradas_completas': False, + } From 26fda99583458e0bcf9a4f3198a52772624a90ef Mon Sep 17 00:00:00 2001 From: Federico Marcos Date: Sun, 14 Jul 2019 03:07:15 -0300 Subject: [PATCH 4/7] rename app --- apps/commands/apps.py | 6 ------ apps/tasks/apps.py | 6 ++++++ apps/{commands => tasks}/cross_areas.py | 0 apps/{commands => tasks}/download_osm.py | 0 apps/{commands => tasks}/logging.py | 0 apps/{commands => tasks}/management/commands/cross_areas.py | 0 .../management/commands/street_intersections.py | 0 apps/{commands => tasks}/street_intersections.py | 0 config/settings/base.py | 2 +- 9 files changed, 7 insertions(+), 7 deletions(-) delete mode 100644 apps/commands/apps.py create mode 100644 apps/tasks/apps.py rename apps/{commands => tasks}/cross_areas.py (100%) rename apps/{commands => tasks}/download_osm.py (100%) rename apps/{commands => tasks}/logging.py (100%) rename apps/{commands => tasks}/management/commands/cross_areas.py (100%) rename apps/{commands => tasks}/management/commands/street_intersections.py (100%) rename apps/{commands => tasks}/street_intersections.py (100%) diff --git a/apps/commands/apps.py b/apps/commands/apps.py deleted file mode 100644 index 4ec80cef..00000000 --- a/apps/commands/apps.py +++ /dev/null @@ -1,6 +0,0 @@ -from django.apps import AppConfig - - -class CommandsConfig(AppConfig): - name = 'apps.commands' - verbose_name = "Commands" diff --git a/apps/tasks/apps.py b/apps/tasks/apps.py new file mode 100644 
index 00000000..bbea9589 --- /dev/null +++ b/apps/tasks/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class TasksConfig(AppConfig): + name = 'apps.tasks' + verbose_name = "Tasks" diff --git a/apps/commands/cross_areas.py b/apps/tasks/cross_areas.py similarity index 100% rename from apps/commands/cross_areas.py rename to apps/tasks/cross_areas.py diff --git a/apps/commands/download_osm.py b/apps/tasks/download_osm.py similarity index 100% rename from apps/commands/download_osm.py rename to apps/tasks/download_osm.py diff --git a/apps/commands/logging.py b/apps/tasks/logging.py similarity index 100% rename from apps/commands/logging.py rename to apps/tasks/logging.py diff --git a/apps/commands/management/commands/cross_areas.py b/apps/tasks/management/commands/cross_areas.py similarity index 100% rename from apps/commands/management/commands/cross_areas.py rename to apps/tasks/management/commands/cross_areas.py diff --git a/apps/commands/management/commands/street_intersections.py b/apps/tasks/management/commands/street_intersections.py similarity index 100% rename from apps/commands/management/commands/street_intersections.py rename to apps/tasks/management/commands/street_intersections.py diff --git a/apps/commands/street_intersections.py b/apps/tasks/street_intersections.py similarity index 100% rename from apps/commands/street_intersections.py rename to apps/tasks/street_intersections.py diff --git a/config/settings/base.py b/config/settings/base.py index 8661e13b..45873cb0 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -92,7 +92,7 @@ 'apps.catastro.apps.CatastroConfig', 'apps.editor.apps.EditorConfig', 'apps.api3.apps.Api3Config', - 'apps.commands.apps.CommandsConfig', + 'apps.tasks.apps.TasksConfig', # DEPRECATED: 'apps.usuarios', ] From 3bd21c34fb3d2c20c8f97d302eac541266cf3aed Mon Sep 17 00:00:00 2001 From: Federico Marcos Date: Sun, 14 Jul 2019 18:38:41 -0300 Subject: [PATCH 5/7] fix tests --- 
apps/api3/tests/test_endpoints.py | 2 +- apps/catastro/management/commands/update_osm.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/api3/tests/test_endpoints.py b/apps/api3/tests/test_endpoints.py index 1f6e82c9..9b1ab7cd 100644 --- a/apps/api3/tests/test_endpoints.py +++ b/apps/api3/tests/test_endpoints.py @@ -45,7 +45,7 @@ def test_recorridos_origen_destino(self): def test_recorridos_por_linea_1(self): "should simulate a client query based on bus name" response = self.client.get( - '/api/v3/recorridos/?q=129&c=la-plata&page=1') + '/api/v3/recorridos/?q=129&l=-57.968416213989265%2C-34.910780590483675%2C300&c=la-plata&page=1') self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["count"], 1) result = response.data["results"][0] diff --git a/apps/catastro/management/commands/update_osm.py b/apps/catastro/management/commands/update_osm.py index d7c617db..ba3a8220 100644 --- a/apps/catastro/management/commands/update_osm.py +++ b/apps/catastro/management/commands/update_osm.py @@ -360,8 +360,9 @@ def print_tree(node, level=0): print_tree(node, level + 1) # print_tree(tree) - + self.out2('saving admin areas tree...') AdministrativeArea.load_bulk([tree]) + self.out2('finished saving admin areas') for K in OLD_KING: K.delete() @@ -707,7 +708,7 @@ def node(self, n): .order_by() \ .annotate(cond=RawSQL("ST_Intersects(ST_Buffer(%s::geography, 400, 2)::geometry, ruta)", (point.ewkb,), output_field=BooleanField())) \ .filter(cond=True) \ - .only('id') + .exists() if q: defaults = { 'tags': n.tags.__dict__, From fda8bc596a6df04dda30be439dd6a8d9c5253490 Mon Sep 17 00:00:00 2001 From: Federico Marcos Date: Mon, 15 Jul 2019 01:31:47 -0300 Subject: [PATCH 6/7] airflow for scheduling --- apps/tasks/download_osm.py | 73 +++++++++++++++------------- apps/tasks/{logging.py => logger.py} | 0 requirements/base.txt | 2 + 3 files changed, 41 insertions(+), 34 deletions(-) rename apps/tasks/{logging.py => logger.py} 
(100%) diff --git a/apps/tasks/download_osm.py b/apps/tasks/download_osm.py index f7cea33c..b32fd1f6 100644 --- a/apps/tasks/download_osm.py +++ b/apps/tasks/download_osm.py @@ -1,21 +1,16 @@ -import luigi from urllib import request import os import sys +from datetime import timedelta, datetime +from airflow import DAG +from airflow.operators.python_operator import PythonOperator -class ForceableTask(luigi.Task): - force = luigi.BoolParameter(significant=False, default=False) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - # To force execution, we just remove all outputs before `complete()` is called - if self.force is True: - outputs = luigi.task.flatten(self.output()) - for out in outputs: - if out.exists(): - os.remove(self.output().path) +def download_osm(king): + filename = f'/tmp/osm-{king["name"]}.pbf' + url = king['url'] + request.urlretrieve(url, filename, lambda nb, bs, fs: progress(nb, bs, fs, url)) def progress(numblocks, blocksize, filesize, url): @@ -29,25 +24,35 @@ def progress(numblocks, blocksize, filesize, url): sys.stdout.write('%-66s%3d%%' % (base, percent)) -class DownloadOSM(ForceableTask): - king = None - - @classmethod - def get_filename(cls): - return f'/tmp/osm-{cls.king["name"]}.pbf' - - def output(self): - return luigi.LocalTarget(self.get_filename()) - - def run(self): - url = self.king['url'] - request.urlretrieve(url, self.get_filename(), lambda nb, bs, fs: progress(nb, bs, fs, url)) - - -class DownloadOSMArgentina(DownloadOSM): - king = { - 'name': 'argentina', - 'url': 'http://download.geofabrik.de/south-america/argentina-latest.osm.pbf', - 'id': 286393, - 'paradas_completas': False, - } +king = { + 'name': 'argentina', + 'url': 'http://download.geofabrik.de/south-america/argentina-latest.osm.pbf', + 'id': 286393, + 'paradas_completas': False, +} + + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 
'retry_delay': timedelta(minutes=5), + 'start_date': datetime.today(), + 'schedule_interval': '@once' + # 'queue': 'bash_queue', + # 'pool': 'backfill', + # 'priority_weight': 10, + # 'end_date': datetime(2016, 1, 1), +} + +dag = DAG( + 'importer', default_args=default_args) + + +t1 = PythonOperator( + task_id='download_osm', + python_callable=download_osm, + op_kwargs={'king': king}, + dag=dag) diff --git a/apps/tasks/logging.py b/apps/tasks/logger.py similarity index 100% rename from apps/tasks/logging.py rename to apps/tasks/logger.py diff --git a/requirements/base.txt b/requirements/base.txt index 5f70d85f..60ceed40 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -34,3 +34,5 @@ osmium==2.14.3 # # warning, pinned! test before changing this one urllib3==1.25.2 + +apache-airflow==1.10.3 # https://airflow.apache.org/start.html \ No newline at end of file From 522c15a54218e5be617d9d2c93e7eef5b5d307bd Mon Sep 17 00:00:00 2001 From: Federico Marcos Date: Mon, 7 Oct 2019 20:26:51 -0300 Subject: [PATCH 7/7] WIP prefect setup --- apps/tasks/management/commands/cross_areas.py | 2 +- .../commands/download_osm_airflow.py} | 0 .../commands/street_intersections.py | 2 +- apps/tasks/management/commands/update_osm.py | 5 + apps/tasks/{ => tasks}/cross_areas.py | 0 apps/tasks/tasks/download_osm/admin_areas.py | 233 ++++++++++++++++++ apps/tasks/tasks/download_osm/download_osm.py | 16 ++ apps/tasks/tasks/download_osm/download_pbf.py | 22 ++ .../tasks/{ => tasks}/street_intersections.py | 0 requirements/base.txt | 2 +- 10 files changed, 279 insertions(+), 3 deletions(-) rename apps/tasks/{download_osm.py => management/commands/download_osm_airflow.py} (100%) create mode 100644 apps/tasks/management/commands/update_osm.py rename apps/tasks/{ => tasks}/cross_areas.py (100%) create mode 100644 apps/tasks/tasks/download_osm/admin_areas.py create mode 100644 apps/tasks/tasks/download_osm/download_osm.py create mode 100644 apps/tasks/tasks/download_osm/download_pbf.py 
rename apps/tasks/{ => tasks}/street_intersections.py (100%) diff --git a/apps/tasks/management/commands/cross_areas.py b/apps/tasks/management/commands/cross_areas.py index d01a235c..7a0d0336 100644 --- a/apps/tasks/management/commands/cross_areas.py +++ b/apps/tasks/management/commands/cross_areas.py @@ -1,5 +1,5 @@ from django.core.management.base import BaseCommand -from apps.commands.cross_areas import CrossAreasTask +from apps.tasks.tasks.cross_areas import CrossAreasTask class Command(BaseCommand): diff --git a/apps/tasks/download_osm.py b/apps/tasks/management/commands/download_osm_airflow.py similarity index 100% rename from apps/tasks/download_osm.py rename to apps/tasks/management/commands/download_osm_airflow.py diff --git a/apps/tasks/management/commands/street_intersections.py b/apps/tasks/management/commands/street_intersections.py index 5c1e525a..43763e20 100644 --- a/apps/tasks/management/commands/street_intersections.py +++ b/apps/tasks/management/commands/street_intersections.py @@ -1,5 +1,5 @@ from django.core.management.base import BaseCommand -from apps.commands.street_intersections import StreetIntersectionsTask +from apps.tasks.tasks.street_intersections import StreetIntersectionsTask class Command(BaseCommand): diff --git a/apps/tasks/management/commands/update_osm.py b/apps/tasks/management/commands/update_osm.py new file mode 100644 index 00000000..c3ad69ea --- /dev/null +++ b/apps/tasks/management/commands/update_osm.py @@ -0,0 +1,5 @@ +from prefect.engine.executors import DaskExecutor +from apps.tasks.tasks.download_osm.download_osm import flow + +executor = DaskExecutor(address="tcp://dask-scheduler:8786") +flow.run(executor=executor) diff --git a/apps/tasks/cross_areas.py b/apps/tasks/tasks/cross_areas.py similarity index 100% rename from apps/tasks/cross_areas.py rename to apps/tasks/tasks/cross_areas.py diff --git a/apps/tasks/tasks/download_osm/admin_areas.py b/apps/tasks/tasks/download_osm/admin_areas.py new file mode 100644 index 
00000000..45c98b24
--- /dev/null
+++ b/apps/tasks/tasks/download_osm/admin_areas.py
@@ -0,0 +1,280 @@
+from prefect import task
+from apps.catastro.models import AdministrativeArea
+import osmium
+from datetime import datetime
+from django.contrib.gis.geos import Polygon, MultiPolygon
+from apps.utils.fix_way import fix_polygon
+from django.contrib.gis.geos.error import GEOSException
+from django.contrib.gis.db.models.functions import MakeValid
+from django.db.models import F
+
+
+@task
+def admin_areas(king):
+    """Rebuild the AdministrativeArea tree from the OSM extract of `king`.
+
+    Reads `/tmp/osm-<king name>.pbf`, collects the administrative
+    boundaries (relations and closed ways) whose admin_level lies between
+    ADMIN_LEVEL_MIN and ADMIN_LEVEL_MAX, nests them by containment under
+    the king relation and saves the tree with
+    `AdministrativeArea.load_bulk`. The rows that matched the king osm_id
+    before the run are deleted once the new tree is saved.
+
+    Args:
+        king: dict with at least `id` (osm id of the root relation) and
+            `name` (used to build the pbf file name).
+
+    Raises:
+        ValueError: if the king relation could not be built from the file.
+    """
+
+    ADMIN_LEVEL_MIN = 1
+    ADMIN_LEVEL_MAX = 8
+    KING_ID = king['id']  # osm_id king
+    run_timestamp = datetime.now()
+    inputfile = f'/tmp/osm-{king["name"]}.pbf'
+
+    OLD_KING = list(AdministrativeArea.objects.filter(osm_id=KING_ID))
+    KING = None
+
+    admin_areas = [[] for _ in range(12)]  # index: admin_level, value: list of areas
+    admin_relations = {}  # key: relation osm id, value: relation data
+    admin_relations_ways_ids = {}  # key: way osm id, value: ids of relations using it
+
+    def is_admin_boundary(tags):
+        """Tell whether `tags` describe a named administrative boundary."""
+        return (
+            'boundary' in tags and tags['boundary'] == 'administrative' and
+            'name' in tags and 'admin_level' in tags
+        )
+
+    def way_coords(w):
+        """Return the nodes of way `w` as a list of [x, y] pairs."""
+        return [[float(node.x) / 10000000, float(node.y) / 10000000] for node in w.nodes]
+
+    class RelsHandler(osmium.SimpleHandler):
+        """Collect administrative relations and the ids of their outer ways."""
+
+        def relation(self, r):
+            if not is_admin_boundary(r.tags):
+                return
+            # Parse the level before registering any way: a relation that is
+            # skipped here must not leave way ids pointing to a relation
+            # missing from admin_relations (KeyError in WaysHandler).
+            try:
+                admin_level = int(r.tags['admin_level'])
+            except ValueError:
+                return
+            ways = []
+            for m in r.members:
+                # outer (parts and exclaves) / inner (hole)
+                if m.type == 'w' and m.role in ['outer']:
+                    ways.append(m.ref)
+                    admin_relations_ways_ids.setdefault(m.ref, []).append(r.id)
+            admin_relations[r.id] = {
+                'import_timestamp': run_timestamp,
+                'osm_id': r.id,
+                'osm_type': 'r',
+                'ways': ways,
+                'admin_level': admin_level,
+                'name': r.tags['name'],
+                # NOTE(review): confirm `__dict__` really holds the tag
+                # key/values of an osmium tag list.
+                'tags': r.tags.__dict__,
+            }
+
+    class WaysHandler(osmium.SimpleHandler):
+        """Collect closed admin ways and fill the geometry of relation ways."""
+
+        def way(self, w):
+            # ways that are admin areas
+            if is_admin_boundary(w.tags):
+                try:
+                    admin_level = int(w.tags['admin_level'])
+                except ValueError:
+                    # same policy as RelsHandler: ignore non numeric levels
+                    admin_level = None
+                linestring = way_coords(w)
+                # a polygon ring needs at least 4 points, first equal to last
+                is_ring = len(linestring) >= 4 and linestring[0] == linestring[-1]
+                if is_ring and admin_level is not None and ADMIN_LEVEL_MIN <= admin_level <= ADMIN_LEVEL_MAX:
+                    poly = Polygon(linestring)
+                    admin_areas[admin_level].append({
+                        'import_timestamp': run_timestamp,
+                        'osm_id': w.id,
+                        'osm_type': 'w',
+                        'geometry': poly,
+                        'geometry_simple': poly.simplify(0.01, True),
+                        'admin_level': admin_level,
+                        'name': w.tags['name'],
+                        'tags': w.tags.__dict__,
+                    })
+
+            # fill relations that are admin areas
+            if w.id in admin_relations_ways_ids:
+                linestring = way_coords(w)
+                for rel_id in admin_relations_ways_ids[w.id]:
+                    rel_ways = admin_relations[rel_id]['ways']
+                    for i, wid in enumerate(rel_ways):
+                        if wid == w.id:
+                            rel_ways[i] = linestring
+
+    print(f'Collecting rels, using {inputfile}')
+    h = RelsHandler()
+    h.apply_file(inputfile)
+    print('Collecting ways & nodes')
+    h = WaysHandler()
+    h.apply_file(inputfile, locations=True)
+
+    admin_count_ok = 0
+    admin_count_all = 0
+    admin_count = 0
+    print('Joining ways')
+    for (k, v) in admin_relations.items():
+        admin_count_all = admin_count_all + 1
+        dbadminarea = AdministrativeArea.objects.filter(osm_id=v['osm_id'], osm_type=v['osm_type'])
+        if dbadminarea:
+            dbadminarea = dbadminarea[0]
+            v['img_panorama'] = dbadminarea.img_panorama
+            v['img_cuadrada'] = dbadminarea.img_cuadrada
+
+        # NOTE(review): the nesting of this block was rebuilt from a
+        # whitespace-collapsed patch - confirm against the original file.
+        if v['admin_level'] >= ADMIN_LEVEL_MIN and v['admin_level'] <= ADMIN_LEVEL_MAX:
+            print(f"osmid={k} level={v['admin_level']} name={v['name'].encode('utf-8')}", end="")
+            wfull = [w for w in v['ways'] if not isinstance(w, int)]
+            if not wfull or len(wfull) / len(v['ways']) < 0.8:
+                print(" NOK skipping adminarea, less than 80% of fragments")
+                continue
+            way, status = fix_polygon(wfull, 1000)
+            if way is None:
+                # if it is broken, look in the database for an area with
+                # this id and use its geometry
+                print(f' ERROR: {status}')
+                if dbadminarea:
+                    way = dbadminarea.geometry
+            else:
+                admin_count = admin_count + 1
+                print(f" OK -> {len(way)}")
+                # last point equals first
+                admin_count_ok = admin_count_ok + 1
+            try:
+                poly = Polygon(way)
+                v['geometry'] = poly
+                v['geometry_simple'] = poly.simplify(0.01, True)
+                if v['osm_id'] != KING_ID:
+                    admin_areas[v['admin_level']].append(v)
+            except Exception as e:
+                try:
+                    print(f" {e}, retrying as multipolygon")
+                    mp = []
+                    for p in way:
+                        p_fixed, status = fix_polygon(p, 1000)
+                        if p_fixed:
+                            try:
+                                mp.append(Polygon(p_fixed))
+                            except Exception as e3:
+                                print(f" {e3} {status}, skipping fragment")
+                    poly = MultiPolygon(mp)
+                    v['geometry'] = poly
+                    v['geometry_simple'] = poly.simplify(0.01, True)
+                    if v['osm_id'] != KING_ID:
+                        admin_areas[v['admin_level']].append(v)
+                    print('-> ok')
+                except Exception as e2:
+                    print(f" {e2}, error")
+            if v['osm_id'] == KING_ID:
+                KING = v
+    print(f"TOTALS: {admin_count_all} {admin_count} {admin_count_ok}, {len(admin_areas)}")
+
+    if KING is None or 'geometry_simple' not in KING:
+        # fail with a clear message instead of a TypeError/KeyError below
+        raise ValueError(f'king relation osm_id={KING_ID} could not be built from {inputfile}')
+
+    def fuzzy_contains(out_geom, in_geom, buffer=0):
+        """Tell whether `out_geom`, grown by `buffer`, contains `in_geom`."""
+        return (
+            out_geom.intersects(in_geom) and  # optimization
+            out_geom.buffer(buffer).contains(in_geom)
+        )
+
+    KING_GEOM_BUFF = KING['geometry_simple'].buffer(0.01)
+
+    def get_parent_aa(node, geometry):
+        """Return the deepest node found under `node` containing `geometry`, or None."""
+        try:
+            if (
+                # compare by value: `is` on ints checks identity, not equality
+                node['data']['osm_id'] == KING_ID or
+                fuzzy_contains(node['data']['geometry_simple'], geometry, 0.01)
+            ):
+                parent_aa = None
+                for child in node['children']:
+                    parent_aa = get_parent_aa(child, geometry)
+                    if parent_aa is not None:
+                        break
+                if parent_aa is None:
+                    return node
+                else:
+                    return parent_aa
+            else:
+                return None
+        except Exception:
+            print('node.data', node['data']['name'].encode('utf-8'))
+            print('node.osm_id', node['data']['osm_id'])
+            print('node.osm_type', node['data']['osm_type'])
+            raise
+
+    tree = {
+        'children': [],
+        'data': {
+            'import_timestamp': run_timestamp,
+            'geometry': KING['geometry_simple'],
+            'geometry_simple': KING['geometry_simple'],
+            'osm_id': KING['osm_id'],
+            'osm_type': KING['osm_type'],
+            'name': KING['name'],
+            'tags': KING['tags'],
+        }
+    }
+    for li in admin_areas:
+        # aa = admin area
+        for aa in li:
+            if not aa['geometry'].intersects(KING_GEOM_BUFF):
+                continue
+            try:
+                parent_aa = get_parent_aa(tree, aa['geometry'])
+                aa.pop('admin_level')
+                if 'ways' in aa:
+                    aa.pop('ways')
+                else:
+                    print(f" {aa['osm_id']}: {aa['name'].encode('utf-8').strip()}, does not have 'ways' attribute")
+                if parent_aa is None:
+                    tree['children'].append({'children': [], 'data': aa})
+                else:
+                    parent_aa['children'].append({'children': [], 'data': aa})
+            except GEOSException as e:
+                print(f'{str(e)}\n{tree["data"]["osm_id"]} {tree["data"]["name"].encode("utf-8")}\n{aa["osm_id"]} {aa["name"].encode("utf-8")}')
+
+    def print_tree(node, level=0):
+        """Debug helper: print the tree names indented by depth."""
+        print(f'{" " * level} {level} {node["data"]["name"].encode("utf-8")}')
+        for child in node['children']:
+            print_tree(child, level + 1)
+
+    # print_tree(tree)
+    print('saving admin areas tree...')
+    AdministrativeArea.load_bulk([tree])
+    print('finished saving admin areas')
+
+    for K in OLD_KING:
+        K.delete()
+
+    # fix invalid geometries
+    # TODO: I think these should be makeValid(ated) earlier in the process, not here but ASAP
+    # that way we would avoid some issues around intersections that fail earlier in the process of creating the adminareas tree
+    # the makevalid function is only available in postgis (is not in a library like GEOS)
+    # in ~4000 shapes we had 10 not valid, so we can use something like `if not geom.valid: cursor.exec('SELECT ST_MAKEVALID(POLYGON('WKT text here'));')`
+    AdministrativeArea.objects.filter(geometry_simple__isvalid=False).update(geometry_simple=MakeValid(F('geometry_simple')))
+    # each filter must repair the same column it tests
+    AdministrativeArea.objects.filter(geometry__isvalid=False).update(geometry=MakeValid(F('geometry')))
diff --git
a/apps/tasks/tasks/download_osm/download_osm.py b/apps/tasks/tasks/download_osm/download_osm.py
new file mode 100644
index 00000000..65d48195
--- /dev/null
+++ b/apps/tasks/tasks/download_osm/download_osm.py
@@ -0,0 +1,18 @@
+from prefect import Flow
+from .download_pbf import download_pbf
+from .admin_areas import admin_areas
+
+king = {
+    'name': 'argentina',
+    'url': 'http://download.geofabrik.de/south-america/argentina-latest.osm.pbf',
+    'id': 286393,
+    'paradas_completas': False,
+}
+
+
+with Flow("download-osm") as flow:
+    # Calling a task inside the flow context returns the task instance that
+    # is added to this flow; that instance (not the `download_pbf` object
+    # itself) is what admin_areas has to wait for.
+    pbf = download_pbf(king)
+    admin_areas(king, upstream_tasks=[pbf])
diff --git a/apps/tasks/tasks/download_osm/download_pbf.py b/apps/tasks/tasks/download_osm/download_pbf.py
new file mode 100644
index 00000000..413dce60
--- /dev/null
+++ b/apps/tasks/tasks/download_osm/download_pbf.py
@@ -0,0 +1,29 @@
+from urllib import request
+import os
+import sys
+from prefect import task
+
+
+@task
+def download_pbf(king):
+    """Download the OSM pbf extract of `king` to /tmp/osm-<name>.pbf."""
+    filename = f'/tmp/osm-{king["name"]}.pbf'
+    url = king['url']
+    request.urlretrieve(url, filename, lambda nb, bs, fs: progress(nb, bs, fs, url))
+
+
+def progress(numblocks, blocksize, filesize, url):
+    """Write a one-line download progress indicator to stdout.
+
+    Used as `reporthook` of `urlretrieve`, which may report a `filesize`
+    of -1 when the server does not announce the total size.
+    """
+    base = os.path.basename(url)
+    if filesize > 0:
+        percent = min((numblocks * blocksize * 100) / filesize, 100)
+    else:
+        # unknown or zero size: there is nothing meaningful to compute
+        percent = 100
+    if numblocks != 0:
+        sys.stdout.write('\b' * 70)
+    sys.stdout.write('%-66s%3d%%' % (base, percent))
diff --git a/apps/tasks/street_intersections.py b/apps/tasks/tasks/street_intersections.py
similarity index 100%
rename from apps/tasks/street_intersections.py
rename to apps/tasks/tasks/street_intersections.py
diff --git a/requirements/base.txt b/requirements/base.txt
index 60ceed40..24518b83 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -35,4 +35,4 @@ osmium==2.14.3
 #
 # warning, pinned! test before changing this one
 urllib3==1.25.2
-apache-airflow==1.10.3 # https://airflow.apache.org/start.html
\ No newline at end of file
+prefect
\ No newline at end of file