diff --git a/simulation-container/simulations/simpleCSSFaultScenario.py b/simulation-container/simulations/simpleCSSFaultScenario.py new file mode 100644 index 0000000..8196e0e --- /dev/null +++ b/simulation-container/simulations/simpleCSSFaultScenario.py @@ -0,0 +1,361 @@ +import matplotlib.pyplot as plt +import numpy as np +import random +from utilities.CSSfault import CSSfault +import psycopg2 +from psycopg2 import sql +import json + +#utilities? +from Basilisk.architecture import messaging, sysModel +from Basilisk.utilities import macros +from Basilisk.utilities import orbitalMotion +from Basilisk.utilities import unitTestSupport +from Basilisk.utilities import RigidBodyKinematics as rbk + +from Basilisk.utilities import simIncludeGravBody + + +#simulation tools + +from Basilisk.simulation import spacecraft +from Basilisk.simulation import extForceTorque +from Basilisk.simulation import simpleNav + +from Basilisk.fswAlgorithms import attTrackingError +from Basilisk.fswAlgorithms import inertial3D +from Basilisk.fswAlgorithms import mrpFeedback +from Basilisk.simulation import coarseSunSensor + + +#general simulation initialization, i think +from Basilisk.utilities import SimulationBaseClass + +class SensorSunPos(sysModel.SysModel): + def __init__(self, sim, samplingTime, simTaskName, sensors, nav, inertial): + super().__init__() + self.sensors = sensors + self.navLog = nav.attOutMsg.recorder(samplingTime) + sim.AddModelToTask(simTaskName, self.navLog) + self.inertial = inertial + self.UpdateState(-1) + + def UpdateState(self, CurrentSimNanos): + weightedSum = [] + for i in self.sensors: + weightedSum.append((i.sensedValue - i.senBias) * np.array(i.nHat_B)) + weightedSum = np.sum(weightedSum, axis=0) + if np.linalg.norm(weightedSum): + v_sun_B = weightedSum / np.linalg.norm(weightedSum) + C_BN = rbk.MRP2C(self.navLog.sigma_BN[-1]) + v_sun = C_BN.T @ v_sun_B + theta = np.atan(v_sun[1] / v_sun[0]) + phi = np.atan(v_sun[2] / np.sqrt(v_sun[0]**2 + v_sun[1]**2)) * -1 + psi = 0 + euler = [theta, phi, psi] + orientation = rbk.euler3212MRP(euler) + self.sensedSun = [euler[0][0], euler[1][0], euler[2]] + self.inertial.sigma_R0N = orientation + else: + self.sensedSun = self.inertial.sigma_R0N + +def simulate(plot, CSSsun): + #a bunch of initializations + simTaskName = "sim city" + simProcessName = "mr. sim" + + satSim = SimulationBaseClass.SimBaseClass() + timestep = 5 + dynamics = satSim.CreateNewProcess(simProcessName) + simulationTimeStep = macros.sec2nano(timestep) + dynamics.addTask(satSim.CreateNewTask(simTaskName, simulationTimeStep)) + + satellite = spacecraft.Spacecraft() + satellite.ModelTag = "oops" + + + #satellite state definitions + inertia = [1000., 0., 0., + 0., 1000., 0., + 0., 0., 1000.] + + #note that all angular orientations (here and all throughout) are in MRPs + #angular velocities are in rad/s tho + #satellite mass + satellite.hub.mHub = 1000.0 + #distance from body frame origin to COM + satellite.hub.r_BcB_B= [[0.0], [0.0], [0.0]] + #adding inertia to the objectsatellite + satellite.hub.IHubPntBc_B = unitTestSupport.np2EigenMatrix3d(inertia) + #orientation of body frame relative to inertial + satellite.hub.sigma_BNInit = rbk.euler3212MRP([0, 1, 0]) + #ang velocity of body frame relative to inertial expressed in body frame coords + satellite.hub.omega_BN_BInit = [[0.01/ np.sqrt(3)], [-0.01 / np.sqrt(3)], [0.01 / np.sqrt(3)]] + + satSim.AddModelToTask(simTaskName, satellite) + + #gravity stuff - 2BP + gravity = simIncludeGravBody.gravBodyFactory() + earth = gravity.createEarth() + earth.isCentralBody = True + gravity.createSun() + + #spice log initialization - date is just cause that's what the example used + UTCInit = "2012 MAY 1 00:28:30.0" + spice = gravity.createSpiceInterface(time=UTCInit, epochInMsg=True) + satSim.AddModelToTask(simTaskName, spice) + + gravity.addBodiesTo(satellite) + + #orbits! + oe = orbitalMotion.ClassicElements() + + r = 7000. * 1000 + oe.a = r + oe.e = 0.000 #1 + oe.i = 0.0 #90.0 * macros.D2R + + oe.Omega = 0 #110 gets permanent illumination at i = 90 + oe.omega = 0 #90.0 * macros.D2R + oe.f = 0 #85.3 * macros.D2R + rN, vN = orbitalMotion.elem2rv(earth.mu, oe) + oe = orbitalMotion.rv2elem(earth.mu, rN, vN) #yea idk why this exists + + #more satellite initializations + #for some reason these are relative to planet, but the + #satellite log's aren't + satellite.hub.r_CN_NInit = rN + satellite.hub.v_CN_NInit = vN + + #sim time + n = np.sqrt(earth.mu / oe.a**3) + period = 2. * np.pi / n + simTime = macros.sec2nano(1 * period) + + #navigation module + nav = simpleNav.SimpleNav() + nav.ModelTag = "navigation" + satSim.AddModelToTask(simTaskName, nav) + nav.scStateInMsg.subscribeTo(satellite.scStateOutMsg) + nav.sunStateInMsg.subscribeTo(spice.planetStateOutMsgs[1]) + + #inertial reference attitude + inertial = inertial3D.inertial3D() + inertial.ModelTag = "inertial3D" + satSim.AddModelToTask(simTaskName, inertial) + inertial.sigma_R0N = [0., 0.5, 0.] + + + + #attitude error from reference + attError = attTrackingError.attTrackingError() + attError.ModelTag = "attErrorInertial3D" + attError.attNavInMsg.subscribeTo(nav.attOutMsg) + attError.attRefInMsg.subscribeTo(inertial.attRefOutMsg) + satSim.AddModelToTask(simTaskName, attError) + + control = mrpFeedback.mrpFeedback() + + control.ModelTag = "mrpFeedback" + if CSSsun: + satSim.AddModelToTask(simTaskName, control) + #parameters taken from scenarioAttitudeFeedbackRW + control.K = 3.5 + control.Ki = -1 #negative turns integral control off + control.P = 30.0 + control.integralLimit = 2. / control.Ki * 0.1 + + #external torque + + if CSSsun: + ext = extForceTorque.ExtForceTorque() + satellite.addDynamicEffector(ext) + satSim.AddModelToTask(simTaskName, ext) + ext.cmdTorqueInMsg.subscribeTo(control.cmdTorqueOutMsg) + + #some final module subscriptions + + #apparently mrpFeedback needs config info for the satellite + configData = messaging.VehicleConfigMsgPayload() + configData.ISCPntB_B = inertia + configDataMsg = messaging.VehicleConfigMsg() + configDataMsg.write(configData) + control.guidInMsg.subscribeTo(attError.attGuidOutMsg) + control.vehConfigInMsg.subscribeTo(configDataMsg) + + + #CSS stuff + def setup(CSS): + CSS.fov = 90. * macros.D2R + CSS.scaleFactor = 1.0 + CSS.maxOutput = 4.0 + CSS.minOutput = 0.0 + CSS.sunInMsg.subscribeTo(spice.planetStateOutMsgs[1]) + CSS.stateInMsg.subscribeTo(satellite.scStateOutMsg) + #CSS.sunEclipseInMsg.subscribeTo(eclipses.eclipseOutMsgs[0]) + CSS.nHat_B = np.array([1.0, 0.0, 0.0]) + + sensors = [] + loggers = [] + numCSS = 18 + for i in range(numCSS): + sensors.append(coarseSunSensor.CoarseSunSensor()) + setup(sensors[i]) + #sensors[i].senNoiseStd = i/500 + sensors[i].senBias = 0#i/4.0 + satSim.AddModelToTask(simTaskName, sensors[i]) + loggers.append(sensors[i].cssDataOutMsg.recorder()) + satSim.AddModelToTask(simTaskName, loggers[i]) + sensors[3].nHat_B = np.array([-1.0, 0.0, 0.0]) + sensors[4].nHat_B = np.array([-1.0, 0.0, 0.0]) + sensors[5].nHat_B = np.array([-1.0, 0.0, 0.0]) + sensors[6].nHat_B = np.array([0.0, 1.0, 0.0]) + sensors[7].nHat_B = np.array([0.0, 1.0, 0.0]) + sensors[8].nHat_B = np.array([0.0, 1.0, 0.0]) + sensors[9].nHat_B = np.array([0.0, -1.0, 0.0]) + sensors[10].nHat_B = np.array([0.0, -1.0, 0.0]) + sensors[11].nHat_B = np.array([0.0, -1.0, 0.0]) + sensors[12].nHat_B = np.array([0.0, 0.0, 1.0]) + sensors[13].nHat_B = np.array([0.0, 0.0, 1.0]) + sensors[14].nHat_B = np.array([0.0, 0.0, 1.0]) + sensors[15].nHat_B = np.array([0.0, 0.0, -1.0]) + sensors[16].nHat_B = np.array([0.0, 0.0, -1.0]) + sensors[17].nHat_B = np.array([0.0, 0.0, -1.0]) + for i in range(numCSS): + sensors[i].r_B = sensors[i].nHat_B + + #how often each logger samples + samplingTime = unitTestSupport.samplingTime(simTime, simulationTimeStep,\ + simTime / simulationTimeStep) + + cssfault = CSSfault(sensors, chance=0.0001) + satSim.AddModelToTask(simTaskName, cssfault) + + senseSun = SensorSunPos(satSim, samplingTime, simTaskName, sensors, nav, inertial) + if CSSsun: + satSim.AddModelToTask(simTaskName, senseSun) + + """data collection""" + + sensedSunLog = senseSun.logger("sensedSun") + satSim.AddModelToTask(simTaskName, sensedSunLog) + + faults = cssfault.logger("faultState") + satSim.AddModelToTask(simTaskName, faults) + + #true satellite states (translational and rotational position/velocity) + satLog = satellite.scStateOutMsg.recorder(samplingTime) + satSim.AddModelToTask(simTaskName, satLog) + + #technically a module for adding noise to sensors, but eh i use it for sun pointing + navLog = nav.attOutMsg.recorder(samplingTime) + satSim.AddModelToTask(simTaskName, navLog) + + #planet states (main planet and sun) + spiceLog = spice.planetStateOutMsgs[0].recorder(samplingTime) + satSim.AddModelToTask(simTaskName, spiceLog) + + #attitude error (from reference) + errorLog = attError.attGuidOutMsg.recorder(samplingTime) + satSim.AddModelToTask(simTaskName, errorLog) + + sunPoint = np.array(navLog.vehSunPntBdy) + + CSSdata = [] + for i in loggers: + CSSdata.append(i.OutputData) + CSSdata = np.array(CSSdata) + + sigma = np.array(satLog.sigma_BN) + + """SIMULATION""" + + satSim.SetProgressBar(True) + satSim.InitializeSimulation() + + satSim.TotalSim.SingleStepProcesses() + + satSim.ConfigureStopTime(simTime) + satSim.ExecuteSimulation() + + sensedSun = np.array(sensedSunLog.sensedSun) + + sunPoint = np.array(navLog.vehSunPntBdy) + + CSSdata = [] + for i in loggers: + CSSdata.append(i.OutputData) + CSSdata = np.array(CSSdata) + + sigma = np.array(satLog.sigma_BN) + + faults = np.array(faults.faultState) + + """PLOTTING""" + if plot: + plt.close("all") + + #pointing vector to the sun in the body frame + plt.figure(1) + timeAxis = satLog.times() + for i in range(3): + plt.plot(timeAxis * macros.NANO2SEC / period, sunPoint[:, i], + color=unitTestSupport.getLineColor(i, 3), + label=rf'$r_{i+1}$') + plt.title("Sun Direction (Body)") + plt.legend() + plt.xlabel("Time [orbits]") + plt.ylabel("Vector Component") + + #CSS sensor values, biases included + plt.figure(2, figsize=(20,len(CSSdata) / 2)) + colors = plt.cm.tab20.colors[:len(CSSdata)] + timeAxis = loggers[0].times() + for i in range(len(CSSdata)): + plt.subplot(int(len(CSSdata) / 6), 6, i+1) + plt.plot(timeAxis * macros.NANO2SEC / period, CSSdata[i], + color=colors[i]) + plt.title(f'$CSS_{{{i+1}}}$') + plt.ylim(-0.5, 1.5) + plt.xlabel("Time [orbits]") + plt.ylabel("Sensor Output") + + if CSSsun: + #where the sensors collectively think the sun is (orientation vector) + #plt.figure(3) + #not sure what i was thinking when i made this plot + plt.figure(3) + sensedSun = np.array(sensedSun) + plt.plot(satLog.times () * macros.NANO2SEC / period, sensedSun[:, 0], label=rf"$\sigma_{1}$") + plt.plot(satLog.times () * macros.NANO2SEC / period, sensedSun[:, 1], label=rf"$\sigma_{2}$") + plt.plot(satLog.times () * macros.NANO2SEC / period, sensedSun[:, 2], label=rf"$\sigma_{3}$") + plt.title("Sun Orientation via CSS Data (Inertial, 321 Euler)") + plt.legend() + plt.xlabel("Time [orbits]") + plt.ylabel("Orientation (rad)") + + #satellite orientation relative to inertial + plt.figure(4) + for i in range(3): + plt.plot(satLog.times() / period, sigma[:, i], label=rf"$\sigma_{i+1}$") + plt.title("Inertial Orientation") + plt.xlabel("Time [orbits]") + plt.ylabel("Orientation (MRP)") + plt.legend() + + plt.figure(5) + for i in range(numCSS): + plt.plot(satLog.times() / period, faults[:, i], label=rf"$CSS_{{{i+1}}}$") + plt.title("CSS Sensors' Fault State") + plt.xlabel("Time [orbits]") + plt.ylabel("Fault State") + plt.legend() + + plt.tight_layout() + plt.show() + + return np.array(satLog.times() / period), np.array(sigma), np.array(sensedSun), np.array(CSSdata), np.array(faults) + +if __name__ == "__main__": + simulate(False, False) + \ No newline at end of file diff --git a/simulation-container/simulations/simpleRWFaultScenario.py b/simulation-container/simulations/simpleRWFaultScenario.py index de90db4..1760313 100644 --- a/simulation-container/simulations/simpleRWFaultScenario.py +++ b/simulation-container/simulations/simpleRWFaultScenario.py @@ -5,6 +5,7 @@ import psycopg2 from psycopg2 import sql import json +import sys #utilities? from Basilisk.architecture import messaging @@ -33,6 +34,10 @@ #general simulation initialization, i think from Basilisk.utilities import SimulationBaseClass +#debugging +np.set_printoptions(threshold=np.inf) + + def simulate(plot): #a bunch of initializations simTaskName = "sim city" @@ -113,6 +118,13 @@ def simulate(plot): nav.scStateInMsg.subscribeTo(satellite.scStateOutMsg) nav.sunStateInMsg.subscribeTo(spice.planetStateOutMsgs[1]) + #constant disturbance so the RWs are always active + ext = extForceTorque.ExtForceTorque() + satellite.addDynamicEffector(ext) + ext.extTorquePntB_B = [0.0015, 0.0015, 0.0015] + satSim.AddModelToTask(simTaskName, ext) + + #inertial reference attitude inertial = inertial3D.inertial3D() inertial.ModelTag = "inertial3D" @@ -150,13 +162,14 @@ def simulate(plot): """ maxMomentum = 100. defaults = [ - {"axis":[1, 0, 0], "u_max":0.2, "type":"Honeywell_HR16"}, - {"axis":[0, 1, 0], "u_max":0.2, "type":"Honeywell_HR16"}, - {"axis":[0, 0, 1], "u_max":0.2, "type":"Honeywell_HR16"}, + {"axis":[1, 0, 0], "u_max":0.05, "rwType":"Honeywell_HR16", "cViscous":0.0005}, + {"axis":[0, 1, 0], "u_max":0.05, "rwType":"Honeywell_HR16", "cViscous":0.0005}, + {"axis":[0, 0, 1], "u_max":0.05, "rwType":"Honeywell_HR16", "cViscous":0.0005}, ] for i in range(len(defaults)): - rwFactory.create(defaults[i]["type"], defaults[i]["axis"], maxMomentum=maxMomentum, RWModel=rwModel, u_max=defaults[i]["u_max"]) + rwFactory.create(defaults[i]["rwType"], defaults[i]["axis"], maxMomentum=maxMomentum, RWModel=rwModel, u_max=defaults[i]["u_max"], + cViscous=defaults[i]["cViscous"]) numRW = rwFactory.getNumOfDevices() @@ -167,12 +180,16 @@ def simulate(plot): rwFactory.addToSpacecraft(satellite.ModelTag, rwEffector, satellite) satSim.AddModelToTask(simTaskName, rwEffector, 2) #the 2 ensures it will get updated before the satellite - higher priority + # for addition to RWFault components = [] count = 0 for i in rwFactory.rwList: - components.append((defaults[count]["type"], rwFactory.rwList[i])) + components.append((defaults[count]["rwType"], rwFactory.rwList[i])) count += 1 - rwf = RWFault(components, rwFactory = rwFactory, rwEffector=rwEffector, defaults=defaults, rwModel=rwModel) + #see rwfault.py for notes on requirements + rwf = RWFault(components, rwFactory = rwFactory, rwEffector=rwEffector, defaults=defaults, rwModel=rwModel, chance=0.001, + types=["friction"]) + satSim.AddModelToTask(simTaskName, rwf) #control torque """note: because of the RW's higher execution priority, @@ -240,31 +257,45 @@ def simulate(plot): #RW recorders rwMotorLog = rwMotor.rwMotorTorqueOutMsg.recorder(samplingTime) satSim.AddModelToTask(simTaskName, rwMotorLog) + + #measure the actual torque generated by each RW rwTorqueLog = [] for i in range(numRW): rwTorqueLog.append(rwEffector.rwOutMsgs[i].recorder(samplingTime)) satSim.AddModelToTask(simTaskName, rwTorqueLog[i]) + + #fault log as created by RWFault + faultLog = rwf.logger("fault") + satSim.AddModelToTask(simTaskName, faultLog) + + #state log as created by RWFault + stateLog = rwf.logger("state") + satSim.AddModelToTask(simTaskName, stateLog) - #fault log - faultLog = [] + #number of faults per component + faultCountLog = rwf.logger("count") + satSim.AddModelToTask(simTaskName, faultCountLog) """simulation start""" + satSim.SetProgressBar(True) satSim.InitializeSimulation() + satSim.ConfigureStopTime(simTime) + satSim.ExecuteSimulation() + + + #update faultLog to indicate all timesteps after injection as a fault + faultLog = np.array(faultLog.fault) + for i in range(1, len(faultLog)): + for j in range(len(faultLog[i])): + if faultLog[i-1, j]: + faultLog[i, j] = True + - - while satSim.TotalSim.CurrentNanos < simTime: - satSim.TotalSim.SingleStepProcesses() - #msgs.append(stateReader) - temp = rwf.inject_random(0.001) - faultLog.append([(1 if i[0] else 0) for i in temp]) - for i in range(len(faultLog[-1])): - if len(faultLog) > 1 and faultLog[-2][i] == 1: - faultLog[-1][i] = 1 - - faultLog = np.array(faultLog) - motorTorque = [rw.u_current for rw in rwTorqueLog] + count = faultCountLog.count + #extract actual output torques from recorder + motorTorque = np.array([rw.u_current for rw in rwTorqueLog]) sigma = np.array(satLog.sigma_BN) - + fricLog = np.array([[t[3] for t in col] for col in stateLog.state]) """plotting""" if plot: plt.figure(1) @@ -294,21 +325,26 @@ def simulate(plot): plt.xlabel("Time [orbits]") plt.ylabel("Torque [N-m]") plt.ylim(-0.22, 0.22) + plt.ylim(-0.005, 0.005) #fault plotting plt.figure(4) for i in range(numRW): - plt.plot(satLog.times() / period, faultLog[:, i], label=f'RW {i+1}') + plt.plot(satLog.times() / period, count[:, i], label=f'RW {i+1}') plt.title("RW Fault State") plt.legend() plt.xlabel("Time [orbits]") plt.ylabel("Fault State (binary)") - plt.ylim(-0.1, 1.1) plt.tight_layout() plt.show() - return satLog.times(), sigma, rwMotorLog.motorTorque[:, :3], motorTorque, faultLog + return np.array(satLog.times() / period), np.array(sigma), np.array(rwMotorLog.motorTorque[:, :3]), np.array(motorTorque), np.array(faultLog), np.array(fricLog) + +""" +This is a miniaturized version of dataloader.py for this specific scenario. +Just passes the return values of simulate() onto middleware.py +""" def run(plot, simulation_id): times, sigma, torque_desired, torque_actual, faults = simulate(plot) conn = psycopg2.connect( @@ -356,7 +392,8 @@ def run(plot, simulation_id): conn.close() if __name__ == "__main__": - run(False, simulation_id=3192025) + #run(False, simulation_id=3192025) + simulate(True) diff --git a/simulation-container/simulations/simpleThrusterFaultScenario.py b/simulation-container/simulations/simpleThrusterFaultScenario.py new file mode 100644 index 0000000..3f91dd2 --- /dev/null +++ b/simulation-container/simulations/simpleThrusterFaultScenario.py @@ -0,0 +1,313 @@ +import matplotlib.pyplot as plt +import numpy as np +import random +from utilities.thrusterfault import ThrusterFault +import psycopg2 +from psycopg2 import sql +import json +import sys + +#utilities? +from Basilisk.architecture import messaging +from Basilisk.utilities import macros +from Basilisk.utilities import orbitalMotion +from Basilisk.utilities import unitTestSupport +from Basilisk.utilities import RigidBodyKinematics as rbk +from Basilisk.utilities import simIncludeThruster +from Basilisk.utilities import simIncludeGravBody + + + +from Basilisk.simulation import spacecraft +from Basilisk.simulation import extForceTorque +from Basilisk.simulation import simpleNav +from Basilisk.simulation import thrusterDynamicEffector +from Basilisk.utilities import SimulationBaseClass +from Basilisk.fswAlgorithms import velocityPoint +from Basilisk.fswAlgorithms import mrpFeedback +from Basilisk.fswAlgorithms import attTrackingError + + +def simulate(plot): + #a bunch of initializations + simTaskName = "sim city" + simProcessName = "mr. sim" + + satSim = SimulationBaseClass.SimBaseClass() + timestep = 0.5 + dynamics = satSim.CreateNewProcess(simProcessName) + simulationTimeStep = macros.sec2nano(timestep) + dynamics.addTask(satSim.CreateNewTask(simTaskName, simulationTimeStep)) + + satellite = spacecraft.Spacecraft() + satellite.ModelTag = "oops" + + satSim.SetProgressBar(True) #completely useless for this sim + + + #satellite state definitions + inertia = [1000., 0., 0., + 0., 1000., 0., + 0., 0., 1000.] + + #note that all angular orientations (here and all throughout) are in MRPs + #angular velocities are in rad/s tho + #satellite mass + satellite.hub.mHub = 1000.0 + #distance from body frame origin to COM + satellite.hub.r_BcB_B= [[0.0], [0.0], [0.0]] + #adding inertia to the objectsatellite + satellite.hub.IHubPntBc_B = unitTestSupport.np2EigenMatrix3d(inertia) + #orientation of body frame relative to inertial + satellite.hub.sigma_BNInit = rbk.euler3212MRP([0, 0, 0]) + #ang velocity of body frame relative to inertial expressed in body frame coords + satellite.hub.omega_BN_BInit = [[0.0 * macros.D2R], [0.0], [0.0]]#[2.00 * macros.D2R]] + + satSim.AddModelToTask(simTaskName, satellite) + + + #gravity stuff + gravity = simIncludeGravBody.gravBodyFactory() + earth = gravity.createEarth() + earth.isCentralBody = True + + + gravity.addBodiesTo(satellite) + + #orbits! (first, for initial orbit) + oe1 = orbitalMotion.ClassicElements() + + r1 = 7000. * 1000 + oe1.a = r1 + oe1.e = 0.000 #1 + oe1.i = 0.0 #90.0 * macros.D2R + + oe1.Omega = 0 #110 gets permanent illumination at i = 90 + oe1.omega = 0 #90.0 * macros.D2R + oe1.f = 0 #85.3 * macros.D2R + rN1, vN1 = orbitalMotion.elem2rv(earth.mu, oe1) + oe1 = orbitalMotion.rv2elem(earth.mu, rN1, vN1) #yea idk why this exists + + #more satellite initializations + #for some reason these are relative to planet, but the + #satellite log's aren't + satellite.hub.r_CN_NInit = rN1 + satellite.hub.v_CN_NInit = vN1 + + #sim time + n1 = np.sqrt(earth.mu / oe1.a**3) + period1 = 2. * np.pi / n1 + maneuverTime1 = macros.sec2nano(period1) + + + #now, define final circular orbit as needed + r2 = 10000. * 1000 + a2 = r2 + + n2 = np.sqrt(earth.mu / a2**3) + period2 = 2. * np.pi / n2 + + finalTime = macros.sec2nano(period2) + + #finally, define hohmann elliptical transfer orbit as needed + rp = r1 + ra = r2 + at = (rp + ra) / 2. + periodTransfer = 2. * np.pi * np.sqrt(at**3 / earth.mu) + maneuverTime2 = macros.sec2nano(periodTransfer / 2.) + + #define the two impulse maneuvers that make up the hohmann transfer + v1 = np.sqrt(earth.mu / r1) + v2 = np.sqrt(earth.mu / r2) + vtp = np.sqrt(earth.mu * (2 / rp - 1 / at)) + vta = np.sqrt(earth.mu * (2 / ra - 1 / at)) + dv1 = (vtp - v1) + dv2 = (v2 - vta) + totalTime = maneuverTime2 + maneuverTime1 + finalTime + + #navigation module - can be done via the satellite's own thing, but + #i thought this would be good to use for future noise-addition purposes, + #which the satellite doesn't support (i don't think) + nav = simpleNav.SimpleNav() + nav.ModelTag = "navigation" + satSim.AddModelToTask(simTaskName, nav) + nav.scStateInMsg.subscribeTo(satellite.scStateOutMsg) + + "Thruster Setup" + #state effector + thrusterSet = thrusterDynamicEffector.ThrusterDynamicEffector() + satSim.AddModelToTask(simTaskName, thrusterSet) + + #creating the actual thrusters + thFactory = simIncludeThruster.thrusterFactory() + location = [[0, 0, 1], [0, 1, 0], [1, 0, 0]] + direction = [[0, 0, 1], [0, 1, 0], [1, 0, 0]] + maxThrust = 50000. + for (pos, direct) in zip(location, direction): + thFactory.create('Blank_Thruster', pos, direct, MaxThrust=maxThrust) + thrTimeData = messaging.THRArrayOnTimeCmdMsgPayload() + thrTimeData.OnTimeRequest = np.zeros(len(location)) + thrTimeMsg = messaging.THRArrayOnTimeCmdMsg().write(thrTimeData) + thrusterSet.cmdsInMsg.subscribeTo(thrTimeMsg) + + + #adding thrusters to satellite + thrModelTag = "Thrusters" + thFactory.addToSpacecraft(thrModelTag, thrusterSet, satellite) + + #calculate burn times + #note that because this uses thrusters capable of finite thrust, this isn't actually an ideal hohmann transfer. + t1 = dv1 / (maxThrust / satellite.hub.mHub) + t2 = dv2 / (maxThrust / satellite.hub.mHub) + totalTime = totalTime + macros.sec2nano(t1) + macros.sec2nano(t2) + + fault = ThrusterFault(satSim, timestep, thFactory, thrusterSet, thrTimeMsg, thrTimeData, location, direction) + satSim.AddModelToTask(simTaskName, fault) + + # setup module that makes satellite point towards its velocity vector + attGuidance = velocityPoint.velocityPoint() + attGuidance.ModelTag = "velocityPoint" + attGuidance.transNavInMsg.subscribeTo(nav.transOutMsg) + attGuidance.mu = earth.mu + satSim.AddModelToTask(simTaskName, attGuidance) + + #attitude error from reference + attError = attTrackingError.attTrackingError() + attError.ModelTag = "attErrorInertial3D" + attError.attNavInMsg.subscribeTo(nav.attOutMsg) + attError.attRefInMsg.subscribeTo(attGuidance.attRefOutMsg) + satSim.AddModelToTask(simTaskName, attError) + + control = mrpFeedback.mrpFeedback() + + control.ModelTag = "mrpFeedback" + satSim.AddModelToTask(simTaskName, control) + #parameters taken from scenarioAttitudeFeedbackRW + control.K = 3.5 + control.Ki = -1 #negative turns integral control off + control.P = 30.0 + control.integralLimit = 2. / control.Ki * 0.1 + + #external torque + ext = extForceTorque.ExtForceTorque() + satellite.addDynamicEffector(ext) + satSim.AddModelToTask(simTaskName, ext) + ext.cmdTorqueInMsg.subscribeTo(control.cmdTorqueOutMsg) + + #some final module subscriptions + + #apparently mrpFeedback needs config info for the satellite + configData = messaging.VehicleConfigMsgPayload() + configData.ISCPntB_B = inertia + configDataMsg = messaging.VehicleConfigMsg() + configDataMsg.write(configData) + control.guidInMsg.subscribeTo(attError.attGuidOutMsg) + control.vehConfigInMsg.subscribeTo(configDataMsg) + + + """data collection""" + + #how often each logger samples + samplingTime = unitTestSupport.samplingTime(totalTime , simulationTimeStep,\ + totalTime / simulationTimeStep) + + #true satellite states (translational and rotational position/velocity) + satLog = satellite.scStateOutMsg.recorder(samplingTime) + satSim.AddModelToTask(simTaskName, satLog) + + + + #technically a module for adding noise to sensors, but eh i use it for sun pointing + navLog = nav.attOutMsg.recorder(samplingTime) + satSim.AddModelToTask(simTaskName, navLog) + + veloLog = nav.transOutMsg.recorder(samplingTime) + satSim.AddModelToTask(simTaskName, veloLog) + + #thruster recorders + thrLog = [] + for i in range(thFactory.getNumOfDevices()): + thrLog.append(thrusterSet.thrusterOutMsgs[i].recorder(samplingTime)) + satSim.AddModelToTask(simTaskName, thrLog[i]) + + """simulation start""" + satSim.SetProgressBar(True) + satSim.InitializeSimulation() + satSim.ConfigureStopTime(maneuverTime1) + satSim.ExecuteSimulation() + + + thrTimeData.OnTimeRequest = [0, t1, 0] + thrTimeMsg.write(thrTimeData, time=maneuverTime1) + + satSim.ConfigureStopTime(maneuverTime1 + macros.sec2nano(t1)) + satSim.ExecuteSimulation() + + thrTimeData.OnTimeRequest = [0, 0, 0] + thrTimeMsg.write(thrTimeData, time=maneuverTime1 + macros.sec2nano(t1)) + + satSim.ConfigureStopTime(maneuverTime1 + maneuverTime2) + satSim.ExecuteSimulation() + + + thrTimeData.OnTimeRequest = [0, t2, 0] + thrTimeMsg.write(thrTimeData, time=maneuverTime1 + maneuverTime2) + + satSim.ConfigureStopTime(maneuverTime1 + maneuverTime2 + macros.sec2nano(t2)) + satSim.ExecuteSimulation() + + thrTimeData.OnTimeRequest = [0, 0, 0] + thrTimeMsg.write(thrTimeData, time=maneuverTime1 + maneuverTime2 + macros.sec2nano(t2)) + + satSim.ConfigureStopTime(maneuverTime1 + maneuverTime2 + finalTime) + satSim.ExecuteSimulation() + #collecting some of the data for plotting and returning + + sat_pos = satLog.r_BN_N + + sat_velo = satLog.v_BN_N + + + pos = np.array(sat_pos[:]) + velo = np.array(sat_velo[:]) + #^^ cause i decided to add the sun, which centers the inertial frame of these loggers at the sun + + navVelo = veloLog.v_BN_N + + sigma = np.array(satLog.sigma_BN) + omega = np.array(satLog.omega_BN_B) + + thrustLog = [thruster.thrustForce for thruster in thrLog] + oeEnd = orbitalMotion.rv2elem(earth.mu, pos[-1], velo[-1]) + if plot: + plt.figure(1) + for i in range(3): + plt.plot(satLog.times() * macros.NANO2SEC / period1, pos[:, i] / 1000, + color=unitTestSupport.getLineColor(i, 3)) + plt.title("Planet-relative Cartesian Position") + plt.legend(["x", "y", "z", 'vx', 'vy', 'vz']) + plt.xlabel("Time [orbits]") + plt.ylabel("Position [km]") + + plt.figure(2) + for i in range(thFactory.getNumOfDevices()): + plt.plot(satLog.times() * macros.NANO2SEC / period1, thrustLog[i], label=f'Thruster {i+1}') + plt.legend() + plt.title("Thruster Outputs") + plt.xlabel("Time [orbits]") + plt.ylabel("Thrust [N]") + + plt.figure(3) + for i in range(3): + plt.plot(satLog.times() * macros.NANO2SEC / period1, velo[:, i] / 1000, + color=unitTestSupport.getLineColor(i, 3)) + plt.title("Planet-relative Cartesian Velocity") + plt.legend(['vx', 'vy', 'vz']) + plt.xlabel("Time [orbits]") + plt.ylabel("Velocity [km/s]") + + plt.show() + return + +if __name__ == "__main__": + simulate(True) \ No newline at end of file diff --git a/simulation-container/simulations/tests/RW_Friction_Magnitude_Analysis_Derived_Quality_vs_Magnitude_1.png b/simulation-container/simulations/tests/RW_Friction_Magnitude_Analysis_Derived_Quality_vs_Magnitude_1.png new file mode 100644 index 0000000..2416b8c Binary files /dev/null and b/simulation-container/simulations/tests/RW_Friction_Magnitude_Analysis_Derived_Quality_vs_Magnitude_1.png differ diff --git a/simulation-container/simulations/tests/RW_Friction_Magnitude_Analysis_Derived_Quality_vs_Magnitude_1000_1.png b/simulation-container/simulations/tests/RW_Friction_Magnitude_Analysis_Derived_Quality_vs_Magnitude_1000_1.png new file mode 100644 index 0000000..2fd0d8a Binary files /dev/null and b/simulation-container/simulations/tests/RW_Friction_Magnitude_Analysis_Derived_Quality_vs_Magnitude_1000_1.png differ diff --git a/simulation-container/simulations/tests/RW_Friction_Magnitude_Analysis_Derived_Quality_vs_Magnitude_2.png b/simulation-container/simulations/tests/RW_Friction_Magnitude_Analysis_Derived_Quality_vs_Magnitude_2.png new file mode 100644 index 0000000..fb3166a Binary files /dev/null and b/simulation-container/simulations/tests/RW_Friction_Magnitude_Analysis_Derived_Quality_vs_Magnitude_2.png differ diff --git a/simulation-container/simulations/tests/RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_3.png b/simulation-container/simulations/tests/RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_3.png new file mode 100644 index 0000000..19c3517 Binary files /dev/null and b/simulation-container/simulations/tests/RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_3.png differ diff --git a/simulation-container/simulations/tests/RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_4.png b/simulation-container/simulations/tests/RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_4.png new file mode 100644 index 0000000..7378b69 Binary files /dev/null and b/simulation-container/simulations/tests/RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_4.png differ diff --git a/simulation-container/simulations/tests/RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_5.png b/simulation-container/simulations/tests/RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_5.png new file mode 100644 index 0000000..de6aacd Binary files /dev/null and b/simulation-container/simulations/tests/RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_5.png differ diff --git a/simulation-container/simulations/tests/RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_1.png b/simulation-container/simulations/tests/RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_1.png new file mode 100644 index 0000000..95647a7 Binary files /dev/null and b/simulation-container/simulations/tests/RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_1.png differ diff --git a/simulation-container/simulations/tests/RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_2.png b/simulation-container/simulations/tests/RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_2.png new file mode 100644 index 0000000..f06b1c7 Binary files /dev/null and b/simulation-container/simulations/tests/RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_2.png differ diff --git a/simulation-container/simulations/tests/RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_3.png b/simulation-container/simulations/tests/RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_3.png new file mode 100644 index 0000000..9a0100d Binary files /dev/null and b/simulation-container/simulations/tests/RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_3.png differ diff --git a/simulation-container/simulations/tests/testLIME.py b/simulation-container/simulations/tests/testLIME.py index eb858a0..06e9ded 100644 --- a/simulation-container/simulations/tests/testLIME.py +++ b/simulation-container/simulations/tests/testLIME.py @@ -112,9 +112,3 @@ plt.show() - - - - - - diff --git a/simulation-container/simulations/tests/testLIME_CSS.py b/simulation-container/simulations/tests/testLIME_CSS.py new file mode 100644 index 0000000..0c2f9aa --- /dev/null +++ b/simulation-container/simulations/tests/testLIME_CSS.py @@ -0,0 +1,167 @@ +import lime +import lime.lime_tabular +import sklearn +from sklearn import ensemble +import numpy as np +import matplotlib.pyplot as plt +import sys +import os +import shap +import seaborn as sns +#weirdness occurred when i didn't have this +parent_dir = os.path.abspath(os.path.join(os.getcwd(), "..")) +sys.path.append(parent_dir) +#import twoModeScenario as sc #this test is currently setup to run twoModeScenario, which only has pseudofaults +import simpleCSSFaultScenario as sc +np.set_printoptions(threshold=np.inf) + +#run the simulation, combine the data per timestep +times, sigma, sensedSun, CSSdata, faults = sc.simulate(False, False) +features = [] +times /= 1e9 +for i in range(len(times)): + #note that the time is included here but not passed into the classifier + #it's only included here to make it easier to find the specific timestep being explained + #since train_test_split changes the order + + #on a LIME-interested note, these values are absolute-valued because LIME's output was nonsensical otherwise. + #and that makes sense, because what's really the difference to it between a negative and positive torque? + features.append([times[i]]) + #features[i].extend(abs(sigma[i])) + #features[i].extend(abs(sensedSun[i])) + features[i].extend(CSSdata[:, i]) +#features = [a + b + c for (a, b, c) in (sigma, torque_desired, torque_actual)] +features = np.array(features) +class_names = np.array(["OFF", "STUCK_CURRENT", "STUCK_MAX", "STUCK_RAND", "RAND", "NOMINAL"]) +labels = np.array([1 if np.isin(i, class_names[:-1]).any() else 0 for i in faults]) + +#train and test sets, naming features and labels +train, test, labels_train, labels_test = sklearn.model_selection.train_test_split(features, labels, train_size=0.80) +train = np.array(train) +test = np.array(test) +labels_train = np.array(labels_train) +labels_test = np.array(labels_test) +feature_names = []#["sigma_x", "sigma_y", "sigma_z"] +CSS_names = [f"CSS_{i+1}" for i in range(len(CSSdata[:, 0]))] +feature_names.extend(CSS_names) +feature_names = np.array(feature_names) +class_names = np.array(["No Fault", "Fault"]) + + + +#training the classifier and getting results +#note from here on out that the datasets passed in are indexed [1:] to exclude time +rf = ensemble.RandomForestClassifier(n_estimators=500) +rf.fit(train[:, 1:], labels_train) +labels_pred = np.array(rf.predict(test[:, 1:])) +print(f"Random Forest Prediction Accuracy: \n{sklearn.metrics.classification_report(labels_test, labels_pred)}") +print(f"Accuracy = {rf.score(test[:, 1:], labels_test)}") + + +#LIME!!!! passing all instances +explainer = lime.lime_tabular.LimeTabularExplainer(train[:, 1:], + feature_names=feature_names, + class_names=class_names, + discretize_continuous=False) + + +#generating a random timestep for LIME to explain +trues = np.array([i for i in range(len(labels_pred)) if labels_pred[i] == 1])#labels_pred[i] != ['NOMINAL'] * len(CSSdata[:, 0])] +j = np.random.choice(trues, 1)[0] +count = 0 +while np.argmax(rf.predict_proba(test[j-1:j+1, 1:])[1]) != labels_test[j]: + j = np.random.choice(trues, 1)[0] + + if count > len(labels_test): + print("NO CORRECT CLASSIFICATIONS") + + sys.exit() + count += 1 +#CLASSIFIER MUST HAVE A PROBABILITY IN ITS PREDICTION +#top_labels tells it how many labels to explain, in order from most to least likely +exp = explainer.explain_instance(np.array(test[j, 1:]), rf.predict_proba, num_features=len(feature_names), top_labels=len(class_names)) + +#outputs +print(f"Observation Explained:") +print(f"time: {test[j][0]}") +for i in range(len(feature_names)): + print(f"{feature_names[i]}: {test[j][1:][i]}") +print(f"Actual Observation Label: {labels_test[j]}") +print(f"Predicted Observation Label: {np.argmax(rf.predict_proba(test[j-1:j+1, 1:])[1])}") +print(f"LIME model bias for predicted class: {exp.intercept[0]}") +print(f"LIME model bias for other class: {exp.intercept[1]}") +print(f"R^2 of LIME's local linear model: {exp.score}") + + +#this is just to make it easier to see which instance is being explained. it will only work for twoModeScenario, +#and will have to be customized for all others, obviously +plt.figure(1, figsize=(20,len(CSSdata[:, 0]) / 2)) +colors = plt.cm.tab20.colors[:len(CSSdata[:, 0])] +for i in range(len(CSSdata[:, 0])): + plt.plot(times, faults[:, i], label=rf"$CSS_{{{i+1}}}$", color=colors[i]) +plt.title("CSS Sensors' Fault State") +plt.xlabel("Time [orbits]") +plt.ylabel("Fault State") +preds = np.array(test[trues, 0]) +actual = np.array([test[i][0] for i in range(len(labels_test)) if labels_test[i] == 1]) +#for i in actual: +# plt.axvline(x=i, color='g') +#for i in preds: +# plt.axvline(x=i, linestyle='--', color='r') +plt.axvline(x=test[j, 0], color='b') +#plt.xlim(test[j][0] - 0.05, test[j][0] + 0.05) +plt.legend() + +#mrpFeedback Desired Torque Outputs +""" +torque_desired = np.array(torque_desired) +for i in range(len(torque_desired[0])): + plt.plot(times, torque_desired[:, i], label=f'RW {i+1}') +true_preds = np.array(test[trues]) +actual_preds = np.array([test[i][0] for i in range(len(labels_test)) if labels_test[i] == 1]) +for i in actual_preds: + plt.axvline(x=i, color='g') +for i in true_preds[:, 0]: + plt.axvline(x=i, linestyle='--', color='r') +plt.xlim(test[j][0] - 0.1, test[j][0] + 0.1) +plt.title("mrpFeedback Desired Torques") +plt.legend() +plt.xlabel("Time [orbits]") +plt.ylabel("Torque [N-m]") + +plt.figure(2) +#attitude - body frame, MRPs +for i in range(3): + plt.plot(times, sigma[:, i], label=rf"$\sigma_{i+1}$") + plt.title("Inertial Orientation") + plt.xlabel("Time [orbits]") + plt.ylabel("Orientation (MRP)") + plt.legend() +for i in actual_preds: + plt.axvline(x=i, color='g') +for i in true_preds[:, 0]: + plt.axvline(x=i, linestyle='--', color='r') +plt.xlim(test[j][0] - 0.1, test[j][0] + 0.1) +""" +fig = exp.as_pyplot_figure(exp.top_labels[0]) +#fig1 = exp.as_pyplot_figure(exp.top_labels[1]) + +#SHAP STUFF +plt.figure(3) +explainer = shap.TreeExplainer(rf) +shap_values = explainer.shap_values(test[:, 1:], labels_test) +shap.waterfall_plot(shap.Explanation(values=shap_values[j, :, 1], base_values=explainer.expected_value[1], data=test[j, 1:], feature_names=feature_names)) + +plt.figure(4) +interaction = explainer.shap_interaction_values(test[j, 1:], labels_test[j])[:, :, 1] +sns.heatmap(interaction, xticklabels=feature_names, yticklabels=feature_names, cmap='coolwarm', center=0) +plt.title("SHAP Interaction Values for Explained Instance") + +plt.show() + + + + + + + diff --git a/simulation-container/simulations/tests/testLIME_RW.py b/simulation-container/simulations/tests/testLIME_RW.py new file mode 100644 index 0000000..72e0997 --- /dev/null +++ b/simulation-container/simulations/tests/testLIME_RW.py @@ -0,0 +1,184 @@ +import lime +import lime.lime_tabular +import sklearn +from sklearn import ensemble +import numpy as np +import matplotlib.pyplot as plt +import sys +import os +import seaborn as sns +import shap +#weirdness occurred when i didn't have this +parent_dir = os.path.abspath(os.path.join(os.getcwd(), "..")) +sys.path.append(parent_dir) +#import twoModeScenario as sc #this test is currently setup to run twoModeScenario, which only has pseudofaults +import simpleRWFaultScenario as sc +np.set_printoptions(threshold=np.inf) + +#run the simulation, combine the data per timestep + +iterate = True +SHAP = True + +def runLIME(): + + times, sigma, td, ta, faults, fric = sc.simulate(False) + features = [] + times /= 1e9 + fric = np.array([(np.argmax(i) + 1, np.max(i)) for i in fric]) + for i in range(len(times)): + #note that the time is included here but not passed into the classifier + #it's only included here to make it easier to find the specific timestep being explained + #since train_test_split changes the order + + #on a LIME-interested note, these values are absolute-valued because LIME's output was nonsensical otherwise. + #and that makes sense, because what's really the difference to it between a negative and positive torque? + features.append([times[i]]) + features[i].extend(abs(td[i])) + features[i].extend(abs(ta[:, i])) + features[i].extend(fric[i]) + #features = [a + b + c for (a, b, c) in (sigma, torque_desired, torque_actual)] + features = np.array(features) + labels = [1 if i.any() else 0 for i in faults] + #train and test sets, naming features and labels + train, test, labels_train, labels_test = sklearn.model_selection.train_test_split(features, labels, train_size=0.80) + train = np.array(train) + test = np.array(test) + labels_train = np.array(labels_train) + labels_test = np.array(labels_test) + feature_names = ["td_1", "td_2", "td_3", "ta_1", "ta_2", "ta_3"] + feature_names = np.array(feature_names) + class_names = np.array(["No Fault", "Fault"]) + + + + #training the classifier and getting results + #note from here on out that the datasets passed in are indexed [1:] to exclude time + rf = ensemble.RandomForestClassifier(n_estimators=500) + rf.fit(train[:, 1:-2], labels_train) + labels_pred = np.array(rf.predict(test[:, 1:-2])) + #print(f"Random Forest Prediction Accuracy: \n{sklearn.metrics.classification_report(labels_test, labels_pred)}") + #print(f"Accuracy = {rf.score(test[:, 1:-2], labels_test)}") + + + #LIME!!!! passing all instances + explainer = lime.lime_tabular.LimeTabularExplainer(train[:, 1:-2], + feature_names=feature_names, + class_names=class_names, + discretize_continuous=False) + + + #generating a random timestep for LIME to explain + trues = np.array([i for i in range(len(labels_pred)) if labels_pred[i] == 1])#labels_pred[i] != ['NOMINAL'] * len(CSSdata[:, 0])] + j = np.random.choice(trues, 1)[0] + count = 0 + while np.argmax(rf.predict_proba(test[j-1:j+1, 1:-2])[1]) != labels_test[j]: + j = np.random.choice(trues, 1)[0] + if count > len(labels_test): + print("NO CORRECT CLASSIFICATIONS") + sys.exit() + #CLASSIFIER MUST HAVE A PROBABILITY IN ITS PREDICTION + #top_labels tells it how many labels to explain, in order from most to least likely + fric_instance = test[j, -2:] + exp = explainer.explain_instance(np.array(test[j, 1:-2]), rf.predict_proba, num_features=len(feature_names), top_labels=len(class_names)) + weights = sorted(exp.as_list(), key=lambda x: abs(x[1]), reverse=True) + weights = np.array([int(i[0][-1]) for i in weights]) + + indices = np.where(weights == int(fric_instance[0]))[0] + quality.append((fric_instance[1], (indices[0], indices[1]))) + if SHAP: + shap_explainer = shap.TreeExplainer(rf) + shap_values = np.argsort(np.abs(shap_explainer.shap_values(test[j, 1:-2], labels_test[j])[:, 1]))[::-1] + shap_values = np.array([i + 1 for i in shap_values]) + shap_indices = np.where(shap_values == int(fric_instance[0]))[0] + shap_indices = np.concatenate((shap_indices, np.where(shap_values == int(fric_instance[0] + 3))[0])) + shap_quality.append((fric_instance[1], (shap_indices[0], shap_indices[1]))) + else: + shap_explainer = None + + + + return test, j, feature_names, labels_test, exp, trues, td, times, shap_explainer + + + +if iterate: + quality = [] + shap_quality = [] + num_error = 0 + count = 250 + for i in range(count): + try: + runLIME() + except: + num_error += 1 + continue + + mag = [i[0] for i in quality] + weight = [1. / (i[1][0] + i[1][1]) for i in quality] + sns.regplot(x=mag, y=weight, scatter=True, line_kws={'color':'red'}) + plt.title("Quality of LIME Explanation vs Friction Fault Magnitude") + plt.ylabel("Derived Quality of Explanation") + plt.xlabel("Maximum Magnitude of Viscous Friction Coefficient") + plt.savefig("RW_Friction_Magnitude_LIME_Analysis_Derived_Quality_vs_Magnitude_5.png") + + print(f"ERROR PERCENTAGE: {num_error * 1. / count}") + plt.figure() + if len(shap_quality) > 1: + mag = [i[0] for i in shap_quality] + weight = [1. / (i[1][0] + i[1][1]) for i in shap_quality] + sns.regplot(x=mag, y=weight, scatter=True, line_kws={'color':'red'}) + plt.title("Quality of SHAP Explanation vs Friction Fault Magnitude") + plt.ylabel("Derived Quality of Explanation") + plt.xlabel("Maximum Magnitude of Viscous Friction Coefficient") + plt.savefig("RW_Friction_Magnitude_SHAP_Analysis_Derived_Quality_vs_Magnitude_3.png") + plt.show() +else: + test, j, feature_names, labels_test, rf, exp, trues, td, times, explainer = runLIME() + #outputs + print(f"Observation Explained:") + print(f"time: {test[j][0]}") + for i in range(len(feature_names)): + print(f"{feature_names[i]}: {test[j, 1:-2][i]}") + print(f"Actual Observation Label: {labels_test[j]}") + print(f"Predicted Observation Label: {np.argmax(rf.predict_proba(test[j-1:j+1, 1:-2])[1])}") + print(f"LIME model bias for predicted class: {exp.intercept[0]}") + print(f"LIME model bias for other class: {exp.intercept[1]}") + print(f"R^2 of LIME's local linear model: {exp.score}") + + plt.figure(1) + #mrpFeedback Desired Torque Outputs + for i in range(len(td[0])): + plt.plot(times, td[:, i], label=f'RW {i+1}') + true_preds = np.array(test[trues]) + actual_preds = np.array([test[i][0] for i in range(len(labels_test)) if labels_test[i] == 1]) + for i in actual_preds: + plt.axvline(x=i, color='g') + for i in true_preds[:, 0]: + plt.axvline(x=i, linestyle='--', color='r') + plt.title("mrpFeedback Desired Torques") + plt.legend() + plt.xlabel("Time [orbits]") + plt.ylabel("Torque [N-m]") + plt.xlim(test[j][0] - 0.1, test[j][0] + 0.1) + + fig = exp.as_pyplot_figure(exp.top_labels[0]) + + if SHAP: + #SHAP STUFF + plt.figure(3) + + shap_values = explainer.shap_values(test[:, 1:-2], labels_test) + shap.waterfall_plot(shap.Explanation(values=shap_values[j, :, 1], base_values=explainer.expected_value[1], data=test[j, 1:-2], feature_names=feature_names)) + + plt.figure(4) + interaction = explainer.shap_interaction_values(test[j, 1:-2], labels_test[j])[:, :, 1] + sns.heatmap(interaction, xticklabels=feature_names, yticklabels=feature_names, cmap='coolwarm', center=0) + plt.title("SHAP Interaction Values for Explained Instance") + + plt.show() + + + + + diff --git a/simulation-container/simulations/tests/testSHAP.py b/simulation-container/simulations/tests/testSHAP.py new file mode 100644 index 0000000..7210c66 --- /dev/null +++ b/simulation-container/simulations/tests/testSHAP.py @@ -0,0 +1,73 @@ +import shap +import sklearn +from sklearn import ensemble +import numpy as np +import matplotlib.pyplot as plt +import sys +import os +#weirdness occurred when i didn't have this +parent_dir = os.path.abspath(os.path.join(os.getcwd(), "..")) +sys.path.append(parent_dir) +#import twoModeScenario as sc #this test is currently setup to run twoModeScenario, which only has pseudofaults +import simpleCSSFaultScenario as sc +np.set_printoptions(threshold=np.inf) + +#run the simulation, combine the data per timestep +times, sigma, sensedSun, CSSdata, faults = sc.simulate(False, False) +features = [] +times /= 1e9 +for i in range(len(times)): + #note that the time is included here but not passed into the classifier + #it's only included here to make it easier to find the specific timestep being explained + #since train_test_split changes the order + + #on a LIME-interested note, these values are absolute-valued because LIME's output was nonsensical otherwise. + #and that makes sense, because what's really the difference to it between a negative and positive torque? + features.append([times[i]]) + #features[i].extend(abs(sigma[i])) + #features[i].extend(abs(sensedSun[i])) + features[i].extend(CSSdata[:, i]) +#features = [a + b + c for (a, b, c) in (sigma, torque_desired, torque_actual)] +features = np.array(features) +class_names = np.array(["OFF", "STUCK_CURRENT", "STUCK_MAX", "STUCK_RAND", "RAND", "NOMINAL"]) +labels = np.array([1 if np.isin(i, class_names[:-1]).any() else 0 for i in faults]) + +#train and test sets, naming features and labels +train, test, labels_train, labels_test = sklearn.model_selection.train_test_split(features, labels, train_size=0.80) +train = np.array(train) +test = np.array(test) +labels_train = np.array(labels_train) +labels_test = np.array(labels_test) +feature_names = []#["sigma_x", "sigma_y", "sigma_z"] +CSS_names = [f"CSS_{i+1}" for i in range(len(CSSdata[:, 0]))] +feature_names.extend(CSS_names) +feature_names = np.array(feature_names) +class_names = np.array(["No Fault", "Fault"]) + + + +#training the classifier and getting results +#note from here on out that the datasets passed in are indexed [1:] to exclude time +rf = ensemble.RandomForestClassifier(n_estimators=500) +rf.fit(train[:, 1:], labels_train) +labels_pred = np.array(rf.predict(test[:, 1:])) +print(f"Random Forest Prediction Accuracy: \n{sklearn.metrics.classification_report(labels_test, labels_pred)}") +print(f"Accuracy = {rf.score(test[:, 1:], labels_test)}") + + +plt.figure(1, figsize=(20,len(CSSdata[:, 0]) / 2)) +colors = plt.cm.tab20.colors[:len(CSSdata[:, 0])] +for i in range(len(CSSdata[:, 0])): + plt.plot(times, faults[:, i], label=rf"$CSS_{{{i+1}}}$", color=colors[i]) +plt.title("CSS Sensors' Fault State") +plt.xlabel("Time [orbits]") +plt.ylabel("Fault State") +plt.legend() + +plt.figure() + +explainer = shap.TreeExplainer(rf) +shap_values = explainer.shap_values(test[:, 1:], labels_test) +shap.summary_plot(shap_values[:, :, 1], test[:, 1:], feature_names=feature_names) + +plt.show() \ No newline at end of file diff --git a/simulation-container/simulations/utilities/CSSfault.py b/simulation-container/simulations/utilities/CSSfault.py new file mode 100644 index 0000000..a7ff7d2 --- /dev/null +++ b/simulation-container/simulations/utilities/CSSfault.py @@ -0,0 +1,136 @@ +from collections.abc import Collection +from collections import OrderedDict +from typing import Any +import random +from Basilisk.architecture import sysModel, messaging +from Basilisk.simulation import coarseSunSensor +import numpy as np +""" +This is a module for introducing faults into Coarse Sun Sensors. It is a Basilisk Module, so it will +run at every time step provided it is added to a simulation task. + +The module is essentially a way to randomize the built-in faulted states of the coarseSunSensor module. +It randomly assigns random faults to each CSS. If the randomly chosen fault is anything except NOMINAL +and RAND, that sensor is faulted until it gets a different randomly chosen fault. If the randomly chosen fault is NOMINAL, +that sensor is not faulted! If the randomly chosen fault is RAND, that sensor returns to NOMINAL the next turn, +and does not have an entry in the fault sweepstakes that turn. +""" +class CSSfault(sysModel.SysModel): + """ + Requires: + - A list of the sensors + Optional: + - A list of default fault states and other things for the sensors + - a chance for the random fault injections + - a seed for the random fault injections + """ + def __init__(self, components : Collection[Any], **kwargs): + defaults = kwargs.get("defaults") + self.components = components + self.defaults = defaults + self.builtin = ["OFF", "STUCK_CURRENT", "STUCK_MAX", "STUCK_RAND", "RAND", "NOMINAL"] + self.types = kwargs.get("types") + if not self.types: + self.types = ["NOMINAL", "STUCK_CURRENT", "NOMINAL", "STUCK_CURRENT", "STUCK_CURRENT", "NOMINAL"] + if len(self.types) < 6: + self.types.extend(["NOMINAL"] * (6 - len(self.types))) + super().__init__() + #loggable attribute for fault states + self.faultState = ["NOMINAL"] * len(self.components) #all sensors are assumed to start off nominal + self.chance = kwargs.get("chance") + self.seed = kwargs.get("seed") + self.rand = [False] * len(self.components) + for i in self.components: + i.faultState = getattr(coarseSunSensor, "NOMINAL") #forcibly make all sensors nominal at first + """ + Update method. Runs at every timestep. + + Just calls the random fault injection and uses its results to update the loggable attribute. + """ + def UpdateState(self, CurrentSimNanos): + self.faultState = [i[1] for i in self.inject_random(self.chance)] + + """ + Injection if specific new settings are desired. + + Goes through the list of sensors and sets them to the desired fault state. + Used by inject_random once new settings have been randomized. Returns the + new settings. + """ + def inject(self, newSettings : list): + for i in range(len(self.components)): + try: + #for some reason, all the predefined fault states are CSSFAULT_something except NOMINAL + if newSettings[i].upper() == "NOMINAL": + self.components[i].faultState = getattr(coarseSunSensor, (newSettings[i]).upper()) + else: + self.components[i].faultState = getattr(coarseSunSensor, "CSSFAULT_" + (newSettings[i]).upper()) + except: + raise NameError(newSettings[i] + " is not yet a supported fault type.") + return newSettings + """ + Random injection of faults with a given chance. + + Whether each sensor is faulted is randomized, and the type of fault is also + randomized. As stated above, if the fault chosen is RAND, it will only last 1 turn, + and return to NOMINAL the next. Otherwise, it will remain at the chosen or prior state + until changed here. This returns a list of the following form: + (bool, fault state) where bool is whether a fault is injected into the given component + and fault state is the new (or old, if no new fault) setting. + """ + def inject_random(self, chance): + if chance is None: + chance = 0.0005 + toInject = self.randomize(chance, seed=self.seed) + newSettings = [] + for i in range(len(toInject)): + if toInject[i]: + #randomly choose the fault type + fault = np.random.choice(self.types, 1)[0] + newSettings.append(fault) + else: + #check if the last fault state was RAND, and change back to NOMINAL if so + past = self.components[i].faultState + if past == 4: + if self.rand[i]: + newSettings.append("NOMINAL") + self.rand[i] = False + else: + self.rand[i] = True + newSettings.append("RAND") + else: + newSettings.append(self.builtin[self.components[i].faultState]) + return list(zip(toInject, self.inject(newSettings))) + + """ + Resets all reaction wheels to the prescribed defaults, if prescribed. + + Will eventually need to be updated to reset to NOMINAL if not prescribed, + but doesn't really matter because this is not used in normal operation. + """ + def reset(self): + if not self.defaults: + raise ValueError("Defaults have not been set.") + for i in range(len(self.components)): + try: + self.components[i].faultState = getattr(coarseSunSensor, "CSSFAULT_" + (self.defaults[i]).upper()) + except: + raise TypeError("Defaults and/or Components do not contain necessary information.") + """ + Choose which components to randomly have faults injected. + + Parameter: + chance (float): the chance that each component will fault. + + Returns: + toInject (list[bool]): Indicates whether a fault is to be injected to the component at this index + """ + def randomize(self, chance : float, **kwargs): + if not 0. <= chance <= 1.: + raise ValueError("Chance must be between 0 and 1.") + seed = kwargs.get("seed") + if seed is not None: + random.seed(seed) + toInject = [random.random() < chance for _ in self.components] + return toInject + \ No newline at end of file diff --git a/simulation-container/simulations/utilities/fault.py b/simulation-container/simulations/utilities/fault.py deleted file mode 100644 index e3cd30c..0000000 --- a/simulation-container/simulations/utilities/fault.py +++ /dev/null @@ -1,76 +0,0 @@ -import random -from abc import ABC, abstractmethod -from collections.abc import Collection -from typing import Any - - -class Fault(ABC): - """ - Abstract base class for componentwise fault injection - """ - def __init__(self, components: Collection[Any], **kwargs): - """ - Initialize the abstract fault. - - Parameters: - components (Collection): the components in which to (potentially) inject the fault - defaults: a dictionary of the default settings for each component - """ - self.components = components - self.defaults = kwargs.get("defaults") if kwargs.get("defaults") else None - - @abstractmethod - def inject(self, newSettings) -> Collection[Any]: - """ - Inject a predefined fault. - - Returns: - Collection: Details of fault injected into each component. - """ - pass - - @abstractmethod - def inject_random(self, chance : float) -> Collection[tuple[bool, Any]]: - """ - Inject random faults into random components. - - Parameter: - chance (float): the chance that each component will fault. - - Returns: - Collection: - bool: Indicates whether a fault is to be injected to the component at this index - Any: Details of fault injected, None if no fault injected. - """ - pass - - @abstractmethod - def reset(self) -> bool: - """ - Reset all components to settings pre-fault injections, if defaults are given. - - Returns: - bool: True if successful - """ - pass - - def randomize(self, chance : float, **kwargs): - """ - Choose which components to randomly have faults injected. - - Parameter: - chance (float): the chance that each component will fault. - - Returns: - toInject (list[bool]): Indicates whether a fault is to be injected to the component at this index - """ - if not 0. <= chance <= 1.: - raise ValueError("Chance must be between 0 and 1.") - seed = kwargs.get("seed") - if seed is not None: - random.seed(seed) - toInject = [random.random() < chance for _ in self.components] - return toInject - - - diff --git a/simulation-container/simulations/utilities/rwfault.py b/simulation-container/simulations/utilities/rwfault.py index c1c1e58..7f7a34d 100644 --- a/simulation-container/simulations/utilities/rwfault.py +++ b/simulation-container/simulations/utilities/rwfault.py @@ -1,27 +1,96 @@ -from faults.fault import Fault + from collections.abc import Collection from collections import OrderedDict from typing import Any import random -from Basilisk import messaging +from Basilisk.architecture import messaging, sysModel import numpy as np +""" +This is a module for introducing faults into Reaction Wheels. It is a Basilisk module, +so it runs at every timestep provided it is added to a simulation task. -class RWFault(Fault): +The only implemented methodology for faults thus far is a reduction in max torque output +of the reaction wheels. The introduction of friction will eventually be added. +""" +class RWFault(sysModel.SysModel): + """ + Requires, in order: + - A list of the form (type, RW), where type is the model of the reaction wheel and RW is the reaction wheel itself + - the rwFactory (simIncluderwFactory()) used to generate those wheels + - the rwEffector (reactionWheelStateEffector) used to make the wheels actually do something + + Optional: + - the model used for the RWs (jitter, balanced, etc). not not implemented yet + - a seed for the randomized fault injection + - a fault injection chance per RW for the randomized fault injection + - defaults - the original settings of the RWs + """ def __init__(self, components: Collection[Any], rwFactory, rwEffector, **kwargs): - super().__init__(components, **kwargs) + super().__init__() + self.components = components self.rwFactory = rwFactory self.rwEffector = rwEffector self.rwModel = kwargs.get("model") self.seed = kwargs.get("seed") + self.chance = kwargs.get("chance") + self.defaults = kwargs.get("defaults") + self.friction = kwargs.get("friction") + self.types = kwargs.get("types") + if not self.types: + self.types = ["torque"] + #if the defaults are not passed in, the defaults are pulled from the starting states + if not self.defaults: + self.defaults = [] + for i in range(len(components)): + spin = [sublist[0] for sublist in self.components[i][1].gsHat_B] + rwType = self.components[i][0] + u_max = self.components[i][1].u_max + cvis = self.components[i][1].cViscous + self.defaults.append({"axis":spin, "u_max":u_max, "rwType":rwType, "cViscous":cvis}) + #loggable attribute for the actual traits of the RWs + self.state = [(i["rwType"], i["axis"], i["u_max"], i["cViscous"]) for i in self.defaults] + #loggable attribute for the timesteps of fault injections + self.fault = [False for _ in self.components] + self.count = [0] * len(self.components) + + """ + Update method. Runs every timestep. + + Just calls the inject_random method, and updates the loggable attributes with the results. + """ + def UpdateState(self, CurrentSimNanos): + state = self.inject_random(self.chance) + self.state = state[:, 1] + self.fault = state[:, 0] + self.count = [self.count[i] + 1 if self.fault[i] else self.count[i] for i in range(len(self.components))] + + """ + The method that actually changes out the RWs. + + Only called from inject() - takes in new RW settings, and uses them to replace + the batch of RWs that existed previously with new ones with those settings. + + This is the only way I could find to introduce faults into RWs on the Python side. + """ def replaceRWs(self, newRWs): + #empty existing RW carriers + self.rwFactory.rwList = OrderedDict([]) self.rwEffector.ReactionWheelData = self.rwEffector.ReactionWheelData[:0] - for (rwType, spin, torque) in newRWs: + newComps = [] + #create new RWs + for (rwType, spin, torque, cvis) in newRWs: rwModel = self.rwModel if self.rwModel else messaging.BalancedWheels - RW_new = self.rwFactory.create(rwType, spin, maxMomentum=100., RWmodel=rwModel, u_max=torque) + RW_new = self.rwFactory.create(rwType, spin, maxMomentum=100., RWmodel=rwModel, u_max=torque, + cViscous=cvis) self.rwEffector.addReactionWheel(RW_new) - + newComps.append((rwType, RW_new)) + self.components = newComps + + """ + Resets all RWs to default settings. + """ def reset(self): if not self.defaults: raise ValueError("Defaults have not been set.") @@ -31,32 +100,74 @@ def reset(self): spin = self.defaults[i]["axis"] u_max = self.defaults[i]["u_max"] rwType = self.defaults[i]["type"] - resetRWs.append((rwType, spin, u_max)) + cvis = self.defaults[i]["cViscous"] + resetRWs.append((rwType, spin, u_max, cvis)) except: raise TypeError("Defaults and/or Components do not contain necessary information.") self.replaceRWs(resetRWs) return len(self.rwFactory.rwList) > 0 and len(self.rwEffector.ReactionWheelData) > 0 + """ + Meant for non-random injections where the new desired settings are known. Not used in practice, + yet, except as a bridge between inject_random and replaceRWs. + """ def inject(self, newSettings) -> Collection[Any]: self.replaceRWs(newSettings) return newSettings + """ + Does the bulk of the injections in practice currently, randomized. + Each RW has the same chance of being faulted. If it isn't faulted, + the RW still will be replaced, but with the same settings. + If it is faulted, the RW's max torque output will be reduced by some + random factor. This may become multiplicative. + + This method returns a list of tuples of the form: + (bool, (rwType, spin axis, torque)) + where bool is whether a fault was injected, rwType is the rwModel, and torque is the max torque output. + """ def inject_random(self, chance : float) -> Collection[tuple[bool, Any]]: + #if chance isn't passed in + + if chance is None: + chance = 0.001 toInject = self.randomize(chance, seed=self.seed) if self.seed: random.seed(self.seed) newSettings = [] for i in range(len(toInject)): + #spin and rwType stay the same spin = [sublist[0] for sublist in self.components[i][1].gsHat_B] rwType = self.components[i][0] + #u_max and/or cvis is changed if there's a fault by a random factor u_max = self.components[i][1].u_max + cvis = self.components[i][1].cViscous if toInject[i]: rand = random.random() * 10 - u_max = u_max / rand if rand >= 1 else u_max * rand - newSettings.append((rwType, spin, u_max)) + if "torque" in self.types: + u_max = u_max / rand if rand >= 1 else u_max * rand + if "friction" in self.types: + cvis = cvis / rand if rand < 1 else cvis * rand + newSettings.append((rwType, spin, u_max, cvis)) details = list(zip(toInject, self.inject(newSettings))) - for i in range(len(details)): - if not details[i][0]: - details[i] = (details[i][0], None) return np.array(details, dtype=object) + + """ + Choose which components to randomly have faults injected. + + Parameter: + chance (float): the chance that each component will fault. + + Returns: + toInject (list[bool]): Indicates whether a fault is to be injected to the component at this index + """ + def randomize(self, chance : float, **kwargs): + + if not 0. <= chance <= 1.: + raise ValueError("Chance must be between 0 and 1.") + seed = kwargs.get("seed") + if seed is not None: + random.seed(seed) + toInject = [random.random() < chance for _ in self.components] + return toInject \ No newline at end of file diff --git a/simulation-container/simulations/utilities/thrusterfault.py b/simulation-container/simulations/utilities/thrusterfault.py new file mode 100644 index 0000000..b0802e0 --- /dev/null +++ b/simulation-container/simulations/utilities/thrusterfault.py @@ -0,0 +1,84 @@ +from collections.abc import Collection +from collections import OrderedDict +from typing import Any +import random +from Basilisk.architecture import messaging, sysModel +import numpy as np + +class ThrusterFault(sysModel.SysModel): + #simulation, simulation timestep, simIncludeThruster.thrusterFactory(), thrusterDynamicEffector.ThrusterDynamicEffector(), location relative to body origin, thrust pointing vector in body frame + def __init__(self, sim, timestep, thFactory, thSet, timeMsg, timeData, locations, directions, **kwargs): + super().__init__() + self.sim = sim + self.components = thFactory.thrusterList + self.timestep = timestep + self.thFactory = thFactory + self.thSet = thSet + self.seed = kwargs.get("seed") + self.chance = kwargs.get("chance") + self.type = kwargs.get("type") + if not self.type: + self.type = 'Blank_Thruster' + self.locations = locations + self.directions = directions + self.thrTimeMsg = timeMsg + self.thrTimeData = timeData + #self.thrTimeMsg.write(self.thrTimeData, time=0) + self.state = [(False, (l, d, 0, 0)) for (l, d) in zip(locations, directions)] + + def UpdateState(self, CurrentSimNanos): + self.state = self.inject_random(self.chance, CurrentSimNanos) + + + def inject(self, newSettings, currentTime): + times = [] + for (_, _, _, thrustTime) in newSettings: + times.append(thrustTime) + + self.thrTimeData.OnTimeRequest = times + self.thrTimeMsg.write(self.thrTimeData, time=currentTime+1) + return newSettings + + def inject_random(self, chance, currentTime): + if chance is None: + chance = 0.001 + toInject = self.randomize(chance, seed=self.seed) + forces = [] + thrustTime = [] + ontimes = self.thrTimeData.OnTimeRequest + if ontimes[0] == 0 and ontimes[1] == 0 and ontimes[2] == 0: + for i in range(len(self.components)): + randf = random.random() * 500. + randt = (random.random() * self.timestep + self.timestep) * 10 + #^^last between 1 and 2 timesteps, so it shows up on the plot + if toInject[i]: + forces.append(randf) + thrustTime.append(randt) + else: + forces.append(500.) + thrustTime.append(0) + newSettings = zip(self.locations, self.directions, forces, thrustTime) + self.inject(newSettings, currentTime) + + return np.array(zip(toInject, newSettings), dtype=object) + else: + return [] + + """ + Choose which components to randomly have faults injected. + + Parameter: + chance (float): the chance that each component will fault. + + Returns: + toInject (list[bool]): Indicates whether a fault is to be injected to the component at this index + """ + def randomize(self, chance : float, **kwargs): + + if not 0. <= chance <= 1.: + raise ValueError("Chance must be between 0 and 1.") + seed = kwargs.get("seed") + if seed is not None: + random.seed(seed) + toInject = [random.random() < chance for _ in self.components] + return toInject