147 lines
5.6 KiB
Python
147 lines
5.6 KiB
Python
|
import array
|
||
|
import os
|
||
|
import tempfile
|
||
|
import time
|
||
|
import unittest
|
||
|
|
||
|
import numpy as np
|
||
|
from PyQt5.QtCore import QDir
|
||
|
|
||
|
from urh.cythonext.signal_functions import modulate_c, get_oqpsk_bits
|
||
|
from urh.signalprocessing.Modulator import Modulator
|
||
|
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
|
||
|
from urh.signalprocessing.Signal import Signal
|
||
|
|
||
|
|
||
|
class TestModulator(unittest.TestCase):
    """Tests for ``Modulator`` and the ``modulate_c`` / ``get_oqpsk_bits`` helpers."""

    def setUp(self):
        """Prepare the bit pattern and the expected sample count used by the tests."""
        self.modulation_data = array.array("B", [True, False, False, False, True, True, False, True])
        self.samples_per_symbol = 100
        self.pause = 1000

        # One symbol per bit, followed by the trailing pause.
        self.total_samples = len(self.modulation_data) * self.samples_per_symbol + self.pause

    def _remove_after_test(self, path):
        """
        Register a best-effort removal of ``path`` for when the test finishes.

        The tests write their signals into the shared temp directory under
        fixed names; without this they would pile up there across runs.
        """
        def _remove():
            try:
                os.remove(path)
            except OSError:
                # Cleanup only: the file may never have been written or may
                # already be gone, and that must not turn a test into an error.
                pass

        self.addCleanup(_remove)

    def test_ask_fsk_psk_modulation(self):
        """Modulate the same bits with ASK, FSK and PSK and demodulate them again."""
        # (modulation, parameters[0], parameters[1], signal center)
        settings = (
            ("ASK", 0, 100, 0.5),
            ("FSK", 1000, 2500, 0.0097),
            ("PSK", -90, 90, 0),
        )

        for modulation, param_zero, param_one, center in settings:
            # subTest labels a failure with the modulation that caused it.
            with self.subTest(modulation=modulation):
                modulator = Modulator(modulation)
                filename = os.path.join(QDir.tempPath(), "{0}_mod.complex".format(modulation))
                self._remove_after_test(filename)

                modulator.modulation_type = modulation
                modulator.samples_per_symbol = self.samples_per_symbol
                modulator.parameters[0] = param_zero
                modulator.parameters[1] = param_one

                modulator.modulate(self.modulation_data, self.pause).tofile(filename)

                signal = Signal(filename, modulation)
                signal.modulation_type = modulation
                signal.samples_per_symbol = self.samples_per_symbol
                signal.center = center

                self.assertEqual(signal.num_samples, self.total_samples, msg=modulation)

                pa = ProtocolAnalyzer(signal)
                pa.get_protocol_from_signal()
                self.assertEqual(1, len(pa.messages), msg=modulation)
                self.assertEqual(self.modulation_data, pa.messages[0].plain_bits, msg=modulation)

    def test_gfsk(self):
        """
        Smoke test: GFSK-modulate three messages and read them back as FSK.

        Only checks that modulation and protocol extraction complete without
        raising; the extracted messages are not inspected.
        """
        target_file = os.path.join(tempfile.gettempdir(), "test.complex")
        self._remove_after_test(target_file)

        modulator = Modulator("gfsk")
        modulator.modulation_type = "GFSK"
        modulator.samples_per_symbol = 100
        modulator.sample_rate = 1e6
        modulator.parameters[1] = 20e3
        modulator.parameters[0] = -10e3

        # Three messages, each followed by a pause of a different length.
        data1 = modulator.modulate([True, False, False, True, False], 9437)
        data2 = modulator.modulate([True, False, True], 9845)
        data3 = modulator.modulate([True, False, True, False], 8458)
        samples = np.concatenate((data1, data2, data3))

        samples.tofile(target_file)

        pa = ProtocolAnalyzer(Signal(target_file, "test", modulation="FSK"))
        pa.get_protocol_from_signal()

    def test_performance(self):
        """Creating a modulator and modulating 1000 bits plus a long pause must take under 0.5 s."""
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # system clock adjustments the way time.time() can.
        start = time.perf_counter()
        modulator = Modulator("Perf")
        modulator.modulation_type = "FSK"
        modulator.modulate([True] * 1000, pause=10000000)
        elapsed = time.perf_counter() - start
        self.assertLess(elapsed, 0.5)

    def test_c_modulation_method_ask(self):
        """Smoke test: ``modulate_c`` accepts four ASK parameters without raising."""
        bits = array.array("B", [1, 0, 1, 0, 1, 1, 0, 0, 0, 1])
        parameters = array.array("f", [0, 0.25, 0.5, 1])
        modulate_c(bits, 100, "ASK", parameters, 2, 1, 40e3, 0, 1e6, 1000, 0)

    def test_c_modulation_method_fsk(self):
        """Smoke test: ``modulate_c`` accepts four FSK parameters without raising."""
        bits = array.array("B", [1, 0, 1, 0, 1, 1, 0, 0, 0, 1])
        parameters = array.array("f", [-20e3, -10e3, 10e3, 20e3])
        modulate_c(bits, 100, "FSK", parameters, 2, 1, 40e3, 0, 1e6, 1000, 0)

    def test_c_modulation_method_psk(self):
        """Smoke test: ``modulate_c`` accepts four PSK parameters without raising."""
        bits = array.array("B", [0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1])
        parameters = array.array("f", [np.pi/4, 3*np.pi/4, 5*np.pi/4, 7*np.pi/4])
        modulate_c(bits, 100, "PSK", parameters, 2, 1, 40e3, 0, 1e6, 1000, 0)

    def test_get_oqpsk_bits(self):
        """
        Check the bit stream produced by ``get_oqpsk_bits``.

        The Q stream (odd bits) should be delayed by one bit, so the sequence

            11 01 00 10 01
            IQ IQ IQ IQ IQ

        should become

            1X 01 01 10 00 X1

        where X is set to amplitude zero during modulation, so it does not
        matter which bit value gets written there.

        TODO: This does not quite work yet. Fix it when a test signal is available.
        """
        bits = array.array("B", [1, 1, 0, 1, 0, 0, 1, 0, 0, 1])

        oqpsk_bits = get_oqpsk_bits(bits)

        # One padding bit is expected at each end of the stream.
        self.assertEqual(len(oqpsk_bits), len(bits) + 2)

        self.assertEqual(oqpsk_bits[0], 1)
        self.assertEqual(oqpsk_bits[-1], 1)
        self.assertEqual(array.array("B", [0, 1, 0, 1, 1, 0, 0, 0]), array.array("B", oqpsk_bits[2:-2]))

    def test_c_modulation_method_oqpsk(self):
        """Smoke test: ``modulate_c`` accepts four OQPSK parameters without raising."""
        bits = array.array("B", [0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1])
        parameters = array.array("f", [np.pi/4, 3*np.pi/4, 5*np.pi/4, 7*np.pi/4])
        modulate_c(bits, 100, "OQPSK", parameters, 2, 1, 40e3, 0, 1e6, 1000, 0)

    def test_c_modulation_method_gfsk(self):
        """Smoke test: ``modulate_c`` accepts two GFSK parameters without raising."""
        bits = array.array("B", [1, 0, 1, 0, 1, 1, 0, 0, 0, 1])
        parameters = array.array("f", [-10e3, 10e3])
        modulate_c(bits, 100, "GFSK", parameters, 1, 1, 40e3, 0, 1e6, 1000, 0)