# PicoScope 7 Software
# Available on Windows, Mac and Linux
# Code: Select all
#
# Copyright (C) 2018-2019 Pico Technology Ltd. See LICENSE file for terms.
#
# PS4000 Series (A API) STREAMING MODE EXAMPLE
# This example demonstrates how to call the ps4000A driver API functions in order to open a device, set up one channel (A) and collect streamed data (1 buffer).
# This data is then plotted as mV against time in ns.
import ctypes
import numpy as np
from picosdk.ps4000a import ps4000a as ps
import matplotlib.pyplot as plt
from picosdk.functions import adc2mV, assert_pico_ok
import time
# Acquisition settings: sampling rate and total recording time
PROCESS_RATE = 20000000  # Hz
PROCESS_TIME = 5300  # ms

# Create chandle and status ready for use
chandle = ctypes.c_int16()
status = {}

# Open PicoScope 4000 Series (A API) device
# Returns handle to chandle for use in future API functions
status["openunit"] = ps.ps4000aOpenUnit(ctypes.byref(chandle), None)

# Status 286 on open is the only condition this script recovers from: the
# code is handed back to ps4000aChangePowerSource and that call must succeed.
# Any other non-OK status is raised by assert_pico_ok.
# NOTE(review): 286 (0x11E) appears to be PICO_USB3_0_DEVICE_NON_USB3_0_PORT
# in PicoStatus.h - confirm against the installed SDK.
if status["openunit"] == 286:
    powerStatus = status["openunit"]
    status["changePowerSource"] = ps.ps4000aChangePowerSource(chandle, powerStatus)
    assert_pico_ok(status["changePowerSource"])
else:
    assert_pico_ok(status["openunit"])

enabled = 1
disabled = 0
analogue_offset = 0.0

# Set up channel A
# handle = chandle
# channel = PS4000A_CHANNEL_A = 0
# enabled = 1
# coupling type = PS4000A_DC = 1
# range = PS4000A_2V = 7
# analogue offset = 0 V
channel_range = 7
status["setChA"] = ps.ps4000aSetChannel(
    chandle,
    ps.PS4000A_CHANNEL['PS4000A_CHANNEL_A'],
    enabled,
    ps.PS4000A_COUPLING['PS4000A_DC'],
    channel_range,
    analogue_offset,
)
assert_pico_ok(status["setChA"])
def getSamplesBasedOnTime(time_to_record_ms, frequency_hertz):
    """Return the number of samples captured over a recording period.

    Args:
        time_to_record_ms: Length of the recording, in milliseconds.
        frequency_hertz: Sampling frequency, in hertz.

    Returns:
        ``frequency_hertz * time_to_record_ms / 1000`` truncated to an int.

    Raises:
        ValueError: If the computed sample count is not positive.
    """
    # Milliseconds -> seconds is the division by 1000; int() truncates any
    # fractional sample.
    num_samples = int(frequency_hertz * time_to_record_ms / 1000)
    if num_samples <= 0:
        raise ValueError(
            "recording time and frequency must yield at least one sample, "
            "got {}".format(num_samples)
        )
    return num_samples
# Arguments follow the helper's signature: (recording time in ms, rate in Hz)
total_samples_to_collect = getSamplesBasedOnTime(PROCESS_TIME, PROCESS_RATE)

# Size of capture: a single driver buffer large enough for the whole recording
sizeOfOneBuffer = total_samples_to_collect

# Create buffers ready for assigning pointers for data collection
bufferAMax = np.zeros(shape=sizeOfOneBuffer, dtype=np.int16)
memory_segment = 0

# Set data buffer location for data collection from channel A
# handle = chandle
# source = PS4000A_CHANNEL_A = 0
# pointer to buffer max = bufferAMax (passed as a pointer to int16)
# pointer to buffer min = None (no min buffer is registered)
# buffer length = sizeOfOneBuffer
# segment index = 0
# ratio mode = PS4000A_RATIO_MODE_NONE = 0
status["setDataBuffersA"] = ps.ps4000aSetDataBuffers(
    chandle,
    ps.PS4000A_CHANNEL['PS4000A_CHANNEL_A'],
    bufferAMax.ctypes.data_as(ctypes.POINTER(ctypes.c_int16)),
    None,
    sizeOfOneBuffer,
    memory_segment,
    ps.PS4000A_RATIO_MODE['PS4000A_RATIO_MODE_NONE'],
)
assert_pico_ok(status["setDataBuffersA"])

# Begin streaming mode:
# 10**9 / PROCESS_RATE turns the rate in Hz into a sample interval in ns,
# matching the PS4000A_NS time unit selected below.
sampleInterval = ctypes.c_int32(int(10**9 / PROCESS_RATE))
sampleUnits = ps.PS4000A_TIME_UNITS['PS4000A_NS']
# We are not triggering:
maxPreTriggerSamples = int(sizeOfOneBuffer / 2)
maxPostTriggerSamples = int(sizeOfOneBuffer / 2)
autoStopOn = 0
# No downsampling:
downsampleRatio = 1
status["runStreaming"] = ps.ps4000aRunStreaming(
    chandle,
    ctypes.byref(sampleInterval),
    sampleUnits,
    maxPreTriggerSamples,
    maxPostTriggerSamples,
    autoStopOn,
    downsampleRatio,
    ps.PS4000A_RATIO_MODE['PS4000A_RATIO_MODE_NONE'],
    sizeOfOneBuffer,
)
assert_pico_ok(status["runStreaming"])

# We need a big buffer, not registered with the driver, to keep our complete capture in.
bufferCompleteA = np.zeros(shape=total_samples_to_collect, dtype=np.int16)

# State shared with the streaming callback
nextSample = 0
autoStopOuter = False
wasCalledBack = False
def streaming_callback(handle, noOfSamples, startIndex, overflow, triggerAt, triggered, autoStop, param):
    """Copy newly streamed samples from bufferAMax into bufferCompleteA.

    Passed to the driver (wrapped as a C function pointer) and invoked from
    ps4000aGetStreamingLatestValues.

    Args:
        handle: Device handle supplied by the driver (unused).
        noOfSamples: Number of new samples available in bufferAMax.
        startIndex: Index in bufferAMax of the first new sample.
        overflow: Unused here.
        triggerAt: Unused here.
        triggered: Unused here.
        autoStop: Truthy when the driver reports that streaming has stopped.
        param: Unused here.

    Side effects:
        Sets the module globals ``wasCalledBack`` (always True),
        ``nextSample`` (advanced by the number of samples copied) and
        ``autoStopOuter`` (set True when ``autoStop`` is truthy).
    """
    global nextSample, autoStopOuter, wasCalledBack
    wasCalledBack = True

    # Never copy past the end of the capture buffer: a mismatched slice
    # assignment would raise inside the callback and leave nextSample stuck.
    remaining = len(bufferCompleteA) - nextSample
    samplesToCopy = max(0, min(noOfSamples, remaining))

    destEnd = nextSample + samplesToCopy
    sourceEnd = startIndex + samplesToCopy
    bufferCompleteA[nextSample:destEnd] = bufferAMax[startIndex:sourceEnd]
    nextSample += samplesToCopy

    # Let the polling code see that the driver has stopped streaming.
    if autoStop:
        autoStopOuter = True
# Convert the python function into a C function pointer.
# Kept in a module-level name so it stays referenced while the driver uses it.
cFuncPtr = ps.StreamingReadyType(streaming_callback)

# Fetch data from the driver in a loop, copying it out of the registered buffers and into our complete one.
while nextSample < total_samples_to_collect and not autoStopOuter:
    # Reset before each poll so the sleep below applies to every poll that
    # delivered no data, not only to polls before the first callback.
    wasCalledBack = False
    status["getStreamingLastestValues"] = ps.ps4000aGetStreamingLatestValues(chandle, cFuncPtr, None)
    if not wasCalledBack:
        # Nothing new yet: wait briefly before asking the driver again.
        time.sleep(0.01)

print("Done grabbing values.")

# Find maximum ADC count value
# handle = chandle
# pointer to value = ctypes.byref(maxADC)
print("Performing ADC Conversion")
maxADC = ctypes.c_int16()
status["maximumValue"] = ps.ps4000aMaximumValue(chandle, ctypes.byref(maxADC))
assert_pico_ok(status["maximumValue"])

# Stop the scope before the slow conversion and the blocking plot window,
# so the device is not left streaming while the figure is open.
# handle = chandle
status["stop"] = ps.ps4000aStop(chandle)
assert_pico_ok(status["stop"])

# Disconnect the scope (everything needed from the device has been read)
# handle = chandle
status["close"] = ps.ps4000aCloseUnit(chandle)
assert_pico_ok(status["close"])

# Convert ADC counts data to mV
adc2mVChAMax = adc2mV(bufferCompleteA, channel_range, maxADC)

print("Plotting")
# Create time data, one point per sample spaced by the sample interval (ns).
# Named timeAxisNs so the imported `time` module is not shadowed.
timeAxisNs = np.linspace(0, (len(adc2mVChAMax) - 1) * sampleInterval.value, len(adc2mVChAMax))

# Plot data from channel A
plt.plot(timeAxisNs, adc2mVChAMax)
plt.xlabel('Time (ns)')
plt.ylabel('Voltage (mV)')
plt.show()

# Display status returns
print(status)