pyoephys.interface package

Hardware interfaces for ZMQ, LSL, and Open Ephys GUI.

class GUIClient(host='127.0.0.1')[source]

Bases: object

Wrapper around open_ephys.control.OpenEphysHTTPServer

start_acquisition(duration_sec=0)[source]
stop_acquisition()[source]
idle()[source]
status()[source]
start_record(duration_sec=0)[source]
set_recording_params(base_text=None, append_text=None, parent_dir=None)[source]
get_recording_info(key='')[source]
quit()[source]
load_config(path)[source]
clear_signal_chain()[source]
message(msg)[source]
close_gui()[source]
class Event(_d, _data=None)[source]

Bases: object

Represents an event received from a ZMQ Interface plugin

event_types = {0: 'TIMESTAMP', 1: 'BUFFER_SIZE', 2: 'PARAMETER_CHANGE', 3: 'TTL', 4: 'SPIKE', 5: 'MESSAGE', 6: 'BINARY_MSG'}
set_data(_data)[source]

Sets event data

class Spike(_d, _data=None)[source]

Bases: object

Represents a spike event received from a ZMQ Interface plugin

class ZMQClient(host_ip='127.0.0.1', data_port='5556', heartbeat_port=None, buffer_seconds=30.0, expected_channel_count=None, expected_channel_names=None, required_fraction=1.0, max_channels=256, auto_start=False, set_index_looping=True, align_to_header_index=False, fill_value=nan, verbose=False)[source]

Bases: object

Open Ephys–compatible ZMQ client with per-channel ring buffers (deque-based).

Parameters:
  • host_ip (str)

  • data_port (str)

  • heartbeat_port (Optional[str])

  • buffer_seconds (float)

  • expected_channel_count (Optional[int])

  • expected_channel_names (Optional[Iterable[str]])

  • required_fraction (float)

  • max_channels (int)

  • auto_start (bool)

  • set_index_looping (bool)

  • align_to_header_index (bool)

  • fill_value (float)

  • verbose (bool)

wait_for_expected_channels(timeout=15.0)[source]

Block until expected channels (by name) have been seen (per required_fraction).

Return type:

bool

Parameters:

timeout (float)

set_index_looping(enabled)[source]

If True, global_sample_index restarts at each playback loop; else stays monotonic.

Return type:

None

Parameters:

enabled (bool)

start()[source]
Return type:

None

stop(timeout=2.0)[source]
Return type:

None

Parameters:

timeout (float | None)

close()[source]
Return type:

None

set_channel_index(indices)[source]
Return type:

None

Parameters:

indices (Iterable[int])

property channel_names: List[str]
fs_estimate(n_last=2000)[source]
Return type:

float

Parameters:

n_last (int)

get_latest_window(window_ms=500)[source]
Return type:

ndarray

Parameters:

window_ms (int)

get_latest(n)[source]

Return the latest n samples and their absolute timestamps. Y: (C_selected, n), t: (n,)

Return type:

Tuple[ndarray, ndarray]

Parameters:

n (int)

latest()[source]

Return (t_rel, Y) like the old client:

  • t_rel: (M,) seconds ending at 0 ([-window, 0])

  • Y: (N_channels, M)

Uses self.N_samples as the window size.

Return type:

tuple[Optional[ndarray], Optional[ndarray]]

drain_new()[source]
Return type:

tuple[Optional[ndarray], Optional[ndarray]]

Return only NEW samples since last call:
  • t_abs: (K,) seconds since stream start (based on total_samples_written / fs)

  • Y_new: (N_channels, K)

exception NotReadyError[source]

Bases: RuntimeError

Raised when getters are called before required channels are ready.

class OEBinPlaybackClient(oebin_path, stream_name='EMG', block_size=128, loopback=False, enable_lsl=True, use_recorded_timestamps=True, speed_factor=1.0, verbose=False)[source]

Bases: object

Realtime playback of Open Ephys .oebin recordings with clean LSL timestamps.

Publishes chunks as (n_samples, n_channels) with one timestamp per row. Guarantees strictly increasing timestamps and correct data orientation.

Parameters:
  • oebin_path (str) – Path to a single .oebin file or a directory containing one.

  • stream_name (str) – LSL stream name to publish.

  • block_size (int) – Samples per chunk (controls latency).

  • loopback (bool) – If True, after the last sample is sent, restart from the beginning.

  • enable_lsl (bool) – Whether to publish over LSL. If False, only loop internally (for tests).

  • use_recorded_timestamps (bool) – If True, publish recorded timestamps instead of local_clock().

  • speed_factor (float) – 1.0 = real time. >1.0 faster, <1.0 slower (wall-time playback).

  • verbose (bool) – If True, emit INFO logs from this client.

start()[source]
Return type:

None

stop(timeout=2.0)[source]
Return type:

None

Parameters:

timeout (float | None)

close()[source]

Stop playback and release resources.

Return type:

None

is_done()[source]

Check if playback has reached the end of the recording, and not loopback.

Return type:

bool

is_running()[source]
Return type:

bool

get_latest_window(window_ms=500)[source]

Return the most recent window from the mirror buffer as (C, Nwin).

Return type:

ndarray

Parameters:

window_ms (int)

playback_cli(argv=None)[source]
Return type:

None

Parameters:

argv (Sequence[str] | None)

class LSLClient(stream_name=None, stream_type='EMG', timeout_s=5.0, buffer_seconds=30.0, pull_timeout=0.2, verbose=False)[source]

Bases: object

Subscribe to an LSL stream and keep a thread-safe ring buffer.

Parameters:
  • stream_name (Optional[str]) – Exact LSL stream name to match. If None, matches by type only.

  • stream_type (str) – LSL stream type to match (e.g., “EMG”).

  • timeout_s (float) – How long to wait for a matching stream on start().

  • buffer_seconds (float) – Size of the internal ring buffer in seconds.

  • pull_timeout (float) – Timeout per pull_chunk() in seconds.

  • verbose (bool)

start()[source]
Return type:

None

stop(timeout=2.0)[source]
Return type:

None

Parameters:

timeout (float | None)

get_latest(n)[source]

Return the latest n samples as (C×n, n_t). Raises if not ready.

Return type:

Tuple[ndarray, ndarray]

Parameters:

n (int)

get_latest_window(window_ms)[source]

Return the latest window_ms milliseconds of data as shape (C, samples).

Convenience wrapper around get_latest() that converts a duration in milliseconds to a sample count.

Return type:

ndarray

Parameters:

window_ms (int)

get_window(seconds)[source]

Return the last seconds of data.

Return type:

Tuple[ndarray, ndarray]

Parameters:

seconds (float)