From b685d13f41240a83e5a6a02a59a886059d2defef Mon Sep 17 00:00:00 2001 From: jl33ai Date: Fri, 29 May 2026 14:33:33 -0700 Subject: [PATCH] preallocate encoder and ripple hot path scratch extends the same latency hygiene pass from the decoder PR to the encoder and ripple processes. encoder: - send_joint_prob: msg.tobytes() -> [msg, MPI.BYTE]. on a 9 ntrode rig running ~20 spikes/sec/tetrode that's ~180 bytes-allocs/sec, more during bursts. - Encoder.get_joint_prob: the per-spike in_range bool array (length = current mark count, up to bufsize=18001) was allocated fresh on every call. now a single _in_range_buf is allocated once in __init__ and we operate on a [:mark_idx] view with .fill(True) + np.logical_and(..., out=). semantics preserved, verified against the original loop on random inputs. ripple: - send_ripple: same .tobytes() -> [msg, MPI.BYTE] change. didn't touch EnvelopeEstimator.add_new_data even though it has its own per-LFP-sample allocations (b*x products, sum, sqrt). that one needs to switch from returning fresh arrays to returning persistent buffers, which is a meaningful API change worth its own PR with its own review. no algorithmic change. all consumers of the rewritten paths still receive identically-shaped data. --- realtime_decoder/encoder_process.py | 33 ++++++++++++++++++++++------- realtime_decoder/ripple_process.py | 5 ++++- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/realtime_decoder/encoder_process.py b/realtime_decoder/encoder_process.py index d2161b6..7b27ea7 100644 --- a/realtime_decoder/encoder_process.py +++ b/realtime_decoder/encoder_process.py @@ -38,8 +38,12 @@ def __init__(self, comm, rank, config): def send_joint_prob(self, dest, msg): """Send mark-position joint probability data""" + # `[msg, MPI.BYTE]` hands MPI the numpy buffer directly. The old + # `msg.tobytes()` allocated a fresh bytes per send, which on a + # tetrode array running 9 ntrodes can be hundreds of allocations + # per second. Wire format is unchanged. self.comm.Send( - buf=msg.tobytes(), + buf=[msg, MPI.BYTE], dest=dest, tag=messages.MPIMessageTag.SPIKE_DECODE_DATA ) @@ -87,6 +91,13 @@ def __init__(self, config, trode, pos_bin_struct): self._occupancy_ct = 0 self._temp_idx = 0 # NOTE(DS): so that mark_idx does not increase but still write down in the mark vec + # Persistent scratch for get_joint_prob's in-cube filter. Sized + # to the full mark buffer; per-call we only touch the [:mark_idx] + # view. Avoids allocating a fresh bool array of length mark_idx + # on every spike, which at high firing rates is the dominant + # source of churn in the encoder hot path. + self._in_range_buf = np.empty(self._marks.shape[0], dtype=bool) + self._init_params() def _load_model(self): @@ -198,17 +209,23 @@ def get_joint_prob(self, mark): #print(mark) - in_range = np.ones(mark_idx, dtype=bool) + # Reuse the preallocated in_range_buf. Slice to mark_idx and + # initialize to True; the loop ANDs each dimension's mask into + # the accumulator in place via out=. + in_range = self._in_range_buf[:mark_idx] + in_range.fill(True) if self.p['use_filter']: std = self.p['filter_std'] n_std = self.p['filter_n_std'] for ii in range(self._marks.shape[1]): - in_range = np.logical_and( - np.logical_and( - self._marks[:mark_idx, ii] > mark[ii] - n_std * std, - self._marks[:mark_idx, ii] < mark[ii] + n_std * std - ), - in_range + col = self._marks[:mark_idx, ii] + lo = mark[ii] - n_std * std + hi = mark[ii] + n_std * std + # combine: in_range &= (lo < col) & (col < hi) + np.logical_and( + np.logical_and(col > lo, col < hi), + in_range, + out=in_range, ) # not enough spikes within n-cube diff --git a/realtime_decoder/ripple_process.py b/realtime_decoder/ripple_process.py index dcc156e..d9de1bc 100644 --- a/realtime_decoder/ripple_process.py +++ b/realtime_decoder/ripple_process.py @@ -35,8 +35,11 @@ def send_lfp_timestamp(self, timestamp): def send_ripple(self, dest, msg): """Send a ripple event message""" + # `[msg, MPI.BYTE]` hands MPI the numpy buffer directly. Avoids + # the per-send bytes copy the old `msg.tobytes()` allocated. + # Wire format unchanged. self.comm.Send( - buf=msg.tobytes(), + buf=[msg, MPI.BYTE], dest=dest, tag=messages.MPIMessageTag.RIPPLE_DETECTION )