Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
options: --health-cmd="mysqladmin ping" --health-interval 10s --health-timeout 5s --health-retries 5
strategy:
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12", "3.13" ]
python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13" ]
steps:
- uses: actions/cache@v4
with:
Expand Down Expand Up @@ -48,7 +48,7 @@ jobs:
options: --health-cmd="mariadb-admin ping -uroot -p${MYSQL_ROOT_PASSWORD}" --health-interval 10s --health-timeout 5s --health-retries 5
strategy:
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12", "3.13" ]
python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13" ]
steps:
- uses: actions/cache@v4
with:
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ up:
@poetry update

deps:
@poetry install
@poetry install --all-groups

_style:
@isort -src $(checkfiles)
@black $(checkfiles)
@ruff format $(checkfiles)
@ruff check --fix $(checkfiles)

style: deps _style

_check:
@black --check $(checkfiles) || (echo "Please run 'make style' to auto-fix style issues" && false)
@ruff format --check $(checkfiles) || (echo "Please run 'make style' to auto-fix style issues" && false)
@ruff check $(checkfiles)
@mypy $(checkfiles)

Expand Down
4 changes: 3 additions & 1 deletion asyncmy/contexts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations
from collections.abc import Coroutine
from typing import Any, Iterator
from typing import Any
from collections.abc import Iterator


class _ContextManager(Coroutine):
Expand Down
37 changes: 19 additions & 18 deletions asyncmy/replication/binlogstream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations
import struct
from typing import Any, Dict, List, Optional, Set, Type, Union
from typing import Any

from asyncmy import Connection
from asyncmy.constants.COMMAND import COM_BINLOG_DUMP, COM_BINLOG_DUMP_GTID, COM_REGISTER_SLAVE
Expand Down Expand Up @@ -37,7 +38,7 @@


class ReportSlave:
def __init__(self, value: Union[str, tuple, dict]):
def __init__(self, value: str | tuple | dict):
self._hostname = ""
self._username = ""
self._password = "" # nosec: B105
Expand Down Expand Up @@ -101,22 +102,22 @@ def __init__(
connection: Connection,
ctl_connection: Connection,
server_id: int,
slave_uuid: Optional[str] = None,
slave_heartbeat: Optional[int] = None,
report_slave: Optional[Union[str, tuple, dict]] = None,
master_log_file: Optional[str] = None,
master_log_position: Optional[int] = None,
master_auto_position: Optional[Set[Gtid]] = None,
slave_uuid: str | None = None,
slave_heartbeat: int | None = None,
report_slave: str | tuple | dict | None = None,
master_log_file: str | None = None,
master_log_position: int | None = None,
master_auto_position: set[Gtid] | None = None,
resume_stream: bool = False,
blocking: bool = False,
skip_to_timestamp: Optional[int] = None,
only_events: Optional[List[Type[BinLogEvent]]] = None,
ignored_events: Optional[List[Type[BinLogEvent]]] = None,
skip_to_timestamp: int | None = None,
only_events: list[type[BinLogEvent]] | None = None,
ignored_events: list[type[BinLogEvent]] | None = None,
filter_non_implemented_events: bool = True,
only_tables: Optional[List[str]] = None,
ignored_tables: Optional[List[str]] = None,
only_schemas: Optional[List[str]] = None,
ignored_schemas: Optional[List[str]] = None,
only_tables: list[str] | None = None,
ignored_tables: list[str] | None = None,
only_schemas: list[str] | None = None,
ignored_schemas: list[str] | None = None,
freeze_schema: bool = False,
):
self._freeze_schema = freeze_schema
Expand Down Expand Up @@ -149,12 +150,12 @@ def __init__(
RotateEvent,
*self._allowed_events,
]
self._table_map: Dict[str, Any] = {}
self._table_map: dict[str, Any] = {}

@staticmethod
def _allowed_event_list(
only_events: Optional[List[Type[BinLogEvent]]],
ignored_events: Optional[List[Type[BinLogEvent]]],
only_events: list[type[BinLogEvent]] | None,
ignored_events: list[type[BinLogEvent]] | None,
filter_non_implemented_events: bool,
):
if only_events is not None:
Expand Down
34 changes: 9 additions & 25 deletions asyncmy/replication/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ class GtidEvent(BinLogEvent):
"""GTID change in binlog event"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(GtidEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)

self.commit_flag = self.packet.read(1) == 1
self.sid = self.packet.read(16)
Expand Down Expand Up @@ -82,9 +80,7 @@ class RotateEvent(BinLogEvent):
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(RotateEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
self.position = struct.unpack("<Q", self.packet.read(8))[0]
self.next_binlog = self.packet.read(event_size - 8).decode()

Expand All @@ -105,15 +101,13 @@ class XidEvent(BinLogEvent):
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(XidEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
self.xid = struct.unpack("<Q", self.packet.read(8))[0]


class HeartbeatLogEvent(BinLogEvent):
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(HeartbeatLogEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
self.ident = self.packet.read(event_size).decode()


Expand All @@ -122,9 +116,7 @@ class QueryEvent(BinLogEvent):
Only replicated queries are logged."""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(QueryEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)

# Post-header
self.slave_proxy_id = self.packet.read_uint32()
Expand Down Expand Up @@ -153,9 +145,7 @@ class BeginLoadQueryEvent(BinLogEvent):
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(BeginLoadQueryEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)

# Payload
self.file_id = self.packet.read_uint32()
Expand All @@ -179,9 +169,7 @@ class ExecuteLoadQueryEvent(BinLogEvent):
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(ExecuteLoadQueryEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)

# Post-header
self.slave_proxy_id = self.packet.read_uint32()
Expand All @@ -206,9 +194,7 @@ class IntvarEvent(BinLogEvent):
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(IntvarEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)

# Payload
self.type = self.packet.read_uint8()
Expand All @@ -217,7 +203,5 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)

class NotImplementedEvent(BinLogEvent):
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(NotImplementedEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
self.packet.advance(event_size)
14 changes: 7 additions & 7 deletions asyncmy/replication/gtid.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import annotations
import binascii
import re
import struct
from io import BytesIO
from typing import Set, Union


class Gtid:
Expand Down Expand Up @@ -30,7 +30,7 @@ def parse_interval(interval):
@staticmethod
def parse(gtid: str):
m = re.search(
"^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})" "((?::[0-9-]+)+)$",
"^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})((?::[0-9-]+)+)$",
gtid,
)
if not m:
Expand Down Expand Up @@ -118,7 +118,7 @@ def __add__(self, other):
"""Include the transactions of this gtid. Raise if the
attempted merge has different SID"""
if self.sid != other.sid:
raise Exception("Attempt to merge different SID" "%s != %s" % (self.sid, other.sid))
raise Exception("Attempt to merge different SID%s != %s" % (self.sid, other.sid))

result = Gtid(str(self))

Expand Down Expand Up @@ -232,7 +232,7 @@ def __ge__(self, other):


class GtidSet:
def __init__(self, gtid_set: Set[Gtid]):
def __init__(self, gtid_set: set[Gtid]):
self._gtid_set = gtid_set

def merge_gtid(self, gtid: Gtid):
Expand All @@ -246,14 +246,14 @@ def merge_gtid(self, gtid: Gtid):
new_gtid_set.add(gtid)
self._gtid_set = new_gtid_set

def __contains__(self, other: Union[Gtid, "GtidSet"]):
def __contains__(self, other: Gtid | GtidSet):
if isinstance(other, GtidSet):
return all(other_gtid in self._gtid_set for other_gtid in other._gtid_set)
if isinstance(other, Gtid):
return any(other in x for x in self._gtid_set)
raise NotImplementedError

def __add__(self, other: Union[Gtid, "GtidSet"]):
def __add__(self, other: Gtid | GtidSet):
if isinstance(other, Gtid):
new = GtidSet(self._gtid_set)
new.merge_gtid(other)
Expand Down Expand Up @@ -289,5 +289,5 @@ def decode(cls, payload: BytesIO):
(n_sid,) = struct.unpack("<Q", payload.read(8))
return cls(set(Gtid.decode(payload) for _ in range(0, n_sid)))

def __eq__(self, other: "GtidSet"): # type: ignore[override]
def __eq__(self, other: GtidSet): # type: ignore[override]
return self._gtid_set == other._gtid_set
22 changes: 6 additions & 16 deletions asyncmy/replication/row_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@

class RowsEvent(BinLogEvent):
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(RowsEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
self._rows = None
self._only_tables = kwargs["only_tables"]
self._ignored_tables = kwargs["ignored_tables"]
Expand Down Expand Up @@ -462,9 +460,7 @@ class DeleteRowsEvent(RowsEvent):
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(DeleteRowsEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
if self._processed:
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)

Expand All @@ -479,9 +475,7 @@ class WriteRowsEvent(RowsEvent):
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(WriteRowsEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
if self._processed:
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)

Expand All @@ -501,9 +495,7 @@ class UpdateRowsEvent(RowsEvent):
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(UpdateRowsEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
if self._processed:
# Body
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
Expand All @@ -525,9 +517,7 @@ class TableMapEvent(BinLogEvent):
"""

def __init__(self, from_packet, event_size, table_map, connection, **kwargs):
super(TableMapEvent, self).__init__(
from_packet, event_size, table_map, connection, **kwargs
)
super().__init__(from_packet, event_size, table_map, connection, **kwargs)
self._only_tables = kwargs["only_tables"]
self._ignored_tables = kwargs["ignored_tables"]
self._only_schemas = kwargs["only_schemas"]
Expand Down Expand Up @@ -606,7 +596,7 @@ async def init(self):
# to pymysqlreplication start, but replayed from binlog
# from blowing up the service.
column_schema = {
"COLUMN_NAME": "__dropped_col_{i}__".format(i=i),
"COLUMN_NAME": f"__dropped_col_{i}__",
"COLLATION_NAME": None,
"CHARACTER_SET_NAME": None,
"COLUMN_COMMENT": None,
Expand Down
Loading