Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [unreleased]

### Added
- auth: Allow rebind connection with service bind credentials after successful
user authentication to retrieve user information and groups (#57).
Contribution from @Cornelicorn.

## [1.4.0] - 2025-04-11

### Added
Expand Down
33 changes: 32 additions & 1 deletion src/authentication/rfl/authentication/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
bind_password: Optional[str] = None,
restricted_groups: Optional[List[str]] = None,
lookup_user_dn: bool = False,
lookup_as_user: Optional[bool] = True,
):
self.uri = uri
self.cacert = cacert
Expand All @@ -56,6 +57,26 @@ def __init__(
self.bind_password = bind_password
self.restricted_groups = restricted_groups
self.lookup_user_dn = lookup_user_dn
# The lookup_as_user attribute is a boolean to control which bind dn is used to
# retrieve user information and user groups after successful authentication.
#
# When True (default), LDAP connection is kept after authentication to use user
# permissions.
#
# When False, LDAP connection is closed, another LDAP connection is opened. This
# connection is binded with service bind dn and password when defined. When not
# defined, user information and groups are retrieved anonymously.
#
# When the lookup_as_user argument is not defined, it is enabled when either
# bind dn or password are not defined, and disabled otherwise. When argument
# value is True or False, this value is directly used.
if lookup_as_user is None:
if self.bind_dn is None or self.bind_password is None:
self.lookup_as_user = True
else:
self.lookup_as_user = False
else:
self.lookup_as_user = lookup_as_user

def connection(self):
connection = ldap.initialize(self.uri.geturl())
Expand Down Expand Up @@ -333,8 +354,18 @@ def login(self, user: str, password: str) -> AuthenticatedUser:

connection = self.connection()
try:
# Try simple authentication with user DN and password on LDAP directory
# Bind with user password to test their credentials
connection.simple_bind_s(user_dn, password)

# Unless lookup as user is enabled, close current connection with
# authenticated user credentials, open another connection and bind with
# service dn (if defined).
if not self.lookup_as_user:
logger.debug("Re-initialize LDAP connection to avoid lookup as user")
connection.unbind_s()
connection = self.connection()
self._bind(connection)

fullname, gid = self._get_user_info(connection, user_dn)
groups = self._get_groups(connection, user, user_dn, gid)
except ldap.SERVER_DOWN as err:
Expand Down
125 changes: 125 additions & 0 deletions src/authentication/rfl/tests/test_ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,59 @@ def test_login_errors(self, mock_simple_bind_s, mock_lookup_user_dn):
):
self.authentifier.login("john", "SECR3T")

@patch.object(LDAPAuthentifier, "_get_groups")
@patch.object(LDAPAuthentifier, "_get_user_info")
@patch.object(LDAPAuthentifier, "_lookup_user_dn")
@patch.object(LDAPAuthentifier, "_bind")
@patch.object(LDAPAuthentifier, "connection")
def test_login_no_bind_lookup_as_user_true(
self,
mock_connection,
mock_bind,
mock_lookup_user_dn,
mock_get_user_info,
mock_get_groups,
):
# setup mocks return values
mock_get_groups.return_value = ["group1", "group2"]
mock_get_user_info.return_value = ("John Doe", 42)
mock_lookup_user_dn.return_value = "uid=john,ou=people,dc=corp,dc=org"
mock_connection.return_value.simple_bind_s.return_value = None

# if lookup_as_user is False, login() must not call _bind().
self.authentifier.lookup_as_user = True
self.authentifier.login("john", "SECR3T")
mock_bind.assert_not_called()
mock_connection.return_value.simple_bind_s.assert_called_once_with(
"uid=john,ou=people,dc=corp,dc=org", "SECR3T"
)

@patch.object(LDAPAuthentifier, "_get_groups")
@patch.object(LDAPAuthentifier, "_get_user_info")
@patch.object(LDAPAuthentifier, "_lookup_user_dn")
@patch.object(LDAPAuthentifier, "_bind")
@patch.object(LDAPAuthentifier, "connection")
def test_login_single_bind_lookup_as_user_false(
self,
mock_connection,
mock_bind,
mock_lookup_user_dn,
mock_get_user_info,
mock_get_groups,
):
# setup mocks return values
mock_get_groups.return_value = ["group1", "group2"]
mock_get_user_info.return_value = ("John Doe", 42)
mock_lookup_user_dn.return_value = "uid=john,ou=people,dc=corp,dc=org"

# if lookup_as_user is False, login() should call _bind() once.
self.authentifier.lookup_as_user = False
self.authentifier.login("john", "SECR3T")
mock_bind.assert_called_once()
mock_connection.return_value.simple_bind_s.assert_called_once_with(
"uid=john,ou=people,dc=corp,dc=org", "SECR3T"
)

def test_user_info(self):
connection = Mock(spec=ldap.ldapobject.LDAPObject)
connection.search_s.return_value = [
Expand Down Expand Up @@ -875,3 +928,75 @@ def test_users_ldap_operations_error(self, mock_search_s):
r"^Operations error on users search: fail$",
):
self.authentifier.users()


class TestLDAPAuthentifierInit(unittest.TestCase):
def test_lookup_as_user_auto_bind_dn(self):
# lookup_as_user is None, bind_dn and bind_password are set, should do lookup
# with service credentials.
auth = LDAPAuthentifier(
uri=urllib.parse.urlparse("ldap://localhost"),
user_base="ou=people,dc=corp,dc=org",
group_base="ou=groups,dc=corp,dc=org",
bind_dn="uid=read,ou=apps,dc=corp,dc=org",
bind_password="SECR3T",
lookup_as_user=None,
)
self.assertFalse(auth.lookup_as_user)

def test_lookup_as_user_auto_no_bind_user(self):
# lookup_as_user is None, bind_dn and bind_password are not set, should do
# lookup as user.
auth = LDAPAuthentifier(
uri=urllib.parse.urlparse("ldap://localhost"),
user_base="ou=people,dc=corp,dc=org",
group_base="ou=groups,dc=corp,dc=org",
bind_dn=None,
bind_password=None,
lookup_as_user=None,
)
self.assertTrue(auth.lookup_as_user)

def test_lookup_as_user_auto_no_bind_dn(self):
# lookup_as_user is None, bind_dn is not set, should do lookup as user.
auth = LDAPAuthentifier(
uri=urllib.parse.urlparse("ldap://localhost"),
user_base="ou=people,dc=corp,dc=org",
group_base="ou=groups,dc=corp,dc=org",
bind_dn=None,
bind_password="SECR3T",
lookup_as_user=None,
)
self.assertTrue(auth.lookup_as_user)

def test_lookup_as_user_auto_no_bind_password(self):
# lookup_as_user is None, bind_password is not set, should do lookup as user.
auth = LDAPAuthentifier(
uri=urllib.parse.urlparse("ldap://localhost"),
user_base="ou=people,dc=corp,dc=org",
group_base="ou=groups,dc=corp,dc=org",
bind_dn="uid=read,ou=apps,dc=corp,dc=org",
bind_password=None,
lookup_as_user=None,
)
self.assertTrue(auth.lookup_as_user)

def test_lookup_as_user_enabled(self):
# lookup_as_user is True, should do lookup as user.
auth = LDAPAuthentifier(
uri=urllib.parse.urlparse("ldap://localhost"),
user_base="ou=people,dc=corp,dc=org",
group_base="ou=groups,dc=corp,dc=org",
lookup_as_user=True,
)
self.assertTrue(auth.lookup_as_user)

def test_lookup_as_user_disabled(self):
# lookup_as_user is False, should do lookup with service credentials.
auth = LDAPAuthentifier(
uri=urllib.parse.urlparse("ldap://localhost"),
user_base="ou=people,dc=corp,dc=org",
group_base="ou=groups,dc=corp,dc=org",
lookup_as_user=False,
)
self.assertFalse(auth.lookup_as_user)
Loading