-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathremote_exec.py
More file actions
174 lines (154 loc) · 6.19 KB
/
remote_exec.py
File metadata and controls
174 lines (154 loc) · 6.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# Time: 2024/8/1 13:18
# Author: Luohaosen
# File: remote_exec.py
import logging
import os
import paramiko
import stat
# Configure the root logger at import time: timestamped messages, INFO level and above.
# NOTE(review): this is a module-level side effect; importing this file changes the
# root logging configuration if the host application has not configured logging yet.
logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)
class RemoteControl(object):
    """
    Remote controller built on a paramiko SSH client (the original author
    describes it as intended for Windows).

    Supports three operations against one remote host: SFTP upload, SFTP
    download and remote command execution. The SSH connection is opened
    lazily on first use and reused by later calls; call :meth:`close` (or use
    the instance as a context manager) to release it.
    """

    def __init__(self, username, password, host='192.168.2.201', port_id: int = 22, timeout: int = 30, **kwargs):
        """
        Store the connection settings and create the (not yet connected) client.

        :param username: login user name
        :param password: login password
        :param host: host name or address of the remote machine
        :param port_id: SSH port of the remote machine
        :param timeout: connection timeout threshold, passed to ``connect``
        :param kwargs: extra options; stored on the instance but not used by
            any method of this class
        """
        self.port_id = port_id
        self.host = host
        self.username = username
        self.passwd = password
        self.timeout = timeout
        self.kwargs = kwargs
        # SSH client; no network traffic happens until the first operation.
        self.client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts any unknown host key without
        # verification, which permits man-in-the-middle attacks on untrusted
        # networks. Kept for backward compatibility; consider loading known
        # host keys instead.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    def __enter__(self):
        """Allow ``with RemoteControl(...) as rc:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client when leaving the ``with`` block."""
        self.close()

    def _connect(self):
        """Open the SSH connection unless an active one already exists."""
        transport = self.client.get_transport()
        if transport is not None and transport.is_active():
            # Reuse the live connection instead of opening a second transport
            # on the same client and abandoning the first one.
            return
        self.client.connect(
            hostname=self.host,
            port=self.port_id,  # previously omitted, so port_id was ignored
            username=self.username,
            password=self.passwd,
            timeout=self.timeout,
        )

    def run(self, mode, local_path=None, server_path=None, cmd=None):
        """
        Single entry point that dispatches to one of the three operations.

        :param mode: one of ``'upload'``, ``'download'`` or ``'execute'``
        :param local_path: local path (upload / download)
        :param server_path: remote path (upload / download)
        :param cmd: command line (execute)
        :return: whatever the selected operation returns
        :raises ValueError: if ``mode`` is not one of the supported values
        """
        if mode == 'upload':
            return self.sftp_upload(local_path=local_path, server_path=server_path)
        if mode == 'download':
            return self.sftp_download(server_path=server_path, local_path=local_path)
        if mode == 'execute':
            return self.ssh_exec_command(cmd)
        # ValueError is a subclass of Exception, so existing handlers still match.
        raise ValueError(f"mode={mode} not ['upload', 'download', 'execute']")

    def sftp_upload(self, local_path, server_path):
        """
        Upload a local file, or the files directly inside a local directory,
        to the given location on the remote server.

        For a directory the remote directory is created when it is missing and
        each entry is uploaded on a best-effort basis: an entry that cannot be
        uploaded (for example a sub-directory) is logged and skipped.

        :param local_path: local file or directory
        :param server_path: remote destination file (file upload) or remote
            directory (directory upload)
        :return: ``"上传成功"`` on success, ``None`` if the upload failed (the
            failure is logged)
        """
        sftp = None
        try:
            # Fail early, before touching the server, when there is nothing to send.
            if not os.path.exists(local_path):
                raise FileNotFoundError(f"local path does not exist: {local_path}")
            self._connect()
            sftp = self.client.open_sftp()
            if os.path.isfile(local_path):
                sftp.put(localpath=local_path, remotepath=server_path)
            else:
                # Create the remote directory if it does not exist yet.
                try:
                    sftp.stat(server_path)
                except FileNotFoundError:
                    sftp.mkdir(server_path)
                for name in os.listdir(local_path):
                    try:
                        sftp.put(
                            localpath=os.path.join(local_path, name),
                            remotepath=f"{server_path}/{os.path.basename(name)}",
                        )
                    except Exception as e:
                        # Best effort: keep going with the remaining entries.
                        logging.warning(f"f={name} dont put: {e}")
        except Exception as e:
            logging.error(f"上传失败, 具体请检查:{e}")
            return None
        finally:
            # Close the SFTP session on every path, not only on success.
            if sftp is not None:
                sftp.close()
        return "上传成功"

    def sftp_download(self, server_path, local_path):
        """
        Download a remote file, or the files directly inside a remote
        directory, to the given local location.

        For a directory the local directory is created and each entry is
        downloaded on a best-effort basis: an entry that cannot be downloaded
        is logged and skipped.

        :param server_path: remote file or directory
        :param local_path: local destination file (file download) or local
            directory (directory download)
        :return: ``"下载成功"`` on success, ``None`` if the download failed (the
            failure is logged)
        """
        sftp = None
        try:
            self._connect()
            sftp = self.client.open_sftp()
            # Inspect the remote path to decide between file and directory mode.
            server_file_attr = sftp.stat(server_path)
            server_file_attr_mode = stat.S_ISREG(server_file_attr.st_mode)
            logging.info(f"下载文件属性={server_file_attr},{server_file_attr_mode}")
            if server_file_attr_mode:
                # A bare file name has an empty dirname, and os.makedirs('')
                # raises, so only create the parent when there is one.
                parent_dir = os.path.dirname(local_path)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)
                sftp.get(remotepath=server_path, localpath=local_path)
            else:
                # Create the local directory once, not once per entry.
                os.makedirs(local_path, exist_ok=True)
                for name in sftp.listdir(server_path):
                    # Normalise separators: os.path.join uses '\\' on Windows.
                    remote_file = os.path.join(server_path, name).replace('\\', '/')
                    try:
                        sftp.get(
                            remotepath=remote_file,
                            localpath=os.path.join(local_path, os.path.basename(name)),
                        )
                    except Exception as e:
                        # Best effort: keep going with the remaining entries.
                        logging.warning(f"f={name} dont get: {e}")
        except Exception as e:
            logging.error(f"下载失败, 具体请检查{e}")
            return None
        finally:
            # Close the SFTP session on every path, not only on success.
            if sftp is not None:
                sftp.close()
        return "下载成功"

    def ssh_exec_command(self, cmd):
        """
        Run a command line on the remote host and wait for it to finish.

        The command's output, exit status and any stderr text are logged.

        :param cmd: command line to execute remotely
        :return: ``"执行成功"`` when the exit status is 0, ``"执行失败"`` for a
            non-zero exit status, or ``"执行失败, 具体请检查<error>"`` when
            connecting or executing raised an exception
        """
        try:
            self._connect()
            _stdin, stdout, stderr = self.client.exec_command(command=cmd)
            # Drain the output BEFORE asking for the exit status: paramiko
            # documents that recv_exit_status() can hang when the remote side
            # has produced more output than the channel window can hold.
            # errors='replace' keeps non-UTF-8 output (e.g. a GBK console) from
            # turning a successful command into a reported failure.
            out_status = stdout.read().decode('utf-8', errors='replace')
            err_status = stderr.read().decode('utf-8', errors='replace')
            exit_status = stdout.channel.recv_exit_status()
            logging.info(f"命令行执行,产生输出:{out_status}")
            logging.info(f"返回状态码:{exit_status}")
            if err_status != '':
                logging.info(f"{self.host}错误信息:{err_status}")
        except Exception as e:
            message = f"执行失败, 具体请检查{e}"
            logging.error(message)
            return message
        return "执行成功" if exit_status == 0 else "执行失败"

    def close(self):
        """
        Close the SSH client and its underlying connection.

        :return: None
        """
        self.client.close()