From c9630679d9e3e581cb5e7d13ea60f2887e2484c8 Mon Sep 17 00:00:00 2001 From: Gerardo Marx Date: Fri, 17 Oct 2025 09:36:47 -0600 Subject: [PATCH] 2D example now working in macos and new version of matplotlib animation --- assignment1_answer.py | 22 ++ assignment1_initial_conditions.py | 103 +++++++++ assignment1_initial_conditions_answer.py | 105 +++++++++ assignment1_prediction.py | 62 ++++++ assignment1_prediction_answer.py | 64 ++++++ assignment1_update.py | 105 +++++++++ assignment1_update_answer.py | 97 ++++++++ kfsims/__init__.py | 0 kfsims/kfmodels.py | 28 +++ kfsims/kftracker2d.py | 77 +++++++ kfsims/tracker2d.py | 272 +++++++++++++++++++++++ kfsims/vehiclemodel2d.py | 37 +++ 12 files changed, 972 insertions(+) create mode 100644 assignment1_answer.py create mode 100644 assignment1_initial_conditions.py create mode 100644 assignment1_initial_conditions_answer.py create mode 100644 assignment1_prediction.py create mode 100644 assignment1_prediction_answer.py create mode 100644 assignment1_update.py create mode 100644 assignment1_update_answer.py create mode 100644 kfsims/__init__.py create mode 100644 kfsims/kfmodels.py create mode 100644 kfsims/kftracker2d.py create mode 100644 kfsims/tracker2d.py create mode 100644 kfsims/vehiclemodel2d.py diff --git a/assignment1_answer.py b/assignment1_answer.py new file mode 100644 index 0000000..b40f244 --- /dev/null +++ b/assignment1_answer.py @@ -0,0 +1,22 @@ +import numpy as np +from kfsims.tracker2d import run_sim +from kfsims.kftracker2d import KalmanFilterModel + +# Simulation Options +sim_options = {'time_step': 0.01, + 'end_time': 120, + 'measurement_rate': 1, + 'measurement_noise_std': 10, + 'motion_type': 'random', + 'start_at_origin': False, + 'start_at_random_speed': True, + 'start_at_random_heading': True, + 'draw_plots': True, + 'draw_animation': True} + +kf_options = {'accel_std':0.5, # Q Matrix Param + 'meas_std':10, # R Matrix + 'init_on_measurement':True} + +# Run the Simulation +run_sim(KalmanFilterModel, sim_options, kf_options) \ No newline at end of file diff --git a/assignment1_initial_conditions.py b/assignment1_initial_conditions.py new file mode 100644 index 0000000..fe03a07 --- /dev/null +++ b/assignment1_initial_conditions.py @@ -0,0 +1,103 @@ +import numpy as np +from kfsims.tracker2d import run_sim +from kfsims.kfmodels import KalmanFilterBase + +# Simulation Options +sim_options = {'time_step': 0.01, + 'end_time': 120, + 'measurement_rate': 1, + 'measurement_noise_std': 10, + 'motion_type': 'straight', + 'start_at_origin': False, + 'start_at_random_speed': True, + 'start_at_random_heading': True, + 'draw_plots': True, + 'draw_animation': True} + +# Kalman Filter Model +class KalmanFilterModel(KalmanFilterBase): + + def initialise(self, time_step): + + # Set Initial State and Covariance (COMMENT OUT FOR DELAYED) + init_pos_std = 0 + init_vel_std = 10 + self.state = np.array([0,0,0,0]) + self.covariance = np.diag(np.array([init_pos_std*init_pos_std, + init_pos_std*init_pos_std, + init_vel_std*init_vel_std, + init_vel_std*init_vel_std])) + + # Setup the Model F Matrix + dt = time_step + self.F = np.array([[1,0,dt,0], + [0,1,0,dt], + [0,0,1,0], + [0,0,0,1]]) + + # Set the Q Matrix + accel_std = 0.0 + self.Q = np.diag(np.array([(0.5*dt*dt),(0.5*dt*dt),dt,dt]) * (accel_std*accel_std)) + + # Setup the Model H Matrix + self.H = np.array([[1,0,0,0],[0,1,0,0]]) + + # Set the R Matrix + meas_std = 10.0 + self.R = np.diag([meas_std*meas_std, meas_std*meas_std]) + + return + + def prediction_step(self): + # Make Sure Filter is Initialised + if self.state is not None: + x = self.state + P = self.covariance + + # Calculate Kalman Filter Prediction + x_predict = np.matmul(self.F, x) + P_predict = np.matmul(self.F, np.matmul(P, np.transpose(self.F))) + self.Q + + # Save Predicted State + self.state = x_predict + self.covariance = P_predict + + return + + def update_step(self, measurement): + + # Make Sure Filter is Initialised + if self.state is not None and self.covariance is not None: + x = self.state + P = self.covariance + H = self.H + R = self.R + + # Calculate Kalman Filter Update + z = np.array([measurement[0],measurement[1]]) + z_hat = np.matmul(H, x) + + y = z - z_hat + S = np.matmul(H,np.matmul(P,np.transpose(H))) + R + K = np.matmul(P,np.matmul(np.transpose(H),np.linalg.inv(S))) + + x_update = x + np.matmul(K, y) + P_update = np.matmul( (np.eye(4) - np.matmul(K,H)), P) + + # Save Updated State + self.innovation = y + self.innovation_covariance = S + self.state = x_update + self.covariance = P_update + + else: + + # Set Initial State and Covariance + return + + + return + + +# Run the Simulation +run_sim(KalmanFilterModel, sim_options, {}) \ No newline at end of file diff --git a/assignment1_initial_conditions_answer.py b/assignment1_initial_conditions_answer.py new file mode 100644 index 0000000..026f327 --- /dev/null +++ b/assignment1_initial_conditions_answer.py @@ -0,0 +1,105 @@ +import numpy as np +from kfsims.tracker2d import run_sim +from kfsims.kfmodels import KalmanFilterBase + +# Simulation Options +sim_options = {'time_step': 0.01, + 'end_time': 120, + 'measurement_rate': 1, + 'measurement_noise_std': 10, + 'motion_type': 'straight', + 'start_at_origin': False, + 'start_at_random_speed': True, + 'start_at_random_heading': True, + 'draw_plots': True, + 'draw_animation': True} + +# Kalman Filter Model +class KalmanFilterModel(KalmanFilterBase): + + def initialise(self, time_step): + + # Set Initial State and Covariance (COMMENT OUT FOR DELAYED INIT) + init_pos_std = 100 + init_vel_std = 10 + #self.state = np.array([0,0,0,0]) + #self.covariance = np.diag(np.array([init_pos_std*init_pos_std, + # init_pos_std*init_pos_std, + # init_vel_std*init_vel_std, + # init_vel_std*init_vel_std])) + + # Setup the Model F Matrix + dt = time_step + self.F = np.array([[1,0,dt,0], + [0,1,0,dt], + [0,0,1,0], + [0,0,0,1]]) + + # Set the Q Matrix + accel_std = 0.0 + self.Q = np.diag(np.array([(0.5*dt*dt),(0.5*dt*dt),dt,dt]) * (accel_std*accel_std)) + + # Setup the Model H Matrix + self.H = np.array([[1,0,0,0],[0,1,0,0]]) + + # Set the R Matrix + meas_std = 10.0 + self.R = np.diag([meas_std*meas_std, meas_std*meas_std]) + + return + + def prediction_step(self): + # Make Sure Filter is Initialised + if self.state is not None: + x = self.state + P = self.covariance + + # Calculate Kalman Filter Prediction + x_predict = np.matmul(self.F, x) + P_predict = np.matmul(self.F, np.matmul(P, np.transpose(self.F))) + self.Q + + # Save Predicted State + self.state = x_predict + self.covariance = P_predict + + return + + def update_step(self, measurement): + + # Make Sure Filter is Initialised + if self.state is not None and self.covariance is not None: + x = self.state + P = self.covariance + H = self.H + R = self.R + + # Calculate Kalman Filter Update + z = np.array([measurement[0],measurement[1]]) + z_hat = np.matmul(H, x) + + y = z - z_hat + S = np.matmul(H,np.matmul(P,np.transpose(H))) + R + K = np.matmul(P,np.matmul(np.transpose(H),np.linalg.inv(S))) + + x_update = x + np.matmul(K, y) + P_update = np.matmul( (np.eye(4) - np.matmul(K,H)), P) + + # Save Updated State + self.innovation = y + self.innovation_covariance = S + self.state = x_update + self.covariance = P_update + + else: + + # Set Initial State and Covariance + init_vel_std = 10 + + self.state = np.array([measurement[0],measurement[1],0,0]) + self.covariance = np.diag(np.array([self.R[0,0],self.R[1,1],init_vel_std*init_vel_std,init_vel_std*init_vel_std])) + + return + + +# Run the Simulation +run_sim(KalmanFilterModel, sim_options, {}) \ No newline at end of file diff --git a/assignment1_prediction.py b/assignment1_prediction.py new file mode 100644 index 0000000..64cb142 --- /dev/null +++ b/assignment1_prediction.py @@ -0,0 +1,62 @@ +import numpy as np +from kfsims.tracker2d import run_sim +from kfsims.kfmodels import KalmanFilterBase + +# Simulation Options +sim_options = {'time_step': 0.01, + 'end_time': 120, + 'measurement_rate': 1, + 'measurement_noise_std': 10, + 'motion_type': 'straight', + 'start_at_origin': True, + 'start_at_random_speed': False, + 'start_at_random_heading': False, + 'draw_plots': True, + 'draw_animation': True} + +# Kalman Filter Model +class KalmanFilterModel(KalmanFilterBase): + + def initialise(self, time_step): + + # Define a np.array 4x1 with the initial state (px,py,vx,vy) + #self.state = + + # Define a np.array 4x4 for the initial covariance + #self.covariance = + + # Setup the Model F Matrix + #self.F = + + # Set the Q Matrix + #self.Q = + + return + + def prediction_step(self): + # Make Sure Filter is Initialised + if self.state is not None: + x = self.state + P = self.covariance + + # Calculate Kalman Filter Prediction + + # State Prediction: x_predict = F * x + x_predict = x + + # Covariance Prediction: P_predict = F * P * F' + Q + P_predict = P + + # Save Predicted State + self.state = x_predict + self.covariance = P_predict + + return + + def update_step(self, measurement): + return + + + +# Run the Simulation +run_sim(KalmanFilterModel, sim_options, {}) \ No newline at end of file diff --git a/assignment1_prediction_answer.py b/assignment1_prediction_answer.py new file mode 100644 index 0000000..276d756 --- /dev/null +++ b/assignment1_prediction_answer.py @@ -0,0 +1,64 @@ +import numpy as np +from kfsims.tracker2d import run_sim +from kfsims.kfmodels import KalmanFilterBase + +# Simulation Options +sim_options = {'time_step': 0.01, + 'end_time': 120, + 'measurement_rate': 1, + 'measurement_noise_std': 10, + 'motion_type': 'straight', + 'start_at_origin': True, + 'start_at_random_speed': False, + 'start_at_random_heading': False, + 'draw_plots': True, + 'draw_animation': True} + +# Kalman Filter Model +class KalmanFilterModel(KalmanFilterBase): + + def initialise(self, time_step): + + # Set Initial State and Covariance + self.state = np.array([0,0,7.07,7.07]) + self.covariance = np.diag(np.array([0,0,0,0])) + + # Setup the Model F Matrix + dt = time_step + self.F = np.array([[1,0,dt,0], + [0,1,0,dt], + [0,0,1,0], + [0,0,0,1]]) + + # Set the Q Matrix + accel_std = 0.1 + self.Q = np.diag(np.array([(0.5*dt*dt),(0.5*dt*dt),dt,dt]) * (accel_std*accel_std)) + + return + + def prediction_step(self): + # Make Sure Filter is Initialised + if self.state is not None: + x = self.state + P = self.covariance + + # Calculate Kalman Filter Prediction + + # State Prediction: x_predict = F * x + x_predict = np.matmul(self.F, x) + + # Covariance Prediction: P_predict = F * P * F' + Q + P_predict = np.matmul(self.F, np.matmul(P, np.transpose(self.F))) + self.Q + + # Save Predicted State + self.state = x_predict + self.covariance = P_predict + + return + + def update_step(self, measurement): + return + + +# Run the Simulation +run_sim(KalmanFilterModel, sim_options, {}) \ No newline at end of file diff --git a/assignment1_update.py b/assignment1_update.py new file mode 100644 index 0000000..607cdae --- /dev/null +++ b/assignment1_update.py @@ -0,0 +1,105 @@ +import numpy as np +from kfsims.tracker2d import run_sim +from kfsims.kfmodels import KalmanFilterBase + +# Simulation Options +sim_options = {'time_step': 0.01, + 'end_time': 120, + 'measurement_rate': 1, + 'measurement_noise_std': 10, + 'motion_type': 'straight', + 'start_at_origin': True, + 'start_at_random_speed': True, + 'start_at_random_heading': True, + 'draw_plots': True, + 'draw_animation': True} + +# Kalman Filter Model +class KalmanFilterModel(KalmanFilterBase): + + def initialise(self, time_step): + + # Set Initial State and Covariance + init_pos_std = 0 + init_vel_std = 0 + self.state = np.array([0,0,0,0]) + self.covariance = np.diag(np.array([init_pos_std*init_pos_std, + init_pos_std*init_pos_std, + init_vel_std*init_vel_std, + init_vel_std*init_vel_std])) + + # Setup the Model F Matrix + dt = time_step + self.F = np.array([[1,0,dt,0], + [0,1,0,dt], + [0,0,1,0], + [0,0,0,1]]) + + # Set the Q Matrix + accel_std = 0.0 + self.Q = np.diag(np.array([(0.5*dt*dt),(0.5*dt*dt),dt,dt]) * (accel_std*accel_std)) + + # Setup the Model H Matrix + #self.H = + + # Set the R Matrix + meas_std = 10.0 + #self.R = + + return + + def prediction_step(self): + # Make Sure Filter is Initialised + if self.state is not None: + x = self.state + P = self.covariance + + # Calculate Kalman Filter Prediction + x_predict = np.matmul(self.F, x) + P_predict = np.matmul(self.F, np.matmul(P, np.transpose(self.F))) + self.Q + + # Save Predicted State + self.state = x_predict + self.covariance = P_predict + + return + + def update_step(self, measurement): + + # Make Sure Filter is Initialised + if self.state is not None and self.covariance is not None: + x = self.state + P = self.covariance + + # Calculate Kalman Filter Update + z = np.array([measurement[0],measurement[1]]) + + # Predicted Measurement: z_hat = H * x + # z_hat = + + # Innovation: y = z - z_hat + y = None + + # Innovation Covariance: S = H * P * H' + R + S = None + + # Kalman Gain: K = P * H' * S^-1 + # K = + + # Kalman State Update: x_update = x + K*y + x_update = x + + # Kalman Covariance Update: P_update = (I - K*H)*P + P_update = P + + # Save Updated State + self.innovation = y + self.innovation_covariance = S + self.state = x_update + self.covariance = P_update + + return + + +# Run the Simulation +run_sim(KalmanFilterModel, sim_options, {}) \ No newline at end of file diff --git a/assignment1_update_answer.py b/assignment1_update_answer.py new file mode 100644 index 0000000..6e49107 --- /dev/null +++ b/assignment1_update_answer.py @@ -0,0 +1,97 @@ +import numpy as np +from kfsims.tracker2d import run_sim +from kfsims.kfmodels import KalmanFilterBase + +# Simulation Options +sim_options = {'time_step': 0.1, + 'end_time': 120, + 'measurement_rate': 1, + 'measurement_noise_std': 10, + 'motion_type': 'straight', + 'start_at_origin': True, + 'start_at_random_speed': True, + 'start_at_random_heading': True, + 'draw_plots': True, + 'draw_animation': True} + +# Kalman Filter Model +class KalmanFilterModel(KalmanFilterBase): + + def initialise(self, time_step): + + # Set Initial State and Covariance + init_pos_std = 0 + init_vel_std = 0 + self.state = np.array([0,0,0,0]) + self.covariance = np.diag(np.array([init_pos_std*init_pos_std, + init_pos_std*init_pos_std, + init_vel_std*init_vel_std, + init_vel_std*init_vel_std])) + + # Setup the Model F Matrix + dt = time_step + self.F = np.array([[1,0,dt,0], + [0,1,0,dt], + [0,0,1,0], + [0,0,0,1]]) + + # Set the Q Matrix + accel_std = 0.1 + self.Q = np.diag(np.array([(0.5*dt*dt),(0.5*dt*dt),dt,dt]) * (accel_std*accel_std)) + + # Setup the Model H Matrix + self.H = np.array([[1,0,0,0],[0,1,0,0]]) + + # Set the R Matrix + meas_std = 10.0 + self.R = np.diag([meas_std*meas_std, meas_std*meas_std]) + + return + + def prediction_step(self): + # Make Sure Filter is Initialised + if self.state is not None: + x = self.state + P = self.covariance + + # Calculate Kalman Filter Prediction + x_predict = np.matmul(self.F, x) + P_predict = np.matmul(self.F, np.matmul(P, np.transpose(self.F))) + self.Q + + # Save Predicted State + self.state = x_predict + self.covariance = P_predict + + return + + def update_step(self, measurement): + + # Make Sure Filter is Initialised + if self.state is not None and self.covariance is not None: + x = self.state + P = self.covariance + H = self.H + R = self.R + + # Calculate Kalman Filter Update + z = np.array([measurement[0],measurement[1]]) + z_hat = np.matmul(H, x) + + y = z - z_hat + S = np.matmul(H,np.matmul(P,np.transpose(H))) + R + K = np.matmul(P,np.matmul(np.transpose(H),np.linalg.inv(S))) + + x_update = x + np.matmul(K, y) + P_update = np.matmul( (np.eye(4) - np.matmul(K,H)), P) + + # Save Updated State + self.innovation = y + self.innovation_covariance = S + self.state = x_update + self.covariance = P_update + + return + + +# Run the Simulation +run_sim(KalmanFilterModel, sim_options, {}) \ No newline at end of file diff --git a/kfsims/__init__.py b/kfsims/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kfsims/kfmodels.py b/kfsims/kfmodels.py new file mode 100644 index 0000000..8ce7c2a --- /dev/null +++ b/kfsims/kfmodels.py @@ -0,0 +1,28 @@ +class KalmanFilterBase: + + def __init__(self): + self.state = None + self.covariance = None + self.innovation = None + self.innovation_covariance = None + + def get_current_state(self): + if self.state is not None: + return self.state.tolist() + return None + + def get_current_covariance(self): + if self.covariance is not None: + return self.covariance.tolist() + return None + + def get_last_innovation(self): + if self.innovation is not None: + return self.innovation.tolist() + return None + + def get_last_innovation_covariance(self): + if self.innovation_covariance is not None: + return self.innovation_covariance.tolist() + return None + diff --git a/kfsims/kftracker2d.py b/kfsims/kftracker2d.py new file mode 100644 index 0000000..fec0221 --- /dev/null +++ b/kfsims/kftracker2d.py @@ -0,0 +1,77 @@ +import numpy as np +from .kfmodels import KalmanFilterBase + +# Kalman Filter Model +class KalmanFilterModel(KalmanFilterBase): + + def initialise(self, time_step, accel_std, meas_std, init_on_measurement=False, init_pos_std = 500, init_vel_std = 50): + dt = time_step + + # Set Model F and H Matrices + self.F = np.array([[1,0,dt,0], + [0,1,0,dt], + [0,0,1,0], + [0,0,0,1]]) + + self.H = np.array([[1,0,0,0],[0,1,0,0]]) + + # Set R and Q Matrices + self.Q = np.diag(np.array([(0.5*dt*dt),(0.5*dt*dt),dt,dt]) * (accel_std*accel_std)) + self.R = np.diag([meas_std*meas_std, meas_std*meas_std]) + + # Set Initial State and Covariance + if init_on_measurement is False: + self.state = np.array([0,0,0,0]) # Assume we are at zero position and velocity + self.covariance = np.diag(np.array([init_pos_std*init_pos_std,init_pos_std*init_pos_std,init_vel_std*init_vel_std,init_vel_std*init_vel_std])) + + return + + def prediction_step(self): + # Make Sure Filter is Initialised + if self.state is not None: + x = self.state + P = self.covariance + + # Calculate Kalman Filter Prediction + x_predict = np.matmul(self.F, x) + P_predict = np.matmul(self.F, np.matmul(P, np.transpose(self.F))) + self.Q + + # Save Predicted State + self.state = x_predict + self.covariance = P_predict + + return + + def update_step(self, measurement): + + # Make Sure Filter is Initialised + if self.state is not None and self.covariance is not None: + x = self.state + P = self.covariance + H = self.H + R = self.R + + # Calculate Kalman Filter Update + z = np.array([measurement[0],measurement[1]]) + z_hat = np.matmul(H, x) + + y = z - z_hat + S = np.matmul(H,np.matmul(P,np.transpose(H))) + R + K = np.matmul(P,np.matmul(np.transpose(H),np.linalg.inv(S))) + + x_update = x + np.matmul(K, y) + P_update = np.matmul( (np.eye(4) - np.matmul(K,H)), P) + + # Save Updated State + self.innovation = y + self.innovation_covariance = S + self.state = x_update + self.covariance = P_update + + else: + + # Set Initial State and Covariance + self.state = np.array([measurement[0],measurement[1],0,0]) + self.covariance = np.diag(np.array([self.R[0,0],self.R[1,1],10,10])) # Assume we don't know our velocity + + return \ No newline at end of file diff --git a/kfsims/tracker2d.py b/kfsims/tracker2d.py new file mode 100644 index 0000000..ca2219b --- /dev/null +++ b/kfsims/tracker2d.py @@ -0,0 +1,272 @@ +import numpy as np +import matplotlib +#matplotlib.use("QtAgg") # or "MacOSX" if Qt isn’t installed +import matplotlib.pyplot as plt +import matplotlib.animation as animation +import matplotlib.gridspec as gridspec +from .vehiclemodel2d import VehicleModel2D + + + +def run_sim(KalmanFilterModel, sim_option, kf_options): + + dt = sim_option['time_step'] + end_time = sim_option['end_time'] + measurement_rate = sim_option['measurement_rate'] + measurement_noise_std = sim_option['measurement_noise_std'] + motion_type = sim_option['motion_type'] + draw_plots = sim_option['draw_plots'] + draw_animation = sim_option['draw_animation'] + start_at_origin = sim_option['start_at_origin'] + start_at_random_speed = sim_option['start_at_random_speed'] + start_at_random_heading = sim_option['start_at_random_heading'] + + initial_x_position = 0 + initial_y_position = 0 + if start_at_origin is False: + initial_x_position = 1000*(np.random.rand()-0.5) + initial_y_position = 1000*(np.random.rand()-0.5) + + initial_speed = 10 + if start_at_random_speed is True: + initial_speed = np.random.rand() * 20 + + initial_heading = 45 + if start_at_random_heading is True: + initial_heading = 180*(np.random.rand()-0.5) + + + vehicle_params = {'initial_x_position': initial_x_position, + 'initial_y_position': initial_y_position, + 'initial_heading': np.deg2rad(initial_heading), + 'initial_speed': initial_speed} + + + + + meas_steps = np.ceil((1/measurement_rate)/dt).astype(int) + num_steps = np.ceil(end_time/dt).astype(int) + + # Create the Simulation Objects + vehicle_model = VehicleModel2D() + kalman_filter = KalmanFilterModel() + + # Initialise the Components + vehicle_model.initialise(vehicle_params) + kalman_filter.initialise(dt, **kf_options) + + + # Save the Initial States + time_history = np.linspace(0.0, dt*num_steps, num_steps+1) + vehicle_position_history = [vehicle_model.get_position()] + vehicle_velocity_history = [vehicle_model.get_velocity()] + measurement_history = [None] + estimated_state_history = [kalman_filter.get_current_state()] + estimated_covariance_history = [kalman_filter.get_current_covariance()] + estimated_error_history = [None] + + measurement_innovation_history = [None] + measurement_innovation_covariance_history = [None] + + rand_param = np.random.randn()*0.1 + + # Run the Simulation + for k in range(1, num_steps+1): + + # Update the Vehicle Model + if motion_type == 'random': + vehicle_model.update_vehicle(dt, np.random.randn()*2 , np.random.randn()*2) + elif motion_type == 'circle': + vehicle_model.update_vehicle(dt, 0 , np.deg2rad(360/120)) + elif motion_type == 'linear': + vehicle_model.update_vehicle(dt,rand_param,0) + else: + vehicle_model.update_vehicle(dt, 0, 0) + + vehicle_position = vehicle_model.get_position() + vehicle_velocity = vehicle_model.get_velocity() + + # KF Prediction + kalman_filter.prediction_step() + + # KF Measurement + measurement = None + if (k % meas_steps) == 0: + x_meas = vehicle_position[0] + np.random.randn()*measurement_noise_std + y_meas = vehicle_position[1] + np.random.randn()*measurement_noise_std + measurement = [x_meas,y_meas] + kalman_filter.update_step(measurement) + measurement_innovation_history.append(kalman_filter.get_last_innovation()) + measurement_innovation_covariance_history.append(kalman_filter.get_last_innovation_covariance()) + + # Estimation Error + estimation_error = None + estimated_state = kalman_filter.get_current_state() + if estimated_state is not None: + estimation_error = [estimated_state[0] - vehicle_position[0], + estimated_state[1] - vehicle_position[1], + estimated_state[2] - vehicle_velocity[0], + estimated_state[3] - vehicle_velocity[1]] + + # Save Data + vehicle_position_history.append(vehicle_model.get_position()) + vehicle_velocity_history.append(vehicle_model.get_velocity()) + measurement_history.append(measurement) + estimated_state_history.append(kalman_filter.get_current_state()) + estimated_covariance_history.append(kalman_filter.get_current_covariance()) + estimated_error_history.append(estimation_error) + + # Calculate Stats + x_innov_std = np.std([v[0] for v in measurement_innovation_history if v is not None]) + y_innov_std = np.std([v[1] for v in measurement_innovation_history if v is not None]) + pos_mse = np.mean([(v[0]**2+v[1]**2) for v in estimated_error_history if v is not None]) + vel_mse = np.mean([(v[2]**2+v[3]**2) for v in estimated_error_history if v is not None]) + print('X Position Measurement Innovation Std: {} (m)'.format(x_innov_std)) + print('Y Position Measurement Innovation Std: {} (m)'.format(y_innov_std)) + print('Position Mean Squared Error: {} (m)^2'.format(pos_mse)) + print('Velocity Mean Squared Error: {} (m/s)^2'.format(vel_mse)) + + + + if draw_plots is True: + # Plot Analysis + fig1 = plt.figure(constrained_layout=True) + fig1_gs = gridspec.GridSpec(ncols=2, nrows=4, figure=fig1) + + fig1_ax1 = fig1.add_subplot(fig1_gs[0,0],title='X Position') + fig1_ax2 = fig1.add_subplot(fig1_gs[1,0],title='Y Position') + fig1_ax3 = fig1.add_subplot(fig1_gs[2,0],title='X Velocity') + fig1_ax4 = fig1.add_subplot(fig1_gs[3,0],title='Y Velocity') + fig1_ax5 = fig1.add_subplot(fig1_gs[0,1],title='X Position Error') + fig1_ax6 = fig1.add_subplot(fig1_gs[1,1],title='Y Position Error') + fig1_ax7 = fig1.add_subplot(fig1_gs[2,1],title='X Velocity Error') + fig1_ax8 = fig1.add_subplot(fig1_gs[3,1],title='Y Velocity Error') + fig1_ax1.grid(True) + fig1_ax2.grid(True) + fig1_ax3.grid(True) + fig1_ax4.grid(True) + fig1_ax5.grid(True) + fig1_ax6.grid(True) + fig1_ax7.grid(True) + fig1_ax8.grid(True) + fig1_ax4.set_xlabel('Time (sec)') + fig1_ax8.set_xlabel('Time (sec)') + fig1_ax1.set_ylabel('X Position (m)') + fig1_ax2.set_ylabel('Y Position (m)') + fig1_ax3.set_ylabel('X Velocity (m/s)') + fig1_ax4.set_ylabel('Y Velocity (m/s)') + + # Plot Vehicle State + fig1_ax1.plot(time_history, [v[0] for v in vehicle_position_history], 'b') + fig1_ax2.plot(time_history, [v[1] for v in vehicle_position_history], 'b') + fig1_ax3.plot(time_history, [v[0] for v in vehicle_velocity_history], 'b') + fig1_ax4.plot(time_history, [v[1] for v in vehicle_velocity_history], 'b') + + # Plot Estimated States + time_plot = [t for t,v in zip(time_history, estimated_state_history) if v is not None ] + fig1_ax1.plot(time_plot, [v[0] for v in estimated_state_history if v is not None], 'r') + fig1_ax2.plot(time_plot, [v[1] for v in estimated_state_history if v is not None], 'r') + fig1_ax3.plot(time_plot, [v[2] for v in estimated_state_history if v is not None], 'r') + fig1_ax4.plot(time_plot, [v[3] for v in estimated_state_history if v is not None], 'r') + + # Plot Measurements + time_plot = [t for t,v in zip(time_history, measurement_history) if v is not None ] + fig1_ax1.plot(time_plot, [v[0] for v in measurement_history if v is not None], 'k+') + fig1_ax2.plot(time_plot, [v[1] for v in measurement_history if v is not None], 'k+') + + # Plot Errors + time_plot = [t for t,v in zip(time_history, estimated_error_history) if v is not None ] + fig1_ax5.plot(time_plot, [v[0] for v in estimated_error_history if v is not None], 'r') + fig1_ax6.plot(time_plot, [v[1] for v in estimated_error_history if v is not None], 'r') + fig1_ax7.plot(time_plot, [v[2] for v in estimated_error_history if v is not None], 'r') + fig1_ax8.plot(time_plot, [v[3] for v in estimated_error_history if v is not None], 'r') + time_plot = [t for t,v in zip(time_history, estimated_covariance_history) if v is not None ] + fig1_ax5.plot(time_plot, [3.0*np.sqrt(v[0][0]) for v in estimated_covariance_history if v is not None], 'g') + fig1_ax6.plot(time_plot, [3.0*np.sqrt(v[1][1]) for v in estimated_covariance_history if v is not None], 'g') + fig1_ax7.plot(time_plot, [3.0*np.sqrt(v[2][2]) for v in estimated_covariance_history if v is not None], 'g') + fig1_ax8.plot(time_plot, [3.0*np.sqrt(v[3][3]) for v in estimated_covariance_history if v is not None], 'g') + fig1_ax5.plot(time_plot, [-3.0*np.sqrt(v[0][0]) for v in estimated_covariance_history if v is not None], 'g') + fig1_ax6.plot(time_plot, [-3.0*np.sqrt(v[1][1]) for v in estimated_covariance_history if v is not None], 'g') + fig1_ax7.plot(time_plot, [-3.0*np.sqrt(v[2][2]) for v in estimated_covariance_history if v is not None], 'g') + fig1_ax8.plot(time_plot, [-3.0*np.sqrt(v[3][3]) for v in estimated_covariance_history if v is not None], 'g') + + fig2 = plt.figure(constrained_layout=True) + fig2_gs = gridspec.GridSpec(ncols=1, nrows=2, figure=fig2) + + fig2_ax1 = fig2.add_subplot(fig2_gs[0,0],title='X Measurement Innovation') + fig2_ax2 = fig2.add_subplot(fig2_gs[1,0],title='Y Measurement Innovation') + fig2_ax1.grid(True) + fig2_ax2.grid(True) + fig2_ax2.set_xlabel('Time (sec)') + + time_plot = [t for t,v in zip(time_history, measurement_innovation_history) if v is not None ] + fig2_ax1.plot(time_plot, [v[0] for v in measurement_innovation_history if v is not None], 'b') + fig2_ax2.plot(time_plot, [v[1] for v in measurement_innovation_history if v is not None], 'b') + time_plot = [t for t,v in zip(time_history, measurement_innovation_covariance_history) if v is not None ] + fig2_ax1.plot(time_plot, [1.0*np.sqrt(v[0][0]) for v in measurement_innovation_covariance_history if v is not None], 'g--') + fig2_ax1.plot(time_plot, [-1.0*np.sqrt(v[0][0]) for v in measurement_innovation_covariance_history if v is not None], 'g--') + fig2_ax2.plot(time_plot, [1.0*np.sqrt(v[1][1]) for v in measurement_innovation_covariance_history if v is not None], 'g--') + fig2_ax2.plot(time_plot, [-1.0*np.sqrt(v[1][1]) for v in measurement_innovation_covariance_history if v is not None], 'g--') + + if draw_animation is True: + + # Plot Animation + fig2 = plt.figure(constrained_layout=True) + fig_ax2 = fig2.add_subplot(111,title='2D Position', aspect='equal')#, autoscale_on=True, xlim=(0, 1000), ylim=(0, 1000)) + fig_ax2.set_xlabel('X Position (m)') + fig_ax2.set_ylabel('Y Position (m)') + fig_ax2.grid(True) + vehicle_plot, = fig_ax2.plot([], [], 'bo', linestyle='None', markersize=6) + meas_plot, = fig_ax2.plot([], [], '+k') + estimate_plot, = fig_ax2.plot([], [], 'ro', linestyle='None', markersize=6) + estimate_var_plot, = fig_ax2.plot([], [], 'g-') + + def update_plot(i): + # Plot Vehicle + vx, vy = vehicle_position_history[i] + vehicle_plot.set_data([vx], [vy]) + + # Plot Measurements + x_data = [meas[0] for meas in measurement_history[1:i] if meas is not None] + y_data = [meas[1] for meas in measurement_history[1:i] if meas is not None] + meas_plot.set_data(x_data, y_data) + + # Plot Estimates + if estimated_state_history[i] is not None: + est_xpos = estimated_state_history[i][0] + est_ypos = estimated_state_history[i][1] + estimate_plot.set_data([est_xpos], [est_ypos]) + if estimated_covariance_history[i] is not None: + cov = estimated_covariance_history[i] + cov_mat = np.array([[cov[0][0], cov[0][1]], [cov[1][0], cov[1][1]]]) + U, S, V = np.linalg.svd(cov_mat) + theta = np.linspace(0, 2 * np.pi, 100) + theta_mat = np.array([np.cos(theta), np.sin(theta)]) + D = np.matmul(np.matmul(U, np.diag(3.0 * np.sqrt(S))), theta_mat) + estimate_var_plot.set_data([x + est_xpos for x in D[0]], [y + est_ypos for y in D[1]]) + + fig_ax2.relim() + fig_ax2.autoscale_view() + + return vehicle_plot, meas_plot, estimate_plot, estimate_var_plot + + # # Create the Animation + plot_animation = animation.FuncAnimation(fig2, update_plot, frames=range(0, num_steps, 50), interval=100, repeat=False, blit=False) + # Prevent garbage collection of the animation by holding a strong reference + fig2._anim = plot_animation + + def _on_close(_evt): + try: + anim = getattr(fig2, '_anim', None) + if anim is not None and getattr(anim, 'event_source', None) is not None: + anim.event_source.stop() + except Exception: + pass + # let Matplotlib handle the actual close + fig2.canvas.mpl_connect('close_event', _on_close) + + + # Show Animation + plt.show() + + return (pos_mse, vel_mse) \ No newline at end of file diff --git a/kfsims/vehiclemodel2d.py b/kfsims/vehiclemodel2d.py new file mode 100644 index 0000000..7522ee7 --- /dev/null +++ b/kfsims/vehiclemodel2d.py @@ -0,0 +1,37 @@ +import numpy as np + +class VehicleModel2D(): + + def __init__(self): + self.x_pos = 0 + self.y_pos = 0 + self.vel = 0 + self.yaw = 0 + + def initialise(self, vehicle_params): + self.x_pos = vehicle_params['initial_x_position'] + self.y_pos = vehicle_params['initial_y_position'] + self.yaw = vehicle_params['initial_heading'] + self.vel = vehicle_params['initial_speed'] + + def update_vehicle(self, time_step, accel, yaw_rate): + self.vel = self.vel + accel * time_step + self.yaw = self.yaw + yaw_rate * time_step + self.x_pos = self.x_pos + self.vel * np.cos(self.yaw) * time_step + self.y_pos = self.y_pos + self.vel * np.sin(self.yaw) * time_step + return + + def get_position(self): + return [self.x_pos, self.y_pos] + + def get_velocity(self): + x_vel = self.vel * np.cos(self.yaw) + y_vel = self.vel * np.sin(self.yaw) + return [x_vel, y_vel] + + def get_speed(self): + return self.vel + + def get_heading(self): + return self.yaw +