Network API

Note

The Trodes Network API is based on the ZMQ network protocol and was introduced in Version 2.0.0 as a secondary method to communicate between modules. It has bindings for Python, making it a powerful way for advanced users to augment their recording setup with custom real-time control. However, it does use more CPU resources than the original QSockets-based network. Users are therefore encouraged to only use the new ZMQ based network if they need these advanced features.

Trodes provides a way for users to write their own modules that interact with the Trodes system in real-time using the TrodesNetwork API.

Real-time Streaming Data From Trodes

When running Trodes doing a live recording or doing a simulated recording via playback of a .rec file, data is streamed out. This data can include raw neural data, processed local field potential (LFP), or processed spikes.

Some experimental protocols might make use of real-time data by detecting certain events and responding by programmatically changing stimulation patterns or delivering rewards.

TrodesNetwork Python SDK is used to receive network signals from Trodes in realtime.

This is a package for accessing streaming Trodes data for real-time analysis in Python.

Installation of Python packages

You can install trodesnetwork and Python dependencies like this:

pip install trodesnetwork
pip install zmq
pip intall msgpack

General Usage

Import trodes from the library. It contains a classes for querying the hardware querying general information. Here is how to establish a connection with the services located at trodes.hardware and trodes.TrodesInfoRequester:

from trodesnetwork import trodes
hardware = trodes.TrodesHardware()
info = trodes.TrodesInfoRequester()

Import socket from the library. It is used to subscribe to various data sources.

from trodesnetwork import socket

Then, subscribe to a specific data type that is being published.

spikes = socket.SourceSubscriber('source.spikes')

In a main loop, receive the requested data and do your processing:

spikes.receive()

The server_address should be changed accordingly if you are running the network from the GUI on a different address.

hardware = trodes.TrodesHardware(server_address="tcp://127.0.0.1:49152")
spikes = socket.SourceSubscriber('source.spikes',server_address="tcp://127.0.0.1:49152")

Supported Datatypes

All data types are returned as a Python dictionary.

Local field potential (LFP)

Use the local field potential by creating a subscriber object to ‘lfp’.

lfpSub = SourceSubscriber('source.lfp')
lfpSub.receive()

Dictionary key:value pairs

  • ‘localTimestamp’ : The hardware sample count since streaming began

  • ‘systemTimestamp’ : The 64-bit computer timestamp when the sample was first processed

  • ‘lfpData’ : A vector containing int16 values of the LFP band data for all channels

Sorted Spike Events

Use the sorted spike events by creating subscriber object to ‘spikes’.

spikesSub = SourceSubscriber('source.spikes')
spikesSub.receive()

Dictionary key:value pairs

  • ‘localTimestamp’ : The hardware sample count since streaming began

  • ‘systemTimestamp’ : The 64-bit computer timestamp when the sample was first processed

  • ‘nTrodeId’ : The ID number of the nTrode

  • ‘cluster’ : The cluster number, starting with 1

Neural Data

Use the raw neural data by creating a subscriber object to ‘neural’.

neuralSub = SourceSubscriber('source.neural')
neuralSub.receive()

Dictionary key:value pairs

  • ‘localTimestamp’ : The hardware sample count since streaming began

  • ‘systemTimestamp’ : The 64-bit computer timestamp when the sample was first processed

  • ‘neuralData’ : A vector containing int16 values of the raw unreferenced data for all channels

Camera Data

To subscribe to camera data, the cameraModule needs to be active. You can play from a .rec file with a matching .h264 video file with geometry tracking set up. You can also do a live recording with tracking set up.

positionSub = SourceSubscriber('source.position')
positionSub.receive()

Dictionary key:value pairs

  • ‘timestamp’ : The hardware sample count since streaming began

  • ‘lineSegment’ : For linearized tracking. This is the current line segment the animal is on.

  • ‘posOnSegment’ : For linearized tracking. This is the current position along the linear segment.

  • ‘x’ : For 2D tracking. The animal’s X location (or the first diode for red/green tracking)

  • ‘y’ : For 2D tracking. The animal’s y location (or the first diode for red/green tracking).

  • ‘x2’ : For 2D tracking with red/green diodes. The X location of the 2nd diode.

  • ‘y2’ : For 2D tracking with red/green diodes. The y location of the 2nd diode.

Digital IO Data

Digital IO data from multiple sources (MCU and ECU) are bundled together into a single byte array.

dioSub = SourceSubscriber('source.digital')
dioSub.receive()

Dictionary key:value pairs

  • ‘localTimestamp’ : The hardware sample count since streaming began

  • ‘systemTimestamp’ : The 64-bit computer timestamp when the sample was first processed

  • ‘digitalData’ : A vector containing uint8 values with the state of each digital I/O channel encoded as one bit in the vector. See recipe below for finding the desired bit.

Preventing frozen execution: non-blocking socket receives or threading

The receive() calls are blocking by default, which means that if there is no data to receive, the call will block code execution in the thread until there is data to receive. There are multiple ways to allow other tasks to be performed when there is no data to receive. The first method is to pass ‘noblock=True’ to receive().

dataType.receive(noblock=True) # throws ZMQError if no message

In order to use this feature, make sure to catch ZMQError in case you do a receive call and there are no new messages.

import zmq
# ...setup code...
try:
   # do your receive
   pass
except zmq.ZMQError as e:
   # handle the case where there is no message
   pass

The second method is to use threading, where multiple threads can be active in parallel. If one thread is blocked, the other threads will still function.

from trodesnetwork import socket
import threading

spikes = SourceSubscriber('source.spikes')
spikes.receive()

def subscribe_spikes_thread():
   spikes = SourceSubscriber('source.spikes')
   while True:
      s = spikes.receive() #This will block if no spikes are present
      ts = s['localTimestamp']
      tid = s['nTrodeId']
      cluster = s['cluster']
      print(f'S: {ts} {tid} {cluster}')

t1 = threading.Thread(target=subscribe_spikes_thread) #Insert into a thread
t1.start()

Examples

Note

The easiest way to get started ois to run the Trodes GUI application and play back a .rec file.

Warning

If your code hangs when calling a constructor for a network object, this is a sign you may be having trouble connecting to the network server.

Ensure Trodes is running. Double check which address and port the network is running on and make sure you initialize your Python objects with the proper address and port.

Stimulation

#Previous stimulation commands are saved on the hardware, so erase the ones you are redefining
hardware.sendClearStimulationParams(0) #clear the slot
hardware.sendClearStimulationParams(1) #clear the slot
hardware.sendClearStimulationParams(2) #clear the slot
hardware.sendClearStimulationParams(3) #clear the slot

# Set up global parameters. The main thing to define is the maximum current output.
#This global setting is used by all stimulation commands. The individual commands
#divide this scale into 255 steps for both the anode and the cathode.
#Therefore, a higher max current results in lower resolution.
globalStimSettings = trodes.GlobalStimulationSettings()
globalStimSettings.setVoltageScale(
 #scaleValue = trodes.CurrentScaling.max10nA
 #scaleValue = trodes.CurrentScaling.max20nA
 #scaleValue = trodes.CurrentScaling.max50nA
 #scaleValue = trodes.CurrentScaling.max100nA
 #scaleValue = trodes.CurrentScaling.max200nA
 #scaleValue = trodes.CurrentScaling.max500nA
 #scaleValue = trodes.CurrentScaling.max1uA
 #scaleValue = trodes.CurrentScaling.max2uA
 #scaleValue = trodes.CurrentScaling.max5uA
 scaleValue = trodes.CurrentScaling.max10uA
 )
#Send the global parameters
hardware.sendGlobalStimulationSettings(globalStimSettings)

#1st pattern definition for biphasic stimulation.
stimCommand = trodes.StimulationCommand()
stimCommand.setBiphasicPulseShape(
  leadingPulseWidth_Samples = 50,
  leadingPulseAmplitude = 200, #Must be between 0 and 255
  secondPulseWidth_Samples = 50, #Pay attention to charge balancing!!
  secondPulseAmplitude = 200, #Must be between 0 and 255 #Pay attention to charge balancing!!
  interPhaseDwell_Samples = 5, #time between the anode and cathode pulses
  pulsePeriod_Samples = 2000, #Defines the pulse frequency in the train
  startDelay_Samples = 0 #Delay after the group trigger is sent
)
stimCommand.setNumPulsesInTrain(
   numPulsesInTrain = 2 #How many total biphasic pulses
)
stimCommand.setChannels(
#Define the electrodes that the cathode and anode will be on.
#Note: these channel must be enabled for stimulation in the Trodes workspace
#and be mapped to stimulation capable hardware channels.
   cathodeID = 2,
   cathodeChannel = 1,
   anodeID = 2,
   anodeChannel = 1
)
stimCommand.setGroup(
  group=2 #Multiple stimulation commands can go in the same group
)
stimCommand.setSlot(
  slot=1 #Each stimulation command must have a unique slot
)
hardware.sendStimulationParams(stimCommand) #Send the parameters to hardware

#2nd pattern definition
stimCommand.setBiphasicPulseShape(
 leadingPulseWidth_Samples = 50,
 leadingPulseAmplitude = 200,
 secondPulseWidth_Samples = 50,
 secondPulseAmplitude = 200,
 interPhaseDwell_Samples = 5,
 pulsePeriod_Samples = 1000,
 startDelay_Samples = 10000
)
stimCommand.setNumPulsesInTrain(
 numPulsesInTrain = 3
)
stimCommand.setChannels(
 cathodeID = 2,
 cathodeChannel = 2,
 anodeID = 2,
 anodeChannel = 2
)
stimCommand.setGroup(
 group=2 #Multiple stimulation commands can go in the same group
)
stimCommand.setSlot(
 slot=2 #Each stimulation command must have a unique slot
)
hardware.sendStimulationParams(stimCommand)

# Send a message to start the group.
#THIS WILL TRIGGER BOTH TRAINS AT THE SAME TIME SINCE THEY ARE BOTH IN GROUP 2
hardware.sendStimulationStartGroup(2)

Triggering a defined StateScript function

# trigger StateScript function 3.
hardware.ecu_shortcut_message(3)

Info Requests

from trodesnetwork import trodes
info = trodes.TrodesInfoRequester()
info.request_time()
info.request_timerate()

Timestamping and Synchronization

In general, timestamps sent from TrodesNetwork are integers representing Unix time in nanoseconds.

To get the Unix time in nanoseconds from Python, you can do:

import time
timestamp = time.time_ns()

This is useful for being able to compare latencies between when Trodes sends out data and when your client code receives it.


Synchronization between modules on different computers

When using network modules across computers, this involves ensuring each device is properly synchronized using the Network Time Protocol (NTP).

When properly synchronized with NTP, you can be sure that the Unix time that each device reports is as similar as possible.

Triggering a function when a digital input changes

from trodesnetwork import trodes
from trodesnetwork import socket

hardware = trodes.TrodesHardware()
dio = socket.SourceSubscriber('source.digital')

oldDValue = 0
newDValue = 0

while True:

  mcu_byte = 1  #location of the MCU data
  ecu_in_byte = 2 #start location of the ECU digital input data
  ecu_out_byte = 6 #start location of the ECU digital output data

  desired_device = ecu_in_byte #choose the device from the above list
  desired_device_channel = 0; #this is 0-based, so '0' is the first channel in the device

  d = dio.receive()
  sys_timestamp = d['localTimestamp'] #this is the hardware timestamp of the sample
  byte_data = bytearray(d['digitalData'][0])  # this is a byte array containing all of the digital info (one bit per channel)
  channel_data = (byte_data[desired_device] >> desired_device_channel) & 1 #Here we isolate just the desired bit

  newDValue = channel_data

  if (oldDValue == 0 and newDValue == 1):
     print('Upward edge trigger at sample '+str(sys_timestamp))
     ​
  oldDValue = newDValue

Triggering a StateScript function when receiving a Spike from a specific cluster

When actually implementing this recipe, you will likely want to have the while loop conditioned on a boolean variable that can be toggled from outside of the loop.

from trodesnetwork import trodes
from trodesnetwork import socket
import threading
​
def subscribe_spikes_thread():
   spikes = socket.SourceSubscriber('source.spikes')
   hardware = trodes.TrodesHardware()
​
   while True:
      s = spikes.receive()
      cluster = s['cluster']
      ntrode_id = s['nTrodeId']
​
      if ntrode_id == 1 and cluster == 1:
         # trigger StateScript function id=1
         hardware.ecu_shortcut_message(1)
​
t1 = threading.Thread(target=subscribe_spikes_thread)
t1.start()

Updating your trodesnetwork version via pip

If there are updates to trodesnetwork, you can update your local Python package version using the following command.

pip install --upgrade --force-reinstall trodesnetwork

Deploying package to PyPi (for maintainers)

This Python library is available on PyPi. Package maintainers can update the package version using the following commands.

python setup.py sdist
twine upload dist