150 lines
6.8 KiB
Python
150 lines
6.8 KiB
Python
|
from tests.awre.AWRETestCase import AWRETestCase
|
||
|
from tests.utils_testing import get_path_for_data_file
|
||
|
from urh.awre.CommonRange import CommonRange
|
||
|
from urh.awre.FormatFinder import FormatFinder
|
||
|
from urh.awre.Preprocessor import Preprocessor
|
||
|
from urh.awre.ProtocolGenerator import ProtocolGenerator
|
||
|
from urh.signalprocessing.FieldType import FieldType
|
||
|
from urh.signalprocessing.Message import Message
|
||
|
from urh.signalprocessing.MessageType import MessageType
|
||
|
from urh.signalprocessing.Participant import Participant
|
||
|
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
|
||
|
import numpy as np
|
||
|
|
||
|
class TestAWRERealProtocols(AWRETestCase):
|
||
|
def setUp(self):
|
||
|
super().setUp()
|
||
|
alice = Participant("Alice", "A")
|
||
|
bob = Participant("Bob", "B")
|
||
|
self.participants = [alice, bob]
|
||
|
|
||
|
def test_format_finding_enocean(self):
|
||
|
enocean_protocol = ProtocolAnalyzer(None)
|
||
|
with open(get_path_for_data_file("enocean_bits.txt")) as f:
|
||
|
for line in f:
|
||
|
enocean_protocol.messages.append(Message.from_plain_bits_str(line.replace("\n", "")))
|
||
|
enocean_protocol.messages[-1].message_type = enocean_protocol.default_message_type
|
||
|
|
||
|
ff = FormatFinder(enocean_protocol.messages)
|
||
|
ff.perform_iteration()
|
||
|
|
||
|
message_types = ff.message_types
|
||
|
self.assertEqual(len(message_types), 1)
|
||
|
|
||
|
preamble = message_types[0].get_first_label_with_type(FieldType.Function.PREAMBLE)
|
||
|
self.assertEqual(preamble.start, 0)
|
||
|
self.assertEqual(preamble.length, 8)
|
||
|
|
||
|
sync = message_types[0].get_first_label_with_type(FieldType.Function.SYNC)
|
||
|
self.assertEqual(sync.start, 8)
|
||
|
self.assertEqual(sync.length, 4)
|
||
|
|
||
|
checksum = message_types[0].get_first_label_with_type(FieldType.Function.CHECKSUM)
|
||
|
self.assertEqual(checksum.start, 56)
|
||
|
self.assertEqual(checksum.length, 4)
|
||
|
|
||
|
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.SRC_ADDRESS))
|
||
|
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.DST_ADDRESS))
|
||
|
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.LENGTH))
|
||
|
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.SEQUENCE_NUMBER))
|
||
|
|
||
|
def test_format_finding_rwe(self):
|
||
|
ff, messages = self.get_format_finder_from_protocol_file("rwe.proto.xml", return_messages=True)
|
||
|
ff.run()
|
||
|
|
||
|
sync1, sync2 = "0x9a7d9a7d", "0x67686768"
|
||
|
|
||
|
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in messages])
|
||
|
possible_syncs = preprocessor.find_possible_syncs()
|
||
|
self.assertIn(ProtocolGenerator.to_bits(sync1), possible_syncs)
|
||
|
self.assertIn(ProtocolGenerator.to_bits(sync2), possible_syncs)
|
||
|
|
||
|
ack_messages = (3, 5, 7, 9, 11, 13, 15, 17, 20)
|
||
|
ack_message_type = next(mt for mt, messages in ff.existing_message_types.items() if ack_messages[0] in messages)
|
||
|
self.assertTrue(all(ack_msg in ff.existing_message_types[ack_message_type] for ack_msg in ack_messages))
|
||
|
|
||
|
for mt in ff.message_types:
|
||
|
preamble = mt.get_first_label_with_type(FieldType.Function.PREAMBLE)
|
||
|
self.assertEqual(preamble.start, 0)
|
||
|
self.assertEqual(preamble.length, 32)
|
||
|
|
||
|
sync = mt.get_first_label_with_type(FieldType.Function.SYNC)
|
||
|
self.assertEqual(sync.start, 32)
|
||
|
self.assertEqual(sync.length, 32)
|
||
|
|
||
|
length = mt.get_first_label_with_type(FieldType.Function.LENGTH)
|
||
|
self.assertEqual(length.start, 64)
|
||
|
self.assertEqual(length.length, 8)
|
||
|
|
||
|
dst = mt.get_first_label_with_type(FieldType.Function.DST_ADDRESS)
|
||
|
self.assertEqual(dst.length, 24)
|
||
|
|
||
|
if mt == ack_message_type or 1 in ff.existing_message_types[mt]:
|
||
|
self.assertEqual(dst.start, 72)
|
||
|
else:
|
||
|
self.assertEqual(dst.start, 88)
|
||
|
|
||
|
if mt != ack_message_type and 1 not in ff.existing_message_types[mt]:
|
||
|
src = mt.get_first_label_with_type(FieldType.Function.SRC_ADDRESS)
|
||
|
self.assertEqual(src.start, 112)
|
||
|
self.assertEqual(src.length, 24)
|
||
|
elif 1 in ff.existing_message_types[mt]:
|
||
|
# long ack
|
||
|
src = mt.get_first_label_with_type(FieldType.Function.SRC_ADDRESS)
|
||
|
self.assertEqual(src.start, 96)
|
||
|
self.assertEqual(src.length, 24)
|
||
|
|
||
|
crc = mt.get_first_label_with_type(FieldType.Function.CHECKSUM)
|
||
|
self.assertIsNotNone(crc)
|
||
|
|
||
|
def test_homematic(self):
|
||
|
proto_file = get_path_for_data_file("homematic.proto.xml")
|
||
|
protocol = ProtocolAnalyzer(signal=None, filename=proto_file)
|
||
|
protocol.message_types = []
|
||
|
protocol.from_xml_file(filename=proto_file, read_bits=True)
|
||
|
# prevent interfering with preassinged labels
|
||
|
protocol.message_types = [MessageType("Default")]
|
||
|
|
||
|
participants = sorted({msg.participant for msg in protocol.messages})
|
||
|
|
||
|
self.clear_message_types(protocol.messages)
|
||
|
ff = FormatFinder(protocol.messages, participants=participants)
|
||
|
ff.known_participant_addresses.clear()
|
||
|
ff.perform_iteration()
|
||
|
|
||
|
self.assertGreater(len(ff.message_types), 0)
|
||
|
|
||
|
for i, message_type in enumerate(ff.message_types):
|
||
|
preamble = message_type.get_first_label_with_type(FieldType.Function.PREAMBLE)
|
||
|
self.assertEqual(preamble.start, 0)
|
||
|
self.assertEqual(preamble.length, 32)
|
||
|
|
||
|
sync = message_type.get_first_label_with_type(FieldType.Function.SYNC)
|
||
|
self.assertEqual(sync.start, 32)
|
||
|
self.assertEqual(sync.length, 32)
|
||
|
|
||
|
length = message_type.get_first_label_with_type(FieldType.Function.LENGTH)
|
||
|
self.assertEqual(length.start, 64)
|
||
|
self.assertEqual(length.length, 8)
|
||
|
|
||
|
seq = message_type.get_first_label_with_type(FieldType.Function.SEQUENCE_NUMBER)
|
||
|
self.assertEqual(seq.start, 72)
|
||
|
self.assertEqual(seq.length, 8)
|
||
|
|
||
|
src = message_type.get_first_label_with_type(FieldType.Function.SRC_ADDRESS)
|
||
|
self.assertEqual(src.start, 96)
|
||
|
self.assertEqual(src.length, 24)
|
||
|
|
||
|
dst = message_type.get_first_label_with_type(FieldType.Function.DST_ADDRESS)
|
||
|
self.assertEqual(dst.start, 120)
|
||
|
self.assertEqual(dst.length, 24)
|
||
|
|
||
|
checksum = message_type.get_first_label_with_type(FieldType.Function.CHECKSUM)
|
||
|
self.assertEqual(checksum.length, 16)
|
||
|
self.assertIn("CC1101", checksum.checksum.caption)
|
||
|
|
||
|
for msg_index in ff.existing_message_types[message_type]:
|
||
|
msg_len = len(protocol.messages[msg_index])
|
||
|
self.assertEqual(checksum.start, msg_len-16)
|
||
|
self.assertEqual(checksum.end, msg_len)
|