Realtime gesture prediction from streaming data
This example demonstrates how to use a pre-trained model to predict gestures in real-time from streaming data. The model is trained on EMG data and can classify different gestures based on the input features.
Requirements: - A pre-trained model file (e.g., model.keras) - A recent .rhd data file recorded with the Intan RHX system - A PCA model created from the training data - Normalization parameters from the training data
You’ll see how to: - Load a pre-trained model, PCA model, and normalization parameters - Connect to the RHX device and stream data - Preprocess the streamed data - Use the model to predict gestures in real-time
Example code:
"""
Example: Predict gestures in real-time using a pre-trained model.
"""
from intan.rhx_interface import IntanRHXDevice
from intan.io import load_labeled_file
import joblib
import tensorflow as tf
import numpy as np
import os
from intan.ml import EMGRealTimePredictor
# === Config (update as needed) ===
DATA_PATH = r"G:\Shared drives\NML_shared\DataShare\HDEMG Human Healthy\HD-EMG_Cuff\Jonathan"
FILE_PATH = os.path.join(DATA_PATH, r"2024_10_22\raw\RingFlexion_241022_144153\RingFlexion_241022_144153.rhd")
MODEL_PATH = os.path.join(DATA_PATH, "model.keras")
PCA_PATH = os.path.join(DATA_PATH, "training_data_pca.pkl")
NORM_PATH = os.path.join(DATA_PATH, "training_data_norm.npz")
LABEL_PATH = os.path.join(DATA_PATH, "training_data.npz")
NOTES_PATH = os.path.join(os.path.dirname(FILE_PATH), "notes.txt")
# === Load model, PCA, normalization ===
cue_df = load_labeled_file(NOTES_PATH)
model = tf.keras.models.load_model(MODEL_PATH)
pca = joblib.load(PCA_PATH)
norm = np.load(NORM_PATH)
label_names = np.load(LABEL_PATH, allow_pickle=True)["label_names"]
mean, std = norm["mean"], norm["std"]
std[std == 0] = 1
BUFFER_SEC = 1
WINDOW_MS = 250
STEP_SEC = 1
WINDOW_OFFSET_SAMPLES = 200 # <-- Offset from start of buffer (in samples)
device = IntanRHXDevice(num_channels=128, buffer_duration_sec=1)
device.enable_wide_channel(range(128))
device.start_streaming()
predictor = EMGRealTimePredictor(
device=device,
model=model,
pca=pca,
mean=mean,
std=std,
label_names=label_names,
cue_df=cue_df, # optional for ground-truth
buffer_sec=1,
window_ms=250,
step_sec=1,
window_offset_samples=200
)
predictor.run_prediction_loop(background=True)
try:
while True:
msg = predictor.get_message()
if msg:
print(msg)
except KeyboardInterrupt:
print("Stopping...")
predictor.stop()
finally:
device.stop_streaming()
device.close()
```