From f96c8241b8766c1f5e7dbf8661157b2ed63cc3d3 Mon Sep 17 00:00:00 2001 From: lostsnow Date: Thu, 2 Jun 2022 10:31:31 +0800 Subject: [PATCH 1/4] add shm support --- dongtai_agent_python/api/openapi.py | 2 + dongtai_agent_python/setting/setting.py | 55 ++++++- .../tests/setting/test_setting.py | 30 +++- dongtai_agent_python/utils/lock.py | 14 ++ dongtai_agent_python/utils/shm/__init__.py | 1 + dongtai_agent_python/utils/shm/shm.py | 144 ++++++++++++++++++ setup.cfg | 1 + 7 files changed, 240 insertions(+), 7 deletions(-) create mode 100644 dongtai_agent_python/utils/lock.py create mode 100644 dongtai_agent_python/utils/shm/__init__.py create mode 100644 dongtai_agent_python/utils/shm/shm.py diff --git a/dongtai_agent_python/api/openapi.py b/dongtai_agent_python/api/openapi.py index 656e69c..a5f1dcb 100644 --- a/dongtai_agent_python/api/openapi.py +++ b/dongtai_agent_python/api/openapi.py @@ -187,6 +187,8 @@ def agent_register(self): logger.error("register get agent id empty") return resp + self.setting.set_shm('agent-' + str(self.agent_id)) + if resp.get('data', {}).get('coreAutoStart', 0) != 1: logger.info("agent is waiting for auditing") self.setting.dt_manual_pause = True diff --git a/dongtai_agent_python/setting/setting.py b/dongtai_agent_python/setting/setting.py index 754bf28..cfd8068 100644 --- a/dongtai_agent_python/setting/setting.py +++ b/dongtai_agent_python/setting/setting.py @@ -1,8 +1,13 @@ import os +from multiprocessing import Lock from dongtai_agent_python import version from .config import Config from dongtai_agent_python.utils import Singleton +from dongtai_agent_python.utils.shm import SharedMemoryDict +from dongtai_agent_python.utils.lock import lock + +_lock = Lock() class Setting(Singleton): @@ -13,10 +18,8 @@ def init(self): return self.version = version.__version__ - self.paused = False - self.manual_paused = False self.agent_id = 0 - self.request_seq = 0 + self.shm = None self.auto_create_project = 0 self.use_local_policy = False @@ -38,6 +41,11 @@ def init(self): self.init_os_environ() Setting.loaded = True + def __del__(self): + if self.shm is None: + return False + self.shm.close() + def set_container(self, container): if container and isinstance(container, dict): self.container = container @@ -76,8 +84,45 @@ def init_os_environ(self): for key in os_env.keys(): self.os_env_list.append(key + '=' + str(os_env[key])) + def set_shm(self, name): + if self.shm is None: + self.shm = SharedMemoryDict(name) + + @property + def paused(self): + if self.shm is None: + return False + return self.shm.get('paused') + + @paused.setter + def paused(self, status): + if self.shm is None: + return + self.shm['paused'] = status + + @property + def manual_paused(self): + if self.shm is None: + return False + return self.shm.get('manual_paused') + + @manual_paused.setter + def manual_paused(self, status): + if self.shm is None: + return + self.shm['manual_paused'] = status + def is_agent_paused(self): - return self.paused and self.manual_paused + return self.paused or self.manual_paused + @property + def request_seq(self): + if self.shm is None: + return 0 + return self.shm.get('request_seq', 0) + + @lock(_lock) def incr_request_seq(self): - self.request_seq = self.request_seq + 1 + if self.shm is None: + return + self.shm['request_seq'] = self.request_seq + 1 diff --git a/dongtai_agent_python/tests/setting/test_setting.py b/dongtai_agent_python/tests/setting/test_setting.py index 7461eb8..d9d8d02 100644 --- a/dongtai_agent_python/tests/setting/test_setting.py +++ b/dongtai_agent_python/tests/setting/test_setting.py @@ -1,4 +1,7 @@ +import multiprocessing +import os import threading +import time import unittest from dongtai_agent_python.setting.setting import Setting @@ -6,19 +9,42 @@ class TestSetting(unittest.TestCase): def test_multithreading(self): - def test(name): + def test_mt(name): st1 = Setting() + st1.set_shm("test-setting-001") st1.set_container({'name': name, 'version': '0.1'}) st1.incr_request_seq() thread_num = 5 for i in range(thread_num): - t = threading.Thread(target=test, args=['test' + str(i)]) + t = threading.Thread(target=test_mt, args=['test' + str(i)]) t.start() st = Setting() + st.set_shm("test-setting-001") + time.sleep(1) self.assertEqual(thread_num, st.request_seq) + def test_multiprocessing(self): + if os.name == "nt": + return + + def test_mp(name): + st1 = Setting() + st1.set_shm("test-setting-002") + st1.set_container({'name': name, 'version': '0.1'}) + st1.incr_request_seq() + + process_num = 5 + for i in range(process_num): + p = multiprocessing.Process(target=test_mp, args=('test' + str(i),)) + p.start() + + st = Setting() + st.set_shm("test-setting-002") + time.sleep(1) + self.assertEqual(process_num, st.request_seq) + if __name__ == '__main__': unittest.main() diff --git a/dongtai_agent_python/utils/lock.py b/dongtai_agent_python/utils/lock.py new file mode 100644 index 0000000..0d20ce9 --- /dev/null +++ b/dongtai_agent_python/utils/lock.py @@ -0,0 +1,14 @@ +from functools import wraps + + +def lock(_lock): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + _lock.acquire() + try: + return func(*args, **kwargs) + finally: + _lock.release() + return wrapper + return decorator diff --git a/dongtai_agent_python/utils/shm/__init__.py b/dongtai_agent_python/utils/shm/__init__.py new file mode 100644 index 0000000..0fdac92 --- /dev/null +++ b/dongtai_agent_python/utils/shm/__init__.py @@ -0,0 +1 @@ +from .shm import SharedMemoryDict diff --git a/dongtai_agent_python/utils/shm/shm.py b/dongtai_agent_python/utils/shm/shm.py new file mode 100644 index 0000000..476b286 --- /dev/null +++ b/dongtai_agent_python/utils/shm/shm.py @@ -0,0 +1,144 @@ +import logging +import pickle +import sys +from contextlib import contextmanager +from multiprocessing import Lock + +from dongtai_agent_python.utils.lock import lock + +if sys.version_info[:3] <= (3, 7): + from shared_memory.shared_memory import SharedMemory +else: + from multiprocessing.shared_memory import SharedMemory + +NULL_BYTE = b"\x00" + +logger = logging.getLogger(__name__) +_lock = Lock() +DEFAULT_OBJ = object() + + +class SharedMemoryDict: + def __init__(self, name, size=1024): + self.name = name + self.mem_block = self.get_or_create(size) + self.init_memory() + + @lock(_lock) + def get_or_create(self, size): + name = 'dongtai-shm-' + self.name + try: + return SharedMemory(name=name) + except FileNotFoundError: + return SharedMemory(name=name, create=True, size=size) + + def init_memory(self): + memory_is_empty = (bytes(self.mem_block.buf).split(NULL_BYTE, 1)[0] == b'') + if memory_is_empty: + self.save_memory({}) + + def close(self) -> None: + if not hasattr(self, 'mem_block'): + return + self.mem_block.close() + + @lock + def clear(self) -> None: + self.save_memory({}) + + def popitem(self): + with self.modify_db() as db: + return db.popitem() + + def save_memory(self, db) -> None: + data = pickle.dumps(db) + try: + self.mem_block.buf[:len(data)] = data + except ValueError as exc: + logging.error("failed save to memory", exc_info=exc) + + def read_memory(self): + return pickle.loads(self.mem_block.buf.tobytes()) + + @contextmanager + @lock(_lock) + def modify_db(self): + db = self.read_memory() + yield db + self.save_memory(db) + + def __getitem__(self, key: str): + return self.read_memory()[key] + + def __setitem__(self, key: str, value) -> None: + with self.modify_db() as db: + db[key] = value + + def __len__(self) -> int: + return len(self.read_memory()) + + def __delitem__(self, key: str) -> None: + with self.modify_db() as db: + del db[key] + + def __iter__(self): + return iter(self.read_memory()) + + def __reversed__(self): + return reversed(self.read_memory()) + + def __del__(self) -> None: + self.close() + + def __contains__(self, key: str) -> bool: + return key in self.read_memory() + + def __eq__(self, other) -> bool: + return self.read_memory() == other + + def __ne__(self, other) -> bool: + return self.read_memory() != other + + if sys.version_info > (3, 8): + def __or__(self, other): + return self.read_memory() | other + + def __ror__(self, other): + return other | self.read_memory() + + def __ior__(self, other): + with self.modify_db() as db: + db |= other + return db + + def __str__(self): + return str(self.read_memory()) + + def __repr__(self): + return repr(self.read_memory()) + + def get(self, key: str, default=None): + return self.read_memory().get(key, default) + + def keys(self): + return self.read_memory().keys() + + def values(self): + return self.read_memory().values() + + def items(self): + return self.read_memory().items() + + def pop(self, key: str, default=DEFAULT_OBJ): + with self.modify_db() as db: + if default is DEFAULT_OBJ: + return db.pop(key) + return db.pop(key, default) + + def update(self, other=(), **kwargs): + with self.modify_db() as db: + db.update(other, **kwargs) + + def setdefault(self, key: str, default=None): + with self.modify_db() as db: + return db.setdefault(key, default) diff --git a/setup.cfg b/setup.cfg index 6d9b41a..1784c82 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,4 +29,5 @@ install_requires = psutil >= 5.8.0 requests >= 2.25.1 pip >= 19.2.3 + shared-memory38 >= 0.1.2; python_version <= '3.7' ; regexploit >= 1.0.0 From 6cf2eb25ef2f2c4d312160a533a46b78752e3e01 Mon Sep 17 00:00:00 2001 From: lostsnow Date: Thu, 2 Jun 2022 11:50:54 +0800 Subject: [PATCH 2/4] fix test setting with shm --- dongtai_agent_python/setting/setting.py | 1 + dongtai_agent_python/tests/setting/test_setting.py | 4 ++++ dongtai_agent_python/utils/shm/shm.py | 5 +++++ 3 files changed, 10 insertions(+) diff --git a/dongtai_agent_python/setting/setting.py b/dongtai_agent_python/setting/setting.py index cfd8068..d42c1eb 100644 --- a/dongtai_agent_python/setting/setting.py +++ b/dongtai_agent_python/setting/setting.py @@ -45,6 +45,7 @@ def __del__(self): if self.shm is None: return False self.shm.close() + self.shm.unlink() def set_container(self, container): if container and isinstance(container, dict): diff --git a/dongtai_agent_python/tests/setting/test_setting.py b/dongtai_agent_python/tests/setting/test_setting.py index d9d8d02..8ea8e69 100644 --- a/dongtai_agent_python/tests/setting/test_setting.py +++ b/dongtai_agent_python/tests/setting/test_setting.py @@ -11,6 +11,7 @@ class TestSetting(unittest.TestCase): def test_multithreading(self): def test_mt(name): st1 = Setting() + st1.shm = None st1.set_shm("test-setting-001") st1.set_container({'name': name, 'version': '0.1'}) st1.incr_request_seq() @@ -21,6 +22,7 @@ def test_mt(name): t.start() st = Setting() + st.shm = None st.set_shm("test-setting-001") time.sleep(1) self.assertEqual(thread_num, st.request_seq) @@ -31,6 +33,7 @@ def test_multiprocessing(self): def test_mp(name): st1 = Setting() + st1.shm = None st1.set_shm("test-setting-002") st1.set_container({'name': name, 'version': '0.1'}) st1.incr_request_seq() @@ -41,6 +44,7 @@ def test_mp(name): p.start() st = Setting() + st.shm = None st.set_shm("test-setting-002") time.sleep(1) self.assertEqual(process_num, st.request_seq) diff --git a/dongtai_agent_python/utils/shm/shm.py b/dongtai_agent_python/utils/shm/shm.py index 476b286..92e687a 100644 --- a/dongtai_agent_python/utils/shm/shm.py +++ b/dongtai_agent_python/utils/shm/shm.py @@ -42,6 +42,11 @@ def close(self) -> None: return self.mem_block.close() + def unlink(self) -> None: + if not hasattr(self, 'mem_block'): + return + self.mem_block.unlink() + @lock def clear(self) -> None: self.save_memory({}) From 5cc108351f880787d7310836189d1b822ab1fa84 Mon Sep 17 00:00:00 2001 From: lostsnow Date: Thu, 2 Jun 2022 16:55:00 +0800 Subject: [PATCH 3/4] change shm name format --- dongtai_agent_python/setting/setting.py | 2 +- dongtai_agent_python/utils/shm/shm.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dongtai_agent_python/setting/setting.py b/dongtai_agent_python/setting/setting.py index d42c1eb..a76880c 100644 --- a/dongtai_agent_python/setting/setting.py +++ b/dongtai_agent_python/setting/setting.py @@ -87,7 +87,7 @@ def init_os_environ(self): def set_shm(self, name): if self.shm is None: - self.shm = SharedMemoryDict(name) + self.shm = SharedMemoryDict('dongtai-shm-python-' + name) @property def paused(self): diff --git a/dongtai_agent_python/utils/shm/shm.py b/dongtai_agent_python/utils/shm/shm.py index 92e687a..d0c8a3b 100644 --- a/dongtai_agent_python/utils/shm/shm.py +++ b/dongtai_agent_python/utils/shm/shm.py @@ -26,11 +26,10 @@ def __init__(self, name, size=1024): @lock(_lock) def get_or_create(self, size): - name = 'dongtai-shm-' + self.name try: - return SharedMemory(name=name) + return SharedMemory(name=self.name) except FileNotFoundError: - return SharedMemory(name=name, create=True, size=size) + return SharedMemory(name=self.name, create=True, size=size) def init_memory(self): memory_is_empty = (bytes(self.mem_block.buf).split(NULL_BYTE, 1)[0] == b'') From 1ef3f99f97cc9c17b575c8eb8fc8f579677e7d11 Mon Sep 17 00:00:00 2001 From: lostsnow Date: Mon, 6 Jun 2022 15:05:27 +0800 Subject: [PATCH 4/4] fix requires --- .github/workflows/vul-test.yml | 4 ++-- dongtai_agent_python/utils/shm/shm.py | 2 +- setup.cfg | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/vul-test.yml b/.github/workflows/vul-test.yml index d108c8e..e9ebafa 100644 --- a/.github/workflows/vul-test.yml +++ b/.github/workflows/vul-test.yml @@ -56,13 +56,13 @@ jobs: - name: Run Vul test run: | - curl --fail --retry-delay 10 --retry 30 --retry-connrefused http://127.0.0.1:8003/api/django/demo/get_open?name=Data cd ${{ github.workspace }}/DockerVulspace + curl --fail --retry-delay 10 --retry 30 --retry-connrefused http://127.0.0.1:8003/api/django/demo/get_open?name=Data + docker-compose logs djangoweb flaskweb docker-compose exec -T djangoweb python -V docker-compose exec -T djangoweb pip list docker-compose exec -T flaskweb python -V docker-compose exec -T flaskweb pip list - docker-compose logs djangoweb flaskweb bash ${{ github.workspace }}/DongTai-agent-python/dongtai_agent_python/tests/vul-test.sh \ django http://127.0.0.1:8003/api/django ${{ github.run_id }} bash ${{ github.workspace }}/DongTai-agent-python/dongtai_agent_python/tests/vul-test.sh \ diff --git a/dongtai_agent_python/utils/shm/shm.py b/dongtai_agent_python/utils/shm/shm.py index d0c8a3b..dad82ba 100644 --- a/dongtai_agent_python/utils/shm/shm.py +++ b/dongtai_agent_python/utils/shm/shm.py @@ -6,7 +6,7 @@ from dongtai_agent_python.utils.lock import lock -if sys.version_info[:3] <= (3, 7): +if sys.version_info[:3] < (3, 8): from shared_memory.shared_memory import SharedMemory else: from multiprocessing.shared_memory import SharedMemory diff --git a/setup.cfg b/setup.cfg index 1784c82..e8d331e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,5 +29,5 @@ install_requires = psutil >= 5.8.0 requests >= 2.25.1 pip >= 19.2.3 - shared-memory38 >= 0.1.2; python_version <= '3.7' + shared-memory38 >= 0.1.2; python_version < '3.8' ; regexploit >= 1.0.0