Octave Resource Protocol Software Tools
This page provides information about software tools built on top of the Octave Resource Protocol:
- Octave ORP Evaluation Tool: a sample package that you can use as a starting point for experimenting with connecting a mangOH Red to an edge device (see this tutorial).
- Demo Application for Southbound Resource Protocol: a sample package that you can use as a starting point for experimenting with communications between a mangOH Red and a development machine over a USB-to-UART bridge (see this tutorial for connecting a development machine to a mangOH Red).
- Source Code for the Octave ORP Evaluation Tool Package: the source code for the Octave ORP Evaluation Tool package.
- Source Code for the Demo Application for Southbound Resource Protocol: the source code for the Demo Application for Southbound Resource Protocol package.
Octave ORP Evaluation Tool
We have provided source code for an Octave ORP Evaluation Tool Python package. This package contains a high-level sample Python client that you can copy and use in your edge device applications to interact with Octave. The client is built upon the package's implementation of the Octave Resource Protocol (ORP). The package also contains a sample Python application.
The Octave ORP Evaluation Tool package includes a Python API defined in octave_rp.py. The API consists of the classes and methods described in the following subsections.
You can view an example application that was built using this API in the package's sample.py file.
For more information, see this tutorial.
OctaveRP
OctaveRP is the package's main class, built upon HDLC for framing. This class uses the package's implementation of the Octave Resource Protocol to perform Octave communications.
Once instantiated, the OctaveRP instance is used by the API's Sensor, Input, and Output classes to create and manage the respective resources.
Methods
Constructor
Constructs and initializes a new OctaveRP object.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | OctaveRP | No | The class instance. |
serial | Serial | Yes | A Serial object over which to communicate. |
retry_timeout | Float | No | The timeout period, in seconds, after which to retry communications. Defaults to 0.5. |
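The retry_timeout value controls how long the client waits before resending an unconfirmed request. The sketch below illustrates that send-and-confirm pattern in isolation. The names `send_until_confirmed`, `send`, `inbox`, and `max_attempts` are hypothetical and not part of the package; the package's own loop (execute_wait_confirm in octave_rp.py) retries indefinitely, whereas this sketch assumes a bounded number of attempts.

```python
from time import sleep


def send_until_confirmed(send, inbox, frame, expected_type,
                         retry_timeout=0.5, max_attempts=5):
    """Resend `frame` until a message of `expected_type` shows up in `inbox`.

    `send` is any callable that transmits a frame; `inbox` is a list that a
    reader (for example, a background thread) appends decoded messages to.
    Returns the number of attempts used, or raises RuntimeError.
    """
    for attempt in range(1, max_attempts + 1):
        send(frame)
        sleep(retry_timeout)  # give the device time to answer
        if any(m.get('responseType') == expected_type for m in inbox):
            return attempt
    raise RuntimeError('no %r response after %d attempts'
                       % (expected_type, max_attempts))


# Stand-in transport for illustration: it "answers" on the second send.
inbox = []
sent = []


def fake_send(frame):
    sent.append(frame)
    if len(sent) == 2:
        inbox.append({'responseType': 's', 'status': 'OK'})


attempts = send_until_confirmed(fake_send, inbox, 'SN01Psystem/cpu', 's',
                                retry_timeout=0.01)
print(attempts)
```

Bounding the number of attempts is a design choice of this sketch: it lets the caller report a disconnected device instead of blocking forever.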
create_input
Creates an Input object representing an Octave Input Resource on the Datahub.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | OctaveRP | No | The class instance. |
dataType | String | Yes | The type of data to be tracked by the Input. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
path | String | Yes | The resource's path. |
unit | String | Yes | The type of units that the value represents (e.g. degrees). |
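The dataType string is translated into a one-character code in the request packet. The following sketch mirrors the mapping defined in the package's protocol.py (listed later on this page); the helper name `dtype_code` is hypothetical. Like the package, it inspects only the first letter, so 'str' and 'string' are treated alike.

```python
# One-character data type codes, as defined in protocol.py.
DTYPE_CODES = {
    't': 'T',  # trig: trigger, no data
    'b': 'B',  # bool
    'n': 'N',  # num
    's': 'S',  # str
    'j': 'J',  # json
}


def dtype_code(data_type):
    """Return the packet code for a dataType string, or raise ValueError."""
    key = data_type.lower()[:1]
    if key not in DTYPE_CODES:
        raise ValueError('invalid data type: %r' % data_type)
    return DTYPE_CODES[key]


print(dtype_code('num'))     # N
print(dtype_code('string'))  # S
```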
create_sensor
Creates a Sensor object representing an Octave Sensor Resource on the Datahub.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | OctaveRP | No | The class instance. |
dataType | String | Yes | The type of data to be tracked by the Sensor. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
path | String | Yes | The resource's path. |
unit | String | Yes | The type of units that the value represents (e.g. degrees). |
handler | Callback | Yes | The method to invoke to obtain the sensor's value when the device polls the Sensor. It takes no arguments and returns the value. |
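For reference, create_sensor sends a text command of the form `create sensor <dataType> <path> [<unit>]`, which protocol.py encodes into a request packet. The sketch below reproduces that packet layout for illustration only; `build_create_packet` is a hypothetical helper, not a function of the package.

```python
PACKET_TYPES = {'input': 'I', 'output': 'O', 'sensor': 'S'}
DTYPE_CODES = {'trig': 'T', 'bool': 'B', 'num': 'N', 'str': 'S', 'json': 'J'}


def build_create_packet(kind, data_type, path, unit=None):
    """Build a create request: type, data type, segment '01', path, [units]."""
    packet = PACKET_TYPES[kind] + DTYPE_CODES[data_type]
    packet += '01'        # segment number and segment count
    packet += 'P' + path  # 'P' labels the path field
    if unit:
        packet += ',' + 'U' + unit  # fields are comma-separated; 'U' labels units
    return packet


print(build_create_packet('sensor', 'num', 'system/cpu', 'percent'))
# SN01Psystem/cpu,Upercent
print(build_create_packet('sensor', 'json', 'system/mem'))
# SJ01Psystem/mem
```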
Sensor
Represents an Octave Sensor resource.
Methods
Constructor
Constructs and initializes a new Sensor object.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | Sensor | No | The class instance. |
orp | OctaveRP | Yes | A reference to an OctaveRP instance. |
handler | Callback | Yes | The method to invoke to obtain the sensor's current value. It takes no arguments and returns the value. |
dataType | String | Yes | The type of data to be tracked by the Sensor. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
path | String | Yes | The resource's path. |
unit | String | No | The type of units that the value represents (e.g. degrees). |
create_sensor
Creates a new Sensor object using the OctaveRP reference passed in the constructor to perform the necessary Octave commands.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | Sensor | No | The class instance. |
Output
Represents an Octave Output resource.
Methods
Constructor
Constructs and initializes a new Output object.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | Output | No | The class instance. |
orp | OctaveRP | Yes | A reference to an OctaveRP instance. |
handler | Callback | Yes | The method to invoke when a value is written to the Output on the device. It receives the value as its only argument. |
dataType | String | Yes | The type of data to be tracked by the Output. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
path | String | Yes | The resource's path. |
unit | String | No | The type of units that the value represents (e.g. degrees). |
create_output
Creates a new Output object using the OctaveRP reference passed in the constructor to perform the necessary Octave commands.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | Output | No | The class instance. |
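How the Sensor and Output handlers get invoked can be summarized with a small dispatch sketch modeled on rcv_frame in octave_rp.py: a sensor poll notification ('b') calls the sensor handler to obtain a value, a handler call notification ('c') passes the received data to the output handler, and anything else is queued as a response. The `Dispatcher` class and its `pushed` list are hypothetical stand-ins so that the example runs without a device.

```python
SENSOR_CALL = 'b'   # device polls a sensor
HANDLER_CALL = 'c'  # device delivers a value written to an output


class Dispatcher(object):
    def __init__(self):
        self.sensor_handlers = {}  # path -> callable returning a value
        self.output_handlers = {}  # path -> callable taking the new value
        self.messages = []         # responses kept for confirmation checks
        self.pushed = []           # stand-in for values sent to the device

    def receive(self, message):
        kind = message['responseType']
        path = message.get('path')
        if kind == SENSOR_CALL:
            if path in self.sensor_handlers:
                self.pushed.append((path, self.sensor_handlers[path]()))
        elif kind == HANDLER_CALL:
            if path in self.output_handlers:
                self.output_handlers[path](message['data'])
        else:
            self.messages.append(message)


d = Dispatcher()
shown = []
d.sensor_handlers['system/cpu'] = lambda: 12.5
d.output_handlers['external_status/lcd1'] = shown.append

d.receive({'responseType': 'b', 'path': 'system/cpu'})
d.receive({'responseType': 'c', 'path': 'external_status/lcd1', 'data': 'hello'})
d.receive({'responseType': 's', 'status': 'OK'})

print(d.pushed)         # [('system/cpu', 12.5)]
print(shown)            # ['hello']
print(len(d.messages))  # 1
```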
Input
Represents an Octave Input resource.
Methods
Constructor
Constructs and initializes a new Input object.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | Input | No | The class instance. |
orp | OctaveRP | Yes | A reference to an OctaveRP instance. |
dataType | String | Yes | The type of data to be tracked by the Input. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
path | String | Yes | The resource's path. |
unit | String | No | The type of units that the value represents (e.g. degrees). |
create_input
Creates a new Input object using the OctaveRP reference passed in the constructor to perform the necessary Octave commands.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | Input | No | The class instance. |
send
Sends an input value.
Parameters
Name | Type | Required | Description |
---|---|---|---|
self | Input | No | The class instance. |
value | Any | Yes | The input value to send. It should match the Input's dataType. |
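In octave_rp.py, send pushes the value to the resource's value node: the client issues `push <dataType> <path>/value <value>`, which protocol.py encodes as a push request. A sketch of that encoding follows, using the hypothetical helper `build_push_packet`:

```python
DTYPE_CODES = {'trig': 'T', 'bool': 'B', 'num': 'N', 'str': 'S', 'json': 'J'}


def build_push_packet(data_type, path, value=None):
    """Build a push request for <path>/value with an optional data field."""
    packet = 'P' + DTYPE_CODES[data_type]  # 'P' is the push request type
    packet += '01'                         # segment number and segment count
    packet += 'P' + path + '/value'        # 'P' also labels the path field
    data = '' if value is None else str(value)
    if data != '':
        packet += ',' + 'D' + data         # 'D' labels the data field
    return packet


print(build_push_packet('num', 'push_type_input', 0))
# PN01Ppush_type_input/value,D0
```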
Demo Application for Southbound Resource Protocol
We have provided source code for a Demo Application for Southbound Resource Protocol Python package. This package contains a high-level sample Python client that you can copy and use to experiment with communicating between a development machine and an Octave-enabled device over a USB-to-UART bridge.
The client is built upon the package's implementation of Octave's Southbound Resource Protocol. The package also contains a sample Python application.
You can view an example application that was built using this API in the package's run.py file.
For more information see the ORP Communications via USB-to-UART bridge tutorial on connecting a development machine to a mangOH Red over a USB-to-UART bridge.
Source Code for the Octave ORP Evaluation Tool Package
The following subsections provide the source code that you can copy and paste into the respective files to create the Octave ORP Evaluation Tool package. This package can be used to experiment with ORP and is also used in this tutorial.
The files in the package include the following:
- __init__.py: defines the package's imports.
- octave_rp.py: defines the OctaveRP class, which exposes an implementation of a high-level API for working with ORP.
- protocol.py: provides an implementation of ORP.
- sample.py: provides a sample Python application built using the OctaveRP class defined in octave_rp.py.
- simple_hdlc.py: provides an implementation of HDLC that is used by the OctaveRP class defined in octave_rp.py.
__init__.py
from octave_rp import Sensor, OctaveRP, Output, Input
octave_rp.py
from protocol import *
from serial import Serial
from simple_hdlc import HDLC
from time import sleep
import logging
logging.basicConfig(format='%(asctime)s %(name)s: %(message)s')
logger = logging.getLogger('OctaveRP')
logger.setLevel(logging.DEBUG)
class OctaveRP(HDLC):
    def __init__(self, serial, retry_timeout=0.5):
        HDLC.__init__(self, serial)
        self.serial = serial
        self.messages = []
        self.sensor_handlers = {}
        self.output_handlers = {}
        self.retry_timeout = retry_timeout
        self.startReader(onFrame=self.rcv_frame)

    def _wakeup(self):
        # Send three '~' (0x7E, the HDLC flag) characters before a frame
        self.serial.write("~")
        sleep(0.1)
        self.serial.write("~")
        sleep(0.1)
        self.serial.write("~")

    def execute_wait_confirm(self, frame, resp):
        # Resend the frame until a response of type `resp` is received
        self.clear()
        self._wakeup()
        confirmed = False
        while not confirmed:
            logger.debug("sending frame, awaiting %s" % resp)
            self.sendFrame(frame)
            sleep(self.retry_timeout)
            for m in self.messages:
                logger.debug('checking message %s' % m)
                if m['responseType'] == resp:
                    confirmed = True

    def push_value(self, path, datatype=None, value=None):
        # Without an explicit value, ask the registered sensor handler for one
        if value is None:
            value = self.sensor_handlers[path][1]()
            datatype = self.sensor_handlers[path][0]
        cmd = 'push %s %s %s' % (datatype, '%s/value' % path, value)
        logger.debug('preparing to send: %s' % cmd)
        frame = encode_request(cmd)
        self._wakeup()
        self.sendFrame(frame)

    def create_input(self, dataType, path, unit):
        if unit:
            frame = encode_request('create input %s %s/value %s' % (dataType, path, unit))
        else:
            frame = encode_request('create input %s %s' % (dataType, path))
        logger.debug('creating input at %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_INPUT_CREATE)

    def create_sensor(self, dataType, path, unit, handler):
        if unit:
            frame = encode_request('create sensor %s %s %s' % (dataType, path, unit))
        else:
            frame = encode_request('create sensor %s %s' % (dataType, path))
        logger.debug('creating sensor at %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_SENSOR_CREATE)
        self.sensor_handlers[path] = (dataType, handler)
        self.push_value(path)

    def create_output(self, dataType, path, unit, handler):
        self.output_handlers[path] = handler
        if unit:
            frame = encode_request('create output %s %s %s' % (dataType, path, unit))
        else:
            frame = encode_request('create output %s %s' % (dataType, path))
        # create output
        logger.debug('creating output at %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_OUTPUT_CREATE)
        # register handler
        frame = encode_request('add handler %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_HANDLER_ADD)

    def clear(self):
        self.messages = []

    def rcv_frame(self, data):
        message = decode_response(data)
        logger.debug('recv frame: %s' % message)
        if message['responseType'] == SBR_PKT_NTFY_SENSOR_CALL:
            if message['path'] in self.sensor_handlers.keys():
                self.push_value(message['path'])
        elif message['responseType'] == SBR_PKT_NTFY_HANDLER_CALL:
            logger.debug('new data at %s' % message['path'])
            if message['path'] in self.output_handlers.keys():
                self.output_handlers[message['path']](message['data'])
        else:
            self.messages.append(message)

class Sensor(object):
    def __init__(self, orp, handler, datatype, path, unit=None):
        self.datatype = datatype
        self.path = path
        self.unit = unit
        self.orp = orp
        self.handler = handler
        self.declared = False

    def create_sensor(self):
        self.orp.create_sensor(self.datatype, self.path, self.unit, self.handler)
        self.declared = True


class Output(object):
    def __init__(self, orp, handler, datatype, path, unit=None):
        self.datatype = datatype
        self.path = path
        self.unit = unit
        self.orp = orp
        self.handler = handler
        self.declared = False

    def create_output(self):
        self.orp.create_output(self.datatype, self.path, self.unit, self.handler)
        self.declared = True


class Input(object):
    def __init__(self, orp, datatype, path, unit=None):
        self.datatype = datatype
        self.path = path
        self.unit = unit
        self.orp = orp
        self.declared = False

    def create_input(self):
        self.orp.create_input(self.datatype, self.path, self.unit)
        # Mark the resource as declared, matching Sensor and Output
        self.declared = True

    def send(self, value):
        self.orp.push_value(self.path, self.datatype, value)
protocol.py
#============================================================================
#
# Filename: sbr_Protocol_v1.py
#
# Purpose: Python test functions to encode and decode packets for the
# Southbound Resource Protocol
#
# Copyright: (c) 2018 Sierra Wireless, Inc.
# All rights reserved
#
#----------------------------------------------------------------------------
#
# VERSION: 0.0.1
#
# NOTES:
#
#
# Packet type field - byte 0
#
SBR_PKT_RQST_INPUT_CREATE = 'I' # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_INPUT_CREATE = 'i' # type[1] status[1] pad[2]
SBR_PKT_RQST_OUTPUT_CREATE = 'O' # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_OUTPUT_CREATE = 'o' # type[1] status[1] pad[2]
SBR_PKT_RQST_DELETE = 'D' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_DELETE = 'd' # type[1] status[1] pad[2]
SBR_PKT_RQST_HANDLER_ADD = 'H' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_HANDLER_ADD = 'h' # type[1] status[1] pad[2]
SBR_PKT_RQST_HANDLER_REMOVE = 'K' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_HANDLER_REMOVE = 'k' # type[1] status[1] pad[2]
SBR_PKT_RQST_PUSH = 'P' # type[1] d_type[1] pad[2] time[] path[] data[]
SBR_PKT_RESP_PUSH = 'p' # type[1] status[1] pad[2]
SBR_PKT_RQST_GET = 'G' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_GET = 'g' # type[1] status[1] pad[2] time[] data[]
SBR_PKT_RQST_EXAMPLE_SET = 'E' # type[1] d_type[1] pad[2] path[] data[]
SBR_PKT_RESP_EXAMPLE_SET = 'e' # type[1] status[1] pad[2]
SBR_PKT_RQST_SENSOR_CREATE = 'S' # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_SENSOR_CREATE = 's' # type[1] status[1] pad[2]
SBR_PKT_RQST_SENSOR_REMOVE = 'R' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_SENSOR_REMOVE = 'r' # type[1] status[1] pad[2]
SBR_PKT_NTFY_HANDLER_CALL = 'c' # type[1] d_type[1] pad[2] time[] path[] data[]
SBR_PKT_RESP_HANDLER_CALL = 'C' # type[1] status[1] pad[2]
SBR_PKT_NTFY_SENSOR_CALL = 'b' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_SENSOR_CALL = 'B' # type[1] status[1] pad[2]
SBR_PKT_RESP_UNKNOWN_RQST = '?' # type[1] status[1] pad[2]
#
# Data type field - byte 1
#
SBR_DATA_TYPE_TRIGGER = 'T' # trigger - no data
SBR_DATA_TYPE_BOOLEAN = 'B' # Boolean - 1 byte: 't' | 'f'
SBR_DATA_TYPE_NUMERIC = 'N' # numeric - null-terminated ASCII string, representing double
SBR_DATA_TYPE_STRING = 'S' # string - null-terminated ASCII string
SBR_DATA_TYPE_JSON = 'J' # JSON - null-terminated ASCII string, representing JSON
SBR_DATA_TYPE_UNDEF = ' ' # not specified
#
# Variable length field identifiers
#
SBR_FIELD_ID_PATH = 'P'
SBR_FIELD_ID_TIME = 'T'
SBR_FIELD_ID_UNITS = 'U'
SBR_FIELD_ID_DATA = 'D'
# Variable length field separator
SBR_VARLENGTH_SEPARATOR = ','
#
# Packet type descriptions
#
ptypes = [
[ SBR_PKT_RQST_INPUT_CREATE, 'request create input' ],
[ SBR_PKT_RESP_INPUT_CREATE, 'response create input' ],
[ SBR_PKT_RQST_OUTPUT_CREATE, 'request create output' ],
[ SBR_PKT_RESP_OUTPUT_CREATE, 'response create output' ],
[ SBR_PKT_RQST_DELETE, 'request delete resource' ],
[ SBR_PKT_RESP_DELETE, 'response delete resource' ],
[ SBR_PKT_RQST_HANDLER_ADD, 'request add handler' ],
[ SBR_PKT_RESP_HANDLER_ADD, 'response add handler' ],
[ SBR_PKT_RQST_HANDLER_REMOVE, 'request remove handler' ],
[ SBR_PKT_RESP_HANDLER_REMOVE, 'response remove handler' ],
[ SBR_PKT_RQST_PUSH, 'request push' ],
[ SBR_PKT_RESP_PUSH, 'response push' ],
[ SBR_PKT_RQST_GET, 'request get' ],
[ SBR_PKT_RESP_GET, 'response get' ],
[ SBR_PKT_RQST_EXAMPLE_SET, 'request set example' ],
[ SBR_PKT_RESP_EXAMPLE_SET, 'response set example' ],
[ SBR_PKT_RQST_SENSOR_CREATE, 'request create sensor' ],
[ SBR_PKT_RESP_SENSOR_CREATE, 'response create sensor' ],
[ SBR_PKT_RQST_SENSOR_REMOVE, 'request remove sensor' ],
[ SBR_PKT_RESP_SENSOR_REMOVE, 'response remove sensor' ],
[ SBR_PKT_NTFY_HANDLER_CALL, 'handler call' ],
[ SBR_PKT_RESP_HANDLER_CALL, 'handler ack' ],
[ SBR_PKT_NTFY_SENSOR_CALL, 'sensor poll' ],
[ SBR_PKT_RESP_SENSOR_CALL, 'sensor poll ack' ],
[ SBR_PKT_RESP_UNKNOWN_RQST, 'unknown packet type' ],
]
#
# Status field
#
status_list = [
'OK',
'NOT FOUND',
'NOT POSSIBLE', # deprecated
'OUT OF RANGE',
'NO MEMORY',
'NOT PERMITTED',
'FAULT',
'COMM ERROR',
'TIMEOUT',
'OVERFLOW',
'UNDERFLOW',
'WOULD BLOCK',
'DEADLOCK',
'FORMAT ERROR',
'DUPLICATE',
'BAD PARAMETER',
'CLOSED',
'BUSY',
'UNSUPPORTED',
'IO_ERROR',
'NOT IMPLEMENTED',
'UNAVAILABLE',
'TERMINATED'
]
#
# Data types
#
data_types = [
[ 'trig', SBR_DATA_TYPE_TRIGGER ],
[ 'bool', SBR_DATA_TYPE_BOOLEAN ],
[ 'num', SBR_DATA_TYPE_NUMERIC ],
[ 'str', SBR_DATA_TYPE_STRING ],
[ 'json', SBR_DATA_TYPE_JSON ]
]
#
# Syntax
#
syntax_list = [
' create input|output|sensor trig|bool|num|str|json <path> [<units>]',
' delete resource|handler|sensor <path>',
' add handler <path>',
' push trig|bool|num|str|json <path> [<data>]',
' get <path>',
' example json <path> [<data>]'
]
#
# Usage
#
def print_usage():
    print('Usage:')
    for i in range(len(syntax_list)):
        print(syntax_list[i])
    print('')

#
# Encode data type
#
def encode_dtype(data_type):
    field = ''
    dtype = data_type.lower()
    if dtype[0] == 't':
        field = field + SBR_DATA_TYPE_TRIGGER
    elif dtype[0] == 'b':
        field = field + SBR_DATA_TYPE_BOOLEAN
    elif dtype[0] == 'n':
        field = field + SBR_DATA_TYPE_NUMERIC
    elif dtype[0] == 's':
        field = field + SBR_DATA_TYPE_STRING
    elif dtype[0] == 'j':
        field = field + SBR_DATA_TYPE_JSON
    else:
        print('Invalid data type')
        # Return an empty string so the callers' dtype == '' checks catch this
        return ''
    return field

#
# Encode segment number and segment count
#
def encode_segment():
    return '01'

#
# Encode path
#
def encode_path(path):
    return SBR_FIELD_ID_PATH + path

#
# Encode units
#
def encode_units(units):
    return SBR_FIELD_ID_UNITS + units

#
# Encode data
#
def encode_data(data):
    return SBR_FIELD_ID_DATA + data

#
# create input|output|sensor data-type path [units]
#
def encode_create(argc, args):
    packet = ''
    dtype = ''
    syntax = syntax_list[0]
    if argc < 3 :
        print('Invalid number of arguments')
        print(syntax_list[0])
        return
    if argc > 3 :
        what,data_type,path,units = args.split(' ')
    else:
        what,data_type,path = args.split(' ')
        units = None
    what = what.lower()
    if what[0] == 'i':
        packet = packet + SBR_PKT_RQST_INPUT_CREATE
    elif what[0] == 'o':
        packet = packet + SBR_PKT_RQST_OUTPUT_CREATE
    elif what[0] == 's':
        packet = packet + SBR_PKT_RQST_SENSOR_CREATE
    else:
        print('Invalid request')
        print(syntax_list[0])
        return
    dtype = encode_dtype(data_type)
    if dtype == '':
        return
    packet = packet + dtype
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if units is not None:
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_units(units)
    return packet

#
# delete resource|handler|sensor path
#
def encode_delete(argc, args):
    packet = ''
    if argc < 2 :
        print('Invalid number of arguments')
        print(syntax_list[1])
        return
    what,path = args.split(' ')
    what = what.lower()
    # packet type
    if what[0] == 'r':
        packet = packet + SBR_PKT_RQST_DELETE
    elif what[0] == 'h':
        packet = packet + SBR_PKT_RQST_HANDLER_REMOVE
    elif what[0] == 's':
        packet = packet + SBR_PKT_RQST_SENSOR_REMOVE
    else:
        print('Invalid request')
        print(syntax_list[1])
        return
    # data type - ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    return packet

#
# add handler path
#
def encode_add(argc, args):
    packet = ''
    if argc < 2 :
        print('Invalid number of arguments')
        print(syntax_list[2])
        return
    what,path = args.split(' ')
    what = what.lower()
    if what[0] != 'h':
        print('Invalid request ' + what)
        print(syntax_list[2])
        return
    packet = packet + SBR_PKT_RQST_HANDLER_ADD
    # data type - ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    return packet

#
# push data-type path [data]
#
def encode_push(argc, args):
    packet = ''
    dtype = ''
    if argc < 2 :
        print('Invalid number of arguments')
        print(syntax_list[3])
        return
    print('argc: ' + str(argc))
    if argc > 2 :
        # Split at most twice so that the data may itself contain spaces
        data_type,path,data = args.split(' ', 2)
    else:
        data_type,path = args.split(' ')
        data = ''
    packet = packet + SBR_PKT_RQST_PUSH
    dtype = encode_dtype(data_type)
    if dtype == '':
        return
    packet = packet + dtype
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if data != '':
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_data(data)
    return packet

#
# get path
#
def encode_get(argc, args):
    packet = ''
    dtype = ''
    if argc < 1 :
        print('Invalid number of arguments')
        print(syntax_list[4])
        return
    packet = packet + SBR_PKT_RQST_GET
    # data type ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(args)
    return packet

#
# example data-type path [data]
#
def encode_example(argc, args):
    packet = ''
    dtype = ''
    if argc < 2 :
        print('Invalid number of arguments')
        print(syntax_list[5])
        return
    if argc > 2 :
        data_type,path,data = args.split(' ', 2)
    else:
        data_type,path = args.split(' ')
        data = ''
    packet = packet + SBR_PKT_RQST_EXAMPLE_SET
    dtype = encode_dtype(data_type)
    if dtype == '':
        return
    packet = packet + dtype
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if data != '':
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_data(data)
    return packet

#
# Parse command and build a request packet
#
def encode_request(request):
    if request.find(' ') < 0 :
        print_usage()
        return
    argc = len(request.split(' ')) - 1
    # all commands take at least one argument
    if argc < 1 :
        print_usage()
        return
    request_type,args = request.split(' ', 1)
    request_type = request_type.lower()
    # Dispatch on the first letter of the command
    if request_type[0] == 'c':
        p = encode_create(argc, args)
    elif request_type[0] == 'd':
        p = encode_delete(argc, args)
    elif request_type[0] == 'a':
        p = encode_add(argc, args)
    elif request_type[0] == 'p':
        p = encode_push(argc, args)
    elif request_type[0] == 'g':
        p = encode_get(argc, args)
    elif request_type[0] == 'e':
        p = encode_example(argc, args)
    elif request_type[0] == 'r':
        p = args
    else:
        print_usage()
        return
    return p

#
# Decode and print contents of an incoming packet
#
def decode_response(response):
    resp = {}
    # Positional fields:
    ptype = response[0]
    status = response[1]
    seg_number = response[2]
    seg_count = response[3]
    # Labeled, variable length fields
    var_length = response[4:]
    for i in range(len(ptypes)):
        test = ptypes[i]
        if test[0] == ptype:
            resp['responseType'] = test[0]
    # Status is represented in ASCII, starting with '@' (0x40) for OK.
    # Subtract 0x40 to index into the table, above
    #
    i = ord(status[0]) - ord('\x40')
    resp['status'] = status_list[i]
    if len(var_length):
        var_fields = var_length.split(',')
        for i in range(len(var_fields)):
            field = var_fields[i]
            if field[0] == 'P':
                resp['path'] = field[1:]
            if field[0] == 'T':
                resp['timestamp'] = field[1:]
            if field[0] == 'D':
                resp['data'] = field[1:]
    return resp
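To illustrate the response layout that decode_response handles, here is a compact sketch. It is not the package's code: `parse_response` is a hypothetical name, only the first few status names are included, and it adds bounds checks that the original does not perform. As in the original, fields are split on commas, so a data field that itself contains commas (JSON, for example) would need different handling.

```python
STATUS_NAMES = ['OK', 'NOT FOUND', 'NOT POSSIBLE', 'OUT OF RANGE']  # truncated

FIELD_NAMES = {'P': 'path', 'T': 'timestamp', 'D': 'data'}


def parse_response(packet):
    """Split a response into its type, status, and labeled fields."""
    resp = {'responseType': packet[0]}
    # Status is ASCII, starting at '@' (0x40) for OK.
    index = ord(packet[1]) - 0x40
    if 0 <= index < len(STATUS_NAMES):
        resp['status'] = STATUS_NAMES[index]
    # Bytes 2-3 carry the segment number and count; labeled fields follow.
    for field in packet[4:].split(','):
        if field and field[0] in FIELD_NAMES:
            resp[FIELD_NAMES[field[0]]] = field[1:]
    return resp


print(parse_response('s@01'))
print(parse_response('gA01'))
print(parse_response('g@01T1546300800.0,D42'))
```

The first call yields a create sensor response with status OK, the second a get response with status NOT FOUND, and the third a get response carrying a timestamp and a data field.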
sample.py
import os
from json import dumps
import logging
from platform import platform
from psutil import cpu_percent, virtual_memory
from serial import Serial
from time import sleep
from octave_rp import OctaveRP, Output, Input, Sensor
DEV = os.getenv('DEV', '/dev/ttyS0')

# Create serial object and use it to create the OctaveRP connection
s = Serial(DEV)
orp = OctaveRP(s)

# Handles request for platform resource
def string_platform_handler():
    return platform()

# Handles request for JSON resource
def json_vmem_handler():
    mem = virtual_memory()
    return dumps({ 'total': mem.total, 'available': mem.available, 'active': mem.active })

# Create sensor objects with the OctaveRP connection, callback, type,
# resource path, and optionally unit of measure
sensors = [
    Sensor(orp, string_platform_handler, 'string', 'system/platform', 'alphabet'),
    Sensor(orp, cpu_percent, 'num', 'system/cpu', 'percent'),
    Sensor(orp, json_vmem_handler, 'json', 'system/mem')
]
for sensor in sensors:
    sensor.create_sensor()

# Handles values written to the output resource
def print_output(data):
    print('*' * 20)
    print('* %s' % data)
    print('*' * 20)

output = Output(orp, print_output, 'string', 'external_status/lcd1', 'alphabet')
output.create_output()

push_type_input = Input(orp, 'num', 'push_type_input', 'number')
push_type_input.create_input()
push_counter = 0
push_type_input.send(push_counter)

# Run forever, pushing the counter once per second
while True:
    try:
        sleep(1)
        push_type_input.send(push_counter)
        push_counter += 1
    except KeyboardInterrupt:
        exit(0)
simple_hdlc.py
#!/usr/bin/python
# coding: utf8
__version__ = '0.2'
import logging
import struct
import time
from threading import Thread
from PyCRC.CRCCCITT import CRCCCITT
logger = logging.getLogger(__name__)
def calcCRC(data):
    crc = CRCCCITT("FFFF").calculate(bytes(data))
    b = bytearray(struct.pack(">H", crc))
    return b


class Frame(object):
    STATE_READ = 0x01
    STATE_ESCAPE = 0x02

    def __init__(self):
        self.finished = False
        self.error = False
        self.state = self.STATE_READ
        self.data = bytearray()
        self.crc = bytearray()
        self.reader = None

    def __len__(self):
        return len(self.data)

    def addByte(self, b):
        if b == 0x7D:
            # Escape marker: the next byte is XORed with 0x20
            self.state = self.STATE_ESCAPE
        elif self.state == self.STATE_ESCAPE:
            self.state = self.STATE_READ
            b = b ^ 0x20
            self.data.append(b)
        else:
            self.data.append(b)

    def finish(self):
        # The last two bytes of the frame are the CRC
        self.crc = self.data[-2:]
        self.data = self.data[:-2]
        self.finished = True

    def checkCRC(self):
        res = bool(self.crc == calcCRC(self.data))
        if not res:
            c1 = str(self.crc)
            c2 = str(calcCRC(self.data))
            logger.warning("invalid crc %s != %s", c1.encode("hex"), c2.encode("hex"))
            self.error = True
        return res

    def toString(self):
        return str(self.data)


class HDLC(object):
    def __init__(self, serial):
        self.serial = serial
        self.current_frame = None
        self.last_frame = None
        self.frame_callback = None
        self.error_callback = None
        self.running = False

    @classmethod
    def toBytes(cls, data):
        return bytearray(data)

    def sendFrame(self, data):
        bs = self._encode(self.toBytes(data))
        logger.info("Sending Frame: %s", bs.encode("hex"))
        res = self.serial.write(bs)
        logger.info("Send %s bytes", res)

    def _onFrame(self, frame):
        self.last_frame = frame
        s = self.last_frame.toString()
        logger.info("Received Frame: %s", s.encode("hex"))
        if self.frame_callback is not None:
            self.frame_callback(s)

    def _onError(self, frame):
        self.last_frame = frame
        s = self.last_frame.toString()
        logger.warning("Frame Error: %s", s.encode("hex"))
        if self.error_callback is not None:
            self.error_callback(s)

    def _readBytes(self, size):
        while size > 0:
            b = bytearray(self.serial.read(1))
            # An empty read means that no byte arrived (e.g. a read timeout)
            if len(b) < 1:
                return False
            res = self._readByte(b[0])
            if res:
                return True

    def _readByte(self, b):
        assert 0 <= b <= 255
        if b == 0x7E:
            # Start or End
            if not self.current_frame or len(self.current_frame) < 1:
                # Start
                self.current_frame = Frame()
            else:
                # End
                self.current_frame.finish()
                self.current_frame.checkCRC()
        elif self.current_frame is None:
            # Ignore before Start
            return False
        elif not self.current_frame.finished:
            self.current_frame.addByte(b)
        else:
            # Ignore Bytes
            pass
        # Validate and return
        if self.current_frame.finished and not self.current_frame.error:
            # Success
            self._onFrame(self.current_frame)
            self.current_frame = None
            return True
        elif self.current_frame.finished:
            # Error
            self._onError(self.current_frame)
            self.current_frame = None
            return True
        return False

    def readFrame(self, timeout=5):
        timer = time.time() + timeout
        while time.time() < timer:
            i = self.serial.in_waiting
            if i < 1:
                time.sleep(0.0001)
                continue
            res = self._readBytes(i)
            if res:
                # Validate and return
                if not self.last_frame.error:
                    # Success
                    s = self.last_frame.toString()
                    return s
                elif self.last_frame.finished:
                    # Error
                    raise ValueError("Invalid Frame (CRC FAIL)")
        raise RuntimeError("readFrame timeout")

    @classmethod
    def _encode(cls, bs):
        data = bytearray()
        data.append(0x7E)
        crc = calcCRC(bs)
        for byte in bs:
            if byte == 0x7E or byte == 0x7D:
                data.append(0x7D)
                data.append(byte ^ 0x20)
            else:
                data.append(byte)
        data += crc
        data.append(0x7E)
        return bytes(data)

    def _receiveLoop(self):
        while self.running:
            i = self.serial.in_waiting
            if i < 1:
                time.sleep(0.001)
                continue
            res = self._readBytes(i)

    def startReader(self, onFrame, onError=None):
        if self.running:
            raise RuntimeError("reader already running")
        self.reader = Thread(target=self._receiveLoop)
        self.reader.setDaemon(True)
        self.frame_callback = onFrame
        self.error_callback = onError
        self.running = True
        self.reader.start()

    def stopReader(self):
        self.running = False
        self.reader.join()
        self.reader = None
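The framing rules implemented above can be tried out without a serial port. The following sketch is written for Python 3 and uses a pure-Python CRC in place of PyCRC; it assumes that `CRCCCITT("FFFF")` corresponds to the common CRC-16/CCITT variant with polynomial 0x1021 and initial value 0xFFFF. The function names `crc16_ccitt` and `hdlc_frame` are hypothetical. As in `_encode`, only payload bytes are escaped, and the two CRC bytes are appended as-is.

```python
import struct

FLAG = 0x7E    # frame delimiter
ESCAPE = 0x7D  # escape marker; the escaped byte is XORed with 0x20


def crc16_ccitt(data, crc=0xFFFF):
    """Bitwise CRC-16/CCITT (polynomial 0x1021, initial value 0xFFFF)."""
    for byte in bytearray(data):
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def hdlc_frame(payload):
    """Wrap payload as FLAG + escaped payload + CRC (big-endian) + FLAG."""
    frame = bytearray([FLAG])
    for byte in bytearray(payload):
        if byte in (FLAG, ESCAPE):
            frame.append(ESCAPE)
            frame.append(byte ^ 0x20)
        else:
            frame.append(byte)
    frame += struct.pack('>H', crc16_ccitt(payload))  # CRC of the raw payload
    frame.append(FLAG)
    return bytes(frame)


print(hex(crc16_ccitt(b'123456789')))  # 0x29b1 (standard check value)
print(hdlc_frame(b'123456789').hex())
print(hdlc_frame(b'\x7e')[:3].hex())   # 7e7d5e
```

Because the CRC bytes are not escaped, a CRC that happens to contain 0x7E or 0x7D would confuse a receiver; a production implementation may want to escape those bytes as well.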
Source Code for the Demo Application for Southbound Resource Protocol
The following subsections provide the source code that you can copy and paste into the respective files to create the Demo Application for Southbound Resource Protocol package. This package can be used to experiment with ORP and is also used in this tutorial.
The files in the package include the following:
- requirements.txt: defines the package's external dependencies.
- run.py: the main program.
- /sb_serial/__init__.py: defines the package's imports.
- /sb_serial/sb_serial.py: defines the southbound protocol for the serial port.
- /sb_serial/sbr_Protocol_v1.py: provides an implementation of Octave's southbound protocol.
- /sb_serial/simple_hdlc.py: provides an implementation of HDLC that is used by the SbSerial class defined in sb_serial.py.
Note
Create a subdirectory named "sb_serial" within the package and place the files listed above that are prefixed with "/sb_serial/" into that subdirectory.
requirements.txt
psutil==5.4.8
PyCRC==1.21
pyserial==3.4
Note
PyCRC 1.21 must be downloaded from here.
run.py
import os
from json import dumps
import logging
from platform import platform
from psutil import cpu_percent, virtual_memory
from serial import Serial
from time import sleep
from sb_serial import Sensor, SbSerial
# Default to the USB-to-UART bridge device; set the DEV environment
# variable to override it (for example, /dev/ttyS0)
DEV = os.getenv('DEV', '/dev/tty.SLAB_USBtoUART')

# Create serial object and use it to create SbSerial connection
s = Serial(DEV)
sbs = SbSerial(s)

# Handles request for platform resource
def string_platform_handler():
    return platform()

# Handles request for JSON resource
def json_vmem_handler():
    mem = virtual_memory()
    return dumps({ 'total': mem.total, 'available': mem.available, 'active': mem.active })

# Create sensor objects with SbSerial connection, callback, type,
# resource path, and optionally unit of measure
sensors = [
    Sensor(sbs, string_platform_handler, 'string', 'system/platform', 'alphabet'),
    Sensor(sbs, cpu_percent, 'num', 'system/cpu', 'percent'),
    Sensor(sbs, json_vmem_handler, 'json', 'system/mem')
]
for sensor in sensors:
    sensor.create_sensor()

# Run Forever
while True:
    try:
        sleep(1)
    except KeyboardInterrupt:
        exit(0)
__init__.py for SBR
Note
To be placed in a subdirectory named "sb_serial".
from sb_serial import Sensor, SbSerial
sb_serial.py
Note
To be placed in a subdirectory named "sb_serial".
from sbr_Protocol_v1 import *
from serial import Serial
from simple_hdlc import HDLC
from time import sleep
import logging
logging.basicConfig(format='%(asctime)s %(name)s: %(message)s')
logger = logging.getLogger('SbSerial')
logger.setLevel(logging.DEBUG)
class SbSerial(HDLC):
    def __init__(self, serial, retry_timeout=0.5):
        HDLC.__init__(self, serial)
        self.serial = serial
        self.messages = []
        self.sensor_handlers = {}
        self.output_handlers = {}
        self.retry_timeout = retry_timeout
        self.startReader(onFrame=self.rcv_frame)

    def execute_wait_confirm(self, frame, resp):
        # Resend the frame until a response of type `resp` is received
        self.clear()
        self.serial.write("~~~~~~~~~~~~")
        confirmed = False
        while not confirmed:
            logger.debug("sending frame, awaiting %s" % resp)
            self.sendFrame(frame)
            sleep(self.retry_timeout)
            for m in self.messages:
                logger.debug('checking message %s' % m)
                if m['responseType'] == resp:
                    confirmed = True

    def push_value(self, path):
        # Call the sensor handler once and push the value it returns
        datatype, handler = self.sensor_handlers[path]
        value = handler()
        cmd = 'push %s %s %s' % (datatype, '%s/value' % path, value)
        logger.debug('preparing to send: %s' % cmd)
        frame = encode_request(cmd)
        self.sendFrame(frame)
        #self.execute_wait_confirm(frame, SBR_PKT_RESP_PUSH)

    def create_sensor(self, dataType, path, unit, handler):
        if unit:
            frame = encode_request('create sensor %s %s %s' % (dataType, path, unit))
        else:
            frame = encode_request('create sensor %s %s' % (dataType, path))
        logger.debug('creating sensor at %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_SENSOR_CREATE)
        self.sensor_handlers[path] = (dataType, handler)
        sleep(0.2)
        self.push_value(path)

    def clear(self):
        self.messages = []

    def rcv_frame(self, data):
        message = decode_response(data)
        logger.debug('recv frame: %s' % message)
        if message['responseType'] == SBR_PKT_NTFY_SENSOR_CALL:
            if message['path'] in self.sensor_handlers.keys():
                self.push_value(message['path'])
        # The original repeated the SENSOR_CALL test here, which made this
        # branch unreachable; HANDLER_CALL matches the logic in octave_rp.py
        elif message['responseType'] == SBR_PKT_NTFY_HANDLER_CALL:
            logger.debug('new data at %s' % message['path'])
        else:
            self.messages.append(message)

class Sensor(object):
    def __init__(self, sbs, handler, dataType, path, unit=None):
        self.dataType = dataType
        self.path = path
        self.unit = unit
        self.sbs = sbs
        self.handler = handler
        self.declared = False

    def create_sensor(self):
        self.sbs.create_sensor(self.dataType, self.path, self.unit, self.handler)
        self.declared = True
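The execute_wait_confirm loop in the listing above resends a frame until the expected response type arrives, with no upper bound on the number of retries. A bounded variant could look like the following Python 3 sketch. It is written independently of the Python 2 listing: send_until_confirmed, max_attempts, and the injectable sleep parameter are assumptions made for illustration and are not part of the package.

```python
import time


def send_until_confirmed(send, pending, expected, retry_timeout=0.5,
                         max_attempts=5, sleep=time.sleep):
    """Call send() until a message of type `expected` appears in `pending`.

    send     -- hypothetical callable that transmits one frame
    pending  -- list of decoded messages, filled by a reader callback
    expected -- the 'responseType' value to wait for
    Returns the number of attempts used; raises TimeoutError when
    max_attempts is exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        send()
        sleep(retry_timeout)  # give the peer time to answer
        if any(m.get('responseType') == expected for m in pending):
            return attempt
    raise TimeoutError('no %r response after %d attempts'
                       % (expected, max_attempts))


if __name__ == '__main__':
    inbox = []
    sent = []

    def fake_send():
        # Stand-in transport: the "device" answers on the second attempt.
        sent.append('frame')
        if len(sent) == 2:
            inbox.append({'responseType': 's', 'status': 'OK'})

    print(send_until_confirmed(fake_send, inbox, 's', retry_timeout=0))
```

Passing the transmit function and the message list in as arguments keeps the retry logic testable without a serial port. Raising an exception after the last attempt lets the caller decide whether to retry later or report the failure.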
sbr_Protocol_v1.py
Note
Place this file in a subdirectory named "sb_serial".
#============================================================================
#
# Filename: sbr_Protocol_v1.py
#
# Purpose: Python test functions to encode and decode packets for the
# Southbound Resource Protocol
#
# Copyright: (c) 2018 Sierra Wireless, Inc.
# All rights reserved
#
#----------------------------------------------------------------------------
#
# VERSION: 0.0.1
#
# NOTES:
#
#
# Packet type field - byte 0
#
SBR_PKT_RQST_INPUT_CREATE = 'I' # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_INPUT_CREATE = 'i' # type[1] status[1] pad[2]
SBR_PKT_RQST_OUTPUT_CREATE = 'O' # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_OUTPUT_CREATE = 'o' # type[1] status[1] pad[2]
SBR_PKT_RQST_DELETE = 'D' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_DELETE = 'd' # type[1] status[1] pad[2]
SBR_PKT_RQST_HANDLER_ADD = 'H' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_HANDLER_ADD = 'h' # type[1] status[1] pad[2]
SBR_PKT_RQST_HANDLER_REMOVE = 'K' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_HANDLER_REMOVE = 'k' # type[1] status[1] pad[2]
SBR_PKT_RQST_PUSH = 'P' # type[1] d_type[1] pad[2] time[] path[] data[]
SBR_PKT_RESP_PUSH = 'p' # type[1] status[1] pad[2]
SBR_PKT_RQST_GET = 'G' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_GET = 'g' # type[1] status[1] pad[2] time[] data[]
SBR_PKT_RQST_EXAMPLE_SET = 'E' # type[1] d_type[1] pad[2] path[] data[]
SBR_PKT_RESP_EXAMPLE_SET = 'e' # type[1] status[1] pad[2]
SBR_PKT_RQST_SENSOR_CREATE = 'S' # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_SENSOR_CREATE = 's' # type[1] status[1] pad[2]
SBR_PKT_RQST_SENSOR_REMOVE = 'R' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_SENSOR_REMOVE = 'r' # type[1] status[1] pad[2]
SBR_PKT_NTFY_HANDLER_CALL = 'c' # type[1] d_type[1] pad[2] time[] path[] data[]
SBR_PKT_RESP_HANDLER_CALL = 'C' # type[1] status[1] pad[2]
SBR_PKT_NTFY_SENSOR_CALL = 'b' # type[1] pad[1] pad[2] path[]
SBR_PKT_RESP_SENSOR_CALL = 'B' # type[1] status[1] pad[2]
SBR_PKT_RESP_UNKNOWN_RQST = '?' # type[1] status[1] pad[2]
#
# Data type field - byte 1
#
SBR_DATA_TYPE_TRIGGER = 'T' # trigger - no data
SBR_DATA_TYPE_BOOLEAN = 'B' # Boolean - 1 byte: 't' | 'f'
SBR_DATA_TYPE_NUMERIC = 'N' # numeric - null-terminated ASCII string, representing double
SBR_DATA_TYPE_STRING = 'S' # string - null-terminated ASCII string
SBR_DATA_TYPE_JSON = 'J' # JSON - null-terminated ASCII string, representing JSON
SBR_DATA_TYPE_UNDEF = ' ' # not specified
#
# Variable length field identifiers
#
SBR_FIELD_ID_PATH = 'P'
SBR_FIELD_ID_TIME = 'T'
SBR_FIELD_ID_UNITS = 'U'
SBR_FIELD_ID_DATA = 'D'
# Variable length field separator
SBR_VARLENGTH_SEPARATOR = ','
#
# Packet type descriptions
#
ptypes = [
    [ SBR_PKT_RQST_INPUT_CREATE, 'request create input' ],
    [ SBR_PKT_RESP_INPUT_CREATE, 'response create input' ],
    [ SBR_PKT_RQST_OUTPUT_CREATE, 'request create output' ],
    [ SBR_PKT_RESP_OUTPUT_CREATE, 'response create output' ],
    [ SBR_PKT_RQST_DELETE, 'request delete resource' ],
    [ SBR_PKT_RESP_DELETE, 'response delete resource' ],
    [ SBR_PKT_RQST_HANDLER_ADD, 'request add handler' ],
    [ SBR_PKT_RESP_HANDLER_ADD, 'response add handler' ],
    [ SBR_PKT_RQST_HANDLER_REMOVE, 'request remove handler' ],
    [ SBR_PKT_RESP_HANDLER_REMOVE, 'response remove handler' ],
    [ SBR_PKT_RQST_PUSH, 'request push' ],
    [ SBR_PKT_RESP_PUSH, 'response push' ],
    [ SBR_PKT_RQST_GET, 'request get' ],
    [ SBR_PKT_RESP_GET, 'response get' ],
    [ SBR_PKT_RQST_EXAMPLE_SET, 'request set example' ],
    [ SBR_PKT_RESP_EXAMPLE_SET, 'response set example' ],
    [ SBR_PKT_RQST_SENSOR_CREATE, 'request create sensor' ],
    [ SBR_PKT_RESP_SENSOR_CREATE, 'response create sensor' ],
    [ SBR_PKT_RQST_SENSOR_REMOVE, 'request remove sensor' ],
    [ SBR_PKT_RESP_SENSOR_REMOVE, 'response remove sensor' ],
    [ SBR_PKT_NTFY_HANDLER_CALL, 'handler call' ],
    [ SBR_PKT_RESP_HANDLER_CALL, 'handler ack' ],
    [ SBR_PKT_NTFY_SENSOR_CALL, 'sensor poll' ],
    [ SBR_PKT_RESP_SENSOR_CALL, 'sensor poll ack' ],
    [ SBR_PKT_RESP_UNKNOWN_RQST, 'unknown packet type' ],
]
#
# Status field
#
status_list = [
    'OK',
    'NOT FOUND',
    'NOT POSSIBLE', # deprecated
    'OUT OF RANGE',
    'NO MEMORY',
    'NOT PERMITTED',
    'FAULT',
    'COMM ERROR',
    'TIMEOUT',
    'OVERFLOW',
    'UNDERFLOW',
    'WOULD BLOCK',
    'DEADLOCK',
    'FORMAT ERROR',
    'DUPLICATE',
    'BAD PARAMETER',
    'CLOSED',
    'BUSY',
    'UNSUPPORTED',
    'IO_ERROR',
    'NOT IMPLEMENTED',
    'UNAVAILABLE',
    'TERMINATED'
]
#
# Data types
#
data_types = [
    [ 'trig', SBR_DATA_TYPE_TRIGGER ],
    [ 'bool', SBR_DATA_TYPE_BOOLEAN ],
    [ 'num', SBR_DATA_TYPE_NUMERIC ],
    [ 'str', SBR_DATA_TYPE_STRING ],
    [ 'json', SBR_DATA_TYPE_JSON ]
]
#
# Syntax
#
syntax_list = [
    ' create input|output|sensor trig|bool|num|str|json <path> [<units>]',
    ' delete resource|handler|sensor <path>',
    ' add handler <path>',
    ' push trig|bool|num|str|json <path> [<data>]',
    ' get <path>',
    ' example json <path> [<data>]'
]
#
# Usage
#
def print_usage():
    print('Usage:')
    for syntax in syntax_list:
        print(syntax)
    print('')
#
# Encode data type
#
def encode_dtype(data_type):
    # Only the first letter of the data type name is significant.
    field = ''
    dtype = data_type.lower()
    if dtype[0] == 't':
        field = field + SBR_DATA_TYPE_TRIGGER
    elif dtype[0] == 'b':
        field = field + SBR_DATA_TYPE_BOOLEAN
    elif dtype[0] == 'n':
        field = field + SBR_DATA_TYPE_NUMERIC
    elif dtype[0] == 's':
        field = field + SBR_DATA_TYPE_STRING
    elif dtype[0] == 'j':
        field = field + SBR_DATA_TYPE_JSON
    else:
        print('Invalid data type')
        # Return an empty string so that callers testing for '' stop here.
        return ''
    return field
#
# Encode segment number and segment count
#
def encode_segment():
    return '01'
#
# Encode path
#
def encode_path(path):
    return SBR_FIELD_ID_PATH + path
#
# Encode units
#
def encode_units(units):
    return SBR_FIELD_ID_UNITS + units
#
# Encode data
#
def encode_data(data):
    return SBR_FIELD_ID_DATA + data
#
# create input|output|sensor data-type path [units]
#
def encode_create(argc, args):
    packet = ''
    dtype = ''
    if argc < 3:
        print('Invalid number of arguments')
        print(syntax_list[0])
        return
    if argc > 3:
        what, data_type, path, units = args.split(' ')
    else:
        what, data_type, path = args.split(' ')
        units = None
    what = what.lower()
    # packet type
    if what[0] == 'i':
        packet = packet + SBR_PKT_RQST_INPUT_CREATE
    elif what[0] == 'o':
        packet = packet + SBR_PKT_RQST_OUTPUT_CREATE
    elif what[0] == 's':
        packet = packet + SBR_PKT_RQST_SENSOR_CREATE
    else:
        print('Invalid request')
        print(syntax_list[0])
        return
    dtype = encode_dtype(data_type)
    if not dtype:
        return
    packet = packet + dtype
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if units is not None:
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_units(units)
    return packet
#
# delete resource|handler|sensor path
#
def encode_delete(argc, args):
    packet = ''
    if argc < 2:
        print('Invalid number of arguments')
        print(syntax_list[1])
        return
    what, path = args.split(' ')
    what = what.lower()
    # packet type
    if what[0] == 'r':
        packet = packet + SBR_PKT_RQST_DELETE
    elif what[0] == 'h':
        packet = packet + SBR_PKT_RQST_HANDLER_REMOVE
    elif what[0] == 's':
        packet = packet + SBR_PKT_RQST_SENSOR_REMOVE
    else:
        print('Invalid request')
        print(syntax_list[1])
        return
    # data type - ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    return packet
#
# add handler path
#
def encode_add(argc, args):
    packet = ''
    if argc < 2:
        print('Invalid number of arguments')
        print(syntax_list[2])
        return
    what, path = args.split(' ')
    what = what.lower()
    if what[0] != 'h':
        print('Invalid request ' + what)
        print(syntax_list[2])
        return
    packet = packet + SBR_PKT_RQST_HANDLER_ADD
    # data type - ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    return packet
#
# push data-type path [data]
#
def encode_push(argc, args):
    packet = ''
    dtype = ''
    if argc < 2:
        print('Invalid number of arguments')
        print(syntax_list[3])
        return
    print('argc: ' + str(argc))
    if argc > 2:
        # Split at most twice so that the data itself may contain spaces.
        data_type, path, data = args.split(' ', 2)
    else:
        data_type, path = args.split(' ')
        data = ''
    packet = packet + SBR_PKT_RQST_PUSH
    dtype = encode_dtype(data_type)
    if not dtype:
        return
    packet = packet + dtype
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if data != '':
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_data(data)
    return packet
#
# get path
#
def encode_get(argc, args):
    packet = ''
    dtype = ''
    if argc < 1:
        print('Invalid number of arguments')
        print(syntax_list[4])
        return
    packet = packet + SBR_PKT_RQST_GET
    # data type ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(args)
    return packet
#
# example data-type path [data]
#
def encode_example(argc, args):
    packet = ''
    dtype = ''
    if argc < 2:
        print('Invalid number of arguments')
        print(syntax_list[5])
        return
    print('argc: ' + str(argc))
    if argc > 2:
        # Split at most twice so that the data itself may contain spaces.
        data_type, path, data = args.split(' ', 2)
    else:
        data_type, path = args.split(' ')
        data = ''
    packet = packet + SBR_PKT_RQST_EXAMPLE_SET
    dtype = encode_dtype(data_type)
    if not dtype:
        return
    packet = packet + dtype
    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if data != '':
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_data(data)
    return packet
#
# Parse command and build a request packet
#
def encode_request(request):
    if request.find(' ') < 0:
        print_usage()
        return
    # argc counts the space-separated words that follow the command word.
    argc = len(request.split(' ')) - 1
    # all commands take at least one argument
    if argc < 1:
        print_usage()
        return
    request_type, args = request.split(' ', 1)
    request_type = request_type.lower()
    # Dispatch on the first letter of the command word.
    if request_type[0] == 'c':
        p = encode_create(argc, args)
    elif request_type[0] == 'd':
        p = encode_delete(argc, args)
    elif request_type[0] == 'a':
        p = encode_add(argc, args)
    elif request_type[0] == 'p':
        p = encode_push(argc, args)
    elif request_type[0] == 'g':
        p = encode_get(argc, args)
    elif request_type[0] == 'e':
        p = encode_example(argc, args)
    elif request_type[0] == 'r':
        # Pass the argument string through unchanged as the packet.
        p = args
    else:
        print_usage()
        return
    return p
#
# Decode and print contents of an incoming packet
#
def decode_response(response):
    resp = {}
    #print('Received : ' + response)
    # Positional fields:
    ptype = response[0]
    status = response[1]
    seg_number = response[2]
    seg_count = response[3]
    # Labeled, variable length fields
    var_length = response[4:]
    for i in range(len(ptypes)):
        test = ptypes[i]
        if test[0] == ptype:
            #print('Message type : ' + test[1])
            resp['responseType'] = test[0]
    # Status is represented in ASCII, starting with '@' (0x40) for OK.
    # Subtract 0x40 to index into the table, above
    #
    status_index = ord(status[0]) - ord('\x40')
    #print('Status : ' + status_list[status_index])
    resp['status'] = status_list[status_index]
    if len(var_length):
        var_fields = var_length.split(SBR_VARLENGTH_SEPARATOR)
        for field in var_fields:
            # The first character of each field identifies its content.
            if field[0] == SBR_FIELD_ID_PATH:
                resp['path'] = field[1:]
                #print('Path : ' + field[1:])
            if field[0] == SBR_FIELD_ID_TIME:
                resp['timestamp'] = field[1:]
                #print('Timestamp : ' + field[1:])
            if field[0] == SBR_FIELD_ID_DATA:
                resp['data'] = field[1:]
                #print('Data : ' + field[1:])
    return resp
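To make the packet layout used by the listing above concrete, here is a minimal Python 3 sketch of one request and one response. It is written independently of the Python 2 module. build_create_sensor and parse_response are hypothetical helper names, STATUS_NAMES holds only the first four entries of status_list, and the timestamp and data values in the demo are made up.

```python
# First four entries of status_list; the status byte is '@' (0x40) plus the index.
STATUS_NAMES = ['OK', 'NOT FOUND', 'NOT POSSIBLE', 'OUT OF RANGE']

# Variable-length field identifiers, as in the SBR_FIELD_ID_* constants.
FIELD_LABELS = {'P': 'path', 'T': 'timestamp', 'D': 'data'}


def build_create_sensor(data_type, path, units=None):
    """Build a 'create sensor' request: type, data type, '01', then fields."""
    packet = 'S' + data_type + '01' + 'P' + path
    if units is not None:
        packet += ',' + 'U' + units
    return packet


def parse_response(packet):
    """Split a response into its positional and labeled fields."""
    index = ord(packet[1]) - 0x40
    known = 0 <= index < len(STATUS_NAMES)
    fields = {
        'responseType': packet[0],
        'status': STATUS_NAMES[index] if known else 'UNKNOWN',
    }
    # Bytes 2 and 3 are skipped; labeled fields start at byte 4.
    for field in packet[4:].split(','):
        if field and field[0] in FIELD_LABELS:
            fields[FIELD_LABELS[field[0]]] = field[1:]
    return fields


if __name__ == '__main__':
    print(build_create_sensor('N', 'temp', 'degC'))
    print(parse_response('g@01T1541112321.5,D21.5'))
```

Like decode_response in the listing, this sketch splits the variable-length part on every comma, so a data field that itself contains commas (a JSON object, for example) would be cut apart. Treat that as a limitation of the sketch and of the sample decoder, not as a statement about the protocol itself.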
simple_hdlc.py for SBR
Note
Place this file in a subdirectory named "sb_serial".
#!/usr/bin/python
# coding: utf8
__version__ = '0.2'
import logging
import struct
import time
from threading import Thread
from PyCRC.CRCCCITT import CRCCCITT
logger = logging.getLogger(__name__)
def calcCRC(data):
    # CRC-CCITT with initial value 0xFFFF, packed as two big-endian bytes.
    crc = CRCCCITT("FFFF").calculate(bytes(data))
    b = bytearray(struct.pack(">H", crc))
    return b
class Frame(object):
    STATE_READ = 0x01
    STATE_ESCAPE = 0x02

    def __init__(self):
        self.finished = False
        self.error = False
        self.state = self.STATE_READ
        self.data = bytearray()
        self.crc = bytearray()
        self.reader = None

    def __len__(self):
        return len(self.data)

    def addByte(self, b):
        if b == 0x7D:
            # Escape byte: the next byte must be XORed with 0x20.
            self.state = self.STATE_ESCAPE
        elif self.state == self.STATE_ESCAPE:
            self.state = self.STATE_READ
            b = b ^ 0x20
            self.data.append(b)
        else:
            self.data.append(b)

    def finish(self):
        # The last two bytes of a frame are the CRC.
        self.crc = self.data[-2:]
        self.data = self.data[:-2]
        self.finished = True

    def checkCRC(self):
        res = bool(self.crc == calcCRC(self.data))
        if not res:
            c1 = str(self.crc)
            c2 = str(calcCRC(self.data))
            logger.warning("invalid crc %s != %s", c1.encode("hex"), c2.encode("hex"))
            self.error = True
        return res

    def toString(self):
        return str(self.data)

class HDLC(object):
    def __init__(self, serial):
        self.serial = serial
        self.current_frame = None
        self.last_frame = None
        self.frame_callback = None
        self.error_callback = None
        self.running = False
        self.reader = None

    @classmethod
    def toBytes(cls, data):
        return bytearray(data)

    def sendFrame(self, data):
        bs = self._encode(self.toBytes(data))
        logger.info("Sending Frame: %s", bs.encode("hex"))
        res = self.serial.write(bs)
        logger.info("Send %s bytes", res)

    def _onFrame(self, frame):
        self.last_frame = frame
        s = self.last_frame.toString()
        logger.info("Received Frame: %s", s.encode("hex"))
        if self.frame_callback is not None:
            self.frame_callback(s)

    def _onError(self, frame):
        self.last_frame = frame
        s = self.last_frame.toString()
        logger.warning("Frame Error: %s", s.encode("hex"))
        if self.error_callback is not None:
            self.error_callback(s)

    def _readBytes(self, size):
        # Read up to `size` bytes; return True once a frame has been completed.
        while size > 0:
            b = bytearray(self.serial.read(1))
            if len(b) < 1:
                # Nothing was read (for example, a read timeout).
                return False
            res = self._readByte(b[0])
            if res:
                return True
            size -= 1
        return False

    def _readByte(self, b):
        assert 0 <= b <= 255
        if b == 0x7E:
            # Start or End
            if not self.current_frame or len(self.current_frame) < 1:
                # Start
                self.current_frame = Frame()
            else:
                # End
                self.current_frame.finish()
                self.current_frame.checkCRC()
        elif self.current_frame is None:
            # Ignore before Start
            return False
        elif not self.current_frame.finished:
            self.current_frame.addByte(b)
        else:
            # Ignore Bytes
            pass
        # Validate and return
        if self.current_frame.finished and not self.current_frame.error:
            # Success
            self._onFrame(self.current_frame)
            self.current_frame = None
            return True
        elif self.current_frame.finished:
            # Error
            self._onError(self.current_frame)
            self.current_frame = None
            return True
        return False

    def readFrame(self, timeout=5):
        timer = time.time() + timeout
        while time.time() < timer:
            i = self.serial.in_waiting
            if i < 1:
                time.sleep(0.0001)
                continue
            res = self._readBytes(i)
            if res:
                # Validate and return
                if not self.last_frame.error:
                    # Success
                    s = self.last_frame.toString()
                    return s
                elif self.last_frame.finished:
                    # Error
                    raise ValueError("Invalid Frame (CRC FAIL)")
        raise RuntimeError("readFrame timeout")

    @classmethod
    def _encode(cls, bs):
        data = bytearray()
        data.append(0x7E)
        crc = calcCRC(bs)
        bs += crc
        for byte in bs:
            if byte == 0x7E or byte == 0x7D:
                # Escape flag and escape bytes that occur inside the frame.
                data.append(0x7D)
                data.append(byte ^ 0x20)
            else:
                data.append(byte)
        data.append(0x7E)
        return bytes(data)

    def _receiveLoop(self):
        while self.running:
            i = self.serial.in_waiting
            if i < 1:
                time.sleep(0.001)
                continue
            res = self._readBytes(i)

    def startReader(self, onFrame, onError=None):
        if self.running:
            raise RuntimeError("reader already running")
        self.reader = Thread(target=self._receiveLoop)
        self.reader.setDaemon(True)
        self.frame_callback = onFrame
        self.error_callback = onError
        self.running = True
        self.reader.start()

    def stopReader(self):
        self.running = False
        self.reader.join()
        self.reader = None
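The framing performed by the listing above (0x7E flag bytes, 0x7D escaping with XOR 0x20, and a two-byte big-endian CRC-CCITT with initial value 0xFFFF) can be tried without a serial port or the PyCRC dependency. The following is a minimal Python 3 sketch under those assumptions. crc_ccitt, hdlc_encode, and hdlc_decode are hypothetical names, and the bitwise CRC is a stand-in for the PyCRC call, not the package's own code.

```python
FLAG = 0x7E    # marks the start and end of a frame
ESCAPE = 0x7D  # precedes a byte that was XORed with 0x20


def crc_ccitt(data, crc=0xFFFF):
    """Bitwise CRC-16 with polynomial 0x1021 and initial value 0xFFFF."""
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def hdlc_encode(payload):
    """Append the CRC, escape reserved bytes, and wrap in flag bytes."""
    body = bytes(payload) + crc_ccitt(payload).to_bytes(2, 'big')
    out = bytearray([FLAG])
    for byte in body:
        if byte in (FLAG, ESCAPE):
            out += bytes([ESCAPE, byte ^ 0x20])
        else:
            out.append(byte)
    out.append(FLAG)
    return bytes(out)


def hdlc_decode(frame):
    """Reverse hdlc_encode; raise ValueError on bad delimiters or CRC."""
    if len(frame) < 4 or frame[0] != FLAG or frame[-1] != FLAG:
        raise ValueError('missing frame delimiters')
    body = bytearray()
    escaped = False
    for byte in frame[1:-1]:
        if escaped:
            body.append(byte ^ 0x20)
            escaped = False
        elif byte == ESCAPE:
            escaped = True
        else:
            body.append(byte)
    payload, crc = bytes(body[:-2]), bytes(body[-2:])
    if crc != crc_ccitt(payload).to_bytes(2, 'big'):
        raise ValueError('CRC mismatch')
    return payload


if __name__ == '__main__':
    frame = hdlc_encode(b'SN01Ptemp,UdegC')
    print(frame.hex())
    print(hdlc_decode(frame))
```

Because the CRC is appended before escaping, a CRC byte that happens to equal 0x7E or 0x7D is escaped like any payload byte, which matches the order used in the _encode method of the listing.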