✨ Add adls_to_df() and adls_list() Prefect tasks #1037
Conversation
```python
def adls_to_df(
    path: str,
    sep: str = "\t",
    credentials_secret: str | None = None,
    config_key: str | None = None,
) -> pd.DataFrame:
    r"""Load file(s) from the Azure Data Lake to a pandas DataFrame.

    Note: Currently supports CSV and Parquet files.

    Args:
        path (str): The path from which to load the DataFrame.
        sep (str, optional): The separator to use when reading a CSV file.
            Defaults to "\t".
        credentials_secret (str, optional): The name of the Azure Key Vault secret
            storing the credentials.
        config_key (str, optional): The key in the viadot config holding relevant
            credentials.

    Raises:
        MissingSourceCredentialsError: If credentials were not provided.
        ValueError: If the file format is not supported.

    Returns:
        pd.DataFrame: The data loaded from the lake.
    """
    logger = get_run_logger()

    if not (credentials_secret or config_key):
        raise MissingSourceCredentialsError

    credentials = get_credentials(credentials_secret)
    lake = AzureDataLake(credentials=credentials, config_key=config_key)

    full_dl_path = str(Path(credentials["ACCOUNT_NAME"], path))
    logger.info(f"Downloading data from {full_dl_path} to a DataFrame...")

    # Keep the full file name; `Path.stem` strips only the last suffix, so
    # rebuilding the name as `stem + suffixes` would duplicate a suffix for
    # multi-extension files such as "data.csv.gz".
    file_name = Path(path).name
    suffixes = "".join(Path(path).suffixes)

    lake.download(to_path=file_name, from_path=path, recursive=False)

    if ".csv" in suffixes:
        df = pd.read_csv(file_name, sep=sep)
    elif ".parquet" in suffixes:
        df = pd.read_parquet(file_name)
    else:
        raise ValueError(f"Unsupported file format: '{suffixes}'.")

    # `Path.unlink()` is an instance method, so it must be called on a
    # `Path` object rather than on the class with a string argument.
    Path(file_name).unlink()

    logger.info("Successfully loaded data.")

    return df
```
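A side note on the local-file-name logic: `Path.stem` strips only the final suffix, so rebuilding a name as `stem + "".join(suffixes)` duplicates a suffix for multi-extension files, while `Path.name` keeps the file name intact. A minimal stdlib sketch of the difference:

```python
from pathlib import Path

p = Path("raw/data.csv.gz")

# stem strips only the last suffix, so concatenating all suffixes
# back duplicates one of them:
print(p.stem)                        # data.csv
print(p.stem + "".join(p.suffixes))  # data.csv.csv.gz

# name preserves the full file name:
print(p.name)                        # data.csv.gz
```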
There's already an ADLS.to_df() method that you can use for this task - no need to reinvent the wheel
That method is not working. I created this one because Fabio asked me to.
What's not working? Can you link the issue? Also, you should fix the issue and not add another function that does the same thing.
```python
        path (str): The path to the directory whose contents you want to list.
        recursive (bool, optional): If True, recursively list all subdirectories
            and files. Defaults to False.
        file_to_match (str, optional): If provided, only return files with that
            name.
```
What is this param for? If you want to check if a file exists, you can use the existing AzureDataLake.exists() method (and add a task for it if needed). Also, any logic should be defined in viadot source, tasks and flows should only use viadot sources without adding any additional ingestion logic on top.
This is a task to list files in ADLS. I transcribed it from viadot 1 because Fabio needed it. If you want to put it somewhere else, talk to Fabio.
It doesn't matter where the code is from; I'm only checking whether it's good enough to merge. Did you forget to link the issue?
@fdelgadodyvenia Should it be finished or removed? Where is this functionality needed?
adls_to_df() and adls_list() Prefect tasks
Summary
Added Prefect 2.0 tasks to the ADLS utils: one to list the files under a given path, and one to load a file from an ADLS path into a pandas DataFrame.
Importance
Required by the migration project.
Checklist
This PR:
- CONTRIBUTING.md
- CHANGELOG.md