diff --git a/mark/.idea/inspectionProfiles/Project_Default.xml b/mark/.idea/inspectionProfiles/Project_Default.xml index 920d523..b10669a 100644 --- a/mark/.idea/inspectionProfiles/Project_Default.xml +++ b/mark/.idea/inspectionProfiles/Project_Default.xml @@ -8,5 +8,12 @@ + + + \ No newline at end of file diff --git a/mark/src/app.py b/mark/src/app.py index 5b47b95..52621b1 100644 --- a/mark/src/app.py +++ b/mark/src/app.py @@ -3,6 +3,8 @@ from Tools.scripts.pep384_macrocheck import parse_file from fastapi import FastAPI + +from mp3 import MP3 from mp3_file_io import MP3FileIo from fastapi.responses import StreamingResponse from fastapi.responses import Response @@ -16,7 +18,7 @@ async def health_check(): @app.get("/stream-mp3/{file_name:path}") -async def stream_mp3(file_name: str): +async def stream_mp3(file_name: str, speed: int = 1, updown: str = "down"): print(f"Current working directory: {os.getcwd()}") path = f"../resource/{file_name}" @@ -32,20 +34,24 @@ async def stream_mp3(file_name: str): finally: mp3_io.close() -# 추후 MP3 클래스를 사용하여 MP3 파일을 일부만 읽어오는 방법을 구현할 수 있습니다. - # mp3 = MP3(mp3_io) - # mp3.set_header() - # mp3.set_frame_size() - # mp3.set_frame_count() - # mp3.set_play_time() + mp3 = MP3(mp3_io) + mp3.set_header() + mp3.set_frame_size() + mp3.set_frame_count() + mp3.set_play_time() + + if updown == "down": + mp3 = mp3.change_speed_down(speed) + elif updown == "up": + mp3 = mp3.change_speed_up(speed) headers = { - "Content-Disposition": f"inline; filename={os.path.basename(path)}", + "Content-Disposition": f"inline;", "Content-Type": "audio/mpeg", - "Content-Length": str(mp3_io.get_size()) + "Content-Length": str(len(mp3.file_io.file_bytes)) } - return StreamingResponse(io.BytesIO(mp3_io.get_all()), headers=headers) + return StreamingResponse(io.BytesIO(mp3.file_io.file_bytes), headers=headers) if __name__ == "__main__": import uvicorn diff --git a/mark/src/main.py b/mark/src/main.py index 5290d9d..00b8b5f 100644 --- a/mark/src/main.py +++ b/mark/src/main.py @@ -13,5 +13,5 @@ print(mp3.header) print(mp3.frame_size) -print(mp3.frame_count) -print(mp3.play_time) \ No newline at end of file +print(mp3._frame_count) +print(mp3._play_time) \ No newline at end of file diff --git a/mark/src/mp3.py b/mark/src/mp3.py index 42de87b..4f32995 100644 --- a/mark/src/mp3.py +++ b/mark/src/mp3.py @@ -1,51 +1,34 @@ -import mp3_header_factory +from mp3_file_io_factory import MP3FileIoFactory +from mp3_frame import MP3Frame class MP3: - def __init__(self, file_reader): - self.play_time = None - self.frame_size = 0 - self.frame_count = 0 - self.header = None - self.file_io = file_reader + def __init__(self): + self._frames = list[MP3Frame] + self._frame_count = 0 + self._play_time = 0 - def set_header(self): - header_bytes = self.file_io.get(4) - self.header = mp3_header_factory.MP3HeaderFactory.create(header_bytes) - - def get_header(self): - return self.header - - def set_frame_size(self): - self.frame_size = self.header.calc_frame_size() - - def get_frame_size(self): - return self.frame_size - - def set_frame_count(self): - offset = 4 # 태그 없어서 고정 offset 사용 - self.frame_count = (self.file_io.get_size() - offset) // self.frame_size + def set_frames(self, frames): + self._frames = frames + self._frame_count = len(frames) + self._frame_count = len(frames) + self._play_time = self._frame_count * 1152 / self._frames[0].header.sampling_rate def get_frame_count(self): - return self.set_frame_count - - def set_play_time(self): - frame_duration = 1152 / self.header.sampling_rate - self.play_time = self.frame_count * frame_duration + return self._frame_count def get_play_time(self): - return self.play_time + return self._play_time + - def cut(self, start_time, end_time): - start_frame = int(start_time * self.header.sampling_rate / 1152) - end_frame = int(end_time * self.header.sampling_rate / 1152) + def save(self, file_path): + new_bytes = self.to_bytes() + MP3FileIoFactory.create_with_bytes(new_bytes).save(file_path) - frame_size = self.header.calc_frame_size() - start_byte = start_frame * frame_size - end_byte = end_frame * frame_size + def to_bytes(self) -> bytes: + new_bytes = b'' - io = self.file_io.cut_frames(start_byte, end_byte) - return MP3(io) + for frame in self._frames: + new_bytes += frame.to_bytes() - def save(self, file_path): - self.file_io.save(file_path) + return new_bytes diff --git a/mark/src/mp3Factory.py b/mark/src/mp3Factory.py new file mode 100644 index 0000000..73d4265 --- /dev/null +++ b/mark/src/mp3Factory.py @@ -0,0 +1,56 @@ +from mp3 import MP3 +from mp3_file_io import MP3FileIo +from mp3_frame_factory import Mp3FrameFactory + + +class MP3Factory: + @staticmethod + def create_by_file_path(file_path: str) -> MP3: + io = MP3FileIo() + io.open(file_path) + frames = Mp3FrameFactory.create(io) + io.close() + + return MP3Factory.create_by_frames(frames) + + @staticmethod + def create_by_frames(frames) -> MP3: + mp3 = MP3() + mp3.set_frames(frames) + return mp3 + + @staticmethod + def create_by_cut(mp3:MP3, start_time:int, end_time:int): + # 첫번째 헤더를 기준으로 시작 프레임과 끝 프레임을 계산한다 (CBR만 가능) + first_header = mp3._frames[0].header + + start_frame = int(start_time * first_header.sampling_rate / 1152) + end_frame = int(end_time * first_header.sampling_rate / 1152) + + new_frames = mp3._frames[start_frame:end_frame] + return MP3Factory.create_by_frames(new_frames) + + @staticmethod + def create_speed_down(mp3:MP3, speed: int): + if speed <= 0: + raise ValueError("Speed must be greater than 0") + + new_frames = [] + for i in range(mp3._frame_count): + for _ in range(speed): + new_frames.append(mp3._frames[i]) + + return MP3Factory.create_by_frames(new_frames) + + @staticmethod + def create_speed_up(mp3:MP3, speed: int): + if speed <= 0: + raise ValueError("Speed must be greater than 0") + + new_frames = [] + for i in range(mp3._frame_count): + if i % speed == 0: + frame = mp3._frames[i] + new_frames.append(frame) + + return MP3Factory.create_by_frames(new_frames) diff --git a/mark/src/mp3_file_io.py b/mark/src/mp3_file_io.py index 22011a2..e436528 100644 --- a/mark/src/mp3_file_io.py +++ b/mark/src/mp3_file_io.py @@ -1,71 +1,44 @@ import os.path -import mp3_header_factory - class MP3FileIo: def __init__(self): self.file_obj = None - self.bytes = None + self.file_bytes = None def file_exists(self, path): return os.path.exists(path) - def open(self, file_path): + def open(self, file_path) -> bool: try: self.file_obj = open(file_path, 'rb') except IOError as e: print(f"파일을 열 수 없습니다: {e}") - return + return False + return True def close(self): if self.file_obj is not None: self.file_obj.close() - def read_all(self): - self.bytes = self.file_obj.read() + def read(self, size): + return self.file_obj.read(size) def get(self, size): - return self.bytes[0:size] + return self.file_bytes[0:size] + + def get_bytes(self, start, end): + return self.file_bytes[start:end] def get_all(self): - return self.bytes + return self.file_bytes def get_size(self): - return len(self.bytes) - - def check_header(self): - try: - file_obj = open(self.file_path, 'rb') - header = file_obj.read(2) - file_obj.close() - - # Convert the header to an integer - header_int = int.from_bytes(header, byteorder='big') - - # MP3 Sync Word가(=11비트) 1로 시작하는지 확인한다. - # 0xFF = 1111 1111 - # 0xE0 = 1110 0000 - if (header_int & 0xFFE0) != 0xFFE0: - print("MP3 SYNC WORD not detected") - return False - return True - except IOError as e: - if isinstance(e, EOFError): - print(f"파일이 너무 작습니다: {e}") - else: - print(f"파일을 열 수 없습니다: {e}") - - return False - - def cut_frames(self, start_byte, end_byte): - io = MP3FileIo() - io.bytes = self.bytes[0:4] + self.bytes[start_byte:end_byte] - return io + return len(self.file_bytes) def save(self, file_path): try: file_obj = open(file_path, 'wb') - file_obj.write(self.bytes) + file_obj.write(self.file_bytes) file_obj.close() except IOError as e: print(f"파일을 저장할 수 없습니다: {e}") diff --git a/mark/src/mp3_file_io_factory.py b/mark/src/mp3_file_io_factory.py new file mode 100644 index 0000000..c8b9824 --- /dev/null +++ b/mark/src/mp3_file_io_factory.py @@ -0,0 +1,15 @@ +from mp3_file_io import MP3FileIo + + +class MP3FileIoFactory: + @staticmethod + def create_with_file(file_path: str) -> MP3FileIo: + io = MP3FileIo() + io.open(file_path) + return io + + @staticmethod + def create_with_bytes(file_bytes: bytes) -> MP3FileIo: + io = MP3FileIo() + io.file_bytes = file_bytes + return io diff --git a/mark/src/mp3_frame.py b/mark/src/mp3_frame.py new file mode 100644 index 0000000..2041bc8 --- /dev/null +++ b/mark/src/mp3_frame.py @@ -0,0 +1,14 @@ +from dataclasses import dataclass + +from mp3_header import MP3Header + + +@dataclass +class MP3Frame: + def __init__(self, header:MP3Header, header_bytes, data_bytes:bytes): + self.header = header + self.header_bytes = header_bytes + self.data_bytes = data_bytes + + def to_bytes(self): + return self.header_bytes + self.data_bytes diff --git a/mark/src/mp3_frame_factory.py b/mark/src/mp3_frame_factory.py new file mode 100644 index 0000000..11507c8 --- /dev/null +++ b/mark/src/mp3_frame_factory.py @@ -0,0 +1,29 @@ +from mp3_file_io import MP3FileIo +from mp3_frame import MP3Frame +from mp3_header_factory import MP3HeaderFactory + + +class Mp3FrameFactory: + @staticmethod + def create(io : MP3FileIo) -> list[MP3Frame]: + frames = [] + while True: + header_bytes = io.read(4) + if len(header_bytes) < 4: + break + + if MP3HeaderFactory.is_header(header_bytes) is False: + break + + header = MP3HeaderFactory.create(header_bytes) + frame_size = header.calc_frame_size() + + data_size = frame_size - 4 + data_bytes = io.read(data_size) + if len(data_bytes) < data_size: + break + + frame = MP3Frame(header, header_bytes, data_bytes) + frames.append(frame) + + return frames diff --git a/mark/src/mp3_header_factory.py b/mark/src/mp3_header_factory.py index 05d3560..34e15e2 100644 --- a/mark/src/mp3_header_factory.py +++ b/mark/src/mp3_header_factory.py @@ -1,9 +1,20 @@ +from warnings import catch_warnings + from mp3_header import MP3Header class MP3HeaderFactory: @classmethod - def create(cls, bytes): + + @staticmethod + def is_header(header_bytes: bytes) -> bool: + # 0xFFE0 + if header_bytes[0] == 0xFF and (header_bytes[1] & 0xE0) == 0xE0: + return True + return False + + @staticmethod + def create(header_bytes) -> MP3Header: versions = { 0: "2.5", 2: "2", @@ -57,19 +68,19 @@ def create(cls, bytes): 3: "CCIT J.17", } - version = versions[(bytes[1] & 0x18) >> 3] - layer = layers[(bytes[1] & 0x06) >> 1] - protection = (bytes[1] & 0x01) - bitrate = bitrate[(bytes[2] & 0xF0) >> 4] - sampling_rate = sampling_rates[(bytes[1] & 0x18) >> 3][(bytes[2] & 0x0C) >> 2] - padding = (bytes[2] & 0x02) >> 1 - private = (bytes[2] & 0x01) - channel_mode = channel_modes[(bytes[3] & 0xC0) >> 6] - mode_extension = mode_extensions[(bytes[3] & 0x30) >> 4] - copy_right = (bytes[3] & 0x08) >> 3 - original = (bytes[3] & 0x04) >> 2 - emphasis = emphases[(bytes[3] & 0x03)] + version = versions[(header_bytes[1] & 0x18) >> 3] + layer = layers[(header_bytes[1] & 0x06) >> 1] + protection = (header_bytes[1] & 0x01) + bitrate = bitrate[(header_bytes[2] & 0xF0) >> 4] + sampling_rate = sampling_rates[(header_bytes[1] & 0x18) >> 3][(header_bytes[2] & 0x0C) >> 2] + padding = (header_bytes[2] & 0x02) >> 1 + private = (header_bytes[2] & 0x01) + channel_mode = channel_modes[(header_bytes[3] & 0xC0) >> 6] + mode_extension = mode_extensions[(header_bytes[3] & 0x30) >> 4] + copy_right = (header_bytes[3] & 0x08) >> 3 + original = (header_bytes[3] & 0x04) >> 2 + emphasis = emphases[(header_bytes[3] & 0x03)] return MP3Header(version, layer, protection, bitrate, sampling_rate, padding, private, channel_mode, mode_extension, - copy_right, original, emphasis) + copy_right, original, emphasis) \ No newline at end of file diff --git a/mark/tests/test_mp3.py b/mark/tests/test_mp3.py index 9457d67..23c9c28 100644 --- a/mark/tests/test_mp3.py +++ b/mark/tests/test_mp3.py @@ -1,54 +1,25 @@ import unittest from mp3 import MP3 -from mp3_file_io import MP3FileIo +from mp3Factory import MP3Factory file_path = "../resource/input.mp3" class TestMp3(unittest.TestCase): def setUp(self): - reader = MP3FileIo() - reader.open(file_path) - reader.read_all() - reader.close() - - self.reader = reader - - def test_calc_frame(self): - mp3 = MP3(self.reader) - mp3.set_header() - mp3.set_frame_size() - - self.assertEqual(mp3.get_frame_size(), 1044) - - def test_calc_frame_size(self): - mp3 = MP3(self.reader) - mp3.set_header() - mp3.set_frame_size() - mp3.set_frame_count() - - self.assertEqual(mp3.get_frame_size(), 8980) - - def test_mp3_play_time(self): - mp3 = MP3(self.reader) - mp3.set_header() - mp3.set_frame_size() - mp3.set_frame_count() - mp3.set_play_time() - mp3_play_time = mp3.get_play_time() - - self.assertEqual(mp3_play_time, 234.5795918367347) + self.mp3 = MP3Factory.create_by_file_path(file_path) def test_mp3_cut(self): - mp3 = MP3(self.reader) - mp3.set_header() - mp3.set_frame_size() - mp3.set_frame_count() - mp3.set_play_time() + new_mp3 = MP3Factory.create_by_cut(self.mp3, 10, 20) + new_mp3.save("../resource/output_cut_10_20.mp3") - output = mp3.cut(0, 10) - output.save("../resource/output.mp3") + def test_mp3_set_down_speed(self): + output = MP3Factory.create_speed_down(self.mp3,2) + output.save("../resource/output_down_x2.mp3") + def test_mp3_set_up_speed(self): + output = MP3Factory.create_speed_up(self.mp3, 2) + output.save("../resource/output_up_x2.mp3") if __name__ == '__main__': unittest.main() diff --git a/mark/tests/test_mp3_new.py b/mark/tests/test_mp3_new.py new file mode 100644 index 0000000..ccd2846 --- /dev/null +++ b/mark/tests/test_mp3_new.py @@ -0,0 +1,41 @@ +import unittest + +from mp3_file_io import MP3FileIo +from mp3_frame import MP3Frame +from mp3_header_factory import MP3HeaderFactory + +# 삭제예정 +class TestMP3New(unittest.TestCase): + def test_all_situation(self): + # load mp3 file + io = MP3FileIo() + io.open("../resource/input.mp3") + + frames: list[MP3Frame] = [] + while True: + header_bytes = io.read(4) + if len(header_bytes) < 4: + break + + if MP3HeaderFactory.is_header(header_bytes) is False: + break + + header = MP3HeaderFactory.create(header_bytes) + frame_size = header.calc_frame_size() + + data_size = frame_size - 4 + data_bytes = io.read(data_size) + if len(data_bytes) < data_size: + break + + # Create the frame and add to the list + frame = MP3Frame(header, header_bytes, data_bytes) + frames.append(frame) + + io.close() + + len(frames) + + +if __name__ == '__main__': + unittest.main()