Encoding
Message encoding and framing can be broken into three steps:
1. Byte values 0x00, 0x01, and 0x02 are escaped using COBS.
2. All bytes are XORed with 0x03 to ensure output contains no problematic control characters.
3. A delimiter is added to the end of the message.
Deframing and decoding is the reverse of these steps.
Consistent Overhead Byte Stuffing (COBS)
COBS is an algorithm that can be used to escape certain values in a byte stream with a minimal overhead. This frees up the values to be used for special purposes, such as message delimiters or other control characters.
The typical implementations of COBS only escape 0x00, but the SPIKE™ Prime implementation additionally escapes 0x01 and 0x02, as they are used for message delimiters.
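Delimiters are replaced with a special code word that indicates the number of bytes until the next delimiter and its value. The code word is calculated as follows:
\[\text{code_word} = \text{block_size} + 2 + \text{delimiter} \times 84\]
Here \(\text{block_size}\) counts the bytes in the block including the code word, and \(\text{delimiter}\) is the value (0, 1, or 2) of the delimiter that ends the block.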
The minimum value of \(\text{block_size}\) is 1, as it will always contain at least the code word itself. Therefore, by adding 2, the minimum value of \(\text{code_word}\) is 3, ensuring no overlap with any delimiters.
This leaves 0xFF - 3 = 252 code word values (3 through 254) to be used for the block size, which must be divided by 3 (the number of different delimiters), resulting in a maximum block size of 84.
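The arithmetic above can be checked with a few lines of Python. This is a minimal sketch: the helper name `code_word` and the constant names are illustrative assumptions, not part of the SPIKE™ Prime material, and `block_size` counts the code word itself, as in the text.

```python
MAX_BLOCK_SIZE = 84  # (0xFF - 3) // 3, see the assertion below
CODE_OFFSET = 2      # the "+ 2" from the text (constant name is an assumption)


def code_word(block_size: int, delimiter: int) -> int:
    """Code word for a block of block_size bytes (code word included)
    that is ended by delimiter (0x00, 0x01 or 0x02)."""
    if not 1 <= block_size <= MAX_BLOCK_SIZE:
        raise ValueError("block_size must be in 1..84")
    if delimiter not in (0, 1, 2):
        raise ValueError("delimiter must be 0, 1 or 2")
    return delimiter * MAX_BLOCK_SIZE + block_size + CODE_OFFSET


# 252 code words (3..254) are shared between 3 delimiters
assert (0xFF - 3) // 3 == MAX_BLOCK_SIZE

print(code_word(1, 0))   # smallest code word: 3
print(code_word(84, 2))  # largest code word: 254
print(code_word(2, 1))   # 88
```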
For blocks with no delimiters, the code word \(255\) is used. Thus, the code word can be decoded as follows:
Code word | Block size (excl. code word) | Delimiter |
---|---|---|
\(0 \leq n \leq 2\) | N/A | N/A |
\(3 \leq n \leq 86\) | \(n - 3\) | \(0\) |
\(87 \leq n \leq 170\) | \(n - 87\) | \(1\) |
\(171 \leq n \leq 254\) | \(n - 171\) | \(2\) |
\(255\) | \(84\) | N/A |
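The table can also be expressed as a small lookup function. The sketch below is illustrative only: the name `split_code_word` is an assumption, and the returned size is the number of bytes that follow the code word in its block, matching the table rather than the `block` counter used in the listings further down.

```python
def split_code_word(n: int):
    """Return (data_bytes, delimiter) for code word n, following the table.

    data_bytes is the number of bytes that follow the code word in its block;
    delimiter is None for the 255 ("no delimiter") code word.
    """
    if n == 255:
        return 84, None
    if not 3 <= n <= 254:
        raise ValueError(f"{n} is not a valid code word")
    # each delimiter owns a run of 84 consecutive code words starting at 3
    delimiter, data_bytes = divmod(n - 3, 84)
    return data_bytes, delimiter


print(split_code_word(3))    # (0, 0)
print(split_code_word(86))   # (83, 0)
print(split_code_word(87))   # (0, 1)
print(split_code_word(172))  # (1, 2)
print(split_code_word(255))  # (84, None)
```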
It’s important that the encoded output always begins with a valid code word (i.e. the first byte is not a delimiter). This provides the decoder with a known starting point, allowing it to correctly decode the rest of the message.
Below are sample implementations of the COBS encoding and decoding algorithms in Python:
# Constants assumed by these samples (not defined in the listings themselves);
# the values follow from the description above.
DELIMITER = 0x02      # highest delimiter value, also used to end a frame
NO_DELIMITER = 0xFF   # code word for a block that contains no delimiter
COBS_CODE_OFFSET = 2  # offset added to the block size
MAX_BLOCK_SIZE = 84   # maximum block size (incl. code word)


def encode(data: bytes):
    """
    Encode data using COBS algorithm, such that no delimiters are present.
    """
    buffer = bytearray()
    code_index = block = 0

    def begin_block():
        """Append code word to buffer and update code_index and block"""
        nonlocal code_index, block
        code_index = len(buffer)  # index of incomplete code word
        buffer.append(NO_DELIMITER)  # updated later if delimiter is encountered
        block = 1  # no. of bytes in block (incl. code word)

    begin_block()
    for byte in data:
        if byte > DELIMITER:
            # non-delimiter value, write as-is
            buffer.append(byte)
            block += 1

        if byte <= DELIMITER or block > MAX_BLOCK_SIZE:
            # block completed because size limit reached or delimiter found
            if byte <= DELIMITER:
                # reason for block completion is delimiter
                # update code word to reflect block size
                delimiter_base = byte * MAX_BLOCK_SIZE
                block_offset = block + COBS_CODE_OFFSET
                buffer[code_index] = delimiter_base + block_offset
            # begin new block
            begin_block()

    # update final code word
    buffer[code_index] = block + COBS_CODE_OFFSET

    return buffer
def decode(data: bytes):
    """
    Decode data using COBS algorithm.
    """
    buffer = bytearray()

    def unescape(code: int):
        """Decode code word, returning value and block size"""
        if code == 0xFF:
            # no delimiter in block
            return None, MAX_BLOCK_SIZE + 1
        value, block = divmod(code - COBS_CODE_OFFSET, MAX_BLOCK_SIZE)
        if block == 0:
            # maximum block size ending with delimiter
            block = MAX_BLOCK_SIZE
            value -= 1
        return value, block

    value, block = unescape(data[0])
    for byte in data[1:]:  # first byte already processed
        block -= 1
        if block > 0:
            buffer.append(byte)
            continue

        # block completed
        if value is not None:
            buffer.append(value)
        value, block = unescape(byte)

    return buffer
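To make the block structure concrete, the sketch below walks through a small buffer that was encoded by hand with the code word rules described above. It is an independent illustration, not the sample implementation: `walk_blocks` is a hypothetical helper, it does not validate its input, and the example message is made up.

```python
def walk_blocks(encoded: bytes):
    """Yield (code_word, data, delimiter) for each block in a COBS buffer."""
    i = 0
    while i < len(encoded):
        code = encoded[i]
        if code == 0xFF:
            size, delimiter = 84, None  # full block without a delimiter
        else:
            delimiter, size = divmod(code - 3, 84)
        yield code, encoded[i + 1 : i + 1 + size], delimiter
        i += 1 + size


# Hand-encoded form of the message 41 00 42 02 43:
#   block 1: code word + 0x41 (block_size 2), ended by 0x00 -> 2 + 2 + 0 * 84 = 4
#   block 2: code word + 0x42 (block_size 2), ended by 0x02 -> 2 + 2 + 2 * 84 = 172
#   block 3: code word + 0x43 (block_size 2), end of data   -> 2 + 2 = 4
encoded = bytes([4, 0x41, 172, 0x42, 4, 0x43])

message = bytearray()
blocks = list(walk_blocks(encoded))
for index, (code, data, delimiter) in enumerate(blocks):
    message += data
    # the delimiter implied by the final code word is not part of the message
    if delimiter is not None and index < len(blocks) - 1:
        message.append(delimiter)

print(message.hex(" "))  # 41 00 42 02 43
```

Note that the final code word is written as if the block ended with 0x00, and the decoder simply drops that trailing delimiter.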
Escaping and framing
After encoding the data with COBS as described above, the data will contain no bytes with a value of 0x00, 0x01, or 0x02.
In SPIKE™ Prime, the values 0x01 and 0x02 are used as message delimiters. 0x01 signifies the start of a high-priority message, and 0x02 signifies the end of a message (and implicitly the start or resumption of a low-priority message).
In addition to the delimiters 0x01 and 0x02, the value 0x03 must also be replaced, as it carries special meaning in the SPIKE™ Prime protocol.
To ensure that the output contains no bytes with a value of 0x03, all bytes are bitwise XORed with 0x03. This flips the two lowest bits of every byte, turning 0x03 into 0x00 and vice versa (and likewise swapping 0x01 and 0x02). Because the COBS algorithm already removed any 0x00, 0x01, and 0x02 bytes, the result will contain no 0x03, 0x02, or 0x01 bytes.
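The effect of the XOR step can be verified exhaustively over all byte values. The snippet below is a small check rather than part of the protocol; the name `XOR_MASK` is an assumption, only its value comes from the text.

```python
XOR_MASK = 0x03  # name is an assumption; the value comes from the text

# XOR with 0x03 flips the two lowest bits, so it only permutes
# values within each aligned group of four.
print([hex(b ^ XOR_MASK) for b in (0x00, 0x01, 0x02, 0x03)])
# ['0x3', '0x2', '0x1', '0x0']

# COBS output never contains 0x00, 0x01 or 0x02 ...
cobs_values = set(range(256)) - {0x00, 0x01, 0x02}
# ... so the XORed output never contains 0x03, 0x02 or 0x01.
xored_values = {b ^ XOR_MASK for b in cobs_values}
assert xored_values.isdisjoint({0x01, 0x02, 0x03})

# The operation is its own inverse, so the receiver applies the same mask.
assert all((b ^ XOR_MASK) ^ XOR_MASK == b for b in range(256))
```

The escaped output may still contain 0x00, which is what 0x03 turns into.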
Finally, the message is framed by (optionally) prefixing it with 0x01, and (always) suffixing it with 0x02. See example below:
XOR = 0x03  # XOR mask applied after COBS encoding (value taken from the text above)


def pack(data: bytes):
    """
    Encode and frame data for transmission.
    """
    buffer = encode(data)

    # XOR buffer to remove problematic 0x03 bytes (Ctrl+C)
    for i in range(len(buffer)):
        buffer[i] ^= XOR

    # add delimiter
    buffer.append(DELIMITER)
    return bytes(buffer)
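As a concrete illustration of the escaping and framing steps, the snippet below starts from a COBS buffer that was worked out by hand for the made-up message 41 00 42 02 43 and builds both frame variants. It is a standalone sketch that does not call the functions above, and the variable names are assumptions.

```python
# COBS form of the message 41 00 42 02 43, derived by hand from the
# code word formula (0x04 = 2 + 2 + 0 * 84, 0xAC = 2 + 2 + 2 * 84).
cobs_encoded = bytes([0x04, 0x41, 0xAC, 0x42, 0x04, 0x43])

# escape: XOR every byte with 0x03
escaped = bytes(b ^ 0x03 for b in cobs_encoded)

# frame: 0x02 always ends the message, 0x01 optionally marks high priority
low_priority_frame = escaped + b"\x02"
high_priority_frame = b"\x01" + escaped + b"\x02"

print(low_priority_frame.hex(" "))   # 07 42 af 41 07 40 02
print(high_priority_frame.hex(" "))  # 01 07 42 af 41 07 40 02
```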
Deframing and unescaping
As bytes are received, they should be buffered into their respective priority queues until a complete message is received, as indicated by the presence of 0x02.
The table below shows how to interpret each delimiter depending on the state of transmission:
Delimiter | Message in progress | Interpretation | Action |
---|---|---|---|
0x01 | None | Start of high-priority message | Start buffering into high-priority queue |
0x01 | High-priority | Illegal state | Sync error, clear queues and start buffering into high-priority queue |
0x01 | Low-priority | Start of high-priority message | Pause buffering of low-priority message and start buffering into high-priority queue |
0x02 | None | Start of low-priority message | Start buffering into low-priority queue |
0x02 | High-priority | End of high-priority message | Process high-priority message, empty high-priority queue and start buffering into low-priority queue |
0x02 | Low-priority | End of low-priority message | Process low-priority message and empty low-priority queue |
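One way to read the table is as a small state machine with two queues. The sketch below is only one possible interpretation: the class name `Deframer`, its method and attribute names, and the choice to treat a low-priority message as "in progress" whenever the low-priority queue is non-empty are all assumptions. Completed messages are returned without their delimiters and still need to be unescaped and COBS decoded.

```python
from typing import List, Tuple

HIGH_START = 0x01   # start of a high-priority message
MESSAGE_END = 0x02  # end of a message


class Deframer:
    """Splits a received byte stream into high- and low-priority messages."""

    def __init__(self) -> None:
        self.high = bytearray()  # high-priority queue
        self.low = bytearray()   # low-priority queue
        self.in_high = False     # True while a high-priority message is open

    def feed(self, data: bytes) -> List[Tuple[str, bytes]]:
        """Consume received bytes and return the messages they completed."""
        done = []
        for byte in data:
            if byte == HIGH_START:
                if self.in_high:
                    # illegal state: sync error, clear both queues
                    self.low.clear()
                self.high.clear()
                self.in_high = True  # a paused low-priority message stays queued
            elif byte == MESSAGE_END:
                if self.in_high:
                    done.append(("high", bytes(self.high)))
                    self.high.clear()
                    self.in_high = False  # continue with the low-priority queue
                elif self.low:
                    done.append(("low", bytes(self.low)))
                    self.low.clear()
                # otherwise nothing was in progress: a low-priority message starts
            else:
                (self.high if self.in_high else self.low).append(byte)
        return done


# low-priority bytes 10 11, interrupted by the high-priority message 20 21,
# then resumed with 12
deframer = Deframer()
stream = bytes([0x10, 0x11, 0x01, 0x20, 0x21, 0x02, 0x12, 0x02])
for priority, body in deframer.feed(stream):
    print(priority, body.hex(" "))
# high 20 21
# low 10 11 12
```

Because the queues live on the object, `feed` can be called repeatedly with whatever chunks the transport delivers.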
When a message is completed, it can be deframed by removing any leading 0x01 and trailing 0x02 bytes. The remaining bytes can then be unescaped by reversing the XOR operation and then COBS decoded as described above.
Below is an example of how to deframe and unescape a message in Python:
def unpack(frame: bytes):
    """
    Unframe and decode frame.
    """
    start = 0
    if frame[0] == 0x01:  # unused priority byte
        start += 1
    # unframe and XOR
    unframed = bytes(map(lambda x: x ^ XOR, frame[start:-1]))
    return bytes(decode(unframed))
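The sample above assumes it is handed a well-formed frame. Where that cannot be guaranteed, the deframing and unescaping steps could be guarded as in the sketch below. The function name `deframe` and the choice to raise `ValueError` are assumptions; the function stops before COBS decoding so that it stays self-contained.

```python
def deframe(frame: bytes) -> bytes:
    """Strip the delimiters from a complete frame and undo the XOR step.

    Returns the COBS-encoded payload, ready for COBS decoding.
    """
    if not frame or frame[-1] != 0x02:
        raise ValueError("frame must end with the 0x02 delimiter")
    # drop the optional high-priority prefix and the trailing delimiter
    body = frame[1:-1] if frame[0] == 0x01 else frame[:-1]
    return bytes(b ^ 0x03 for b in body)


frame = bytes.fromhex("01 07 42 af 41 07 40 02")
print(deframe(frame).hex(" "))  # 04 41 ac 42 04 43
```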