diff --git a/docs/tutorials/calib_cancel_tutorial.rst b/docs/tutorials/calib_cancel_tutorial.rst index 316e56ba1..17fc58111 100644 --- a/docs/tutorials/calib_cancel_tutorial.rst +++ b/docs/tutorials/calib_cancel_tutorial.rst @@ -23,10 +23,10 @@ have been marked as cancelled. Overview of the Calibration Problem ----------------------------------- -The generator function featured in this tutorial can be found in -``gen_funcs/persistent_surmise_calib.py`` and uses the `surmise`_ library for its -calibration surrogate model interface. The surmise library uses the "PCGPwM" -emulation method in this example. +The generator class featured in this tutorial can be found in +``gen_classes/surmise_calib.py`` as ``SurmiseCalibrator``, a standardized ``gest-api`` +generator that uses the `surmise`_ library for its calibration surrogate model interface. +The surmise library uses the "PCGPwM" emulation method in this example. Say there is a computer model :math:`f(\theta, x)` to be calibrated. To calibrate is to find some parameter :math:`\theta_0` such that :math:`f(\theta_0, x)` closely @@ -44,7 +44,7 @@ for the known true theta, which in this case is the center of a unit hypercube. are therefore stored at the start of libEnsemble's main :doc:`History array<../history_output_logging>` array, and have associated ``sim_id``'s. -The generator function ``gen_f`` then samples an initial batch of parameters +The generator then samples an initial batch of parameters :math:`(\theta_1, \ldots, \theta_n)` and constructs a surrogate model. For illustration, the initial batch of evaluations are arranged in the following sense: @@ -58,83 +58,96 @@ For illustration, the initial batch of evaluations are arranged in the following The surrogate then generates (suggests) new parameters for ``sim_f`` evaluations, so the number of parameters :math:`n` grows as more evaluations are scheduled and performed. -As more evaluations are performed and received by ``gen_f``, the surrogate evolves and +As more evaluations are performed and ingested by the generator, the surrogate evolves and suggests parameters closer to :math:`\theta_0` with uncertainty estimates. -The calibration can be terminated when either ``gen_f`` determines it has found -:math:`\theta_0` with some tolerance in the surrounding uncertainty, or computational -resource runs out. At termination, the generator exits and returns, initiating the -shutdown of the libEnsemble routine. - -The following is a pseudocode overview of the generator. Functions directly from -the calibration library used within the generator function have the ``calib:`` prefix. -Helper functions defined to improve the data received by the calibration library by -interfacing with libEnsemble have the ``libE:`` prefix. All other statements are -workflow logic or persistent generator helper functions like ``send`` or ``receive``:: - - 1 libE: calculate observation values and first batch - 2 while STOP_signal not received: - 3 receive: evaluated points - 4 unpack points into 2D Theta x Point structures - 5 if new model condition: - 6 calib: construct new model - 7 else: - 8 wait to receive more points - 9 if some condition: - 10 calib: generate new thetas from model - 11 calib: if error threshold reached: - 12 exit loop - done - 13 send: new points to be evaluated - 14 if any sent points must be obviated: - 15 libE: mark points with cancel request - 16 send: points with cancel request +The calibration can be terminated when exit criteria are met or the generator +determines it has found :math:`\theta_0` with some tolerance in the surrounding +uncertainty. At termination, the generator's ``finalize()`` method is called, +initiating the shutdown of the libEnsemble routine. + +The ``SurmiseCalibrator`` class implements the standard ``suggest``/``ingest`` +interface. The generator progresses through three phases: + +1. **Observation phase**: ``suggest_numpy()`` returns points for the true theta to + generate observation data. ``ingest_numpy()`` stores the results as observations. +2. **Initial batch phase**: ``suggest_numpy()`` returns a batch of initial theta + samples. ``ingest_numpy()`` builds the emulator and calibrator from the results. +3. **Main loop phase**: ``ingest_numpy()`` updates tracking arrays and + conditionally rebuilds the model. ``suggest_numpy()`` generates new thetas + when sufficient results have arrived, and queues cancellation requests for + obviated simulations via ``suggest_updates()``. + +The following is pseudocode for the generator's ``suggest``/``ingest`` loop:: + + phase 1 - suggest: generate observation points (true theta) + ingest: store observation results, build obs/obsvar + phase 2 - suggest: generate initial theta batch + ingest: build emulator and calibrator + phase 3 - repeat: + ingest: update tracking arrays from results + if rebuild condition met: update emulator, recalibrate + suggest: if select condition met: + calib: generate new thetas from model + if any pending points must be obviated: + queue cancel requests via suggest_updates() + return new points + else: return empty (wait for more results) Point Cancellation Requests and Dedicated Fields ------------------------------------------------ -While the generator loops and updates the model based on returned -points from simulations, it detects conditionally if any new Thetas should be generated -from the model, simultaneously evaluating if any *pending* simulations ought to be -cancelled ("obviated"). If so, the generator then calls ``cancel_columns()``:: +While the generator's ``suggest_numpy()`` generates new thetas from the model, it +simultaneously evaluates if any *pending* simulations ought to be cancelled +("obviated"). If so, the generator calls its internal ``_cancel_columns()`` +method, which constructs cancellation arrays and queues them for the runner: - if select_condition(pending): - new_theta, info = select_next_theta(step_add_theta, cal, emu, pending, n_explore_theta) +.. code-block:: python + + # Inside SurmiseCalibrator._suggest_main_loop(): + if _select_condition(self.pending): + new_theta, info = select_next_theta(...) ... - c_obviate = info["obviatesugg"] # suggested + c_obviate = info["obviatesugg"] # suggested columns to cancel if len(c_obviate) > 0: - cancel_columns(obs_offset, c_obviate, n_x, pending, ps) - -``obs_offset`` is an offset that excludes the observations when mapping points in surmise -data structures to ``sim_id``'s, ``c_obviate`` is a selection -of columns to cancel, ``n_x`` is the number of ``x`` values, and ``pending`` is used -to check that points marked for cancellation have not already returned. ``ps`` is the -instantiation of the *PersistentSupport* class that is set up for persistent generators, and -provides an interface for communication with the manager. - -Within ``cancel_columns()``, each column in ``c_obviate`` is iterated over, and if a -point is ``pending`` and thus has not yet been evaluated by a simulation, -its ``sim_id`` is appended to a list to be sent to the Manager for cancellation. -Cancellation is requested using the helper function ``request_cancel_sim_ids`` provided -by the *PersistentSupport* class. Each of these helper functions is described -:ref:`here`. The entire ``cancel_columns()`` routine is listed below: + self._cancel_columns(c_obviate) + +``_cancel_columns()`` iterates over the columns to cancel, and for each pending +point, appends its ``sim_id`` to a cancellation array. These arrays are returned +by ``suggest_updates()`` and sent to the manager with ``keep_state=True``: .. code-block:: python - def cancel_columns(obs_offset, c, n_x, pending, ps): - """Cancel columns""" + def _cancel_columns(self, c_obviate): + """Mark columns for cancellation and queue cancellation updates.""" sim_ids_to_cancel = [] - columns = np.unique(c) + columns = np.unique(c_obviate) for c in columns: - col_offset = c * n_x - for i in range(n_x): - sim_id_cancel = obs_offset + col_offset + i - if pending[i, c]: + col_offset = c * self.n_x + for i in range(self.n_x): + sim_id_cancel = self._obs_offset + col_offset + i + if self.pending[i, c]: sim_ids_to_cancel.append(sim_id_cancel) - pending[i, c] = 0 + self.pending[i, c] = 0 + + if sim_ids_to_cancel: + cancel_array = np.zeros(len(sim_ids_to_cancel), dtype=[("sim_id", int), ("cancel_requested", bool)]) + cancel_array["sim_id"] = sim_ids_to_cancel + cancel_array["cancel_requested"] = True + self._pending_cancellations.append(cancel_array) + + + def suggest_updates(self): + """Return pending cancellation updates.""" + updates = self._pending_cancellations + self._pending_cancellations = [] + return updates - ps.request_cancel_sim_ids(sim_ids_to_cancel) +The ``LibensembleGenRunner`` sends these updates to the manager with +``keep_state=True``, which updates existing History rows (setting +``cancel_requested=True``) without changing the generator's active state. In future calls to the allocation function by the manager, points that would have -been distributed for simulation work but are now marked with "cancel_requested" will not +been distributed for simulation work but are now marked with ``cancel_requested`` will not be processed. The manager will send kill signals to workers that are already processing cancelled points. These signals can be caught and acted on by the user ``sim_f``; otherwise they will be ignored. @@ -142,24 +155,30 @@ they will be ignored. Allocation Function and Cancellation Configuration -------------------------------------------------- -The allocation function used in this example is the *only_persistent_gens* function in the -*start_only_persistent* module. The calling script passes the following specifications: +The default allocation function ``only_persistent_gens`` is used automatically +with standardized generators. The relevant settings are passed via ``GenSpecs``: .. code-block:: python - libE_specs["kill_canceled_sims"] = True + gen_specs = GenSpecs( + generator=generator, + persis_in=["f", "sim_id"], + out=gen_out, + initial_batch_size=init_sample_size, + async_return=True, + active_recv_gen=True, + ) + +For the kill-sims test, ``kill_canceled_sims`` is also enabled: - alloc_specs = { - "alloc_f": alloc_f, - "initial_batch_size": init_sample_size, - "async_return": True, - "active_recv_gen": True, - } +.. code-block:: python + + libE_specs["kill_canceled_sims"] = True **async_return** tells the allocation function to return results to the generator as soon as they come back from evaluation (once the initial sample is complete). -**init_sample_size** gives the size of the initial sample that is batch returned to the gen. +**initial_batch_size** gives the size of the initial sample that is batch returned to the gen. This is calculated from other parameters in the calling script. **active_recv_gen** allows the persistent generator to handle irregular communications (see below). @@ -174,18 +193,14 @@ this case) must be prepared for irregular sending/receiving of data. Calling Script - Reading Results -------------------------------- -Within the libEnsemble calling script, once the main :doc:`libE()<../libe_module>` -function call has returned, it's a simple enough process to view the History rows +Within the libEnsemble calling script, once the ``Ensemble.run()`` method +has returned, it's a simple enough process to view the History rows that were marked as cancelled:: - if __name__ == "__main__": # required by multiprocessing on macOS and windows - H, persis_info, flag = libE(sim_specs, gen_specs, - exit_criteria, persis_info, - alloc_specs=alloc_specs, - libE_specs=libE_specs) + H, _, _ = test.run() - if is_manager: - print("Cancelled sims", H["cancel_requested"]) + if test.is_manager: + print("Cancelled sims", H["sim_id"][H["cancel_requested"]]) Here's an example graph showing the relationship between scheduled, cancelled (obviated), failed, and completed simulations requested by the ``gen_f``. Notice that for each @@ -198,9 +213,9 @@ successfully obviated: :align: center Please see the ``test_persistent_surmise_calib.py`` regression test for an example -routine using the surmise calibration generator. -The associated simulation function and allocation function are included in -``sim_funcs/surmise_test_function.py`` and ``alloc_funcs/start_only_persistent.py`` respectively. +routine using the ``SurmiseCalibrator`` generator class. +The associated simulation function is in ``sim_funcs/surmise_test_function.py``, +and the default ``only_persistent_gens`` allocation function is used automatically. Using cancellations to kill running simulations ------------------------------------------------ diff --git a/libensemble/gen_classes/surmise_calib.py b/libensemble/gen_classes/surmise_calib.py new file mode 100644 index 000000000..a996cf371 --- /dev/null +++ b/libensemble/gen_classes/surmise_calib.py @@ -0,0 +1,401 @@ +""" +Surmise calibration generator using the gest-api interface. + +This module contains a calibration generator that uses the Surmise package +for Bayesian calibration with surrogate model emulation. It supports +selective cancellation of pending simulations as the model evolves. +""" + +__all__ = ["SurmiseCalibrator"] + +import numpy as np +from gest_api.vocs import VOCS +from numpy import typing as npt +from surmise.calibration import calibrator +from surmise.emulation import emulator + +from libensemble.gen_funcs.surmise_calib_support import ( + gen_observations, + gen_thetas, + gen_true_theta, + gen_xs, + select_next_theta, + thetaprior, +) +from libensemble.generators import LibensembleGenerator + +# Generator phases +_PHASE_OBS = 0 # Generating observation points (true theta) +_PHASE_INITIAL = 1 # Generating initial theta batch +_PHASE_MAIN = 2 # Main calibration loop +_PHASE_DONE = 3 # Generator has finished + + +def _build_emulator(theta, x, fevals): + """Build the emulator.""" + print(x.shape, theta.shape, fevals.shape) + emu = emulator( + x, + theta, + fevals, + method="PCGPwM", + options={ + "xrmnan": "all", + "thetarmnan": "never", + "return_grad": True, + }, + ) + emu.fit() + return emu + + +def _select_condition(pending, n_remaining_theta=5): + """Determine whether enough pending evaluations have returned to select new thetas.""" + n_x = pending.shape[0] + return False if np.sum(pending) > n_remaining_theta * n_x else True + + +def _rebuild_condition(pending, prev_pending, n_theta=5): + """Determine whether enough new results have arrived to rebuild the emulator.""" + n_x = pending.shape[0] + if np.sum(prev_pending) - np.sum(pending) >= n_x * n_theta or np.sum(pending) == 0: + return True + else: + return False + + +class SurmiseCalibrator(LibensembleGenerator): + """ + Bayesian calibration generator using the Surmise package. + + Uses a Gaussian process emulator (PCGPwM method) to build a surrogate + model and a Bayesian calibrator to select informative parameters. + Supports cancellation of pending simulations that become unnecessary + as the model evolves. + + Parameters + ---------- + vocs : VOCS + Variable and objective specification. Variables should be split into + two groups by naming convention: ``x``-prefixed variables are input + coordinates (e.g. ``x0``, ``x1``, ``x2``) and ``theta``-prefixed + variables are calibration parameters (e.g. ``theta0``, ``theta1``). + The objective (e.g. ``f``) is the simulation output. + n_init_thetas : int + Number of initial theta parameter sets to evaluate. + num_x_vals : int + Number of x-coordinate points to create. + step_add_theta : int + Number of new theta parameters to generate per selection step. + n_explore_theta : int + Number of theta candidates to explore when selecting the next batch. + obsvar : float + Variance constant for generating observation noise. + priorloc : float + Location (mean) of the normal prior on theta parameters. + priorscale : float + Scale (std) of the normal prior on theta parameters. + random_seed : int + Seed for the random number generator. + """ + + def __init__( + self, + vocs: VOCS, + n_init_thetas: int = 15, + num_x_vals: int = 25, + step_add_theta: int = 10, + n_explore_theta: int = 200, + obsvar: float = 0.1, + priorloc: float = 1, + priorscale: float = 0.5, + random_seed: int = 1, + *args, + **kwargs, + ): + # Determine x and theta variable names from VOCS before super().__init__ + x_var_names = [v for v in vocs.variables if v.startswith("x")] + theta_var_names = [v for v in vocs.variables if v.startswith("theta")] + + if not x_var_names: + raise ValueError("VOCS must contain x-prefixed variables (e.g. x0, x1, x2)") + if not theta_var_names: + raise ValueError("VOCS must contain theta-prefixed variables (e.g. theta0, theta1)") + + # Set up variables_mapping so the auto-mapping in LibensembleGenerator + # doesn't map all variables to a single "x" field. We need separate + # "x" and "thetas" compound fields in the H-array. + variables_mapping = kwargs.pop("variables_mapping", {}) + if not variables_mapping: + variables_mapping = { + "x": x_var_names, + "thetas": theta_var_names, + } + + super().__init__(vocs, *args, variables_mapping=variables_mapping, **kwargs) + + self._n_x_dims = len(x_var_names) + self._n_theta_dims = len(theta_var_names) + + self.n_init_thetas = n_init_thetas + self.n_x = num_x_vals + self.step_add_theta = step_add_theta + self.n_explore_theta = n_explore_theta + self.obsvar_const = obsvar + self.rng = np.random.default_rng(random_seed) + + # Set up prior and generate x-coordinates + self.prior = thetaprior(priorloc, priorscale, self.rng) + self.x = gen_xs(self.n_x, self.rng) + + # Output dtype for compound fields + self._out_dtype = [ + ("x", float, self._n_x_dims), + ("thetas", float, self._n_theta_dims), + ("priority", int), + ] + + # Internal state + self._phase = _PHASE_OBS + self._obs_offset = self.n_x # Number of observation sim_ids before theta evaluations + + # Observation data (populated after obs results arrive) + self.obs: npt.NDArray | None = None + self._obsvar: float | None = None + + # Calibration model objects + self.emu: emulator | None = None + self.cal: calibrator | None = None + + # Accumulated theta parameters and evaluation tracking arrays + self.theta: npt.NDArray | None = None + self.fevals: npt.NDArray | None = None + self.pending: npt.NDArray | None = None + self.prev_pending: npt.NDArray | None = None + self.complete: npt.NDArray | None = None + + # Pending cancellation updates to be returned by suggest_updates() + self._pending_cancellations: list[npt.NDArray] = [] + + def _validate_vocs(self, vocs) -> None: + pass + + def _make_output(self, xs, thetas, set_priorities=False): + """Create output array for a batch of (x, theta) pairs. + + The output uses compound fields ``x`` (shape n_x_dims) and ``thetas`` + (shape n_theta_dims), matching the legacy H-array layout. + + Parameters + ---------- + xs : np.ndarray + X-coordinate points, shape (n_x, n_x_dims). + thetas : np.ndarray + Theta parameters, shape (n_thetas, n_theta_dims). + set_priorities : bool + If True, assign randomized priorities. + + Returns + ------- + H_o : np.ndarray + Structured array with one row per (x_i, theta_j) pair. + """ + n_thetas = len(thetas) + n_x = len(xs) + n_points = n_x * n_thetas + + H_o = np.zeros(n_points, dtype=self._out_dtype) + + for i, xval in enumerate(xs): + start = i * n_thetas + H_o["x"][start : start + n_thetas] = xval + H_o["thetas"][start : start + n_thetas] = thetas + + if set_priorities: + priority = np.arange(n_points) + self.rng.shuffle(priority) + H_o["priority"] = priority + + return H_o + + def suggest_numpy(self, num_points: int | None = None) -> npt.NDArray: + """Return the next batch of points to evaluate. + + The generator transitions through three phases: + + 1. **Observation phase**: returns ``(1 * n_x)`` points using the true theta. + 2. **Initial batch phase**: returns ``(n_init_thetas * n_x)`` points. + 3. **Main loop phase**: conditionally returns new theta points or an + empty array when waiting for more results. + """ + if self._phase == _PHASE_OBS: + # Generate observation points using the true theta + true_theta = gen_true_theta() + H_o = self._make_output(self.x, true_theta) + return H_o + + elif self._phase == _PHASE_INITIAL: + # Generate initial batch of thetas + self.theta = gen_thetas(self.prior, self.n_init_thetas) + H_o = self._make_output(self.x, self.theta, set_priorities=True) + return H_o + + elif self._phase == _PHASE_MAIN: + return self._suggest_main_loop() + + # Done + return np.zeros(0, dtype=self._out_dtype) + + def _suggest_main_loop(self): + """Handle suggest logic for the main calibration loop.""" + # Check if we should generate new thetas + if _select_condition(self.pending): + new_theta, info = select_next_theta( + self.step_add_theta, self.cal, self.emu, self.pending, self.n_explore_theta + ) + + # Extend tracking arrays for new thetas + self._pad_arrays(new_theta) + + # Build output array + H_o = self._make_output(self.x, new_theta, set_priorities=True) + + # Determine evaluations to cancel + c_obviate = info["obviatesugg"] + if len(c_obviate) > 0: + print(f"columns sent for cancel is: {c_obviate}", flush=True) + self._cancel_columns(c_obviate) + self.pending[:, c_obviate] = False + + return H_o + + # Nothing to suggest yet - return empty array + return np.zeros(0, dtype=self._out_dtype) + + def ingest_numpy(self, calc_in: npt.NDArray) -> None: + """Receive evaluated results and update internal state. + + Handles the three phases: observation results, initial batch results, + and ongoing calibration results. + """ + if calc_in is None or len(calc_in) == 0: + return + + if self._phase == _PHASE_OBS: + # Observation results - construct obs and obsvar + returned_fevals: npt.NDArray = np.reshape(calc_in["f"], (1, self.n_x)) + true_fevals = returned_fevals + self.obs, self._obsvar = gen_observations(true_fevals, self.obsvar_const, self.rng) + self._phase = _PHASE_INITIAL + + elif self._phase == _PHASE_INITIAL: + # Initial batch results - build emulator and calibrator + self.fevals = np.reshape(calc_in["f"], (self.n_x, self.n_init_thetas)) + assert self.fevals is not None + self.pending = np.full(self.fevals.shape, False) + self.prev_pending = self.pending.copy() + self.complete = np.full(self.fevals.shape, True) + + self.emu = _build_emulator(self.theta, self.x, self.fevals) + self.cal = calibrator(self.emu, self.obs, self.x, self.prior, self._obsvar, method="directbayes") + assert self.cal is not None + + print( + "quantiles:", + np.round(np.quantile(self.cal.theta.rnd(10000), (0.01, 0.99), axis=0), 3), + ) + self._phase = _PHASE_MAIN + + elif self._phase == _PHASE_MAIN: + # Main loop - update tracking arrays with new results + self._update_arrays(calc_in) + + # Check if we should rebuild the model + if _rebuild_condition(self.pending, self.prev_pending): + self._rebuild_model() + + def _update_arrays(self, calc_in): + """Unpack results into 2D (n_x, n_thetas) tracking arrays.""" + sim_id = calc_in["sim_id"] + c, r = divmod(sim_id - self._obs_offset, self.n_x) + + self.fevals[r, c] = calc_in["f"] + self.pending[r, c] = False + self.complete[r, c] = True + + def _rebuild_model(self): + """Rebuild the emulator and recalibrate.""" + print( + "Percentage Cancelled: %0.2f ( %d / %d)" + % ( + 100 * np.round(np.mean(1 - self.pending - self.complete), 4), + np.sum(1 - self.pending - self.complete), + np.prod(self.pending.shape), + ) + ) + print( + "Percentage Pending: %0.2f ( %d / %d)" + % ( + 100 * np.round(np.mean(self.pending), 4), + np.sum(self.pending), + np.prod(self.pending.shape), + ) + ) + print( + "Percentage Complete: %0.2f ( %d / %d)" + % ( + 100 * np.round(np.mean(self.complete), 4), + np.sum(self.complete), + np.prod(self.pending.shape), + ) + ) + + self.emu.update(theta=self.theta, f=self.fevals) + self.cal.fit() + + samples = self.cal.theta.rnd(2500) + print(np.mean(np.sum((samples - np.array([0.5] * self._n_theta_dims)) ** 2, 1))) + print(np.round(np.quantile(self.cal.theta.rnd(10000), (0.01, 0.99), axis=0), 3)) + + self.step_add_theta += 2 + self.prev_pending = self.pending.copy() + + def _pad_arrays(self, new_theta): + """Extend tracking arrays for new thetas.""" + n_new = len(new_theta) + self.theta = np.vstack((self.theta, new_theta)) + self.fevals = np.hstack((self.fevals, np.full((self.n_x, n_new), np.nan))) + self.pending = np.hstack((self.pending, np.full((self.n_x, n_new), True))) + self.prev_pending = np.hstack((self.prev_pending, np.full((self.n_x, n_new), True))) + self.complete = np.hstack((self.complete, np.full((self.n_x, n_new), False))) + + def _cancel_columns(self, c_obviate): + """Mark columns for cancellation and queue cancellation updates.""" + sim_ids_to_cancel = [] + columns = np.unique(c_obviate) + for c in columns: + col_offset = c * self.n_x + for i in range(self.n_x): + sim_id_cancel = self._obs_offset + col_offset + i + if self.pending[i, c]: + sim_ids_to_cancel.append(sim_id_cancel) + self.pending[i, c] = 0 + + if sim_ids_to_cancel: + cancel_array = np.zeros(len(sim_ids_to_cancel), dtype=[("sim_id", int), ("cancel_requested", bool)]) + cancel_array["sim_id"] = sim_ids_to_cancel + cancel_array["cancel_requested"] = True + self._pending_cancellations.append(cancel_array) + + def suggest_updates(self): + """Return pending cancellation updates to be sent to the manager. + + Returns a list of numpy arrays, each containing ``sim_id`` and + ``cancel_requested`` fields for points that should be cancelled. + The runner sends these with ``keep_state=True`` so the manager + updates existing History rows without changing the generator's + active state. + """ + updates = self._pending_cancellations + self._pending_cancellations = [] + return updates diff --git a/libensemble/sim_funcs/surmise_test_function.py b/libensemble/sim_funcs/surmise_test_function.py index 29b591356..e3d729bb6 100644 --- a/libensemble/sim_funcs/surmise_test_function.py +++ b/libensemble/sim_funcs/surmise_test_function.py @@ -1,7 +1,5 @@ """ -Created on Tue Feb 9 10:27:23 2021 - -@author: mosesyhc +Borehole test simulation functions for the Surmise calibration example. """ import numpy as np diff --git a/libensemble/tests/regression_tests/test_persistent_surmise_calib.py b/libensemble/tests/regression_tests/test_persistent_surmise_calib.py index 3820bfdec..697099411 100644 --- a/libensemble/tests/regression_tests/test_persistent_surmise_calib.py +++ b/libensemble/tests/regression_tests/test_persistent_surmise_calib.py @@ -1,5 +1,5 @@ """ -Runs libEnsemble with Surmise calibration test. +Runs libEnsemble with Surmise calibration test using the gest-api generator. Execute via one of the following commands (e.g. 3 workers): mpiexec -np 4 python test_persistent_surmise_calib.py @@ -25,7 +25,6 @@ # TESTSUITE_COMMS: mpi local tcp # TESTSUITE_NPROCS: 4 # TESTSUITE_EXTRA: true -# TESTSUITE_OS_SKIP: OSX # Requires: # Install Surmise package @@ -33,11 +32,10 @@ import sys import numpy as np +from gest_api.vocs import VOCS from libensemble import Ensemble -from libensemble.gen_funcs.persistent_surmise_calib import surmise_calib as gen_f - -# Import libEnsemble items for this test +from libensemble.gen_classes.surmise_calib import SurmiseCalibrator from libensemble.sim_funcs.surmise_test_function import borehole as sim_f from libensemble.sim_funcs.surmise_test_function import tstd2theta from libensemble.specs import ExitCriteria, GenSpecs, SimSpecs @@ -62,15 +60,39 @@ def run_surmise_calib(): # Batch mode until after init_sample_size (add one theta to batch for observations) init_sample_size = (n_init_thetas + 1) * n_x - # Stop after max_emul_runs runs of the emulator + # Stop after max_evals evaluations max_evals = init_sample_size + max_add_thetas * n_x + # Define the problem via VOCS + vocs = VOCS( + variables={ + "x0": [0, 1.0], + "x1": [0, 1.0], + "x2": [0, 1.0], + "theta0": [0, 1.0], + "theta1": [0, 1.0], + "theta2": [0, 1.0], + "theta3": [0, 1.0], + }, + objectives={"f": "EXPLORE"}, + ) + + # Initialize the standardized generator + generator = SurmiseCalibrator( + vocs, + n_init_thetas=n_init_thetas, + num_x_vals=n_x, + step_add_theta=step_add_theta, + n_explore_theta=n_explore_theta, + obsvar=obsvar, + priorloc=1, + priorscale=0.5, + ) + gen_out = [ ("x", float, ndims), ("thetas", float, nparams), ("priority", int), - ("obs", float, n_x), - ("obsvar", float, n_x), ] test = Ensemble( @@ -82,18 +104,9 @@ def run_surmise_calib(): user={"num_obs": n_x}, ), gen_specs=GenSpecs( - gen_f=gen_f, - persis_in=[o[0] for o in gen_out] + ["f", "sim_ended", "sim_id"], + generator=generator, + persis_in=["f", "sim_id"], out=gen_out, - user={ - "n_init_thetas": n_init_thetas, # No. of thetas in initial batch - "num_x_vals": n_x, # No. of x points to create - "step_add_theta": step_add_theta, # No. of thetas to generate per step - "n_explore_theta": n_explore_theta, # No. of thetas to explore each step - "obsvar": obsvar, # Variance for generating noise in obs - "priorloc": 1, # Prior location in the unit cube - "priorscale": 0.5, # Standard deviation of prior - }, initial_batch_size=init_sample_size, async_return=True, active_recv_gen=True, diff --git a/libensemble/tests/regression_tests/test_persistent_surmise_killsims.py b/libensemble/tests/regression_tests/test_persistent_surmise_killsims.py index aeb10f4f5..76f08bacc 100644 --- a/libensemble/tests/regression_tests/test_persistent_surmise_killsims.py +++ b/libensemble/tests/regression_tests/test_persistent_surmise_killsims.py @@ -1,5 +1,6 @@ """ -Tests libEnsemble's capability to kill/cancel simulations that are in progress. +Tests libEnsemble's capability to kill/cancel simulations that are in progress, +using the gest-api generator interface. Execute via one of the following commands (e.g. 3 workers): mpiexec -np 4 python test_persistent_surmise_killsims.py @@ -14,7 +15,7 @@ subprocesses a compiled version of the borehole simulation. A delay is added to simulations after the initial batch, so that the killing of running simulations can be tested. This will only affect simulations that have already -been issued to a worker when the cancel request is registesred by the manager. +been issued to a worker when the cancel request is registered by the manager. See more information, see tutorial: "Borehole Calibration with Selective Simulation Cancellation" @@ -25,7 +26,6 @@ # TESTSUITE_COMMS: mpi local tcp # TESTSUITE_NPROCS: 3 4 # TESTSUITE_EXTRA: true -# TESTSUITE_OS_SKIP: OSX # Requires: # Install Surmise package @@ -33,14 +33,16 @@ import os import numpy as np +from gest_api.vocs import VOCS from libensemble.executors.executor import Executor -from libensemble.gen_funcs.persistent_surmise_calib import surmise_calib as gen_f +from libensemble.gen_classes.surmise_calib import SurmiseCalibrator # Import libEnsemble items for this test from libensemble.libE import libE from libensemble.sim_funcs.borehole_kills import borehole as sim_f -from libensemble.tests.regression_tests.common import build_borehole # current location +from libensemble.specs import GenSpecs, SimSpecs +from libensemble.tests.regression_tests.common import build_borehole from libensemble.tools import parse_args, save_libE_output # from libensemble import logger @@ -61,7 +63,7 @@ # Batch mode until after init_sample_size (add one theta to batch for observations) init_sample_size = (n_init_thetas + 1) * n_x - # Stop after max_emul_runs runs of the emulator + # Stop after max_evals evaluations max_evals = init_sample_size + max_add_thetas * n_x sim_app = os.path.join(os.getcwd(), "borehole.x") @@ -79,50 +81,64 @@ en_suffix = str(nworkers) + "_" + libE_specs.get("comms") libE_specs["ensemble_dir_path"] = "ensemble_calib_kills_w" + en_suffix - sim_specs = { - "sim_f": sim_f, - "in": ["x", "thetas"], - "out": [ - ("f", float), - ("sim_killed", bool), # "sim_killed" is used only for display at the end of this test - ], - "user": { - "num_obs": n_x, - "init_sample_size": init_sample_size, + # Define the problem via VOCS + vocs = VOCS( + variables={ + "x0": [0, 1.0], + "x1": [0, 1.0], + "x2": [0, 1.0], + "theta0": [0, 1.0], + "theta1": [0, 1.0], + "theta2": [0, 1.0], + "theta3": [0, 1.0], }, - } + objectives={"f": "EXPLORE"}, + ) + + # Initialize the standardized generator + generator = SurmiseCalibrator( + vocs, + n_init_thetas=n_init_thetas, + num_x_vals=n_x, + step_add_theta=step_add_theta, + n_explore_theta=n_explore_theta, + obsvar=obsvar, + priorloc=1, + priorscale=0.2, + ) gen_out = [ ("x", float, ndims), ("thetas", float, nparams), ("priority", int), - ("obs", float, n_x), - ("obsvar", float, n_x), ] - gen_specs = { - "gen_f": gen_f, - "persis_in": [o[0] for o in gen_out] + ["f", "sim_ended", "sim_id"], - "out": gen_out, - "initial_batch_size": init_sample_size, - "async_return": True, - "active_recv_gen": True, - "user": { - "n_init_thetas": n_init_thetas, # Num thetas in initial batch - "num_x_vals": n_x, # Num x points to create - "step_add_theta": step_add_theta, # No. of thetas to generate per step - "n_explore_theta": n_explore_theta, # No. of thetas to explore each step - "obsvar": obsvar, # Variance for generating noise in obs - "priorloc": 1, # Prior location in the unit cube. - "priorscale": 0.2, # Standard deviation of prior + sim_specs = SimSpecs( + sim_f=sim_f, + inputs=["x", "thetas"], + out=[ + ("f", float), + ("sim_killed", bool), + ], + user={ + "num_obs": n_x, + "init_sample_size": init_sample_size, }, - } + ) + + gen_specs = GenSpecs( + generator=generator, + persis_in=["f", "sim_id"], + out=gen_out, + initial_batch_size=init_sample_size, + async_return=True, + active_recv_gen=True, + ) - persis_info = {} exit_criteria = {"sim_max": max_evals} # Perform the run - H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs) + H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, {}, libE_specs=libE_specs) if is_manager: print("Cancelled sims", H["sim_id"][H["cancel_requested"]]) diff --git a/libensemble/tests/unit_tests/test_ufunc_runners.py b/libensemble/tests/unit_tests/test_ufunc_runners.py index 79fda7c28..a4f00e807 100644 --- a/libensemble/tests/unit_tests/test_ufunc_runners.py +++ b/libensemble/tests/unit_tests/test_ufunc_runners.py @@ -133,6 +133,94 @@ def test_globus_compute_runner_fail(): pytest.fail("Expected exception") +def test_libensemble_gen_runner_loop_with_updates(): + """Test that LibensembleGenRunner._loop_over_gen sends updates with keep_state=True.""" + from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP + from libensemble.utils.runners import LibensembleGenRunner + + # Create mock generator with suggest_numpy, suggest_updates, ingest_numpy + mock_gen = mock.Mock() + mock_gen.variables_mapping = {} + + H_out = np.zeros(2, dtype=[("x", float)]) + H_out["x"] = [1.0, 2.0] + mock_gen.suggest_numpy.return_value = H_out + + cancel_update = np.zeros(1, dtype=[("sim_id", int), ("cancel_requested", bool)]) + cancel_update["sim_id"] = [5] + cancel_update["cancel_requested"] = True + mock_gen.suggest_updates.return_value = [cancel_update] + + mock_gen.ingest_numpy.return_value = None + + specs = {"generator": mock_gen, "batch_size": 2} + runner = LibensembleGenRunner(specs) + + # Mock PersistentSupport + mock_ps = mock.Mock() + H_in = np.zeros(2, dtype=[("f", float), ("sim_id", int)]) + H_in["f"] = [0.5, 0.6] + H_in["sim_id"] = [0, 1] + + # First recv returns EVAL_GEN_TAG (continue), second returns PERSIS_STOP (exit) + mock_ps.recv.side_effect = [ + (EVAL_GEN_TAG, {}, H_in), + (PERSIS_STOP, {}, None), + ] + runner.ps = mock_ps + + # Start the loop with a non-STOP tag + runner._loop_over_gen(EVAL_GEN_TAG, {}, H_in) + + # Verify: send was called with H_out, then with cancel_update using keep_state=True + assert mock_ps.send.call_count >= 2, "send should be called at least twice (H_out + update)" + + # First send call: H_out (no keep_state) + first_send_args, first_send_kwargs = mock_ps.send.call_args_list[0] + np.testing.assert_array_equal(first_send_args[0], H_out) + assert first_send_kwargs.get("keep_state", False) is False or "keep_state" not in first_send_kwargs + + # Second send call: cancel_update with keep_state=True + second_send_args, second_send_kwargs = mock_ps.send.call_args_list[1] + np.testing.assert_array_equal(second_send_args[0], cancel_update) + assert second_send_kwargs.get("keep_state") is True, "Updates should be sent with keep_state=True" + + # recv was used (not send_recv) when updates were present + assert mock_ps.recv.call_count >= 1 + # send_recv should NOT have been called on the iteration with updates + mock_ps.send_recv.assert_not_called() + + +def test_libensemble_gen_runner_loop_without_updates(): + """Test that LibensembleGenRunner._loop_over_gen uses send_recv when no updates.""" + from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP + from libensemble.utils.runners import LibensembleGenRunner + + mock_gen = mock.Mock() + mock_gen.variables_mapping = {} + + H_out = np.zeros(2, dtype=[("x", float)]) + mock_gen.suggest_numpy.return_value = H_out + mock_gen.suggest_updates.return_value = [] # Empty updates + mock_gen.ingest_numpy.return_value = None + + specs = {"generator": mock_gen, "batch_size": 2} + runner = LibensembleGenRunner(specs) + + mock_ps = mock.Mock() + H_in = np.zeros(2, dtype=[("f", float), ("sim_id", int)]) + + mock_ps.send_recv.return_value = (PERSIS_STOP, {}, None) + runner.ps = mock_ps + + runner._loop_over_gen(EVAL_GEN_TAG, {}, H_in) + + # send_recv should be used when there are no updates + mock_ps.send_recv.assert_called_once() + # send should NOT have been called separately + mock_ps.send.assert_not_called() + + if __name__ == "__main__": test_normal_runners() test_thread_runners() @@ -140,3 +228,5 @@ def test_globus_compute_runner_fail(): test_globus_compute_runner_init() test_globus_compute_runner_pass() test_globus_compute_runner_fail() + test_libensemble_gen_runner_loop_with_updates() + test_libensemble_gen_runner_loop_without_updates() diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 72f8d1d4f..b3cb1f06d 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -252,6 +252,22 @@ def _get_points_updates(self, batch_size: int) -> (npt.NDArray, list): updates = None return numpy_out, updates + def _loop_over_gen(self, tag, Work, H_in): + """Interact with suggest/ingest generator, sending any updates (e.g. cancellations) via keep_state""" + while tag not in [PERSIS_STOP, STOP_TAG]: + batch_size = self.specs.get("batch_size") or len(H_in) + H_out, updates = self._get_points_updates(batch_size) + if updates is not None and len(updates): + self.ps.send(H_out) + for update in updates: + self.ps.send(update, keep_state=True) + tag, Work, H_in = self.ps.recv() + else: + tag, Work, H_in = self.ps.send_recv(H_out) + if H_in is not None: + self._convert_ingest(H_in) + return H_in + def _convert_ingest(self, x: npt.NDArray) -> list: self.gen.ingest_numpy(unmap_numpy_array(x, mapping=getattr(self.gen, "variables_mapping", {})))