Octave Resource Protocol Software Tools

This page provides information about software tools built on top of the Octave Resource Protocol:

Octave ORP Evaluation Tool

We provide the source code for an Octave ORP Evaluation Tool Python package. This package contains a high-level sample Python client that you can copy and use in your edge device applications to interact with Octave. The client is built upon the package's implementation of the Octave Resource Protocol (ORP). The package also contains a sample Python application.

The Octave ORP Evaluation Tool package includes a Python API defined in octave_rp.py. The API consists of the classes and methods described in the following subsections.

You can view an example application that was built using this API in the package's sample.py file.

For more information see this tutorial.

OctaveRP

OctaveRP is the package's main class. It extends the package's HDLC class, which handles framing, and uses the package's implementation of the Octave Resource Protocol to communicate with Octave.

Once instantiated, the OctaveRP instance is used by the API's Sensor, Input, and Output classes to create and manage their respective resources.

Methods

Constructor

Constructs and initializes a new OctaveRP object.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | OctaveRP | No | The class instance. |
| serial | Serial | Yes | A Serial object over which to communicate. |
| retry_timeout | Float | No | The timeout period, in seconds, after which to retry communications. Defaults to 0.5. |
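
The retry_timeout parameter controls how long the client waits before resending a request that has not yet been confirmed. The following is a minimal, self-contained sketch of that retry pattern, not the package's code: send_until_confirmed, fake_send, and the max_attempts limit are hypothetical names introduced here for illustration (the package's own loop retries without an upper bound).

```python
import time


def send_until_confirmed(send, received, expected, retry_timeout=0.5, max_attempts=5):
    """Resend a request until a response of the expected type shows up.

    send      -- hypothetical callable that transmits the request once
    received  -- hypothetical callable returning the responses seen so far
    expected  -- response type to wait for, e.g. 's' for "sensor created"
    """
    for attempt in range(1, max_attempts + 1):
        send()
        time.sleep(retry_timeout)  # give the device time to answer
        if any(m.get('responseType') == expected for m in received()):
            return attempt
    raise TimeoutError('no %r response after %d attempts' % (expected, max_attempts))


# Simulated device that only answers the second transmission.
inbox = []
sent = []


def fake_send():
    sent.append('frame')
    if len(sent) == 2:
        inbox.append({'responseType': 's', 'status': 'OK'})


attempts = send_until_confirmed(fake_send, lambda: inbox, 's', retry_timeout=0.01)
print(attempts)
```

A shorter retry_timeout makes setup faster on a responsive link, while a longer one avoids flooding a device that is still waking up.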

create_input

Creates an Input object representing an Octave Input Resource on the Datahub.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | OctaveRP | No | The class instance. |
| dataType | String | Yes | The type of data to be tracked by the Input. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
| path | String | Yes | The resource's path. |
| unit | String | Yes | The type of units that the value represents (e.g. degrees). |

create_sensor

Creates a Sensor object representing an Octave Sensor Resource on the Datahub.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | OctaveRP | No | The class instance. |
| dataType | String | Yes | The type of data to be tracked by the Sensor. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
| path | String | Yes | The resource's path. |
| unit | String | Yes | The type of units that the value represents (e.g. degrees). |
| handler | Callback | Yes | The method to invoke to obtain the sensor's current value. It takes no arguments and returns the value to push. |
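
To illustrate how a sensor handler is used, here is a minimal sketch of the lookup that happens when a sensor is polled: the handler registered for the path is called with no arguments, and its return value is placed in a push command. The handler read_temperature, the path room/temperature, and the helper push_command_for_poll are hypothetical; only the shape of the registry and of the command string follows the octave_rp.py listing.

```python
def read_temperature():
    # Hypothetical handler: a real one would read hardware.
    return 21.5


# path -> (data type, handler), the same shape OctaveRP keeps in sensor_handlers
sensor_handlers = {'room/temperature': ('num', read_temperature)}


def push_command_for_poll(path):
    """Build the text command sent in reply to a poll of `path`."""
    datatype, handler = sensor_handlers[path]
    value = handler()
    return 'push %s %s/value %s' % (datatype, path, value)


command = push_command_for_poll('room/temperature')
print(command)  # push num room/temperature/value 21.5
```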

Sensor

Represents an Octave Sensor resource.

Methods

Constructor

Constructs and initializes a new Sensor object.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | Sensor | No | The class instance. |
| orp | OctaveRP | Yes | A reference to an OctaveRP instance. |
| handler | Callback | Yes | The method to invoke to obtain the sensor's current value. It takes no arguments and returns the value. |
| datatype | String | Yes | The type of data to be tracked by the Sensor. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
| path | String | Yes | The resource's path. |
| unit | String | No | The type of units that the value represents (e.g. degrees). |

create_sensor

Creates the Sensor resource on the Datahub, using the OctaveRP reference passed in the constructor to perform the necessary Octave commands.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | Sensor | No | The class instance. |

Output

Represents an Octave Output resource.

Methods

Constructor

Constructs and initializes a new Output object.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | Output | No | The class instance. |
| orp | OctaveRP | Yes | A reference to an OctaveRP instance. |
| handler | Callback | Yes | The method to invoke, with the received data as its argument, when a value is written to the Output. |
| datatype | String | Yes | The type of data to be tracked by the Output. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
| path | String | Yes | The resource's path. |
| unit | String | No | The type of units that the value represents (e.g. degrees). |

create_output

Creates the Output resource on the Datahub, using the OctaveRP reference passed in the constructor to perform the necessary Octave commands.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | Output | No | The class instance. |

Input

Represents an Octave Input resource.

Methods

Constructor

Constructs and initializes a new Input object.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | Input | No | The class instance. |
| orp | OctaveRP | Yes | A reference to an OctaveRP instance. |
| datatype | String | Yes | The type of data to be tracked by the Input. Must be set to one of the following values: trig (trigger), bool, num, str, or json. |
| path | String | Yes | The resource's path. |
| unit | String | No | The type of units that the value represents (e.g. degrees). |

create_input

Creates the Input resource on the Datahub, using the OctaveRP reference passed in the constructor to perform the necessary Octave commands.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | Input | No | The class instance. |

send

Sends an input value.

Parameters

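| Name | Type | Required | Description |
| --- | --- | --- | --- |
| self | Input | No | The class instance. |
| value | Any | Yes | The input value to send. It should match the data type given to the Input's constructor. |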
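
As a rough illustration of what send produces, the sketch below lays out a push request the way the protocol.py listing does: a packet type byte, a data type byte, a fixed two-character segment field, the path, and an optional data field. It is a simplified stand-in written for this page, not the package's encoder; encode_push_packet and DATA_TYPE_CODES are hypothetical names, and the path push_type_input/value is borrowed from sample.py.

```python
FIELD_SEPARATOR = ','
DATA_TYPE_CODES = {'trig': 'T', 'bool': 'B', 'num': 'N', 'str': 'S', 'json': 'J'}


def encode_push_packet(datatype, path, data=''):
    """Lay out a push request: type, data type, segment, path, optional data."""
    packet = 'P'                          # packet type: push request
    packet += DATA_TYPE_CODES[datatype]   # data type field
    packet += '01'                        # fixed segment field
    packet += 'P' + path                  # labeled path field
    if data != '':
        packet += FIELD_SEPARATOR + 'D' + str(data)  # labeled data field
    return packet


packet = encode_push_packet('num', 'push_type_input/value', 0)
print(packet)  # PN01Ppush_type_input/value,D0
```

A trigger carries no data, so encode_push_packet('trig', 'a/b') yields only the header and the path field.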

Demo Application for Southbound Resource Protocol

We provide the source code for a Demo Application for Southbound Resource Protocol Python package. This package contains a high-level sample Python client that you can copy and use to experiment with communicating between a development machine and an Octave-enabled device over a USB-to-UART bridge.

The client is built upon the package's implementation of Octave's Southbound Resource Protocol. The package also contains a sample Python application.

You can view an example application that was built using this API in the package's run.py file.

For more information see the ORP Communications via USB-to-UART bridge tutorial on connecting a development machine to a mangOH Red over a USB-to-UART bridge.

Source Code for the Octave ORP Evaluation Tool Package

The following subsections provide the source code that you can copy and paste into the respective files to create the Octave ORP Evaluation Tool package. This package can be used to experiment with ORP and is also used in this tutorial. The listings are written for Python 2: they use print statements and str.encode("hex").

The files in the package include the following:

  • __init__.py: defines the package's imports.
  • octave_rp.py: defines the OctaveRP class which exposes an implementation of a high-level API for working with ORP.
  • protocol.py: provides an implementation of ORP.
  • sample.py: provides a sample Python application built using the OctaveRP defined in octave_rp.py.
  • simple_hdlc.py: provides an implementation of HDLC that is used by the OctaveRP defined in octave_rp.py.

__init__.py

from octave_rp import Sensor, OctaveRP, Output, Input

octave_rp.py

from protocol import *
from serial import Serial
from simple_hdlc import HDLC
from time import sleep
import logging

logging.basicConfig(format='%(asctime)s %(name)s: %(message)s')
logger = logging.getLogger('OctaveRP')
logger.setLevel(logging.DEBUG)

class OctaveRP(HDLC):

    def __init__(self, serial, retry_timeout=0.5):
        HDLC.__init__(self, serial)
        self.serial = serial
        self.messages = []
        self.sensor_handlers = {}
        self.output_handlers = {}
        self.retry_timeout = retry_timeout
        self.startReader(onFrame=self.rcv_frame)

    def _wakeup(self):
        self.serial.write("~")
        sleep(0.1)
        self.serial.write("~")
        sleep(0.1)
        self.serial.write("~")

    def execute_wait_confirm(self, frame, resp):
        # Resend the frame every retry_timeout seconds until a response
        # of type `resp` has been received.
        self.clear()
        self._wakeup()
        confirmed = False
        while not confirmed:
            logger.debug("sending frame, awaiting %s" % resp)
            self.sendFrame(frame)
            sleep(self.retry_timeout)
            for m in self.messages:
                logger.debug('checking message %s' % m)
                # .get() avoids a KeyError for unrecognized packet types
                if m.get('responseType') == resp:
                    confirmed = True

    def push_value(self, path, datatype=None, value=None):
        if value is None:
            value = self.sensor_handlers[path][1]()
            datatype = self.sensor_handlers[path][0]
        cmd = 'push %s %s %s' % (datatype, '%s/value' % path, value)
        logger.debug('preparing to send: %s' % cmd)
        frame = encode_request(cmd)
        self._wakeup()
        self.sendFrame(frame)

    def create_input(self, dataType, path, unit):
        if unit:
            frame = encode_request('create input %s %s/value %s' % (dataType, path, unit))
        else:
            frame = encode_request('create input %s %s' % (dataType, path))
        logger.debug('creating input at %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_INPUT_CREATE)

    def create_sensor(self, dataType, path, unit, handler):
        if unit:
            frame = encode_request('create sensor %s %s %s' % (dataType, path, unit))
        else:
            frame = encode_request('create sensor %s %s' % (dataType, path))
        logger.debug('creating sensor at %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_SENSOR_CREATE)
        self.sensor_handlers[path] = (dataType, handler)
        self.push_value(path)

    def create_output(self, dataType, path, unit, handler):
        self.output_handlers[path] = handler

        if unit:
            frame = encode_request('create output %s %s %s' % (dataType, path, unit))
        else:
            frame = encode_request('create output %s %s' % (dataType, path))

        # create output
        logger.debug('creating output at %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_OUTPUT_CREATE)

        # register handler
        frame = encode_request('add handler %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_HANDLER_ADD)

    def clear(self):
        self.messages = []

    def rcv_frame(self, data):
        # Dispatch an incoming frame: answer sensor polls, call output
        # handlers, and queue everything else for execute_wait_confirm.
        message = decode_response(data)
        logger.debug('recv frame: %s' % message)
        response_type = message.get('responseType')
        if response_type == SBR_PKT_NTFY_SENSOR_CALL:
            if message.get('path') in self.sensor_handlers:
                self.push_value(message['path'])
        elif response_type == SBR_PKT_NTFY_HANDLER_CALL:
            logger.debug('new data at %s' % message.get('path'))
            if message.get('path') in self.output_handlers:
                self.output_handlers[message['path']](message.get('data'))
        else:
            self.messages.append(message)

class Sensor(object):

    def __init__(self, orp, handler, datatype, path, unit=None):
        self.datatype = datatype
        self.path = path
        self.unit = unit
        self.orp = orp
        self.handler = handler
        self.declared = False

    def create_sensor(self):
        self.orp.create_sensor(self.datatype, self.path, self.unit, self.handler)
        self.declared = True

class Output(object):

    def __init__(self, orp, handler, datatype, path, unit=None):
        self.datatype = datatype
        self.path = path
        self.unit = unit
        self.orp = orp
        self.handler = handler
        self.declared = False

    def create_output(self):
        self.orp.create_output(self.datatype, self.path, self.unit, self.handler)
        self.declared = True

class Input(object):

    def __init__(self, orp, datatype, path, unit=None):
        self.datatype = datatype
        self.path = path
        self.unit = unit
        self.orp = orp
        self.declared = False

    def create_input(self):
        self.orp.create_input(self.datatype, self.path, self.unit)

    def send(self, value):
        self.orp.push_value(self.path, self.datatype, value)

protocol.py

#============================================================================
#
# Filename:  sbr_Protocol_v1.py
#
# Purpose:   Python test functions to encode and decode packets for the
#            Southbound Resource Protocol
#
# Copyright: (c) 2018 Sierra Wireless, Inc.
#            All rights reserved
#
#----------------------------------------------------------------------------
#
# VERSION: 0.0.1
#
# NOTES:
#

#
# Packet type field - byte 0
#
SBR_PKT_RQST_INPUT_CREATE   = 'I'   # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_INPUT_CREATE   = 'i'   # type[1] status[1] pad[2]

SBR_PKT_RQST_OUTPUT_CREATE  = 'O'   # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_OUTPUT_CREATE  = 'o'   # type[1] status[1] pad[2]

SBR_PKT_RQST_DELETE         = 'D'   # type[1] pad[1]    pad[2] path[]
SBR_PKT_RESP_DELETE         = 'd'   # type[1] status[1] pad[2]

SBR_PKT_RQST_HANDLER_ADD    = 'H'   # type[1] pad[1]    pad[2] path[] 
SBR_PKT_RESP_HANDLER_ADD    = 'h'   # type[1] status[1] pad[2]

SBR_PKT_RQST_HANDLER_REMOVE = 'K'   # type[1] pad[1]    pad[2] path[] 
SBR_PKT_RESP_HANDLER_REMOVE = 'k'   # type[1] status[1] pad[2]

SBR_PKT_RQST_PUSH           = 'P'   # type[1] d_type[1] pad[2] time[] path[] data[]
SBR_PKT_RESP_PUSH           = 'p'   # type[1] status[1] pad[2]

SBR_PKT_RQST_GET            = 'G'   # type[1] pad[1]    pad[2] path[]
SBR_PKT_RESP_GET            = 'g'   # type[1] status[1] pad[2] time[] data[]

SBR_PKT_RQST_EXAMPLE_SET    = 'E'   # type[1] d_type[1] pad[2] path[] data[]
SBR_PKT_RESP_EXAMPLE_SET    = 'e'   # type[1] status[1] pad[2]

SBR_PKT_RQST_SENSOR_CREATE  = 'S'   # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_SENSOR_CREATE  = 's'   # type[1] status[1] pad[2]

SBR_PKT_RQST_SENSOR_REMOVE  = 'R'   # type[1] pad[1]    pad[2] path[]
SBR_PKT_RESP_SENSOR_REMOVE  = 'r'   # type[1] status[1] pad[2]

SBR_PKT_NTFY_HANDLER_CALL   = 'c'   # type[1] d_type[1] pad[2] time[] path[] data[]
SBR_PKT_RESP_HANDLER_CALL   = 'C'   # type[1] status[1] pad[2]

SBR_PKT_NTFY_SENSOR_CALL    = 'b'   # type[1] pad[1]    pad[2] path[]
SBR_PKT_RESP_SENSOR_CALL    = 'B'   # type[1] status[1] pad[2]

SBR_PKT_RESP_UNKNOWN_RQST   = '?'   # type[1] status[1] pad[2]


#
# Data type field - byte 1
#
SBR_DATA_TYPE_TRIGGER       = 'T'   # trigger - no data
SBR_DATA_TYPE_BOOLEAN       = 'B'   # Boolean - 1 byte:  't' | 'f'
SBR_DATA_TYPE_NUMERIC       = 'N'   # numeric - null-terminated ASCII string, representing double
SBR_DATA_TYPE_STRING        = 'S'   # string  - null-terminated ASCII string
SBR_DATA_TYPE_JSON          = 'J'   # JSON    - null-terminated ASCII string, representing JSON
SBR_DATA_TYPE_UNDEF         = ' '   # not specified


#
# Variable length field identifiers
#
SBR_FIELD_ID_PATH           = 'P'
SBR_FIELD_ID_TIME           = 'T'
SBR_FIELD_ID_UNITS          = 'U'
SBR_FIELD_ID_DATA           = 'D'

# Variable length field separator
SBR_VARLENGTH_SEPARATOR     = ','



#
# Packet type descriptions
#
ptypes = [
    [ SBR_PKT_RQST_INPUT_CREATE,   'request create input'     ],
    [ SBR_PKT_RESP_INPUT_CREATE,   'response create input'    ],

    [ SBR_PKT_RQST_OUTPUT_CREATE,  'request create output'    ],
    [ SBR_PKT_RESP_OUTPUT_CREATE,  'response create output'   ],

    [ SBR_PKT_RQST_DELETE,         'request delete resource'  ],
    [ SBR_PKT_RESP_DELETE,         'response delete resource' ],

    [ SBR_PKT_RQST_HANDLER_ADD,    'request add handler'      ],
    [ SBR_PKT_RESP_HANDLER_ADD,    'response add handler'     ],

    [ SBR_PKT_RQST_HANDLER_REMOVE, 'request remove handler'   ],
    [ SBR_PKT_RESP_HANDLER_REMOVE, 'response remove handler'  ],

    [ SBR_PKT_RQST_PUSH,           'request push'             ],
    [ SBR_PKT_RESP_PUSH,           'response push'            ],

    [ SBR_PKT_RQST_GET,            'request get'              ],
    [ SBR_PKT_RESP_GET,            'response get'             ],

    [ SBR_PKT_RQST_EXAMPLE_SET,    'request set example'      ],
    [ SBR_PKT_RESP_EXAMPLE_SET,    'response set example'     ],

    [ SBR_PKT_RQST_SENSOR_CREATE,  'request create sensor'    ],
    [ SBR_PKT_RESP_SENSOR_CREATE,  'response create sensor'   ],

    [ SBR_PKT_RQST_SENSOR_REMOVE,  'request remove sensor'    ],
    [ SBR_PKT_RESP_SENSOR_REMOVE,  'response remove sensor'   ],

    [ SBR_PKT_NTFY_HANDLER_CALL,   'handler call'             ],
    [ SBR_PKT_RESP_HANDLER_CALL,   'handler ack'              ],

    [ SBR_PKT_NTFY_SENSOR_CALL,    'sensor poll'              ],
    [ SBR_PKT_RESP_SENSOR_CALL,    'sensor poll ack'          ],

    [ SBR_PKT_RESP_UNKNOWN_RQST,   'unknown packet type'      ],
]


#
# Status field
#
status_list = [
    'OK',
    'NOT FOUND',
    'NOT POSSIBLE',   # deprecated
    'OUT OF RANGE',
    'NO MEMORY',
    'NOT PERMITTED',
    'FAULT',
    'COMM ERROR',
    'TIMEOUT',
    'OVERFLOW',
    'UNDERFLOW',
    'WOULD BLOCK',
    'DEADLOCK',
    'FORMAT ERROR',
    'DUPLICATE',
    'BAD PARAMETER',
    'CLOSED',
    'BUSY',
    'UNSUPPORTED',
    'IO_ERROR',
    'NOT IMPLEMENTED',
    'UNAVAILABLE',
    'TERMINATED'
]


#
# Data types
#
data_types = [
    [ 'trig',   SBR_DATA_TYPE_TRIGGER ],
    [ 'bool',   SBR_DATA_TYPE_BOOLEAN ],
    [ 'num',    SBR_DATA_TYPE_NUMERIC ],
    [ 'str',    SBR_DATA_TYPE_STRING  ],
    [ 'json',   SBR_DATA_TYPE_JSON    ]
]


#
# Syntax
#
syntax_list = [
    '  create input|output|sensor  trig|bool|num|str|json <path> [<units>]',
    '  delete resource|handler|sensor <path>',
    '  add handler <path>',
    '  push trig|bool|num|str|json <path> [<data>]',
    '  get <path>',
    '  example json <path> [<data>]'
]


#
# Usage
#
def print_usage():

    print 'Usage:'
    for i in range(len(syntax_list)):
        print syntax_list[i]
    print


#
# Encode data type
#
def encode_dtype(data_type):

    field = ''
    dtype = data_type.lower()

    if   dtype[0] == 't':
        field = field + SBR_DATA_TYPE_TRIGGER

    elif dtype[0] == 'b':
        field = field + SBR_DATA_TYPE_BOOLEAN

    elif dtype[0] == 'n':
        field = field + SBR_DATA_TYPE_NUMERIC

    elif dtype[0] == 's':
        field = field + SBR_DATA_TYPE_STRING

    elif dtype[0] == 'j':
        field = field + SBR_DATA_TYPE_JSON

    else:
        print 'Invalid data type'
        # Callers test for an empty string, so do not return None here
        return ''

    return field

#
# Encode segment number and segment count
#
def encode_segment():

    return '01'


#
# Encode path
#
def encode_path(path):

    return SBR_FIELD_ID_PATH + path


#
# Encode units
#
def encode_units(units):

    return SBR_FIELD_ID_UNITS + units


#
# Encode data
#
def encode_data(data):

    return SBR_FIELD_ID_DATA + data


#
# create input|output|sensor data-type path [units]
#
def encode_create(argc, args):

    packet = ''
    dtype = ''
    syntax = syntax_list[0]


    if argc < 3 :
        print 'Invalid number of arguments'
        print syntax_list[0]
        return

    if argc > 3 :
        what,data_type,path,units = args.split(' ')

    else:
        what,data_type,path = args.split(' ')
        units = None

    what = what.lower()

    if what[0] == 'i':
        packet = packet + SBR_PKT_RQST_INPUT_CREATE
    elif what[0] == 'o':
        packet = packet + SBR_PKT_RQST_OUTPUT_CREATE
    elif what[0] == 's':
        packet = packet + SBR_PKT_RQST_SENSOR_CREATE
    else:
        print 'Invalid request'
        print syntax_list[0]
        return

    dtype = encode_dtype(data_type)
    if dtype == '': 
        return
    packet = packet + dtype

    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if units != None:
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_units(units)

    return packet


#
# delete resource|handler|sensor path
#
def encode_delete(argc, args):

    packet = ''

    if argc < 2 :
        print 'Invalid number of arguments'
        print syntax_list[1]
        return

    what,path = args.split(' ')
    what = what.lower()

    # packet type
    if what[0] == 'r':
        packet = packet + SBR_PKT_RQST_DELETE
    elif what[0] == 'h':
        packet = packet + SBR_PKT_RQST_HANDLER_REMOVE
    elif what[0] == 's':
        packet = packet + SBR_PKT_RQST_SENSOR_REMOVE
    else:
        print 'Invalid request'
        print syntax_list[1]
        return

    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(path)

    return packet


#
# add handler path
#
def encode_add(argc, args):

    packet = ''


    if argc < 2 :
        print 'Invalid number of arguments'
        print syntax_list[2]
        return

    what,path = args.split(' ')
    what = what.lower()

    if what[0] != 'h':
        print 'Invalid request ' + what
        print syntax_list[2]
        return

    packet = packet + SBR_PKT_RQST_HANDLER_ADD
    # data type - ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(path)

    return packet


#
# push data-type path [data]
#
def encode_push(argc, args):

    packet = ''
    dtype = ''

    if argc < 2 :
        print 'Invalid number of arguments'
        print syntax_list[3]
        return

    print 'argc: ' + str(argc)
    if argc > 2 :
        data_type,path,data = args.split(' ', 2)

    else:
        data_type,path = args.split(' ')
        data = ''

    packet = packet + SBR_PKT_RQST_PUSH

    dtype = encode_dtype(data_type)
    if dtype == '': 
        return
    packet = packet + dtype

    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if data != '':
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_data(data)

    return packet


#
# get path
#
def encode_get(argc, args):

    packet = ''
    dtype = ''

    if argc < 1 :
        print 'Invalid number of arguments'
        print syntax_list[4]
        return

    packet = packet + SBR_PKT_RQST_GET
    # data type ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(args)

    return packet



#
# example data-type path [data]
#
def encode_example(argc, args):

    packet = ''
    dtype = ''

    if argc < 2 :
        print 'Invalid number of arguments'
        print syntax_list[5]
        return

    if argc > 2 :
        data_type,path,data = args.split(' ', 2)

    else:
        data_type,path = args.split(' ')
        data = ''

    packet = packet + SBR_PKT_RQST_EXAMPLE_SET

    dtype = encode_dtype(data_type)
    if dtype == '':
        return
    packet = packet + dtype

    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if data != '':
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_data(data)

    return packet


#
# Parse command and build a request packet
#
def encode_request(request):

    if request.find(' ') < 0 :
        print_usage()
        return

    argc = len(request.split(' ')) - 1

    # all commands take at least one argument
    if argc < 1 :
        print_usage()
        return

    request_type,args = request.split(' ', 1)
    request_type = request_type.lower()

    if request_type[0] == 'c':
        p = encode_create(argc, args)

    elif request_type[0] == 'd':
        p = encode_delete(argc, args)

    elif request_type[0] == 'a':
        p = encode_add(argc, args)

    elif request_type[0] == 'p':
        p = encode_push(argc, args)

    elif request_type[0] == 'g':
        p = encode_get(argc, args)

    elif request_type[0] == 'e':
        p = encode_example(argc, args)

    elif request_type[0] == 'r':
        p = args

    else:
        print_usage()
        return

    return p


#
# Decode and print contents of an incoming packet
#
def decode_response(response):

    resp = {}

    # Positional fields:
    ptype      = response[0]
    status     = response[1]
    seg_number = response[2]
    seg_count  = response[3]

    # Labeled, variable length fields
    var_length = response[4:]

    for i in range(len(ptypes)):
        test = ptypes[i]
        if test[0] == ptype:
            resp['responseType'] = test[0]

    # Status is represented in ASCII, starting with '@' (0x40) for OK.
    # Subtract 0x40 to index into the table, above
    #
    i = ord(status[0]) - ord('\x40')
    resp['status'] = status_list[i]

    if len(var_length):
        var_fields = var_length.split(',')
        for i in range(len(var_fields)):
            field = var_fields[i]
            if field[0] == 'P':
                resp['path'] = field[1:]
            if field[0] == 'T':
                resp['timestamp'] = field[1:]
            if field[0] == 'D':
                resp['data'] = field[1:]
    return resp
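
The decoding steps in decode_response can be condensed into the following self-contained sketch, which runs on Python 3. It is an illustration under stated assumptions rather than the package's decoder: decode_status and decode_packet are hypothetical names, the status table is cut to its first four entries, the bounds check on the status index is an addition, and the sample packets are made up to match the layouts described in the comments above.

```python
STATUS_NAMES = ['OK', 'NOT FOUND', 'NOT POSSIBLE', 'OUT OF RANGE']  # first entries only


def decode_status(status_char):
    """Map the ASCII status byte to a name; '@' (0x40) is index 0."""
    index = ord(status_char) - 0x40
    if 0 <= index < len(STATUS_NAMES):
        return STATUS_NAMES[index]
    return 'UNKNOWN (%d)' % index


def decode_packet(packet):
    """Split a response into its positional and labeled fields."""
    result = {'responseType': packet[0], 'status': decode_status(packet[1])}
    labels = {'P': 'path', 'T': 'timestamp', 'D': 'data'}
    # Bytes 2-3 are the segment field; labeled fields start at byte 4.
    for field in packet[4:].split(','):
        if field and field[0] in labels:
            result[labels[field[0]]] = field[1:]
    return result


created = decode_packet('s@01')                # made-up "sensor created" response
missing = decode_packet('gA01')                # made-up "get" response, not found
value = decode_packet('g@01T1545000000,D42')   # made-up "get" response with data
print(created, missing, value)
```

Because fields are split on commas, a data field that itself contains a comma (for example JSON) would need extra care in a real decoder.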

sample.py

import os
from json import dumps
import logging
from platform import platform
from psutil import cpu_percent, virtual_memory
from serial import Serial
from time import sleep
from octave_rp import OctaveRP, Output, Input, Sensor

DEV = os.getenv('DEV', '/dev/ttyS0')

# Create serial object and use it to create OctaveRP connection
s = Serial(DEV)
orp = OctaveRP(s)

# Handles request for platform resource
def string_platform_handler():
    return platform()

# Handles request for JSON resource
def json_vmem_handler():
    mem = virtual_memory()
    return dumps({ 'total': mem.total, 'available': mem.available, 'active': mem.active })

# Create sensor objects with OctaveRP connection, callback, type,
# resource path, and optionally unit of measure
sensors = [
    Sensor(orp, string_platform_handler, 'str', 'system/platform', 'alphabet'),
    Sensor(orp, cpu_percent, 'num', 'system/cpu', 'percent'),
    Sensor(orp, json_vmem_handler, 'json', 'system/mem')
]

# Declare each sensor on the device
for sensor in sensors:
    sensor.create_sensor()

def print_output(data):
    print '*' * 20
    print '* %s' % data
    print '*' * 20

output = Output(orp, print_output, 'str', 'external_status/lcd1', 'alphabet')
output.create_output()

push_type_input = Input(orp, 'num', 'push_type_input', 'number')
push_type_input.create_input()

push_counter = 0
push_type_input.send(push_counter)

# Run Forever
while True:
    try:
        sleep(1)
        push_type_input.send(push_counter)
        push_counter += 1
    except KeyboardInterrupt:
        exit(0)

simple_hdlc.py

#!/usr/bin/python
# coding: utf8

__version__ = '0.2'

import logging
import struct
import time
from threading import Thread
from PyCRC.CRCCCITT import CRCCCITT


logger = logging.getLogger(__name__)


def calcCRC(data):
    crc = CRCCCITT("FFFF").calculate(bytes(data))
    b = bytearray(struct.pack(">H", crc))
    return b

class Frame(object):
    STATE_READ = 0x01
    STATE_ESCAPE = 0x02

    def __init__(self):
        self.finished = False
        self.error = False
        self.state = self.STATE_READ
        self.data = bytearray()
        self.crc = bytearray()
        self.reader = None

    def __len__(self):
        return len(self.data)

    def addByte(self, b):
        if b == 0x7D:
            self.state = self.STATE_ESCAPE
        elif self.state == self.STATE_ESCAPE:
            self.state = self.STATE_READ
            b = b ^ 0x20
            self.data.append(b)
        else:
            self.data.append(b)

    def finish(self):
        self.crc = self.data[-2:]
        self.data = self.data[:-2]
        self.finished = True

    def checkCRC(self):
        res = bool(self.crc == calcCRC(self.data))
        if not res:
            c1 = str(self.crc)
            c2 = str(calcCRC(self.data))
            logger.warning("invalid crc %s != %s", c1.encode("hex"), c2.encode("hex"))
            self.error = True
        return res

    def toString(self):
        return str(self.data)


class HDLC(object):
    def __init__(self, serial):
        self.serial = serial
        self.current_frame = None
        self.last_frame = None
        self.frame_callback = None
        self.error_callback = None
        self.running = False

    @classmethod
    def toBytes(cls, data):
        return bytearray(data)

    def sendFrame(self, data):
        bs = self._encode(self.toBytes(data))
        logger.info("Sending Frame: %s", bs.encode("hex"))
        res = self.serial.write(bs)
        logger.info("Send %s bytes", res)

    def _onFrame(self, frame):
        self.last_frame = frame
        s = self.last_frame.toString()
        logger.info("Received Frame: %s", s.encode("hex"))
        if self.frame_callback is not None:
            self.frame_callback(s)

    def _onError(self, frame):
        self.last_frame = frame
        s = self.last_frame.toString()
        logger.warning("Frame Error: %s", s.encode("hex"))
        if self.error_callback is not None:
            self.error_callback(s)

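    def _readBytes(self, size):
        # Read up to `size` bytes; return True as soon as a frame completes.
        while size > 0:
            b = bytearray(self.serial.read(1))
            if len(b) < 1:
                # Read timed out without data
                return False
            size -= 1
            res = self._readByte(b[0])
            if res:
                return True
        return False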

    def _readByte(self, b):
        assert 0 <= b <= 255
        if b == 0x7E:
            # Start or End
            if not self.current_frame or len(self.current_frame) < 1:
                # Start
                self.current_frame = Frame()
            else:
                # End
                self.current_frame.finish()
                self.current_frame.checkCRC()
        elif self.current_frame is None:
            # Ignore before Start
            return False
        elif not self.current_frame.finished:
            self.current_frame.addByte(b)
        else:
            # Ignore Bytes
            pass

        # Validate and return
        if self.current_frame.finished and not self.current_frame.error:
            # Success
            self._onFrame(self.current_frame)
            self.current_frame = None
            return True
        elif self.current_frame.finished:
            # Error
            self._onError(self.current_frame)
            self.current_frame = None
            return True
        return False

    def readFrame(self, timeout=5):
        timer = time.time() + timeout
        while time.time() < timer:
            i = self.serial.in_waiting
            if i < 1:
                time.sleep(0.0001)
                continue

            res = self._readBytes(i)

            if res:
                # Validate and return
                if not self.last_frame.error:
                    # Success
                    s = self.last_frame.toString()
                    return s
                elif self.last_frame.finished:
                    # Error
                    raise ValueError("Invalid Frame (CRC FAIL)")
        raise RuntimeError("readFrame timeout")

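    @classmethod
    def _encode(cls, bs):
        data = bytearray()
        data.append(0x7E)
        crc = calcCRC(bs)
        # Escape flag (0x7E) and escape (0x7D) bytes in the payload and in
        # the CRC, so that neither can be mistaken for a frame boundary.
        for byte in bytearray(bs) + crc:
            if byte == 0x7E or byte == 0x7D:
                data.append(0x7D)
                data.append(byte ^ 0x20)
            else:
                data.append(byte)
        data.append(0x7E)
        return bytes(data)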

    def _receiveLoop(self):
        while self.running:
            i = self.serial.in_waiting
            if i < 1:
                time.sleep(0.001)
                continue
            res = self._readBytes(i)

    def startReader(self, onFrame, onError=None):
        if self.running:
            raise RuntimeError("reader already running")
        self.reader = Thread(target=self._receiveLoop)
        self.reader.setDaemon(True)
        self.frame_callback = onFrame
        self.error_callback = onError
        self.running = True
        self.reader.start()

    def stopReader(self):
        self.running = False
        self.reader.join()
        self.reader = None
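
The framing performed by simple_hdlc.py can be tried without a serial port or PyCRC using the sketch below, which runs on Python 3. It is a minimal illustration, not a drop-in replacement: crc16_ccitt, escape, and build_frame are hypothetical names, and it assumes that CRCCCITT("FFFF") corresponds to the common CRC-16 variant with polynomial 0x1021 and initial value 0xFFFF. In this sketch the CRC bytes are escaped along with the payload, which is how HDLC-style byte stuffing is normally applied.

```python
FLAG = 0x7E
ESCAPE = 0x7D


def crc16_ccitt(data, crc=0xFFFF):
    """Bitwise CRC-16 with polynomial 0x1021 and initial value 0xFFFF."""
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def escape(data):
    """Replace each flag or escape byte with 0x7D followed by byte XOR 0x20."""
    out = bytearray()
    for byte in data:
        if byte in (FLAG, ESCAPE):
            out.append(ESCAPE)
            out.append(byte ^ 0x20)
        else:
            out.append(byte)
    return out


def build_frame(payload):
    """Wrap payload as: flag, escaped (payload + big-endian CRC), flag."""
    crc = crc16_ccitt(payload)
    body = bytearray(payload) + bytearray([crc >> 8, crc & 0xFF])
    return bytes(bytearray([FLAG]) + escape(body) + bytearray([FLAG]))


frame = build_frame(b'A~B')
print(frame.hex())
```

The '~' in the payload is the flag byte itself, so it appears inside the frame as the two bytes 0x7D 0x5E, and the only unescaped 0x7E bytes are the opening and closing flags.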

Source Code for the Demo Application for Southbound Resource Protocol

The following subsections provide the source code that you can copy and paste into the respective files to create the Demo Application for Southbound Resource Protocol package. This package can be used to experiment with ORP and is also used in this tutorial.

The files in the package include the following:

📘

Note

Create a subdirectory named "sb_serial" within the package and place the files listed above that are prefixed with "/sb_serial/" into that subdirectory.

requirements.txt

psutil==5.4.8
PyCRC==1.21
pyserial==3.4

📘

Note

PyCRC 1.21 must be downloaded from here.

run.py

import os
from json import dumps
import logging
from platform import platform
from psutil import cpu_percent, virtual_memory
from serial import Serial
from time import sleep
from sb_serial import Sensor, SbSerial

#DF: changed
#DEV = os.getenv('DEV', '/dev/ttyS0')
DEV = os.getenv('DEV', '/dev/tty.SLAB_USBtoUART')

# Create serial object and use it to create SbSerial connection
s = Serial(DEV)
sbs = SbSerial(s)

# Handles request for platform resource
def string_platform_handler():
    return platform()

# Handles request for JSON resource
def json_vmem_handler():
    mem = virtual_memory()
    return dumps({ 'total': mem.total, 'available': mem.available, 'active': mem.active })

# Create sensor objects with SbSerial connection, callback, type,
# resource path, and optionally unit of measure
sensors = [
    Sensor(sbs, string_platform_handler, 'str', 'system/platform', 'alphabet'),
    Sensor(sbs, cpu_percent, 'num', 'system/cpu', 'percent'),
    Sensor(sbs, json_vmem_handler, 'json', 'system/mem')
]

# Register each sensor with Octave
for sensor in sensors:
    sensor.create_sensor()

# Run Forever
while True:
    try:
        sleep(1)
    except KeyboardInterrupt:
        exit(0)

__init__.py for SBR

📘

Note

To be placed in a subdirectory named "sb_serial".

from sb_serial import Sensor, SbSerial

sb_serial.py

📘

Note

To be placed in a subdirectory named "sb_serial".

from sbr_Protocol_v1 import *
from serial import Serial
from simple_hdlc import HDLC
from time import sleep
import logging

logging.basicConfig(format='%(asctime)s %(name)s: %(message)s')
logger = logging.getLogger('SbSerial')
logger.setLevel(logging.DEBUG)

class SbSerial(HDLC):

    def __init__(self, serial, retry_timeout=0.5):
        HDLC.__init__(self, serial)
        self.serial = serial
        self.messages = []
        self.sensor_handlers = {}
        self.output_handlers = {}
        self.retry_timeout = retry_timeout
        self.startReader(onFrame=self.rcv_frame)

    def execute_wait_confirm(self, frame, resp):
        # Discard stale responses, then send a run of HDLC flag bytes
        # ('~' is 0x7E) ahead of the request
        self.clear()
        self.serial.write("~~~~~~~~~~~~")
        confirmed = False
        # Resend the frame until a response of the expected type arrives
        while not confirmed:
            logger.debug("sending frame, awaiting %s" % resp)
            self.sendFrame(frame)
            sleep(self.retry_timeout)
            for m in self.messages:
                logger.debug('checking message %s' % m)
                if m['responseType'] == resp:
                    confirmed = True

    def push_value(self, path):
        dataType, handler = self.sensor_handlers[path]
        # Call the handler once and push the value it returns
        value = handler()
        cmd = 'push %s %s %s' % (dataType, '%s/value' % path, value)
        logger.debug('preparing to send: %s' % cmd)
        frame = encode_request(cmd)
        self.sendFrame(frame)
        #self.execute_wait_confirm(frame, SBR_PKT_RESP_PUSH)

    def create_sensor(self, dataType, path, unit, handler):
        if unit:
            frame = encode_request('create sensor %s %s %s' % (dataType, path, unit))
        else:
            frame = encode_request('create sensor %s %s' % (dataType, path))
        logger.debug('creating sensor at %s' % path)
        self.execute_wait_confirm(frame, SBR_PKT_RESP_SENSOR_CREATE)
        self.sensor_handlers[path] = (dataType, handler)
        sleep(0.2)
        self.push_value(path)

    def clear(self):
        self.messages = []

    def rcv_frame(self, data):
        message = decode_response(data)
        logger.debug('recv frame: %s' % message)
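        if message['responseType'] == SBR_PKT_NTFY_SENSOR_CALL:
            # Octave is polling a sensor: push a fresh value if we own it
            if message['path'] in self.sensor_handlers:
                self.push_value(message['path'])
        elif message['responseType'] == SBR_PKT_NTFY_HANDLER_CALL:
            # A handler notification reports new data at the given path
            logger.debug('new data at %s' % message['path'])
        else:
            # Anything else is a response; queue it for execute_wait_confirm
            self.messages.append(message)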

class Sensor(object):

    def __init__(self, sbs, handler, dataType, path, unit=None):
        self.dataType = dataType
        self.path = path
        self.unit = unit
        self.sbs = sbs
        self.handler = handler
        self.declared = False

    def create_sensor(self):
        self.sbs.create_sensor(self.dataType, self.path, self.unit, self.handler)
        self.declared = True
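
The execute_wait_confirm method above resends a frame until a response of the expected type appears, and it never gives up. As a rough illustration of the same send, wait, and check pattern with an upper bound on attempts, consider the sketch below. It is written for Python 3 (the listing above targets Python 2), and send_until_confirmed, send, poll, and max_attempts are hypothetical names that are not part of the package.

```python
import time


def send_until_confirmed(send, poll, expected, retry_timeout=0.5, max_attempts=5):
    """Resend until a message of the expected type arrives, or give up.

    send: callable that transmits the request once (hypothetical)
    poll: callable returning the list of messages received so far (hypothetical)
    Returns the number of attempts that were needed.
    """
    for attempt in range(1, max_attempts + 1):
        send()
        time.sleep(retry_timeout)
        for message in poll():
            if message.get('responseType') == expected:
                return attempt
    raise TimeoutError('no %r response after %d attempts' % (expected, max_attempts))


# Simulated link: the confirmation only shows up after the second send
sent = []
inbox = []


def fake_send():
    sent.append('S')
    if len(sent) == 2:
        inbox.append({'responseType': 's', 'status': 'OK'})


attempts = send_until_confirmed(fake_send, lambda: inbox, 's', retry_timeout=0.01)
print(attempts)
```

Bounding the retries lets the caller report a missing or disconnected device instead of blocking forever.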

sbr_Protocol_v1.py

📘

Note

To be placed in a subdirectory named "sb_serial".

#============================================================================
#
# Filename:  sbr_Protocol_v1.py
#
# Purpose:   Python test functions to encode and decode packets for the
#            Southbound Resource Protocol
#
# Copyright: (c) 2018 Sierra Wireless, Inc.
#            All rights reserved
#
#----------------------------------------------------------------------------
#
# VERSION: 0.0.1
#
# NOTES:
#

#
# Packet type field - byte 0
#
SBR_PKT_RQST_INPUT_CREATE   = 'I'   # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_INPUT_CREATE   = 'i'   # type[1] status[1] pad[2]

SBR_PKT_RQST_OUTPUT_CREATE  = 'O'   # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_OUTPUT_CREATE  = 'o'   # type[1] status[1] pad[2]

SBR_PKT_RQST_DELETE         = 'D'   # type[1] pad[1]    pad[2] path[]
SBR_PKT_RESP_DELETE         = 'd'   # type[1] status[1] pad[2]

SBR_PKT_RQST_HANDLER_ADD    = 'H'   # type[1] pad[1]    pad[2] path[] 
SBR_PKT_RESP_HANDLER_ADD    = 'h'   # type[1] status[1] pad[2]

SBR_PKT_RQST_HANDLER_REMOVE = 'K'   # type[1] pad[1]    pad[2] path[] 
SBR_PKT_RESP_HANDLER_REMOVE = 'k'   # type[1] status[1] pad[2]

SBR_PKT_RQST_PUSH           = 'P'   # type[1] d_type[1] pad[2] time[] path[] data[]
SBR_PKT_RESP_PUSH           = 'p'   # type[1] status[1] pad[2]

SBR_PKT_RQST_GET            = 'G'   # type[1] pad[1]    pad[2] path[]
SBR_PKT_RESP_GET            = 'g'   # type[1] status[1] pad[2] time[] data[]

SBR_PKT_RQST_EXAMPLE_SET    = 'E'   # type[1] d_type[1] pad[2] path[] data[]
SBR_PKT_RESP_EXAMPLE_SET    = 'e'   # type[1] status[1] pad[2]

SBR_PKT_RQST_SENSOR_CREATE  = 'S'   # type[1] d_type[1] pad[2] path[] units[]
SBR_PKT_RESP_SENSOR_CREATE  = 's'   # type[1] status[1] pad[2]

SBR_PKT_RQST_SENSOR_REMOVE  = 'R'   # type[1] pad[1]    pad[2] path[]
SBR_PKT_RESP_SENSOR_REMOVE  = 'r'   # type[1] status[1] pad[2]

SBR_PKT_NTFY_HANDLER_CALL   = 'c'   # type[1] d_type[1] pad[2] time[] path[] data[]
SBR_PKT_RESP_HANDLER_CALL   = 'C'   # type[1] status[1] pad[2]

SBR_PKT_NTFY_SENSOR_CALL    = 'b'   # type[1] pad[1]    pad[2] path[]
SBR_PKT_RESP_SENSOR_CALL    = 'B'   # type[1] status[1] pad[2]

SBR_PKT_RESP_UNKNOWN_RQST   = '?'   # type[1] status[1] pad[2]


#
# Data type field - byte 1
#
SBR_DATA_TYPE_TRIGGER       = 'T'   # trigger - no data
SBR_DATA_TYPE_BOOLEAN       = 'B'   # Boolean - 1 byte:  't' | 'f'
SBR_DATA_TYPE_NUMERIC       = 'N'   # numeric - null-terminated ASCII string, representing double
SBR_DATA_TYPE_STRING        = 'S'   # string  - null-terminated ASCII string
SBR_DATA_TYPE_JSON          = 'J'   # JSON    - null-terminated ASCII string, representing JSON
SBR_DATA_TYPE_UNDEF         = ' '   # not specified


#
# Variable length field identifiers
#
SBR_FIELD_ID_PATH           = 'P'
SBR_FIELD_ID_TIME           = 'T'
SBR_FIELD_ID_UNITS          = 'U'
SBR_FIELD_ID_DATA           = 'D'

# Variable length field separator
SBR_VARLENGTH_SEPARATOR     = ','



#
# Packet type descriptions
#
ptypes = [
    [ SBR_PKT_RQST_INPUT_CREATE,   'request create input'     ],
    [ SBR_PKT_RESP_INPUT_CREATE,   'response create input'    ],

    [ SBR_PKT_RQST_OUTPUT_CREATE,  'request create output'    ],
    [ SBR_PKT_RESP_OUTPUT_CREATE,  'response create output'   ],

    [ SBR_PKT_RQST_DELETE,         'request delete resource'  ],
    [ SBR_PKT_RESP_DELETE,         'response delete resource' ],

    [ SBR_PKT_RQST_HANDLER_ADD,    'request add handler'      ], 
    [ SBR_PKT_RESP_HANDLER_ADD,    'response add handler'     ],

    [ SBR_PKT_RQST_HANDLER_REMOVE, 'request remove handler'   ],
    [ SBR_PKT_RESP_HANDLER_REMOVE, 'response remove handler'  ],

    [ SBR_PKT_RQST_PUSH,           'request push'             ],
    [ SBR_PKT_RESP_PUSH,           'response push'            ],

    [ SBR_PKT_RQST_GET,            'request get'              ],
    [ SBR_PKT_RESP_GET,            'response get'             ],

    [ SBR_PKT_RQST_EXAMPLE_SET,    'request set example'      ],
    [ SBR_PKT_RESP_EXAMPLE_SET,    'response set example'     ],

    [ SBR_PKT_RQST_SENSOR_CREATE,  'request create sensor'    ],
    [ SBR_PKT_RESP_SENSOR_CREATE,  'response create sensor'   ],

    [ SBR_PKT_RQST_SENSOR_REMOVE,  'request remove sensor'    ],
    [ SBR_PKT_RESP_SENSOR_REMOVE,  'response remove sensor'   ],

    [ SBR_PKT_NTFY_HANDLER_CALL,   'handler call'             ],
    [ SBR_PKT_RESP_HANDLER_CALL,   'handler ack'              ],

    [ SBR_PKT_NTFY_SENSOR_CALL,    'sensor poll'              ],
    [ SBR_PKT_RESP_SENSOR_CALL,    'sensor poll ack'          ],

    [ SBR_PKT_RESP_UNKNOWN_RQST,   'unknown packet type'      ],
]


#
# Status field
#
status_list = [
    'OK',
    'NOT FOUND',
    'NOT POSSIBLE',   # deprecated
    'OUT OF RANGE',
    'NO MEMORY',
    'NOT PERMITTED',
    'FAULT',
    'COMM ERROR',
    'TIMEOUT',
    'OVERFLOW',
    'UNDERFLOW',
    'WOULD BLOCK',
    'DEADLOCK',
    'FORMAT ERROR',
    'DUPLICATE',
    'BAD PARAMETER',
    'CLOSED',
    'BUSY',
    'UNSUPPORTED',
    'IO_ERROR',
    'NOT IMPLEMENTED',
    'UNAVAILABLE',
    'TERMINATED'
]


#
# Data types
#
data_types = [
    [ 'trig',   SBR_DATA_TYPE_TRIGGER ],
    [ 'bool',   SBR_DATA_TYPE_BOOLEAN ],
    [ 'num',    SBR_DATA_TYPE_NUMERIC ],
    [ 'str',    SBR_DATA_TYPE_STRING  ],
    [ 'json',   SBR_DATA_TYPE_JSON    ]
]


#
# Syntax
#
syntax_list = [
    '  create input|output|sensor  trig|bool|num|str|json <path> [<units>]',
    '  delete resource|handler|sensor <path>',
    '  add handler <path>',
    '  push trig|bool|num|str|json <path> [<data>]',
    '  get <path>',
    '  example json <path> [<data>]'
]


#
# Usage
#
def print_usage():

    print 'Usage:'
    for i in range(len(syntax_list)):
        print syntax_list[i]
    print


#
# Encode data type
#
def encode_dtype(data_type):

    field = ''
    dtype = data_type.lower()

    if   dtype[0] == 't':
        field = field + SBR_DATA_TYPE_TRIGGER

    elif dtype[0] == 'b':
        field = field + SBR_DATA_TYPE_BOOLEAN

    elif dtype[0] == 'n':
        field = field + SBR_DATA_TYPE_NUMERIC

    elif dtype[0] == 's':
        field = field + SBR_DATA_TYPE_STRING

    elif dtype[0] == 'j':
        field = field + SBR_DATA_TYPE_JSON

    else:
        # Return an empty string so that callers' "dtype == ''" checks
        # detect the failure
        print 'Invalid data type'
        return ''

    return field

#
# Encode segment number and segment count
#
def encode_segment():

    return '01'


#
# Encode path
#
def encode_path(path):

    return SBR_FIELD_ID_PATH + path


#
# Encode units
#
def encode_units(units):

    return SBR_FIELD_ID_UNITS + units


#
# Encode data
#
def encode_data(data):

    return SBR_FIELD_ID_DATA + data


#
# create input|output|sensor data-type path [units]
#
def encode_create(argc, args):

    packet = ''
    dtype = ''
    syntax = syntax_list[0]


    if argc < 3 :
        print 'Invalid number of arguments'
        print syntax_list[0]
        return

    if argc > 3 :
        what,data_type,path,units = args.split(' ')

    else:
        what,data_type,path = args.split(' ')
        units = None

    what = what.lower()

    if what[0] == 'i':
        packet = packet + SBR_PKT_RQST_INPUT_CREATE
    elif what[0] == 'o':
        packet = packet + SBR_PKT_RQST_OUTPUT_CREATE
    elif what[0] == 's':
        packet = packet + SBR_PKT_RQST_SENSOR_CREATE
    else:
        print 'Invalid request'
        print syntax_list[0]
        return

    dtype = encode_dtype(data_type)
    if dtype == '': 
        return
    packet = packet + dtype

    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if units is not None:
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_units(units)

    return packet


#
# delete resource|handler|sensor path
#
def encode_delete(argc, args):

    packet = ''


    if argc < 2 :
        print 'Invalid number of arguments'
        print syntax_list[1]
        return

    what,path = args.split(' ')
    what = what.lower()

    # packet type
    if what[0] == 'r':
        packet = packet + SBR_PKT_RQST_DELETE
    elif what[0] == 'h':
        packet = packet + SBR_PKT_RQST_HANDLER_REMOVE
    elif what[0] == 's':
        packet = packet + SBR_PKT_RQST_SENSOR_REMOVE
    else:
        print 'Invalid request'
        print syntax_list[1]
        return

    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(path)

    return packet


#
# add handler path
#
def encode_add(argc, args):

    packet = ''


    if argc < 2 :
        print 'Invalid number of arguments'
        print syntax_list[2]
        return

    what,path = args.split(' ')
    what = what.lower()

    if what[0] != 'h':
        print 'Invalid request ' + what
        print syntax_list[2]
        return

    packet = packet + SBR_PKT_RQST_HANDLER_ADD
    # data type - ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(path)

    return packet


#
# push data-type path [data]
#
def encode_push(argc, args):

    packet = ''
    dtype = ''

    if argc < 2 :
        print 'Invalid number of arguments'
        print syntax_list[3]
        return

    print 'argc: ' + str(argc)
    if argc > 2 :
        data_type,path,data = args.split(' ', 2)

    else:
        data_type,path = args.split(' ')
        data = ''

    packet = packet + SBR_PKT_RQST_PUSH

    dtype = encode_dtype(data_type)
    if dtype == '': 
        return
    packet = packet + dtype

    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if data != '':
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_data(data)

    return packet


#
# get path
#
def encode_get(argc, args):

    packet = ''
    dtype = ''

    if argc < 1 :
        print 'Invalid number of arguments'
        print syntax_list[4]
        return

    packet = packet + SBR_PKT_RQST_GET
    # data type ignored
    packet = packet + '.'
    packet = packet + encode_segment()
    packet = packet + encode_path(args)

    return packet



#
# example data-type path [data]
#
def encode_example(argc, args):

    packet = ''
    dtype = ''

    if argc < 2 :
        print 'Invalid number of arguments'
        print syntax_list[5]
        return

    print 'argc: ' + str(argc)
    if argc > 2 :
        data_type,path,data = args.split(' ', 2)

    else:
        data_type,path = args.split(' ')
        data = ''

    packet = packet + SBR_PKT_RQST_EXAMPLE_SET

    dtype = encode_dtype(data_type)
    if dtype == '':
        return
    packet = packet + dtype

    packet = packet + encode_segment()
    packet = packet + encode_path(path)
    if data != '':
        packet = packet + SBR_VARLENGTH_SEPARATOR
        packet = packet + encode_data(data)

    return packet


#
# Parse command and build a request packet
#
def encode_request(request):

    if request.find(' ') < 0 :
        print_usage()
        return

    argc = len(request.split(' ')) - 1

    # all commands take at least one argument
    if argc < 1 :
        print_usage()
        return

    request_type,args = request.split(' ', 1)
    request_type = request_type.lower()

    if request_type[0] == 'c':
        p = encode_create(argc, args)

    elif request_type[0] == 'd':
        p = encode_delete(argc, args)

    elif request_type[0] == 'a':
        p = encode_add(argc, args)

    elif request_type[0] == 'p':
        p = encode_push(argc, args)

    elif request_type[0] == 'g':
        p = encode_get(argc, args)

    elif request_type[0] == 'e':
        p = encode_example(argc, args)

    elif request_type[0] == 'r':
        p = args

    else:
        print_usage()
        return

    return p


#
# Decode and print contents of an incoming packet
#
def decode_response(response):

    resp = {}
    #print('Received     : ' + response)

    # Positional fields:
    ptype      = response[0]
    status     = response[1]
    seg_number = response[2]
    seg_count  = response[3]

    # Labeled, variable length fields
    var_length = response[4:]

    for i in range(len(ptypes)):
        test = ptypes[i]
        if test[0] == ptype:
            #print 'Message type : ' + test[1]
            resp['responseType'] = test[0]

    # Status is represented in ASCII, starting with '@' (0x40) for OK.
    # Subtract 0x40 to index into the table, above
    #
    i = ord(status[0]) - ord('\x40')
    #print 'Status       : ' + status_list[i]
    resp['status'] = status_list[i]

    if len(var_length):
        var_fields = var_length.split(',')
        for i in range(len(var_fields)):
            field = var_fields[i]
            if field[0] == 'P':
                resp['path'] = field[1:]
                #print 'Path         : ' + field[1:]
            if field[0] == 'T':
                resp['timestamp'] = field[1:]
                #print 'Timestamp    : ' + field[1:]
            if field[0] == 'D':
                resp['data'] = field[1:]
                #print 'Data         : ' + field[1:]
    return resp
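
As an illustration of the packet layout that encode_create and decode_response handle above, the following minimal sketch builds a create-sensor request and parses a short response. It is a standalone Python 3 approximation rather than the package's code (the listing above targets Python 2). The helper names build_create_sensor and parse_response are hypothetical, and only the sensor-create packet type is covered.

```python
# Packet and data type codes copied from the listing above
PKT_RQST_SENSOR_CREATE = 'S'
PKT_RESP_SENSOR_CREATE = 's'
DATA_TYPE_NUMERIC = 'N'


def build_create_sensor(dtype_code, path, units=None):
    """Lay out a request: packet type, data type, '01', then labeled fields."""
    packet = PKT_RQST_SENSOR_CREATE + dtype_code + '01' + 'P' + path
    if units is not None:
        # Variable-length fields are separated by ',' and labeled by one letter
        packet += ',' + 'U' + units
    return packet


def parse_response(packet):
    """Split a response into its positional and labeled fields."""
    fields = {
        'responseType': packet[0],
        # Status is an ASCII character offset from '@' (0x40), which means OK
        'statusIndex': ord(packet[1]) - 0x40,
    }
    labels = {'P': 'path', 'T': 'timestamp', 'D': 'data'}
    for field in packet[4:].split(','):
        if field and field[0] in labels:
            fields[labels[field[0]]] = field[1:]
    return fields


request = build_create_sensor(DATA_TYPE_NUMERIC, 'system/cpu', 'percent')
print(request)  # SN01Psystem/cpu,Upercent

ok = parse_response(PKT_RESP_SENSOR_CREATE + '@01')
not_found = parse_response(PKT_RESP_SENSOR_CREATE + 'A01')
print(ok['statusIndex'], not_found['statusIndex'])  # 0 1
```

The status indexes 0 and 1 correspond to 'OK' and 'NOT FOUND' in status_list.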

simple_hdlc.py for SBR

📘

Note

To be placed in a subdirectory named "sb_serial".

#!/usr/bin/python
# coding: utf8

__version__ = '0.2'

import logging
import struct
import time
from threading import Thread
from PyCRC.CRCCCITT import CRCCCITT


logger = logging.getLogger(__name__)


def calcCRC(data):
    crc = CRCCCITT("FFFF").calculate(bytes(data))
    b = bytearray(struct.pack(">H", crc))
    return b

class Frame(object):
    STATE_READ = 0x01
    STATE_ESCAPE = 0x02

    def __init__(self):
        self.finished = False
        self.error = False
        self.state = self.STATE_READ
        self.data = bytearray()
        self.crc = bytearray()
        self.reader = None

    def __len__(self):
        return len(self.data)

    def addByte(self, b):
        if b == 0x7D:
            self.state = self.STATE_ESCAPE
        elif self.state == self.STATE_ESCAPE:
            self.state = self.STATE_READ
            b = b ^ 0x20
            self.data.append(b)
        else:
            self.data.append(b)

    def finish(self):
        self.crc = self.data[-2:]
        self.data = self.data[:-2]
        self.finished = True

    def checkCRC(self):
        res = bool(self.crc == calcCRC(self.data))
        if not res:
            c1 = str(self.crc)
            c2 = str(calcCRC(self.data))
            logger.warning("invalid crc %s != %s", c1.encode("hex"), c2.encode("hex"))
            self.error = True
        return res

    def toString(self):
        return str(self.data)


class HDLC(object):
    def __init__(self, serial):
        self.serial = serial
        self.current_frame = None
        self.last_frame = None
        self.frame_callback = None
        self.error_callback = None
        self.running = False

    @classmethod
    def toBytes(cls, data):
        return bytearray(data)

    def sendFrame(self, data):
        bs = self._encode(self.toBytes(data))
        logger.info("Sending Frame: %s", bs.encode("hex"))
        res = self.serial.write(bs)
        logger.info("Send %s bytes", res)

    def _onFrame(self, frame):
        self.last_frame = frame
        s = self.last_frame.toString()
        logger.info("Received Frame: %s", s.encode("hex"))
        if self.frame_callback is not None:
            self.frame_callback(s)

    def _onError(self, frame):
        self.last_frame = frame
        s = self.last_frame.toString()
        logger.warning("Frame Error: %s", s.encode("hex"))
        if self.error_callback is not None:
            self.error_callback(s)

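    def _readBytes(self, size):
        # Read up to `size` bytes; return True as soon as a frame completes
        while size > 0:
            b = bytearray(self.serial.read(1))
            if len(b) < 1:
                # Read timed out without returning a byte
                return False
            size -= 1
            res = self._readByte(b[0])
            if res:
                return True
        return False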

    def _readByte(self, b):
        assert 0 <= b <= 255
        if b == 0x7E:
            # Start or End
            if not self.current_frame or len(self.current_frame) < 1:
                # Start
                self.current_frame = Frame()
            else:
                # End
                self.current_frame.finish()
                self.current_frame.checkCRC()
        elif self.current_frame is None:
            # Ignore before Start
            return False
        elif not self.current_frame.finished:
            self.current_frame.addByte(b)
        else:
            # Ignore Bytes
            pass

        # Validate and return
        if self.current_frame.finished and not self.current_frame.error:
            # Success
            self._onFrame(self.current_frame)
            self.current_frame = None
            return True
        elif self.current_frame.finished:
            # Error
            self._onError(self.current_frame)
            self.current_frame = None
            return True
        return False

    def readFrame(self, timeout=5):
        timer = time.time() + timeout
        while time.time() < timer:
            i = self.serial.in_waiting
            if i < 1:
                time.sleep(0.0001)
                continue

            res = self._readBytes(i)

            if res:
                # Validate and return
                if not self.last_frame.error:
                    # Success
                    s = self.last_frame.toString()
                    return s
                elif self.last_frame.finished:
                    # Error
                    raise ValueError("Invalid Frame (CRC FAIL)")
        raise RuntimeError("readFrame timeout")

    @classmethod
    def _encode(cls, bs):
        data = bytearray()
        data.append(0x7E)
        crc = calcCRC(bs)
        bs += crc
        for byte in bs:
            if byte == 0x7E or byte == 0x7D:
                data.append(0x7D)
                data.append(byte ^ 0x20)
            else:
                data.append(byte)
        data.append(0x7E)
        return bytes(data)

    def _receiveLoop(self):
        while self.running:
            i = self.serial.in_waiting
            if i < 1:
                time.sleep(0.001)
                continue
            res = self._readBytes(i)

    def startReader(self, onFrame, onError=None):
        if self.running:
            raise RuntimeError("reader already running")
        self.reader = Thread(target=self._receiveLoop)
        self.reader.daemon = True
        self.frame_callback = onFrame
        self.error_callback = onError
        self.running = True
        self.reader.start()

    def stopReader(self):
        self.running = False
        self.reader.join()
        self.reader = None
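
The framing performed by _encode and Frame above can be tried without a serial port or PyCRC. The sketch below is a standalone Python 3 approximation (the listing above targets Python 2). It assumes that CRCCCITT("FFFF") corresponds to the CRC-16 variant with polynomial 0x1021 and initial value 0xFFFF, and the function names crc16_ccitt, encode_frame, and decode_frame are hypothetical.

```python
FLAG = 0x7E    # frame delimiter
ESCAPE = 0x7D  # escape marker; the escaped byte is XORed with 0x20


def crc16_ccitt(data, crc=0xFFFF):
    """Bitwise CRC-16 with polynomial 0x1021 and initial value 0xFFFF."""
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def encode_frame(payload):
    """Append the big-endian CRC, escape flag/escape bytes, add delimiters."""
    body = bytes(payload) + crc16_ccitt(payload).to_bytes(2, 'big')
    out = bytearray([FLAG])
    for byte in body:
        if byte in (FLAG, ESCAPE):
            out += bytes([ESCAPE, byte ^ 0x20])
        else:
            out.append(byte)
    out.append(FLAG)
    return bytes(out)


def decode_frame(frame):
    """Undo encode_frame; raise ValueError on bad delimiters or CRC."""
    if len(frame) < 4 or frame[0] != FLAG or frame[-1] != FLAG:
        raise ValueError('missing frame delimiters')
    body = bytearray()
    escaped = False
    for byte in frame[1:-1]:
        if escaped:
            body.append(byte ^ 0x20)
            escaped = False
        elif byte == ESCAPE:
            escaped = True
        else:
            body.append(byte)
    payload, crc = bytes(body[:-2]), bytes(body[-2:])
    if crc != crc16_ccitt(payload).to_bytes(2, 'big'):
        raise ValueError('CRC mismatch')
    return payload


payload = b'SN01Psystem/cpu~}'  # the last two bytes (0x7E, 0x7D) need escaping
frame = encode_frame(payload)
print(frame.hex())
print(decode_frame(frame) == payload)  # True
```

Escaping the CRC together with the payload keeps 0x7E from appearing anywhere between the two delimiters, so the receiver can always find the end of the frame.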