diff --git a/afsk/__init__.py b/afsk/__init__.py index bd02c87..ab8a9f5 100644 --- a/afsk/__init__.py +++ b/afsk/__init__.py @@ -1,4 +1 @@ - -from afsk import encode -import ax25 - +from .ax25 import FCS \ No newline at end of file diff --git a/afsk/afsk.py b/afsk/afsk.py index 3522ac0..81ec5da 100644 --- a/afsk/afsk.py +++ b/afsk/afsk.py @@ -1,5 +1,3 @@ -# coding=utf-8 - # Bell 202 Audio Frequency Shift Keying # http://n1vg.net/packet/ @@ -11,9 +9,9 @@ from bitarray import bitarray -import audiogen -from audiogen.util import multiply -from audiogen.util import constant +import audiogen_p3 as audiogen +from audiogen_p3.util import multiply +from audiogen_p3.util import constant MARK_HZ = 1200.0 SPACE_HZ = 2200.0 @@ -30,7 +28,7 @@ def encode(binary_data): audiogen module. ''' framed_data = frame(binary_data) - + # set volume to 1/2, preceed packet with 1/20 s silence to allow for startup glitches for sample in itertools.chain( audiogen.silence(1.05), @@ -53,7 +51,7 @@ def modulate(data): clock = (x / BAUD_RATE for x in itertools.count(1)) tones = (MARK_HZ if bit else SPACE_HZ for bit in data) - for boundary, frequency in itertools.izip(clock, tones): + for boundary, frequency in izip(clock, tones): # frequency of current symbol is determined by how much # we advance the signal's phase in each audio frame phase_change_per_sample = TWO_PI / (audiogen.sampler.FRAME_RATE / frequency) diff --git a/afsk/ax25.py b/afsk/ax25.py index 3c899bb..281d4d8 100644 --- a/afsk/ax25.py +++ b/afsk/ax25.py @@ -8,9 +8,9 @@ import argparse from bitarray import bitarray -import audiogen +import audiogen_p3 as audiogen -import afsk +from . import afsk def bit_stuff(data): count = 0 @@ -88,43 +88,31 @@ def fcs_validate(bits): raise Exception("FCS checksum invalid.") class AX25(object): - def __init__( - self, - destination=b"APRS", - source=b"", - digipeaters=(b"RELAY", b"WIDE2-1"), - info=b"\"" - ): + def __init__(self, destination="APRS", source="", digipeaters=("RELAY", "WIDE2-1"), info="\"" ): self.flag = b"\x7e" - self.destination = destination self.source = source self.digipeaters = digipeaters - self.info = info @classmethod def callsign_encode(self, callsign): callsign = callsign.upper() - if callsign.find(b"-") > 0: - callsign, ssid = callsign.split(b"-") + if callsign.find("-") > 0: + callsign, ssid = callsign.split("-") else: - ssid = b"0" + ssid = "0" assert(len(ssid) == 1) assert(len(callsign) <= 6) - callsign = b"{callsign:6s}{ssid}".format(callsign=callsign, ssid=ssid) - + callsign = "{callsign:6s}{ssid}".format(callsign=callsign, ssid=ssid) # now shift left one bit, argh - return b"".join([chr(ord(char) << 1) for char in callsign]) + return b"".join([bytes([char << 1]) for char in callsign.encode('utf-8')]) def encoded_addresses(self): - address_bytes = bytearray(b"{destination}{source}{digis}".format( - destination = AX25.callsign_encode(self.destination), - source = AX25.callsign_encode(self.source), - digis = b"".join([AX25.callsign_encode(digi) for digi in self.digipeaters]) - )) + address = b"".join([AX25.callsign_encode(self.destination),AX25.callsign_encode(self.source), b"".join([AX25.callsign_encode(digi) for digi in self.digipeaters])]) + address_bytes = bytearray(address) # set the low order (first, with eventual little bit endian encoding) bit # in order to flag the end of the address string @@ -133,36 +121,30 @@ def encoded_addresses(self): return address_bytes def header(self): - return b"{addresses}{control}{protocol}".format( - addresses = self.encoded_addresses(), - control = self.control_field, # * 8, - protocol = self.protocol_id, - ) + return b"".join([self.encoded_addresses(), self.control_field, self.protocol_id]) + def packet(self): - return b"{header}{info}{fcs}".format( - flag = self.flag, - header = self.header(), - info = self.info, - fcs = self.fcs() - ) + return b"".join([self.flag,self.header(),self.info.encode('utf-8'),self.fcs()]) + def unparse(self): flag = bitarray(endian="little") flag.frombytes(self.flag) bits = bitarray(endian="little") - bits.frombytes("".join([self.header(), self.info, self.fcs()])) - + bits.frombytes(b"".join([self.header(), self.info.encode('utf-8'), self.fcs()])) return flag + bit_stuff(bits) + flag def __repr__(self): return self.__str__() + def __str__(self): - return b"{source}>{destination},{digis}:{info}".format( + __str__ = "{source}>{destination},{digis}:{info}".format( destination = self.destination, source = self.source, - digis = b",".join(self.digipeaters), + digis = ",".join(self.digipeaters), info = self.info ) + return __str__ @classmethod def parse(cls, bits): @@ -177,7 +159,7 @@ def parse(cls, bits): def fcs(self): content = bitarray(endian="little") - content.frombytes("".join([self.header(), self.info])) + content.frombytes(b"".join([self.header(), self.info.encode('utf-8')])) fcs = FCS() for bit in content: @@ -187,20 +169,8 @@ def fcs(self): return fcs.digest() class UI(AX25): - def __init__( - self, - destination=b"APRS", - source=b"", - digipeaters=(b"WIDE1-1", b"WIDE2-1"), - info=b"" - ): - AX25.__init__( - self, - destination, - source, - digipeaters, - info - ) + def __init__(self, destination="APRS", source="", digipeaters=("WIDE1-1", "WIDE2-1"), info=""): + AX25.__init__(self, destination, source, digipeaters, info) self.control_field = b"\x03" self.protocol_id = b"\xf0" @@ -219,14 +189,14 @@ def main(arguments=None): ) parser.add_argument( '--destination', - default=b'APRS', + default='APRS', help='AX.25 destination address. See http://www.aprs.org/aprs11/tocalls.txt' ) parser.add_argument( '-d', '--digipeaters', '--path', - default=b'WIDE1-1,WIDE2-1', + default='WIDE1-1,WIDE2-1', help='Digipeater path to use. "New Paradigm" recommendations are "WIDE1-1,WIDE2-1" for mobile and "WIDE2-1" for fixed stations. Defaults to "WIDE1-1,WIDE2-1."' ) parser.add_argument( @@ -245,14 +215,14 @@ def main(arguments=None): if args.verbose == 0: logging.basicConfig(level=logging.INFO) - elif args.verbose >=1: + else: logging.basicConfig(level=logging.DEBUG) packet = UI( destination=args.destination, source=args.callsign, info=args.info, - digipeaters=args.digipeaters.split(b','), + digipeaters=args.digipeaters.split(','), ) logger.info(r"Sending packet: '{0}'".format(packet)) diff --git a/setup.py b/setup.py index c9d6019..f454838 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ required_modules = [ 'argparse', - 'audiogen', + 'audiogen_p3', 'bitarray', ] extras_require = { @@ -17,7 +17,7 @@ setup( name="afsk", - version="0.0.3", + version="0.0.4", description=u"Bell 202 Audio Frequency Shift Keying encoder and APRS packet audio tools", author="Christopher H. Casebeer", author_email="", @@ -40,8 +40,8 @@ classifiers=[ "Environment :: Console", "License :: OSI Approved :: BSD License", - "Programming Language :: Python :: 2.6", - "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.8", "Intended Audience :: Developers", "Intended Audience :: End Users/Desktop", "Topic :: Multimedia :: Sound/Audio", diff --git a/test.py b/test.py index 3b632cf..4076637 100644 --- a/test.py +++ b/test.py @@ -1,7 +1,7 @@ import logging logging.basicConfig(level=logging.DEBUG) -import afsk +from afsk.ax25 import FCS from bitarray import bitarray import crc16 @@ -15,29 +15,29 @@ def test_packet(): - packet = afsk.ax25.UI("APRS", "DUMMY", info=":Test") + packet = ax25.UI("APRS", "DUMMY", info=":Test") generated_bits = packet.unparse() generated_bytes = generated_bits.tobytes() expected_bytes = '~\x82\xa0\xa4\xa6@@`\x88\xaa\x9a\x9a\xb2@`\xae\x92\x88\x8ab@b\xae\x92\x88\x8ad@c\x03\xf0:Test\x9f/\xfb\x01' #expected_bits = bitarray.bitarray('01111110010000010000010100100101011001010000001000000010000001100001000101010101010110010101100101001101000000100000011001110101010010010001000101010001010001100000001001000110011101010100100100010001010100010010011000000010110001101100000000001111010111000010101010100110110011100010111011111001111101001101111110') - print "Unstuffed body BA:\n%s" % unstuffed_body + print ("Unstuffed body BA:\n%s" % unstuffed_body) - print "checksummed_content_bits:\n%r" % "".join([packet.header(), packet.info]) + print ("checksummed_content_bits:\n%r %r" % (packet.header(), packet.info)) - print "BS PACKET:\n%r" % (bs_packet) - print "Packet:\n%s\n%r\nHeader:\n%r" % (packet, packet.packet(), packet.header()) - print "BS HEADER:\n%r" % (bs_header) - print "Generated:\n%r\nExpected:\n%r" % (generated_bytes, expected_bytes) + print ("BS PACKET:\n%r" % (bs_packet)) + print ("Packet:\n%s\n%r\nHeader:\n%r" % (packet, packet.packet(), packet.header())) + print ("BS HEADER:\n%r" % (bs_header)) + print ("Generated:\n%r\nExpected:\n%r" % (generated_bytes, expected_bytes)) #assert generated_bits == expected_bits assert generated_bytes == expected_bytes def test_fcs(): - fcs = afsk.ax25.FCS() - bytes = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + fcs = FCS() + str_bytes = b'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' bits = bitarray() - bits.frombytes(bytes) + bits.frombytes(str_bytes) for bit in bits: fcs.update_bit(bit)