-
Notifications
You must be signed in to change notification settings - Fork 27
Open
Description
C:\veighna_studio\python.exe "****状态机与数据库state machine.py"
****state machine.py:17: MovedIn20Warning:
Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
Traceback (most recent call last):
File "****state machine.py", line 20, in <module>
class Process(Base):
File "C:\veighna_studio\lib\site-packages\state_machine\__init__.py", line 47, in acts_as_state_machine
modified_class = adaptor.modifed_class(original_class, _temp_callback_cache)
File "C:\veighna_studio\lib\site-packages\state_machine\orm\sqlalchemy.py", line 53, in modifed_class
setattr(original_class, key, class_dict[key])
File "C:\veighna_studio\lib\site-packages\sqlalchemy\orm\decl_api.py", line 80, in __setattr__
_add_attribute(cls, key, value)
File "C:\veighna_studio\lib\site-packages\sqlalchemy\orm\decl_base.py", line 1142, in _add_attribute
cls.__mapper__.add_property(key, value)
File "C:\veighna_studio\lib\site-packages\sqlalchemy\orm\mapper.py", line 1984, in add_property
self._configure_property(key, prop, init=self.configured)
File "C:\veighna_studio\lib\site-packages\sqlalchemy\orm\mapper.py", line 1744, in _configure_property
prop = self._property_from_column(key, prop)
File "C:\veighna_studio\lib\site-packages\sqlalchemy\orm\mapper.py", line 1890, in _property_from_column
raise sa_exc.InvalidRequestError(msg)
sqlalchemy.exc.InvalidRequestError: Implicitly combining column processes.aasm_state with column processes.aasm_state under attribute 'aasm_state'. Please configure one or more attributes for these same-named columns explicitly.
进程已结束,退出代码为 1
****state machine.py:
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from state_machine import acts_as_state_machine, State, Event, before, after
import random
import time
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
# 数据库配置
engine = create_engine('mysql+pymysql://root:password@localhost:3306/state_test', echo=True)
Session = sessionmaker(bind=engine)
Base = declarative_base()
@acts_as_state_machine
class Process(Base):
__tablename__ = 'processes'
# 数据库字段
id = Column(Integer, primary_key=True)
name = Column(String(50))
transition_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.now)
last_updated = Column(DateTime, default=datetime.now, onupdate=datetime.now)
# 显式定义aasm_state列,指定长度
aasm_state = Column(String(50))
# 状态定义
waiting = State(initial=True)
running = State()
finished = State()
failed = State()
# 事件定义
start = Event(from_states=waiting, to_state=running)
finish = Event(from_states=running, to_state=finished)
fail = Event(from_states=(waiting, running), to_state=failed)
restart = Event(from_states=failed, to_state=waiting)
def __init__(self, name):
self.name = name
self.transition_count = 0
@before('start')
def before_start(self):
logger.info(f"Process {self.name} (ID: {self.id}) is about to start. Transition count: {self.transition_count}")
return random.random() > 0.2
@after('start')
def after_start(self):
self.transition_count += 1
logger.info(f"Process {self.name} (ID: {self.id}) has started. Transition count: {self.transition_count}")
@before('finish')
def before_finish(self):
logger.info(
f"Process {self.name} (ID: {self.id}) is about to finish. Transition count: {self.transition_count}")
return True
@after('finish')
def after_finish(self):
self.transition_count += 1
logger.info(f"Process {self.name} (ID: {self.id}) has finished. Transition count: {self.transition_count}")
@after('fail')
def after_fail(self):
self.transition_count += 1
logger.info(f"Process {self.name} (ID: {self.id}) has failed. Transition count: {self.transition_count}")
@after('restart')
def after_restart(self):
self.transition_count += 1
logger.info(f"Process {self.name} (ID: {self.id}) has restarted. Transition count: {self.transition_count}")
def setup_database():
"""创建数据库和表"""
try:
# 创建数据库
temp_engine = create_engine('mysql+pymysql://root:password@localhost:3306')
with temp_engine.connect() as conn:
conn.execute("CREATE DATABASE IF NOT EXISTS state_test")
# 创建表
Base.metadata.create_all(engine)
logger.info("Database and tables created successfully")
except Exception as e:
logger.error(f"Database setup error: {e}")
raise
def get_or_create_process(session, process_name="TestProcess"):
"""获取或创建进程"""
process = session.query(Process).filter_by(name=process_name).first()
if not process:
process = Process(name=process_name)
session.add(process)
session.commit()
logger.info(f"Created new process with ID: {process.id}")
else:
logger.info(
f"Resumed existing process with ID: {process.id}, State: {process.current_state}, Count: {process.transition_count}")
return process
def simulate_process(session, process):
"""模拟进程状态变化"""
while True:
current_state = process.current_state
logger.info(
f"Process {process.name} (ID: {process.id}) current state: {current_state}, Transitions: {process.transition_count}")
try:
if current_state == 'waiting':
if process.start():
logger.info(f"Process {process.name} started successfully")
else:
process.fail()
logger.info(f"Process {process.name} failed to start")
elif current_state == 'running':
# 80%概率成功完成,20%概率失败
if random.random() > 0.2:
process.finish()
else:
process.fail()
logger.info(f"Process {process.name} failed during execution")
elif current_state == 'failed':
process.restart()
logger.info(f"Process {process.name} restarted")
elif current_state == 'finished':
logger.info(
f"Process {process.name} completed successfully after {process.transition_count} transitions")
break
# 保存状态到数据库
session.commit()
# 随机等待1-3秒
time.sleep(random.uniform(1, 3))
except Exception as e:
logger.error(f"Error occurred: {e}")
session.rollback()
break
def main():
"""主函数"""
session = None
try:
# 设置数据库
setup_database()
# 创建会话
session = Session()
# 获取或创建进程
process = get_or_create_process(session)
# 模拟进程状态变化
simulate_process(session, process)
except Exception as e:
logger.error(f"Main error: {e}")
if session:
session.rollback()
finally:
if session:
session.close()
if __name__ == "__main__":
main()cursor fix suggestion:
state_machine\orm\sqlalchemy.py
class SqlAlchemyAdaptor(BaseAdaptor):
def extra_class_members(self, initial_state):
# return {'aasm_state': sqlalchemy.Column(sqlalchemy.String)}
return {}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels