Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions mark/.idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 16 additions & 10 deletions mark/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from Tools.scripts.pep384_macrocheck import parse_file
from fastapi import FastAPI

from mp3 import MP3
from mp3_file_io import MP3FileIo
from fastapi.responses import StreamingResponse
from fastapi.responses import Response
Expand All @@ -16,7 +18,7 @@ async def health_check():


@app.get("/stream-mp3/{file_name:path}")
async def stream_mp3(file_name: str):
async def stream_mp3(file_name: str, speed: int = 1, updown: str = "down"):
print(f"Current working directory: {os.getcwd()}")

path = f"../resource/{file_name}"
Expand All @@ -32,20 +34,24 @@ async def stream_mp3(file_name: str):
finally:
mp3_io.close()

# 추후 MP3 클래스를 사용하여 MP3 파일을 일부만 읽어오는 방법을 구현할 수 있습니다.
# mp3 = MP3(mp3_io)
# mp3.set_header()
# mp3.set_frame_size()
# mp3.set_frame_count()
# mp3.set_play_time()
mp3 = MP3(mp3_io)
mp3.set_header()
mp3.set_frame_size()
mp3.set_frame_count()
mp3.set_play_time()

if updown == "down":
mp3 = mp3.change_speed_down(speed)
elif updown == "up":
mp3 = mp3.change_speed_up(speed)

headers = {
"Content-Disposition": f"inline; filename={os.path.basename(path)}",
"Content-Disposition": f"inline;",
"Content-Type": "audio/mpeg",
"Content-Length": str(mp3_io.get_size())
"Content-Length": str(len(mp3.file_io.file_bytes))
}

return StreamingResponse(io.BytesIO(mp3_io.get_all()), headers=headers)
return StreamingResponse(io.BytesIO(mp3.file_io.file_bytes), headers=headers)

if __name__ == "__main__":
import uvicorn
Expand Down
4 changes: 2 additions & 2 deletions mark/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@

print(mp3.header)
print(mp3.frame_size)
print(mp3.frame_count)
print(mp3.play_time)
print(mp3._frame_count)
print(mp3._play_time)
61 changes: 22 additions & 39 deletions mark/src/mp3.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,34 @@
import mp3_header_factory
from mp3_file_io_factory import MP3FileIoFactory
from mp3_frame import MP3Frame


class MP3:
def __init__(self, file_reader):
self.play_time = None
self.frame_size = 0
self.frame_count = 0
self.header = None
self.file_io = file_reader
def __init__(self):
self._frames = list[MP3Frame]
self._frame_count = 0
self._play_time = 0

def set_header(self):
header_bytes = self.file_io.get(4)
self.header = mp3_header_factory.MP3HeaderFactory.create(header_bytes)

def get_header(self):
return self.header

def set_frame_size(self):
self.frame_size = self.header.calc_frame_size()

def get_frame_size(self):
return self.frame_size

def set_frame_count(self):
offset = 4 # 태그 없어서 고정 offset 사용
self.frame_count = (self.file_io.get_size() - offset) // self.frame_size
def set_frames(self, frames):
self._frames = frames
self._frame_count = len(frames)
self._frame_count = len(frames)
self._play_time = self._frame_count * 1152 / self._frames[0].header.sampling_rate

def get_frame_count(self):
return self.set_frame_count

def set_play_time(self):
frame_duration = 1152 / self.header.sampling_rate
self.play_time = self.frame_count * frame_duration
return self._frame_count

def get_play_time(self):
return self.play_time
return self._play_time


def cut(self, start_time, end_time):
start_frame = int(start_time * self.header.sampling_rate / 1152)
end_frame = int(end_time * self.header.sampling_rate / 1152)
def save(self, file_path):
new_bytes = self.to_bytes()
MP3FileIoFactory.create_with_bytes(new_bytes).save(file_path)

frame_size = self.header.calc_frame_size()
start_byte = start_frame * frame_size
end_byte = end_frame * frame_size
def to_bytes(self) -> bytes:
new_bytes = b''

io = self.file_io.cut_frames(start_byte, end_byte)
return MP3(io)
for frame in self._frames:
new_bytes += frame.to_bytes()

def save(self, file_path):
self.file_io.save(file_path)
return new_bytes
56 changes: 56 additions & 0 deletions mark/src/mp3Factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from mp3 import MP3
from mp3_file_io import MP3FileIo
from mp3_frame_factory import Mp3FrameFactory


class MP3Factory:
@staticmethod
def create_by_file_path(file_path: str) -> MP3:
io = MP3FileIo()
io.open(file_path)
frames = Mp3FrameFactory.create(io)
io.close()

return MP3Factory.create_by_frames(frames)

@staticmethod
def create_by_frames(frames) -> MP3:
mp3 = MP3()
mp3.set_frames(frames)
return mp3

@staticmethod
def create_by_cut(mp3:MP3, start_time:int, end_time:int):
# 첫번째 헤더를 기준으로 시작 프레임과 끝 프레임을 계산한다 (CBR만 가능)
first_header = mp3._frames[0].header

start_frame = int(start_time * first_header.sampling_rate / 1152)
end_frame = int(end_time * first_header.sampling_rate / 1152)

new_frames = mp3._frames[start_frame:end_frame]
return MP3Factory.create_by_frames(new_frames)

@staticmethod
def create_speed_down(mp3:MP3, speed: int):
if speed <= 0:
raise ValueError("Speed must be greater than 0")

new_frames = []
for i in range(mp3._frame_count):
for _ in range(speed):
new_frames.append(mp3._frames[i])

return MP3Factory.create_by_frames(new_frames)

@staticmethod
def create_speed_up(mp3:MP3, speed: int):
if speed <= 0:
raise ValueError("Speed must be greater than 0")

new_frames = []
for i in range(mp3._frame_count):
if i % speed == 0:
frame = mp3._frames[i]
new_frames.append(frame)

return MP3Factory.create_by_frames(new_frames)
53 changes: 13 additions & 40 deletions mark/src/mp3_file_io.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,44 @@
import os.path

import mp3_header_factory

class MP3FileIo:
def __init__(self):
self.file_obj = None
self.bytes = None
self.file_bytes = None

def file_exists(self, path):
return os.path.exists(path)

def open(self, file_path):
def open(self, file_path) -> bool:
try:
self.file_obj = open(file_path, 'rb')
except IOError as e:
print(f"파일을 열 수 없습니다: {e}")
return
return False
return True

def close(self):
if self.file_obj is not None:
self.file_obj.close()

def read_all(self):
self.bytes = self.file_obj.read()
def read(self, size):
return self.file_obj.read(size)

def get(self, size):
return self.bytes[0:size]
return self.file_bytes[0:size]

def get_bytes(self, start, end):
return self.file_bytes[start:end]

def get_all(self):
return self.bytes
return self.file_bytes

def get_size(self):
return len(self.bytes)

def check_header(self):
try:
file_obj = open(self.file_path, 'rb')
header = file_obj.read(2)
file_obj.close()

# Convert the header to an integer
header_int = int.from_bytes(header, byteorder='big')

# MP3 Sync Word가(=11비트) 1로 시작하는지 확인한다.
# 0xFF = 1111 1111
# 0xE0 = 1110 0000
if (header_int & 0xFFE0) != 0xFFE0:
print("MP3 SYNC WORD not detected")
return False
return True
except IOError as e:
if isinstance(e, EOFError):
print(f"파일이 너무 작습니다: {e}")
else:
print(f"파일을 열 수 없습니다: {e}")

return False

def cut_frames(self, start_byte, end_byte):
io = MP3FileIo()
io.bytes = self.bytes[0:4] + self.bytes[start_byte:end_byte]
return io
return len(self.file_bytes)

def save(self, file_path):
try:
file_obj = open(file_path, 'wb')
file_obj.write(self.bytes)
file_obj.write(self.file_bytes)
file_obj.close()
except IOError as e:
print(f"파일을 저장할 수 없습니다: {e}")
Expand Down
15 changes: 15 additions & 0 deletions mark/src/mp3_file_io_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from mp3_file_io import MP3FileIo


class MP3FileIoFactory:
@staticmethod
def create_with_file(file_path: str) -> MP3FileIo:
io = MP3FileIo()
io.open(file_path)
return io

@staticmethod
def create_with_bytes(file_bytes: bytes) -> MP3FileIo:
io = MP3FileIo()
io.file_bytes = file_bytes
return io
14 changes: 14 additions & 0 deletions mark/src/mp3_frame.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dataclasses import dataclass

from mp3_header import MP3Header


@dataclass
class MP3Frame:
def __init__(self, header:MP3Header, header_bytes, data_bytes:bytes):
self.header = header
self.header_bytes = header_bytes
self.data_bytes = data_bytes

def to_bytes(self):
return self.header_bytes + self.data_bytes
29 changes: 29 additions & 0 deletions mark/src/mp3_frame_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from mp3_file_io import MP3FileIo
from mp3_frame import MP3Frame
from mp3_header_factory import MP3HeaderFactory


class Mp3FrameFactory:
@staticmethod
def create(io : MP3FileIo) -> list[MP3Frame]:
frames = []
while True:
header_bytes = io.read(4)
if len(header_bytes) < 4:
break

if MP3HeaderFactory.is_header(header_bytes) is False:
break

header = MP3HeaderFactory.create(header_bytes)
frame_size = header.calc_frame_size()

data_size = frame_size - 4
data_bytes = io.read(data_size)
if len(data_bytes) < data_size:
break

frame = MP3Frame(header, header_bytes, data_bytes)
frames.append(frame)

return frames
Loading