diff --git a/py4web/utils/form.py b/py4web/utils/form.py index 0c34a3163..64022a68e 100644 --- a/py4web/utils/form.py +++ b/py4web/utils/form.py @@ -1,5 +1,4 @@ import copy -import functools import os import time import uuid @@ -999,6 +998,8 @@ def index(): that the identity of the logged in user has not changed, you can do as below. signing_info = session.get('user', {}).get('id', '') The content of the field should be convertible to a string via json. + :param auto_process: bool - whether the form should automatically process POST data. If you set this to False, + you are responsible for calling form.process(). useful for custom logic in a `if self.submitted:` block. """ def __init__( @@ -1020,6 +1021,7 @@ def __init__( signing_info=None, submit_value="Submit", show_id=False, + auto_process=True, **kwargs, ): self.param = Param( @@ -1030,11 +1032,29 @@ def __init__( ) if isinstance(table, list): - dbio = False - # Mimic a table from a list of fields without calling define_table - form_name = form_name or "no_table" - for field in table: - field.tablename = getattr(field, "tablename", form_name) + if len(table) == 0: + raise ValueError("Cannot build form with empty list of fields") + # using _table to check if Field.bind was called + # and the field is bound to a table, since unlike `tablename`, `_table` is + # only set in Field.bind() + all_tablenames = list( + set( + str(getattr(field, "_table", None) or "no_table") for field in table + ) + ) + + # only disable dbio if the fields are from multiple tables + # this allows making forms for a subset of fields easily: + # Form([db.tbl.field1, db.tbl.field2]) + if len(all_tablenames) > 1 or all_tablenames[0] in "no_table": + dbio = False + # Mimic a table from a list of fields without calling define_table + form_name = form_name or "no_table" + for field in table: + field.tablename = getattr(field, "tablename", form_name) + else: + # if we just have 1 table, use it as the form name + form_name = form_name or all_tablenames[0] if isinstance(record, (int, str)): record_id = int(str(record)) @@ -1077,96 +1097,100 @@ def __init__( if self.record: self.vars = self._read_vars_from_record(table) - if not readonly: - try: - post_vars = request.GET if self.method == "GET" else request.POST - except KeyError: - post_vars = {} + try: + self.post_vars = request.GET if self.method == "GET" else request.POST + except KeyError: + self.post_vars = {} - try: - form_vars = copy.deepcopy(request.forms) - except KeyError: - form_vars = {} - for k in form_vars: - self.vars[k] = form_vars[k] - process = False - - # We only a process a form if it is POST and the formkey matches (correct formname and crsf) - # Notice: we never expose the crsf uuid, we only use to sign the form uuid - if post_vars: - self.submitted = True - if not self.csrf_protection or self._verify_form(post_vars): - process = True - - if process: - record_id = self.record and self.record.get("id") - if not post_vars.get("_delete"): - validated_vars = {} - uploaded_fields = set() - for field in self.table: - if field.writable and field.type != "id": - original_value = post_vars.get(field.name) - if isinstance(original_value, list): - if len(original_value) == 1: - original_value = original_value[0] - elif len(original_value) == 0: - original_value = None - (value, error) = field.validate(original_value, record_id) - if field.type == "password" and record_id and value is None: - continue - if field.type == "upload": - uploaded_fields.add(field.name) - value = request.files.get(field.name) - delete = post_vars.get("_delete_" + field.name) - if value is not None: - if field.uploadfield == True and field.uploadfolder: - validated_vars[field.name] = field.store( - value.file, - value.filename, - field.uploadfolder, - ) - elif field.uploadfield and field.db: - validated_vars[field.name] = field.store( - value.file, - value.filename, - field.uploadfolder, - ) - else: - validated_vars[field.name] = value - elif self.record: - if not delete: - validated_vars[field.name] = self.record.get( - field.name - ) - else: - validated_vars[field.name] = value = None - elif field.type == "boolean": - validated_vars[field.name] = value is not None - else: - validated_vars[field.name] = value - if error: - self.errors[field.name] = error - if self.errors: - for field_name in uploaded_fields: - validated_vars[field_name] = ( - self.record and self.record.get(field_name) or None - ) - self.vars.update(validated_vars) - if self.record and dbio: - self.vars["id"] = self.record.id - if validation: - validation(self) - if not self.errors: - self.accepted = True - if dbio: - self.update_or_insert(validated_vars) - elif dbio: - self.accepted = True - self.deleted = True - self.record.delete_record() + try: + form_vars = copy.deepcopy(request.forms) + except KeyError: + form_vars = {} + for k in form_vars: + self.vars[k] = form_vars[k] + + # We only a process a form if it is POST and the formkey matches (correct formname and crsf) + # Notice: we never expose the crsf uuid, we only use to sign the form uuid + if self.post_vars: + self.submitted = True + + if not readonly and auto_process: + self.process() if self.csrf_protection: self._sign_form() + def process(self): + if not self.submitted: + return + + if self.csrf_protection and not self._verify_form(self.post_vars): + return + + if self.post_vars.get("_delete") and self.dbio: + self.accepted = True + self.deleted = True + self.record.delete_record() + return + + record_id = self.record and self.record.get("id") + validated_vars = {} + uploaded_fields = set() + for field in self.table: + if field.writable and field.type != "id": + original_value = self.post_vars.get(field.name) + if isinstance(original_value, list): + if len(original_value) == 1: + original_value = original_value[0] + elif len(original_value) == 0: + original_value = None + (value, error) = field.validate(original_value, record_id) + if field.type == "password" and record_id and value is None: + continue + if field.type == "upload": + uploaded_fields.add(field.name) + value = request.files.get(field.name) + delete = self.post_vars.get("_delete_" + field.name) + if value is not None: + if field.uploadfield == True and field.uploadfolder: + validated_vars[field.name] = field.store( + value.file, + value.filename, + field.uploadfolder, + ) + elif field.uploadfield and field.db: + validated_vars[field.name] = field.store( + value.file, + value.filename, + field.uploadfolder, + ) + else: + validated_vars[field.name] = value + elif self.record: + if not delete: + validated_vars[field.name] = self.record.get(field.name) + else: + validated_vars[field.name] = value = None + elif field.type == "boolean": + validated_vars[field.name] = value is not None + else: + validated_vars[field.name] = value + if error: + self.errors[field.name] = error + if self.errors: + for field_name in uploaded_fields: + validated_vars[field_name] = ( + self.record and self.record.get(field_name) or None + ) + self.vars.update(validated_vars) + if self.record and self.dbio: + self.vars["id"] = self.record.id + if self.validation: + self.validation(self) + if not self.errors: + self.accepted = True + if self.dbio: + self.update_or_insert(validated_vars) + def _read_vars_from_record(self, table): if isinstance(table, list): # The table is just a list of fields.