RocketGod
2022-09-22 13:46:47 -07:00
parent f65104c2ab
commit e7667c1d93
565 changed files with 165005 additions and 0 deletions


@@ -0,0 +1,7 @@
import os
import sys
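# Make the project's src/ directory importable when the tests run from a source checkout.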
f = os.readlink(__file__) if os.path.islink(__file__) else __file__
path = os.path.realpath(os.path.join(f, "..", "..", "..", "src"))
if path not in sys.path:
sys.path.insert(0, path)


@@ -0,0 +1,88 @@
import random
import numpy as np
from urh.signalprocessing.IQArray import IQArray
from urh.signalprocessing.Message import Message
from urh.signalprocessing.Modulator import Modulator
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
from urh.signalprocessing.Signal import Signal


def demodulate(signal_data, mod_type: str, bit_length, center, noise, tolerance, decoding=None, pause_threshold=8):
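    # Wrap the raw samples in a Signal, run the protocol analyzer, and return the decoded messages as hex strings.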
signal = Signal("", "")
if isinstance(signal_data, IQArray):
signal.iq_array = signal_data
else:
if signal_data.dtype == np.complex64:
signal.iq_array = IQArray(signal_data.view(np.float32))
else:
signal.iq_array = IQArray(signal_data)
signal.modulation_type = mod_type
signal.samples_per_symbol = bit_length
signal.center = center
signal.noise_threshold = noise
signal.pause_threshold = pause_threshold
if tolerance is not None:
signal.tolerance = tolerance
pa = ProtocolAnalyzer(signal)
if decoding is not None:
pa.decoder = decoding
pa.get_protocol_from_signal()
return pa.decoded_hex_str


def generate_signal(messages: list, modulator: Modulator, snr_db: int, add_noise=True):
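    # Modulate every message and optionally add white Gaussian noise for the requested SNR (in dB).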
result = []
message_powers = []
if isinstance(messages, Message):
messages = [messages]
for msg in messages:
modulated = modulator.modulate(msg.encoded_bits, msg.pause)
if add_noise:
message_powers.append(np.mean(np.abs(modulated[:len(modulated) - msg.pause])))
result.append(modulated)
result = np.concatenate(result)
if not add_noise:
return result
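    # Interleaved Gaussian I/Q samples viewed as complex noise (unit variance per component).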
noise = np.random.normal(loc=0, scale=1, size=2 * len(result)).astype(np.float32).view(np.complex64)
# https://stackoverflow.com/questions/23690766/proper-way-to-add-noise-to-signal
snr_ratio = np.power(10, snr_db / 10)
signal_power = np.mean(message_powers)
noise_power = signal_power / snr_ratio
noise = 1 / np.sqrt(2) * noise_power * noise
return result + noise


def generate_message_bits(num_bits=80, preamble="", sync="", eof=""):
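    # Fill the space between preamble/sync and EOF with random whole bytes plus leftover random bits.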
bits_to_generate = num_bits - (len(preamble) + len(sync) + len(eof))
if bits_to_generate < 0:
        raise ValueError("Preamble, sync and EOF together are longer than the requested number of bits")
bytes_to_generate = bits_to_generate // 8
leftover_bits = bits_to_generate % 8
return "".join([preamble, sync]
+ ["{0:08b}".format(random.choice(range(0, 256))) for _ in range(bytes_to_generate)]
+ [random.choice(["0", "1"]) for _ in range(leftover_bits)]
+ [eof]
)


def generate_random_messages(num_messages: int, num_bits: int,
preamble: str, sync: str, eof: str, message_pause: int):
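    # Build Message objects from freshly generated random bit strings, all sharing the same pause.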
return [
Message.from_plain_bits_str(
generate_message_bits(num_bits, preamble, sync, eof), pause=message_pause
)
for _ in range(num_messages)
]


@@ -0,0 +1,153 @@
import os
import sys
import unittest
from urh.ainterpretation import AutoInterpretation
from urh.signalprocessing.Signal import Signal
from tests.auto_interpretation.auto_interpretation_test_util import demodulate


class TestAutoInterpretationIntegration(unittest.TestCase):
SIGNALPATH = "~/GIT/publications/ainterpretation/experiments/signals/"
def get_path(self, signalname):
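        # Resolve an external test recording; returns None on Windows or when the file is missing, so the calling test exits early.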
if sys.platform == "win32":
return None
path = os.path.join(os.path.expanduser(self.SIGNALPATH), signalname)
if os.path.exists(path):
return path
else:
return None
def test_action(self):
path = self.get_path("action_FB_A_B_C_D.coco")
if not path:
return
data = Signal(path, "").iq_array
result = AutoInterpretation.estimate(data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "ASK")
self.assertGreaterEqual(bit_length, 400)
self.assertLessEqual(bit_length, 600)
print("noise", noise, "center", center, "bit length", bit_length, "tolerance", tolerance)
demodulated = demodulate(data, mod_type, bit_length, center, noise, tolerance)
print(demodulated)
self.assertEqual(len(demodulated), 19)
for i in range(2):
self.assertTrue(demodulated[i].startswith("8e8eeeeeee8"))
def test_audi(self):
path = self.get_path("audi_auf_sr5m.coco")
if not path:
return
data = Signal(path, "").iq_array
result = AutoInterpretation.estimate(data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "ASK")
self.assertGreaterEqual(bit_length, 2400)
self.assertLessEqual(bit_length, 2500)
self.assertGreaterEqual(center, 0.005)
self.assertLessEqual(center, 0.32)
print("noise", noise, "center", center, "bit length", bit_length, "tolerance", tolerance)
demodulated = demodulate(data, mod_type, bit_length, center, noise, tolerance)
print(demodulated)
self.assertEqual(len(demodulated), 1)
self.assertTrue(demodulated[0].startswith("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))
self.assertTrue(demodulated[0].endswith("cad4c"))
def test_brennenstuhl(self):
path = self.get_path("brennenstuhl_signal_ABCD_onoff.coco")
if not path:
return
data = Signal(path, "").iq_array
result = AutoInterpretation.estimate(data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "ASK")
self.assertEqual(bit_length, 300)
print("noise", noise, "center", center, "bit length", bit_length, "tolerance", tolerance)
demodulated = demodulate(data, mod_type, bit_length, center, noise, tolerance, pause_threshold=8)
print(demodulated)
self.assertEqual(len(demodulated), 64)
for i in range(64):
self.assertTrue(demodulated[i].startswith("88888888888"))
self.assertEqual(len(demodulated[i]), len(demodulated[0]))
def test_esaver(self):
path = self.get_path("esaver_test4on.complex")
if not path:
return
data = Signal(path, "").iq_array
result = AutoInterpretation.estimate(data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
print(center, noise)
self.assertEqual(mod_type, "FSK")
self.assertEqual(bit_length, 100)
print("noise", noise, "center", center, "bit length", bit_length, "tolerance", tolerance)
demodulated = demodulate(data, mod_type, bit_length, center, noise, tolerance)
print(demodulated)
self.assertEqual(len(demodulated), 12)
for i in range(12):
self.assertTrue(demodulated[i].startswith("aaaaaaaa"))
def test_scislo(self):
path = self.get_path("scislo.complex")
if not path:
return
data = Signal(path, "").iq_array
result = AutoInterpretation.estimate(data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "FSK")
self.assertEqual(bit_length, 200)
self.assertGreaterEqual(noise, 0.0120)
print("noise", noise, "center", center, "bit length", bit_length, "tolerance", tolerance)
demodulated = demodulate(data, mod_type, bit_length, center, noise, tolerance)
print(demodulated)
self.assertEqual(len(demodulated), 8)
for i in range(8):
self.assertTrue(demodulated[i].startswith("000000000000aaaaaa"))
def test_vw(self):
path = self.get_path("vw_auf.complex")
if not path:
return
data = Signal(path, "").iq_array
result = AutoInterpretation.estimate(data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "ASK")
self.assertGreaterEqual(bit_length, 2000)
self.assertLessEqual(bit_length, 3000)
demodulated = demodulate(data, mod_type, bit_length, center, noise, tolerance)
print(demodulated)
self.assertEqual(len(demodulated), 1)
self.assertTrue(demodulated[0].startswith("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))


@@ -0,0 +1,104 @@
import unittest
import numpy as np
from tests.auto_interpretation.auto_interpretation_test_util import demodulate
from tests.test_util import get_path_for_data_file
from urh import settings
from urh.ainterpretation import AutoInterpretation
from urh.signalprocessing.Encoding import Encoding
from urh.signalprocessing.Signal import Signal


class TestAutoInterpretationIntegration(unittest.TestCase):
def test_auto_interpretation_fsk(self):
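        # .complex files hold interleaved float32 I/Q samples; demodulate() accepts both float32 and complex64 views.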
fsk_signal = np.fromfile(get_path_for_data_file("fsk.complex"), dtype=np.float32)
result = AutoInterpretation.estimate(fsk_signal)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "FSK")
self.assertEqual(bit_length, 100)
self.assertGreater(tolerance, 0)
self.assertLessEqual(tolerance, 5)
self.assertEqual(demodulate(fsk_signal, mod_type, bit_length, center, noise, tolerance)[0],
"aaaaaaaac626c626f4dc1d98eef7a427999cd239d3f18")
def test_auto_interpretation_ask(self):
ask_signal = np.fromfile(get_path_for_data_file("ask.complex"), dtype=np.float32)
result = AutoInterpretation.estimate(ask_signal)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "ASK")
self.assertEqual(bit_length, 300)
self.assertGreater(tolerance, 0)
self.assertLessEqual(tolerance, 6)
self.assertEqual(demodulate(ask_signal, mod_type, bit_length, center, noise, tolerance)[0], "b25b6db6c80")
def test_auto_interpretation_overshoot_ook(self):
data = Signal(get_path_for_data_file("ook_overshoot.complex16s"), "").iq_array
result = AutoInterpretation.estimate(data)
self.assertEqual(result["modulation_type"], "ASK")
self.assertEqual(result["bit_length"], 500)
def test_auto_interpretation_enocean(self):
enocean_signal = np.fromfile(get_path_for_data_file("enocean.complex"), dtype=np.float32)
result = AutoInterpretation.estimate(enocean_signal)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "ASK")
self.assertGreaterEqual(center, 0.0077)
self.assertLessEqual(center, 0.0465)
self.assertLessEqual(tolerance, 5)
self.assertEqual(bit_length, 40)
demod = demodulate(enocean_signal, mod_type, bit_length, center, noise, tolerance,
decoding=Encoding(["WSP", settings.DECODING_ENOCEAN]))
self.assertEqual(len(demod), 3)
self.assertEqual(demod[0], demod[2])
self.assertEqual(demod[0], "aa9610002c1c024b")
def test_auto_interpretation_xavax(self):
signal = Signal(get_path_for_data_file("xavax.coco"), "")
result = AutoInterpretation.estimate(signal.iq_array.data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "FSK")
self.assertEqual(bit_length, 100)
demod = demodulate(signal.iq_array.data, mod_type, bit_length, center, noise, tolerance)
self.assertGreaterEqual(len(demod), 5)
for i in range(1, len(demod)):
self.assertTrue(demod[i].startswith("aaaaaaaa"))
def test_auto_interpretation_elektromaten(self):
data = Signal(get_path_for_data_file("elektromaten.complex16s"), "").iq_array
result = AutoInterpretation.estimate(data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "ASK")
self.assertEqual(bit_length, 600)
demodulated = demodulate(data, mod_type, bit_length, center, noise, tolerance, pause_threshold=8)
self.assertEqual(len(demodulated), 11)
for i in range(11):
self.assertTrue(demodulated[i].startswith("8"))
def test_auto_interpretation_homematic(self):
data = Signal(get_path_for_data_file("homematic.complex32s"), "").iq_array
result = AutoInterpretation.estimate(data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "FSK")
self.assertEqual(bit_length, 100)
demodulated = demodulate(data, mod_type, bit_length, center, noise, tolerance)
self.assertEqual(len(demodulated), 2)
for i in range(2):
self.assertTrue(demodulated[i].startswith("aaaaaaaa"))


@@ -0,0 +1,54 @@
import unittest
import numpy as np
from urh.ainterpretation import AutoInterpretation


class TestAutoInterpretation(unittest.TestCase):
def __run_merge(self, data):
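        # Run merge_plateau_lengths on a uint64 array and return a plain list for easy comparison.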
return list(AutoInterpretation.merge_plateau_lengths(np.array(data, dtype=np.uint64)))
def test_merge_plateau_lengths(self):
self.assertEqual(AutoInterpretation.merge_plateau_lengths([]), [])
self.assertEqual(AutoInterpretation.merge_plateau_lengths([42]), [42])
self.assertEqual(AutoInterpretation.merge_plateau_lengths([100, 100, 100]), [100, 100, 100])
self.assertEqual(self.__run_merge([100, 49, 1, 50, 100]), [100, 100, 100])
self.assertEqual(self.__run_merge([100, 48, 2, 50, 100]), [100, 100, 100])
self.assertEqual(self.__run_merge([100, 100, 67, 1, 10, 1, 21]), [100, 100, 100])
self.assertEqual(self.__run_merge([100, 100, 67, 1, 10, 1, 21, 100, 50, 1, 49]), [100, 100, 100, 100, 100])
def test_estimate_tolerance_from_plateau_lengths(self):
self.assertEqual(AutoInterpretation.estimate_tolerance_from_plateau_lengths([]), None)
self.assertEqual(AutoInterpretation.estimate_tolerance_from_plateau_lengths([10]), None)
self.assertEqual(AutoInterpretation.estimate_tolerance_from_plateau_lengths([100, 49, 1, 50, 100]), 1)
self.assertEqual(AutoInterpretation.estimate_tolerance_from_plateau_lengths([100, 49, 2, 50, 100]), 2)
self.assertEqual(AutoInterpretation.estimate_tolerance_from_plateau_lengths([100, 49, 2, 50, 100, 1]), 2)
self.assertEqual(AutoInterpretation.estimate_tolerance_from_plateau_lengths([8, 8, 6, 1, 1]), 1)
def test_tolerant_greatest_common_divisor(self):
self.assertEqual(AutoInterpretation.get_tolerant_greatest_common_divisor([]), 1)
self.assertEqual(AutoInterpretation.get_tolerant_greatest_common_divisor([22]), 1)
self.assertEqual(AutoInterpretation.get_tolerant_greatest_common_divisor([10, 5, 5]), 5)
self.assertEqual(AutoInterpretation.get_tolerant_greatest_common_divisor([100, 100, 100]), 100)
self.assertEqual(AutoInterpretation.get_tolerant_greatest_common_divisor([100, 100, 200, 300, 100, 400]), 100)
self.assertEqual(AutoInterpretation.get_tolerant_greatest_common_divisor([100, 101, 100, 100]), 100)
self.assertEqual(AutoInterpretation.get_tolerant_greatest_common_divisor([100, 101, 202, 301, 100, 500]), 100)
def test_get_bit_length_from_plateau_length(self):
self.assertEqual(AutoInterpretation.get_bit_length_from_plateau_lengths([]), 0)
self.assertEqual(AutoInterpretation.get_bit_length_from_plateau_lengths([42]), 42)
plateau_lengths = np.array([2, 1, 2, 73, 1, 26, 100, 40, 1, 59, 100, 47, 1, 52, 67, 1, 10, 1, 21, 33, 1, 66, 100, 5, 1, 3, 1, 48, 1, 27, 1, 8], dtype=np.uint64)
merged_lengths = AutoInterpretation.merge_plateau_lengths(plateau_lengths)
self.assertEqual(AutoInterpretation.get_bit_length_from_plateau_lengths(merged_lengths), 100)
plateau_lengths = np.array([1, 292, 331, 606, 647, 286, 645, 291, 334, 601, 339, 601, 338, 602, 337, 603, 338, 604, 336, 605, 337, 600, 338, 605, 646], dtype=np.uint64)
merged_lengths = AutoInterpretation.merge_plateau_lengths(plateau_lengths)
self.assertEqual(AutoInterpretation.get_bit_length_from_plateau_lengths(merged_lengths), 300)
plateau_lengths = np.array([3, 8, 8, 8, 8, 8, 8, 8, 8, 16, 8, 8, 16, 32, 8, 8, 8, 8, 8, 24, 8, 24, 8, 24, 8, 24, 8, 24, 16, 16, 24, 8], dtype=np.uint64)
merged_lengths = AutoInterpretation.merge_plateau_lengths(plateau_lengths)
self.assertEqual(AutoInterpretation.get_bit_length_from_plateau_lengths(merged_lengths), 8)
def test_get_bit_length_from_merged_plateau_lengths(self):
merged_lengths = np.array([40, 40, 40, 40, 40, 30, 50, 30, 90, 40, 40, 80, 160, 30, 50, 30], dtype=np.uint64)
self.assertEqual(AutoInterpretation.get_bit_length_from_plateau_lengths(merged_lengths), 40)


@@ -0,0 +1,118 @@
import unittest
import numpy as np
from tests.test_util import get_path_for_data_file
from urh.ainterpretation.AutoInterpretation import detect_center
from urh.cythonext.signal_functions import afp_demod
from urh.signalprocessing.Filter import Filter, FilterType
from urh.signalprocessing.Signal import Signal


class TestCenterDetection(unittest.TestCase):
def test_noiseless_rect(self):
def generate_rectangular_signal(bits: str, bit_len: int):
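            # Encode each "1" bit as bit_len ones; "0" bits stay at zero.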
result = np.zeros(len(bits) * bit_len, dtype=np.float32)
for i, bit in enumerate(bits):
if int(bit) != 0:
result[i * bit_len:(i + 1) * bit_len] = np.ones(bit_len, dtype=np.int8)
return result
rect = generate_rectangular_signal("101010111100011", bit_len=10)
center = detect_center(rect)
self.assertGreaterEqual(center, 0.4)
self.assertLessEqual(center, 0.6)
def test_noisy_rect(self):
data = Signal(get_path_for_data_file("fsk.complex")).iq_array.data
rect = afp_demod(data, 0.008, "FSK", 2)[5:15000]
center = detect_center(rect)
self.assertGreaterEqual(center, -0.0587)
self.assertLessEqual(center, 0.02)
def test_ask_center_detection(self):
data = Signal(get_path_for_data_file("ask.complex")).iq_array.data
rect = afp_demod(data, 0.01111, "ASK", 2)
center = detect_center(rect)
self.assertGreaterEqual(center, 0)
self.assertLessEqual(center, 0.06)
def test_enocean_center_detection(self):
data = Signal(get_path_for_data_file("enocean.complex")).iq_array.data
rect = afp_demod(data, 0.05, "ASK", 2)
messages = [rect[2107:5432], rect[20428:23758], rect[44216:47546]]
for i, msg in enumerate(messages):
center = detect_center(msg)
self.assertGreaterEqual(center, 0.04, msg=str(i))
self.assertLessEqual(center, 0.072, msg=str(i))
def test_ask_50_center_detection(self):
message_indices = [(0, 8000), (18000, 26000), (36000, 44000), (54000, 62000), (72000, 80000)]
data = Signal(get_path_for_data_file("ask50.complex")).iq_array.data
rect = afp_demod(data, 0.0509, "ASK", 2)
for start, end in message_indices:
center = detect_center(rect[start:end])
self.assertGreaterEqual(center, 0.4, msg="{}/{}".format(start, end))
self.assertLessEqual(center, 0.65, msg="{}/{}".format(start, end))
def test_homematic_center_detection(self):
data = Signal(get_path_for_data_file("homematic.complex32s"), "").iq_array.data
rect = afp_demod(data, 0.0012, "FSK", 2)
msg1 = rect[17719:37861]
msg2 = rect[70412:99385]
center1 = detect_center(msg1)
self.assertGreaterEqual(center1, -0.1285)
self.assertLessEqual(center1, -0.0413)
center2 = detect_center(msg2)
self.assertGreaterEqual(center2, -0.1377)
self.assertLessEqual(center2, -0.0367)
def test_noised_homematic_center_detection(self):
data = Signal(get_path_for_data_file("noised_homematic.complex"), "").iq_array.data
rect = afp_demod(data, 0.0, "FSK", 2)
center = detect_center(rect)
self.assertGreater(center, -0.0148)
self.assertLess(center, 0.0024)
def test_fsk_15db_center_detection(self):
data = Signal(get_path_for_data_file("FSK15.complex"), "").iq_array.data
rect = afp_demod(data, 0, "FSK", 2)
center = detect_center(rect)
self.assertGreaterEqual(center, -0.1979)
self.assertLessEqual(center, 0.1131)
def test_fsk_10db_center_detection(self):
data = Signal(get_path_for_data_file("FSK10.complex"), "").iq_array.data
rect = afp_demod(data, 0, "FSK", 2)
center = detect_center(rect)
self.assertGreaterEqual(center, -0.1413)
self.assertLessEqual(center, 0.05)
def test_fsk_live_capture(self):
data = Signal(get_path_for_data_file("fsk_live.coco"), "").iq_array.data
n = 10
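        # Smooth the capture with a length-10 moving-average FIR filter before demodulating.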
moving_average_filter = Filter([1/n for _ in range(n)], filter_type=FilterType.moving_average)
filtered_data = moving_average_filter.apply_fir_filter(data.flatten()).view(np.float32)
filtered_data = filtered_data.reshape((len(filtered_data)//2, 2))
rect = afp_demod(filtered_data, 0.0175, "FSK", 2)
center = detect_center(rect)
self.assertGreaterEqual(center, -0.0148, msg="Filtered")
self.assertLessEqual(center, 0.01, msg="Filtered")
rect = afp_demod(data, 0.0175, "FSK", 2)
center = detect_center(rect)
self.assertGreaterEqual(center, -0.02, msg="Original")
self.assertLessEqual(center, 0.01, msg="Original")

File diff suppressed because one or more lines are too long


@@ -0,0 +1,79 @@
import unittest
import numpy as np
from tests.test_util import get_path_for_data_file
from urh.ainterpretation.AutoInterpretation import segment_messages_from_magnitudes, merge_message_segments_for_ook
from urh.signalprocessing.IQArray import IQArray
from urh.signalprocessing.Modulator import Modulator
from urh.signalprocessing.Signal import Signal


class TestMessageSegmentation(unittest.TestCase):
def test_segmentation_for_fsk(self):
signal = np.fromfile(get_path_for_data_file("fsk.complex"), dtype=np.complex64)
segments = segment_messages_from_magnitudes(np.abs(signal), 0.0009)
self.assertEqual(len(segments), 1)
self.assertEqual(segments[0], (0, 17742))
def test_segmentation_for_ask(self):
signal = np.fromfile(get_path_for_data_file("ask.complex"), dtype=np.complex64)
segments = segment_messages_from_magnitudes(np.abs(signal), 0.02)
segments = merge_message_segments_for_ook(segments)
self.assertEqual(len(segments), 1)
self.assertEqual(segments[0], (462, 12011))
def test_segmentation_enocean_multiple_messages(self):
signal = np.fromfile(get_path_for_data_file("enocean.complex"), dtype=np.complex64)
segments = segment_messages_from_magnitudes(np.abs(signal), 0.0448)
segments = merge_message_segments_for_ook(segments)
self.assertEqual(len(segments), 3)
self.assertEqual(segments[0], (2107, 5432))
self.assertEqual(segments[1], (20428, 23758))
self.assertEqual(segments[2], (44216, 47546))
def test_message_segmentation_fsk_xavax(self):
signal = Signal(get_path_for_data_file("xavax.coco"), "")
segments = segment_messages_from_magnitudes(signal.iq_array.magnitudes, noise_threshold=0.002)
        # The signal starts with an overdriven section, so one additional segment may be detected
self.assertTrue(len(segments) == 6 or len(segments) == 7)
if len(segments) == 7:
segments = segments[1:]
self.assertEqual(segments,
[(275146, 293697), (321073, 338819), (618213, 1631898), (1657890, 1678041), (1803145, 1820892),
(1846213, 1866364)])
def test_segmentation_ask_50(self):
modulator = Modulator("ask50")
modulator.modulation_type = "ASK"
modulator.parameters[0] = 50
modulator.parameters[1] = 100
modulator.samples_per_symbol = 100
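        # Three messages with increasing pauses; each should become its own segment.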
msg1 = modulator.modulate("1010101111", pause=10000)
msg2 = modulator.modulate("1010101110010101", pause=20000)
msg3 = modulator.modulate("1010101010101111", pause=30000)
data = IQArray.concatenate((msg1, msg2, msg3))
segments = segment_messages_from_magnitudes(data.magnitudes, noise_threshold=0)
self.assertEqual(len(segments), 3)
self.assertEqual(segments, [(0, 999), (10999, 12599), (32599, 34199)])
def test_segmentation_elektromaten(self):
signal = Signal(get_path_for_data_file("elektromaten.complex16s"), "")
signal.noise_threshold_relative = 0.1
segments = segment_messages_from_magnitudes(signal.iq_array.magnitudes, noise_threshold=signal.noise_threshold)
segments = merge_message_segments_for_ook(segments)
self.assertEqual(len(segments), 11)
def test_ook_merge(self):
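        # Many raw ON-period boundaries that merge_message_segments_for_ook should collapse into five messages.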
input = [(26728, 27207), (28716, 29216), (30712, 32190), (32695, 34178), (34686, 35181), (36683, 38181), (38670, 39165), (40668, 42154), (42659, 44151), (44642, 46139), (46634, 47121), (47134, 47145), (48632, 50129), (50617, 51105), (52612, 54089), (54100, 54113), (54601, 56095), (56592, 58075), (58581, 59066), (59076, 59091), (60579, 61081), (62567, 64063), (64559, 66053), (66548, 67035), (68539, 69031), (70533, 71035), (72527, 73008), (73019, 73035), (74522, 75006), (90465, 90958), (92456, 92944), (94455, 95935), (96441, 97930), (98437, 98937), (100430, 101914), (102414, 102901), (104413, 105889), (106398, 107895), (108389, 109873), (110385, 110877), (112374, 113853), (114367, 114862), (116355, 117842), (118344, 119826), (120340, 121824), (122324, 122825), (124323, 124821), (126316, 127807), (128300, 129782), (130293, 130777), (132280, 132774), (134275, 134773), (136266, 136767), (138265, 138751), (154205, 154694), (156206, 156703), (158191, 159685), (160189, 161683), (162176, 162667), (164164, 165657), (166159, 166648), (168147, 169631), (170145, 171621), (172131, 173611), (174125, 174607), (176118, 177600), (178105, 178590), (180093, 181574), (181585, 181599), (182090, 183573), (184074, 185565), (186070, 186553), (188061, 188555), (190052, 191533), (192043, 193523), (194034, 194518), (196021, 196510), (198012, 198503), (200014, 200496), (202003, 202485), (202498, 202511), (217953, 218430), (218442, 218457), (219940, 220426), (221935, 223431), (223926, 225409), (225912, 226399), (227912, 229387), (229896, 230382), (231886, 233369), (233383, 233393), (233882, 235375), (235874, 237357), (237858, 238361), (239850, 241343), (241844, 242328), (243840, 245331), (245828, 247306), (247820, 249296), (249811, 250298), (251803, 252283), (252296, 252309), (253790, 255271), (255778, 257276), (257774, 258258), (259764, 260257), (261760, 262239), (263744, 264241), (265744, 266225), (281684, 282171), (283676, 284163), (285668, 287153), (287665, 289149), (289654, 290145), (291642, 293131), (293633, 294120), (295629, 297104), (297116, 297129)]
merged = merge_message_segments_for_ook(input)
self.assertEqual(len(merged), 5)


@@ -0,0 +1,41 @@
import unittest
from tests.test_util import get_path_for_data_file
from urh.ainterpretation import AutoInterpretation
import numpy as np
from urh.signalprocessing.Modulator import Modulator


class TestModulationDetection(unittest.TestCase):
def test_fsk_detection(self):
fsk_signal = np.fromfile(get_path_for_data_file("fsk.complex"), dtype=np.complex64)[5:15000]
mod = AutoInterpretation.detect_modulation(fsk_signal, wavelet_scale=4, median_filter_order=7)
self.assertEqual(mod, "FSK")
def test_ook_detection(self):
data = np.fromfile(get_path_for_data_file("ask.complex"), dtype=np.complex64)
mod = AutoInterpretation.detect_modulation(data)
self.assertEqual(mod, "OOK")
data = np.fromfile(get_path_for_data_file("ASK_mod.complex"), dtype=np.complex64)
mod = AutoInterpretation.detect_modulation(data)
self.assertEqual(mod, "OOK")
def test_ask50_detection(self):
message_indices = [(0, 8000), (18000, 26000), (36000, 44000), (54000, 62000), (72000, 80000)]
data = np.fromfile(get_path_for_data_file("ask50.complex"), dtype=np.complex64)
for start, end in message_indices:
mod = AutoInterpretation.detect_modulation(data[start:end])
self.assertEqual(mod, "ASK", msg="{}/{}".format(start, end))
def test_psk_detection(self):
modulator = Modulator("")
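        # Synthesize a PSK signal with phase parameters -90 and 90 instead of loading a recording.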
modulator.modulation_type = "PSK"
modulator.parameters[0] = -90
modulator.parameters[1] = 90
data = modulator.modulate("10101010111000")
mod = AutoInterpretation.detect_modulation(data)
self.assertEqual(mod, "PSK")


@@ -0,0 +1,56 @@
import unittest
import numpy as np
from urh.ainterpretation.AutoInterpretation import detect_noise_level
from tests.test_util import get_path_for_data_file
from urh.signalprocessing.Signal import Signal


class TestNoiseDetection(unittest.TestCase):
def test_for_fsk_signal(self):
data = np.fromfile(get_path_for_data_file("fsk.complex"), dtype=np.complex64)
noise_level = detect_noise_level(np.abs(data))
self.assertGreaterEqual(noise_level, 0.0005)
self.assertLessEqual(noise_level, 0.009)
def test_for_ask_signal(self):
data = np.fromfile(get_path_for_data_file("ask.complex"), dtype=np.complex64)
noise_level = detect_noise_level(np.abs(data))
self.assertGreaterEqual(noise_level, 0.0110)
self.assertLessEqual(noise_level, 0.043)
def test_for_fsk_signal_with_little_noise_before_and_after(self):
data = np.concatenate((np.fromfile(get_path_for_data_file("fsk.complex"), dtype=np.complex64)[-1000:],
np.fromfile(get_path_for_data_file("fsk.complex"), dtype=np.complex64)[0:18800]))
noise_level = detect_noise_level(np.abs(data))
self.assertGreaterEqual(noise_level, 0.0005)
self.assertLessEqual(noise_level, 0.009)
def test_for_enocean_ask_signal(self):
data = np.fromfile(get_path_for_data_file("enocean.complex"), dtype=np.complex64)
noise_level = detect_noise_level(np.abs(data))
self.assertGreaterEqual(noise_level, 0.01)
self.assertLessEqual(noise_level, 0.28)
def test_for_noiseless_signal(self):
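        # A slice that contains only signal (no leading or trailing noise) should report a noise level of 0.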
data = np.fromfile(get_path_for_data_file("fsk.complex"), dtype=np.complex64)[0:17639]
noise_level = detect_noise_level(np.abs(data))
self.assertEqual(noise_level, 0)
def test_multi_messages_different_rssi(self):
data = Signal(get_path_for_data_file("multi_messages_different_rssi.coco"), "").iq_array.data
noise_level = detect_noise_level(np.abs(data))
self.assertGreater(noise_level, 0.001)
self.assertLess(noise_level, 0.002)
def test_for_psk_signal(self):
data = Signal(get_path_for_data_file("psk_generated.complex"), "").iq_array.data
noise_level = detect_noise_level(np.abs(data))
self.assertGreater(noise_level, 0.0067)
self.assertLessEqual(noise_level, 0.0081)
def test_for_noisy_fsk_15db_signal(self):
data = Signal(get_path_for_data_file("FSK15.complex"), "").iq_array.data
noise_level = detect_noise_level(np.abs(data))
self.assertEqual(noise_level, 0)