Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 32 additions & 33 deletions pywebpush/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import time
import logging
from copy import deepcopy
from typing import cast, Union, Dict
from typing import cast, Union

try:
from urlparse import urlparse
Expand Down Expand Up @@ -128,14 +128,13 @@ class WebPusher:
]
verbose = False

# Note: the type declarations are not valid under python 3.8,
def __init__(
self,
subscription_info: Dict[
str, Union[Union[str, bytes], Dict[str, Union[str, bytes]]]
subscription_info: dict[
str, str | bytes | dict[str, str | bytes]
],
requests_session: Union[None, requests.Session] = None,
aiohttp_session: Union[None, aiohttp.client.ClientSession] = None,
requests_session: None | requests.Session = None,
aiohttp_session: None | aiohttp.client.ClientSession = None,
verbose: bool = False,
) -> None:
"""Initialize using the info provided by the client PushSubscription
Expand Down Expand Up @@ -168,8 +167,8 @@ def __init__(
self.subscription_info = deepcopy(subscription_info)
self.auth_key = self.receiver_key = None
if "keys" in subscription_info:
keys: Dict[str, Union[str, bytes]] = cast(
Dict[str, Union[str, bytes]], self.subscription_info["keys"]
keys: dict[str, str | bytes] = cast(
dict[str, str | bytes], self.subscription_info["keys"]
)
for k in ["p256dh", "auth"]:
if keys.get(k) is None:
Expand Down Expand Up @@ -266,7 +265,7 @@ def encode(
reply["salt"] = base64.urlsafe_b64encode(salt).strip(b"=")
return reply

def as_curl(self, endpoint: str, encoded_data: bytes, headers: Dict[str, str]) -> str:
def as_curl(self, endpoint: str, encoded_data: bytes, headers: dict[str, str]) -> str:
"""Return the send as a curl command.

Useful for debugging. This will write out the encoded data to a local
Expand Down Expand Up @@ -300,8 +299,8 @@ def as_curl(self, endpoint: str, encoded_data: bytes, headers: Dict[str, str]) -

def _prepare_send_data(
self,
data: Union[None, bytes] = None,
headers: Union[None, Dict[str, str]] = None,
data: None | bytes = None,
headers: None | dict[str, str] = None,
ttl: int = 0,
content_encoding: str = "aes128gcm",
) -> dict:
Expand Down Expand Up @@ -361,7 +360,7 @@ def _prepare_send_data(

return {"endpoint": endpoint, "data": encoded_data, "headers": headers}

def send(self, *args, **kwargs) -> Union[Response, str]:
def send(self, *args, **kwargs) -> Response | str:
"""Encode and send the data to the Push Service"""
timeout = kwargs.pop("timeout", 10000)
curl = kwargs.pop("curl", False)
Expand All @@ -387,7 +386,7 @@ def send(self, *args, **kwargs) -> Union[Response, str]:
)
return resp

async def send_async(self, *args, **kwargs) -> Union[aiohttp.ClientResponse, str]:
async def send_async(self, *args, **kwargs) -> aiohttp.ClientResponse | str:
timeout = kwargs.pop("timeout", 10000)
curl = kwargs.pop("curl", False)

Expand All @@ -414,20 +413,20 @@ async def send_async(self, *args, **kwargs) -> Union[aiohttp.ClientResponse, str


def webpush(
subscription_info: Dict[
str, Union[Union[str, bytes], Dict[str, Union[str, bytes]]]
subscription_info: dict[
str, str | bytes | dict[str, str | bytes]
],
data: Union[None, str] = None,
vapid_private_key: Union[None, Vapid, str] = None,
vapid_claims: Union[None, Dict[str, Union[str, int]]] = None,
data: None | str = None,
vapid_private_key: None | Vapid | str = None,
vapid_claims: None | dict[str, str | int] = None,
content_encoding: str = "aes128gcm",
curl: bool = False,
timeout: Union[None, float] = None,
timeout: None | float = None,
ttl: int = 0,
verbose: bool = False,
headers: Union[None, Dict[str, Union[str, int, float]]] = None,
requests_session: Union[None, requests.Session] = None,
) -> Union[str, requests.Response]:
headers: None | dict[str, str | int | float] = None,
requests_session: None | requests.Session = None,
) -> str | requests.Response:
"""
One call solution to endcode and send `data` to the endpoint
contained in `subscription_info` using optional VAPID auth headers.
Expand Down Expand Up @@ -544,23 +543,23 @@ def webpush(


async def webpush_async(
subscription_info: Dict[
str, Union[Union[str, bytes], Dict[str, Union[str, bytes]]]
subscription_info: dict[
str, str | bytes | dict[str, str | bytes]
],
data: Union[None, str] = None,
vapid_private_key: Union[None, Vapid, str] = None,
vapid_claims: Union[None, Dict[str, Union[str, int]]] = None,
data: None | str = None,
vapid_private_key: None | Vapid | str = None,
vapid_claims: None | dict[str, str | int] = None,
content_encoding: str = "aes128gcm",
curl: bool = False,
timeout: Union[None, float] = None,
timeout: None | float = None,
ttl: int = 0,
verbose: bool = False,
headers: Union[None, Dict[str, Union[str, int, float]]] = None,
aiohttp_session: Union[None, aiohttp.ClientSession] = None,
) -> Union[str, aiohttp.ClientResponse]:
headers: None | dict[str, str | int | float] = None,
aiohttp_session: None | aiohttp.ClientSession = None,
) -> str | aiohttp.ClientResponse:
"""
Async version of webpush function. One call solution to encode and send
`data` to the endpoint contained in `subscription_info` using optional
Async version of webpush function. One call solution to encode and send
`data` to the endpoint contained in `subscription_info` using optional
VAPID auth headers.

Example:
Expand Down
6 changes: 3 additions & 3 deletions pywebpush/tests/test_webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import unittest
import time
from typing import cast, Union, Dict
from typing import cast
from unittest.mock import patch, Mock, AsyncMock

import http_ece
Expand Down Expand Up @@ -192,7 +192,7 @@ def test_webpush_vapid_instance(self, vapid_sign, pusher_send):
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
vapid_key = py_vapid.Vapid.from_string(self.vapid_key)
claims: Dict[str, Union[str, int]] = dict(
claims: dict[str, str | int] = dict(
sub="mailto:ops@example.com", aud="https://example.com"
)
webpush(
Expand Down Expand Up @@ -456,7 +456,7 @@ async def test_webpush_async_vapid_instance(self, vapid_sign, pusher_send):
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
vapid_key = py_vapid.Vapid.from_string(self.vapid_key)
claims: Dict[str, Union[str, int]] = dict(
claims: dict[str, str | int] = dict(
sub="mailto:ops@example.com", aud="https://example.com"
)
await webpush_async(
Expand Down