Source code for pyoephys.interface._gui_events

import numpy as np


[docs] class Event(object): """ Represents an event received from a ZMQ Interface plugin """ event_types = {0: 'TIMESTAMP', 1: 'BUFFER_SIZE', 2: 'PARAMETER_CHANGE', 3: 'TTL', 4: 'SPIKE', 5: 'MESSAGE', 6: 'BINARY_MSG'} def __init__(self, _d, _data=None): self.type = None self.stream = '' self.sample_num = 0 self.source_node = 0 self.event_state = 0 self.event_line = 0 self.event_word = 0 self.numBytes = 0 self.data = b'' self.__dict__.update(_d) self.timestamp = None # noinspection PyTypeChecker self.type = Event.event_types[self.type] if _data: self.data = _data self.numBytes = len(_data) dfb = np.frombuffer(self.data, dtype=np.uint8) self.event_line = dfb[0] dfb = np.frombuffer(self.data, dtype=np.uint8, offset=1) self.event_state = dfb[0] dfb = np.frombuffer(self.data, dtype=np.uint64, offset=2) self.event_word = dfb[0] if self.type == 'TIMESTAMP': t = np.frombuffer(self.data, dtype=np.int64) self.timestamp = t[0]
[docs] def set_data(self, _data): """ Sets event data """ self.data = _data self.numBytes = len(_data)
def __str__(self): """Prints info about the event""" ds = self.__dict__.copy() del ds['data'] return str(ds)
[docs] class Spike(object): """ Represents a spike event received from a ZMQ Interface plugin """ def __init__(self, _d, _data=None): self.stream = '' self.source_node = 0 self.electrode = 0 self.sample_num = 0 self.num_channels = 0 self.num_samples = 0 self.sorted_id = 0 self.threshold = [] self.__dict__.update(_d) self.data = _data def __str__(self): ds = self.__dict__.copy() del ds['data'] return str(ds)