108 lines
5.1 KiB
Python
108 lines
5.1 KiB
Python
import copy
|
|
import unittest
|
|
|
|
from tests.utils_testing import get_path_for_data_file
|
|
from urh import settings
|
|
from urh.awre import AutoAssigner
|
|
from urh.signalprocessing.Encoding import Encoding
|
|
from urh.signalprocessing.Message import Message
|
|
from urh.signalprocessing.MessageType import MessageType
|
|
from urh.signalprocessing.Participant import Participant
|
|
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
|
|
from urh.signalprocessing.Ruleset import Rule, Ruleset, Mode
|
|
|
|
|
|
class TestAutoAssignments(unittest.TestCase):
    """Tests for rule-based message type assignment and RSSI-based participant assignment."""

    def setUp(self):
        """Build the shared fixture: a protocol loaded from ``decoded_bits.txt`` plus decodings.

        Every line of the data file becomes one message carrying the protocol's
        default message type. Messages are then split between two participants
        and the fixture is sanity-checked before each test runs.
        """
        self.protocol = ProtocolAnalyzer(None)
        with open(get_path_for_data_file("decoded_bits.txt")) as bits_file:
            for raw_line in bits_file:
                parsed = Message.from_plain_bits_str(raw_line.replace("\n", ""))
                self.protocol.messages.append(parsed)
                parsed.message_type = self.protocol.default_message_type

        # Assign participants: the listed message indices go to Alice, the rest to Bob.
        alice = Participant("Alice", "A")
        bob = Participant("Bob", "B")
        alice_indices = {
            1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 20,
            22, 23, 26, 27, 30, 31, 34, 35, 38, 39, 41,
        }
        for position, msg in enumerate(self.protocol.messages):
            msg.participant = alice if position in alice_indices else bob

        # Guard against a changed or truncated data file.
        self.assertEqual(self.protocol.num_messages, 42)
        self.assertEqual(self.protocol.plain_hex_str[0][16:18], "2d")

        decoding_chains = [
            ['Non Return To Zero (NRZ)'],
            ['Non Return To Zero Inverted (NRZ-I)', 'Invert'],
            ['Manchester I', 'Edge Trigger'],
            ['Manchester II', 'Edge Trigger', 'Invert'],
            ['Differential Manchester', 'Edge Trigger', 'Differential Encoding'],
            ['DeWhitening Special', settings.DECODING_DATAWHITENING, '0x9a7d9a7d;0x21;0'],
            ['DeWhitening', settings.DECODING_DATAWHITENING, '0x67686768;0x21;0'],
        ]
        self.decodings = [Encoding(chain) for chain in decoding_chains]

    def test_message_type_assign_by_value(self):
        """A ruleset-driven message type is assigned only to the messages matching its rule."""
        rule_start, rule_end = 8, 15
        rule_value = "9a7d9a7d"

        msg_type = MessageType("autotest")
        value_rule = Rule(rule_start, rule_end, "=", rule_value, 1)
        msg_type.ruleset = Ruleset(Mode.all_apply, [value_rule])
        msg_type.assigned_by_ruleset = True

        self.protocol.message_types.append(msg_type)
        self.protocol.update_auto_message_types()

        # Indices of the fixture messages expected to satisfy the rule.
        matching_indices = [0, 2, 3, 21, 23, 24]
        for position, msg in enumerate(self.protocol.messages):
            expected_type = (
                msg_type if position in matching_indices
                else self.protocol.default_message_type
            )
            self.assertEqual(msg.message_type, expected_type, msg=str(position))

    def test_two_assign_participants_by_rssi(self):
        """Participants are auto-assigned from per-message RSSI values in two protocols.

        The fixture messages are split into two protocols of 21 messages each,
        stripped of their participants and given the RSSI values below. In the
        expected result the messages with the higher RSSI values belong to Alice
        (``relative_rssi = 1``) and the lower ones to Bob (``relative_rssi = 0``).
        """
        rssis = [
            [
                0.65389872, 0.13733707, 0.1226876, 0.73320961, 0.64940965, 0.12463234, 0.12296994,
                0.68053716, 0.66020358, 0.12428901, 0.12312815, 0.69160986, 0.65582329, 0.12536003,
                0.12587067, 0.66315573, 0.66313261, 0.12816505, 0.13491708, 0.66950738, 0.14047238,
            ],
            [
                0.26651502, 0.2073856, 0.13547869, 0.25948182, 0.28204739, 0.13716124, 0.13526952,
                0.24828221, 0.25431305, 0.13681877, 0.13650328, 0.28083691, 0.25550124, 0.13498682,
                0.13611424, 0.2629154, 0.26388499, 0.13780586, 0.13561584, 0.27228078, 0.1356563,
            ],
        ]

        protocols = [ProtocolAnalyzer(None), ProtocolAnalyzer(None)]
        # First protocol takes fixture messages 0..20, the second takes 21..41.
        first_indices = (0, 21)

        for proto, first_index, rssi_values in zip(protocols, first_indices, rssis):
            for offset, rssi in enumerate(rssi_values):
                clone = copy.deepcopy(self.protocol.messages[first_index + offset])
                clone.participant = None  # drop the assignment made in setUp
                proto.messages.append(clone)
                clone.rssi = rssi

            self.assertEqual(len(proto.messages), 21)

        alice = Participant(name="Alice", shortname="A")
        alice.relative_rssi = 1
        bob = Participant(name="Bob", shortname="B")
        bob.relative_rssi = 0

        # Both protocols are expected to yield the same participant sequence.
        expected_sequence = [
            alice, bob, bob, alice,
            alice, bob, bob, alice,
            alice, bob, bob, alice,
            alice, bob, bob, alice,
            alice, bob, bob, alice,
            bob,
        ]
        expected_participants = [expected_sequence, list(expected_sequence)]

        for proto, expected_row in zip(protocols, expected_participants):
            AutoAssigner.auto_assign_participants(proto.messages, [alice, bob])
            for position, msg in enumerate(proto.messages):
                self.assertEqual(msg.participant, expected_row[position])
|