Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/submit/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "submit"
version = "0.2.0"
version = "0.3.0"
description = "Extract code block which should be submitted to regulatory agency."
readme = "README.md"
requires-python = ">=3.10"
Expand Down
69 changes: 51 additions & 18 deletions python/submit/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
COMMENT_NOT_SUBMIT_NEGIN: str = rf"\/\*{SYLBOMS}*NOT\s*SUBMIT\s*BEGIN{SYLBOMS}*\*\/"
COMMENT_NOT_SUBMIT_END: str = rf"\/\*{SYLBOMS}*NOT\s*SUBMIT\s*END{SYLBOMS}*\*\/"

# 宏变量
# 仅支持一层宏变量引用(例如:&id),不支持嵌套宏变量引用(例如:&&id、&&&id)
MACRO_VAR = r"(?<!&)(&[A-Za-z_][A-Za-z0_9_]*)"


class ConvertMode(IntFlag):
"""转换模式。"""
Expand Down Expand Up @@ -53,14 +57,19 @@ def get_available_values(cls) -> list[ConvertMode]:


def copy_file(
sas_file: str, txt_file: str, convert_mode: ConvertMode = ConvertMode.BOTH, encoding: str | None = None
sas_file: str,
txt_file: str,
convert_mode: ConvertMode = ConvertMode.BOTH,
macro_subs: dict[str, str] | None = None,
encoding: str | None = None,
) -> None:
"""将 SAS 代码复制到 txt 文件中,并移除指定标记之间的内容。

Args:
sas_file (str): SAS 文件路径。
txt_file (str): TXT 文件路径。
convert_mode (ConvertMode, optional): 转换模式,默认值为 ConvertMode.BOTH。
macro_subs (dict[str, str] | None, optional): 一个字典,其键为 SAS 代码中的宏变量名称,值为替代的字符串,默认值为 None。
encoding (str | None, optional): 字符编码,默认值为 None,将自动检测编码。
"""

Expand All @@ -69,35 +78,36 @@ def copy_file(
encoding = detect(f.read())["encoding"]

with open(sas_file, "r", encoding=encoding) as f:
sas_code = f.read()
code = f.read()

# 提取代码片段
if convert_mode & ConvertMode.NEGATIVE:
# 移除不需要递交的代码片段
sas_code = re.sub(
rf"{COMMENT_NOT_SUBMIT_NEGIN}.*?{COMMENT_NOT_SUBMIT_END}",
"",
sas_code,
flags=re.I | re.S,
)
code = re.sub(rf"{COMMENT_NOT_SUBMIT_NEGIN}.*?{COMMENT_NOT_SUBMIT_END}", "", code, flags=re.I | re.S)

if convert_mode & ConvertMode.POSITIVE:
# 提取需要递交的代码片段
sas_code = re.findall(rf"{COMMENT_SUBMIT_BEGIN}(.*?){COMMENT_SUBMIT_END}", sas_code, re.I | re.S)
sas_code = "".join(sas_code)

txt_code = sas_code

txt_code_dir = os.path.dirname(txt_file)
if not os.path.exists(txt_code_dir):
os.makedirs(txt_code_dir)
code = re.findall(rf"{COMMENT_SUBMIT_BEGIN}(.*?){COMMENT_SUBMIT_END}", code, re.I | re.S)
code = "".join(code)

# 替换宏变量
if macro_subs is not None:
for key, value in macro_subs.items():
regex_macro = re.compile(rf"(?<!&)&{key}")
code = re.sub(regex_macro, value, code)

txt_file_dir = os.path.dirname(txt_file)
if not os.path.exists(txt_file_dir):
os.makedirs(txt_file_dir)
with open(txt_file, "w", encoding=encoding) as f:
f.write(txt_code)
f.write(code)


def copy_directory(
sas_dir: str,
txt_dir: str,
convert_mode: ConvertMode = ConvertMode.BOTH,
macro_subs: dict[str, str] | None = None,
exclude_files: list[str] = None,
exclude_dirs: list[str] = None,
encoding: str | None = None,
Expand All @@ -108,6 +118,7 @@ def copy_directory(
sas_dir (str): SAS 文件夹路径。
txt_dir (str): TXT 文件夹路径。
convert_mode (ConvertMode, optional): 转换模式,默认值为 ConvertMode.BOTH。
macro_subs (dict[str, str] | None, optional): 一个字典,其键为 SAS 代码中的宏变量名称,值为替代的字符串,默认值为 None。
exclude_files (list[str], optional): 排除文件列表,默认值为 None。
exclude_dirs (list[str], optional): 排除目录列表,默认值为 None。
encoding (str | None, optional): 字符编码,默认值为 None,将自动检测编码。
Expand All @@ -129,7 +140,24 @@ def copy_directory(
if file.endswith(".sas"):
sas_file = os.path.join(dirpath, file)
txt_file = os.path.join(txt_dir, ref_path, file.replace(".sas", ".txt"))
copy_file(sas_file, txt_file, convert_mode=convert_mode, encoding=encoding)
copy_file(sas_file, txt_file, convert_mode=convert_mode, macro_subs=macro_subs, encoding=encoding)


def parse_dict(arg: str) -> dict[str, str]:
"""解析字典字符串。

Args:
arg (str): 字典字符串。

Returns:
dict[str, str]: 字典。
"""

arg = arg.strip("{}")
try:
return dict([ele.strip("\"'") for ele in item.split("=")] for item in arg.split(","))
except ValueError:
raise argparse.ArgumentTypeError("无效的字典字符串")


def main() -> None:
Expand All @@ -155,6 +183,9 @@ def main() -> None:
default="both",
help="转换模式(默认 both)",
)
parent_parser.add_argument(
"--macro-subs", type=parse_dict, help="宏变量替换,格式为 {key1=value1,key2=value2}(默认无)"
)
parent_parser.add_argument("--encoding", default=None, help="编码格式(默认自动检测)")

# 子命令 copyfile
Expand All @@ -176,13 +207,15 @@ def main() -> None:
sas_file=args.sas_file,
txt_file=args.txt_file,
convert_mode=args.convert_mode,
macro_subs=args.macro_subs,
encoding=args.encoding,
)
elif args.command == "copydir":
copy_directory(
sas_dir=args.sas_dir,
txt_dir=args.txt_dir,
convert_mode=args.convert_mode,
macro_subs=args.macro_subs,
exclude_files=args.exclude_files,
exclude_dirs=args.exclude_dirs,
encoding=args.encoding,
Expand Down
6 changes: 4 additions & 2 deletions python/submit/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ def shared_test_directory(tmp_path_factory: pytest.TempPathFactory) -> Path:
proc datasets library = work memtype = data kill noprint;
quit;

%let id = %str();

/*====SUBMIT BEGIN====*/
proc sql;
create table t2 as select * from adam.adae;
create table t2 as select * from adam.adeff&id;
quit;
/*====SUBMIT END====*/

Expand Down Expand Up @@ -175,7 +177,7 @@ def shared_validate_directory(tmp_path_factory: pytest.TempPathFactory) -> Path:
""")
(dir_tfl / "t2.txt").write_text("""
proc sql;
create table t2 as select * from adam.adae;
create table t2 as select * from adam.adeff;
quit;
""")
(dir_tfl / "t3.txt").write_text("""
Expand Down
4 changes: 3 additions & 1 deletion python/submit/tests/test_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ def test_copy_file(self, shared_test_directory: Path, shared_validate_directory:
assert re.sub(r"\s*", "", tmp_code) == re.sub(r"\s*", "", validate_code)

def test_copy_directory(self, shared_test_directory: Path, shared_validate_directory: Path, tmp_path: Path):
copy_directory(shared_test_directory, tmp_path, exclude_dirs=["other"], exclude_files=["fcmp.sas"])
copy_directory(
shared_test_directory, tmp_path, exclude_dirs=["other"], exclude_files=["fcmp.sas"], macro_subs={"id": ""}
)
copy_directory(shared_test_directory / "macro", tmp_path / "macro", convert_mode=ConvertMode.NEGATIVE)

for validate_file in shared_validate_directory.rglob("*.txt"):
Expand Down
2 changes: 1 addition & 1 deletion python/submit/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading