Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
CLI tool for downloading subtitles from napiprojekt.pl, fork of [gabrys/napi.py](https://github.com/gabrys/napi.py)

## prerequisites
- Python 3.7 or newer
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to drop support for Python 3.7 and 3.8?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to drop support for Python 3.7 and 3.8?

I'm afraid so: py7zr 1.0.0 requires Python 3.9 at minimum and currently supports up to Python 3.13 (support for 3.14 is planned).

- Python 3.10 or newer

## installation
- `pip install napi-py` for user-wide installation
Expand Down
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[mypy]
warn_unused_configs = True

[mypy-py7zlib]
[mypy-py7zr]
ignore_missing_imports = True
27 changes: 17 additions & 10 deletions napi/api.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
import os
from urllib import request

from urllib import request, error

def _cipher(z):
idx = [0xE, 0x3, 0x6, 0x8, 0x2]
mul = [2, 2, 5, 4, 3]
add = [0, 0xD, 0x10, 0xB, 0x5]

b = []
for i in range(len(idx)):
a = add[i]
m = mul[i]
i = idx[i]
for j in range(len(idx)):
a = add[j]
m = mul[j]
pos = idx[j]

t = a + int(z[i], 16)
t = a + int(z[pos], 16)
v = int(z[t : t + 2], 16)
b.append(("%x" % (v * m))[-1])

return "".join(b)


def _build_url(movie_hash):
    """Return the napiprojekt.pl download URL for *movie_hash*.

    The URL carries the hash itself (``f``), the token computed by
    ``_cipher`` (``t``) and the value of ``os.name`` (``napios``).
    """
    url_template = "http://napiprojekt.pl/unit_napisy/dl.php?l=PL&f={}&t={}&v=other&kolejka=false&nick=&pass=&napios={}"
    return url_template.format(movie_hash, _cipher(movie_hash), os.name)


def download_for(movie_hash: str) -> bytes:
    """Fetch the raw API response for *movie_hash*.

    Args:
        movie_hash: Hash identifying the movie, as accepted by ``_build_url``.

    Returns:
        The response body, or ``b""`` when the request fails.
    """
    req = request.Request(
        _build_url(movie_hash),
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "Accept": "*/*",
            "Connection": "close",
        },
    )
    try:
        with request.urlopen(req, timeout=10) as response:
            return response.read()
    except OSError:
        # URLError and its subclass HTTPError derive from OSError, and so do
        # the socket timeouts that can be raised while reading the body;
        # every network failure is reported as an empty payload.
        return b""
53 changes: 31 additions & 22 deletions napi/encoding.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,72 @@
import locale
from typing import Optional, Tuple

import chardet

# Candidate encodings tried in order when automatic detection is not trusted.
# "utf-8-sig" comes first so that a leading byte-order mark is stripped.
DECODING_ORDER = [
    "utf-8-sig",
    "utf-16",
    "windows-1250",
    "windows-1251",
    "windows-1252",
    "windows-1253",
    "windows-1254",
    "utf-8",
]
# Only this many leading characters are inspected when judging a decoding.
CHECK_NUM_CHARS = 5000
# Detection confidence must exceed this value for the detected encoding to be used.
AUTO_DETECT_THRESHOLD = 0.9


def _is_ascii(c: str) -> bool:
return ord(c) < 128


def _is_polish_diacritic(c: str) -> bool:
return c in "ąćęłńóśżźĄĆĘŁŃÓŚŻŹ"


def _is_correct_encoding(subs: str) -> bool:
if not subs:
return False
err_symbols, diacritics = 0, 0
for char in subs[:CHECK_NUM_CHARS]:
if _is_polish_diacritic(char):
diacritics += 1
elif not _is_ascii(char):
err_symbols += 1

return err_symbols < diacritics

return diacritics > 0 and err_symbols <= diacritics

def _detect_encoding(subs: bytes) -> Tuple[Optional[str], float]:
    """Ask chardet for the most likely encoding of *subs*.

    Args:
        subs: Raw subtitle bytes.

    Returns:
        ``(encoding, confidence)`` as reported by ``chardet.detect``, or
        ``(None, 0.0)`` when detection fails or reports nothing.
    """
    try:
        result = chardet.detect(subs)
    except Exception:
        # Detection is best-effort: any detector failure means "unknown" so
        # the caller can fall back to its ordered list of encodings.
        return None, 0.0
    # A missing or None confidence is normalised to 0.0 so callers can
    # compare it against a numeric threshold safely.
    return result.get("encoding"), result.get("confidence") or 0.0

def _try_decode(subs: bytes) -> Tuple[str, str]:
    """Decode subtitle bytes without knowing their encoding in advance.

    Strategy, in order:
      1. Use the detected encoding when its confidence exceeds
         ``AUTO_DETECT_THRESHOLD``.
      2. Try each entry of ``DECODING_ORDER`` and accept the first decoding
         that ``_is_correct_encoding`` approves.
      3. Fall back to UTF-8 with undecodable bytes replaced.

    Args:
        subs: Raw subtitle bytes.

    Returns:
        ``(encoding_name, decoded_text)``. The last-resort fallback always
        succeeds, so this function does not raise for ``bytes`` input.
    """
    encoding, confidence = _detect_encoding(subs)
    if encoding and confidence > AUTO_DETECT_THRESHOLD:
        # "utf-8-sig" decodes plain UTF-8 too and drops a leading BOM.
        actual_enc = "utf-8-sig" if encoding.lower() == "utf-8" else encoding
        try:
            return actual_enc, subs.decode(actual_enc)
        except (UnicodeDecodeError, LookupError):
            # Detected encoding is unusable or unknown to Python; fall
            # through to the ordered candidates.
            pass

    for enc in DECODING_ORDER:
        try:
            decoded_subs = subs.decode(enc)
        except (UnicodeDecodeError, LookupError):
            continue
        if _is_correct_encoding(decoded_subs):
            return enc, decoded_subs

    # errors="replace" cannot fail, so this is the guaranteed last resort.
    return "utf-8", subs.decode("utf-8", errors="replace")

def decode_subs(subtitles_binary: bytes, use_enc: Optional[str] = None) -> Tuple[str, str]:
    """Decode subtitle bytes, preferring an explicitly requested encoding.

    Args:
        subtitles_binary: Raw subtitle bytes.
        use_enc: Encoding to try first. When ``None``, or when decoding with
            it fails, the encoding is worked out by ``_try_decode``.

    Returns:
        ``(encoding_name, decoded_text)``.

    Raises:
        LookupError: If ``use_enc`` names a codec unknown to Python.
    """
    if use_enc is None:
        return _try_decode(subtitles_binary)

    try:
        return use_enc, subtitles_binary.decode(use_enc)
    except UnicodeDecodeError:
        # The requested encoding does not fit the data; guess instead.
        return _try_decode(subtitles_binary)


def encode_subs(subs: str) -> Tuple[str, bytes]:
    """Encode subtitle text for output, preferring the locale's encoding.

    Args:
        subs: Decoded subtitle text.

    Returns:
        ``(encoding_name, encoded_bytes)``. The encoding is the locale's
        preferred encoding when it can represent the text, otherwise UTF-8.
    """
    target_encoding = locale.getpreferredencoding(False) or "utf-8"
    try:
        return target_encoding, subs.encode(target_encoding)
    except (UnicodeEncodeError, LookupError):
        # The locale encoding cannot represent the text (or names a codec
        # Python does not know); UTF-8 can encode any valid text.
        return "utf-8", subs.encode("utf-8")
72 changes: 66 additions & 6 deletions napi/read_7z.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,74 @@
from io import BytesIO
from typing import Optional
from py7zlib import Archive7z, ArchiveError
from typing import Optional, Dict
import threading
import py7zr

# Password supplied when opening the 7z archive in un7zip_api_response.
NAPI_ARCHIVE_PASSWORD = "iBlm8NTigvru0Jr0"


def un7zip_api_response(content_7z: bytes) -> Optional[bytes]:
class InMemoryIO(py7zr.io.Py7zIO):
    """Write-only in-memory sink for one extracted archive member.

    Written data is appended to an internal buffer and retrieved with
    ``getvalue``. ``read`` and ``seek`` are placeholders: nothing is ever
    read back through this object and no position is tracked.
    """

    def __init__(self, fname: str):
        self.fname = fname
        self._buf = bytearray()
        self._length = 0
        # Guards the buffer and its length counter.
        # NOTE(review): presumably needed because py7zr may write from worker
        # threads - confirm against the py7zr version in use.
        self._lock = threading.Lock()

    def write(self, data: bytes) -> int:
        """Append *data* to the buffer and return the number of bytes taken."""
        with self._lock:
            self._buf.extend(data)
            self._length += len(data)
        return len(data)

    def read(self, size: Optional[int] = None) -> bytes:
        """Always return ``b""``; reading back is not supported."""
        return b""

    def seek(self, offset: int, whence: int = 0) -> int:
        """Return *offset* unchanged; no position is maintained."""
        return offset

    def flush(self) -> None:
        """Do nothing; data is already in memory."""
        pass

    def size(self) -> int:
        """Return the total number of bytes written so far."""
        return self._length

    def getvalue(self) -> bytes:
        """Return a copy of everything written so far."""
        with self._lock:
            return bytes(self._buf)


class InMemoryFactory(py7zr.io.WriterFactory):
    """Writer factory that buffers every extracted member in memory.

    Each call to ``create`` registers a fresh ``InMemoryIO`` in ``products``
    under the member's filename.
    """

    def __init__(self, target_filename: Optional[str] = None):
        # Maps member filename to the sink that receives its content.
        self.products: Dict[str, InMemoryIO] = {}
        # Stored for callers; it does not change which members are buffered.
        self.target_filename = target_filename

    def create(self, filename: str) -> py7zr.io.Py7zIO:
        """Create, register and return the sink for *filename*."""
        product = InMemoryIO(filename)
        self.products[filename] = product
        return product


def un7zip_api_response(content_7z: bytes, target_filename: Optional[str] = None) -> Optional[bytes]:
    """Extract subtitle bytes from the password-protected 7z API response.

    Args:
        content_7z: Raw response body, expected to be a 7z archive.
        target_filename: Name of the archive member to return. When omitted
            (or empty), the first extracted member is returned.

    Returns:
        The member's content, or ``None`` when the archive is invalid, the
        password is rejected, the archive is empty, or the requested member
        is absent. When the compression method is unsupported, the raw input
        is returned unchanged if it passes the marker check below.
    """
    factory = InMemoryFactory(target_filename=target_filename)
    try:
        buffer = BytesIO(content_7z)
        with py7zr.SevenZipFile(buffer, mode="r", password=NAPI_ARCHIVE_PASSWORD) as archive:
            archive.extractall(factory=factory)
    except py7zr.exceptions.UnsupportedCompressionMethodError:
        # Pass the payload through when it contains b"1\r\n", b"00:00:" or b"{".
        # NOTE(review): presumably these mark an uncompressed subtitle
        # payload - confirm against real API responses.
        if content_7z and (b"1\r\n" in content_7z or b"00:00:" in content_7z or b"{" in content_7z):
            return content_7z
        return None
    except (py7zr.exceptions.Bad7zFile, py7zr.exceptions.PasswordRequired):
        return None

    if target_filename:
        product = factory.products.get(target_filename)
        return product.getvalue() if product is not None else None

    if not factory.products:
        return None
    first_product = next(iter(factory.products.values()))
    return first_product.getvalue()
Loading