Skip to content
This repository was archived by the owner on Dec 29, 2025. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 55 additions & 43 deletions pkgs/autoaspm.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
#!/usr/bin/env python3

# Original bash script by Luis R. Rodriguez
# Re-written in Python by z8
# Re-re-written to patch supported devices automatically by notthebee

import re
import subprocess
import os
import platform
import argparse
from enum import Enum


class ASPM(Enum):
DISABLED = 0b00
L0s = 0b01
Expand All @@ -22,40 +20,30 @@ def run_prerequisites():
raise OSError("This script only runs on Linux-based systems")
if not os.environ.get("SUDO_UID") and os.geteuid() != 0:
raise PermissionError("This script needs root privileges to run")
lspci_detected = subprocess.run(["which", "lspci"], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
if lspci_detected.returncode > 0:
raise Exception("lspci not detected. Please install pciutils")
lspci_detected = subprocess.run(["which", "setpci"], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
if lspci_detected.returncode > 0:
raise Exception("setpci not detected. Please install pciutils")
for cmd in ["lspci", "setpci"]:
if subprocess.run(["which", cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode > 0:
raise Exception(f"{cmd} not detected. Please install pciutils")


def get_device_name(addr):
p = subprocess.Popen([
"lspci",
"-s",
addr,
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return p.communicate()[0].splitlines()[0].decode()
"""Return the lspci description for a given PCI address"""
p = subprocess.Popen(["lspci", "-s", addr], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return p.communicate()[0].splitlines()[0].decode().strip()


def read_all_bytes(device):
all_bytes = bytearray()
device_name = get_device_name(device)
p = subprocess.Popen([
"lspci",
"-s",
device,
"-xxx"
], stdout= subprocess.PIPE, stderr=subprocess.PIPE)
ret = p.communicate()
ret = ret[0].decode()
p = subprocess.Popen(["lspci", "-s", device, "-xxx"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
ret = p.communicate()[0].decode()
for line in ret.splitlines():
if not device_name in line and ": " in line:
all_bytes.extend(bytearray.fromhex(line.split(": ")[1]))
if len(all_bytes) < 256:
exit()
return all_bytes


def find_byte_to_patch(bytes, pos):
pos = bytes[pos]
if bytes[pos] != 0x10:
Expand All @@ -65,34 +53,43 @@ def find_byte_to_patch(bytes, pos):
pos += 0x10
return pos

def patch_byte(device, position, value):
subprocess.Popen([
"setpci",
"-s",
device,
f"{hex(position)}.B={hex(value)}"
]).communicate()

def patch_device(addr, aspm_value):
def patch_byte(device, position, value, dry_run=False):
"""Write a byte via setpci unless dry_run is True"""
if dry_run:
print(f"[DRY-RUN] Would run: setpci -s {device} {hex(position)}.B={hex(value)}")
else:
subprocess.Popen(["setpci", "-s", device, f"{hex(position)}.B={hex(value)}"]).communicate()


def patch_device(addr, aspm_value, verbose=False, dry_run=False):
endpoint_bytes = read_all_bytes(addr)
byte_position_to_patch = find_byte_to_patch(endpoint_bytes, 0x34)
if int(endpoint_bytes[byte_position_to_patch]) & 0b11 != aspm_value.value:
device_name = get_device_name(addr)

current_value = int(endpoint_bytes[byte_position_to_patch]) & 0b11
target_value = aspm_value.value

if current_value != target_value:
patched_byte = int(endpoint_bytes[byte_position_to_patch])
patched_byte = patched_byte >> 2
patched_byte = patched_byte << 2
patched_byte = patched_byte | aspm_value.value
patched_byte = (patched_byte >> 2) << 2 | target_value
patch_byte(addr, byte_position_to_patch, patched_byte, dry_run=dry_run)
msg = f"{addr}: Enabled ASPM {aspm_value.name}"
else:
msg = f"{addr}: Already has ASPM {aspm_value.name} enabled"

patch_byte(addr, byte_position_to_patch, patched_byte)
print(f"{addr}: Enabled ASPM {aspm_value.name}")
if verbose:
print(f"{addr} ({device_name}): {msg.split(': ')[1]}")
else:
print(f"{addr}: Already has ASPM {aspm_value.name} enabled")
print(msg)


def list_supported_devices():
"""Return a dict {addr: (aspm_mode, description)}"""
pcie_addr_regex = r"([0-9a-f]{2}:[0-9a-f]{2}\.[0-9a-f])"
lspci = subprocess.run("lspci -vv", shell=True, capture_output=True).stdout
lspci_arr = re.split(pcie_addr_regex, str(lspci))[1:]
lspci_arr = [ x+y for x,y in zip(lspci_arr[0::2], lspci_arr[1::2]) ]
lspci_arr = [x + y for x, y in zip(lspci_arr[0::2], lspci_arr[1::2])]

aspm_devices = {}
for dev in lspci_arr:
Expand All @@ -101,14 +98,29 @@ def list_supported_devices():
continue
aspm_support = re.findall(r"ASPM (L[L0-1s ]*),", dev)
if aspm_support:
aspm_devices.update({device_addr: ASPM[aspm_support[0].replace(" ", "")]})
desc = get_device_name(device_addr)
aspm_devices.update({device_addr: (ASPM[aspm_support[0].replace(" ", "")], desc)})
return aspm_devices


def main():
parser = argparse.ArgumentParser(description="PCIe ASPM patching utility")
parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed device info")
parser.add_argument("--dry-run", "-n", action="store_true", help="Do not make any changes, only show actions")
args = parser.parse_args()

run_prerequisites()
for device, aspm_mode in list_supported_devices().items():
patch_device(device, aspm_mode)
devices = list_supported_devices()

if args.verbose:
print("Detected ASPM-capable devices:\n")
for device, (aspm_mode, desc) in devices.items():
print(f"{device} - {desc} (ASPM: {aspm_mode.name})")
print("\n--- Starting ASPM patching ---")

for device, (aspm_mode, _) in devices.items():
patch_device(device, aspm_mode, verbose=args.verbose, dry_run=args.dry_run)


if __name__ == "__main__":
main()