diff --git a/realtime_decoder/encoder_process.py b/realtime_decoder/encoder_process.py index d2161b6..7b27ea7 100644 --- a/realtime_decoder/encoder_process.py +++ b/realtime_decoder/encoder_process.py @@ -38,8 +38,12 @@ def __init__(self, comm, rank, config): def send_joint_prob(self, dest, msg): """Send mark-position joint probability data""" + # `[msg, MPI.BYTE]` hands MPI the numpy buffer directly. The old + # `msg.tobytes()` allocated a fresh bytes per send, which on a + # tetrode array running 9 ntrodes can be hundreds of allocations + # per second. Wire format is unchanged. self.comm.Send( - buf=msg.tobytes(), + buf=[msg, MPI.BYTE], dest=dest, tag=messages.MPIMessageTag.SPIKE_DECODE_DATA ) @@ -87,6 +91,13 @@ def __init__(self, config, trode, pos_bin_struct): self._occupancy_ct = 0 self._temp_idx = 0 # NOTE(DS): so that mark_idx does not increase but still write down in the mark vec + # Persistent scratch for get_joint_prob's in-cube filter. Sized + # to the full mark buffer; per-call we only touch the [:mark_idx] + # view. Avoids allocating a fresh bool array of length mark_idx + # on every spike, which at high firing rates is the dominant + # source of churn in the encoder hot path. + self._in_range_buf = np.empty(self._marks.shape[0], dtype=bool) + self._init_params() def _load_model(self): @@ -198,17 +209,23 @@ def get_joint_prob(self, mark): #print(mark) - in_range = np.ones(mark_idx, dtype=bool) + # Reuse the preallocated in_range_buf. Slice to mark_idx and + # initialize to True; the loop ANDs each dimension's mask into + # the accumulator in place via out=. + in_range = self._in_range_buf[:mark_idx] + in_range.fill(True) if self.p['use_filter']: std = self.p['filter_std'] n_std = self.p['filter_n_std'] for ii in range(self._marks.shape[1]): - in_range = np.logical_and( - np.logical_and( - self._marks[:mark_idx, ii] > mark[ii] - n_std * std, - self._marks[:mark_idx, ii] < mark[ii] + n_std * std - ), - in_range + col = self._marks[:mark_idx, ii] + lo = mark[ii] - n_std * std + hi = mark[ii] + n_std * std + # combine: in_range &= (lo < col) & (col < hi) + np.logical_and( + np.logical_and(col > lo, col < hi), + in_range, + out=in_range, ) # not enough spikes within n-cube diff --git a/realtime_decoder/ripple_process.py b/realtime_decoder/ripple_process.py index dcc156e..d9de1bc 100644 --- a/realtime_decoder/ripple_process.py +++ b/realtime_decoder/ripple_process.py @@ -35,8 +35,11 @@ def send_lfp_timestamp(self, timestamp): def send_ripple(self, dest, msg): """Send a ripple event message""" + # `[msg, MPI.BYTE]` hands MPI the numpy buffer directly. Avoids + # the per-send bytes copy the old `msg.tobytes()` allocated. + # Wire format unchanged. self.comm.Send( - buf=msg.tobytes(), + buf=[msg, MPI.BYTE], dest=dest, tag=messages.MPIMessageTag.RIPPLE_DETECTION )