Source code for pyphysim.ia.iabase

#!/usr/bin/env python
"""
Module containing the base class for Interference Alignment (IA)
Algorithms.

This module should probably only be imported in the other modules inside
the 'ia' package that implement the IA algorithms.
"""

from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Sequence, TypeVar, Union, cast

import numpy as np

import pyphysim.channels.multiuser as muchannels

from ..util.conversion import linear2dB
from ..util.misc import leig, randn_c_RS

NumberOrArray = TypeVar("NumberOrArray", np.ndarray, float)


# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxxxxxxx Base Class for all IA Algorithms xxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[docs]class IASolverBaseClass: # pylint: disable=R0902 """ Base class for all Interference Alignment Algorithms. At least the `_updateW`, `_updateF` and `solve` methods must be implemented in the subclasses of IASolverBaseClass, where the `solve` method uses the `_updateW` and `_updateF` methods in its implementation. The implementation of the `_updateW` method should call the `_clear_receive_filter` method in the beginning and after that set either the _W or the _W_H variables with the correct value. The implementation of the `_updateF` method should call the clear_precoder_filter in the beginning and after that set _W variable with the correct precoder (normalized to have a Frobenius norm equal to one. The implementation of the `_updateF` method must set the _F variable with the correct value. Another method that can be implemented is the get_cost method. It should return the cost of the current IA solution. What is considered "the cost" varies from one IA algorithm to another, but should always be a real non-negative number. If get_cost is not implemented a value of -1 is returned. Parameters ---------- multiUserChannel : muchannels.MultiUserChannelMatrix The multiuser channel. """ # The IASolverBaseClass is an abstract class and the 'solve' method # (marked as abstract) must be implemented in a subclass. __metaclass__ = ABCMeta def __init__(self, multiUserChannel: muchannels.MultiUserChannelMatrix): # xxxxxxxxxx Private attributes xxxxxxxxxxxxxxx if not isinstance(multiUserChannel, muchannels.MultiUserChannelMatrix): raise ValueError("multiUserChannel must be an object of the " "MultiUserChannelMatrix class (or a subclass).") # Channel of all users self._multiUserChannel = multiUserChannel # Number of streams per user self._Ns: Optional[np.ndarray] = None # Power of each user (P is an 1D numpy array). If not set (_P is # None), then a power of 1 will be used for each transmitter. self._P: Optional[np.ndarray] = None # xxxxxxxxxx Precoder and receive filters xxxxxxxxxxxxxxxxxxxxxxxxx # These are numpy arrays of numpy arrays # Precoder: One precoder for each user self._F: Optional[np.ndarray] = None # Precoder: Same as _F, but scaled with the correct power value in # self.P self._full_F: Optional[np.ndarray] = None # Receive filter: One for each user self._W: Optional[np.ndarray] = None # Receive filter (hermitian): One for each user self._W_H: Optional[np.ndarray] = None self._full_W_H: Optional[np.ndarray] = None self._full_W: Optional[np.ndarray] = None # xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # xxxxxxxxxx Other member variables xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # RandomState object used to randomize the precoder self._rs = np.random.RandomState() # xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[docs] def _clear_receive_filter(self) -> None: """ Clear the receive filter. This should be called in the beginning of the implementation of the `_updateW` method in subclasses. """ self._W = None self._W_H = None self._full_W_H = None self._full_W = None
[docs] def _clear_precoder_filter(self) -> None: """ Clear the precoder filter. This should be called in the beginning of the implementation of the `_updateF` method in subclasses. """ self._F = None self._full_F = None
[docs] def clear(self) -> None: """ Clear the IA Solver object. All member attributes that are updated during the solve method, such as the precoder and receive filters, will be cleared. The other attributes that correspond to "configuration" such as the channel object won't be changed. Notes ----- You should overwrite this method in subclasses that pass parameters to the __init__ method, since here we call __init__ without arguments which is probably not what you want. """ # The F and W variables will be numpy arrays of numpy arrays. self._clear_precoder_filter() # Set _F and _full_F to None self._clear_receive_filter() # Set _W, _W_H and _full_W_H to None self._P = None self._Ns = None
# Don't clear the self._noise_var attribute
[docs] def get_cost(self) -> float: # pylint: disable=R0201 """ Get the current cost of the IA Solution. This method should be implemented in subclasses and return a number greater than or equal to zero.. Returns ------- cost : float The Cost of the current IA solution (a real non-negative number). """ return -1
@property def noise_var(self) -> float: """ Get method for the noise_var property. Returns ------- float The noise variance (a real non-negative number). """ noise_var = self._multiUserChannel.noise_var if noise_var is None: return 0.0 return noise_var @property def F(self) -> np.ndarray: """ Transmit precoder of all users. Returns ------- np.ndarray The precoders of all users (a 1D numpy array of 2D numpy arrays). """ return self._F @property def full_F(self) -> np.ndarray: """ Transmit precoder of all users. Returns ------- np.ndarray The precoders of all users (a 1D numpy array of 2D numpy arrays). """ if self._full_F is None: self._full_F = self._F * np.sqrt(self.P) return self._full_F # noinspection PyUnresolvedReferences
[docs] def set_precoders(self, F: Optional[Sequence[np.ndarray]] = None, full_F: Optional[Sequence[np.ndarray]] = None, P: Optional[np.ndarray] = None) -> None: """ Set the precoders of each user. Either `F` or `full_F` (or both of them) must be provided. If only `full_F` is provided then the value of `F` will be calculated from `full_F`. In any case, the value of `self.Ns` will be updated according to the dimensions of the `F` or `full_F`. Parameters ---------- F : np.ndarray | list[np.ndarray], optional A numpy array where each element is the (normalized) precoder (a 2D numpy array) of one user. full_F : np.ndarray | list[np.ndarray], optional A numpy array where each element is the precoder (a 2D numpy array) of one user. P : np.ndarray, optional The maximum transmit power. If not provided the current value of self.P will be kept as it is. """ if F is None and full_F is None: raise RuntimeError("Either 'F' or 'full_F' must be provided.") self._clear_precoder_filter() if P is not None: self._P = P self._full_F = full_F if F is None: assert (full_F is not None) K = len(full_F) self._F = np.empty(K, dtype=np.ndarray) for k in range(K): self._F[k] = full_F[k] / np.linalg.norm(full_F[k], 'fro') else: self._F = F # Update the number of streams self._Ns = np.empty(self.K, dtype=int) for k in range(self.K): self._Ns[k] = self._F[k].shape[1]
# The W property should return a receive filter that can be directly # multiplied (no need to calculate the hermitian of W) by the received # signal to cancel interference and compensate the effect of the # channel. @property def W(self) -> np.ndarray: """ Receive filter of all users. Returns ------- np.ndarray The receive filter of all users. (a 1D numpy array of 2D numpy arrays). """ # If self._W is None but self._W_H is not None than we need to # update self_W from self._W_H. if self._W is None: if self._W_H is not None: self._W = np.empty(self.K, dtype=np.ndarray) for k in range(self.K): self._W[k] = self._W_H[k].conj().T return self._W @property def W_H(self) -> np.ndarray: """ Get method for the W_H property. Returns ------- np.ndarray The conjugate of the receive filter of all users. (a 1D numpy array of 2D numpy arrays). """ # If self._W_H is None but self._W is not None than we need to # update self_W_H from self._W. if self._W_H is None: if self._W is not None: self._W_H = np.empty(self.K, dtype=np.ndarray) for k in range(self.K): self._W_H[k] = self._W[k].conj().T return self._W_H @property def full_W_H(self) -> np.ndarray: """ Get method for the full_W_H property. The full_W_H property returns the equivalent filter of the IA filter plus the post processing filter. Returns ------- np.ndarray The equivalent filter of the IA filter plus the post processing filter. """ if self._full_W_H is None: if self.W_H is not None: self._full_W_H = np.empty(self.K, dtype=np.ndarray) for k in range(self.K): # Equivalent channel with the effect of the precoder, # channel and receive filter Hieq = self._calc_equivalent_channel(k) # TODO: Put this in a try-except block for the case # that Hieq is singular (a linalg.LinAlgError exception # is thrown). In order to handle the exception, you # could set full_W_H to just W_H or you could try the # stream reduction (but stream reduction should be # performed at the precoders too) self._full_W_H[k] = np.linalg.solve(Hieq, self.W_H[k]) return self._full_W_H @property def full_W(self) -> np.ndarray: """ Get method for the full_W property. The full_W property returns the equivalent filter of the IA filter plus the post processing filter. Returns ------- np.ndarray the equivalent filter of the IA filter plus the post processing filter. """ if self._full_W is None: self._full_W = np.empty(self.K, dtype=np.ndarray) for k in range(self.K): self._full_W[k] = self.full_W_H[k].conj().T return self._full_W
[docs] def set_receive_filters(self, W_H: Optional[Sequence[np.ndarray]] = None, W: Optional[Sequence[np.ndarray]] = None) -> None: """ Set the receive filters. You only need to pass either `W_H` or `W`, but not both of them, since one is calculated from the other. Parameters ---------- W_H : np.ndarray | list[np.ndarray] A numpy array where each element is the receive filter (a 2D numpy array) of one user. This is a 1D numpy array of 2D numpy arrays. W : np.ndarray | list[np.ndarray] A numpy array where each element is the receive filter (a 2D numpy array) of one user. This is a 1D numpy array of 2D numpy arrays. """ self._clear_receive_filter() if W is None and W_H is None: raise RuntimeError("Either 'W' or 'W_H' must be provided.") if W is not None and W_H is not None: raise RuntimeError("Either 'W' or 'W_H' must be provided (" "but not both of them.") self._W = W self._W_H = W_H
[docs] def _calc_equivalent_channel(self, k: int) -> np.ndarray: """ Calculates the equivalent channel for user :math:`k` considering the effect of the precoder (including transmit power), the actual channel, and the receive filter (without power compensation). Parameters ---------- k : int The index of the desired user. Returns ------- Hk_eq : np.ndarray The equivalent channel. Notes ----- This method is used only internally in order to calculate the "W" get property so that the returned filter W compensates the effect of the direct channel. """ # Note that here Wk_H is the self.Wk_H property and not the # self.full_W_H property. Since _calc_equivalent_channel is used in # the full_W_H get property if we had used self.full_W_H here we # would get an infinity recursion. Wk_H = self.W_H[k] full_Fk = self.full_F[k] Hkk = self._get_channel(k, k) Hk_eq = Wk_H.dot(Hkk.dot(full_Fk)) return Hk_eq
@property def P(self) -> np.ndarray: """ Transmit power of all users. Returns ------- np.ndarray The power of all users. """ if self._P is None: return np.ones(self.K, dtype=float) return self._P @P.setter def P(self, value: Optional[NumberOrArray]) -> None: """Transmit power of all users. Parameters ---------- value : float | np.ndarray The new power of all users. """ if value is None: # Note that if self._P is None then the getter property will # return a numpy array of ones with the appropriated size. self._P = None elif np.isscalar(value): if value > 0.0: self._P = np.ones(self.K, dtype=float) * value else: raise ValueError("P cannot be negative or equal to zero.") else: assert (not isinstance(value, float)) if len(value) != self.K: raise ValueError("P must be set to a sequence of length K") value = np.array(value) assert (isinstance(value, np.ndarray)) if np.all(value > 0.0): self._P = np.array(value) else: raise ValueError("P cannot be negative or equal to zero.") @property def Ns(self) -> np.ndarray: """ Number of streams of all users. Returns ------- Ns : np.ndarray Number of streams of all users. """ return self._Ns # xxxxx Properties to read the channel related variables xxxxxxxxxxxxxx @property def K(self) -> int: """ The number of users. Returns ------- K : int The number of users. """ K = self._multiUserChannel.K assert K > 0, ("You must initialize the channel object before" " using the IA solver object.") return K @property def Nr(self) -> np.ndarray: """ Number of receive antennas of all users. Returns ------- Nr : np.ndarray Number of receive antennas of all users. """ return self._multiUserChannel.Nr @property def Nt(self) -> np.ndarray: """ Number of transmit antennas of all users. Returns ------- Nt : np.ndarray Number of transmit antennas of all users. """ return self._multiUserChannel.Nt
[docs] def randomizeF(self, Ns: Union[int, List[int], Sequence[int]], P: Optional[np.ndarray] = None) -> None: """ Generates a random precoder for each user. Parameters ---------- Ns : int | list[int] | np.ndarray Number of streams of each user. P : np.ndarray, optional Power of each user. If not provided, a value of 1 will be used for each user. """ self._clear_precoder_filter() if isinstance(Ns, int): Ns = np.ones(self.K, dtype=int) * Ns assert (not isinstance(Ns, int)) self.P = P # Local function that returns a normalized version of the input # numpy array def normalized(A: np.ndarray) -> np.ndarray: return A / np.linalg.norm(A, 'fro') self._F = np.zeros(self.K, dtype=np.ndarray) for k in range(self.K): self._F[k] = normalized(randn_c_RS(self._rs, self.Nt[k], Ns[k])) # This will create a new array so that we can modify self._Ns # internally without changing the original Ns variable passed to # the randomizeF method. self._Ns = np.array(Ns)
# This method is just an alias for the get_channel method of the # multiuserchannel object associated with the IA Solver.xs
[docs] def _get_channel(self, k: int, l: int) -> np.ndarray: """ Get the channel from transmitter l to receiver k. Parameters ---------- l : int Transmitting user. k : int Receiving user. Returns ------- H : np.ndarray The channel matrix between transmitter l and receiver k. """ return self._multiUserChannel.get_Hkl(k, l)
[docs] def _get_channel_rev(self, k: int, l: int) -> np.ndarray: """ Get the channel from transmitter l to receiver k in the reverse network. Let the matrix :math:`\\mtH_{kl}` be the channel matrix between the transmitter :math:`l` to receiver :math:`k` in the direct network. The channel matrix between the transmitter :math:`l` to receiver :math:`k` in the reverse network, denoted as :math:`\\overleftarrow{\\mtH}_{kl}`, is then given by :math:`\\overleftarrow{\\mtH}_{kl} = \\mtH_{lk}^\\dagger` where :math:`\\mtA^\\dagger` is the conjugate transpose of :math:`\\mtA`. Parameters ---------- l : int Transmitting user of the reverse network. k : int Receiving user of the reverse network. Returns ------- H : np.ndarray The channel matrix between transmitter l and receiver k in the reverse network. Notes ----- See Section III of [Cadambe2008]_ for details. """ return self._get_channel(l, k).transpose().conjugate()
# noinspection PyPep8
[docs] def calc_Q(self, k: int) -> np.ndarray: """ Calculates the interference covariance matrix at the :math:`k`-th receiver. The interference covariance matrix at the :math:`k`-th receiver, :math:`\\mtQ k`, is given by .. math:: \\mtQ k = \\sum_{j=1, j \\neq k}^{K} \\frac{P_j}{Ns_j} \\mtH_{kj} \\mtF_j \\mtF_j^H \\mtH_{kj}^H where :math:`P_j` is the transmit power of transmitter :math:`j`, and :math:`Ns_j` is the number of streams for user :math:`j`. Parameters ---------- k : int Index of the desired receiver. Returns ------- Qk : np.ndarray The interference covariance matrix at receiver :math:`k`. Notes ----- This is impacted by the self.P attribute. """ Qk = self._multiUserChannel.calc_Q(k, self.full_F) return Qk
# This method must be tested in a subclass of IASolverBaseClass, since # we need the receive filter and IASolverBaseClass does not know how to # calculate it
[docs] def calc_Q_rev(self, k: int) -> np.ndarray: """ Calculates the interference covariance matrix at the :math:`k`-th receiver in the reverse network. Parameters ---------- k : int Index of the desired receiver. Returns ------- Qk_rev : np.ndarray The interference covariance matrix at receiver :math:`k` in the reverse network. See also -------- calc_Q """ P = self.P interfering_users = set(range(self.K)) - {k} Qk = np.zeros([self.Nt[k], self.Nt[k]], dtype=complex) for l in interfering_users: # The lets make sure the receive filter norm is equal to one so # that we can correctly scale it to the desired power. assert (isinstance(self._W, np.ndarray)) assert np.linalg.norm(self._W[l], 'fro') - 1.0 < 1e-6 Hkl_F_rev = np.dot(self._get_channel_rev(k, l), self._W[l]) Qk = Qk + np.dot(P[l] * Hkl_F_rev, Hkl_F_rev.conjugate().T) return Qk
# noinspection PyPep8
[docs] def calc_remaining_interference_percentage(self, k: int, Qk: Optional[np.ndarray] = None ) -> float: """ Calculates the percentage of the interference in the desired signal space according to equation (30) in [Cadambe2008]_. The percentage :math:`p_k` of the interference in the desired signal space is given by .. math:: p_k = \\frac{\\sum_{j=1}^{Ns[k]} \\lambda_j [\\mtQ k]}{Tr[\\mtQ k]} where :math:`\\lambda_j[\\mtA]` denotes the :math:`j`-th smallest eigenvalue of :math:`\\mtA`. Parameters ---------- k : int The index of the desired user. Qk : np.ndarray The covariance matrix of the remaining interference at receiver k. If not provided, it will be automatically calculated. In that case, the `P` attribute will also be taken into account if it is set. Returns ------- float The percentage of the interference in the desired signal space. Notes ----- `Qk` must be a symmetric matrix so that its eigenvalues are real and positive (any covariance matrix is a symmetric matrix). """ # $$p_k = \frac{\sum_{j=1}^{Ns[k]} \lambda_j [\mtQ k]}{Tr[\mtQ k]}$$ if Qk is None: Qk = self.calc_Q(k) [_, D] = leig(Qk, self.Ns[k]) pk = np.sum(np.abs(D)) / np.trace(np.abs(Qk)) return cast(float, pk)
[docs] def calc_SINR_old(self) -> np.ndarray: """ Calculates the SINR values (in linear scale) of all streams of all users with the current IA solution. The noise variance used will be the value of the noise_var property, which, if not explicitly set, will use the noise_var property of the multiuserchannel object. This method is deprecated since it's not the correct way to calculate the SINR. Use the calc_SINR method instead. Returns ------- SINRs : np.ndarray The SINR (in linear scale) of all streams of all users. This is a 1D numpy array of 1D numpy arrays (of floats). """ K = self.K SINRs: np.array = np.empty(K, dtype=np.ndarray) for j in range(K): numerator = 0.0 denominator = 0.0 Wj_H = self.W_H[j] for i in range(K): Hji = self._get_channel(j, i) Fi = self.F[i] aux: np.ndarray = np.dot(Wj_H, np.dot(Hji, Fi)) if i == j: aux = np.dot(aux, aux.transpose().conjugate()) # Numerator will be a 1D numpy array with length equal # to the number of streams numerator = numerator + np.diag(np.abs(aux)) else: denominator = denominator + aux # pylint: disable=E1103 # noinspection PyUnresolvedReferences denominator = np.dot( denominator, denominator.transpose().conjugate()) # type: ignore noise_power = self.noise_var * np.dot(Wj_H, Wj_H.transpose().conjugate()) denominator += noise_power denominator = np.diag(np.abs(denominator)) SINRs[j] = numerator / denominator return SINRs
[docs] def calc_SINR(self) -> np.ndarray: """ Calculates the SINR values (in linear scale) of all streams of all users with the current IA solution. The noise variance used will be the value of the noise_var property, which, if not explicitly set, will use the noise_var property of the multiuserchannel object. Returns ------- SINRs : np.ndarray The SINR (in linear scale) of all streams of all users. This is a 1D numpy array of 1D numpy arrays (of floats). """ K = self.K SINRs = np.empty(K, dtype=np.ndarray) for k in range(self.K): Bkl_all_l = self._calc_Bkl_cov_matrix_all_l(k, self.noise_var) SINRs[k] = self._calc_SINR_k(k, Bkl_all_l) return SINRs
[docs] def calc_SINR_in_dB(self) -> np.ndarray: """ Calculates the SINR values (in dB scale) of all streams of all users with the current IA solution. The noise variance used will be the value of the noise_var property, which, if not explicitly set, will use the noise_var property of the multiuserchannel object. Returns ------- SINRs : np.ndarray The SINR (in dB scale) of all streams of all users. This is a 1D numpy array of 1D numpy arrays (of floats). """ K = self.K SINRs = np.empty(K, dtype=np.ndarray) for k in range(self.K): Bkl_all_l = self._calc_Bkl_cov_matrix_all_l(k, self.noise_var) SINRs[k] = linear2dB(self._calc_SINR_k(k, Bkl_all_l)) return SINRs
[docs] def calc_sum_capacity(self) -> float: """ Calculates the sum capacity of the current solution. The SINRs are estimated and applied to the Shannon capacity formula Returns ------- float The sum capacity of the current solution. """ return cast(float, np.sum(np.log2(1 + np.hstack(self.calc_SINR()))))
# noinspection PyPep8
[docs] def _calc_Bkl_cov_matrix_first_part(self, k: int) -> np.ndarray: """ Calculates the first part in the equation of the Blk covariance matrix in equation (28) of [Cadambe2008]_. The first part is given by .. math:: \\sum_{j=1}^{K} \\frac{P^{[j]}}{d^{[j]}} \\sum_{d=1}^{d^{[j]}} \\mtH^{[kj]}\\mtV_{\\star d}^{[j]} \\mtV_{\\star d}^{[j]\\dagger} \\mtH^{[kj]\\dagger} Note that it only depends on the value of :math:`k`. Parameters ---------- k : int Index of the desired user. Returns ------- Bkl_first_part : np.ndarray First part in equation (28) of [Cadambe2008]_. """ # $$\sum_{j=1}^{K} \frac{P^{[j]}}{d^{[j]}} \sum_{d=1}^{d^{[j]}} \mtH^{[kj]}\mtV_{\star d}^{[j]} \mtV_{\star d}^{[j]\dagger} \mtH^{[kj]\dagger}$$ first_part = 0.0 for j in range(self.K): Hkj = self._get_channel(k, j) Vj = self.full_F[j] aux = np.dot(Hkj, Vj) first_part = first_part + np.dot(aux, aux.conjugate().T) return first_part
# noinspection PyPep8
[docs] def _calc_Bkl_cov_matrix_second_part(self, k: int, l: int) -> np.ndarray: """ Calculates the second part in the equation of the Blk covariance matrix in equation (28) of [Cadambe2008]_ (note that it does not include the identity matrix). The second part is given by .. math:: \\frac{P^{[k]}}{d^{[k]}} \\mtH^{[kk]} \\mtV_{\\star l}^{[k]} \\mtV_{\\star l}^{[k]\\dagger} \\mtH^{[kk]\\dagger} Parameters ---------- k : int Index of the desired user. l : int Index of the desired stream. Returns ------- second_part : np.ndarray Second part in equation (28) of [Cadambe2008]_. """ # $$\frac{P^{[k]}}{d^{[k]}} \mtH^{[kk]} \mtV_{\star l}^{[k]} \mtV_{\star l}^{[k]\dagger} \mtH^{[kk]\dagger}$$ Hkk = self._get_channel(k, k) Vkl = self.full_F[k][:, l:l + 1] aux = np.dot(Hkk, Vkl) second_part = np.dot(aux, aux.conjugate().T) return second_part
# noinspection PyPep8
[docs] def _calc_Bkl_cov_matrix_all_l(self, k: int, noise_power: Optional[float] = None ) -> np.ndarray: """ Calculates the interference-plus-noise covariance matrix for all streams at receiver :math:`k` according to equation (28) in [Cadambe2008]_. The interference-plus-noise covariance matrix for stream :math:`l` of user :math:`k` is given by Equation (28) in [Cadambe2008]_, which is reproduced below .. math:: \\mtB^{[kl]} = \\sum_{j=1}^{K} \\frac{P^{[j]}}{d^{[j]}} \\sum_{d=1}^{d^{[j]}} \\mtH^{[kj]}\\mtV_{\\star l}^{[j]} \\mtV_{\\star l}^{[j]\\dagger} \\mtH^{[kj]\\dagger} - \\frac{P^{[k]}}{d^{[k]}} \\mtH^{[kk]} \\mtV_{\\star l}^{[k]} \\mtV_{\\star l}^{[k]\\dagger} \\mtH^{[kk]\\dagger} + \\mtI_{N^{[k]}} where :math:`P^{[k]}` is the transmit power of transmitter :math:`k`, :math:`d^{[k]}` is the number of degrees of freedom of user :math:`k`, :math:`\\mtH^{[kj]}` is the channel between transmitter :math:`j` and receiver :math:`k`, :math:`\\mtV_{\\star l}` is the :math:`l`-th column of the precoder of user :math:`k` and :math:`\\mtI_{N^{k}}` is an identity matrix with size equal to the number of receive antennas of receiver :math:`k`. Parameters ---------- k : int Index of the desired user. noise_power : float Noise power (variance). Returns ------- Bkl : np.ndarray Covariance matrix of all streams of user k. Each element of the returned 1D numpy array is a 2D numpy complex array corresponding to the covariance matrix of one stream of user k. Notes ----- To be simple, a function that returns the covariance matrix of only a single stream "l" of the desired user "k" could be implemented, but in the order to calculate the max SINR algorithm we need the covariance matrix of all streams and returning them in single function as is done here allows us to calculate the first part in equation (28) of [Cadambe2008]_ only once, since it is the same for all streams. """ # $$\mtB^{[kl]} = \sum_{j=1}^{K} \frac{P^{[j]}}{d^{[j]}} \sum_{d=1}^{d^{[j]}} \mtH^{[kj]}\mtV_{\star l}^{[j]} \mtV_{\star l}^{[j]\dagger} \mtH^{[kj]\dagger} - \frac{P^{[k]}}{d^{[k]}} \mtH^{[kk]} \mtV_{\star l}^{[k]} \mtV_{\star l}^{[k]\dagger} \mtH^{[kk]\dagger} + \sigma_n^2 \mtI_{N^{[k]}}$$ if noise_power is None: noise_power = self.noise_var assert (self._Ns is not None) Bkl_all_l = np.empty(self._Ns[k], dtype=np.ndarray) first_part = self._calc_Bkl_cov_matrix_first_part(k) for l in range(self._Ns[k]): second_part = self._calc_Bkl_cov_matrix_second_part(k, l) Bkl_all_l[l] = first_part - second_part + (noise_power * np.eye(self.Nr[k])) return Bkl_all_l
[docs] def _calc_SINR_k(self, k: int, Bkl_all_l: Sequence[np.ndarray]) -> np.ndarray: """ Calculates the SINR of all streams of user 'k'. Parameters ---------- k : int Index of the desired user. Bkl_all_l : list[np.ndarray] | nd.ndarray A sequence (1D numpy array, a list, etc) of 2D numpy arrays corresponding to the Bkl matrices for all 'l's. Returns ------- SINR_k : np.ndarray The SINR for the different streams of user k. """ Hkk = self._get_channel(k, k) Vk = self.full_F[k] Uk_H = self.full_W_H[k] SINR_k = np.empty(self.Ns[k], dtype=float) for l in range(self.Ns[k]): Vkl = Vk[:, l:l + 1] Ukl_H = Uk_H[l:l + 1, :] Ukl = Ukl_H.conj().T aux = np.dot(Ukl_H, np.dot(Hkk, Vkl)) numerator = np.dot(aux, aux.transpose().conjugate()) denominator = np.dot(Ukl_H, np.dot(Bkl_all_l[l], Ukl)) SINR_kl = numerator.item() / denominator.item() # The imaginary part should be negligible SINR_k[l] = np.abs(SINR_kl) return SINR_k
[docs] @abstractmethod def solve(self, Ns: Union[int, np.ndarray], P: Optional[np.ndarray] = None) -> Any: # pragma: no cover """ Find the IA solution. This method must be implemented in a subclass and should updates the 'F' and 'W' member variables. Parameters ---------- Ns : int | np.ndarray Number of streams of each user. P : np.ndarray Power of each user. If not provided, a value of 1 will be used for each user. Notes ----- This function should be implemented in the derived classes """ raise NotImplementedError("solve: Not implemented")