diff --git a/libensemble/alloc_funcs/start_only_persistent.py b/libensemble/alloc_funcs/start_only_persistent.py index 6b02b4c60..e97383f0a 100644 --- a/libensemble/alloc_funcs/start_only_persistent.py +++ b/libensemble/alloc_funcs/start_only_persistent.py @@ -68,17 +68,20 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info, l gen_count = support.count_persis_gens() Work = {} - # Asynchronous return to generator - async_return = user.get("async_return", False) and sum(H["sim_ended"]) >= initial_batch_size + # Asynchronous return to generator. + # Use the manager-maintained counter instead of re-scanning the full H array. + async_return = user.get("async_return", False) and libE_info["sim_ended_count"] >= initial_batch_size if gen_count < persis_info.get("num_gens_started", 0): # When a persistent worker is done, trigger a shutdown (returning exit condition of 1) return Work, persis_info, 1 - # Give evaluated results back to a running persistent gen + # Give evaluated results back to a running persistent gen. + # Compute the sim_ended & ~gen_informed mask once; AND with per-worker gen_inds inside the loop. 
+ pending_sim = H["sim_ended"] & ~H["gen_informed"] for wid in support.avail_worker_ids(persistent=EVAL_GEN_TAG, active_recv=active_recv_gen): gen_inds = H["gen_worker"] == wid - returned_but_not_given = np.logical_and.reduce((H["sim_ended"], ~H["gen_informed"], gen_inds)) + returned_but_not_given = pending_sim & gen_inds if np.any(returned_but_not_given): if async_return or support.all_sim_ended(H, gen_inds): point_ids = np.where(returned_but_not_given)[0] diff --git a/libensemble/history.py b/libensemble/history.py index 80f848cca..f7607e3c8 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -1,12 +1,21 @@ +import json import logging import time +from pathlib import Path +from typing import TYPE_CHECKING import numpy as np import numpy.typing as npt from libensemble.tools.fields_keys import libE_fields, protected_libE_fields +if TYPE_CHECKING: + from libensemble.logger import LibensembleLogger + logger = logging.getLogger(__name__) +if TYPE_CHECKING: + assert isinstance(logger, LibensembleLogger) + # For debug messages - uncomment # logger.setLevel(logging.DEBUG) @@ -69,14 +78,14 @@ def __init__( if "sim_started" not in fields: logger.manager_warning( # type: ignore[attr-defined] - "Marking entries in H0 as having been " + "'sim_started' and 'sim_ended'" + "Marking entries in H0 as having been 'sim_started' and 'sim_ended'" ) H["sim_started"][: len(H0)] = 1 H["sim_ended"][: len(H0)] = 1 elif "sim_ended" not in fields: logger.manager_warning( # type: ignore[attr-defined] - "Marking entries in H0 as having been " + "'sim_ended' if 'sim_started'" + "Marking entries in H0 as having been 'sim_ended' if 'sim_started'" ) H["sim_ended"][: len(H0)] = H0["sim_started"] @@ -102,27 +111,165 @@ def __init__( self.index = len(H0) self.grow_count = 0 self.safe_mode = False + self.use_cache = False - self.sim_started_count = np.sum(H["sim_started"]) - self.sim_ended_count = np.sum(H["sim_ended"]) - self.gen_informed_count = np.sum(H["gen_informed"]) + 
self.sim_started_count: int = np.sum(H["sim_started"]) + self.sim_ended_count: int = np.sum(H["sim_ended"]) + self.gen_informed_count: int = np.sum(H["gen_informed"]) self.given_back_warned = False - self.sim_started_offset = self.sim_started_count - self.sim_ended_offset = self.sim_ended_count - self.gen_informed_offset = self.gen_informed_count + self.sim_started_offset: int = self.sim_started_count + self.sim_ended_offset: int = self.sim_ended_count + self.gen_informed_offset: int = self.gen_informed_count self.last_started = -1 self.last_ended = -1 + def init_cache( + self, + cache_name: str, + cache_dir: str | Path, + spec_hash: str | None = None, + ) -> None: + self.cache_dir = Path(cache_dir).expanduser() + self.cache_dir.mkdir(parents=True, exist_ok=True) + self.cache = self.cache_dir / Path(cache_name + ".npy") + self.cache_meta = self.cache_dir / Path(cache_name + ".meta.json") + self.spec_hash = spec_hash + self.use_cache = True + self.cache_set = False + + # Precompute the sorted user-field names and their dtypes once, so + # _shelf_longrunning_sims doesn't recompute them on every sim return. + libE_field_names = {k[0] for k in libE_fields} + self.cache_keys = sorted([n for n in self.H.dtype.names if n not in libE_field_names]) + self.cache_dtype = np.dtype(sorted([(n, self.H.dtype.fields[n][0]) for n in self.cache_keys])) + + # Buffer for new entries collected during this run; deduplicated via bytes key. + self._cache_buffer: list = [] + self._cache_seen: set = set() + + # Validate any existing cache against the configuration hash. 
+ cache_valid = False + if self.cache.exists(): + if self.cache_meta.exists(): + try: + with open(self.cache_meta) as f: + meta = json.load(f) + if meta.get("spec_hash") == spec_hash: + cache_valid = True + except (json.JSONDecodeError, KeyError): + pass + if not cache_valid: + logger.debug( + "Cache hash mismatch or missing metadata — starting fresh: %s", + self.cache.name, + ) + self.cache.unlink(missing_ok=True) + + if not self.cache.exists(): + self.cache.touch() + + try: + self.in_cache = np.load(self.cache, allow_pickle=True) + except EOFError: + self.in_cache = None + + # Pre-populate the seen-set from any on-disk entries so we don't re-add them. + # Also mark cache_set=True immediately when there is existing data — the manager + # uses this flag to decide whether to scan the cache when dispatching sim work. + if self.in_cache is not None and len(self.in_cache) > 0: + for row in self.in_cache: + self._cache_seen.add(row.tobytes()) + self.cache_set = True + def _append_new_fields(self, H_f: npt.NDArray) -> None: - dtype_new = np.dtype(list(set(self.H.dtype.descr + np.lib.recfunctions.repack_fields(H_f).dtype.descr))) + import numpy.lib.recfunctions as rfn + + dtype_new: np.dtype = np.dtype(list(set(self.H.dtype.descr + rfn.repack_fields(H_f).dtype.descr))) + H_new = np.zeros(len(self.H), dtype=dtype_new) old_fields = self.H.dtype.names for field in old_fields: H_new[field][: len(self.H)] = self.H[field] self.H = H_new + def _shelf_longrunning_sims(self, index): + """Cache any f values that ran for more than a second. + + Uses a bytes-keyed set for O(1) deduplication instead of np.unique on + every insertion, and accumulates new entries in a plain Python list that + is only materialised into a structured array at save_cache() time. 
+ """ + if self.H[index]["sim_ended_time"] - self.H[index]["sim_started_time"] <= 1: + return + entry = np.array([self.H[index][self.cache_keys]], dtype=self.cache_dtype) + key = entry[0].tobytes() + if key in self._cache_seen: + return + self._cache_seen.add(key) + self._cache_buffer.append(entry) + self.cache_set = True + + def _materialize_cache(self) -> npt.NDArray | None: + """Combine the on-disk cache with any buffered new entries into one array.""" + parts = [] + if self.in_cache is not None: + parts.append(self.in_cache) + if self._cache_buffer: + parts.append(np.concatenate(self._cache_buffer)) + if not parts: + return None + return np.concatenate(parts) if len(parts) > 1 else parts[0] + + def save_cache(self) -> None: + if self.use_cache and self.cache_set: + combined = self._materialize_cache() + if combined is not None: + np.save(self.cache, combined, allow_pickle=True) + if self.spec_hash: + with open(self.cache_meta, "w") as f: + json.dump({"spec_hash": self.spec_hash}, f) + + def get_shelved_sims(self) -> npt.NDArray: + combined = self._materialize_cache() + return combined if combined is not None else np.load(self.cache, allow_pickle=True) + + @staticmethod + def _classify_fields(fields, returned_H, H): + """Partition returned fields into three buckets for update_history_f. + + Returns + ------- + scalar_fields : list[str] + Fields whose per-row value is a scalar or object (can be assigned + with a single fancy-indexed write across all rows). + uniform_fields : list[str] + Fixed-shape array fields whose shape exactly matches H's storage + shape (can also be assigned in one fancy-indexed write). + ragged_fields : list[str] + Fixed-shape array fields that are *smaller* than H's storage shape + (need per-row slice assignment). 
+ """ + scalar_fields = [] + uniform_fields = [] + ragged_fields = [] + for field in fields: + if field in protected_libE_fields: + continue + dt = returned_H.dtype[field] + if dt.shape == () or dt.hasobject: + scalar_fields.append(field) + else: + # Compare element shape: returned vs H's allocated shape + h_shape = H.dtype[field].shape + r_shape = dt.shape + if r_shape == h_shape: + uniform_fields.append(field) + else: + ragged_fields.append(field) + return scalar_fields, uniform_fields, ragged_fields + def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: """ Updates the history after points have been evaluated @@ -135,30 +282,42 @@ def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: if returned_H is not None and any([field not in self.H.dtype.names for field in returned_H.dtype.names]): self._append_new_fields(returned_H) - for j, ind in enumerate(new_inds): + if self.safe_mode: for field in fields: - if field in protected_libE_fields: - if self.safe_mode: - assert False, "The field '" + field + "' is protected" - continue + assert field not in protected_libE_fields, "The field '" + field + "' is protected" - if np.isscalar(returned_H[field][j]) or returned_H.dtype[field].hasobject: - self.H[field][ind] = returned_H[field][j] - else: - # len or np.size + new_inds = np.asarray(new_inds) + + if fields and returned_H is not None: + scalar_fields, uniform_fields, ragged_fields = self._classify_fields(fields, returned_H, self.H) + + # Vectorized assignment for scalar and object fields (one op per field) + for field in scalar_fields: + self.H[field][new_inds] = returned_H[field] + + # Vectorized assignment for fixed-shape array fields that exactly match H's shape + for field in uniform_fields: + self.H[field][new_inds] = returned_H[field] + + # Per-row loop only for ragged (partial-fill) array fields + for j, ind in enumerate(new_inds): + for field in ragged_fields: H0_size = len(returned_H[field][j]) assert H0_size 
<= len(self.H[field][ind]), ( "History update Error: Too many values received for " + field ) assert H0_size, "History update Error: No values in this field " + field - if H0_size == len(self.H[field][ind]): - self.H[field][ind] = returned_H[field][j] # ref - else: - self.H[field][ind][:H0_size] = returned_H[field][j] # Slice View + self.H[field][ind][:H0_size] = returned_H[field][j] + + # Batch-update bookkeeping fields for all returned rows at once + t = time.time() + self.H["sim_ended"][new_inds] = True + self.H["sim_ended_time"][new_inds] = t + self.sim_ended_count += len(new_inds) - self.H["sim_ended"][ind] = True - self.H["sim_ended_time"][ind] = time.time() - self.sim_ended_count += 1 + if self.use_cache: + for ind in new_inds: + self._shelf_longrunning_sims(ind) if kill_canceled_sims: for j in range(self.last_ended + 1, np.max(new_inds) + 1): @@ -205,7 +364,7 @@ def update_history_to_gen(self, q_inds: npt.NDArray): if self.using_H0 and not self.given_back_warned: logger.manager_warning( # type: ignore[attr-defined] - "Giving entries in H0 back to gen. Marking entries in " + "H0 as 'gen_informed' if 'sim_ended'." + "Giving entries in H0 back to gen. Marking entries in H0 as 'gen_informed' if 'sim_ended'." ) self.given_back_warned = True diff --git a/libensemble/libE.py b/libensemble/libE.py index 219e2cd8c..4e1b3f5f5 100644 --- a/libensemble/libE.py +++ b/libensemble/libE.py @@ -137,7 +137,7 @@ from libensemble.tools.alloc_support import AllocSupport from libensemble.tools.tools import _USER_SIM_ID_WARNING from libensemble.utils import launcher -from libensemble.utils.misc import specs_dump +from libensemble.utils.misc import compute_config_hash, specs_dump from libensemble.utils.timer import Timer from libensemble.version import __version__ from libensemble.worker import worker_main @@ -235,12 +235,27 @@ def libE( exit_criteria=exit_criteria, ) + # Compute a deterministic hash of the full configuration for cache integrity. 
+ spec_hash = compute_config_hash( + sim_specs=ensemble.sim_specs, + gen_specs=ensemble.gen_specs, + alloc_specs=ensemble.alloc_specs, + libE_specs=ensemble.libE_specs, + exit_criteria=ensemble.exit_criteria, + H0=H0, + ) + (sim_specs, gen_specs, alloc_specs, libE_specs) = [ specs_dump(spec, by_alias=True) for spec in [ensemble.sim_specs, ensemble.gen_specs, ensemble.alloc_specs, ensemble.libE_specs] ] exit_criteria = specs_dump(ensemble.exit_criteria, by_alias=True, exclude_none=True) + # Inject spec hash and auto-generate cache name when not explicitly provided. + libE_specs["_spec_hash"] = spec_hash + if libE_specs.get("cache_long_sims") and not libE_specs.get("cache_name"): + libE_specs["cache_name"] = f".libe_cache_{spec_hash[:16]}" + # Restore objects that don't survive serialization via model_dump if hasattr(ensemble.sim_specs, "simulator") and ensemble.sim_specs.simulator is not None: sim_specs["simulator"] = ensemble.sim_specs.simulator diff --git a/libensemble/manager.py b/libensemble/manager.py index 7995d2da9..cffc71872 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -20,8 +20,10 @@ from numpy.lib.recfunctions import repack_fields from libensemble.comms.comms import CommFinishedException, QCommThread +from libensemble.comms.logs import LogConfig from libensemble.executors.executor import Executor from libensemble.message_numbers import ( + CACHE_RETRIEVE, EVAL_GEN_TAG, EVAL_SIM_TAG, FINISHED_PERSISTENT_GEN_TAG, @@ -30,6 +32,7 @@ MAN_SIGNAL_KILL, PERSIS_STOP, STOP_TAG, + calc_status_strings, calc_type_strings, ) from libensemble.resources.resources import Resources @@ -38,7 +41,7 @@ from libensemble.utils.misc import _WorkerIndexer, extract_H_ranges from libensemble.utils.output_directory import EnsembleDirectory from libensemble.utils.timer import Timer -from libensemble.worker import WorkerErrMsg, worker_main +from libensemble.worker import Worker, WorkerErrMsg, worker_main logger = logging.getLogger(__name__) # For debug messages - 
uncomment @@ -204,6 +207,7 @@ def __init__( timer.start() self.date_start = timer.date_start.replace(" ", "_") self.safe_mode = libE_specs.get("safe_mode") + self.use_cache = libE_specs.get("cache_long_sims") self.kill_canceled_sims = libE_specs.get("kill_canceled_sims") self.hist = hist self.hist.safe_mode = self.safe_mode @@ -217,6 +221,14 @@ def __init__( self.WorkerExc = False self.persis_pending: list[int] = [] self.live_data = libE_specs.get("live_data") + if self.use_cache: + self.hist.init_cache( + self.libE_specs.get("cache_name"), + self.libE_specs.get("cache_dir"), + spec_hash=self.libE_specs.get("_spec_hash"), + ) + self.from_cache: Any = None + self.cache_hit = False dyn_keys = ("resource_sets", "num_procs", "num_gpus") dyn_keys_in_H = any(k in self.hist.H.dtype.names for k in dyn_keys) @@ -244,7 +256,7 @@ def __init__( self.wcomms = [local_worker_comm] + self.wcomms self.W = _WorkerIndexer(self.W, 1 - gen_on_worker) # if gen on worker, then no additional worker - self.wcomms = _WorkerIndexer(self.wcomms, 1 - gen_on_worker) + self.wcomms = _WorkerIndexer(self.wcomms, 1 - gen_on_worker) # type: ignore[assignment] temp_EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs) self.resources = Resources.resources @@ -287,7 +299,7 @@ def term_test_stop_val(self, stop_val: Any) -> bool: """Checks against stop value criterion""" key, val = stop_val H = self.hist.H - return np.any(filter_nans(H[key][H["sim_ended"]]) <= val) + return bool(np.any(filter_nans(H[key][H["sim_ended"]]) <= val)) def term_test(self, logged: bool = True) -> bool | int: """Checks termination criteria""" @@ -416,27 +428,132 @@ def _ensure_sim_id_in_persis_in(self, D: npt.NDArray) -> None: if "sim_id" not in self.gen_specs["persis_in"]: self.gen_specs["persis_in"].append("sim_id") + def _find_cache_match(self, work_row: int, cache: npt.NDArray, new_dtype: np.dtype) -> int: + """Return the index of the first cache row that matches the work row for all outbound sim fields. 
+ + Vectorizes the comparison across the full cache axis per field, short-circuiting as soon + as any field eliminates all remaining candidates. Returns -1 when no match is found. + """ + mask = np.ones(len(cache), dtype=bool) + for f in np.dtype(new_dtype).names: + cf = cache[f] + hf = self.hist.H[f][work_row] + try: + if cf.ndim == 1: + mask &= np.isclose(cf, hf) + else: + # multi-dim field: reduce over all axes except the cache axis + mask &= np.all(np.isclose(cf, hf), axis=tuple(range(1, cf.ndim))) + except (TypeError, ValueError): + # object or non-numeric dtype: fall back to element-wise equality + mask &= np.array([c == hf for c in cf]) + if not mask.any(): + return -1 + return int(np.argmax(mask)) if mask.any() else -1 + + def _update_local_entry_from_cache( + self, cache_row: npt.NDArray, work_row: int, new_dtype: np.dtype, w: int, dtype_with_idx: np.dtype + ) -> None: + """Updates the local `from_cache` record with the cache row""" + from_cache_entry = np.empty(1, dtype=dtype_with_idx) + from_cache_entry["H_row"] = work_row # log this for later checking if outbound rows are already cached + from_cache_entry["worker_id"] = w # used to simulate the worker sending back work that actually came from cache + for field in np.dtype(new_dtype).names: # we now only do this since all outbound fields were close + from_cache_entry[field] = cache_row[field] + self.from_cache[-1] = from_cache_entry # the local record was already appended + self.cache_hit = True + + def _cache_scan( + self, cache: npt.NDArray, Work: dict, w: int, dtype_with_idx: np.dtype, new_dtype: np.dtype + ) -> None: + """ + Check if any work rows are in the cache, and if so, call _update_local_entry_from_cache + to update the local `from_cache` record. + + Each H_row in the work order is compared against the full cache array in a vectorized + manner (one NumPy operation per field across all cache rows) rather than iterating over + cache rows one at a time. 
A set tracks already-matched rows for O(1) membership tests. + """ + self.cache_timer = Timer() + # worker_id == 0 means an uninitialised (blank) slot; filter those out. + # H_row 0 is a valid row index so we cannot use >= 0 as the sentinel. + seen_rows: set[int] = set(self.from_cache["H_row"][self.from_cache["worker_id"] > 0]) + with self.cache_timer: + for work_row in Work["libE_info"]["H_rows"]: # used to compare H entries against the cache + if work_row in seen_rows: + continue + match_idx = self._find_cache_match(work_row, cache, new_dtype) + if match_idx >= 0: + self._update_local_entry_from_cache(cache[match_idx], work_row, new_dtype, w, dtype_with_idx) + seen_rows.add(work_row) + + def _update_state_from_cache(self, Work: dict, work_rows: npt.NDArray, w: int, new_dtype: np.dtype) -> None: + """Retrieve saved cache from history, create local record-array of matching cache entries. + + The `from_cache` local record contains cache entries and the workerID and H_rows they are associated with, had + they been sent to a worker. + + Cache entries *must* be associated with the preempted outbound worker and H_rows because those values + are always associated with actual inbound results. Later on, when we iterate over the cache for entries that + could've been sent to a worker (but weren't), we'll process that entry as though it came from that worker, + with those H_rows. 
+ """ + + cache = self.hist.get_shelved_sims() + + # our local record resembles the cache, but additionally with the worker_id and H_row from the alloc_f + dtype_with_idx = np.dtype(cache.dtype.descr + np.dtype([("H_row", int), ("worker_id", int)]).descr) + + # initialize or grow the local record, then call _cache_scan to fill it + if self.from_cache is None: + self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx) + else: + self.from_cache = np.append(self.from_cache, np.zeros(len(work_rows), dtype=dtype_with_idx)) + + # populates the local record + self._cache_scan(cache, Work, w, dtype_with_idx, new_dtype) + def _send_work_order(self, Work: dict, w: int) -> None: """Sends an allocation function order to a worker""" logger.debug(f"Manager sending work unit to worker {w}") + work_rows = Work["libE_info"]["H_rows"] + new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] + + if self.use_cache and Work["tag"] == EVAL_SIM_TAG and len(work_rows) and self.hist.cache_set: + self._update_state_from_cache(Work, work_rows, w, np.dtype(new_dtype)) + if self.resources: self._set_resources(Work, w) - self.wcomms[w].send(Work["tag"], Work) - if Work["tag"] == EVAL_GEN_TAG: self.W[w]["gen_started_time"] = time.time() + self.wcomms[w].send(Work["tag"], Work) - work_rows = Work["libE_info"]["H_rows"] work_name = calc_type_strings[Work["tag"]] - logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}") + if self.cache_hit: + logger.debug( + f"Manager retrieved {work_name} work for worker {w} from cache. Rows {extract_H_ranges(Work) or None}" + ) + else: + logger.debug(f"Manager sending {work_name} work to worker {w}. 
Rows {extract_H_ranges(Work) or None}") + if len(work_rows): - new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] + + if self.cache_hit: + work_rows = [row for row in work_rows if row not in self.from_cache["H_row"]] + if ( + all([i in self.from_cache["H_row"] for i in work_rows]) and Work["tag"] == EVAL_SIM_TAG + ): # if all rows in work_rows are found in cache + logger.debug("Manager skipping sending *all* work to worker %s due to cache", w) + return + H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) for i, row in enumerate(work_rows): H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) + if Work["tag"] in [EVAL_SIM_TAG, PERSIS_STOP]: # inclusion of PERSIS_STOP for final_gen_send + self.wcomms[w].send(Work["tag"], Work) self.wcomms[w].send(0, H_to_be_sent) def _update_state_on_alloc(self, Work: dict, w: int): @@ -458,13 +575,34 @@ def _update_state_on_alloc(self, Work: dict, w: int): # --- Handle incoming messages from workers - def _receive_from_workers(self, persis_info: dict) -> dict: - """Receives calculation output from workers. Loops over all + def _receive_from_workers_or_cache(self, persis_info: dict) -> dict: + """ + Two stage process of handling either: + 1. Messages that could've been sent to a worker, but are already in the cache. + 2. Messages that have been sent by a worker. + + 1. + If the cache is not empty, the cache is scanned for messages that could've been sent. + Messages are processed as though they came from their corresponding worker. The local + record of the cache is then cleared to prevent duplicate processing. + + 2. + Receives calculation output from workers. Loops over all active workers and probes to see if worker is ready to communticate. If any output is received, all other workers are looped back over. 
""" time.sleep(0.0001) # Critical for multiprocessing performance + + # Process messages from the cache + if self.cache_hit: + self.cache_hit = False + for w in self.from_cache["worker_id"]: + if w > 0: # actual cache entry - not blank. assuming w0 gets no sim work + self._handle_msg_from_worker(persis_info, w, process_cache=True) + self.from_cache = None + + # Process messages from workers new_stuff = True while new_stuff: new_stuff = False @@ -533,11 +671,34 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) - if D_recv.get("persis_info"): persis_info.setdefault(int(w), {}).update(D_recv["persis_info"]) - def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None: - """Handles a message from worker w""" + def _create_simulated_D_recv(self, w: int) -> dict: + """Create a simulated worker message containing the cache entry instead of a message from a worker.""" + + cache_entry_by_worker = self.from_cache[self.from_cache["worker_id"] == w] + D_recv = { + "calc_out": cache_entry_by_worker[[name[0] for name in self.sim_specs["out"]]], + "libE_info": { + "H_rows": cache_entry_by_worker["H_row"], + "workerID": w, + }, + "calc_status": CACHE_RETRIEVE, + "calc_type": 1, + } + return D_recv + + def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool = False) -> None: + """Handles a message from worker w. + + If processing from the cache, create a simulated worker message containing + the cache entry. 
+ """ try: - msg = self.wcomms[w].recv() - tag, D_recv = msg + if process_cache: + D_recv = self._create_simulated_D_recv(w) + enum_desc, calc_id = Worker._extract_debug_data(1, D_recv) + else: + msg = self.wcomms[w].recv() + tag, D_recv = msg except CommFinishedException: logger.debug(f"Finalizing message from Worker {w}") return @@ -552,7 +713,13 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None: logger.vdebug(f"Manager received a log message from worker {w}") # type: ignore[attr-defined] logging.getLogger(D_recv.name).handle(D_recv) else: - logger.debug(f"Manager received data message from worker {w}") + if process_cache: + logger.debug(f"Manager retrieved cached message redirected from worker {w}") + calc_msg = f"""{enum_desc} {calc_id}: {"sim"} {self.cache_timer}""" + calc_msg += f" Status: {calc_status_strings[CACHE_RETRIEVE]}" + logging.getLogger(LogConfig.config.stats_name).info(calc_msg) # libE_stats + else: + logger.debug(f"Manager received data message from worker {w}") self._update_state_on_worker_msg(persis_info, D_recv, w) def _kill_cancelled_sims(self) -> None: @@ -613,7 +780,7 @@ def _final_receive_and_kill(self, persis_info: dict) -> tuple[dict, int, int]: exit_flag = 0 while (any(self.W["active"]) or any(self.W["persis_state"])) and exit_flag == 0: - persis_info = self._receive_from_workers(persis_info) + persis_info = self._receive_from_workers_or_cache(persis_info) if self.term_test(logged=False) == 2: # Elapsed Wallclock has expired if not any(self.W["persis_state"]): @@ -703,7 +870,7 @@ def run(self, persis_info: dict) -> tuple[dict, int, int]: try: while not self.term_test(): self._kill_cancelled_sims() - persis_info = self._receive_from_workers(persis_info) + persis_info = self._receive_from_workers_or_cache(persis_info) Work, persis_info, flag = self._alloc_work(self.hist.trim_H(), persis_info) if flag: break @@ -724,6 +891,7 @@ def run(self, persis_info: dict) -> tuple[dict, int, int]: 
logger.error(traceback.format_exc()) raise LoggedException(e.args) from None finally: + self.hist.save_cache() # Return persis_info, exit_flag, elapsed time result = self._final_receive_and_kill(persis_info) self.wcomms = [] diff --git a/libensemble/message_numbers.py b/libensemble/message_numbers.py index 0ecc7092e..46ff8c987 100644 --- a/libensemble/message_numbers.py +++ b/libensemble/message_numbers.py @@ -41,6 +41,8 @@ WORKER_DONE = 35 # Calculation was successful # last_calc_status_rst_tag CALC_EXCEPTION = 36 # Reserved: Automatically used if user_f raised an exception +CACHE_RETRIEVE = 40 # Manager retrieved sim from cache + MAN_KILL_SIGNALS = [MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL] @@ -57,6 +59,7 @@ TASK_FAILED_TO_START: "Task Failed to start", WORKER_DONE: "Completed", CALC_EXCEPTION: "Exception occurred", + CACHE_RETRIEVE: "Retrieved from cache", None: "Unknown Status", } # last_calc_status_string_rst_tag diff --git a/libensemble/specs.py b/libensemble/specs.py index 9ee04baa3..f33c961ca 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -560,6 +560,31 @@ class LibeSpecs(BaseModel): Forms the base of a generator directory. """ + cache_long_sims: bool | None = False + """ + Cache simulation results with runtimes >1s to disk. Subsequent runs with an + identical configuration (specs, callables, H0) will access this cache. + + Upon the generator creating points already in the cache, those points will be skipped from + being sent for evaluation. Instead the corresponding cached results are retrieved and returned + to the generator. + + The cache is saved in ``cache_dir``. When ``cache_name`` is ``None``, the filename is + automatically derived from a SHA-256 hash of the full ensemble configuration. + """ + + cache_dir: str | Path | None = str(Path.home() / ".cache" / "libensemble") + """ + The directory to store the cache file. Defaults to `~/.cache/libensemble`. + """ + + cache_name: str | None = None + """ + The name of the cache file. 
Stored in cache_dir. + When ``None`` and ``cache_long_sims`` is ``True``, a name is automatically + derived from a SHA-256 hash of the full ensemble configuration. + """ + calc_dir_id_width: int | None = 4 """ The width of the numerical ID component of a calculation directory name. Leading diff --git a/libensemble/tests/functionality_tests/test_cache_sims.py b/libensemble/tests/functionality_tests/test_cache_sims.py new file mode 100644 index 000000000..d93907203 --- /dev/null +++ b/libensemble/tests/functionality_tests/test_cache_sims.py @@ -0,0 +1,73 @@ +""" +Runs libEnsemble with Latin hypercube sampling on a simple 1D problem + +Execute via one of the following commands (e.g. 3 workers): + mpiexec -np 4 python test_cache_sims.py + python test_cache_sims.py --nworkers 3 + python test_cache_sims.py --nworkers 3 --comms tcp + +The number of concurrent evaluations of the objective function will be 4-1=3. +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: mpi local +# TESTSUITE_NPROCS: 2 4 + +import time + +import numpy as np + +from libensemble.alloc_funcs.give_sim_work_first import give_sim_work_first +from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f + +# Import libEnsemble items for this test +from libensemble.libE import libE +from libensemble.tools import parse_args, save_libE_output + + +def sim_f(In): + Out = np.zeros(1, dtype=[("f", float)]) + time.sleep(1.1) + Out["f"] = np.linalg.norm(In) + return Out + + +if __name__ == "__main__": + nworkers, is_manager, libE_specs, _ = parse_args() + libE_specs["cache_long_sims"] = True + libE_specs["cache_dir"] = "." 
+ + sim_specs = { + "sim_f": sim_f, + "in": ["x"], + "out": [("f", float)], + } + + gen_specs = { + "gen_f": gen_f, + "out": [("x", float, (1,))], + "batch_size": 10, + "user": { + "lb": np.array([-3]), + "ub": np.array([3]), + "gen_seed": 42, + }, + } + + alloc_specs = {"alloc_f": give_sim_work_first} + + exit_criteria = {"sim_max": 11} + + H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, alloc_specs=alloc_specs, libE_specs=libE_specs) + + if is_manager: + assert len(H) >= 11 + print("\nlibEnsemble with random sampling has generated enough points") + save_libE_output(H, persis_info, __file__, nworkers) + + H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, alloc_specs=alloc_specs, libE_specs=libE_specs) + + if is_manager: + # better way of seeing "long" sims not actually taking so long (because of cache?) + durations = H["sim_ended_time"] - H["sim_started_time"] + assert any((durations < 1.1) & (durations != -np.inf)) diff --git a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py index a65462e0d..543e17355 100644 --- a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py +++ b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py @@ -56,6 +56,9 @@ if is_manager: print(f"\nCores req: {cores_all_tasks} Cores avail: {logical_cores}\n {mess_resources}\n") + libE_specs["cache_long_sims"] = True + libE_specs["cache_name"] = "executor_hworld_" + str(nworkers) + "_" + libE_specs.get("comms") + sim_app = "./my_simtask.x" if not os.path.isfile(sim_app): build_simfunc() @@ -104,7 +107,7 @@ # For debug print(f"Expecting: {calc_status_list}") - print("Received: {H['cstat']}\n") + print(f"Received: {H['cstat']}\n") assert np.array_equal(H["cstat"], calc_status_list), "Error - unexpected calc status. 
Received: " + str( H["cstat"] diff --git a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py index 390582143..cb2ae4d88 100644 --- a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py @@ -49,7 +49,6 @@ def six_hump_camel_func(x): # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": - workflow = Ensemble(parse_args=True) if workflow.is_manager: @@ -58,14 +57,23 @@ def six_hump_camel_func(x): n = 2 vocs = VOCS( - variables={"core": [-3, 3], "edge": [-2, 2], "core_on_cube": [0, 1], "edge_on_cube": [0, 1]}, + variables={ + "core": [-3, 3], + "edge": [-2, 2], + "core_on_cube": [0, 1], + "edge_on_cube": [0, 1], + }, objectives={"energy": "MINIMIZE"}, ) aposmm = APOSMM( vocs, max_active_runs=max(1, workflow.nworkers - 1), - variables_mapping={"x": ["core", "edge"], "x_on_cube": ["core_on_cube", "edge_on_cube"], "f": ["energy"]}, + variables_mapping={ + "x": ["core", "edge"], + "x_on_cube": ["core_on_cube", "edge_on_cube"], + "f": ["energy"], + }, initial_sample_size=100, sample_points=np.round(minima, 1), localopt_method="LN_BOBYQA", @@ -91,7 +99,7 @@ def six_hump_camel_func(x): print("[Manager]:", H[np.where(H["local_min"])]["x"]) print("[Manager]: Time taken =", time() - start_time, flush=True) - tol = 1e-5 + tol = 1e-4 for m in minima: # The minima are known on this test problem. 
def _get_callable_source(obj) -> str:
    """Return text identifying the code behind a function or callable object.

    ``inspect.getsource`` is tried on the object itself and then on its class,
    so plain functions, classes and callable instances resolve to their source
    text. When neither lookup succeeds (``TypeError`` or ``OSError``, e.g. for
    builtins), the result falls back to ``"module.name"``.

    Parameters
    ----------
    obj : object | None
        The callable to describe.

    Returns
    -------
    str
        Source text, the ``"module.name"`` fallback, or ``""`` when ``obj``
        is ``None``.
    """
    if obj is None:
        return ""
    for target in (obj, type(obj)):
        try:
            return inspect.getsource(target)
        except (TypeError, OSError):
            continue
    name = getattr(obj, "__name__", type(obj).__name__)
    module = getattr(obj, "__module__", type(obj).__module__)
    return f"{module}.{name}"


def _json_default(obj):
    """``json.dumps`` fallback that keeps the hash input lossless and stable.

    A blanket ``default=str`` is unsafe for a cache key:

    * ``str()`` of a large NumPy array elides the middle elements and rounds
      floats, so different arrays can serialize to the same text.
    * ``str()`` of a function or bound method embeds a memory address, which
      changes from run to run.
    * ``str()`` of a set depends on iteration order, which is not stable
      across interpreter runs for string members.
    """
    if isinstance(obj, (set, frozenset)):
        return sorted(obj, key=repr)
    # Array-likes (NumPy arrays and scalars) are detected by duck typing so
    # this helper needs no import; classes are excluded because their
    # ``tolist`` is an unbound method.
    to_list = getattr(obj, "tolist", None)
    if callable(to_list) and not isinstance(obj, type):
        return {"dtype": str(getattr(obj, "dtype", "")), "data": to_list()}
    if callable(obj):
        return _get_callable_source(obj)
    return str(obj)


def compute_config_hash(
    sim_specs,
    gen_specs,
    alloc_specs=None,
    libE_specs=None,
    exit_criteria=None,
    H0=None,
) -> str:
    """Compute a deterministic SHA-256 hash of the full ensemble configuration.

    Every spec is dumped to a dictionary via ``model_dump``. The callables
    listed in ``_RAW_OBJECT_FIELDS`` are removed from the dumps and replaced
    by their source code (or ``module.name`` fallback) so that code changes
    invalidate the cache. The ``cache_long_sims``, ``cache_dir`` and
    ``cache_name`` options are excluded, so changing where the cache lives
    does not change the hash. ``H0`` contributes a digest of its dtype, shape
    and raw bytes.

    Values that JSON cannot encode natively go through ``_json_default``,
    which serializes arrays in full and callables by source rather than by
    ``str()``.

    Parameters
    ----------
    sim_specs : SimSpecs
    gen_specs : GenSpecs | None
    alloc_specs : AllocSpecs | None
    libE_specs : LibeSpecs | None
    exit_criteria : ExitCriteria | None
    H0 : numpy.ndarray | None

    Returns
    -------
    str
        64-character hex digest.
    """
    common = {"by_alias": True, "exclude_none": True}

    def _dump_or_empty(spec, **kwargs):
        return spec.model_dump(**kwargs) if spec is not None else {}

    libE_dict = _dump_or_empty(libE_specs, exclude_defaults=True, **common)
    # The cache settings describe where results are stored, not what is run.
    for key in ("cache_long_sims", "cache_dir", "cache_name"):
        libE_dict.pop(key, None)

    spec_dicts: dict = {
        "sim": sim_specs.model_dump(exclude_defaults=True, **common),
        "gen": _dump_or_empty(gen_specs, exclude_defaults=True, **common),
        "alloc": _dump_or_empty(alloc_specs, exclude_defaults=True, **common),
        "exit": _dump_or_empty(exit_criteria, **common),
        "libE": libE_dict,
    }

    # Strip the raw objects first so memory addresses don't leak into the
    # JSON serialization, then record their source text instead.
    _strip_raw_objects(spec_dicts)
    _add_callable_sources(spec_dicts, sim_specs, gen_specs, alloc_specs)

    if H0 is not None and len(H0):
        # Raw bytes alone cannot distinguish arrays that differ only in field
        # names, dtype or shape, so those are folded into the digest as well.
        h0_digest = hashlib.sha256()
        h0_digest.update(f"{H0.dtype}|{H0.shape}".encode())
        h0_digest.update(H0.tobytes())
        spec_dicts["H0_hash"] = h0_digest.hexdigest()

    serialized = json.dumps(spec_dicts, sort_keys=True, default=_json_default)
    return hashlib.sha256(serialized.encode()).hexdigest()


# Fields holding live Python objects; they are hashed via their source text
# (see _add_callable_sources) or dropped rather than serialized directly.
_RAW_OBJECT_FIELDS = {
    "sim": {"sim_f", "simulator", "vocs"},
    "gen": {"gen_f", "generator", "vocs"},
    "alloc": {"alloc_f"},
}


def _strip_raw_objects(spec_dicts):
    """Remove object-typed fields from dumped spec dicts, in place.

    These are handled separately via source extraction to avoid
    non-deterministic memory-address-based serialization.
    """
    for key, fields in _RAW_OBJECT_FIELDS.items():
        dumped = spec_dicts.get(key)
        if dumped is None:
            continue
        for field in fields:
            dumped.pop(field, None)


def _add_callable_sources(spec_dicts, sim_specs, gen_specs, alloc_specs):
    """Store the source of each user callable under ``"<spec>_source_<field>"``.

    Specs that are ``None`` and fields that are unset or ``None`` are skipped.
    """
    candidates = (
        ("sim", sim_specs, "sim_f"),
        ("sim", sim_specs, "simulator"),
        ("gen", gen_specs, "gen_f"),
        ("gen", gen_specs, "generator"),
        ("alloc", alloc_specs, "alloc_f"),
    )
    for spec_name, spec, field in candidates:
        if spec is None:
            continue
        obj = getattr(spec, field, None)
        if obj is not None:
            spec_dicts[f"{spec_name}_source_{field}"] = _get_callable_source(obj)