diff --git a/privatebinapi/__init__.py b/privatebinapi/__init__.py index c31b187..8ebd051 100644 --- a/privatebinapi/__init__.py +++ b/privatebinapi/__init__.py @@ -4,14 +4,31 @@ from privatebinapi.deletion import delete, delete_async from privatebinapi.download import get, get_async -from privatebinapi.exceptions import BadCompressionTypeError, BadExpirationTimeError, BadFormatError, \ - PrivateBinAPIError, BadServerResponseError, UnsupportedFeatureError +from privatebinapi.exceptions import ( + BadAuthConfigError, + BadCompressionTypeError, + BadExpirationTimeError, + BadFormatError, + PrivateBinAPIError, + BadServerResponseError, + UnsupportedFeatureError, +) from privatebinapi.upload import send, send_async __all__ = ( - 'delete', 'delete_async', 'get', 'get_async', 'send', 'send_async', 'BadCompressionTypeError', - 'BadExpirationTimeError', 'BadFormatError', 'BadServerResponseError', 'PrivateBinAPIError', - 'UnsupportedFeatureError' + "delete", + "delete_async", + "get", + "get_async", + "send", + "send_async", + "BadAuthConfigError", + "BadCompressionTypeError", + "BadExpirationTimeError", + "BadFormatError", + "BadServerResponseError", + "PrivateBinAPIError", + "UnsupportedFeatureError", ) -__author__ = 'Pioverpie' +__author__ = "Pioverpie" diff --git a/privatebinapi/exceptions.py b/privatebinapi/exceptions.py index 1ca9835..5a13168 100644 --- a/privatebinapi/exceptions.py +++ b/privatebinapi/exceptions.py @@ -29,3 +29,7 @@ class BadServerResponseError(PrivateBinAPIError): class UnsupportedFeatureError(PrivateBinAPIError): """Indicates that a PrivateBin host does not support the operation attempted""" + + +class BadAuthConfigError(PrivateBinAPIError): + """Indicates that the authentication configuration is invalid.""" diff --git a/privatebinapi/upload.py b/privatebinapi/upload.py index 2d84408..c9a1d62 100644 --- a/privatebinapi/upload.py +++ b/privatebinapi/upload.py @@ -5,7 +5,7 @@ import functools import json from concurrent.futures import Executor -from typing import Optional, Tuple, Union +from typing import Mapping, Optional, Tuple, Union import httpx import requests @@ -13,19 +13,137 @@ from pbincli.format import Paste from privatebinapi.common import DEFAULT_HEADERS, get_loop, verify_response -from privatebinapi.exceptions import BadCompressionTypeError, BadExpirationTimeError, BadFormatError, \ - BadServerResponseError, PrivateBinAPIError +from privatebinapi.exceptions import ( + BadAuthConfigError, + BadCompressionTypeError, + BadExpirationTimeError, + BadFormatError, + BadServerResponseError, + PrivateBinAPIError, +) -__all__ = ('send', 'send_async') +__all__ = ("send", "send_async") -COMPRESSION_TYPES = ('zlib', None) -EXPIRATION_TIMES = ("5min", "10min", "1hour", "1day", "1week", "1month", "1year", "never") -FORMAT_TYPES = ('plaintext', 'syntaxhighlighting', 'markdown') +COMPRESSION_TYPES = ("zlib", None) +EXPIRATION_TIMES = ( + "5min", + "10min", + "1hour", + "1day", + "1week", + "1month", + "1year", + "never", +) +FORMAT_TYPES = ("plaintext", "syntaxhighlighting", "markdown") -def prepare_upload(server: str, *, text: str = None, file: str = None, password: str = None, expiration: str = '1day', - compression: str = 'zlib', formatting: str = 'plaintext', burn_after_reading: bool = False, - discussion: bool = False) -> Tuple[dict, str]: +def _get_auth_cfg( + auth: Optional[str] = None, + auth_user: Optional[str] = None, + auth_pass: Optional[str] = None, + auth_headers: Optional[Mapping[str, str]] = None, +) -> dict: + """ + Utility method that validates and returns the authentication data as a dict. + """ + auth_custom = None + if auth: + if auth == "basic": + if not all((auth_user, auth_pass)): + raise BadAuthConfigError( + "auth_user and auth_pass must be provided for basic authentication" + ) + elif auth == "custom": + if not auth_headers: + raise BadAuthConfigError( + "auth_headers must be provided for custom authentication" + ) + + try: + auth_custom = json.dumps(dict(auth_headers)) + except (TypeError, ValueError) as error: + raise BadAuthConfigError( + "auth_headers must be a valid JSON-able object" + ) from error + else: + raise BadAuthConfigError( + "auth must be 'basic', 'custom', or None (default)" + ) + + return { + "auth": auth, + "auth_user": auth_user, + "auth_pass": auth_pass, + "auth_custom": auth_custom, + } + + +def _get_cfg( + server: str, + *, + text: Optional[str] = None, + file: Optional[str] = None, + expiration: str = "1day", + compression: str = "zlib", + formatting: str = "plaintext", + auth: Optional[str] = None, + auth_user: Optional[str] = None, + auth_pass: Optional[str] = None, + auth_headers: Optional[Mapping[str, str]] = None, +) -> dict: + """ + :return: A configuration dictionary for the PrivateBin API. + """ + if not any((text, file)): + raise ValueError("text and file many not both be None") + if formatting not in FORMAT_TYPES: + raise BadFormatError( + "formatting %s must be in %s" % (repr(formatting), FORMAT_TYPES) + ) + if expiration not in EXPIRATION_TIMES: + raise BadExpirationTimeError( + "expiration %s must be in %s" % (repr(expiration), EXPIRATION_TIMES) + ) + if compression not in COMPRESSION_TYPES: + raise BadCompressionTypeError( + "compression %s must be in %s" % (repr(compression), COMPRESSION_TYPES) + ) + + auth_cfg = _get_auth_cfg( + auth=auth, auth_user=auth_user, auth_pass=auth_pass, auth_headers=auth_headers + ) + + return { + "server": server, + "proxy": None, + "short_api": None, + "short_url": None, + "short_user": None, + "short_pass": None, + "short_token": None, + "no_check_certificate": False, + "no_insecure_warning": False, + **auth_cfg, + } + + +def prepare_upload( + server: str, + *, + text: Optional[str] = None, + file: Optional[str] = None, + password: Optional[str] = None, + expiration: str = "1day", + compression: str = "zlib", + formatting: str = "plaintext", + burn_after_reading: bool = False, + discussion: bool = False, + auth: Optional[str] = None, + auth_user: Optional[str] = None, + auth_pass: Optional[str] = None, + auth_headers: Optional[Mapping[str, str]] = None, +) -> Tuple[dict, str]: """Creates the JSON data needed to upload a paste to a PrivateBin host. :param server: The home URL of the PrivateBin host. @@ -37,45 +155,52 @@ def prepare_upload(server: str, *, text: str = None, file: str = None, password: :param formatting: What format the paste should be declared as. :param burn_after_reading: Whether or not the paste should delete itself immediately after being read. :param discussion: Whether or not to enable discussion on the paste. + :param auth: The authentication type. Supported values are: + + * ``None`` (default): No authentication. + * ``'basic'``: Basic authentication (``auth_user`` and ``auth_pass`` required). + * ``'custom'``: Custom authentication (``auth_headers`` required). + + :param auth_user: The username for basic authentication. + :param auth_pass: The password for basic authentication. + :param auth_headers: A mapping containing the custom authentication header(s). :return: A tuple of the JSON data to POST to the PrivateBin host and the paste's hash """ - if not any((text, file)): - raise ValueError("text and file many not both be None") - if formatting not in FORMAT_TYPES: - raise BadFormatError('formatting %s must be in %s' % (repr(formatting), FORMAT_TYPES)) - if expiration not in EXPIRATION_TIMES: - raise BadExpirationTimeError('expiration %s must be in %s' % (repr(expiration), EXPIRATION_TIMES)) - if compression not in COMPRESSION_TYPES: - raise BadCompressionTypeError('compression %s must be in %s' % (repr(compression), COMPRESSION_TYPES)) + settings = _get_cfg( + server, + text=text, + file=file, + expiration=expiration, + compression=compression, + formatting=formatting, + auth=auth, + auth_user=auth_user, + auth_pass=auth_pass, + auth_headers=auth_headers, + ) - paste = Paste() - settings = { - 'server': server, - 'proxy': None, - 'short_api': None, - 'short_url': None, - 'short_user': None, - 'short_pass': None, - 'short_token': None, - 'no_check_certificate': False, - 'no_insecure_warning': False - } api_client = PrivateBin(settings) + try: version = api_client.getVersion() except json.JSONDecodeError as error: - raise BadServerResponseError("The host failed to respond with PrivateBin version information.") from error + raise BadServerResponseError( + "The host failed to respond with PrivateBin version information." + ) from error + + paste = Paste() paste.setVersion(version) if version == 2 and compression: paste.setCompression(compression) else: - paste.setCompression('none') + paste.setCompression("none") - paste.setText(text or '') + paste.setText(text or "") if password: paste.setPassword(password) if file: paste.setAttachment(file) + paste.encrypt(formatting, burn_after_reading, discussion, expiration) data = paste.getJSON() return data, paste.getHash() @@ -90,22 +215,36 @@ def process_result(response: Union[requests.Response, httpx.Response], passcode: """ data = verify_response(response) - if data['status'] == 0: + if data["status"] == 0: url = str(response.url) output = { **data, - 'full_url': url + '?' + data['id'] + '#' + passcode, - 'passcode': passcode + "full_url": url + "?" + data["id"] + "#" + passcode, + "passcode": passcode, } return output # return str(response.url) + '?' + data['id'] + '#' + passcode, data['deletetoken'] - raise PrivateBinAPIError("Error uploading paste: %s" % data['message']) + raise PrivateBinAPIError("Error uploading paste: %s" % data["message"]) -def send(server: str, *, text: str = None, file: str = None, password: str = None, expiration: str = '1day', - compression: Optional[str] = 'zlib', formatting: str = 'plaintext', burn_after_reading: bool = False, - proxies: dict = None, discussion: bool = False): +def send( + server: str, + *, + text: Optional[str] = None, + file: Optional[str] = None, + password: Optional[str] = None, + expiration: str = "1day", + compression: str = "zlib", + formatting: str = "plaintext", + burn_after_reading: bool = False, + proxies: Optional[dict] = None, + discussion: bool = False, + auth: Optional[str] = None, + auth_user: Optional[str] = None, + auth_pass: Optional[str] = None, + auth_headers: Optional[Mapping[str, str]] = None, +): """Upload a paste to a PrivateBin host. :param server: The home URL of the PrivateBin host. @@ -118,25 +257,57 @@ def send(server: str, *, text: str = None, file: str = None, password: str = Non :param burn_after_reading: Whether or not the paste should delete itself immediately after being read. :param proxies: A dict of proxies to pass to a requests.Session object. :param discussion: Whether or not to enable discussion on the paste. + :param auth: The authentication type. Supported values are: + + * ``None`` (default): No authentication. + * ``'basic'``: Basic authentication (``auth_user`` and ``auth_pass`` required). + * ``'custom'``: Custom authentication (``auth_headers`` required). + + :param auth_user: The username for basic authentication. + :param auth_pass: The password for basic authentication. + :param auth_headers: A mapping containing the custom authentication header(s). :return: The link to the paste and the delete token. """ data, passcode = prepare_upload( - server, text=text, file=file, password=password, expiration=expiration, compression=compression, - formatting=formatting, burn_after_reading=burn_after_reading, discussion=discussion + server, + text=text, + file=file, + password=password, + expiration=expiration, + compression=compression, + formatting=formatting, + burn_after_reading=burn_after_reading, + discussion=discussion, + auth=auth, + auth_user=auth_user, + auth_pass=auth_pass, + auth_headers=auth_headers, ) with requests.Session() as session: response = session.post( - server, - headers=DEFAULT_HEADERS, - proxies=proxies, - data=data + server, headers=DEFAULT_HEADERS, proxies=proxies, data=data ) return process_result(response, passcode) -async def send_async(server: str, *, text: str = None, file: str = None, password: str = None, expiration: str = '1day', - compression: str = 'zlib', formatting: str = 'plaintext', burn_after_reading: bool = False, - proxies: dict = None, discussion: bool = False, executor: Executor = None): +async def send_async( + server: str, + *, + text: Optional[str] = None, + file: Optional[str] = None, + password: Optional[str] = None, + expiration: str = "1day", + compression: str = "zlib", + formatting: str = "plaintext", + burn_after_reading: bool = False, + proxies: Optional[dict] = None, + discussion: bool = False, + auth: Optional[str] = None, + auth_user: Optional[str] = None, + auth_pass: Optional[str] = None, + auth_headers: Optional[Mapping[str, str]] = None, + executor: Optional[Executor] = None, +): """Asynchronously upload a paste to a PrivateBin host. :param server: The home URL of the PrivateBin host. @@ -149,12 +320,33 @@ async def send_async(server: str, *, text: str = None, file: str = None, passwor :param burn_after_reading: Whether or not the paste should delete itself immediately after being read. :param proxies: A dict of proxies to pass to a requests.Session object. :param discussion: Whether or not to enable discussion on the paste. + :param auth: The authentication type. Supported values are: + + * ``None`` (default): No authentication. + * ``'basic'``: Basic authentication (``auth_user`` and ``auth_pass`` required). + * ``'custom'``: Custom authentication (``auth_headers`` required). + + :param auth_user: The username for basic authentication. + :param auth_pass: The password for basic authentication. + :param auth_headers: A mapping containing the custom authentication header(s). :param executor: A concurrent.futures.Executor instance used for decryption. :return: The link to the paste and the delete token. """ func = functools.partial( - prepare_upload, server, text=text, file=file, password=password, expiration=expiration, compression=compression, - formatting=formatting, burn_after_reading=burn_after_reading, discussion=discussion + prepare_upload, + server, + text=text, + file=file, + password=password, + expiration=expiration, + compression=compression, + formatting=formatting, + burn_after_reading=burn_after_reading, + discussion=discussion, + auth=auth, + auth_user=auth_user, + auth_pass=auth_pass, + auth_headers=auth_headers, ) data, passcode = await get_loop().run_in_executor(executor, func) async with httpx.AsyncClient(proxies=proxies, headers=DEFAULT_HEADERS) as client: diff --git a/tests/test_all.py b/tests/test_all.py index 0caf804..e7ed77f 100644 --- a/tests/test_all.py +++ b/tests/test_all.py @@ -11,43 +11,47 @@ @pytest.mark.parametrize("server, file", SERVERS_AND_FILES) def test_full(server, file): send_data = privatebinapi.send( - server, text=MESSAGE, file=file, password='foobar', compression=None, + server, + text=MESSAGE, + file=file, + password="foobar", + compression=None, ) - get_data = privatebinapi.get(send_data['full_url'], password='foobar') - assert get_data['text'] == MESSAGE + get_data = privatebinapi.get(send_data["full_url"], password="foobar") + assert get_data["text"] == MESSAGE if file: - with open(file, 'rb') as file: - assert get_data['attachment']['content'] == file.read() + with open(file, "rb") as file: + assert get_data["attachment"]["content"] == file.read() try: - privatebinapi.delete(send_data['full_url'], send_data['deletetoken']) + privatebinapi.delete(send_data["full_url"], send_data["deletetoken"]) except privatebinapi.UnsupportedFeatureError: pass def test_bad_compression(): try: - privatebinapi.send('', text=MESSAGE, compression='clearly-fake-compression') + privatebinapi.send("", text=MESSAGE, compression="clearly-fake-compression") except privatebinapi.BadCompressionTypeError: pass def test_bad_expiration(): try: - privatebinapi.send('', text=MESSAGE, expiration='clearly-incorrect-expiration') + privatebinapi.send("", text=MESSAGE, expiration="clearly-incorrect-expiration") except privatebinapi.BadExpirationTimeError: pass def test_bad_formatting(): try: - privatebinapi.send('', text=MESSAGE, formatting='clearly-incorrect-format') + privatebinapi.send("", text=MESSAGE, formatting="clearly-incorrect-format") except privatebinapi.BadFormatError: pass def test_send_nothing(): try: - privatebinapi.send('') + privatebinapi.send("") except ValueError: pass @@ -56,10 +60,12 @@ def test_send_nothing(): @pytest.mark.asyncio async def test_async_full(server, _): send_data = await privatebinapi.send_async(server, text=MESSAGE) - get_data = await privatebinapi.get_async(send_data['full_url']) - assert get_data['text'] == MESSAGE + get_data = await privatebinapi.get_async(send_data["full_url"]) + assert get_data["text"] == MESSAGE try: - await privatebinapi.delete_async(send_data['full_url'], send_data['deletetoken']) + await privatebinapi.delete_async( + send_data["full_url"], send_data["deletetoken"] + ) except privatebinapi.UnsupportedFeatureError: pass await asyncio.sleep(0.5) @@ -67,20 +73,20 @@ async def test_async_full(server, _): def test_bad_server(): try: - privatebinapi.send('https://example.com', text=MESSAGE) + privatebinapi.send("https://example.com", text=MESSAGE) except privatebinapi.BadServerResponseError: pass class FakeResponse: - url = '' + url = "" def __init__(self, error=False): self.error = error def json(self): if self.error: - raise json.JSONDecodeError('', '', 0) + raise json.JSONDecodeError("", "", 0) else: return RESPONSE_DATA @@ -94,14 +100,14 @@ def test_bad_response_verification(): def test_bad_process_result(): try: - upload.process_result(FakeResponse(), '') # noqa + upload.process_result(FakeResponse(), "") # noqa except privatebinapi.PrivateBinAPIError: pass def test_bad_process_url(): try: - deletion.process_url('https://example.com') + deletion.process_url("https://example.com") except ValueError: pass @@ -115,6 +121,77 @@ def test_bad_status(): def test_bad_extract_passphrase(): try: - download.extract_passphrase('https://www.example.com') + download.extract_passphrase("https://www.example.com") except ValueError: pass + + +@pytest.mark.parametrize("server, file", SERVERS_AND_FILES) +def test_bad_auth_config(server, file): + try: + privatebinapi.send(server, text=MESSAGE, file=file, auth="invalid") + raise AssertionError("Unexpected success upon `auth='invalid'`") + except privatebinapi.BadAuthConfigError: + pass + + +@pytest.mark.parametrize("server, file", SERVERS_AND_FILES) +def test_bad_basic_auth_config(server, file): + try: + privatebinapi.send(server, text=MESSAGE, file=file, auth="basic") + raise AssertionError( + "Unexpected success upon `auth='basic'` with no credentials" + ) + except privatebinapi.BadAuthConfigError: + pass + + +@pytest.mark.parametrize("server, file", SERVERS_AND_FILES) +def test_bad_custom_auth_config(server, file): + try: + privatebinapi.send(server, text=MESSAGE, file=file, auth="custom") + raise AssertionError( + "Unexpected success upon `auth='custom'` with no authentication headers" + ) + except privatebinapi.BadAuthConfigError: + pass + + +@pytest.mark.parametrize("server, file", SERVERS_AND_FILES) +def test_custom_auth_config_with_bad_payload(server, file): + try: + privatebinapi.send( + server, + text=MESSAGE, + file=file, + auth="custom", + auth_headers="I_AM_NOT_A_DICT", + ) + raise AssertionError( + "Unexpected success upon `auth='custom'` with an invalid payload" + ) + except privatebinapi.BadAuthConfigError: + pass + + +@pytest.mark.parametrize("server, file", SERVERS_AND_FILES) +def test_good_basic_auth_config(server, file): + privatebinapi.send( + server, + text=MESSAGE, + file=file, + auth="basic", + auth_user="foo", + auth_pass="bar", + ) + + +@pytest.mark.parametrize("server, file", SERVERS_AND_FILES) +def test_good_custom_auth_config(server, file): + privatebinapi.send( + server, + text=MESSAGE, + file=file, + auth="custom", + auth_headers={"Authorization": "Bearer foobar"}, + )