#!/usr/bin/env python
# pylint: disable=R0914
"""Module implementing the block diagonalization algorithm.
There are two ways to use this module. You can either use the
:class:`BlockDiagonalizer` class, or you can use the
:func:`block_diagonalize` and the :func:`calc_receive_filter` functions
(which use the BlockDiagonalizer class in their implementation).
"""
import collections
from typing import (Any, Callable, Dict, Iterable, List, Optional, Tuple,
Union, cast)
import numpy as np
from scipy.linalg import block_diag
from ..channels.multiuser import MultiUserChannelMatrixExtInt
from ..comm import waterfilling
from ..modulators.fundamental import BPSK, PSK, QAM, QPSK
from ..subspace.projections import calcProjectionMatrix
from ..util.conversion import linear2dB, single_matrix_to_matrix_of_matrices
from ..util.misc import (calc_shannon_sum_capacity, calc_whitening_matrix,
least_right_singular_vectors)
__all__ = [
'block_diagonalize', 'calc_receive_filter', 'BlockDiagonalizer',
'BDWithExtIntBase', 'WhiteningBD', 'EnhancedBD'
]
NumberOrArrayUnion = Union[np.ndarray, float]
Modulator = Union[BPSK, PSK, QPSK, QAM]
# A solution found with the block diagonalization algorithm
BdSolution = Tuple[np.ndarray, np.ndarray]
# metric function
MetricFunc = Callable[..., float]
# Extra arguments for the metric
MetricExtraArgs = Dict[str, Any]
# A solution found with the block diagonalization algorithm for the case with
# external interference. This corresponds to a tuple of 3 numpy arrays.
#
# The first one is a 1D numpy array of 2D numpy arrays (the precoder of each
# user).
#
# The second one is a 1D numpy array of 2D numpy arrays (the receive filter of
# each user).
#
# The third one is a 1D numpy array of integers with the number of streams of
# each user.
BdWithExtIntSolution = Tuple[np.ndarray, np.ndarray, np.ndarray]
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxx Module functions xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[docs]def block_diagonalize(mtChannel: np.ndarray, num_users: int, iPu: float,
noise_var: float) -> BdSolution:
"""
Performs the block diagonalization of `mtChannel`.
Parameters
----------
mtChannel : np.ndarray
Global channel matrix (a 2D numpy array).
num_users : int
Number of users
iPu : float
Power available for each user
noise_var : float
Noise variance
Returns
-------
(newH, Ms_good) : (np.ndarray,np.ndarray)
newH is a 2D numpy array corresponding to the Block
diagonalized channel, while Ms_good is a 2D numpy array
corresponding to the precoder matrix used to block diagonalize
the channel.
Notes
-----
The block diagonalization algorithm is described in [Spencer2004]_, where
different power allocations are illustrated. The :class:`BlockDiagonalizer`
class implement two power allocation methods, a global power allocation,
and a 'per transmitter' power allocation.
"""
BD = BlockDiagonalizer(num_users, iPu, noise_var)
results_tuple = BD.block_diagonalize(mtChannel)
return results_tuple
[docs]def calc_receive_filter(newH: np.ndarray) -> np.ndarray:
"""
Calculates the Zero-Forcing receive filter.
Parameters
----------
newH : np.ndarray
The block diagonalized channel.
Returns
-------
np.ndarray
The zero-forcing matrix to separate each stream of each user. Note
that W_bd_H is directly applied to the received signals (no need
to calculate the conjugate transpose).
"""
return BlockDiagonalizer.calc_receive_filter(newH)
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxx Helper Functions xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
def _calc_stream_reduction_matrix(Re_k: np.ndarray,
kept_streams: int) -> np.ndarray:
"""Calculates the `P` matrix that performs the stream reduction such that
the subspace of the remaining streams span the dimensions with the
lowest interference (according to the external interference plus noise
covariance matrix Re_k).
Parameters
----------
Re_k : np.ndarray
The external interference plus noise covariance matrix at a SINGLE
receiver.
kept_streams : int
Number of streams that will be kept. This will be equal to the
number of columns of the returned matrix.
Returns
-------
Pk : np.ndarray
A matrix whose columns corresponding to the `kept_streams` least
significant right singular vectors of Re_k.
"""
min_Vs = least_right_singular_vectors(Re_k, kept_streams)[0]
return min_Vs
def _calc_effective_throughput(sinrs: NumberOrArrayUnion, modulator: Modulator,
packet_length: int) -> float:
"""Calculates the effective throughput of the values in `sinrs` considering
the given modulator and packet_length.
The effective throughput is equivalent to the packet error for a
specific packet error rate and packet length, times the nominal
throughput.
Parameters
----------
sinrs : float | np.ndarray
SINR values (in linear scale).
modulator : Modulator
A modulator object such as M-PSK, M-QAM, etc. See the
:mod:`.modulators` module.
packet_length: int
The package length. That is, the number of bits in each
package.
Returns
-------
effective_throughput : float
Effective throughput that can be obtained.
"""
SINRs = linear2dB(sinrs)
se = modulator.calcTheoreticalSpectralEfficiency(SINRs, packet_length)
total_se = np.sum(se)
return cast(float, total_se)
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxx Classes xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[docs]class BlockDiagonalizer:
"""
Class to perform the block diagonalization algorithm in a joint
transmission scenario.
In the block diagonalization algorithm either a single base station
with more antennas transmits for multiple users at the same time or a
group of base stations acts as a single transmitter to send data to the
multiple users at the same time. In both cases the block
diagonalization algorithm assures that each receiver does not see
interference from the other receivers.
The water-filling algorithm is also applied to optimally distribute the
power. However, in the case with multiple base stations, the power
restriction in each base station must be respected. Therefore, after
the power is optimally allocated at each base station all powers will
be normalized to respect the power restriction of the base transmitting
the highest energy. This is what is done in the
:meth:`block_diagonalize` method.
If the power should not be optimally allocated with the water-filling
algorithm, use the :meth:`block_diagonalize_no_waterfilling` method
instead. The power restriction in each base station will still be
respected, but the base station will equally divide its power among the
available dimensions. Note that the result will be similar to
:meth:`block_diagonalize` in the high SNR regime.
Parameters
----------
num_users : int
Number of users.
iPu : float
Power available for EACH user.
noise_var : float
Noise variance (power in linear scale).
Examples
--------
Consider the case where we have 3 base station (BSs) jointly
transmitting to 3 users, where each base station has a power of 1.5,
the number of antennas (at each BS and at each receiver) is 2,
and the noise variance is 1e-4. The channel can be block
diagonalized for this scenario with
>>> bs_power = 1.5
>>> noise_var = 1e-4
>>> num_users = 2
>>> Ntx = 2 # Number of transmit antennas (per BS)
>>> Nrx = 2 # Number of receive antennas (per user)
>>> # Create the BlockDiagonalizer object
>>> bd = BlockDiagonalizer(num_users, bs_power, noise_var)
>>> channel = np.array([[-0.9834-0.0123j, 0.6503-0.3189j, \
0.5484+1.7049j, -1.0891-0.1025j], [-0.5911-0.3055j, -0.6205+0.3375j, \
-0.7995+0.3723j, 0.7412-1.2537j], [-0.2732+0.475j , -0.4191+0.4019j, \
0.1047-0.5592j, 0.7548-1.0214j], [ 0.5377-0.208j , -0.1480-1.0527j, \
-0.6373+0.4081j, -0.5854-0.8135j]])
>>> (newH, Ms) = bd.block_diagonalize(channel)
We can see that the equivalent channel (after applying the Ms
modulation matrix) is really block diagonalized.
>>> print(np.round(newH + 1e-10 + 1e-10j, 4))
[[ 0.0916+0.0135j -1.7449-0.4328j 0. +0.j 0. +0.j ]
[-0.0114-0.146j 0.0213-1.1366j 0. +0.j 0. +0.j ]
[ 0. +0.j 0. +0.j 0.0868+0.1565j -0.3673+0.2289j]
[ 0. +0.j 0. +0.j -0.0396+0.0407j 1.024 +0.8997j]]
Notice how the power restriction of each BS is respected (although only
one BS will transmit with its maximum power).
>>> print(np.round(np.linalg.norm(Ms[:,0:Ntx])**2, 4))
1.4997
>>> print(np.round(np.linalg.norm(Ms[:,Ntx:])**2, 4))
1.5
Notes
-----
The block diagonalization algorithm is described in [Spencer2004]_, where
different power allocations are illustrated. The :class:`BlockDiagonalizer`
class implement two power allocation methods, a global power allocation,
and a 'per transmitter' power allocation.
"""
def __init__(self, num_users: int, iPu: float, noise_var: float) -> None:
self.num_users = num_users
self.iPu = iPu
# Noise power is used in the water-filling calculations
self.noise_var = noise_var
# noinspection PyPep8
[docs] def _calc_BD_matrix_no_power_scaling(self,
mtChannel: np.ndarray) -> BdSolution:
"""
Calculates the modulation matrix "M" that block diagonalizes the
channel `mtChannel`, but without any king of power scaling.
The "modulation matrix" is a matrix that changes the channel to a block
diagonal structure and it is the first part in the Block
Diagonalization algorithm. The returned modulation matrix is equivalent
to Equation (12) of [Spencer2004]_ but without the power scaling matrix
:math:`\\Lambda`. Therefore, for the complete BD algorithm it is still
necessary to perform this power scaling in the output of
_calc_BD_matrix_no_power_scaling.
Parameters
----------
mtChannel : np.ndarray
Channel from the transmitter to all users.
Returns
-------
(Ms_bad, Sigma) : (np.ndarray,np.ndarray)
The modulation matrix "Ms_bad" is a precoder that block
diagonalizes the channel. The singular values of the equivalent
channel when the modulation matrix is applied correspond to
Sigma. Therefore, Sigma can be used latter in the power
allocation process.
Notes
-----
The reason why the Block Diagonalization algorithm was broken down
into the code here and the power scaling code is because the power
scaling may changing depending on the scenario. For instance, if
the transmitter corresponds to a single base station the the power
may be distributed into all the dimensions of the Modulation
matrix. On the other hand, if the transmitter corresponds to
multiple base stations jointly transmitting to multiple users then
the power of each base station must be distributed only into the
dimensions corresponding to that base station.
"""
iNr = mtChannel.shape[0]
msg = ("`block_diagonalize`: Number of rows of the channel must be"
" a multiple of the number of users.")
assert iNr % self.num_users == 0, msg
# Number of antennas per user
iNrU = iNr // self.num_users
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Ms_bad: List[np.ndarray] = []
Sigma: List[np.ndarray] = []
# Calculates the interfering channels $\tilde{\mat{H}}_j$ as well
# as $\tilde{\mtV}_j^{(1)}$ and $\tilde{\mtV}_j^{(0)}$.
# Note that $\tilde{\mat{H}}_j = \tilde{\mtU}_j \tilde{\Sigma}_j [\tilde{\mtV}_j^{(1)} \; \tilde{\mtV}_j^{(0)}]^H$ where $\tilde{\mtV}_j^{(1)}$ holds
# the first
# $\tilde{L}_j$ right singular vectors and $\tilde{\mtV}_j^{(0)}$
# holds the last $(n_T - \tilde{L}_j)$ right singular values
for user in range(0, self.num_users):
# channel of all users except the current user
tilde_H_cur_user = self._get_tilde_channel(mtChannel, user)
# How many streams user `user` can receive is given by the
# total number of receive antennas minus the rank of
# tilde_H_cur_user
nStreams = iNr - np.linalg.matrix_rank(tilde_H_cur_user)
tilde_V0 = least_right_singular_vectors(tilde_H_cur_user,
nStreams)[0]
# The equivalent channel of the current user corresponds to
# $\mtH_j \tilde{\mtV}_j^{(0)}$
# First we get $\mtH_j$
H_cur_user = self._get_sub_channel(mtChannel, user)
# Now we get the right singular value of the equivalent channel
(_, V1, S) = least_right_singular_vectors(
np.dot(H_cur_user, tilde_V0),
# Number of receive antennas minus number of desired
# streams
iNrU - nStreams)
# Get Ms and Sigma
Ms_bad.append(np.dot(tilde_V0, V1))
Sigma.extend(S)
# Concatenates the precoder for each user to form the complete Ms
# matrix. Ms_bad is the version without water-filling.
# This is equivalent to "concatenate(Ms_bad, axis=1)"
Ms_bad = np.hstack(Ms_bad)
Sigma = np.array(Sigma)
return Ms_bad, Sigma
[docs] def block_diagonalize(self, mtChannel: np.ndarray) -> BdSolution:
"""
Perform the block diagonalization.
mtChannel is a matrix with the channel from the transmitter to all
users, where each `iNUsers` rows correspond to one user.
For an example, see the documentation of the
:class:`BlockDiagonalizer` class.
Parameters
----------
mtChannel : np.ndarray
Channel from (all) the transmitter(s) to all users. This should
be a 2D numpy array.
Returns
-------
np.ndarray, np.ndarray
newH is a 2D numpy array corresponding to the Block
diagonalized channel, while Ms_good is a 2D numpy array
corresponding to the precoder matrix used to block diagonalize
the channel.
See also
--------
block_diagonalize_no_waterfilling
"""
# Calculates the modulation matrix and the singular values of the
# effective channel when this modulation matrix is applied.
(Ms_bad, Sigma) = self._calc_BD_matrix_no_power_scaling(mtChannel)
# Scale the power of this modulation assuring the power restriction
# is not violated in any of the base stations.
Ms_good = self._perform_normalized_waterfilling_power_scaling(
Ms_bad, Sigma)
# Finally calculates the Block diagonal channel
newH = np.dot(mtChannel, Ms_good)
# Return block diagonalized channel and the used precoding matrix
return newH, Ms_good
[docs] def block_diagonalize_no_waterfilling(self,
mtChannel: np.ndarray) -> BdSolution:
"""
Performs the block diagonalization, but without applying the
water-filling algorithm.
The power of each base station is equally divided such that the
square of the Frobenius norm or the columns of Ms_good
corresponding to that base station is equal to its power.
Parameters
----------
mtChannel : np.ndarray
Channel from (all) the transmitter(s) to all users. This should
be a 2D numpy array.
Returns
-------
(newH, Ms_good) : (np.ndarray,np.ndarray)
newH is a 2D numpy array corresponding to the Block
diagonalized channel, while Ms_good is a 2D numpy array
corresponding to the precoder matrix used to block diagonalize
the channel.
See also
--------
block_diagonalize
"""
# This only works of the number of transmit antennas is the same
# for all transmitters.
iNtU = mtChannel.shape[1] // self.num_users
# Calculates the modulation matrix and the singular values of the
# effective channel when this modulation matrix is applied.
(Ms_bad, _) = self._calc_BD_matrix_no_power_scaling(mtChannel)
# Scale the power of this modulation assuring the power restriction
# is not violated in any of the base stations.
Ms_good = np.empty(Ms_bad.shape, dtype=complex)
for user in range(0, self.num_users):
# Calculate the Frobenius norm of the matrix corresponding to
# the transmitter `user`
user_matrix = Ms_bad[:, user * iNtU:user * iNtU + iNtU]
# The power is actually the square of cur_sqrt_P
cur_sqrt_P = np.linalg.norm(user_matrix, 'fro')
Ms_good[:, user * iNtU:user * iNtU +
iNtU] = (user_matrix * np.sqrt(self.iPu) / cur_sqrt_P)
# Ms_good = self._perform_normalized_power_scaling(Ms_bad,
# Sigma)
# Finally calculates the Block diagonal channel
newH = np.dot(mtChannel, Ms_good)
# Return block diagonalized channel and the used precoding matrix
return newH, Ms_good
[docs] @staticmethod
def calc_receive_filter(newH: np.ndarray) -> np.ndarray:
"""
Calculates the Zero-Forcing receive filter.
Parameters
----------
newH : np.ndarray
The block diagonalized channel (2D numpy array).
Returns
-------
W_bd : np.ndarray
The zero-forcing matrix (2D numpy array) to separate each
stream of each user.
"""
W_bd = np.linalg.pinv(newH)
return W_bd
# noinspection PyPep8
[docs] def _get_tilde_channel(self, mtChannel: np.ndarray,
user: int) -> np.ndarray:
"""
Return the combined channel of all users except `user`.
Let $k$ be the index for `user`. If the channel from all
transmitters to receiver $k$ is $\\mtH_k$, then this method returns
$\\tilde{\\mtH_k} = [\\mtH_1^T, \\ldots, \\mtH_{k-1}^T, \\mtH_{k+1}^T, \\ldots, \\mtH_K]^T$.
Parameters
----------
mtChannel : np.ndarray
Channel of all users (2D numpy array).
user : int
Index of the user.
Returns
-------
np.ndarray
The combined channel of all users except `user`.
"""
vtAllUserIndexes = np.arange(0, self.num_users)
desiredUsers = [i for i in vtAllUserIndexes if i != user]
return self._get_sub_channel(mtChannel, desiredUsers)
[docs] def _get_sub_channel(
self, mt_channel: np.ndarray,
desired_users: Union[int, Iterable[int]]) -> np.ndarray:
"""
Get a subchannel according to the desired_users vector.
Parameters
----------
mt_channel : np.ndarray
Channel of all users (2D numpy array).
desired_users : list[int]
An iterable with the indexes of the desired users or an
integer.
Returns
-------
mtSubmatrix : np.ndarray
Submatrix of the desired users (2D numpy array)
Notes
-------
As an example, let's consider the case with a channel for 3
receivers, each with 2 receive antennas, where the transmitter has
6 transmit antennas.
>>> BD = BlockDiagonalizer(3, 0, 0)
>>> channel = np.vstack([np.ones([2, 6]), 2 * np.ones([2, 6]),\
3 * np.ones([2, 6])])
>>> BD._get_sub_channel(channel, [0,2])
array([[1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1.],
[3., 3., 3., 3., 3., 3.],
[3., 3., 3., 3., 3., 3.]])
>>> BD._get_sub_channel(channel, 0)
array([[1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1.]])
"""
nrows = mt_channel.shape[0]
# Number of receive antennas per user
iNrU = nrows // self.num_users
if isinstance(desired_users, collections.abc.Iterable):
vtIndexes: List[int] = []
for index in desired_users:
vtIndexes.extend(range(iNrU * index, (index + 1) * iNrU))
else:
assert isinstance(desired_users, int)
vtIndexes = list(
range(iNrU * desired_users, (desired_users + 1) * iNrU))
return mt_channel[vtIndexes, :]
[docs]class BDWithExtIntBase(BlockDiagonalizer):
"""
Class to perform the block diagonalization algorithm in a joint
transmission scenario taking into account the external interference.
This is the base class for any block diagonalization class that takes
into account the external interference.
Parameters
----------
num_users : int
Number of users.
iPu : float
Power available for EACH user (in linear scale).
noise_var : float
Noise variance (power in linear scale).
pe : float
Power of the external interference source (in linear scale)
"""
def __init__(self, num_users: int, iPu: float, noise_var: float,
pe: float) -> None:
super().__init__(num_users, iPu, noise_var)
self.pe = pe
[docs] def calc_whitening_matrices(
self,
mu_channel: MultiUserChannelMatrixExtInt) -> List[np.ndarray]:
"""
Calculates the whitening receive filters for each user.
Note that to consider the noise variance it must be set in the
provided mu_channel object.
Parameters
----------
mu_channel : MultiUserChannelMatrixExtInt
A MultiUserChannelMatrixExtInt object, which has the channel
from all the transmitters to all the receivers, as well as th
external interference.
Returns
-------
W_all_k : list[np.ndarray]
The whitening matrices that each receiver should use to whiten
the external interference. Each element in W_all_k is the
whitening filter (with the conjugate transpose already applied)
for a receiver.
"""
K = mu_channel.K
R_all_k = mu_channel.calc_cov_matrix_extint_plus_noise(self.pe)
W_all_k = [
calc_whitening_matrix(R_all_k[k]).conjugate().T for k in range(K)
]
return W_all_k
[docs]class WhiteningBD(BDWithExtIntBase):
"""
Class to perform the block diagonalization algorithm in a joint
transmission scenario taking into account the external interference.
Parameters
----------
num_users : int
Number of users.
iPu : float
Power available for EACH user (in linear scale).
noise_var : float
Noise variance (power in linear scale).
pe : float
Power of the external interference source (in linear scale)
"""
def __init__(self, num_users: int, iPu: float, noise_var: float,
pe: float) -> None:
super().__init__(num_users, iPu, noise_var, pe)
[docs] @staticmethod
def _calc_receive_filter_with_whitening(newH: np.ndarray,
whitening_filter: np.ndarray,
Nr: np.ndarray,
Nt: np.ndarray) -> np.ndarray:
"""
Calculates the Zero-Forcing receive filter of all users.
Parameters
----------
newH : np.ndarray
The block diagonalized channel (with the whitening applied).
This should be a 2D numpy array.
whitening_filter : np.ndarray
The whitening filter of all users. This is a block diagonal
matrix where each "block" is the whitening filter of a user.
Nr : np.ndarray
The number of receive antennas of each user.
Nt : np.ndarray
The number of transmit antennas of each user.
Returns
-------
Wk_all_users : np.ndarray
A 1D numpy array where each element corresponds to the receive
filter for a user. This is a 1D numpy array of 2D numpy arrays.
"""
# W = np.linalg.inv(
# np.dot(whitening_filter_k.conjugate().T, Heq_k))
K = Nr.size
big_W = np.dot(BlockDiagonalizer.calc_receive_filter(newH),
whitening_filter)
Wk_all_users = np.empty(K, dtype=np.ndarray)
aux = single_matrix_to_matrix_of_matrices(big_W, Nr, Nt)
for userindex in range(K):
Wk_all_users[userindex] = aux[userindex, userindex]
return Wk_all_users
[docs] def block_diagonalize_no_waterfilling( # type: ignore
self,
mu_channel: MultiUserChannelMatrixExtInt) -> BdWithExtIntSolution:
"""
Perform the block diagonalization of `mu_channel` taking the external
interference into account.
Parameters
----------
mu_channel : MultiUserChannelMatrixExtInt
A MultiUserChannelMatrixExtInt object, which has the channel
from all the transmitters to all the receivers, as well as the
external interference.
Returns
-------
Ms_all_users : np.ndarray
A 1D numpy array where each element corresponds to the precoder
for a user (1D numpy array of 2D numpy arrays).
Wk_all_users : np.ndarray
A 1D numpy array where each element corresponds to the receive
filter for a user (1D numpy array of 2D numpy arrays).
Ns_all_users: np.ndarray
Number of streams of each user (1D numpy array of ints).
"""
Nr = mu_channel.Nr
Nt = mu_channel.Nt
# Re = mu_channel.calc_cov_matrix_extint_plus_noise(
# self.noise_var, self.pe)
H_matrix = mu_channel.big_H_no_ext_int
# Get the whitening filters. This is a list of 2D numpy arrays
whitening_filter_all_k = self.calc_whitening_matrices(mu_channel)
# Creates a block diagonal matrix with the whitening filters for
# each user in the "diagonal" of this block diagonal matrix.
big_whitening_filter = block_diag(*whitening_filter_all_k)
# Equivalent big channel matrix after applying the whitening filter
H_matrix_equiv = np.dot(big_whitening_filter, H_matrix)
# We will now apply the regular block diagonalization algorithm in
# this equivalent channel matrix.
(newH, Ms) = BlockDiagonalizer.block_diagonalize_no_waterfilling(
self, H_matrix_equiv)
# xxxxxxxxxx Calculates the precoder for each transmitter xxxxxxxxx
Ms_all_users = single_matrix_to_matrix_of_matrices(Ms, None, Nt)
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxx Calculate the receive filter xxxxxxxxxxxxxxxxxxxxxxxxx
Wk_all_users = self._calc_receive_filter_with_whitening(
newH, big_whitening_filter, Nr, Nt)
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
return Ms_all_users, Wk_all_users, Nt.copy()
[docs]class EnhancedBD(BDWithExtIntBase):
"""
Class to perform the block diagonalization algorithm in a joint
transmission scenario taking into account the external interference.
The EnhancedBD class performs the block diagonalization characteristic
to the joint transmission scenario where multiple base stations act as
a single transmitter to send data to the users. However, in addition to
what the BlockDiagonalizer class does the EnhancedBD class can also
take external interference into account.
One way to reduce of eliminate the external interference is to
sacrifice streams in directions strongly occupied by the external
interference.
Parameters
----------
num_users : int
Number of users.
iPu : float
Power available for EACH user (in linear scale).
noise_var : float
Noise variance (power in linear scale).
pe : float
Power of the external interference source (in linear scale)
Notes
-----
See the :class:`BlockDiagonalizer` class for details about the block
diagonalization process.
"""
def __init__(self, num_users: int, iPu: float, noise_var: float,
pe: float) -> None:
super().__init__(num_users, iPu, noise_var, pe)
# Function used to decide how many streams will be sacrificed to
# mitigate external interference. This is set in the
# set_ext_int_handling_metric method (as well as the _modulator and
# _packet_length attributes)
self._metric_func: Optional[
MetricFunc] = None # The default metric will be None
self._metric_func_name = 'None'
# Extra arguments that will be passed to the self._metric_func when
# it is called.
self._metric_func_extra_args: MetricExtraArgs = {}
[docs] def set_ext_int_handling_metric(
self,
metric: Optional[str],
metric_func_extra_args_dict: Optional[MetricExtraArgs] = None
) -> None:
"""
Set the metric used to decide how many streams to sacrifice for
external interference handling.
The modification to the standard Block Diagonalization algorithm
performed in this class consists in avoid transmit data in the
subspace strongly occupied by the external interference source.
This sacrificing (not transmitting) of streams may or may not be
worth it and different metrics can be used to decide this.
The valid values for the `metric` argument are 'None' (python None
object), "fixed", "naive", "capacity" and "effective_throughput".
Each of these values will impact on how the number of transmit
streams is chosen and which subspace is actually used for the
desired signal.
For the "fixed" and "naive" metrics, the number of transmit streams
is determined by the value of the 'num_streams' key in the
metric_func_extra_args_dict. The difference between them is how the
subspace where the useful data is determined (for the given number
of sacrificed streams).
For the "naive" metric, the stream reduction is performed by
multiplying the usual block diagonalizing matrix M by a subset of
the identity matrix. For the "fixed" metric the subspace containing
the lowest remaining external interference energy is chosen by
multiplying the block diagonalizing matrix M by the singular vectors
of the external interference covariance matrix corresponding to the
lowest singular values. The same procedure is used for the other
metrics.
Differently from the "naive" and "fixed" metrics, the "capacity"
and "effective_throughput" metrics try to determine this best
number of sacrificed streams.
- If `metric` is None, then all streams will be used. That is, no
streams will be sacrificed and the external interference won't be
mitigated.
- If metric is "None" or "naive" then the specified number of
streams will be used.
- If `metric` is 'capacity', then the metric used to decide how
many streams to sacrifice will be the sum capacity. The function
:func:`.calc_shannon_sum_capacity` will be used to calculate the
sum capacity metric, and since it only uses the SINR values, no
extra arguments are required in the metric_func_extra_args_dict
dictionary.
- If `metric` is 'effective_throughput' then the metric used to
decide how many streams to sacrifice will be the effective
throughput that can be obtained. The function
`_calc_effective_throughput` will be used to calculate the
effective throughput. Since it requires the a modulator and a
packet length you should set the metric_func_extra_args_dict so
that it has the keys 'modulator' and 'packet_length' with the
correct values (a modulator object and an integer, respectively)
Parameters
----------
metric : str | None
The metric name. Must be one of the available metrics: {None,
'capacity', 'effective_throughput'}.
metric_func_extra_args_dict : dict
A dictionary containing the extra arguments that must be passed
to the metric function. For the "naive" and "fixed" metrics,
this dictionary must contain the "num_streams" keyword with the
desired number of transmit streams. For the
"effective_throughput" metric this dictionary mst contain the
"modulator" and "packet_length" keywords with a modulator
object and an integer, respectively. For the other metrics
metric_func_extra_args_dict will be ignored.
Raises
------
AttributeError
If the metric is not one of the available metrics or if the
metric_func_extra_args_dict does not contain the required
keywords.
"""
if metric_func_extra_args_dict is None:
metric_func_extra_args_dict = {}
if metric is None or metric == 'None':
self._metric_func_name = 'None'
self._metric_func = None
self._metric_func_extra_args = {}
elif metric == 'capacity':
self._metric_func_name = 'capacity'
self._metric_func = calc_shannon_sum_capacity
self._metric_func_extra_args = {}
elif metric == 'naive':
self._metric_func_name = 'naive'
self._metric_func = None
if 'num_streams' not in metric_func_extra_args_dict.keys():
msg = ("The 'naive' metric requires that "
"metric_func_extra_args_dict is provided and has "
"the 'num_streams' key")
raise AttributeError(msg)
# Set self._metric_func_extra_args as a dictionary containing
# the 'num_stream' key (and value) in
# metric_func_extra_args_dict
self._metric_func_extra_args = {
k: metric_func_extra_args_dict[k]
for k in ('num_streams', )
}
self._metric_func_extra_args = metric_func_extra_args_dict
elif metric == 'fixed':
self._metric_func_name = 'fixed'
self._metric_func = None
if 'num_streams' not in metric_func_extra_args_dict.keys():
msg = ("The 'fixed' metric requires that "
"metric_func_extra_args_dict is provided and has "
"the 'num_streams' key")
raise AttributeError(msg) # pragma: no cover
# Set self._metric_func_extra_args as a dictionary containing
# the 'num_stream' key (and value) in
# metric_func_extra_args_dict
self._metric_func_extra_args = {
k: metric_func_extra_args_dict[k]
for k in ('num_streams', )
}
elif metric == 'effective_throughput':
self._metric_func_name = 'effective_throughput'
self._metric_func = _calc_effective_throughput
keys = metric_func_extra_args_dict.keys()
if ('modulator' not in keys) or ('packet_length' not in keys):
msg = ("The 'effective_throughput' metric requires that "
"metric_func_extra_args_dict is provided and has "
"the 'modulator' and package_length' keys")
raise AttributeError(msg)
# Set self._metric_func_extra_args as a dictionary containing
# the 'modulator' and 'packet_length' keys (and values) in
# metric_func_extra_args_dict
self._metric_func_extra_args = {
k: metric_func_extra_args_dict[k]
for k in ('modulator', 'packet_length')
}
else:
msg = ("The `metric` attribute can only be one of "
"{None, 'capacity', 'effective_throughput'}")
raise AttributeError(msg)
@property
def metric_name(self) -> str:
"""Get name of the method used to decide how many streams to sacrifice.
Returns
-------
str
The metric name.
"""
return self._metric_func_name
[docs] @staticmethod
def calc_receive_filter_user_k(Heq_k_P: np.ndarray,
P: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Calculates the Zero-Forcing receive filter of a single user `k` with or
without the stream reduction.
Parameters
----------
Heq_k_P : np.ndarray
The equivalent channel of user `k` after the block
diagonalization process and any stream reduction (2D numpy array).
P : np.ndarray, optional
P has the most significant singular vectors of the external
interference plus noise covariance matrix for each
receiver.
Returns
-------
W : np.ndarray
The receive filter of user `k` (2D numpy array).
Notes
-----
If `P` is not None then the number of transmit streams will be
equal to the number of columns in `P`. Also, the receive filter `W`
includes a projection into the subspace spanned by the columns of
`P`. Since `P` was calculated to be in the directions with weaker
(or no) external interference then the receive filter `W` will
mitigate the external interference.
"""
if P is None:
W = np.linalg.pinv(Heq_k_P)
else:
overbar_P = calcProjectionMatrix(P)
# Calculate the equivalent channel including the stream
# reduction
# Heq_k_red = np.dot(Heq_k, P)
W = np.dot(np.linalg.pinv(np.dot(overbar_P, Heq_k_P)), overbar_P)
return W
# NOTE: PROBABLY, THIS IS ONLY VALID FOR SPATIAL MULTIPLEXING.
[docs] @staticmethod
def _calc_linear_SINRs(Heq_k_red: np.ndarray, Wk: np.ndarray,
Re_k: np.ndarray) -> np.ndarray:
"""Calculates the effective SINRs of each parallel channel.
Parameters
----------
Heq_k_red : np.ndarray
Equivalent channel matrix of user `k` including the block
diagonalization and any stream reduction applied (2D numpy array).
Wk : np.ndarray
Receive filter for user `k` (2D numpy array).
Re_k : np.ndarray
A numpy array where each element is the covariance matrix of the
external interference PLUS noise seen by a user (1D numpy array
of 2D numpy arrays.).
Returns
-------
sinrs : np.ndarray
SINR (in linear scale) of all the parallel channels of all users.
"""
mtP = np.dot(Wk, Heq_k_red)
desired_power = np.abs(np.diagonal(mtP))**2
# noinspection PyTypeChecker
internal_interference = np.sum(
np.abs((mtP - np.diagflat(np.diagonal(mtP))))**2, 1)
Wk_H = Wk.transpose().conjugate()
# Note that the noise is already accounted in the covariance matrix
# Re_k
external_interference_plus_noise = np.diagonal(
np.dot(Wk, np.dot(Re_k, Wk_H))).real
sinr = desired_power / (internal_interference +
np.abs(external_interference_plus_noise))
return sinr
[docs] def block_diagonalize_no_waterfilling( # type: ignore
self,
mu_channel: MultiUserChannelMatrixExtInt) -> BdWithExtIntSolution:
"""Perform the block diagonalization of `mu_channel` taking the
external interference into account.
This is the main method calculating the BD algorithm. Two
important parameters used here are the noise variance (an attribute
of the `mu_channel` object) and the external interference power
(the `pe` attribute) attributes.
Parameters
----------
mu_channel : MultiUserChannelMatrixExtInt object.
A MultiUserChannelMatrixExtInt object, which has the channel
from all the transmitters to all the receivers, as well as the
external interference.
Returns
-------
MsPk_all_users : np.ndarray
A 1D numpy array where each element corresponds to the precoder
for a user (1D numpy array of 2D numpy arrays).
Wk_all_users : np.ndarray
A 1D numpy array where each element corresponds to the receive
filter for a user (1D numpy array of 2D numpy arrays).
Ns_all_users: np.ndarray
Number of streams of each user (1D numpy array of ints).
"""
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxx Case where no stream reduction is performed xxxxxxxxxx
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# The value of self._metric_func_extra_args is not used for this
# case
if self._metric_func_name == "None":
return self._perform_BD_no_waterfilling_no_stream_reduction(
mu_channel)
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxx Case where the naive stream reduction is performed xxxxxxxx
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
if (self._metric_func_name == "naive"
or self._metric_func_name == "fixed"):
return self._perform_BD_no_waterfilling_fixed_or_naive_reduction(
mu_channel)
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxx Case where self._metric_func is used xxxxxxxxxxxxxxxxx
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# In this case the number of sacrificed streams is automatically
# determined according to the self._metric_func function. This
# function is set in the set_ext_int_handling_metric method, where
# any extra arguments (besides sinr) that should be passed to this
# function are set in the _metric_func_extra_args dictionary.
return self._perform_BD_no_waterfilling_decide_number_streams(
mu_channel)
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx