Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#
# It is implemented as a Neutron/ML2 mechanism driver.
import contextlib
from datetime import datetime, timedelta
import os
import re
import threading
Expand Down Expand Up @@ -92,6 +93,9 @@
# The default interval between periodic resyncs, in seconds.
DEFAULT_RESYNC_INTERVAL_SECS = 60

# The default maximum interval between resync completions, in seconds.
DEFAULT_RESYNC_MAX_INTERVAL_SECS = 3600

calico_opts = [
cfg.IntOpt(
"num_port_status_threads",
Expand Down Expand Up @@ -135,6 +139,14 @@
" server starts or is restarted."
),
),
cfg.IntOpt(
"resync_max_interval_secs",
default=DEFAULT_RESYNC_MAX_INTERVAL_SECS,
help=(
"Calico will log an error if the interval between periodic"
" resync completions surpasses this maximum (in seconds)."
),
),
]
cfg.CONF.register_opts(calico_opts, "calico")

Expand Down Expand Up @@ -323,6 +335,9 @@ def __init__(self):
# safe to compare this with other values returned by monotonic_time().
self._last_status_queue_log_time = monotonic_time()

# Last resync completion time
self.last_resync_time = datetime.now()

# Tell the monkeypatch where we are.
global mech_driver
assert mech_driver is None
Expand Down Expand Up @@ -438,6 +453,7 @@ def _post_fork_init(self):
# We deliberately do this last, to ensure that all of the setup
# above is complete before we start running.
self._epoch += 1
eventlet.spawn(self.resync_monitor_thread, self._epoch)
eventlet.spawn(self.periodic_resync_thread, self._epoch)
if cfg.CONF.calico.etcd_compaction_period_mins > 0:
eventlet.spawn(self.periodic_compaction_thread, self._epoch)
Expand Down Expand Up @@ -1068,6 +1084,46 @@ def _update_port(self, plugin_context, port):
LOG.info("Port unbound, attempting delete if needed.")
self.endpoint_syncer.delete_endpoint(port)

def resync_monitor_thread(self, launch_epoch):
"""Monitor the interval between completed resyncs.

Logs an error if the period resync duration surpasses
the configured maximum time in seconds.
"""
try:
LOG.info("Resync monitor thread started")

while self._epoch == launch_epoch:
# Only monitor the resync if we are the master node.
if self.elector.master():
LOG.info("I am master: monitoring periodic resync")

curr_time = datetime.now()
time_delta = curr_time - self.last_resync_time
if time_delta.seconds > cfg.CONF.calico.resync_max_interval_secs:
LOG.error(
"The time since the last resync completion has surpassed"
f" {cfg.CONF.calico.resync_max_interval_secs} seconds"
)

deadline = self.last_resync_time + timedelta(
seconds=cfg.CONF.calico.resync_max_interval_secs
)
time_left = (deadline - curr_time).seconds
polling_rate = cfg.CONF.calico.resync_max_interval_secs / 5
sleep_time = time_left if deadline > curr_time else polling_rate
eventlet.sleep(sleep_time)
else:
LOG.debug("I am not master")
eventlet.sleep(MASTER_CHECK_INTERVAL_SECS)
except Exception:
# TODO(nj) Should we tear down the process.
LOG.exception("Resync monitor thread died!")
if self.elector:
# Stop the elector so that we give up the mastership.
self.elector.stop()
raise

def periodic_resync_thread(self, launch_epoch):
"""Periodic Neutron DB -> etcd resynchronization logic.

Expand All @@ -1082,6 +1138,7 @@ def periodic_resync_thread(self, launch_epoch):
# Only do the resync if we are the master node.
if self.elector.master():
LOG.info("I am master: doing periodic resync")
start_time = datetime.now()

# Since this thread is not associated with any particular
# request, we use our own admin context for accessing the
Expand All @@ -1103,6 +1160,13 @@ def periodic_resync_thread(self, launch_epoch):

# Resync ClusterInformation and FelixConfiguration.
self.provide_felix_config()

# mark this resync as finished.
self.last_resync_time = datetime.now()
LOG.info(
"The periodic resync finished after"
f" {self.last_resync_time - start_time}"
)
except Exception:
LOG.exception("Error in periodic resync thread.")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2026 Tigera, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
networking_calico.plugins.ml2.drivers.calico.test.test_monitor_thread

Unit tests for the thread that monitors the periodic resync thread.
"""
from datetime import datetime, timedelta
import mock
import unittest

import networking_calico.plugins.ml2.drivers.calico.test.lib as lib
from networking_calico.plugins.ml2.drivers.calico import mech_calico


INITIAL_EPOCH = 0
TEST_MAX_INTERVAL = 30


class TestResyncMonitorThread(lib.Lib, unittest.TestCase):
"""Tests for the driver's resync monitor thread logic."""

def setUp(self):
super(TestResyncMonitorThread, self).setUp()

# thread logic mocks
self.driver.elector = mock.Mock()
self.sleep_patcher = mock.patch("eventlet.sleep")
self.mock_sleep = self.sleep_patcher.start()

# log mocks
self.log_error = mock.patch.object(mech_calico.LOG, "error").start()
self.log_info = mock.patch.object(mech_calico.LOG, "info").start()
self.log_debug = mock.patch.object(mech_calico.LOG, "debug").start()

# resync mocks
self.driver.subnet_syncer = mock.Mock()
self.driver.policy_syncer = mock.Mock()
self.driver.endpoint_syncer = mock.Mock()
self.driver.provide_felix_config = mock.Mock()

def tearDown(self):
self.sleep_patcher.stop()
super(TestResyncMonitorThread, self).tearDown()

def simulate_epoch_progression(self, expected_sleep_time=None):
def increment_epoch(actual_sleep_time):
if expected_sleep_time is not None:
assert expected_sleep_time == actual_sleep_time
self.driver._epoch += 1

return increment_epoch

def test_monitor_does_nothing_when_not_master(self):
"""Test that a driver that is not master does not monitor."""
self.driver.elector.master.return_value = False
self.mock_sleep.side_effect = self.simulate_epoch_progression()

self.driver.resync_monitor_thread(INITIAL_EPOCH)

self.log_debug.assert_called_once_with("I am not master")
self.log_error.assert_not_called()

def test_monitor_logs_error_when_over_max(self):
"""Test that an error is logged when interval surpasses maximum."""
lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL
self.driver.elector.master.return_value = True
fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1)
self.driver.last_resync_time = fake_resync_time
self.mock_sleep.side_effect = self.simulate_epoch_progression()

self.driver.resync_monitor_thread(INITIAL_EPOCH)

self.log_error.assert_called_once()
self.assertIn(
"The time since the last resync completion has surpassed",
self.log_error.call_args[0][0],
)

def test_monitor_no_error_if_interval_under_max(self):
"""If interval is below max, no error should be logged."""
lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL
self.driver.elector.master.return_value = True
self.mock_sleep.side_effect = self.simulate_epoch_progression()

self.driver.resync_monitor_thread(INITIAL_EPOCH)

self.log_error.assert_not_called()

def test_monitor_exception_stops_elector(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one feels like it would belong better in test_election.py - WDYT? (It doesn't have any resync-related detail.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it belongs where it is because it calls and tests the monitor thread. Plus, it does not test any of the actual election logic (the actual elector is entirely mocked out), it just checks that the function is called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry, you're right. I must have misread before.

"""On unexpected exception, elector.stop() must be called."""
self.driver.elector.master.return_value = True

with mock.patch.object(self.driver, "elector") as mock_elector:
mock_elector.master.side_effect = Exception("Test exception")

with self.assertRaises(Exception):
self.driver.resync_monitor_thread(INITIAL_EPOCH)

mock_elector.stop.assert_called_once()

def test_resync_resets_time(self):
"""Test that resync resets current interval duration to below max."""
lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL
self.driver.elector.master.return_value = True
fake_resync_time_time = datetime.now() - timedelta(
seconds=TEST_MAX_INTERVAL + 1
)
self.driver.last_resync_time = fake_resync_time_time

self.mock_sleep.side_effect = self.simulate_epoch_progression()
self.driver.resync_monitor_thread(INITIAL_EPOCH)

self.log_error.assert_called_once()
self.assertIn(
"The time since the last resync completion has surpassed",
self.log_error.call_args[0][0],
)

# Resync
self.mock_sleep.side_effect = self.simulate_epoch_progression()
self.driver.periodic_resync_thread(INITIAL_EPOCH + 1)

self.mock_sleep.side_effect = self.simulate_epoch_progression()
self.driver.resync_monitor_thread(INITIAL_EPOCH + 2)

self.log_error.assert_called_once()

def test_errors_continue_to_log(self):
"""Test that errors continue logging if resync does not occur."""
lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL
self.driver.elector.master.return_value = True
fake_resync_time_time = datetime.now() - timedelta(
seconds=TEST_MAX_INTERVAL + 1
)
self.driver.last_resync_time = fake_resync_time_time

self.mock_sleep.side_effect = self.simulate_epoch_progression()
self.driver.resync_monitor_thread(INITIAL_EPOCH)

self.log_error.assert_called_once()
self.assertIn(
"The time since the last resync completion has surpassed",
self.log_error.call_args[0][0],
)

self.mock_sleep.side_effect = self.simulate_epoch_progression()
self.driver.resync_monitor_thread(INITIAL_EPOCH + 1)

self.assertEqual(self.log_error.call_count, 2)
self.assertIn(
"The time since the last resync completion has surpassed",
self.log_error.call_args[0][0],
)

@mock.patch("networking_calico.plugins.ml2.drivers.calico.mech_calico.datetime")
def test_sleep_time_logic_before_deadline(self, mock_datetime):
"""Test that we sleep until deadline if there is time left."""
lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL
self.driver.elector.master.return_value = True

curr_time = datetime.now()
self.driver.last_resync_time = curr_time
expected_sleep_time = TEST_MAX_INTERVAL
mock_datetime.now.return_value = curr_time

self.mock_sleep.side_effect = self.simulate_epoch_progression(
expected_sleep_time
)
self.driver.resync_monitor_thread(INITIAL_EPOCH)

def test_sleep_time_logic_after_deadline(self):
"""Test that we poll if the deadline has passed."""
lib.m_oslo_config.cfg.CONF.calico.resync_max_interval_secs = TEST_MAX_INTERVAL
self.driver.elector.master.return_value = True

fake_resync_time = datetime.now() - timedelta(seconds=TEST_MAX_INTERVAL + 1)
self.driver.last_resync_time = fake_resync_time
expected_sleep_time = TEST_MAX_INTERVAL / 5

self.mock_sleep.side_effect = self.simulate_epoch_progression(
expected_sleep_time
)
self.driver.resync_monitor_thread(INITIAL_EPOCH)