Hardware Integration and Control¶
These examples demonstrate integration between Intan EMG systems and external hardware for closed-loop control, robotic applications, and multi-sensor systems.
3D-Printed Robotic Arm Control¶
Control a 3D-printed robotic hand using EMG-based gesture recognition. This example shows how to integrate the Intan system with a Raspberry Pi Pico-controlled servo system.
Note
Demo video available in the 3D printed arm README.
Hardware Requirements:
Raspberry Pi Pico (or Pico 2)
PCA9685 16-channel servo driver
5-6 servo motors
3D-printed InMoov hand components
5V power supply (servos)
Software Requirements:
CircuitPython on Pico
Python 3.10+ on host PC
PySerial library
Setup:
See the detailed build guide.
Architecture:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Intan RHX │ USB │ Host PC │ Serial │ Pico + PCA │
│ Device │────────▶│ + Python │────────▶│ 9685 │
└─────────────┘ └──────────────┘ └─────────────┘
│ │
│ EMG Processing │ Servo Control
│ Gesture Recognition │
▼ ▼
[Predict Gesture] [Move Fingers]
Host-side control script:
"""
EMG-controlled robotic hand
Requires: Pico running code.py from examples/3D_printed_arm_control/
"""
from intan.interface import IntanRHXDevice
from intan.ml import EMGRealTimePredictor
import serial
import tensorflow as tf
# Initialize Intan device
device = IntanRHXDevice(num_channels=64)
device.enable_wide_channel(range(64))
device.start_streaming()
# Load gesture model
model = tf.keras.models.load_model('gesture_model.keras')
# Connect to Pico via serial
pico = serial.Serial('COM3', 115200, timeout=1) # Adjust port
# Gesture-to-command mapping
GESTURE_COMMANDS = {
'rest': 'rest',
'flex': 'grip',
'extend': 'open',
'pinch': 'pinch',
'point': 'point',
}
# Real-time predictor
predictor = EMGRealTimePredictor(
device=device,
model=model,
pca=pca,
mean=mean,
std=std,
label_names=list(GESTURE_COMMANDS.keys()),
confidence_threshold=0.8
)
print("System ready. Perform gestures to control robot.")
try:
predictor.run_prediction_loop()
while True:
prediction = predictor.get_prediction()
if prediction:
gesture = prediction['label']
confidence = prediction['confidence']
if gesture in GESTURE_COMMANDS:
command = GESTURE_COMMANDS[gesture]
pico.write(f"{command}\\n".encode())
print(f"[{confidence:.1%}] {gesture} → {command}")
except KeyboardInterrupt:
print("Stopping...")
finally:
pico.write(b"rest\\n")
pico.close()
device.close()
Pico firmware (CircuitPython):
"""
Raspberry Pi Pico servo controller
File: examples/3D_printed_arm_control/code.py
"""
import board
import busio
from adafruit_servokit import ServoKit
import supervisor
# Initialize PCA9685
i2c = busio.I2C(board.GP1, board.GP0)
kit = ServoKit(channels=16, i2c=i2c)
# Servo channel assignments
THUMB = 0
INDEX = 1
MIDDLE = 2
RING = 3
PINKY = 4
WRIST = 5
# Define gestures as servo angles
GESTURES = {
'rest': [90, 90, 90, 90, 90, 90],
'grip': [170, 170, 170, 170, 170, 90],
'open': [10, 10, 10, 10, 10, 90],
'pinch': [170, 170, 10, 10, 10, 90],
'point': [170, 10, 170, 170, 170, 90],
}
def move_to_gesture(gesture_name):
if gesture_name in GESTURES:
angles = GESTURES[gesture_name]
for servo_idx, angle in enumerate(angles):
kit.servo[servo_idx].angle = angle
# Main loop
print("Robotic hand ready")
while True:
if supervisor.runtime.serial_bytes_available:
command = input().strip()
print(f"Received: {command}")
move_to_gesture(command)
Available gestures:
rest: Neutral positiongrip: Close all fingersopen: Open all fingerspinch: Thumb + index fingerpoint: Extend index fingerflex: Individual finger controlsupinate/pronate: Wrist rotation
RHX Device Interface Examples¶
Low-level examples for interfacing with Intan RHX hardware.
Connecting to device:
"""
Basic device connection and configuration
Script: examples/interface/rhx_connect_to_device.py
"""
from intan.interface import IntanRHXDevice
# Connect to RHX TCP server
device = IntanRHXDevice(
host='127.0.0.1',
command_port=5000,
waveform_port=5001
)
print(f"Connected: {device.connected}")
print(f"Sample rate: {device.get_sample_rate()} Hz")
print(f"Available channels: {device.get_num_channels()}")
# Enable specific channels
device.enable_wide_channel(range(0, 64))
# Start streaming
device.start_streaming()
# Stream 1 second of data
timestamps, data = device.stream(duration_sec=1.0)
print(f"Streamed data shape: {data.shape}")
device.close()
Multi-channel streaming plot:
# Real-time plot of multiple channels
python examples/RHXDevice/multichannel_stream_plot.py
Recording demo:
# Record streaming data to file
python examples/RHXDevice/record_demo.py --duration 60 --output recording.npz
TCP benchmark:
# Test TCP streaming performance
python examples/RHXDevice/tcp_benchmark.py --channels 128 --duration 10
Comparing file vs stream:
"""
Verify streaming data matches recorded file
Script: examples/RHXDevice/compare_rhd_vs_stream.py
"""
from intan.io import load_rhd_file
from intan.interface import IntanRHXDevice
import numpy as np
# Stream live data
device = IntanRHXDevice()
device.enable_wide_channel([15]) # Channel 15
device.start_streaming()
_, stream_data = device.stream(duration_sec=2.0)
device.close()
# Load same data from file
result = load_rhd_file('recording.rhd')
file_data = result['amplifier_data'][15, :len(stream_data[0])]
# Compare
correlation = np.corrcoef(stream_data[0], file_data)[0, 1]
print(f"Correlation: {correlation:.4f}")
# Should be >0.99 if synchronized properly
Scrolling live plot:
"""
Real-time scrolling plot
Script: examples/RHXDevice/scrolling_live.py
"""
from intan.interface import IntanRHXDevice
from intan.plotting import RealtimePlotter
import numpy as np
device = IntanRHXDevice(num_channels=8)
device.enable_wide_channel(range(8))
device.start_streaming()
plotter = RealtimePlotter(
n_channels=8,
sample_rate=4000,
window_sec=2.0,
update_interval_ms=50
)
try:
plotter.start()
buffer_size = 8000
buffer = np.zeros((8, buffer_size))
while True:
_, data = device.stream(n_frames=200)
buffer = np.roll(buffer, -200, axis=1)
buffer[:, -200:] = data
plotter.update(buffer)
except KeyboardInterrupt:
print("Stopping...")
finally:
plotter.stop()
device.close()
Raspberry Pi Pico + IMU Integration¶
Combine Intan EMG with IMU (accelerometer/gyroscope) data from a Pico.
"""
Synchronized EMG + IMU recording
Script: examples/interface/rhx_emg_and_imu.py
"""
from intan.interface import IntanRHXDevice, PicoIMUClient
import numpy as np
# Connect to Intan
emg_device = IntanRHXDevice(num_channels=64)
emg_device.enable_wide_channel(range(64))
emg_device.start_streaming()
# Connect to Pico IMU
imu_client = PicoIMUClient(port='COM4', baud_rate=115200)
# Recording buffers
emg_buffer = []
imu_buffer = []
timestamps = []
print("Recording EMG + IMU... Press Ctrl+C to stop")
try:
while True:
# Get EMG data
ts, emg_data = emg_device.stream(n_frames=40)
emg_buffer.append(emg_data)
# Get IMU data (non-blocking)
imu_sample = imu_client.read_sample()
if imu_sample:
imu_buffer.append(imu_sample)
timestamps.append(ts)
except KeyboardInterrupt:
print("Stopping...")
finally:
# Save synchronized data
np.savez(
'emg_imu_recording.npz',
emg=np.concatenate(emg_buffer, axis=1),
imu=np.array(imu_buffer),
timestamps=np.concatenate(timestamps)
)
print(f"Saved EMG shape: {np.concatenate(emg_buffer, axis=1).shape}")
print(f"Saved IMU samples: {len(imu_buffer)}")
imu_client.close()
emg_device.close()
Pico IMU streaming script:
"""
Pico IMU data publisher
File: examples/interface/pico/code.py
"""
import board
import busio
import adafruit_mpu6050
import time
i2c = busio.I2C(board.GP1, board.GP0)
mpu = adafruit_mpu6050.MPU6050(i2c)
while True:
accel = mpu.acceleration
gyro = mpu.gyro
# Send as CSV over serial
print(f"{time.monotonic()},{accel[0]},{accel[1]},{accel[2]},"
f"{gyro[0]},{gyro[1]},{gyro[2]}")
time.sleep(0.01) # 100 Hz
Wireless EMG with RHD2164¶
Interface with RHD2164 wireless headstage via SPI.
"""
Wireless RHD2164 interface
Script: examples/RHD2164_wireless/code.py
"""
from intan.interface import RHD2164Interface
import time
# Initialize SPI interface (on Pico or similar)
rhd = RHD2164Interface(
spi_bus=0,
cs_pin=5,
sample_rate=20000
)
# Configure channels
rhd.configure_channels(
channels=range(64),
bandwidth=[0.1, 7500], # Hz
dsp_enabled=True
)
# Start acquisition
rhd.start()
try:
while True:
if rhd.data_available():
samples = rhd.read_samples(100) # Read 100 samples
# Process samples...
print(f"Read {len(samples)} samples")
time.sleep(0.01)
except KeyboardInterrupt:
rhd.stop()
TCP Command Interface¶
Low-level TCP control examples.
"""
Send commands to RHX via TCP
Script: examples/intan_tcp/RHXRunAndStimulateDemo.py
"""
from intan.interface import IntanTCPClient
client = IntanTCPClient()
# Query device info
sample_rate = client.execute_command('get sampleratehertz')
print(f"Sample rate: {sample_rate} Hz")
# Configure stimulation
client.execute_command('set a-010.stimenabled true')
client.execute_command('set a-010.stimshape biphasic')
client.execute_command('set a-010.firstphaseamplitudemicroamps 100')
client.execute_command('set a-010.firstphasedurationmicroseconds 200')
# Trigger stimulation
client.execute_command('execute manualstimtriggerpulse a-010')
client.close()
See Also¶
Gesture Classification Pipeline - ML pipeline for control
Intan RHX Controller Device - RHX device details