Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 116 additions & 92 deletions py4web/utils/form.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import copy
import functools
import os
import time
import uuid
Expand Down Expand Up @@ -999,6 +998,8 @@ def index():
that the identity of the logged in user has not changed, you can do as below.
signing_info = session.get('user', {}).get('id', '')
The content of the field should be convertible to a string via json.
:param auto_process: bool - whether the form should automatically process POST data. If you set this to False,
you are responsible for calling form.process(). useful for custom logic in a `if self.submitted:` block.
"""

def __init__(
Expand All @@ -1020,6 +1021,7 @@ def __init__(
signing_info=None,
submit_value="Submit",
show_id=False,
auto_process=True,
**kwargs,
):
self.param = Param(
Expand All @@ -1030,11 +1032,29 @@ def __init__(
)

if isinstance(table, list):
dbio = False
# Mimic a table from a list of fields without calling define_table
form_name = form_name or "no_table"
for field in table:
field.tablename = getattr(field, "tablename", form_name)
if len(table) == 0:
raise ValueError("Cannot build form with empty list of fields")
# using _table to check if Field.bind was called
# and the field is bound to a table, since unlike `tablename`, `_table` is
# only set in Field.bind()
all_tablenames = list(
set(
str(getattr(field, "_table", None) or "no_table") for field in table
)
)

# only disable dbio if the fields are from multiple tables
# this allows making forms for a subset of fields easily:
# Form([db.tbl.field1, db.tbl.field2])
if len(all_tablenames) > 1 or all_tablenames[0] in "no_table":
dbio = False
# Mimic a table from a list of fields without calling define_table
form_name = form_name or "no_table"
for field in table:
field.tablename = getattr(field, "tablename", form_name)
else:
# if we just have 1 table, use it as the form name
form_name = form_name or all_tablenames[0]

if isinstance(record, (int, str)):
record_id = int(str(record))
Expand Down Expand Up @@ -1077,96 +1097,100 @@ def __init__(
if self.record:
self.vars = self._read_vars_from_record(table)

if not readonly:
try:
post_vars = request.GET if self.method == "GET" else request.POST
except KeyError:
post_vars = {}
try:
self.post_vars = request.GET if self.method == "GET" else request.POST
except KeyError:
self.post_vars = {}

try:
form_vars = copy.deepcopy(request.forms)
except KeyError:
form_vars = {}
for k in form_vars:
self.vars[k] = form_vars[k]
process = False

# We only a process a form if it is POST and the formkey matches (correct formname and crsf)
# Notice: we never expose the crsf uuid, we only use to sign the form uuid
if post_vars:
self.submitted = True
if not self.csrf_protection or self._verify_form(post_vars):
process = True

if process:
record_id = self.record and self.record.get("id")
if not post_vars.get("_delete"):
validated_vars = {}
uploaded_fields = set()
for field in self.table:
if field.writable and field.type != "id":
original_value = post_vars.get(field.name)
if isinstance(original_value, list):
if len(original_value) == 1:
original_value = original_value[0]
elif len(original_value) == 0:
original_value = None
(value, error) = field.validate(original_value, record_id)
if field.type == "password" and record_id and value is None:
continue
if field.type == "upload":
uploaded_fields.add(field.name)
value = request.files.get(field.name)
delete = post_vars.get("_delete_" + field.name)
if value is not None:
if field.uploadfield == True and field.uploadfolder:
validated_vars[field.name] = field.store(
value.file,
value.filename,
field.uploadfolder,
)
elif field.uploadfield and field.db:
validated_vars[field.name] = field.store(
value.file,
value.filename,
field.uploadfolder,
)
else:
validated_vars[field.name] = value
elif self.record:
if not delete:
validated_vars[field.name] = self.record.get(
field.name
)
else:
validated_vars[field.name] = value = None
elif field.type == "boolean":
validated_vars[field.name] = value is not None
else:
validated_vars[field.name] = value
if error:
self.errors[field.name] = error
if self.errors:
for field_name in uploaded_fields:
validated_vars[field_name] = (
self.record and self.record.get(field_name) or None
)
self.vars.update(validated_vars)
if self.record and dbio:
self.vars["id"] = self.record.id
if validation:
validation(self)
if not self.errors:
self.accepted = True
if dbio:
self.update_or_insert(validated_vars)
elif dbio:
self.accepted = True
self.deleted = True
self.record.delete_record()
try:
form_vars = copy.deepcopy(request.forms)
except KeyError:
form_vars = {}
for k in form_vars:
self.vars[k] = form_vars[k]

# We only a process a form if it is POST and the formkey matches (correct formname and crsf)
# Notice: we never expose the crsf uuid, we only use to sign the form uuid
if self.post_vars:
self.submitted = True

if not readonly and auto_process:
self.process()
if self.csrf_protection:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this now always run after process() even if auto_process is False?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, you're right

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, no, I was right originally.

The original code looks like this:

if not readonly:
    # [...]
    if post_vars:
        self.submitted = True
        if not self.csrf_protection or self._verify_form(post_vars):
            process = True
    if process:
        # [...]
if self.csrf_protection:
    self._sign_form()

We both probably got turned around by the if not self.csrf_protection - thats been moved into the new process method and rewritten to an early return ((not A) or B inverted to A and (not B))

So this is actually matching the original logic exactly. If CSRF is enabled, the form is always signed. What i can't say if that original logic was correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this now always run after process() even if auto_process is False?

self._sign_form()

def process(self):
if not self.submitted:
return

if self.csrf_protection and not self._verify_form(self.post_vars):
return

if self.post_vars.get("_delete") and self.dbio:
self.accepted = True
self.deleted = True
self.record.delete_record()
return

record_id = self.record and self.record.get("id")
validated_vars = {}
uploaded_fields = set()
for field in self.table:
if field.writable and field.type != "id":
original_value = self.post_vars.get(field.name)
if isinstance(original_value, list):
if len(original_value) == 1:
original_value = original_value[0]
elif len(original_value) == 0:
original_value = None
(value, error) = field.validate(original_value, record_id)
if field.type == "password" and record_id and value is None:
continue
if field.type == "upload":
uploaded_fields.add(field.name)
value = request.files.get(field.name)
delete = self.post_vars.get("_delete_" + field.name)
if value is not None:
if field.uploadfield == True and field.uploadfolder:
validated_vars[field.name] = field.store(
value.file,
value.filename,
field.uploadfolder,
)
elif field.uploadfield and field.db:
validated_vars[field.name] = field.store(
value.file,
value.filename,
field.uploadfolder,
)
else:
validated_vars[field.name] = value
elif self.record:
if not delete:
validated_vars[field.name] = self.record.get(field.name)
else:
validated_vars[field.name] = value = None
elif field.type == "boolean":
validated_vars[field.name] = value is not None
else:
validated_vars[field.name] = value
if error:
self.errors[field.name] = error
if self.errors:
for field_name in uploaded_fields:
validated_vars[field_name] = (
self.record and self.record.get(field_name) or None
)
self.vars.update(validated_vars)
if self.record and self.dbio:
self.vars["id"] = self.record.id
if self.validation:
self.validation(self)
if not self.errors:
self.accepted = True
if self.dbio:
self.update_or_insert(validated_vars)

def _read_vars_from_record(self, table):
if isinstance(table, list):
# The table is just a list of fields.
Expand Down