HackRF-Treasure-Chest/Software/urh/tests/awre/test_awre_preprocessing.py

257 lines
12 KiB
Python
Raw Normal View History

2022-09-22 19:41:47 +02:00
import random
from tests.awre.AWRETestCase import AWRETestCase
from urh.awre.FormatFinder import FormatFinder
from urh.awre.MessageTypeBuilder import MessageTypeBuilder
from urh.awre.Preprocessor import Preprocessor
from urh.awre.ProtocolGenerator import ProtocolGenerator
from urh.signalprocessing.FieldType import FieldType
from urh.signalprocessing.Message import Message
from urh.signalprocessing.Participant import Participant
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
import numpy as np
class TestAWREPreprocessing(AWRETestCase):
def test_very_simple_sync_word_finding(self):
preamble = "10101010"
sync = "1101"
pg = self.build_protocol_generator(preamble_syncs=[(preamble, sync)],
num_messages=(20,),
data=(lambda i: 10 * i,))
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in pg.protocol.messages])
possible_syncs = preprocessor.find_possible_syncs()
#self.save_protocol("very_simple_sync_test", pg)
self.assertGreaterEqual(len(possible_syncs), 1)
self.assertEqual(preprocessor.find_possible_syncs()[0], sync)
def test_simple_sync_word_finding(self):
preamble = "10101010"
sync = "1001"
pg = self.build_protocol_generator(preamble_syncs=[(preamble, sync), (preamble + "1010", sync)],
num_messages=(20, 5),
data=(lambda i: 10 * i, lambda i: 22 * i))
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in pg.protocol.messages])
possible_syncs = preprocessor.find_possible_syncs()
#self.save_protocol("simple_sync_test", pg)
self.assertGreaterEqual(len(possible_syncs), 1)
self.assertEqual(preprocessor.find_possible_syncs()[0], sync)
def test_sync_word_finding_odd_preamble(self):
preamble = "0101010"
sync = "1101"
pg = self.build_protocol_generator(preamble_syncs=[(preamble, sync), (preamble + "10", sync)],
num_messages=(20, 5),
data=(lambda i: 10 * i, lambda i: i))
# If we have a odd preamble length, the last bit of the preamble is counted to the sync
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in pg.protocol.messages])
possible_syncs = preprocessor.find_possible_syncs()
#self.save_protocol("odd_preamble", pg)
self.assertEqual(preamble[-1] + sync[:-1], possible_syncs[0])
def test_sync_word_finding_special_preamble(self):
preamble = "111001110011100"
sync = "0110"
pg = self.build_protocol_generator(preamble_syncs=[(preamble, sync), (preamble + "10", sync)],
num_messages=(20, 5),
data=(lambda i: 10 * i, lambda i: i))
# If we have a odd preamble length, the last bit of the preamble is counted to the sync
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in pg.protocol.messages])
possible_syncs = preprocessor.find_possible_syncs()
#self.save_protocol("special_preamble", pg)
self.assertEqual(sync, possible_syncs[0])
def test_sync_word_finding_errored_preamble(self):
preamble = "00010101010" # first bits are wrong
sync = "0110"
pg = self.build_protocol_generator(preamble_syncs=[(preamble, sync), (preamble + "10", sync)],
num_messages=(20, 5),
data=(lambda i: 10 * i, lambda i: i))
# If we have a odd preamble length, the last bit of the preamble is counted to the sync
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in pg.protocol.messages])
possible_syncs = preprocessor.find_possible_syncs()
#self.save_protocol("errored_preamble", pg)
self.assertIn(preamble[-1] + sync[:-1], possible_syncs)
def test_sync_word_finding_with_two_sync_words(self):
preamble = "0xaaaa"
sync1, sync2 = "0x1234", "0xcafe"
pg = self.build_protocol_generator(preamble_syncs=[(preamble, sync1), (preamble, sync2)],
num_messages=(15, 10),
data=(lambda i: 12 * i, lambda i: 16 * i))
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in pg.protocol.messages])
possible_syncs = preprocessor.find_possible_syncs()
#self.save_protocol("two_syncs", pg)
self.assertGreaterEqual(len(possible_syncs), 2)
self.assertIn(ProtocolGenerator.to_bits(sync1), possible_syncs)
self.assertIn(ProtocolGenerator.to_bits(sync2), possible_syncs)
def test_multiple_sync_words(self):
hex_messages = [
"aaS1234",
"aaScafe",
"aaSdead",
"aaSbeef",
]
for i in range(1, 256):
messages = []
sync = "{0:02x}".format(i)
if sync.startswith("a"):
continue
for msg in hex_messages:
messages.append(Message.from_plain_hex_str(msg.replace("S", sync)))
for i in range(1, len(messages)):
messages[i].message_type = messages[0].message_type
ff = FormatFinder(messages)
ff.run()
self.assertEqual(len(ff.message_types), 1, msg=sync)
preamble = ff.message_types[0].get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0, msg=sync)
self.assertEqual(preamble.length, 8, msg=sync)
sync = ff.message_types[0].get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 8, msg=sync)
self.assertEqual(sync.length, 8, msg=sync)
def test_sync_word_finding_varying_message_length(self):
hex_messages = [
"aaaa9a7d0f1337471100009a44ebdd13517bf9",
"aaaa9a7d4747111337000134a4473c002b909630b11df37e34728c79c60396176aff2b5384e82f31511581d0cbb4822ad1b6734e2372ad5cf4af4c9d6b067e5f7ec359ec443c3b5ddc7a9e",
"aaaa9a7d0f13374711000205ee081d26c86b8c",
"aaaa9a7d474711133700037cae4cda789885f88f5fb29adc9acf954cb2850b9d94e7f3b009347c466790e89f2bcd728987d4670690861bbaa120f71f14d4ef8dc738a6d7c30e7d2143c267",
"aaaa9a7d0f133747110004c2906142300427f3"
]
messages = [Message.from_plain_hex_str(hex_msg) for hex_msg in hex_messages]
for i in range(1, len(messages)):
messages[i].message_type = messages[0].message_type
ff = FormatFinder(messages)
ff.run()
self.assertEqual(len(ff.message_types), 1)
preamble = ff.message_types[0].get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0)
self.assertEqual(preamble.length, 16)
sync = ff.message_types[0].get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 16)
self.assertEqual(sync.length, 16)
def test_sync_word_finding_common_prefix(self):
"""
Messages are very similar (odd and even ones are the same)
However, they do not have two different sync words!
The algorithm needs to check for a common prefix of the two found sync words
:return:
"""
sync = "0x1337"
num_messages = 10
alice = Participant("Alice", address_hex="dead01")
bob = Participant("Bob", address_hex="beef24")
mb = MessageTypeBuilder("protocol_with_one_message_type")
mb.add_label(FieldType.Function.PREAMBLE, 72)
mb.add_label(FieldType.Function.SYNC, 16)
mb.add_label(FieldType.Function.LENGTH, 8)
mb.add_label(FieldType.Function.SRC_ADDRESS, 24)
mb.add_label(FieldType.Function.DST_ADDRESS, 24)
mb.add_label(FieldType.Function.SEQUENCE_NUMBER, 16)
pg = ProtocolGenerator([mb.message_type],
syncs_by_mt={mb.message_type: "0x1337"},
preambles_by_mt={mb.message_type: "10" * 36},
participants=[alice, bob])
random.seed(0)
for i in range(num_messages):
if i % 2 == 0:
source, destination = alice, bob
data_length = 8
else:
source, destination = bob, alice
data_length = 16
pg.generate_message(data=pg.decimal_to_bits(random.randint(0, 2 ** (data_length - 1)), data_length),
source=source, destination=destination)
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in pg.protocol.messages])
possible_syncs = preprocessor.find_possible_syncs()
#self.save_protocol("sync_by_common_prefix", pg)
self.assertEqual(len(possible_syncs), 1)
# +0000 is okay, because this will get fixed by correction in FormatFinder
self.assertIn(possible_syncs[0], [ProtocolGenerator.to_bits(sync), ProtocolGenerator.to_bits(sync) + "0000"])
def test_with_given_preamble_and_sync(self):
preamble = "10101010"
sync = "10011"
pg = self.build_protocol_generator(preamble_syncs=[(preamble, sync)],
num_messages=(20,),
data=(lambda i: 10 * i,))
# If we have a odd preamble length, the last bit of the preamble is counted to the sync
preprocessor = Preprocessor([np.array(msg.plain_bits, dtype=np.uint8) for msg in pg.protocol.messages],
existing_message_types={i: msg.message_type for i, msg in
enumerate(pg.protocol.messages)})
preamble_starts, preamble_lengths, sync_len = preprocessor.preprocess()
#self.save_protocol("given_preamble", pg)
self.assertTrue(all(preamble_start == 0 for preamble_start in preamble_starts))
self.assertTrue(all(preamble_length == len(preamble) for preamble_length in preamble_lengths))
self.assertEqual(sync_len, len(sync))
@staticmethod
def build_protocol_generator(preamble_syncs: list, num_messages: tuple, data: tuple) -> ProtocolGenerator:
message_types = []
preambles_by_mt = dict()
syncs_by_mt = dict()
assert len(preamble_syncs) == len(num_messages) == len(data)
for i, (preamble, sync_word) in enumerate(preamble_syncs):
assert isinstance(preamble, str)
assert isinstance(sync_word, str)
preamble, sync_word = map(ProtocolGenerator.to_bits, (preamble, sync_word))
mb = MessageTypeBuilder("message type #{0}".format(i))
mb.add_label(FieldType.Function.PREAMBLE, len(preamble))
mb.add_label(FieldType.Function.SYNC, len(sync_word))
message_types.append(mb.message_type)
preambles_by_mt[mb.message_type] = preamble
syncs_by_mt[mb.message_type] = sync_word
pg = ProtocolGenerator(message_types, preambles_by_mt=preambles_by_mt, syncs_by_mt=syncs_by_mt)
for i, msg_type in enumerate(message_types):
for j in range(num_messages[i]):
if callable(data[i]):
msg_data = pg.decimal_to_bits(data[i](j), num_bits=8)
else:
msg_data = data[i]
pg.generate_message(message_type=msg_type, data=msg_data)
return pg