Source code for pyphysim.comm.waterfilling

#!/usr/bin/env python
"""Implements a waterfilling method.

The doWF method performs the waterfilling algorithm.
"""

from typing import Tuple

import numpy as np

__all__ = ['doWF']


# noinspection PyUnresolvedReferences
[docs]def doWF(vtChannels: np.ndarray, dPt: float, noiseVar: float = 1.0, Es: float = 1.0) -> Tuple[np.ndarray, float]: """ Performs the Waterfilling algorithm and returns the optimum power and water level. Parameters ---------- vtChannels : np.ndarray Numpy array with the channel POWER gains (power of the parallel AWGN channels). dPt : float Total available power. noiseVar : float Noise variance (power in linear scale). Es : float Symbol energy (in linear scale). Returns ------- (vtOptP, mu) : (np.ndarray, float) A tuple with vtOptP and mu, where vtOptP are the optimum powers, while mu is the water level. """ # Sort Channels (descending order) vtChannelsSortIndexes = np.argsort(vtChannels)[::-1] vtChannelsSorted = vtChannels[vtChannelsSortIndexes] assert isinstance(vtChannelsSorted, np.ndarray) # xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # Calculates the water level that touches the worst channel (the higher # one) and therefore transmits zero power in this worst channel. After # that, calculates the power in each channel (the vector 'Ps') for this # water level. If the sum of all of these powers in 'Ps' is less then # the total available power, then all we need to do is divide the # remaining power equally among all the channels (increase the water # level). On the other hand, if the sum of all of these powers in 'Ps' # is greater then the total available power then we remove the worst # channel and repeat the process. # xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # Calculates minimum water-level $\mu$ required to use all channels dNChannels = vtChannels.size dRemoveChannels = 0 minMu = float(noiseVar) / ( Es * vtChannelsSorted[dNChannels - dRemoveChannels - 1]) Ps = (minMu - float(noiseVar) / (Es * vtChannelsSorted[np.arange(0, dNChannels - dRemoveChannels)])) # Ps should be a numpy array assert isinstance(Ps, np.ndarray) while (sum(Ps) > dPt) and (dRemoveChannels < dNChannels): dRemoveChannels += 1 minMu = float(noiseVar) / ( Es * vtChannelsSorted[dNChannels - dRemoveChannels - 1]) Ps = ( minMu - float(noiseVar) / (Es * vtChannelsSorted[np.arange(0, dNChannels - dRemoveChannels)]) ) # Distributes the remaining power among the all the remaining channels dPdiff = dPt - np.sum(Ps) vtOptPaux = dPdiff / (dNChannels - dRemoveChannels) + Ps # Put optimum power in the original channel order vtOptP = np.zeros([ vtChannels.size, ]) vtOptP[vtChannelsSortIndexes[np.arange(0, dNChannels - dRemoveChannels)]] = vtOptPaux mu = vtOptPaux[0] + float(noiseVar) / vtChannelsSorted[0] return vtOptP, mu