
sparkmobility - A Spark-based Python Library for Processing, Modeling, and Analyzing Large Mobility Datasets

sparkmobility is a library for processing large mobility datasets, including Location-Based Service (LBS) data, using the Apache Spark framework. This Python repository serves as the main interface between sparkmobility and users. The data processing pipelines themselves are written in Scala and can be found at sparkmobility-scala.

Key features of sparkmobility include:

  • Apache Spark-based implementation of the stay detection algorithm published by Zheng et al. (2010).

  • Inference of home and work locations.

  • Extraction and visualization of mobility metrics and patterns from the stay points output by the stay detection algorithm.

  • Generation of synthetic trajectories and OD flow patterns from the detected stay points using human mobility models, including the gravity model, the rank-based EPR model, and TimeGeo.

Table of contents

  1. Examples
  2. Related packages

Examples

Import and configure sparkmobility

To import and configure sparkmobility, run the following:

>>> import sparkmobility as sm
>>> sm.config["CORES"] = 8
>>> sm.config["MEMORY"] = 32
>>> sm.config["LOG_LEVEL"] = "ERROR"
>>> sm.config["TEMP_DIR"] = "/my_path/to_tmp_folder"
JAR file not found at /data_1/albert/sparkmobility/sparkmobility/lib/sparkmobility010.jar. Downloading from GCS...
Download complete.
Spark already installed.
Environment variables set for current session.
To make this persistent, add the following to your shell config (e.g., .bashrc):
export SPARK_HOME="/home/albert/.spark/spark-3.5.5-bin-hadoop3-scala2.13"
export PATH="$SPARK_HOME/bin:$PATH"

Spark sessions are configured through the sparkmobility configuration object sm.config. The configurable options include:

  • sm.config['CORES'] sets the number of CPU cores used for parallelism in Spark;
  • sm.config['MEMORY'] sets the amount of memory allocated to both the executor and the driver in Spark;
  • sm.config['LOG_LEVEL'] sets the verbosity of messages during computation;
  • sm.config['TEMP_DIR'] sets the path to the directory that holds temporary files while the pipelines run. Set it to a directory with sufficient free disk space to prevent out-of-storage errors (see the check below).
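
Because the pipelines spill intermediate results to disk, it is worth verifying that TEMP_DIR has enough free space before a large run. A minimal sketch using only the Python standard library:

>>> import shutil
>>> free_gb = shutil.disk_usage(sm.config["TEMP_DIR"]).free / 1e9
>>> print(f"{free_gb:.0f} GB free in {sm.config['TEMP_DIR']}")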

When imported for the first time, sparkmobility automatically searches for the .jar file that contains the pre-compiled pipelines developed in Scala. If the file is not found, sparkmobility automatically downloads it from Google Cloud Storage.

Initialize a MobilityDataset

In sparkmobility, the class MobilityDataset describes a mobility dataset. It does NOT hold the raw data or the detected stay points in memory; instead, it stores various attributes of the dataset. The mandatory input fields are:

  • dataset_name (type: str);
  • raw_data_path (type: str);
  • processed_data_path (type: str);
  • column_mappings (type: dict);

Additionally, you may optionally define the time period and region of interest, which helps reduce computation time during the stay detection phase by selecting a subset of records:

  • start_datetime (type: datetime);
  • end_datetime (type: datetime);
  • longitude (type: list);
  • latitude (type: list);
  • time_zone (type: str) specifies the local time zone of the region of interest.

Initialize a MobilityDataset:

>>> from sparkmobility.datatset import MobilityDataset
>>> # create a MobilityDataset
>>> myDataset = MobilityDataset(
        dataset_name="example_dataset",
        raw_data_path="example_dataset_raw_lbs",
        processed_data_path="example_dataset_output",
        column_mappings={"caid": "caid",
                         "latitude": "latitude",
                         "longitude": "longitude",
                         "utc_timestamp": "utc_timestamp"},
        start_datetime="2019-01-01 00:00:00",
        end_datetime="2025-01-31 23:59:59",
        longitude=[-118.9448, -117.6463], # LA region
        latitude=[33.7037, 34.3373],
        time_zone="America/Los Angeles",
    )
>>> print(type(myDataset))
<class 'sparkmobility.datatset.MobilityDataset'>

Conduct StayDetection

StayDetection is a process for detecting stay points and their respective stay durations from the raw mobility dataset, which comes in the format (USER_ID, TIME, LNG, LAT).
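
Concretely, with the column_mappings used in the example above, a few raw records might look like the following (hypothetical values for illustration; this sketch assumes utc_timestamp is given in UNIX epoch seconds):

caid       utc_timestamp   latitude   longitude
user_001   1546300800      34.0522    -118.2437
user_001   1546304400      34.0523    -118.2436
user_002   1546308000      33.9416    -118.4085

To call StayDetection: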

>>> from sparkmobility.processing.stay_detection import StayDetection
# Initialize the StayDetection instance
>>> stays = StayDetection(MobilityDataset=myDataset)
# Conduct stay detection
>>> stays.get_stays(hex_resolution=9)
# Compute OD flow matrix for trips between home and work locations
>>> stays.get_home_work_od_matrix(hex_resolution=7)
# Compute mobility distributions
>>> stays.summarize()

Argument hex_resolution specifies the resolution of the hexagonal grid in the output data. The output of the StayDetection module is automatically saved to the processed_data_path directory specified when the MobilityDataset was initialized. The structure is:

πŸ“¦ processed_data_path
 ┣ πŸ“‚ HomeWorkODMatrix
 ┃ β”— πŸ“‚ Resolution7
 ┣ πŸ“‚ Metrics
 ┃ ┣ πŸ“‚ DailyVisitedLocations
 ┃ ┣ πŸ“‚ DepartureTimeDistribution
 ┃ β”— πŸ“‚ StayDurationDistribution
 ┣ πŸ“‚ StayPoints
 ┣ πŸ“‚ StayPointsWithHomeWork
 β”— πŸ“œ config.json
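
The config.json file records the parameters used for the run. Assuming it is plain JSON (its exact contents are not documented here), it can be inspected with the standard library:

>>> import json
>>> print(json.load(open("example_dataset_output/config.json")))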

To visualize the output of StayDetection, we can call the following visualization functions:

Plot mobility distributions:

>>> from sparkmobility.visualization.population import plot_mobility_distributions
# plot distribution of the number of daily visited locations N, the departure time t, and the stay duration delta_t
>>> fig, ax = plot_mobility_distributions(myDataset)

Plot distributions
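
Because plot_mobility_distributions returns standard Matplotlib handles, the resulting figure can be saved like any other Matplotlib figure:

>>> fig.savefig("mobility_distributions.png", dpi=300, bbox_inches="tight")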

Plot home work OD flow:

>>> from sparkmobility.visualization.population import plot_flow
# Load the OD flow matrix for trips between home and work locations
>>> flow_df = myDataset.load_home_work_flow(hex_resolution=7).toPandas()
# Plot the flow
>>> plot_flow(flow_df[flow_df["flow"] > 20])

Plot flow
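
Since flow_df is an ordinary pandas DataFrame after toPandas(), it can also be inspected directly, for example to list the heaviest home-work corridors:

>>> flow_df.sort_values("flow", ascending=False).head(10)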

Plot user trajectories:

>>> import pyspark.sql.functions as F
>>> from sparkmobility.visualization.individual import plot_trajectories
# Load detected stay points
>>> stay_points = myDataset.load_stays()
# Filter for a user's trajectory
>>> stay_points = stay_points.filter(
        F.col("caid") == "0006e4cac5385960141fee505fbb73922c27309b34c45a8c5bb0bf03ace****"
    ).toPandas()
>>> plot_trajectories(stay_points)

Plot trajectory
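
Rather than hard-coding a caid, one can also select a user programmatically, for example the user with the most detected stay points (a small sketch using standard PySpark operations on the loaded stay points):

>>> stay_points = myDataset.load_stays()
>>> top_user = (
        stay_points.groupBy("caid")
        .count()
        .orderBy(F.col("count").desc())
        .first()["caid"]
    )
>>> plot_trajectories(
        stay_points.filter(F.col("caid") == top_user).toPandas()
    )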

Plot the distribution of home and work locations:

>>> myDataset.plot_home_locations(hex_resolution=9)

Plot home

>>> myDataset.plot_work_locations(hex_resolution=9)

Plot work

Select active users based on stay points using UserSelection

The UserSelection module filters the users in the stay points dataset based on each user's active timespan and the number of detected stay points:

  • num_stay_points_range (type: list);
  • time_span_days_range (type: list);

The method UserSelection.filter_users returns a visualization of the number of users meeting the criteria and saves the stay points of the selected users to processed_data_path/FilteredUserStayPoints:

>>> from sparkmobility.processing.user_selection import UserSelection
# Create an instance variable for the UserSelection module
>>> user_selection = UserSelection(myDataset)
# Filter users based on the number of stay points and the active timespan
>>> fig, ax = user_selection.filter_users(
        num_stay_points_range=[100, 800],
        time_span_days_range=[15, 30]
    )

Plot user selection
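
Because filter_users returns the figure and axes, different selection criteria can be compared by simply calling the method again with new ranges (this sketch assumes each call overwrites the previously saved FilteredUserStayPoints output):

>>> fig, ax = user_selection.filter_users(
        num_stay_points_range=[50, 400],
        time_span_days_range=[7, 30]
    )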

Related packages

scikit-mobility is a similar package for working with mobility datasets. scikit-mobility uses two data structures, trajectories (TrajDataFrame) and mobility flows (FlowDataFrame), to manage, analyze, and model human mobility data. Instead of building on pandas, sparkmobility leverages Apache Spark: the input mobility data are structured as Resilient Distributed Datasets (RDDs) during processing. The gravity and rank-based EPR models implemented in sparkmobility are adapted from the scikit-mobility implementations.
