Hardware Integration and Control

These examples demonstrate integration between Intan EMG systems and external hardware for closed-loop control, robotic applications, and multi-sensor systems.


3D-Printed Robotic Arm Control

Control a 3D-printed robotic hand using EMG-based gesture recognition. This example shows how to integrate the Intan system with a Raspberry Pi Pico-controlled servo system.

Note

Demo video available in the 3D printed arm README.

Hardware Requirements:

  • Raspberry Pi Pico (or Pico 2)

  • PCA9685 16-channel servo driver

  • 5-6 servo motors

  • 3D-printed InMoov hand components

  • 5V power supply (servos)

Software Requirements:

  • CircuitPython on Pico

  • Python 3.10+ on host PC

  • PySerial library

Setup:

See the detailed build guide.

Architecture:

┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│ Intan RHX   │  USB    │   Host PC    │  Serial │ Pico + PCA  │
│   Device    │────────▶│  + Python    │────────▶│   9685      │
└─────────────┘         └──────────────┘         └─────────────┘
                               │                         │
                               │ EMG Processing          │ Servo Control
                               │ Gesture Recognition     │
                               ▼                         ▼
                        [Predict Gesture]          [Move Fingers]

Host-side control script:

"""
EMG-controlled robotic hand
Requires: Pico running code.py from examples/3D_printed_arm_control/
"""
from intan.interface import IntanRHXDevice
from intan.ml import EMGRealTimePredictor
import joblib
import numpy as np
import serial
import tensorflow as tf

# Initialize Intan device
device = IntanRHXDevice(num_channels=64)
device.enable_wide_channel(range(64))
device.start_streaming()

# Load gesture model and the preprocessing artifacts saved during training
# (PCA + normalization; the file names below are placeholders, adjust to your pipeline)
model = tf.keras.models.load_model('gesture_model.keras')
pca = joblib.load('gesture_pca.joblib')
norm = np.load('gesture_norm.npz')
mean, std = norm['mean'], norm['std']

# Connect to Pico via serial
pico = serial.Serial('COM3', 115200, timeout=1)  # Adjust port

# Gesture-to-command mapping
GESTURE_COMMANDS = {
    'rest': 'rest',
    'flex': 'grip',
    'extend': 'open',
    'pinch': 'pinch',
    'point': 'point',
}

# Real-time predictor
predictor = EMGRealTimePredictor(
    device=device,
    model=model,
    pca=pca,
    mean=mean,
    std=std,
    label_names=list(GESTURE_COMMANDS.keys()),
    confidence_threshold=0.8
)

print("System ready. Perform gestures to control robot.")

try:
    predictor.run_prediction_loop()

    while True:
        prediction = predictor.get_prediction()

        if prediction:
            gesture = prediction['label']
            confidence = prediction['confidence']

            if gesture in GESTURE_COMMANDS:
                command = GESTURE_COMMANDS[gesture]
                pico.write(f"{command}\n".encode())
                print(f"[{confidence:.1%}] {gesture} -> {command}")

except KeyboardInterrupt:
    print("Stopping...")
finally:
    pico.write(b"rest\n")
    pico.close()
    device.close()

Pico firmware (CircuitPython):

"""
Raspberry Pi Pico servo controller
File: examples/3D_printed_arm_control/code.py
"""
import board
import busio
from adafruit_servokit import ServoKit
import supervisor

# Initialize PCA9685
i2c = busio.I2C(board.GP1, board.GP0)  # SCL=GP1, SDA=GP0
kit = ServoKit(channels=16, i2c=i2c)

# Servo channel assignments
THUMB = 0
INDEX = 1
MIDDLE = 2
RING = 3
PINKY = 4
WRIST = 5

# Define gestures as servo angles
GESTURES = {
    'rest': [90, 90, 90, 90, 90, 90],
    'grip': [170, 170, 170, 170, 170, 90],
    'open': [10, 10, 10, 10, 10, 90],
    'pinch': [170, 170, 10, 10, 10, 90],
    'point': [170, 10, 170, 170, 170, 90],
}

def move_to_gesture(gesture_name):
    if gesture_name in GESTURES:
        angles = GESTURES[gesture_name]
        for servo_idx, angle in enumerate(angles):
            kit.servo[servo_idx].angle = angle

# Main loop
print("Robotic hand ready")
while True:
    if supervisor.runtime.serial_bytes_available:
        command = input().strip()
        print(f"Received: {command}")
        move_to_gesture(command)

Available gestures:

  • rest: Neutral position

  • grip: Close all fingers

  • open: Open all fingers

  • pinch: Thumb + index finger

  • point: Extend index finger

  • flex: Individual finger control

  • supinate/pronate: Wrist rotation
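
Commands are plain newline-terminated strings, so the hand can be exercised without the EMG pipeline. A minimal sketch, assuming the Pico is running the firmware above (adjust the serial port for your system):

import serial
import time

# Step through a few gestures to verify wiring and servo limits
pico = serial.Serial('COM3', 115200, timeout=1)  # adjust port
for gesture in ['open', 'grip', 'pinch', 'point', 'rest']:
    pico.write(f"{gesture}\n".encode())
    time.sleep(1.0)  # give the servos time to settle
pico.close()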


RHX Device Interface Examples

Low-level examples for interfacing with Intan RHX hardware.

Connecting to device:

"""
Basic device connection and configuration
Script: examples/interface/rhx_connect_to_device.py
"""
from intan.interface import IntanRHXDevice

# Connect to RHX TCP server
device = IntanRHXDevice(
    host='127.0.0.1',
    command_port=5000,
    waveform_port=5001
)

print(f"Connected: {device.connected}")
print(f"Sample rate: {device.get_sample_rate()} Hz")
print(f"Available channels: {device.get_num_channels()}")

# Enable specific channels
device.enable_wide_channel(range(0, 64))

# Start streaming
device.start_streaming()

# Stream 1 second of data
timestamps, data = device.stream(duration_sec=1.0)
print(f"Streamed data shape: {data.shape}")

device.close()

Multi-channel streaming plot:

# Real-time plot of multiple channels
python examples/RHXDevice/multichannel_stream_plot.py

Recording demo:

# Record streaming data to file
python examples/RHXDevice/record_demo.py --duration 60 --output recording.npz
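
The core of the recording demo can be reproduced with the streaming API from the connection example. A minimal sketch (see the bundled script for the full version):

from intan.interface import IntanRHXDevice
import numpy as np

# Stream for a fixed duration and save to disk
device = IntanRHXDevice(num_channels=64)
device.enable_wide_channel(range(64))
device.start_streaming()

timestamps, data = device.stream(duration_sec=60.0)
device.close()

np.savez('recording.npz', timestamps=timestamps, emg=data)
print(f"Saved {data.shape[1]} frames from {data.shape[0]} channels")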

TCP benchmark:

# Test TCP streaming performance
python examples/RHXDevice/tcp_benchmark.py --channels 128 --duration 10
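
A rough throughput check can also be done directly with the streaming API (a sketch, not the bundled benchmark):

from intan.interface import IntanRHXDevice
import time

device = IntanRHXDevice(num_channels=128)
device.enable_wide_channel(range(128))
device.start_streaming()

t0 = time.perf_counter()
_, data = device.stream(duration_sec=10.0)
elapsed = time.perf_counter() - t0
device.close()

# Effective frame rate over the TCP link vs. the configured sample rate
print(f"{data.shape[1]} frames in {elapsed:.2f} s "
      f"({data.shape[1] / elapsed:.0f} frames/s)")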

Comparing file vs stream:

"""
Verify streaming data matches recorded file
Script: examples/RHXDevice/compare_rhd_vs_stream.py
"""
from intan.io import load_rhd_file
from intan.interface import IntanRHXDevice
import numpy as np

# Stream live data
device = IntanRHXDevice()
device.enable_wide_channel([15])  # Channel 15
device.start_streaming()
_, stream_data = device.stream(duration_sec=2.0)
device.close()

# Load same data from file
result = load_rhd_file('recording.rhd')
file_data = result['amplifier_data'][15, :len(stream_data[0])]

# Compare
correlation = np.corrcoef(stream_data[0], file_data)[0, 1]
print(f"Correlation: {correlation:.4f}")

# Should be >0.99 if synchronized properly

Scrolling live plot:

"""
Real-time scrolling plot
Script: examples/RHXDevice/scrolling_live.py
"""
from intan.interface import IntanRHXDevice
from intan.plotting import RealtimePlotter
import numpy as np

device = IntanRHXDevice(num_channels=8)
device.enable_wide_channel(range(8))
device.start_streaming()

plotter = RealtimePlotter(
    n_channels=8,
    sample_rate=4000,
    window_sec=2.0,
    update_interval_ms=50
)

try:
    plotter.start()
    buffer_size = 8000
    buffer = np.zeros((8, buffer_size))

    while True:
        _, data = device.stream(n_frames=200)

        buffer = np.roll(buffer, -200, axis=1)
        buffer[:, -200:] = data

        plotter.update(buffer)

except KeyboardInterrupt:
    print("Stopping...")
finally:
    plotter.stop()
    device.close()

Raspberry Pi Pico + IMU Integration

Combine Intan EMG with IMU (accelerometer/gyroscope) data from a Pico.

"""
Synchronized EMG + IMU recording
Script: examples/interface/rhx_emg_and_imu.py
"""
from intan.interface import IntanRHXDevice, PicoIMUClient
import numpy as np

# Connect to Intan
emg_device = IntanRHXDevice(num_channels=64)
emg_device.enable_wide_channel(range(64))
emg_device.start_streaming()

# Connect to Pico IMU
imu_client = PicoIMUClient(port='COM4', baud_rate=115200)

# Recording buffers
emg_buffer = []
imu_buffer = []
timestamps = []

print("Recording EMG + IMU... Press Ctrl+C to stop")

try:
    while True:
        # Get EMG data
        ts, emg_data = emg_device.stream(n_frames=40)
        emg_buffer.append(emg_data)

        # Get IMU data (non-blocking)
        imu_sample = imu_client.read_sample()
        if imu_sample:
            imu_buffer.append(imu_sample)

        timestamps.append(ts)

except KeyboardInterrupt:
    print("Stopping...")

finally:
    # Save synchronized data
    np.savez(
        'emg_imu_recording.npz',
        emg=np.concatenate(emg_buffer, axis=1),
        imu=np.array(imu_buffer),
        timestamps=np.concatenate(timestamps)
    )

    print(f"Saved EMG shape: {np.concatenate(emg_buffer, axis=1).shape}")
    print(f"Saved IMU samples: {len(imu_buffer)}")

    imu_client.close()
    emg_device.close()

Pico IMU streaming script:

"""
Pico IMU data publisher
File: examples/interface/pico/code.py
"""
import board
import busio
import adafruit_mpu6050
import time

i2c = busio.I2C(board.GP1, board.GP0)
mpu = adafruit_mpu6050.MPU6050(i2c)

while True:
    accel = mpu.acceleration
    gyro = mpu.gyro

    # Send as CSV over serial
    print(f"{time.monotonic()},{accel[0]},{accel[1]},{accel[2]},"
          f"{gyro[0]},{gyro[1]},{gyro[2]}")

    time.sleep(0.01)  # 100 Hz
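
If you are not using PicoIMUClient, the CSV lines above can be read directly with pySerial. A minimal sketch, assuming the seven-field format printed by the firmware:

import serial

ser = serial.Serial('COM4', 115200, timeout=1)  # adjust port
for _ in range(100):  # read a short burst of samples
    line = ser.readline().decode().strip()
    if not line:
        continue
    t, ax, ay, az, gx, gy, gz = (float(v) for v in line.split(','))
    print(f"t={t:.3f}  accel=({ax:.2f}, {ay:.2f}, {az:.2f})  "
          f"gyro=({gx:.2f}, {gy:.2f}, {gz:.2f})")
ser.close()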

Wireless EMG with RHD2164

Interface with RHD2164 wireless headstage via SPI.

"""
Wireless RHD2164 interface
Script: examples/RHD2164_wireless/code.py
"""
from intan.interface import RHD2164Interface
import time

# Initialize SPI interface (on Pico or similar)
rhd = RHD2164Interface(
    spi_bus=0,
    cs_pin=5,
    sample_rate=20000
)

# Configure channels
rhd.configure_channels(
    channels=range(64),
    bandwidth=[0.1, 7500],  # Hz
    dsp_enabled=True
)

# Start acquisition
rhd.start()

try:
    while True:
        if rhd.data_available():
            samples = rhd.read_samples(100)  # Read 100 samples
            # Process samples...
            print(f"Read {len(samples)} samples")

        time.sleep(0.01)

except KeyboardInterrupt:
    rhd.stop()

TCP Command Interface

Low-level TCP control examples.

"""
Send commands to RHX via TCP
Script: examples/intan_tcp/RHXRunAndStimulateDemo.py
"""
from intan.interface import IntanTCPClient

client = IntanTCPClient()

# Query device info
sample_rate = client.execute_command('get sampleratehertz')
print(f"Sample rate: {sample_rate} Hz")

# Configure stimulation
client.execute_command('set a-010.stimenabled true')
client.execute_command('set a-010.stimshape biphasic')
client.execute_command('set a-010.firstphaseamplitudemicroamps 100')
client.execute_command('set a-010.firstphasedurationmicroseconds 200')

# Trigger stimulation
client.execute_command('execute manualstimtriggerpulse a-010')

client.close()
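
The same command channel can also start and stop acquisition. A brief sketch (command names follow the RHX TCP syntax; confirm against the RHX software documentation for your version):

from intan.interface import IntanTCPClient
import time

client = IntanTCPClient()
client.execute_command('set runmode run')   # start acquisition
time.sleep(5.0)                             # acquire for 5 seconds
client.execute_command('set runmode stop')  # stop acquisition
client.close()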

See Also