From 4a975798f73ad35cb359867547f6a2e58842c9a1 Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 08:45:26 +0300 Subject: [PATCH 01/10] add TaskDescriptionInterface --- tasks/requests/move/__init__.py | 0 tasks/requests/move/bot/__init__.py | 0 tasks/requests/move/bot/models.py | 111 ++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+) create mode 100644 tasks/requests/move/__init__.py create mode 100644 tasks/requests/move/bot/__init__.py create mode 100644 tasks/requests/move/bot/models.py diff --git a/tasks/requests/move/__init__.py b/tasks/requests/move/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tasks/requests/move/bot/__init__.py b/tasks/requests/move/bot/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tasks/requests/move/bot/models.py b/tasks/requests/move/bot/models.py new file mode 100644 index 00000000..21e801ee --- /dev/null +++ b/tasks/requests/move/bot/models.py @@ -0,0 +1,111 @@ +import copy +import logging +from abc import ABC, abstractmethod + +import pywikibot +import wikitextparser as wtp + + +class TaskDescriptionInterface(ABC): + """ + Interface for a task description. + """ + + @abstractmethod + def get_description(self): + """ + Retrieves the description for the object. + + Returns: + str: The default description for the object. + """ + pass + + +class TaskDescription(TaskDescriptionInterface): + """ + Represents a task description object. + + Args: + site (str): The site of the task. + page_title (str): The title of the page. + default_description (str): The default description. + + Attributes: + site (str): The site of the task. + page_title (str): The title of the page. + default_description (str): The default description. + + Methods: + get_description: Retrieves the description for the object. + + Usage: + # Create a TaskDescription object + site = pywikibot.Site() + task_description = TaskDescription(site=site, page_title='Page 1', default_description='Default description') + + # Get the description + description = task_description.get_description() + """ + + def __init__(self, site, page_title, default_description): + self.site = site + self.page_title = page_title + self.default_description = default_description + + def get_description(self): + """ + Retrieves the description for the object. + + Returns: + str: The default description for the object. + """ + try: + page = pywikibot.Page(self.site, self.page_title) + if page.exists() and page.text != "": + parsed = wtp.parse(page.text) + text = copy.deepcopy(page.text) + for comment in parsed.comments: + text = str(text).replace(str(comment), "") + self.default_description = str(text).strip() + except Exception as e: + print(e) + logging.error(e) + return self.default_description + + +class WikipediaTaskReader: + """ + Reads tasks from Wikipedia. + + Attributes: + task (TaskDescription): The task description. + + Methods: + read: Reads and returns the task. + """ + + def __init__(self, task_description): + """ + Initializes a new instance of the WikipediaTaskReader class. + + Args: + task_description (TaskDescription): The task description. + """ + self.task = task_description + + def read(self): + """ + Reads and returns the task. + + Returns: + TaskDescription: The task description. + """ + return self.task + + +page_name = "ويكيبيديا:طلبات نقل عبر البوت/ملخص التعديل" +site = pywikibot.Site("ar", "wikipedia") +default_description = "بوت:نقل ([[ويكيبيديا:طلبات نقل عبر البوت]])" +task_description = TaskDescription(site=site, page_title=page_name, default_description=default_description) +print(task_description.get_description()) From 7448f0d4f0904952752233efec0db1dee9d49707 Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 09:29:29 +0300 Subject: [PATCH 02/10] add TaskOptionInterface --- tasks/requests/move/bot/models.py | 77 +++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/tasks/requests/move/bot/models.py b/tasks/requests/move/bot/models.py index 21e801ee..c5726fd6 100644 --- a/tasks/requests/move/bot/models.py +++ b/tasks/requests/move/bot/models.py @@ -5,6 +5,8 @@ import pywikibot import wikitextparser as wtp +from core.utils.helpers import prepare_str + class TaskDescriptionInterface(ABC): """ @@ -74,6 +76,76 @@ def get_description(self): return self.default_description +class TaskOptionInterface(ABC): + """ + Represents an interface for a task option object. + """ + + @abstractmethod + def get_options(self): + """ + Retrieves the options. + + Returns: + dict: A dictionary containing the options and their values. + """ + pass + + +class TaskOption: + """ + Represents a task option object. + + Args: + site (str): The site of the task. + page_title (str): The title of the page. + template_name (str): The name of the template. + + Attributes: + site (str): The site of the task. + page_title (str): The title of the page. + template_name (str): The name of the template. + + Methods: + get_options: Retrieves the options from the template on the page or the default options. + + """ + + DEFAULT_OPTIONS = { + 'move-subpages': 'yes', + 'leave-redirect': 'yes', + 'leave-talk': 'yes', + 'move-talk': 'yes' + } + + def __init__(self, site, page_title, template_name): + self.site = site + self.page_title = page_title + self.template_name = template_name + + def get_options(self): + """ + Retrieves the options from the template on the page or the default options. + + Returns: + dict: A dictionary containing the options and their values. + """ + options = self.DEFAULT_OPTIONS.copy() + + try: + page = pywikibot.Page(self.site, self.page_title) + parsed = wtp.parse(page.text) + for template in parsed.templates: + if prepare_str(template.name) == prepare_str(self.template_name): + for arg in template.arguments: + options[prepare_str(arg.name)] = prepare_str(arg.value) + except Exception as e: + print(e) + logging.error(e) + + return options + + class WikipediaTaskReader: """ Reads tasks from Wikipedia. @@ -109,3 +181,8 @@ def read(self): default_description = "بوت:نقل ([[ويكيبيديا:طلبات نقل عبر البوت]])" task_description = TaskDescription(site=site, page_title=page_name, default_description=default_description) print(task_description.get_description()) + +option_page_name = "ويكيبيديا:طلبات نقل عبر البوت/خيارات البوت" +default_template_name = "ويكيبيديا:طلبات نقل عبر البوت/خيارات البوت/قالب" +task_option = TaskOption(site=site, page_title=option_page_name, template_name=default_template_name) +print(task_option.get_options()) From 842f62fdde35d0ff081b2e682d4a1ddfb2c1b8b4 Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 09:41:35 +0300 Subject: [PATCH 03/10] add BotRunnerInterface --- tasks/requests/move/bot/models.py | 75 +++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/tasks/requests/move/bot/models.py b/tasks/requests/move/bot/models.py index c5726fd6..99fa84cf 100644 --- a/tasks/requests/move/bot/models.py +++ b/tasks/requests/move/bot/models.py @@ -146,6 +146,77 @@ def get_options(self): return options +class BotRunnerInterface(ABC): + """ + Represents an interface for a bot runner object. + """ + + @abstractmethod + def can_run(self): + """ + Checks if the bot can be run. + + Returns: + bool: True if the bot can be run, False otherwise. + """ + pass + + +class BotRunner(BotRunnerInterface): + """ + Represents a bot runner object. + + Args: + site (str): The site of the page. + page_title (str): The title of the page. + + Attributes: + site (str): The site of the page. + page_title (str): The title of the page. + + Methods: + can_run: Checks if the bot can be run based on the description and option of the page. + + Example: + # Create a BotRunner object + bot_runner = BotRunner(site='example.com', page_title='Page 1') + + # Check if the bot can be run + can_run = bot_runner.can_run() + + if can_run: + print("The bot can be run!") + else: + print("The bot cannot be run.") + + """ + + def __init__(self, site, page_title): + self.site = site + self.page_title = page_title + self.status = False + self.yes_word = "نعم" + + def can_run(self): + """ + Checks if the bot can be run based on the description and option of the page. + + Returns: + bool: True if the bot can be run, False otherwise. + """ + + try: + page = pywikibot.Page(self.site, self.page_title) + if page.exists(): + text = page.text + self.status = prepare_str(text) == prepare_str(self.yes_word) + except Exception as e: + print(e) + logging.error(e) + + return self.status + + class WikipediaTaskReader: """ Reads tasks from Wikipedia. @@ -186,3 +257,7 @@ def read(self): default_template_name = "ويكيبيديا:طلبات نقل عبر البوت/خيارات البوت/قالب" task_option = TaskOption(site=site, page_title=option_page_name, template_name=default_template_name) print(task_option.get_options()) + +bot_runner_page_name = "ويكيبيديا:طلبات نقل عبر البوت/تشغيل البوت" +bot_runner = BotRunner(site=site, page_title=bot_runner_page_name) +print(bot_runner.can_run()) From 0cde150911e41a99d41fc3649499706b95ee0c69 Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 10:58:09 +0300 Subject: [PATCH 04/10] add check_user_role --- tasks/requests/move/bot/models.py | 138 ++++++++++++++++++++++++------ tasks/requests/move/read.py | 92 ++++++++++++++++++++ tasks/requests/move/run.py | 89 +++++++++++++++++++ 3 files changed, 292 insertions(+), 27 deletions(-) create mode 100644 tasks/requests/move/read.py create mode 100644 tasks/requests/move/run.py diff --git a/tasks/requests/move/bot/models.py b/tasks/requests/move/bot/models.py index 99fa84cf..ff8e093e 100644 --- a/tasks/requests/move/bot/models.py +++ b/tasks/requests/move/bot/models.py @@ -92,7 +92,7 @@ def get_options(self): pass -class TaskOption: +class TaskOption(TaskOptionInterface): """ Represents a task option object. @@ -217,25 +217,125 @@ def can_run(self): return self.status -class WikipediaTaskReader: +class LastUserEditRoleInterface(ABC): + """ + Represents an interface for checking the role of the last user who edited a page. """ - Reads tasks from Wikipedia. - Attributes: - task (TaskDescription): The task description. + def __init__(self, page: pywikibot.Page, role: str): + self.page = page + self.role = role - Methods: - read: Reads and returns the task. + @abstractmethod + def get_last_user_role(self) -> list: + """ + Get the role of the last user who edited the page. + Returns: + list: The role of the last user who edited the page. + """ + pass + + @abstractmethod + def check_user_role(self) -> bool: + """ + Checks if the user has the role of the last user who edited the page. + Returns: + bool: True if the user has the role of the last user who edited the page, False otherwise. + """ + pass + + +class LastUserEditRoleChecker(LastUserEditRoleInterface): + """ + Represents a class for checking the role of the last user who edited a page. """ - def __init__(self, task_description): + def __init__(self, page: pywikibot.Page, role: str): + super().__init__(page=page, role=role) + + def get_last_user_role(self) -> list: + """ + Get the role of the last user who edited the page. + Returns: + list: The role of the last user who edited the page. """ - Initializes a new instance of the WikipediaTaskReader class. + try: + last_user = self.page.lastNonBotUser() + user = pywikibot.User(self.page.site, last_user) + return user.groups() + except Exception as e: + print(e) + logging.error(e) + return [] + + def check_user_role(self) -> bool: + """ + Checks if the user has the role of the last user who edited the page. + Returns: + bool: True if the user has the role of the last user who edited the page, False otherwise. + """ + return self.role in self.get_last_user_role() + + +class WikipediaTaskReader: + """ + Represents a task reader for Wikipedia. Args: - task_description (TaskDescription): The task description. + description (DescriptionInterface): The description object. + option (OptionInterface): The option object. + task (TaskInterface): The task object. + last_user_edit_role (LastUserEditRoleInterface): The last user edit role object. + Methods: + can_run: Checks if the bot can be run based on the description, option, and task. + + Example: + # Create description, option, and task objects + description = DescriptionImpl() + option = OptionImpl() + task = TaskImpl() + + # Create a WikipediaTaskReader object + task_reader = WikipediaTaskReader(description, option, task) + + # Check if the bot can be run + can_run = task_reader.can_run() + + if can_run: + print("The bot can be run!") + else: + print("The bot cannot be run.") + """ + + def __init__(self, site: pywikibot.Site, description: TaskDescriptionInterface, option: TaskOptionInterface, + task_stats: BotRunnerInterface, last_user_edit_role: LastUserEditRoleInterface): + self.description = description + self.option = option + self.task_stats = task_stats + self.site = site + self.last_user_edit_role = last_user_edit_role + + def can_bot_run(self): + """ + Check if the bot can run based on the status of the page. + + Returns: + bool: True if the bot can run, False otherwise. """ - self.task = task_description + if self.task_stats.can_run(): + print("can run page status has true value") + return True + else: + print("can run page status has false value") + return False + + def check_user_role(self) -> bool: + if self.last_user_edit_role.check_user_role(): + print("can run page status has true value") + return True + else: + print("can run page status has false value") + return False def read(self): """ @@ -245,19 +345,3 @@ def read(self): TaskDescription: The task description. """ return self.task - - -page_name = "ويكيبيديا:طلبات نقل عبر البوت/ملخص التعديل" -site = pywikibot.Site("ar", "wikipedia") -default_description = "بوت:نقل ([[ويكيبيديا:طلبات نقل عبر البوت]])" -task_description = TaskDescription(site=site, page_title=page_name, default_description=default_description) -print(task_description.get_description()) - -option_page_name = "ويكيبيديا:طلبات نقل عبر البوت/خيارات البوت" -default_template_name = "ويكيبيديا:طلبات نقل عبر البوت/خيارات البوت/قالب" -task_option = TaskOption(site=site, page_title=option_page_name, template_name=default_template_name) -print(task_option.get_options()) - -bot_runner_page_name = "ويكيبيديا:طلبات نقل عبر البوت/تشغيل البوت" -bot_runner = BotRunner(site=site, page_title=bot_runner_page_name) -print(bot_runner.can_run()) diff --git a/tasks/requests/move/read.py b/tasks/requests/move/read.py new file mode 100644 index 00000000..cc071f1d --- /dev/null +++ b/tasks/requests/move/read.py @@ -0,0 +1,92 @@ +import logging + +import pywikibot + +from tasks.requests.move.bot.models import TaskDescription, TaskOption, BotRunner, WikipediaTaskReader, \ + LastUserEditRoleChecker + +# Create an instance of the RequestsPage class +site = pywikibot.Site() + +type_of_request = 9 + +try: + + page_name = "ويكيبيديا:طلبات نقل عبر البوت/ملخص التعديل" + site = pywikibot.Site("ar", "wikipedia") + default_description = "بوت:نقل ([[ويكيبيديا:طلبات نقل عبر البوت]])" + task_description = TaskDescription(site=site, page_title=page_name, default_description=default_description) + print(task_description.get_description()) + + option_page_name = "ويكيبيديا:طلبات نقل عبر البوت/خيارات البوت" + default_template_name = "ويكيبيديا:طلبات نقل عبر البوت/خيارات البوت/قالب" + task_option = TaskOption(site=site, page_title=option_page_name, template_name=default_template_name) + print(task_option.get_options()) + + bot_runner_page_name = "ويكيبيديا:طلبات نقل عبر البوت/تشغيل البوت" + bot_runner = BotRunner(site=site, page_title=bot_runner_page_name) + print(bot_runner.can_run()) + + task_page = pywikibot.Page(site, "ويكيبيديا:طلبات نقل عبر البوت") + + last_user_edit_role = LastUserEditRoleChecker(page=task_page, role="editor") + + wikipediataskreader = WikipediaTaskReader( + site=site, + description=task_description, + option=task_option, + task_stats=bot_runner, + last_user_edit_role=last_user_edit_role + ) + if wikipediataskreader.can_bot_run() and wikipediataskreader.check_user_role(): + print("can run page status has true value") + + +except Exception as e: + print(f"An error occurred: {e}") + logging.error(e) + +# +# try: +# +# requests_page = RequestsPage(site) +# requests_page.title = "ويكيبيديا:طلبات توزيع قالب تصفح" +# requests_page.header_text = "{{/ترويسة}}" +# +# requests_page.load_page() +# +# if requests_page.check_user_edits(3000): +# scanner = RequestsScanner() +# scanner.pattern = r"\*\s*\[\[قالب:(?P.*)\]\](?P.*)" +# scanner.scan(requests_page.get_page_text()) +# +# if scanner.have_requests: +# requests_page.start_request() +# try: +# with Session(engine) as session: +# for request in scanner.requests: +# # source_page = pywikibot.Page(site, f"{request['source']}",ns=0) +# # destination_page = pywikibot.Page(site, f"{request['destination']}",ns=0) +# # if source_page.exists() and destination_page.exists() and source_page.namespace() == 0 and destination_page.namespace() == 0: +# # todo:add check if template exists with send content to talk page +# request_model = Request( +# from_title=request['source'], +# from_namespace=10, +# request_type=type_of_request, +# extra=request['extra'] +# ) +# session.add(request_model) +# session.commit() +# except Exception as e: +# session.rollback() +# print("An error occurred while committing the changes:", e) +# +# else: +# print("no reqtest found") +# requests_page.move_to_talk_page() +# else: +# print("not allow for user") +# requests_page.move_to_talk_page() +# # Get the page text after removing the header text +# except Exception as e: +# print(f"An error occurred: {e}") diff --git a/tasks/requests/move/run.py b/tasks/requests/move/run.py new file mode 100644 index 00000000..24a7c48d --- /dev/null +++ b/tasks/requests/move/run.py @@ -0,0 +1,89 @@ +import traceback + +import pywikibot +import wikitextparser as wtp +from sqlalchemy import select, func, distinct +from sqlalchemy.orm import Session + +from core.utils.helpers import prepare_str +from core.utils.pipeline import Pipeline +from tasks.requests.core.database.engine import engine +from tasks.requests.core.database.models import Request, Status, Page +from tasks.requests.remove.bot.portals_bar import PortalsBar +from tasks.requests.remove.bot.portals_merge import PortalsMerge +from tasks.requests.remove.bot.remove_portal import RemovePortal + +# Create an instance of the RequestsPage class +site = pywikibot.Site() + +type_of_request = 9 + +try: + session = Session(engine) + + stmt = select(Request).join(Page).filter(Request.status == Status.RECEIVED, Page.status == Status.PENDING, + Request.request_type == type_of_request).group_by(Request).having( + func.count(Page.id) == func.count(distinct(Page.id))).limit(20) + + for request in session.scalars(stmt): + + template_from = request.from_title + template_to = request.to_title + + pages = session.query(Page).filter(Page.request == request, Page.status == Status.PENDING).order_by( + Page.namespace.desc()).limit(1000).all() + + for page in pages: + try: + p = pywikibot.Page(site, title=str(page.page_name), ns=page.namespace) + if p.exists(): + temp_text = p.text + parsed = wtp.parse(p.text) + # for remove template + if request.from_namespace == 10: + for template in parsed.templates: + if prepare_str(template.name) == prepare_str(template_from): + temp_text = temp_text.replace(str(template), str("")) + + if request.from_namespace == 14: + for link in parsed.wikilinks: + if link.title.startswith("تصنيف:"): + if prepare_str(link.title.replace("تصنيف:", "")) == prepare_str(template_from): + temp_text = str(temp_text).replace(str(link), "") + + if request.from_namespace == 100: + + step_one = RemovePortal(p.text, template_from) + step_one.start_remove() + if p.namespace() == 0: + pipeline = Pipeline(p, step_one.tem_text, str("remove"), [ + PortalsMerge, + PortalsBar, + ], []) + processed_text, processed_summary = pipeline.process() + temp_text = processed_text + else: + temp_text = step_one.tem_text + p.text = temp_text + + word = "تصنيف" + if request.from_namespace == 10: + word = "قالب" + elif request.from_namespace == 100: + word = "بوابة" + + p.save( + summary="بوت:[[ويكيبيديا:طلبات إزالة (بوابة، تصنيف، قالب)]] حذف [[" + word + ":" + template_from + "]] " + ) + page.status = Status.COMPLETED + session.commit() + except Exception as e: + print(f"An error occurred where save : {e}") + just_the_string = traceback.format_exc() + print(just_the_string) + session.rollback() + +except Exception as e: + print(f"An error occurred: {e}") + just_the_string = traceback.format_exc() + print(just_the_string) From f4588dfd224c0aabfa140ffc251048d2f58b6e44 Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 12:09:12 +0300 Subject: [PATCH 05/10] add newModel --- tasks/requests/core/database/models.py | 30 +++- tasks/requests/move/bot/models.py | 227 ++++++++++++++++++++++++- tasks/requests/move/read.py | 31 +++- 3 files changed, 269 insertions(+), 19 deletions(-) diff --git a/tasks/requests/core/database/models.py b/tasks/requests/core/database/models.py index b3e5bbf7..30d5aab5 100644 --- a/tasks/requests/core/database/models.py +++ b/tasks/requests/core/database/models.py @@ -1,20 +1,18 @@ +import enum +from datetime import datetime from typing import List as typing_list -from sqlalchemy import String, INTEGER, TIMESTAMP,func,ForeignKey,Text +from sqlalchemy import String, INTEGER, func, ForeignKey, Text +from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column from sqlalchemy.orm import relationship -from datetime import datetime -from sqlalchemy.ext.hybrid import hybrid_property - -import enum - - from .engine import engine from .hellper import get_namespace + class Base(DeclarativeBase): pass @@ -74,4 +72,22 @@ def __repr__(self) -> str: return f"pages(id={self.id!r}, title={self.title!r})" +class Request_Move_Page(Base): + __tablename__ = "request_move_page" + + id: Mapped[int] = mapped_column(primary_key=True) + from_title: Mapped[str] = mapped_column(String(255)) + from_namespace: Mapped[int] = mapped_column(INTEGER) + to_title: Mapped[str] = mapped_column(String(255)) + to_namespace: Mapped[int] = mapped_column(INTEGER) + status: Mapped[Status] = mapped_column(insert_default=Status.PENDING) + create_date: Mapped[datetime] = mapped_column(insert_default=func.now()) + update_date: Mapped[datetime] = mapped_column(server_default=func.now(), onupdate=func.current_timestamp()) + task_description: Mapped[str] = mapped_column(Text, nullable=True) + task_options: Mapped[str] = mapped_column(Text, nullable=True) + + def __repr__(self) -> str: + return f"pages(id={self.id!r}, title={self.from_title!r})" + + Base.metadata.create_all(engine) diff --git a/tasks/requests/move/bot/models.py b/tasks/requests/move/bot/models.py index ff8e093e..20559c62 100644 --- a/tasks/requests/move/bot/models.py +++ b/tasks/requests/move/bot/models.py @@ -1,5 +1,6 @@ import copy import logging +import re from abc import ABC, abstractmethod import pywikibot @@ -277,6 +278,201 @@ def check_user_role(self) -> bool: return self.role in self.get_last_user_role() +class WikiListFormatInterface(ABC): + """ + Represents an interface for checking the format of a wiki list and performing operations on it. + """ + + @abstractmethod + def set_wiki_text(self, wiki_text): + """ + Set the wiki text. + + Args: + wiki_text (str): The wiki text to set. + """ + pass + + @abstractmethod + def check_format(self): + """ + Check the format of the wiki list. + + Returns: + bool: True if the wiki list has the correct format, False otherwise. + """ + pass + + @abstractmethod + def get_list(self): + """ + Get the wiki list. + + Returns: + list: The wiki list. + """ + pass + + +class WikiListFormatChecker(WikiListFormatInterface): + """ + Represents a class for checking the format of a wiki list and performing operations on it. + + Methods: + set_wiki_text: Set the wiki text. + check_format: Check the format of the wiki list. + get_list: Get the wiki list. + + Example: + # Create a WikiListFormatChecker object + format_checker = WikiListFormatChecker() + + # Set the wiki text + format_checker.set_wiki_text("1. Item 1\n2. Item 2\n3. Item 3") + + # Check the format of the wiki list + is_format_correct = format_checker.check_format() + + if is_format_correct: + print("The wiki list format is correct!") + else: + print("The wiki list format is incorrect.") + + # Get the wiki list + wiki_list = format_checker.get_list() + print("The wiki list is:", wiki_list) + """ + + def __init__(self): + self.list = [] + self.wiki_text = "" + + def set_wiki_text(self, wiki_text): + """ + Set the wiki text. + + Args: + wiki_text (str): The wiki text to set. + """ + self.wiki_text = wiki_text + + def check_format(self): + """ + Check the format of the wiki list. + + Returns: + bool: True if the wiki list has the correct format, False otherwise. + """ + tem_wiki_text = self.wiki_text.replace("{{/مقدمة}}", "").strip() + status = True + # loop line by line + for line in tem_wiki_text.split("\n"): + regex = re.compile( + r"\*\s*\[\[(?P.*):(?P.*)\]\](?P.*\>.*)\[\[(?P.*):(?P.*)\]\]") + + # to skip empty lines + if line.strip() == "": + continue + + if regex.match(line.strip()): + # print from_ns, source, to_ns, destination + from_ns = regex.match(line).group("from_ns") + source = regex.match(line).group("source") + to_ns = regex.match(line).group("to_ns") + destination = regex.match(line).group("destination") + self.list.append({ + "from_ns": from_ns, + "source": source, + "to_ns": to_ns, + "destination": destination + }) + else: + status = False + break + return status + + def get_list(self): + """ + Get the wiki list. + + Returns: + list: The wiki list. + """ + return self.list + + +class Order: + """ + Represents an order. + + Attributes: + from_ns (str): The source namespace of the order. + source (str): The source of the order. + to_ns (str): The destination namespace of the order. + destination (str): The destination of the order. + description (str): The description of the order. + options (str): The options of the order. + + Methods: + __init__: Initialize a new instance of the Order class. + set_description: Set the description of the order. + set_options: Set the options of the order. + + Example: + # Create a new order + order = Order("NS1", "Source1", "NS2", "Destination1") + + # Set the description of the order + order.set_description("This is a sample order.") + + # Set the options of the order + order.set_options("Option 1, Option 2, Option 3") + + # Access the attributes of the order + print("From Namespace:", order.from_ns) + print("Source:", order.source) + print("To Namespace:", order.to_ns) + print("Destination:", order.destination) + print("Description:", order.description) + print("Options:", order.options) + """ + + def __init__(self, from_ns, source, to_ns, destination): + """ + Initialize a new instance of the Order class. + + Args: + from_ns (str): The source namespace of the order. + source (str): The source of the order. + to_ns (str): The destination namespace of the order. + destination (str): The destination of the order. + """ + self.from_ns = from_ns + self.source = source + self.to_ns = to_ns + self.destination = destination + self.description = "" + self.options = "" + + def set_description(self, description): + """ + Set the description of the order. + + Args: + description (str): The description to set. + """ + self.description = description + + def set_options(self, options): + """ + Set the options of the order. + + Args: + options (str): The options to set. + """ + self.options = options + + class WikipediaTaskReader: """ Represents a task reader for Wikipedia. @@ -308,12 +504,15 @@ class WikipediaTaskReader: """ def __init__(self, site: pywikibot.Site, description: TaskDescriptionInterface, option: TaskOptionInterface, - task_stats: BotRunnerInterface, last_user_edit_role: LastUserEditRoleInterface): + task_stats: BotRunnerInterface, last_user_edit_role: LastUserEditRoleInterface, + wiki_text_list: WikiListFormatInterface): self.description = description self.option = option self.task_stats = task_stats self.site = site self.last_user_edit_role = last_user_edit_role + self.wiki_text_list = wiki_text_list + self.orders_list = [] def can_bot_run(self): """ @@ -337,11 +536,23 @@ def check_user_role(self) -> bool: print("can run page status has false value") return False - def read(self): - """ - Reads and returns the task. + def check_format(self): + if self.wiki_text_list.check_format(): + print("can run page status has true value") + return True + else: + print("can run page status has false value") + return False - Returns: - TaskDescription: The task description. - """ - return self.task + def get_list(self): + temp_list = self.wiki_text_list.get_list() + self.orders_list = [] + for item in temp_list: + order = Order(item["from_ns"], item["source"], item["to_ns"], item["destination"]) + order.set_description(self.description.get_description()) + order.set_options(self.option.get_options()) + self.orders_list.append(order) + return self.orders_list + + def can_read(self): + return self.can_bot_run() and self.check_user_role() and self.check_format() diff --git a/tasks/requests/move/read.py b/tasks/requests/move/read.py index cc071f1d..a08e2e0e 100644 --- a/tasks/requests/move/read.py +++ b/tasks/requests/move/read.py @@ -1,9 +1,12 @@ import logging import pywikibot +from sqlalchemy.orm import Session +from tasks.requests.core.database.engine import engine +from tasks.requests.core.database.models import Request_Move_Page from tasks.requests.move.bot.models import TaskDescription, TaskOption, BotRunner, WikipediaTaskReader, \ - LastUserEditRoleChecker + LastUserEditRoleChecker, WikiListFormatChecker # Create an instance of the RequestsPage class site = pywikibot.Site() @@ -31,15 +34,35 @@ last_user_edit_role = LastUserEditRoleChecker(page=task_page, role="editor") + wiki_text_list = WikiListFormatChecker() + wiki_text_list.set_wiki_text(task_page.text) + wikipediataskreader = WikipediaTaskReader( site=site, description=task_description, option=task_option, task_stats=bot_runner, - last_user_edit_role=last_user_edit_role + last_user_edit_role=last_user_edit_role, + wiki_text_list=wiki_text_list ) - if wikipediataskreader.can_bot_run() and wikipediataskreader.check_user_role(): - print("can run page status has true value") + + if wikipediataskreader.can_read(): + try: + with Session(engine) as session: + for order in wikipediataskreader.get_list(): + request_model = Request_Move_Page( + from_title=order.source, + from_namespace=order.from_ns, + to_title=order.description, + to_namespace=order.to_ns, + task_description=order.description, + task_options=str(order.options) + ) + session.add(request_model) + session.commit() + except Exception as e: + session.rollback() + print("An error occurred while committing the changes:", e) except Exception as e: From 53880b72676043314aa3319cf1f038d975f0ff2d Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 12:22:29 +0300 Subject: [PATCH 06/10] end read.py --- tasks/requests/move/bot/models.py | 20 ++++++++++++++++++-- tasks/requests/move/read.py | 7 ++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/tasks/requests/move/bot/models.py b/tasks/requests/move/bot/models.py index 20559c62..7320ca71 100644 --- a/tasks/requests/move/bot/models.py +++ b/tasks/requests/move/bot/models.py @@ -380,6 +380,7 @@ def check_format(self): source = regex.match(line).group("source") to_ns = regex.match(line).group("to_ns") destination = regex.match(line).group("destination") + # todo: add list of namespaces that only allowed to move self.list.append({ "from_ns": from_ns, "source": source, @@ -447,9 +448,9 @@ def __init__(self, from_ns, source, to_ns, destination): to_ns (str): The destination namespace of the order. destination (str): The destination of the order. """ - self.from_ns = from_ns + self.from_ns = from_ns.replace(":", "") self.source = source - self.to_ns = to_ns + self.to_ns = to_ns.replace(":", "") self.destination = destination self.description = "" self.options = "" @@ -556,3 +557,18 @@ def get_list(self): def can_read(self): return self.can_bot_run() and self.check_user_role() and self.check_format() + + def move_to_talk_page(self): + # titles + page_name = "ويكيبيديا:طلبات نقل عبر البوت" + talk_name = "ويكيبيديا:طلبات نقل عبر البوت/أرشيف 10" + # objects + page = pywikibot.Page(self.site, page_name) + archive_page = pywikibot.Page(self.site, talk_name) + # set texts + template = "{{/مقدمة}}" + archive_page.text = archive_page.text + "\n" + page.text.replace(template, "") + page.text = template + # start save + page.save("جاري النقل") + archive_page.save("اضافة الي الارشيف") diff --git a/tasks/requests/move/read.py b/tasks/requests/move/read.py index a08e2e0e..70622391 100644 --- a/tasks/requests/move/read.py +++ b/tasks/requests/move/read.py @@ -45,7 +45,7 @@ last_user_edit_role=last_user_edit_role, wiki_text_list=wiki_text_list ) - + emptry_page = False if wikipediataskreader.can_read(): try: with Session(engine) as session: @@ -60,10 +60,15 @@ ) session.add(request_model) session.commit() + emptry_page = True except Exception as e: session.rollback() print("An error occurred while committing the changes:", e) + if emptry_page: + wikipediataskreader.move_to_talk_page() + else: + print("no reqtest found") except Exception as e: print(f"An error occurred: {e}") From c873c4772e990ec558785cf4300ffe82de55436f Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 12:31:05 +0300 Subject: [PATCH 07/10] fix page name that will check by LastUserEditRoleChecker --- tasks/requests/move/bot/models.py | 4 ++++ tasks/requests/move/read.py | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/tasks/requests/move/bot/models.py b/tasks/requests/move/bot/models.py index 7320ca71..f3c0ccbe 100644 --- a/tasks/requests/move/bot/models.py +++ b/tasks/requests/move/bot/models.py @@ -562,13 +562,17 @@ def move_to_talk_page(self): # titles page_name = "ويكيبيديا:طلبات نقل عبر البوت" talk_name = "ويكيبيديا:طلبات نقل عبر البوت/أرشيف 10" + turn_on_name = "ويكيبيديا:طلبات نقل عبر البوت/تشغيل البوت" # objects page = pywikibot.Page(self.site, page_name) archive_page = pywikibot.Page(self.site, talk_name) + turn_on_name = pywikibot.Page(self.site, turn_on_name) # set texts template = "{{/مقدمة}}" archive_page.text = archive_page.text + "\n" + page.text.replace(template, "") page.text = template + turn_on_name.text = "لا" # start save page.save("جاري النقل") archive_page.save("اضافة الي الارشيف") + turn_on_name.save("تم تشغيل البوت") diff --git a/tasks/requests/move/read.py b/tasks/requests/move/read.py index 70622391..208e294b 100644 --- a/tasks/requests/move/read.py +++ b/tasks/requests/move/read.py @@ -27,12 +27,14 @@ print(task_option.get_options()) bot_runner_page_name = "ويكيبيديا:طلبات نقل عبر البوت/تشغيل البوت" + bot_runner = BotRunner(site=site, page_title=bot_runner_page_name) print(bot_runner.can_run()) task_page = pywikibot.Page(site, "ويكيبيديا:طلبات نقل عبر البوت") - last_user_edit_role = LastUserEditRoleChecker(page=task_page, role="editor") + last_user_edit_role_page = pywikibot.Page(site, bot_runner_page_name) + last_user_edit_role = LastUserEditRoleChecker(page=last_user_edit_role_page, role="editor") wiki_text_list = WikiListFormatChecker() wiki_text_list.set_wiki_text(task_page.text) From 97448e77405f39ab415fe455c9b48394fa7678be Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 12:38:08 +0300 Subject: [PATCH 08/10] fixing --- tasks/requests/move/read.py | 62 +++---------------------------------- 1 file changed, 5 insertions(+), 57 deletions(-) diff --git a/tasks/requests/move/read.py b/tasks/requests/move/read.py index 208e294b..39611237 100644 --- a/tasks/requests/move/read.py +++ b/tasks/requests/move/read.py @@ -16,28 +16,19 @@ try: page_name = "ويكيبيديا:طلبات نقل عبر البوت/ملخص التعديل" - site = pywikibot.Site("ar", "wikipedia") default_description = "بوت:نقل ([[ويكيبيديا:طلبات نقل عبر البوت]])" - task_description = TaskDescription(site=site, page_title=page_name, default_description=default_description) - print(task_description.get_description()) - option_page_name = "ويكيبيديا:طلبات نقل عبر البوت/خيارات البوت" default_template_name = "ويكيبيديا:طلبات نقل عبر البوت/خيارات البوت/قالب" - task_option = TaskOption(site=site, page_title=option_page_name, template_name=default_template_name) - print(task_option.get_options()) - bot_runner_page_name = "ويكيبيديا:طلبات نقل عبر البوت/تشغيل البوت" - bot_runner = BotRunner(site=site, page_title=bot_runner_page_name) - print(bot_runner.can_run()) - task_page = pywikibot.Page(site, "ويكيبيديا:طلبات نقل عبر البوت") + task_description = TaskDescription(site=site, page_title=page_name, default_description=default_description) + task_option = TaskOption(site=site, page_title=option_page_name, template_name=default_template_name) + bot_runner = BotRunner(site=site, page_title=bot_runner_page_name) last_user_edit_role_page = pywikibot.Page(site, bot_runner_page_name) last_user_edit_role = LastUserEditRoleChecker(page=last_user_edit_role_page, role="editor") - - wiki_text_list = WikiListFormatChecker() - wiki_text_list.set_wiki_text(task_page.text) + wiki_text_list = WikiListFormatChecker().set_wiki_text(task_page.text) wikipediataskreader = WikipediaTaskReader( site=site, @@ -47,6 +38,7 @@ last_user_edit_role=last_user_edit_role, wiki_text_list=wiki_text_list ) + emptry_page = False if wikipediataskreader.can_read(): try: @@ -76,47 +68,3 @@ print(f"An error occurred: {e}") logging.error(e) -# -# try: -# -# requests_page = RequestsPage(site) -# requests_page.title = "ويكيبيديا:طلبات توزيع قالب تصفح" -# requests_page.header_text = "{{/ترويسة}}" -# -# requests_page.load_page() -# -# if requests_page.check_user_edits(3000): -# scanner = RequestsScanner() -# scanner.pattern = r"\*\s*\[\[قالب:(?P.*)\]\](?P.*)" -# scanner.scan(requests_page.get_page_text()) -# -# if scanner.have_requests: -# requests_page.start_request() -# try: -# with Session(engine) as session: -# for request in scanner.requests: -# # source_page = pywikibot.Page(site, f"{request['source']}",ns=0) -# # destination_page = pywikibot.Page(site, f"{request['destination']}",ns=0) -# # if source_page.exists() and destination_page.exists() and source_page.namespace() == 0 and destination_page.namespace() == 0: -# # todo:add check if template exists with send content to talk page -# request_model = Request( -# from_title=request['source'], -# from_namespace=10, -# request_type=type_of_request, -# extra=request['extra'] -# ) -# session.add(request_model) -# session.commit() -# except Exception as e: -# session.rollback() -# print("An error occurred while committing the changes:", e) -# -# else: -# print("no reqtest found") -# requests_page.move_to_talk_page() -# else: -# print("not allow for user") -# requests_page.move_to_talk_page() -# # Get the page text after removing the header text -# except Exception as e: -# print(f"An error occurred: {e}") From e989ff5f1daee74e4d2078b41b5d21cdef5b744b Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 12:49:04 +0300 Subject: [PATCH 09/10] fixing + add base of run --- tasks/requests/move/read.py | 16 ++++--- tasks/requests/move/run.py | 85 ++++++------------------------------- 2 files changed, 23 insertions(+), 78 deletions(-) diff --git a/tasks/requests/move/read.py b/tasks/requests/move/read.py index 39611237..c568629e 100644 --- a/tasks/requests/move/read.py +++ b/tasks/requests/move/read.py @@ -1,4 +1,4 @@ -import logging +import traceback import pywikibot from sqlalchemy.orm import Session @@ -11,8 +11,6 @@ # Create an instance of the RequestsPage class site = pywikibot.Site() -type_of_request = 9 - try: page_name = "ويكيبيديا:طلبات نقل عبر البوت/ملخص التعديل" @@ -28,7 +26,8 @@ bot_runner = BotRunner(site=site, page_title=bot_runner_page_name) last_user_edit_role_page = pywikibot.Page(site, bot_runner_page_name) last_user_edit_role = LastUserEditRoleChecker(page=last_user_edit_role_page, role="editor") - wiki_text_list = WikiListFormatChecker().set_wiki_text(task_page.text) + wiki_text_list = WikiListFormatChecker() + wiki_text_list.set_wiki_text(task_page.text) wikipediataskreader = WikipediaTaskReader( site=site, @@ -58,13 +57,16 @@ except Exception as e: session.rollback() print("An error occurred while committing the changes:", e) + just_the_string = traceback.format_exc() + print(just_the_string) if emptry_page: - wikipediataskreader.move_to_talk_page() + # wikipediataskreader.move_to_talk_page() + pass else: print("no reqtest found") except Exception as e: print(f"An error occurred: {e}") - logging.error(e) - + just_the_string = traceback.format_exc() + print(just_the_string) diff --git a/tasks/requests/move/run.py b/tasks/requests/move/run.py index 24a7c48d..ba516a90 100644 --- a/tasks/requests/move/run.py +++ b/tasks/requests/move/run.py @@ -1,87 +1,30 @@ import traceback import pywikibot -import wikitextparser as wtp -from sqlalchemy import select, func, distinct from sqlalchemy.orm import Session -from core.utils.helpers import prepare_str -from core.utils.pipeline import Pipeline from tasks.requests.core.database.engine import engine -from tasks.requests.core.database.models import Request, Status, Page -from tasks.requests.remove.bot.portals_bar import PortalsBar -from tasks.requests.remove.bot.portals_merge import PortalsMerge -from tasks.requests.remove.bot.remove_portal import RemovePortal +from tasks.requests.core.database.models import Request_Move_Page, Status # Create an instance of the RequestsPage class site = pywikibot.Site() -type_of_request = 9 - try: session = Session(engine) - stmt = select(Request).join(Page).filter(Request.status == Status.RECEIVED, Page.status == Status.PENDING, - Request.request_type == type_of_request).group_by(Request).having( - func.count(Page.id) == func.count(distinct(Page.id))).limit(20) - - for request in session.scalars(stmt): - - template_from = request.from_title - template_to = request.to_title - - pages = session.query(Page).filter(Page.request == request, Page.status == Status.PENDING).order_by( - Page.namespace.desc()).limit(1000).all() - - for page in pages: - try: - p = pywikibot.Page(site, title=str(page.page_name), ns=page.namespace) - if p.exists(): - temp_text = p.text - parsed = wtp.parse(p.text) - # for remove template - if request.from_namespace == 10: - for template in parsed.templates: - if prepare_str(template.name) == prepare_str(template_from): - temp_text = temp_text.replace(str(template), str("")) - - if request.from_namespace == 14: - for link in parsed.wikilinks: - if link.title.startswith("تصنيف:"): - if prepare_str(link.title.replace("تصنيف:", "")) == prepare_str(template_from): - temp_text = str(temp_text).replace(str(link), "") - - if request.from_namespace == 100: - - step_one = RemovePortal(p.text, template_from) - step_one.start_remove() - if p.namespace() == 0: - pipeline = Pipeline(p, step_one.tem_text, str("remove"), [ - PortalsMerge, - PortalsBar, - ], []) - processed_text, processed_summary = pipeline.process() - temp_text = processed_text - else: - temp_text = step_one.tem_text - p.text = temp_text - - word = "تصنيف" - if request.from_namespace == 10: - word = "قالب" - elif request.from_namespace == 100: - word = "بوابة" - - p.save( - summary="بوت:[[ويكيبيديا:طلبات إزالة (بوابة، تصنيف، قالب)]] حذف [[" + word + ":" + template_from + "]] " - ) - page.status = Status.COMPLETED - session.commit() - except Exception as e: - print(f"An error occurred where save : {e}") - just_the_string = traceback.format_exc() - print(just_the_string) - session.rollback() + pages = session.query(Request_Move_Page).filter(Request_Move_Page.status == Status.PENDING).order_by( + Request_Move_Page.id.desc()).limit(1).all() + + for page in pages: + try: + print(page) + page.status = Status.COMPLETED + session.commit() + except Exception as e: + print(f"An error occurred where save : {e}") + just_the_string = traceback.format_exc() + print(just_the_string) + session.rollback() except Exception as e: print(f"An error occurred: {e}") From 24c690e80c5fc7dbbf95cab9237bb5096805bbc1 Mon Sep 17 00:00:00 2001 From: lokas Date: Thu, 24 Aug 2023 12:56:21 +0300 Subject: [PATCH 10/10] some fix --- tasks/requests/move/bot/models.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tasks/requests/move/bot/models.py b/tasks/requests/move/bot/models.py index f3c0ccbe..a69e34a1 100644 --- a/tasks/requests/move/bot/models.py +++ b/tasks/requests/move/bot/models.py @@ -365,21 +365,20 @@ def check_format(self): """ tem_wiki_text = self.wiki_text.replace("{{/مقدمة}}", "").strip() status = True + regex = re.compile( + r"\*\s*\[\[(?P.*):(?P.*)\]\](?P.*\>.*)\[\[(?P.*):(?P.*)\]\]") # loop line by line for line in tem_wiki_text.split("\n"): - regex = re.compile( - r"\*\s*\[\[(?P.*):(?P.*)\]\](?P.*\>.*)\[\[(?P.*):(?P.*)\]\]") - # to skip empty lines if line.strip() == "": continue - - if regex.match(line.strip()): + line_regex = regex.match(line.strip()) + if line_regex: # print from_ns, source, to_ns, destination - from_ns = regex.match(line).group("from_ns") - source = regex.match(line).group("source") - to_ns = regex.match(line).group("to_ns") - destination = regex.match(line).group("destination") + from_ns = line_regex.group("from_ns") + source = line_regex.group("source") + to_ns = line_regex.group("to_ns") + destination = line_regex.group("destination") # todo: add list of namespaces that only allowed to move self.list.append({ "from_ns": from_ns,