Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions apps/_scaffold/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
"""
This file defines cache, session, and translator T object for the app
These are fixtures that every app needs so probably you will not be editing this file
This module sets up core fixtures and utilities for a py4web application, including:

- Logging: Configures a custom logger using application settings.
- Database: Connects to the database using settings from the configuration.
- Caching: Instantiates a cache object for use throughout the app.
- Translation: Sets up the translation object for internationalization.
- Session Management: Selects and configures the session backend (cookies, Redis, Memcache, or database) based on settings.
- Authentication: Initializes the Auth object, configures its parameters, and defines authentication tables and actions.
- Email: Configures the email sender for authentication-related emails if SMTP settings are provided.
- User Groups: Sets up tagging for user groups if the authentication database is available.
- Auth Plugins: Optionally registers authentication plugins (PAM, LDAP, OAuth2 for Google, GitHub, Facebook, Okta) based on settings.
- File Download: Defines an action for downloading uploaded files if an upload folder is specified.
- Scheduler: Optionally starts a background scheduler for running tasks if enabled in settings.
- Decorators: Provides convenience action factories for authenticated and unauthenticated routes.

This file is intended to be a foundational part of the application and typically does not require modification.
"""

import os
Expand Down Expand Up @@ -47,7 +61,7 @@
session = Session(secret=settings.SESSION_SECRET_KEY)

elif settings.SESSION_TYPE == "redis":
import redis
import redis # type: ignore[reportMissingImports]

host, port = settings.REDIS_SERVER.split(":")
# for more options: https://github.com/andymccurdy/redis-py/blob/master/redis/client.py
Expand All @@ -62,7 +76,7 @@
elif settings.SESSION_TYPE == "memcache":
import time

import memcache
import memcache # type: ignore[reportMissingImports]

conn = memcache.Client(settings.MEMCACHE_CLIENTS, debug=0)
session = Session(secret=settings.SESSION_SECRET_KEY, storage=conn)
Expand Down
2 changes: 1 addition & 1 deletion apps/_scaffold/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,6 @@

# try import private settings
try:
from .settings_private import *
from .settings_private import * # type: ignore[reportMissingImports]
except (ImportError, ModuleNotFoundError):
pass
98 changes: 45 additions & 53 deletions py4web/utils/auth_plugins/ldap_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ class LDAPPlugin(UsernamePassword):
base_dn='ou=Users,dc=domain,dc=com',
tls=True))

If you need to bind to the directory with an admin account in order to
search it then specify bind_dn & bind_pw to use for this.
- currently only implemented for Active Directory
If you need to bind to the directory with a bind account in order to
search it, then specify bind_dn & bind_pw to use for this.
- currently only implemented for Active Directory where anonymous bind
is normally not allowed.

If you need to restrict the set of allowed users (e.g. to members of a
department) then specify an rfc4515 search filter string.
Expand All @@ -88,7 +89,12 @@ class LDAPPlugin(UsernamePassword):
))

Where:
manage_user - let web2py handle user data from ldap
manage_user: bool
If True py4web will fetch and update user profile
fields (first name, last name, email) from LDAP/AD on each login and
keep them in sync with its db.
If False, only authentication is performed and user profile fields
are taken only from py4web db.
user_firstname_attrib - the attribute containing the user's first name
optionally you can specify parts.
Example: cn: "John Smith" - 'cn:1'='John'
Expand All @@ -112,7 +118,7 @@ class LDAPPlugin(UsernamePassword):
))

Where:
manage_groups - let web2py handle the groups from ldap
manage_groups - let py4web handle the groups from ldap
db - is the database object (need to have auth_user, auth_group,
auth_membership)
group_dn - the ldap branch of the groups
Expand Down Expand Up @@ -140,10 +146,8 @@ class LDAPPlugin(UsernamePassword):
group_filterstr - as the filterstr but for group select

If using Active Directory you must specify bind_dn and bind_pw for
allowed_groups unless anonymous bind works.
allowed_groups because anonymous bind is not normally allowed.

You can set the logging level with the "logging_level" parameter, default
is "error" and can be set to error, warning, info, debug.
"""

def __init__(
Expand Down Expand Up @@ -237,8 +241,8 @@ def check_credentials(self, username, password):
logger.warning("blank password not allowed")
return False
logger.debug(
"mode: [%s] manage_user: [%s] custom_scope: [%s] manage_groups: [%s]"
% (str(mode), str(manage_user), str(custom_scope), str(manage_groups))
f"mode: {str(mode)}, manage_user: {str(manage_user)}, \
custom_scope: {str(custom_scope)}, manage_groups: {str(manage_groups)}" \
)
if manage_user:
if user_firstname_attrib.count(":") > 0:
Expand Down Expand Up @@ -275,7 +279,7 @@ def check_credentials(self, username, password):
for x in base_dn.split(","):
if "DC=" in x.upper():
domain.append(x.split("=")[-1])
username = "%s@%s" % (username, ".".join(domain))
username = f"{username}@{".".join(domain)}"
username_bare = username.split("@")[0]
con.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
# In cases where ForestDnsZones and DomainDnsZones are found,
Expand All @@ -299,14 +303,14 @@ def check_credentials(self, username, password):
result = con.search_ext_s(
base_dn,
ldap.SCOPE_SUBTREE,
"(&(sAMAccountName=%s)(%s))"
% (ldap.filter.escape_filter_chars(username_bare), filterstr),
f"(&(sAMAccountName={ldap.filter.escape_filter_chars(username_bare)})({filterstr}))",
requested_attrs,
)[0][1]
logger.info(f"Login result: {result}")
if not isinstance(result, dict):
# result should be a dict in the form
# {'sAMAccountName': [username_bare]}
logger.warning("User [%s] not found!" % username)
logger.warning(f"User {username} not found!")
return False
if bind_dn:
# We know the user exists & is in the correct OU
Expand Down Expand Up @@ -347,7 +351,7 @@ def check_credentials(self, username, password):
con.simple_bind_s(bind_dn, bind_pw)
dn = "uid=" + username + "," + base_dn
dn = con.search_s(
base_dn, ldap.SCOPE_SUBTREE, "(uid=%s)" % username, [""]
base_dn, ldap.SCOPE_SUBTREE, f"(uid={username, [""]})"
)[0][0]
else:
dn = "uid=" + username + "," + base_dn
Expand All @@ -367,10 +371,7 @@ def check_credentials(self, username, password):
# bind anonymously
con.simple_bind_s(dn, pw)
# search by e-mail address
filter = "(&(mail=%s)(%s))" % (
ldap.filter.escape_filter_chars(username),
filterstr,
)
filter = f"(&(mail={ldap.filter.escape_filter_chars(username)})({filterstr}))"
# find the uid
attrs = ["uid"]
if manage_user:
Expand All @@ -392,10 +393,7 @@ def check_credentials(self, username, password):
basedns = base_dn
else:
basedns = [base_dn]
filter = "(&(uid=%s)(%s))" % (
ldap.filter.escape_filter_chars(username),
filterstr,
)
filter = f"(&(uid={ldap.filter.escape_filter_chars(username)})({filterstr}))"
found = False
for basedn in basedns:
try:
Expand All @@ -409,11 +407,10 @@ def check_credentials(self, username, password):
except ldap.LDAPError:
(exc_type, exc_value) = sys.exc_info()[:2]
logger.warning(
"ldap_auth: searching %s for %s resulted in %s: %s\n"
% (basedn, filter, exc_type, exc_value)
f"ldap_auth: searching {basedn} for {filter} resulted in {exc_type}: {exc_value}\n"
)
if not found:
logger.warning("User [%s] not found!" % username)
logger.warning(f"User [{username}] not found!")
return False
result = result[0][1]
if mode == "custom":
Expand All @@ -423,11 +420,7 @@ def check_credentials(self, username, password):
basedns = base_dn
else:
basedns = [base_dn]
filter = "(&(%s=%s)(%s))" % (
username_attrib,
ldap.filter.escape_filter_chars(username),
filterstr,
)
filter = f"(&({username_attrib}={ldap.filter.escape_filter_chars(username)})({filterstr}))"
if custom_scope == "subtree":
scope = ldap.SCOPE_SUBTREE
elif custom_scope == "base":
Expand All @@ -447,15 +440,14 @@ def check_credentials(self, username, password):
except ldap.LDAPError:
(exc_type, exc_value) = sys.exc_info()[:2]
logger.warning(
"ldap_auth: searching %s for %s resulted in %s: %s\n"
% (basedn, filter, exc_type, exc_value)
f"ldap_auth: searching {basedn} for {filter} resulted in {exc_type}: {exc_value}\n"
)
if not found:
logger.warning("User [%s] not found!" % username)
logger.warning(f"User {username} not found!")
return False
result = result[0][1]
if manage_user:
logger.info("[%s] Manage user data" % str(username))
logger.info(f"[{str(username)}] Manage user data")
try:
store_sso_id = "ldap:" + username
user_firstname = result[user_firstname_attrib][0]
Expand Down Expand Up @@ -526,14 +518,14 @@ def check_credentials(self, username, password):
except ldap.LDAPError:
import traceback

logger.warning("[%s] Error in ldap processing" % str(username))
logger.warning(f"[{str(username)}] Error in Ldap processing")
logger.debug(traceback.format_exc())
print(traceback.format_exc())
return False
except IndexError: # for AD membership test
import traceback

logger.warning("[%s] Ldap result indexing error" % str(username))
logger.warning(f"[{str(username)}] Ldap result indexing error")
logger.debug(traceback.format_exc())
return False

Expand Down Expand Up @@ -573,7 +565,7 @@ def do_manage_groups(self, con, username, group_mapping={}):
logger = self.logger
groups = self.groups

logger.info("[%s] Manage user groups" % str(username))
logger.info(f"[{str(username)}] Manage user groups")
try:
#
# Get all group name where the user is in actually in ldap
Expand All @@ -586,7 +578,7 @@ def do_manage_groups(self, con, username, group_mapping={}):
if group in group_mapping:
l.append(group_mapping[group])
ldap_groups_of_the_user = l
logger.info("User groups after remapping: %s" % str(l))
logger.info(f"User groups after remapping: {str(l)}")

#
# Get all group name where the user is in actually in local db
Expand Down Expand Up @@ -620,7 +612,7 @@ def do_manage_groups(self, con, username, group_mapping={}):
email=username, first_name=username
)
if not db_user_id:
logger.error("There is no username or email for %s!" % username)
logger.error(f"There is no username or email for {username}!")
raise
db_groups_of_the_user = groups.get(db_user_id)

Expand All @@ -645,7 +637,7 @@ def do_manage_groups(self, con, username, group_mapping={}):
for callback in manage_groups_callback:
callback()
except:
logger.warning("[%s] Groups are not managed successfully!" % str(username))
logger.warning(f"[{str(username)}] Groups are not managed successfully!")
import traceback

logger.debug(traceback.format_exc())
Expand All @@ -654,7 +646,7 @@ def do_manage_groups(self, con, username, group_mapping={}):

def _init_ldap(self):
"""
Inicialize ldap connection
Inizialize ldap connection
"""

server = self.server
Expand All @@ -668,11 +660,12 @@ def _init_ldap(self):
tls = self.tls
logger = self.logger

logger.info("[%s] Initialize ldap connection" % str(server))
logger.debug(f"[{str(server)}] Initialize ldap connection")
if secure:
if not port:
port = 636

logger.debug(f"Using Secure LDAP connection to {server}:{port}")
if self_signed_certificate:
# NOTE : If you have a self-signed SSL Certificate pointing over "port=686" and "secure=True" alone
# will not work, you need also to set "self_signed_certificate=True".
Expand All @@ -695,7 +688,8 @@ def _init_ldap(self):
else:
if not port:
port = 389
con = ldap.initialize("ldap://" + server + ":" + str(port))
logger.debug(f"Initializing LDAP connection to ldap://{server}:{str(port)}")
con = ldap.initialize(f"ldap://{server}:{str(port)}")
if tls:
con.start_tls_s()
return con
Expand All @@ -719,7 +713,7 @@ def get_user_groups_from_ldap(self, username=None, password=None):
if username is None:
return []

logger.info("[%s] Get user groups from ldap" % str(username))
logger.info(f"[{str(username)}] Get user groups from ldap")
#
# Get all group name where the user is in actually in ldap
# #########################################################
Expand All @@ -732,7 +726,7 @@ def get_user_groups_from_ldap(self, username=None, password=None):
for x in base_dn.split(","):
if "DC=" in x.upper():
domain.append(x.split("=")[-1])
username = "%s@%s" % (username, ".".join(domain))
username = f"{username}@{".".join(domain)}"
username_bare = username.split("@")[0]
con = self._init_ldap()
con.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
Expand All @@ -754,19 +748,17 @@ def get_user_groups_from_ldap(self, username=None, password=None):
username = con.search_ext_s(
base_dn,
ldap.SCOPE_SUBTREE,
"(&(sAMAccountName=%s)(%s))" % (bare, filterstr),
f"(&(sAMAccountName={bare})({filterstr}))",
["cn"],
)[0][0]

# if username is None, return empty list
if username is None:
return []
# search for groups where user is in
filter = "(&(%s=%s)(%s))" % (
ldap.filter.escape_filter_chars(group_member_attrib),
ldap.filter.escape_filter_chars(username),
group_filterstr,
)
filter = f"(&({ldap.filter.escape_filter_chars(group_member_attrib)}=\
{ldap.filter.escape_filter_chars(username)})({group_filterstr}))"

group_search_result = con.search_s(
group_dn, ldap.SCOPE_SUBTREE, filter, [group_name_attrib]
)
Expand All @@ -778,5 +770,5 @@ def get_user_groups_from_ldap(self, username=None, password=None):
str(group[group_name_attrib][0], encoding="utf-8")
)

logger.debug("User groups: %s" % ldap_groups_of_the_user)
logger.debug(f"User groups for {username}: {ldap_groups_of_the_user}")
return list(ldap_groups_of_the_user)