Source code for pyphysim.simulations.simulationhelpers

#!/usr/bin/env python
"""Module implementing helper functions for simulators."""

import sys
from typing import Any, List, Optional, Union

from ..progressbar import ProgressbarZMQServer
from .runner import SimulationRunner

try:
    # noinspection PyUnresolvedReferences
    from ipyparallel import Client, LoadBalancedView, DirectView
except ImportError:  # pragma: no cover
    Client = Any
    LoadBalancedView = Any
    DirectView = Any


# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxxxxxxx Module functions xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[docs]def simulate_do_what_i_mean( runner_or_list_of_runners: Union[SimulationRunner, List[SimulationRunner]], folder: Optional[str] = None) -> None: # pragma: no cover """ This will either call the simulate method or the simulate_in_parallel method as appropriated. If the 'parameters variation index' was specified in the command line, then the 'simulate' method will be called with that index. If not, then the simulate method will be called without any index or, if there is an ipython cluster running, the simulate_in_parallel method will be called. Parameters ---------- runner_or_list_of_runners : SimulationRunner | list[SimulationRunner] The SimulationRunner object for which either the 'simulate' or the 'simulate_in_parallel' method will be called. If this is a list, then we just call this method individually for each member of the list. folder : str Folder to be added to the python path. This should be the main pyphysim folder """ if isinstance(runner_or_list_of_runners, list): list_of_runners = runner_or_list_of_runners _simulate_do_what_i_mean_multiple_runners(list_of_runners, folder) else: # xxxxxxxxxx We only have one SimulationRunner obj xxxxxxxxxxxxxxxx runner = runner_or_list_of_runners _simulate_do_what_i_mean_single_runner(runner, folder, block=True)
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[docs]def _add_folder_to_ipython_engines_path( client: Client, folder: str) -> None: # pragma: no cover """ Add a folder to sys.path of each ipython engine. The list of engines is get as a direct view from 'client'. This will also add the folder to the local python path. Parameters ---------- client : Client The client from which we will get a direct view to access the engines. folder : str The folder to be added to the python path at each engine. """ # Add the folder to the python path of the main application sys.path.append(folder) # We create a direct view to run coe in all engines dview = client.direct_view() # Reset the engines so that we don't have variables there from last # computations dview.execute('%reset') dview.execute('import sys') # Add the folder to the python path of each engine. We use # block=True to ensure that all engines have modified their # path to include the folder with the simulator before we # continue. dview.execute('sys.path.append("{0}")'.format(folder), block=True)
[docs]def _simulate_do_what_i_mean_single_runner( runner: SimulationRunner, folder: Optional[str] = None, block: bool = True) -> None: # pragma: no cover """ This will either call the `simulate` method or the `simulate_in_parallel` method as appropriated. If the 'parameters variation index' was specified in the command line, then the `simulate` method will be called with that index. If not, then the `simulate` method will be called without any index or, if there is an ipython cluster running, the `simulate_in_parallel` method will be called. Parameters ---------- runner : SimulationRunner The SimulationRunner object for which either the 'simulate' or the 'simulate_in_parallel' method will be called. folder : str, optional Folder to be added to the python path. This should be the main pyphysim folder block : bool, optional Passed to the simulate_in_parallel method when the simulation is performed in parallel. If this is false, you need to call the method 'wait_parallel_simulation' of the runner object at some point. """ if runner.command_line_args.index is not None: # Perform the simulation (serially) for the desired index msg = "Simulation will be run for the parameters variation: {0}" print(msg.format(runner.command_line_args.index)) runner.simulate(runner.command_line_args.index) else: run_in_parallel = True try: # If we can get an IPython view that means that the IPython # engines are running. In that case we will perform the # simulation in parallel from ipyparallel import Client # cl = Client(profile="ssh") # noinspection PyTypeChecker cl = Client(profile="default", timeout=2.0) if folder is not None: _add_folder_to_ipython_engines_path(cl, folder) # For the actual simulation we are better using a # load_balanced_view lview = cl.load_balanced_view() except (IOError, ImportError): # If we can't get an IPython view then we will perform the # simulation serially run_in_parallel = False lview = None if run_in_parallel is True: print("Simulation will be run in Parallel") # Remove the " - SNR: {SNR}" string in the progressbar message, # since when the simulation is performed in parallel we get a # single progressbar for all the simulation. runner.progressbar_message = 'Elapsed Time: {{elapsed_time}}' runner.simulate_in_parallel(lview, wait=block) else: print("Simulation will be run serially") runner.simulate()
[docs]def _simulate_do_what_i_mean_multiple_runners( list_of_runners: List[SimulationRunner], folder: Optional[str] = None) -> None: # pragma: no cover """ This will either call the `simulate` method or the `simulate_in_parallel` method as appropriated. If the 'parameters variation index' was specified in the command line, then the `simulate` method will be called with that index. If not, then the `simulate` method will be called without any index or, if there is an ipython cluster running, the `simulate_in_parallel` method will be called. Parameters ---------- list_of_runners : list[SimulationRunner] The `_simulate_do_what_i_mean_single_runner` will be called for each object in the list. folder : str Folder to be added to the python path. This should be the main pyphysim folder. """ # If we have a list of SimulationRunner objects, we want two # things. First, we want to use the same progressbar for all of # them. Second, we want to use 'block=False' for all of them and # only later call the wait_parallel_simulation method for each # runner. # First we will check the progress_output_type variable of the first # runner. If it is 'screen' we will create a progressbar that prints # the progress to the terminal, otherwise we will create a progressbar # the prints the progress to a file. if list_of_runners[0].progress_output_type == 'screen': # Progress will be printed to the screen filename = None else: # Progress will be printed to this file filename = 'multiple_runners_progress.txt' # xxxxxxxxxx Create the shared progressbar object xxxxxxxxxxxxxxxxx pbar = ProgressbarZMQServer(progresschar='*', message="Elapsed Time: {elapsed_time}", sleep_time=1, filename=filename) # xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # xxxxxxxxxx Start the simulation for each runner xxxxxxxxxxxxxxxxx add_folder_to_path = True for runner in list_of_runners: runner._pbar = pbar # pylint: disable= W0212 if add_folder_to_path is True: _simulate_do_what_i_mean_single_runner(runner, folder, block=False) add_folder_to_path = False else: _simulate_do_what_i_mean_single_runner(runner, None, block=False) # xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # xxxxx Wait for the simulation of each runner to finish xxxxxxxxxx for runner in list_of_runners: runner.wait_parallel_simulation()
# xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # xxxxxxxxxx Module Functions - END xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx