From 8895e580c8652957069d520e6e22f6257f626578 Mon Sep 17 00:00:00 2001
From: Valentin Lobstein
Date: Sun, 8 Feb 2026 00:45:38 +0100
Subject: [PATCH 1/3] Feat: Add AsyncClient, search API, and Pro detection

---
 README.md              |  86 ++++++++++
 leakix/__init__.py     |   2 +
 leakix/async_client.py | 360 +++++++++++++++++++++++++++++++++++++++++
 leakix/client.py       | 124 +++++++++++++-
 pyproject.toml         |  14 +-
 5 files changed, 579 insertions(+), 7 deletions(-)
 create mode 100644 leakix/async_client.py

diff --git a/README.md b/README.md
index b5d5be4..124c190 100644
--- a/README.md
+++ b/README.md
@@ -13,6 +13,48 @@ pip install leakix
 
 To run tests, use `poetry run pytest`.
 
+## Quick Start
+
+```python
+from leakix import Client
+
+client = Client(api_key="your-api-key")
+
+# Simple search - same syntax as the website
+results = client.search("+plugin:GitConfigHttpPlugin", scope="leak")
+for event in results.json():
+    print(event.ip, event.host)
+
+# Search services
+results = client.search("+country:FR +port:22", scope="service")
+```
+
+## Async Client
+
+For async applications, use `AsyncClient`:
+
+```python
+import asyncio
+from leakix import AsyncClient, MustQuery, Plugin, PluginField
+
+async def main():
+    async with AsyncClient(api_key="your-api-key") as client:
+        # Simple search
+        results = await client.search("+plugin:GitConfigHttpPlugin", scope="leak")
+        for event in results:
+            print(event.ip, event.host)
+
+        # Host lookup
+        host = await client.get_host("8.8.8.8")
+        print(host["services"])
+        # Streaming bulk export
+        query = MustQuery(field=PluginField(Plugin.GitConfigHttpPlugin))
+        async for aggregation in client.bulk_export_stream([query]):
+            print(aggregation.events[0].ip)
+
+asyncio.run(main())
+```
+
 ## Documentation
 
 Docstrings are used to document the library.
@@ -156,6 +198,46 @@ def example_get_plugins():
         print(p.description)
 
 
+def example_search_simple():
+    """
+    Simple search using query string syntax (same as the website).
+    No need to build Query objects manually.
+    """
+    response = CLIENT.search("+plugin:GitConfigHttpPlugin", scope="leak")
+    for event in response.json():
+        print(event.ip)
+
+
+def example_search_service():
+    """
+    Search for services with multiple filters.
+    """
+    response = CLIENT.search("+country:FR +port:22", scope="service")
+    for event in response.json():
+        print(event.ip, event.port)
+
+
+def example_get_domain():
+    """
+    Get services and leaks for a domain.
+    """
+    response = CLIENT.get_domain("example.com")
+    if response.is_success():
+        print("Services:", response.json()["services"])
+        print("Leaks:", response.json()["leaks"])
+
+
+def example_bulk_stream():
+    """
+    Streaming bulk export - memory-efficient for large datasets.
+    Results are yielded one by one instead of loading all into memory.
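+    Bulk export is a Pro API feature; CLIENT.is_pro() reports whether
+    the configured key has access.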
+ """ + query = MustQuery(field=PluginField(Plugin.GitConfigHttpPlugin)) + for aggregation in CLIENT.bulk_export_stream(queries=[query]): + for event in aggregation.events: + print(event.ip) + + if __name__ == "__main__": example_get_host_filter_plugin() example_get_service_filter_plugin() @@ -165,4 +247,8 @@ if __name__ == "__main__": example_get_leak_plugins_with_time() example_get_leak_raw_query() example_get_plugins() + example_search_simple() + example_search_service() + example_get_domain() + example_bulk_stream() ``` diff --git a/leakix/__init__.py b/leakix/__init__.py index d3d128b..625f940 100644 --- a/leakix/__init__.py +++ b/leakix/__init__.py @@ -1,3 +1,4 @@ +from leakix.async_client import AsyncClient as AsyncClient from leakix.client import __VERSION__ as __VERSION__ from leakix.client import Client as Client from leakix.client import HostResult as HostResult @@ -70,6 +71,7 @@ __all__ = [ "__VERSION__", + "AsyncClient", "Client", "HostResult", "Scope", diff --git a/leakix/async_client.py b/leakix/async_client.py new file mode 100644 index 0000000..f0a298b --- /dev/null +++ b/leakix/async_client.py @@ -0,0 +1,360 @@ +"""Async LeakIX API client using httpx.""" + +import asyncio +import json +from typing import AsyncIterator + +import httpx +from l9format import l9format + +from leakix.domain import L9Subdomain +from leakix.plugin import APIResult +from leakix.query import EmptyQuery, Query, RawQuery + +__VERSION__ = "0.2.0" + +DEFAULT_URL = "https://leakix.net" +DEFAULT_TIMEOUT = 30.0 + + +class AsyncClient: + """Async client for the LeakIX API.""" + + MAX_RESULTS_PER_PAGE = 20 + + def __init__( + self, + api_key: str | None = None, + base_url: str | None = DEFAULT_URL, + timeout: float = DEFAULT_TIMEOUT, + ): + self.api_key = api_key + self.base_url = base_url if base_url else DEFAULT_URL + self.timeout = timeout + self.headers = { + "Accept": "application/json", + "User-agent": f"leakix-client-python/{__VERSION__}", + } + if api_key: + self.headers["api-key"] = api_key + self._client: httpx.AsyncClient | None = None + self._api_status: dict | None = None # Cached API status + + async def _get_client(self) -> httpx.AsyncClient: + """Get or create the HTTP client.""" + if self._client is None or self._client.is_closed: + self._client = httpx.AsyncClient( + base_url=self.base_url, + headers=self.headers, + timeout=self.timeout, + ) + return self._client + + async def close(self) -> None: + """Close the HTTP client.""" + if self._client is not None and not self._client.is_closed: + await self._client.aclose() + self._client = None + + async def __aenter__(self) -> "AsyncClient": + return self + + async def __aexit__(self, *args) -> None: + await self.close() + + async def _get( + self, + path: str, + params: dict | None = None, + max_retries: int = 3, + ) -> tuple[int, dict | list | None]: + """Make a GET request and return status code and JSON response. + + Automatically retries on rate limit (429) with exponential backoff. 
+        """
+        client = await self._get_client()
+        retries = 0
+        delay = 1.0
+
+        while True:
+            response = await client.get(path, params=params)
+
+            if response.status_code == 204:
+                return response.status_code, []
+            if response.status_code == 200:
+                return response.status_code, response.json() if response.content else []
+            if response.status_code == 429:
+                if retries >= max_retries:
+                    return response.status_code, None
+                retries += 1
+                await asyncio.sleep(delay)
+                delay *= 2
+                continue
+            return response.status_code, response.json()
+
+    async def get(
+        self,
+        scope: str,
+        queries: list[Query] | None = None,
+        page: int = 0,
+    ) -> list[l9format.L9Event]:
+        """
+        Search LeakIX for services or leaks.
+
+        Args:
+            scope: Either "service" or "leak".
+            queries: List of Query objects.
+            page: Page number (0-indexed).
+
+        Returns:
+            List of L9Event results.
+        """
+        if page < 0:
+            raise ValueError("Page argument must be a non-negative integer")
+        if queries is None or len(queries) == 0:
+            serialized_query = EmptyQuery().serialize()
+        else:
+            # Serialized queries are space-joined, as in the website search bar
+            serialized_query = " ".join(q.serialize() for q in queries)
+
+        status, data = await self._get(
+            "/search",
+            params={"scope": scope, "q": serialized_query, "page": page},
+        )
+        if status == 200 and isinstance(data, list):
+            return [l9format.L9Event.from_dict(item) for item in data]
+        return []
+
+    async def get_service(
+        self,
+        queries: list[Query] | None = None,
+        page: int = 0,
+    ) -> list[l9format.L9Event]:
+        """Shortcut for get with scope='service'."""
+        return await self.get("service", queries=queries, page=page)
+
+    async def get_leak(
+        self,
+        queries: list[Query] | None = None,
+        page: int = 0,
+    ) -> list[l9format.L9Event]:
+        """Shortcut for get with scope='leak'."""
+        return await self.get("leak", queries=queries, page=page)
+
+    async def search(
+        self,
+        query: str,
+        scope: str = "leak",
+        page: int = 0,
+    ) -> list[l9format.L9Event]:
+        """
+        Simple search using a query string.
+
+        Args:
+            query: Search query string (same syntax as the website).
+            scope: Either "leak" or "service" (default: "leak").
+            page: Page number for pagination (default: 0).
+
+        Returns:
+            List of L9Event results.
+        """
+        queries = [RawQuery(query)]
+        if scope == "service":
+            return await self.get_service(queries=queries, page=page)
+        return await self.get_leak(queries=queries, page=page)
+
+    async def get_host(self, ip: str) -> dict:
+        """
+        Get services and leaks for a specific IP address.
+
+        Args:
+            ip: IPv4 or IPv6 address.
+
+        Returns:
+            Dict with 'services' and 'leaks' lists.
+        """
+        status, data = await self._get(f"/host/{ip}")
+        if status == 200 and isinstance(data, dict):
+            services = data.get("Services") or []
+            leaks = data.get("Leaks") or []
+            return {
+                "services": self._parse_events(services),
+                "leaks": self._parse_events(leaks),
+            }
+        return {"services": [], "leaks": []}
+
+    def _parse_events(self, items: list) -> list:
+        """Parse events, falling back to raw dicts if l9format fails."""
+        results = []
+        for item in items:
+            try:
+                results.append(l9format.L9Event.from_dict(item))
+            except Exception:
+                results.append(item)
+        return results
+
+    async def get_domain(self, domain: str) -> dict:
+        """
+        Get services and leaks for a specific domain.
+
+        Args:
+            domain: Domain name.
+
+        Returns:
+            Dict with 'services' and 'leaks' lists.
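+
+        Example (illustrative):
+            result = await client.get_domain("example.com")
+            for service in result["services"]:
+                # entries are L9Event objects (raw dicts if parsing failed)
+                print(service.ip)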
+        """
+        status, data = await self._get(f"/domain/{domain}")
+        if status == 200 and isinstance(data, dict):
+            services = data.get("Services") or []
+            leaks = data.get("Leaks") or []
+            return {
+                "services": self._parse_events(services),
+                "leaks": self._parse_events(leaks),
+            }
+        return {"services": [], "leaks": []}
+
+    async def get_subdomains(self, domain: str) -> list[L9Subdomain]:
+        """
+        Get subdomains for a domain.
+
+        Args:
+            domain: Domain name.
+
+        Returns:
+            List of L9Subdomain objects.
+        """
+        status, data = await self._get(f"/api/subdomains/{domain}")
+        if status == 200 and isinstance(data, list):
+            return [L9Subdomain.from_dict(d) for d in data]
+        return []
+
+    async def get_plugins(self) -> list[APIResult]:
+        """
+        Get list of available plugins.
+
+        Returns:
+            List of APIResult objects.
+        """
+        status, data = await self._get("/api/plugins")
+        if status == 200 and isinstance(data, list):
+            return [APIResult.from_dict(d) for d in data]
+        return []
+
+    async def get_api_status(self, force: bool = False) -> dict:
+        """
+        Check API status and subscription info via /api/user/info endpoint.
+
+        Results are cached per client instance. Use force=True to refresh.
+
+        Args:
+            force: Force refresh of cached status.
+
+        Returns:
+            Dict with username, email, level, is_pro, quota, features, created.
+        """
+        if self._api_status is not None and not force:
+            return self._api_status
+
+        if not self.api_key:
+            self._api_status = {
+                "authenticated": False,
+                "is_pro": False,
+                "features": [],
+                "quota": {"total": 0, "remaining": 0, "used": 0},
+            }
+            return self._api_status
+
+        status_code, data = await self._get("/api/user/info")
+        if status_code == 200 and isinstance(data, dict):
+            self._api_status = {
+                "authenticated": True,
+                "username": data.get("username"),
+                "email": data.get("email"),
+                "level": data.get("level"),
+                "is_pro": data.get("is_pro", False),
+                "quota": data.get("quota", {}),
+                "features": data.get("features", []),
+                "created": data.get("created"),
+            }
+        else:
+            self._api_status = {
+                "authenticated": False,
+                "is_pro": False,
+                "features": [],
+                "quota": {"total": 0, "remaining": 0, "used": 0},
+            }
+
+        return self._api_status
+
+    async def is_pro(self) -> bool:
+        """Check if the API key has Pro access. Result is cached."""
+        status = await self.get_api_status()
+        return status.get("is_pro", False)
+
+    async def bulk_export(
+        self,
+        queries: list[Query] | None = None,
+    ) -> list[l9format.L9Aggregation]:
+        """
+        Bulk export leaks (Pro API feature).
+
+        Args:
+            queries: List of Query objects.
+
+        Returns:
+            List of L9Aggregation results.
+        """
+        if queries is None or len(queries) == 0:
+            serialized_query = EmptyQuery().serialize()
+        else:
+            serialized_query = " ".join(q.serialize() for q in queries)
+
+        client = await self._get_client()
+        results: list[l9format.L9Aggregation] = []
+
+        async with client.stream(
+            "GET", "/bulk/search", params={"q": serialized_query}
+        ) as response:
+            if response.status_code != 200:
+                return results
+            async for line in response.aiter_lines():
+                if line:
+                    data = json.loads(line)
+                    results.append(l9format.L9Aggregation.from_dict(data))
+
+        return results
+
+    async def bulk_export_stream(
+        self,
+        queries: list[Query] | None = None,
+    ) -> AsyncIterator[l9format.L9Aggregation]:
+        """
+        Streaming bulk export. Yields L9Aggregation objects one by one.
+
+        Args:
+            queries: List of Query objects.
+
+        Yields:
+            L9Aggregation objects as they arrive.
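+
+        Example (illustrative; /bulk/search is a Pro API feature):
+            query = MustQuery(field=PluginField(Plugin.GitConfigHttpPlugin))
+            async for aggregation in client.bulk_export_stream([query]):
+                print(aggregation.events[0].ip)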
+        """
+        if queries is None or len(queries) == 0:
+            serialized_query = EmptyQuery().serialize()
+        else:
+            serialized_query = " ".join(q.serialize() for q in queries)
+
+        client = await self._get_client()
+
+        async with client.stream(
+            "GET", "/bulk/search", params={"q": serialized_query}
+        ) as response:
+            if response.status_code != 200:
+                return
+            async for line in response.aiter_lines():
+                if line:
+                    data = json.loads(line)
+                    yield l9format.L9Aggregation.from_dict(data)
diff --git a/leakix/client.py b/leakix/client.py
index 4dc3547..7eadb85 100644
--- a/leakix/client.py
+++ b/leakix/client.py
@@ -7,10 +7,10 @@
 
 from leakix.domain import L9Subdomain
 from leakix.plugin import APIResult
-from leakix.query import EmptyQuery, Query
+from leakix.query import EmptyQuery, Query, RawQuery
 from leakix.response import ErrorResponse, RateLimitResponse, SuccessResponse
 
-__VERSION__ = "0.1.9"
+__VERSION__ = "0.2.0"
 
 
 class Scope(Enum):
@@ -42,6 +42,7 @@ def __init__(
         }
         if api_key:
             self.headers["api-key"] = api_key
+        self._api_status: dict | None = None  # Cached API status
 
     def __get(self, url, params):
         r = requests.get(
@@ -224,3 +225,122 @@ def bulk_service(self, queries: list[Query] | None = None):
         else:
             return ErrorResponse(response=r, response_json=r.json())
         return r
+
+    def get_domain(self, domain: str):
+        """
+        Returns the list of services and associated leaks for a given domain.
+        """
+        url = f"{self.base_url}/domain/{domain}"
+        r = self.__get(url, params=None)
+        if r.is_success():
+            response_json = r.json()
+            formatted_result = HostResult.from_dict(response_json)
+            response_json = {
+                "services": formatted_result.Services,
+                "leaks": formatted_result.Leaks,
+            }
+            r.response_json = response_json
+        return r
+
+    def search(self, query: str, scope: str = "leak", page: int = 0):
+        """
+        Simple search using a query string.
+
+        This is a convenience method that accepts a raw query string, using
+        the same syntax as the website.
+        For example: "+plugin:GitConfigHttpPlugin +country:FR"
+
+        Args:
+            query: The search query string (same syntax as the website)
+            scope: Either "leak" or "service" (default: "leak")
+            page: Page number for pagination (default: 0)
+
+        Returns:
+            A response object with the search results.
+
+        Example:
+            >>> client.search("+plugin:GitConfigHttpPlugin", scope="leak")
+            >>> client.search("+country:FR +port:22", scope="service")
+        """
+        queries = [RawQuery(query)]
+        if scope == "service":
+            return self.get_service(queries=queries, page=page)
+        return self.get_leak(queries=queries, page=page)
+
+    def bulk_export_stream(self, queries: list[Query] | None = None):
+        """
+        Streaming version of bulk_export. Yields L9Aggregation objects one by one.
+
+        This is more memory-efficient for large result sets, as it doesn't load
+        all results into memory at once.
+
+        Example:
+            >>> query = MustQuery(field=PluginField(Plugin.GitConfigHttpPlugin))
+            >>> for aggregation in client.bulk_export_stream([query]):
+            ...     print(aggregation.events[0].ip)
+        """
+        url = f"{self.base_url}/bulk/search"
+        if queries is None or len(queries) == 0:
+            serialized_query = EmptyQuery().serialize()
+        else:
+            serialized_query = " ".join(q.serialize() for q in queries)
+        params = {"q": serialized_query}
+        r = requests.get(url, params=params, headers=self.headers, stream=True)
+        if r.status_code != 200:
+            return
+        for line in r.iter_lines():
+            # Skip keep-alive blank lines, as the async client does
+            if line:
+                json_event = json.loads(line)
+                yield l9format.L9Aggregation.from_dict(json_event)
+
+    def get_api_status(self, force: bool = False) -> dict:
+        """
+        Check API status and subscription info via /api/user/info endpoint.
+
+        Results are cached per client instance. Use force=True to refresh.
+
+        Args:
+            force: Force refresh of cached status.
+
+        Returns:
+            Dict with username, email, level, is_pro, quota, features, created.
+        """
+        if self._api_status is not None and not force:
+            return self._api_status
+
+        if not self.api_key:
+            self._api_status = {
+                "authenticated": False,
+                "is_pro": False,
+                "features": [],
+                "quota": {"total": 0, "remaining": 0, "used": 0},
+            }
+            return self._api_status
+
+        url = f"{self.base_url}/api/user/info"
+        r = self.__get(url, params=None)
+        if r.is_success():
+            data = r.json()
+            self._api_status = {
+                "authenticated": True,
+                "username": data.get("username"),
+                "email": data.get("email"),
+                "level": data.get("level"),
+                "is_pro": data.get("is_pro", False),
+                "quota": data.get("quota", {}),
+                "features": data.get("features", []),
+                "created": data.get("created"),
+            }
+        else:
+            self._api_status = {
+                "authenticated": False,
+                "is_pro": False,
+                "features": [],
+                "quota": {"total": 0, "remaining": 0, "used": 0},
+            }
+
+        return self._api_status
+
+    def is_pro(self) -> bool:
+        """Check if the API key has Pro access. Result is cached."""
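+        # Delegates to the cached get_api_status(); only the first call
+        # hits /api/user/info (get_api_status(force=True) refreshes it).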
+        status = self.get_api_status()
+        return status.get("is_pro", False)
diff --git a/pyproject.toml b/pyproject.toml
index 3a68660..6969c43 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,12 +1,16 @@
 [tool.poetry]
 name = "leakix"
-version = "0.1.10"
+version = "0.2.0"
 description = "Official python client for LeakIX (https://leakix.net)"
-authors = ["Danny Willems "]
+authors = [
+    "Danny Willems ",
+    "Valentin Lobstein ",
+]
 
 [tool.poetry.dependencies]
-python = "^3.13"
+python = "^3.10"
 requests = "*"
+httpx = "^0.28.0"
 l9format = "=1.3.2"
 fire = ">=0.5,<0.8"
 
@@ -21,7 +25,7 @@ pip-audit = "*"
 
 [tool.ruff]
 line-length = 88
-target-version = "py313"
+target-version = "py310"
 
 [tool.ruff.lint]
 select = ["E", "F", "I", "UP", "B", "SIM"]
@@ -36,7 +40,7 @@ ignore = ["E501"]
 
 [tool.ruff.format]
 quote-style = "double"
 
 [tool.mypy]
-python_version = "3.13"
+python_version = "3.10"
 warn_return_any = true
 warn_unused_configs = true
 ignore_missing_imports = false

From 702b65ea89a656681d600380c635caecb6bbd85c Mon Sep 17 00:00:00 2001
From: Valentin Lobstein
Date: Sun, 8 Feb 2026 00:47:12 +0100
Subject: [PATCH 2/3] Fix: Import AsyncIterator from collections.abc

---
 leakix/async_client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/leakix/async_client.py b/leakix/async_client.py
index f0a298b..2e6a179 100644
--- a/leakix/async_client.py
+++ b/leakix/async_client.py
@@ -2,7 +2,7 @@
 
 import asyncio
 import json
-from typing import AsyncIterator
+from collections.abc import AsyncIterator
 
 import httpx
 from l9format import l9format

From 5f441860e2a1f10899660cf3e59781e3aa4726c9 Mon Sep 17 00:00:00 2001
From: Danny Willems
Date: Mon, 9 Feb 2026 22:20:41 -0300
Subject: [PATCH 3/3] CHANGELOG: add 0.2.0 entry for async client and l9format 1.4.0

---
 CHANGELOG.md | 28 +++++++++++++++++++++++++++-
 1 file changed, 27 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3d412cf..0950faa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,26 @@ and this project adheres to
 
 ## [Unreleased]
 
+## [0.2.0] - 2026-02-XX
+
+### Added
+
+- Async client (`AsyncClient`) with full async/await support using
+  httpx ([8895e58], [#28])
+- Simple `search()` method accepting raw query strings on both
+  clients ([8895e58], [#28])
+- `get_domain()` method for domain lookups ([8895e58], [#28])
+- `get_api_status()` and `is_pro()` methods for account status
+  detection ([8895e58], [#28])
+- `bulk_export_stream()` streaming generator for memory-efficient
+  bulk exports ([8895e58], [#28])
+
+### Changed
+
+- Bumped l9format from 1.3.2 to 1.4.0 ([b47ade8], [#28])
+- Lowered minimum Python requirement from 3.13 to 3.10 ([8895e58],
+  [#28])
+
 ## [0.1.10] - 2024-12-XX
 
 ### Changed
@@ -38,11 +58,14 @@
 - Query building with MustQuery, MustNotQuery, ShouldQuery
 - Field filters: TimeField, PluginField, IPField, PortField,
   CountryField
 
-[unreleased]: https://github.com/LeakIX/LeakIXClient-Python/compare/v0.1.10...HEAD
+[unreleased]: https://github.com/LeakIX/LeakIXClient-Python/compare/v0.2.0...HEAD
+[0.2.0]: https://github.com/LeakIX/LeakIXClient-Python/compare/v0.1.10...v0.2.0
 [0.1.10]: https://github.com/LeakIX/LeakIXClient-Python/compare/v0.1.9...v0.1.10
 [0.1.9]: https://github.com/LeakIX/LeakIXClient-Python/releases/tag/v0.1.9
 
+[8895e58]: https://github.com/LeakIX/LeakIXClient-Python/commit/8895e58
+[b47ade8]: https://github.com/LeakIX/LeakIXClient-Python/commit/b47ade8
 [65c5121]: https://github.com/LeakIX/LeakIXClient-Python/commit/65c5121
 [0975c1c]: https://github.com/LeakIX/LeakIXClient-Python/commit/0975c1c
 [7cb5dae]: https://github.com/LeakIX/LeakIXClient-Python/commit/7cb5dae
@@ -49,3 +72,6 @@
 [6777ad9]: https://github.com/LeakIX/LeakIXClient-Python/commit/6777ad9
 [62550bc]: https://github.com/LeakIX/LeakIXClient-Python/commit/62550bc
 [4dd4948]: https://github.com/LeakIX/LeakIXClient-Python/commit/4dd4948
+
+
+[#28]: https://github.com/LeakIX/LeakIXClient-Python/pull/28