diff --git a/python/grass/pygrass/utils.py b/python/grass/pygrass/utils.py
index 3cbd638e85f..11931d1a632 100644
--- a/python/grass/pygrass/utils.py
+++ b/python/grass/pygrass/utils.py
@@ -436,6 +436,432 @@ def table_exist(cursor, table_name):
     return bool(one and one[0])
 
 
+def txt2numpy(
+    tablestring,
+    sep=",",
+    names=None,
+    null_value=None,
+    fill_value=None,
+    comments="#",
+    usecols=None,
+    encoding=None,
+    structured=True,
+):
+    """Read table-like output from GRASS modules as a NumPy array;
+    format instructions are handed down to NumPy's genfromtxt function.
+
+    :param tablestring: tabular stdout from a GRASS GIS module call
+    :type tablestring: str|bytes
+
+    :param sep: Separator delimiting columns
+    :type sep: str
+
+    :param names: List of strings with names for columns
+    :type names: list
+
+    :param null_value: Characters representing the no-data value
+    :type null_value: str
+
+    :param fill_value: Value to fill no-data with
+    :type fill_value: str
+
+    :param comments: Character that identifies comments in the input string
+    :type comments: str
+
+    :param usecols: List of columns to import
+    :type usecols: list
+
+    :param encoding: Encoding of the input string; defaults to the GRASS
+                     default encoding
+    :type encoding: str
+
+    :param structured: return a structured array if True, an unstructured
+                       array otherwise
+    :type structured: bool
+
+    :return: numpy.ndarray
+
+    >>> import grass.script.core as grasscore
+    >>> import numpy as np
+    >>> txt = grasscore.read_command(
+    ...     "r.stats", flags="cn", input="basin_50K,geology_30m", separator="|"
+    ... )
+    >>> np_array = txt2numpy(txt, sep="|", names=None)
+    >>> print(np_array)
+
+    """
+
+    from io import BytesIO
+    import numpy as np
+
+    if not encoding:
+        encoding = grassutils._get_encoding()
+
+    if isinstance(tablestring, str):
+        tablestring = grasscore.encode(tablestring, encoding=encoding)
+    elif not isinstance(tablestring, bytes):
+        raise GrassError(_("Unsupported data type"))
+
+    kwargs = {
+        "missing_values": null_value,
+        "filling_values": fill_value,
+        "usecols": usecols,
+        "names": names,
+        "encoding": encoding,
+        "delimiter": sep,
+        "comments": comments,
+    }
+
+    if structured:
+        kwargs["dtype"] = None
+
+    return np.genfromtxt(BytesIO(tablestring), **kwargs)
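+
+# A minimal sketch (illustrative, not part of the patched module) of how the
+# ``structured`` flag changes the result: with ``structured=True`` genfromtxt
+# receives ``dtype=None`` and infers one dtype per column, returning a
+# structured array; with ``structured=False`` the default float dtype applies
+# and non-numeric values become nan:
+#
+#     txt = "1|2.5|a\n2|3.5|b\n"
+#     txt2numpy(txt, sep="|")                    # fields f0 (int), f1 (float), f2 (str)
+#     txt2numpy(txt, sep="|", structured=False)  # 2x3 float array, 'a'/'b' -> nan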
+
+
+def numpy2table(
+    np_array,
+    table,
+    connection,
+    formats=None,
+    names=False,
+    column_prefix="column",
+    update_formats=True,
+    overwrite=True,
+):
+    """Write a numpy array to a database table. The most suitable SQL data
+    type is derived from the numpy dtype, and column names are taken from the
+    array (if possible), unless given by the user.
+
+    :param np_array: structured or unstructured 2D numpy array
+    :type np_array: numpy.ndarray
+
+    :param table: Name of the database table to write to
+    :type table: str
+
+    :param connection: A database (PostgreSQL or SQLite) connection object
+    :type connection: connection
+
+    :param formats: A list of strings that describe the dtype of the numpy array
+    :type formats: list
+
+    :param names: List of strings with names for columns
+    :type names: list
+
+    :param column_prefix: A string with the prefix to be used for column names
+    :type column_prefix: str
+
+    :param update_formats: Whether to overwrite existing format definitions
+                           in structured numpy arrays
+    :type update_formats: bool
+
+    :param overwrite: Whether to overwrite existing tables with the same name
+    :type overwrite: bool
+
+    >>> import numpy as np
+    >>> import sqlite3
+    >>> conn = sqlite3.connect("file::memory:?cache=shared")
+    >>> np_array = np.array(
+    ...     [
+    ...         ["112", "abc", "2005-01-01", "13.543", "True", "1"],
+    ...         [
+    ...             "9223372036854775806",
+    ...             "test",
+    ...             "2005-02-01",
+    ...             "29.543",
+    ...             "False",
+    ...             "0",
+    ...         ],
+    ...     ]
+    ... )
+    >>> table = "test"
+    >>> numpy2table(
+    ...     np_array, table, conn, names=None, formats=None, update_formats=True
+    ... )
+    >>> conn.close()
+    """
+
+    from io import BytesIO
+    import numpy as np
+    from numpy.lib import recfunctions as rfn
+
+    connection_info = str(type(connection)).split("'")[1].lower()
+    if "pg" not in connection_info and "sqlite" not in connection_info:
+        raise GrassError(_("DB backend not supported, please check connection!"))
+
+    dbdriver = "sqlite" if "sqlite" in connection_info else "pg"
+
+    # The PostgreSQL backend needs psycopg2's helper for bulk inserts
+    if dbdriver == "pg":
+        from psycopg2.extras import execute_values
+
+    """
+    # Get all numpy dtypes
+    for char_code in np.typecodes["All"]:
+        print(np.dtype(char_code).num, ": \"\"  # ", np.dtype(char_code), char_code)
+    # Compare to:
+    - SQLite: https://www.sqlite.org/datatype3.html
+    - PostgreSQL: https://www.postgresql.org/docs/11/datatype.html
+    """
+    sql_to_dtype = {
+        "sqlite": {
+            "INTEGER": [0, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            "REAL": [11, 12, 23],
+            "TEXT": [1, 18, 19, 21, 22],
+            "BLOB": [17],
+        },
+        "pg": {
+            "boolean": [0],
+            "smallint": [2, 3, 4],
+            "integer": [5, 6],
+            "bigint": [7, 8, 9, 10],
+            "text": [1, 18, 19],
+            "real": [11, 23],
+            "double precision": [12],
+            "bytea": [17],
+            "timestamp": [21],
+            "interval": [22],
+        },
+    }
+
+    # Dictionary to translate numpy dtypes to backend-specific SQL data types
+    dtype_to_sql = {
+        "sqlite": {
+            0: "INTEGER",  # numpy: bool ; short form: ?
+            1: "TEXT",  # numpy: int8 ; short form: b
+            2: "INTEGER",  # numpy: uint8 ; short form: B
+            3: "INTEGER",  # numpy: int16 ; short form: h
+            4: "INTEGER",  # numpy: uint16 ; short form: H
+            5: "INTEGER",  # numpy: int32 ; short form: i
+            6: "INTEGER",  # numpy: uint32 ; short form: I
+            7: "INTEGER",  # numpy: int32 ; short form: l
+            8: "INTEGER",  # numpy: uint32 ; short form: L
+            9: "INTEGER",  # numpy: int64 ; short form: q, p
+            10: "INTEGER",  # numpy: uint64 ; short form: Q, P
+            11: "REAL",  # numpy: float32 ; short form: f
+            12: "REAL",  # numpy: float64 ; short form: d
+            13: "UNSUPPORTED",  # numpy: longdouble ; short form: g
+            14: "UNSUPPORTED",  # numpy: complex64 ; short form: F
+            15: "UNSUPPORTED",  # numpy: complex128 ; short form: D
+            16: "UNSUPPORTED",  # numpy: clongdouble ; short form: G
+            17: "BLOB",  # numpy: object ; short form: O
+            18: "TEXT",  # numpy: |S0 ; short form: S
+            19: "TEXT",  # numpy: <U0 ; short form: U
+            20: "UNSUPPORTED",  # numpy: void ; short form: V
+            21: "TEXT",  # numpy: datetime64 ; short form: M
+            22: "TEXT",  # numpy: timedelta64 ; short form: m
+            23: "REAL",  # numpy: float16 ; short form: e
+        },
+        "pg": {
+            0: "boolean",  # numpy: bool ; short form: ?
+            1: "text",  # numpy: int8 ; short form: b
+            2: "smallint",  # numpy: uint8 ; short form: B
+            3: "smallint",  # numpy: int16 ; short form: h
+            4: "smallint",  # numpy: uint16 ; short form: H
+            5: "integer",  # numpy: int32 ; short form: i
+            6: "integer",  # numpy: uint32 ; short form: I
+            7: "bigint",  # numpy: int32 ; short form: l
+            8: "bigint",  # numpy: uint32 ; short form: L
+            9: "bigint",  # numpy: int64 ; short form: q, p
+            10: "bigint",  # numpy: uint64 ; short form: Q, P
+            11: "real",  # numpy: float32 ; short form: f
+            12: "double precision",  # numpy: float64 ; short form: d
+            13: "UNSUPPORTED",  # numpy: longdouble ; short form: g
+            14: "UNSUPPORTED",  # numpy: complex64 ; short form: F
+            15: "UNSUPPORTED",  # numpy: complex128 ; short form: D
+            16: "UNSUPPORTED",  # numpy: clongdouble ; short form: G
+            17: "bytea",  # numpy: object ; short form: O
+            18: "text",  # numpy: |S0 ; short form: S
+            19: "text",  # numpy: <U0 ; short form: U
+            20: "UNSUPPORTED",  # numpy: void ; short form: V
+            21: "timestamp",  # numpy: datetime64 ; short form: M
+            22: "interval",  # numpy: timedelta64 ; short form: m
+            23: "real",  # numpy: float16 ; short form: e
+        },
+    }
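+
+    # A minimal sketch (illustrative only): the integer keys in the two
+    # mappings above are NumPy's internal type numbers, obtainable via
+    # ``np.dtype(...).num``, e.g.:
+    #
+    #     np.dtype(np.int32).num    # -> 5
+    #     np.dtype(np.float64).num  # -> 12
+    #     np.dtype("U10").num       # -> 19 (unicode text)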
+
+    length_names = (
+        len(np_array.dtype.names) if np_array.dtype.names else np_array.shape[1]
+    )
+    if names:
+        if len(names) != length_names:
+            raise GrassError(
+                _(
+                    "Length of parameter <names> does not match "
+                    "number of columns in array!"
+                )
+            )
+
+    # Check if user-given formats can be assigned to the data
+    if formats:
+        if len(formats) != length_names:
+            raise GrassError(
+                _(
+                    "Length of parameter <formats> does not match "
+                    "number of columns in array!"
+                )
+            )
+
+        for idx, np_format in enumerate(formats):
+            try:
+                if np_format == "str":
+                    size = (
+                        np_array[:, idx].dtype.itemsize
+                        if not np_array.dtype.names
+                        else np_array[np_array.dtype.names[idx]].dtype.itemsize
+                    )
+                    np_format = np.dtype((np_format, size)).str
+                    formats[idx] = np_format
+                # Casting to the requested format and back to string must
+                # reproduce the original values
+                if not np_array.dtype.names:
+                    if not all(
+                        np_array[:, idx].astype(np_format).astype("str")
+                        == np_array[:, idx].astype("str")
+                    ):
+                        raise ValueError
+                else:
+                    np_array[np_array.dtype.names[idx]].astype(np_format)
+            except (ValueError, TypeError):
+                if not np_array.dtype.names:
+                    raise GrassError(
+                        _("Cannot represent column number {} as {}").format(
+                            idx, np_format
+                        )
+                    )
+                raise GrassError(
+                    _("Cannot represent column {} as {}").format(
+                        np_array.dtype.names[idx], np_format
+                    )
+                )
+
+    # Get an unstructured view of the input array if formats need to be
+    # derived or (re)applied
+    if not np_array.dtype.names:
+        np_array_view = np_array
+    elif update_formats or formats:
+        np_array_view = rfn.structured_to_unstructured(np_array)
+
+    # Generate a list of minimal formats to represent data in array columns
+    if not formats and (update_formats or not np_array.dtype.names):
+        formats = []
+        # Candidate dtypes, ordered from most compact to most general
+        types = [
+            np.uint8,
+            np.int8,
+            np.uint16,
+            np.int16,
+            np.uint32,
+            np.int32,
+            np.uint64,
+            np.int64,
+            np.single,
+            np.double,
+            np.longlong,
+            np.ulonglong,
+            np.datetime64,
+        ]
+        for col_idx in range(np_array_view.shape[1]):
+            dtype = None
+            for np_dtype in types:
+                try:
+                    # Check if data can be cast and still match the original
+                    # after the type-cast
+                    if not all(
+                        np_array_view[:, col_idx].astype(np_dtype).astype("str")
+                        == np_array_view[:, col_idx].astype("str")
+                    ):
+                        continue
+
+                    # Columns containing only 0 and 1 are represented as bool
+                    if np_dtype in (np.uint8, np.int8):
+                        dtype = (
+                            np.dtype(np.bool_)
+                            if np.max(np_array_view[:, col_idx].astype(np_dtype)) == 1
+                            and np.min(np_array_view[:, col_idx].astype(np_dtype)) == 0
+                            else np.dtype(np_dtype)
+                        )
+                    else:
+                        # Get the concrete dtype after the cast
+                        dtype = np_array_view[:, col_idx].astype(np_dtype).dtype
+                    break
+                except (ValueError, TypeError):
+                    continue
+            if not dtype:
+                dtype = np_array_view[:, col_idx].dtype
+            formats.append(dtype)
+
+    # Generate a list of tuples with column names and formats for the array
+    if not names and formats:
+        dtype = np.dtype(
+            [
+                (
+                    "{}{}".format(column_prefix, idx)
+                    if not np_array.dtype.names
+                    else np_array.dtype.names[idx],
+                    np_format,
+                )
+                for idx, np_format in enumerate(formats)
+            ]
+        )
+    elif names and formats:
+        dtype = np.dtype(
+            [(names[idx], np_format) for idx, np_format in enumerate(formats)]
+        )
+    elif names and not formats:
+        # Keep the existing per-column dtypes and only rename the columns
+        dtype = np.dtype(
+            [(np_name, np_array.dtype[idx]) for idx, np_name in enumerate(names)]
+        )
+    else:
+        dtype = None
+
+    # Build the structured array that is written to the database
+    if not np_array.dtype.names or update_formats or formats:
+        structured_array = rfn.unstructured_to_structured(np_array_view, dtype)
+    else:
+        structured_array = np_array
+
+    # Generate a list of SQL data types for the columns
+    columns = []
+    for col in structured_array.dtype.names:
+        type_code = structured_array[col].dtype.num
+        columns.append("{} {}".format(col, dtype_to_sql[dbdriver][type_code]))
+
+    create_sql = "CREATE TABLE {} ({});".format(table, ", ".join(columns))
+
+    # Execute SQL code
+    with connection:
+        cur = connection.cursor()
+        if overwrite:
+            drop_sql = "DROP TABLE IF EXISTS {};".format(table)
+            cur.execute(drop_sql)
+        # Create table
+        cur.execute(create_sql)
+
+        # Insert data
+        if dbdriver == "sqlite":
+            insert_sql = "INSERT INTO {}({}) VALUES({});".format(
+                table,
+                ", ".join(structured_array.dtype.names),
+                ",".join(["?"] * len(structured_array.dtype.names)),
+            )
+            cur.executemany(insert_sql, structured_array.tolist())
+        elif dbdriver == "pg":
+            # Arrays that contain neither objects nor binary data can be
+            # loaded with the much faster COPY statement
+            if {
+                np.dtype(descr[1]).num for descr in structured_array.dtype.descr
+            }.isdisjoint({13, 14, 15, 16, 17, 20}):
+                np_array_txt = BytesIO()
+                np.savetxt(np_array_txt, structured_array, delimiter="\t", fmt="%s")
+                np_array_txt.seek(0)
+                cur.copy_from(np_array_txt, table)
+            else:
+                insert_sql = "INSERT INTO {}({}) VALUES %s;".format(
+                    table,
+                    ", ".join(structured_array.dtype.names),
+                )
+                execute_values(cur, insert_sql, structured_array.tolist())
+        connection.commit()
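+
+
+# A minimal sketch (illustrative; table name and data are hypothetical) of a
+# round trip through an in-memory SQLite database: dtypes are minimized per
+# column, the table is created, and rows are inserted via executemany:
+#
+#     import sqlite3
+#     import numpy as np
+#     conn = sqlite3.connect(":memory:")
+#     numpy2table(np.array([["1", "a"], ["2", "b"]]), "demo", conn)
+#     conn.execute("SELECT * FROM demo").fetchall()  # -> [(1, 'a'), (2, 'b')]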
+
+
 def create_test_vector_map(map_name="test_vector"):
     """This functions creates a vector map layer with points, lines, boundaries,
     centroids, areas, isles and attributes for testing purposes