Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions realtime_decoder/encoder_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ def __init__(self, comm, rank, config):
def send_joint_prob(self, dest, msg):
"""Send mark-position joint probability data"""

# `[msg, MPI.BYTE]` hands MPI the numpy buffer directly. The old
# `msg.tobytes()` allocated a fresh bytes per send, which on a
# tetrode array running 9 ntrodes can be hundreds of allocations
# per second. Wire format is unchanged.
self.comm.Send(
buf=msg.tobytes(),
buf=[msg, MPI.BYTE],
dest=dest,
tag=messages.MPIMessageTag.SPIKE_DECODE_DATA
)
Expand Down Expand Up @@ -87,6 +91,13 @@ def __init__(self, config, trode, pos_bin_struct):
self._occupancy_ct = 0
self._temp_idx = 0 # NOTE(DS): so that mark_idx does not increase but still write down in the mark vec

# Persistent scratch for get_joint_prob's in-cube filter. Sized
# to the full mark buffer; per-call we only touch the [:mark_idx]
# view. Avoids allocating a fresh bool array of length mark_idx
# on every spike, which at high firing rates is the dominant
# source of churn in the encoder hot path.
self._in_range_buf = np.empty(self._marks.shape[0], dtype=bool)

self._init_params()
def _load_model(self):

Expand Down Expand Up @@ -198,17 +209,23 @@ def get_joint_prob(self, mark):

#print(mark)

in_range = np.ones(mark_idx, dtype=bool)
# Reuse the preallocated in_range_buf. Slice to mark_idx and
# initialize to True; the loop ANDs each dimension's mask into
# the accumulator in place via out=.
in_range = self._in_range_buf[:mark_idx]
in_range.fill(True)
if self.p['use_filter']:
std = self.p['filter_std']
n_std = self.p['filter_n_std']
for ii in range(self._marks.shape[1]):
in_range = np.logical_and(
np.logical_and(
self._marks[:mark_idx, ii] > mark[ii] - n_std * std,
self._marks[:mark_idx, ii] < mark[ii] + n_std * std
),
in_range
col = self._marks[:mark_idx, ii]
lo = mark[ii] - n_std * std
hi = mark[ii] + n_std * std
# combine: in_range &= (lo < col) & (col < hi)
np.logical_and(
np.logical_and(col > lo, col < hi),
in_range,
out=in_range,
)

# not enough spikes within n-cube
Expand Down
5 changes: 4 additions & 1 deletion realtime_decoder/ripple_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ def send_lfp_timestamp(self, timestamp):
def send_ripple(self, dest, msg):
"""Send a ripple event message"""

# `[msg, MPI.BYTE]` hands MPI the numpy buffer directly. Avoids
# the per-send bytes copy the old `msg.tobytes()` allocated.
# Wire format unchanged.
self.comm.Send(
buf=msg.tobytes(),
buf=[msg, MPI.BYTE],
dest=dest,
tag=messages.MPIMessageTag.RIPPLE_DETECTION
)
Expand Down