HackRF-Treasure-Chest/Software/Universal Radio Hacker/tests/awre/test_awre_real_protocols.py
2022-09-22 13:46:47 -07:00

150 lines
6.8 KiB
Python

from tests.awre.AWRETestCase import AWRETestCase
from tests.utils_testing import get_path_for_data_file
from urh.awre.CommonRange import CommonRange
from urh.awre.FormatFinder import FormatFinder
from urh.awre.Preprocessor import Preprocessor
from urh.awre.ProtocolGenerator import ProtocolGenerator
from urh.signalprocessing.FieldType import FieldType
from urh.signalprocessing.Message import Message
from urh.signalprocessing.MessageType import MessageType
from urh.signalprocessing.Participant import Participant
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
import numpy as np
class TestAWRERealProtocols(AWRETestCase):
def setUp(self):
super().setUp()
alice = Participant("Alice", "A")
bob = Participant("Bob", "B")
self.participants = [alice, bob]
def test_format_finding_enocean(self):
enocean_protocol = ProtocolAnalyzer(None)
with open(get_path_for_data_file("enocean_bits.txt")) as f:
for line in f:
enocean_protocol.messages.append(Message.from_plain_bits_str(line.replace("\n", "")))
enocean_protocol.messages[-1].message_type = enocean_protocol.default_message_type
ff = FormatFinder(enocean_protocol.messages)
ff.perform_iteration()
message_types = ff.message_types
self.assertEqual(len(message_types), 1)
preamble = message_types[0].get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0)
self.assertEqual(preamble.length, 8)
sync = message_types[0].get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 8)
self.assertEqual(sync.length, 4)
checksum = message_types[0].get_first_label_with_type(FieldType.Function.CHECKSUM)
self.assertEqual(checksum.start, 56)
self.assertEqual(checksum.length, 4)
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.SRC_ADDRESS))
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.DST_ADDRESS))
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.LENGTH))
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.SEQUENCE_NUMBER))
def test_format_finding_rwe(self):
ff, messages = self.get_format_finder_from_protocol_file("rwe.proto.xml", return_messages=True)
ff.run()
sync1, sync2 = "0x9a7d9a7d", "0x67686768"
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in messages])
possible_syncs = preprocessor.find_possible_syncs()
self.assertIn(ProtocolGenerator.to_bits(sync1), possible_syncs)
self.assertIn(ProtocolGenerator.to_bits(sync2), possible_syncs)
ack_messages = (3, 5, 7, 9, 11, 13, 15, 17, 20)
ack_message_type = next(mt for mt, messages in ff.existing_message_types.items() if ack_messages[0] in messages)
self.assertTrue(all(ack_msg in ff.existing_message_types[ack_message_type] for ack_msg in ack_messages))
for mt in ff.message_types:
preamble = mt.get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0)
self.assertEqual(preamble.length, 32)
sync = mt.get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 32)
self.assertEqual(sync.length, 32)
length = mt.get_first_label_with_type(FieldType.Function.LENGTH)
self.assertEqual(length.start, 64)
self.assertEqual(length.length, 8)
dst = mt.get_first_label_with_type(FieldType.Function.DST_ADDRESS)
self.assertEqual(dst.length, 24)
if mt == ack_message_type or 1 in ff.existing_message_types[mt]:
self.assertEqual(dst.start, 72)
else:
self.assertEqual(dst.start, 88)
if mt != ack_message_type and 1 not in ff.existing_message_types[mt]:
src = mt.get_first_label_with_type(FieldType.Function.SRC_ADDRESS)
self.assertEqual(src.start, 112)
self.assertEqual(src.length, 24)
elif 1 in ff.existing_message_types[mt]:
# long ack
src = mt.get_first_label_with_type(FieldType.Function.SRC_ADDRESS)
self.assertEqual(src.start, 96)
self.assertEqual(src.length, 24)
crc = mt.get_first_label_with_type(FieldType.Function.CHECKSUM)
self.assertIsNotNone(crc)
def test_homematic(self):
proto_file = get_path_for_data_file("homematic.proto.xml")
protocol = ProtocolAnalyzer(signal=None, filename=proto_file)
protocol.message_types = []
protocol.from_xml_file(filename=proto_file, read_bits=True)
# prevent interfering with preassinged labels
protocol.message_types = [MessageType("Default")]
participants = sorted({msg.participant for msg in protocol.messages})
self.clear_message_types(protocol.messages)
ff = FormatFinder(protocol.messages, participants=participants)
ff.known_participant_addresses.clear()
ff.perform_iteration()
self.assertGreater(len(ff.message_types), 0)
for i, message_type in enumerate(ff.message_types):
preamble = message_type.get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0)
self.assertEqual(preamble.length, 32)
sync = message_type.get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 32)
self.assertEqual(sync.length, 32)
length = message_type.get_first_label_with_type(FieldType.Function.LENGTH)
self.assertEqual(length.start, 64)
self.assertEqual(length.length, 8)
seq = message_type.get_first_label_with_type(FieldType.Function.SEQUENCE_NUMBER)
self.assertEqual(seq.start, 72)
self.assertEqual(seq.length, 8)
src = message_type.get_first_label_with_type(FieldType.Function.SRC_ADDRESS)
self.assertEqual(src.start, 96)
self.assertEqual(src.length, 24)
dst = message_type.get_first_label_with_type(FieldType.Function.DST_ADDRESS)
self.assertEqual(dst.start, 120)
self.assertEqual(dst.length, 24)
checksum = message_type.get_first_label_with_type(FieldType.Function.CHECKSUM)
self.assertEqual(checksum.length, 16)
self.assertIn("CC1101", checksum.checksum.caption)
for msg_index in ff.existing_message_types[message_type]:
msg_len = len(protocol.messages[msg_index])
self.assertEqual(checksum.start, msg_len-16)
self.assertEqual(checksum.end, msg_len)