Home
Data‑Warp is a powerful Python library that simplifies working with data files across various formats and storage locations. At its core is the FileConnector module—a universal connector designed to streamline data ingestion from multiple sources with minimal configuration.
Data‑Warp aims to provide a one‑stop solution for data operations, including connectors, orchestration, transformation, ELT, monitoring, dashboards, and reporting, making it a valuable tool for data engineers.
The Data‑Warp connectors module abstracts the complexity of reading files from different sources (local, HTTP, S3, etc.) and in various formats. It supports:
- **Multiple File Formats:** CSV, JSON, Parquet, Excel, and more.
- **Flexible Readers:** Choose from the Pandas reader, built‑in Python modules, or PyArrow for efficient data ingestion.
- **Streaming & Batching:** Process large datasets efficiently with advanced streaming and batching capabilities via the `StreamingBatchIterator`.
- **Unified API:** A single `FileConnector` class provides methods such as `fetch()`, `stream()`, and `fetch_batch()` for consistent data ingestion across file types and sources (see the sketch after this list).
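
A minimal sketch of that unified surface, assuming a local CSV at `data/sample.csv`; only the method names come from the list above, and what `stream()` yields is an assumption:

```python
from data_warp.connectors.file_connector import FileConnector

# One connector, three ingestion styles. Only the method names come from the
# feature list above; what stream() yields (rows vs. chunks) is an assumption.
connector = FileConnector("data/sample.csv", source="local")

df = connector.fetch()                           # load the whole file at once
for chunk in connector.stream():                 # process the file incrementally
    print(chunk)
batches = connector.fetch_batch(batch_size=100)  # rich batch iterator
print(len(batches))                              # total batch count via __len__()
```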
Data‑Warp is organized into several key modules:
- **Base Connector:** Contains abstract base classes and shared functionality for connectors (a hypothetical extension sketch follows this list).
- **File Source Handlers:** Implements specific file source handlers (e.g. `LocalFileSource`, `HTTPFileSource`, `S3FileSource`).
- **FileConnector Module:** The heart of the library, providing file type inference, reader selection, and methods for fetching data directly, streaming, and batching.
- **Utils Module:** Contains utility functions and decorators (like `inherit_docstring_and_signature`) as well as the `StreamingBatchIterator` class for efficient batch processing and additional data manipulation (filtering, searching, mapping, flattening, and converting to a DataFrame).
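
Because the source handlers share an abstract base, adding a new source is largely a matter of subclassing it. The sketch below is purely illustrative: the class name, the `open()` method, and the idea that readers consume a file-like object are assumptions, not Data‑Warp's actual interface.

```python
# Hypothetical sketch only: the method names and the expectation that a source
# handler returns a file-like object are assumptions, not Data-Warp's real API.
from io import BytesIO


class FTPFileSource:
    """Illustrative custom source handler that would fetch a file over FTP."""

    def __init__(self, host: str, path: str):
        self.host = host
        self.path = path

    def open(self) -> BytesIO:
        # A real handler would download the remote file and return a
        # file-like object for the readers to consume.
        raise NotImplementedError("illustrative sketch only")
```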
- **CSV Files:** Read CSV files using Pandas, Python's built‑in `csv` module, or PyArrow for optimized performance.
- **JSON Files:** Read JSON files in various formats:
  - JSON Array: Stream each object individually.
  - Dict‑of‑Lists: Convert column‑oriented JSON to row‑oriented records.
  - NDJSON: Process newline‑delimited JSON.
- **Parquet & Excel Files:** Efficiently read Parquet files (using Pandas or PyArrow) and Excel files (using Pandas). A short sketch follows this list.
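
A brief sketch of reading the binary formats. The file paths and the `reader`/`file_type` values are assumptions; only the constructor parameters themselves appear in the usage examples further down.

```python
from data_warp.connectors.file_connector import FileConnector

# Parquet via PyArrow. The reader values "pyarrow" and "pandas" are assumptions;
# only reader="builtin" appears in the usage examples below.
parquet_df = FileConnector("data/sample.parquet", source="local", reader="pyarrow").fetch()

# Excel via Pandas; file_type is passed explicitly here rather than inferred
# from the extension ("excel" as the value is an assumption).
excel_df = FileConnector("data/sample.xlsx", file_type="excel", source="local", reader="pandas").fetch()
```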
- **Streaming:** Process large files in chunks without loading the entire file into memory.
- **Batch Processing:** The `fetch_batch()` method provides a rich iterator, implemented as the `StreamingBatchIterator`, that supports the following (a short sketch follows the list):
  - `next()` to retrieve the next batch.
  - `to_list()` to collect all batches.
  - `flatten_to_list()` to flatten batches into a single list.
  - `to_dataframe()` to convert the streamed data directly into a pandas DataFrame.
  - `filter_batches(predicate)`, `search(search_func)`, `map_batches(func)`, and `flatten()` for advanced data manipulation.
  - `__len__()` to get the total number of batches.
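
A short sketch of the collection helpers, assuming a small local JSON file; only the method names are taken from the list above, and the exact return types are assumptions.

```python
from data_warp.connectors.file_connector import FileConnector

# Assumed input: data/sample.json holding a small JSON array of records.
connector = FileConnector("data/sample.json", file_type="json", source="local", reader="builtin")

batches = connector.fetch_batch(batch_size=2)
print(len(batches))        # total number of batches via __len__()
print(batches.to_list())   # all batches collected into a list of lists

# flatten() presumably yields individual records across batch boundaries;
# whether it returns an iterator or a list is an assumption.
for record in connector.fetch_batch(batch_size=2).flatten():
    print(record)
```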
- **Single Class Interface:** The `FileConnector` class allows you to easily switch between data sources and file formats.
- **Modular Design:** The library's modular design makes it easy to add support for additional file types or data sources.
```python
from data_warp.connectors.file_connector import FileConnector

# Read a CSV file using Pandas.
connector = FileConnector("data/sample.csv", source="local")
df = connector.fetch()
print(df.head())
```

### Streaming & Batch Processing
```python
connector = FileConnector("data/sample.json", file_type="json", source="local", reader="builtin")
batch_iterator = connector.fetch_batch(batch_size=1)

first_batch = batch_iterator.next()
print("First batch:", first_batch)

flat_records = batch_iterator.flatten_to_list()
print("Flat records:", flat_records)

df = batch_iterator.to_dataframe()
print("DataFrame:", df.head())
```
### Filtering, Searching & Mapping

```python
filtered_batches = connector.fetch_batch(batch_size=2).filter_batches(lambda batch: len(batch) > 1)
for batch in filtered_batches:
    print("Filtered batch:", batch)

matches = connector.fetch_batch(batch_size=1).search(lambda record: record.get('name') == 'Alice')
for match in matches:
    print("Found record:", match)

mapped_batches = connector.fetch_batch(batch_size=1).map_batches(
    lambda batch: [
        {k: (v.upper() if isinstance(v, str) and k == 'name' else v) for k, v in record.items()}
        for record in batch
    ]
)
for batch in mapped_batches:
    print("Mapped batch:", batch)
```
## Upcoming Features
Data‑Warp is continuously evolving. Some upcoming features include:
- **Extended Format Support:** Add support for additional file formats such as XML, Avro, and ORC.
- **Enhanced Cloud Integrations:** Deep integration with cloud storage platforms (AWS, GCP, Azure) for seamless data ingestion.
- **Real-time Monitoring & Alerts:** Built‑in monitoring, logging, and alerting for data pipelines.
- **Integrated Data Transformation & Orchestration:** Native support for data transformation operations, orchestration, and ELT workflows.
- **Interactive Dashboards & Reporting:** Tools for building dashboards and reports directly from your data pipelines.
- **Improved Performance & Scalability:** Further optimizations for large-scale, distributed data processing.
## Installation

Install via pip:

```bash
pip install data-warp
```
(Replace with the actual package name if different.)

## Contributing

Contributions are welcome! Please see our CONTRIBUTING.md for details on how to contribute to the project.

## License
This project is licensed under the MIT License. See the LICENSE file for details.