-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser.py
More file actions
201 lines (161 loc) · 7.1 KB
/
parser.py
File metadata and controls
201 lines (161 loc) · 7.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
Upstage Document Parse 연동 모듈
스크린샷 분석 및 텍스트 추출 (Markdown/HTML 형식)
"""
import os
import base64
from pathlib import Path
from typing import Dict, List, Optional
import httpx
from dotenv import load_dotenv
load_dotenv()
class DocumentParser:
"""Upstage Document Parse API를 사용한 문서/스크린샷 파싱"""
def __init__(self):
self.api_key = os.getenv("UPSTAGE_API_KEY")
self.base_url = "https://api.upstage.ai/v1/document-ai/document-parse"
async def parse_image(self, image_data: bytes, filename: str = "image.png") -> Dict:
"""이미지를 파싱하여 텍스트 및 레이아웃 정보를 추출합니다.
Args:
image_data: 이미지 바이트 데이터
filename: 파일명
Returns:
파싱된 문서 정보 (텍스트, 레이아웃, 요소 등)
"""
# 파일 확장자에 따른 MIME 타입 결정
ext = filename.lower().split('.')[-1] if '.' in filename else 'png'
mime_types = {
'png': 'image/png',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'webp': 'image/webp',
'gif': 'image/gif',
'pdf': 'application/pdf'
}
mime_type = mime_types.get(ext, 'image/png')
async with httpx.AsyncClient(timeout=120.0) as client:
files = {
"document": (filename, image_data, mime_type)
}
headers = {
"Authorization": f"Bearer {self.api_key}"
}
# Markdown과 HTML 둘 다 요청
data = {
"ocr": "auto",
"output_formats": '["markdown", "html", "text"]',
"coordinates": "true",
"base64_encoding": '["table"]'
}
response = await client.post(
self.base_url,
headers=headers,
files=files,
data=data
)
if response.status_code != 200:
error_detail = response.text
raise Exception(f"Document Parse API 오류: {response.status_code} - {error_detail}")
result = response.json()
# 디버깅용 - API 응답 구조 확인
print(f"[DEBUG] Document Parse API 응답 키: {result.keys()}")
if "content" in result:
print(f"[DEBUG] content 타입: {type(result['content'])}")
if "elements" in result:
print(f"[DEBUG] elements 개수: {len(result['elements'])}")
return result
async def parse_image_from_path(self, image_path: str) -> Dict:
"""파일 경로에서 이미지를 읽어 파싱합니다."""
path = Path(image_path)
if not path.exists():
raise FileNotFoundError(f"파일을 찾을 수 없습니다: {image_path}")
with open(path, "rb") as f:
image_data = f.read()
return await self.parse_image(image_data, path.name)
async def parse_image_from_base64(self, base64_data: str, filename: str = "image.png") -> Dict:
"""Base64 인코딩된 이미지를 파싱합니다."""
if "," in base64_data:
base64_data = base64_data.split(",")[1]
image_data = base64.b64decode(base64_data)
return await self.parse_image(image_data, filename)
def extract_text_elements(self, parsed_result: Dict) -> List[Dict]:
"""파싱 결과에서 텍스트 요소들을 추출합니다.
Args:
parsed_result: Document Parse API 응답
Returns:
텍스트 요소 리스트 (텍스트, 위치, 타입 포함)
"""
elements = []
# elements 배열에서 추출
if "elements" in parsed_result:
for elem in parsed_result["elements"]:
text = elem.get("text", "") or elem.get("content", {}).get("text", "")
if text:
elements.append({
"text": text,
"type": elem.get("category", elem.get("type", "unknown")),
"bounding_box": elem.get("bounding_box", elem.get("coordinates")),
"confidence": elem.get("confidence", 0),
"html": elem.get("content", {}).get("html", "")
})
return elements
def get_full_text(self, parsed_result: Dict) -> str:
"""파싱 결과에서 전체 텍스트를 추출합니다 (Markdown 우선).
Args:
parsed_result: Document Parse API 응답
Returns:
전체 텍스트 문자열 (Markdown 형식 우선)
"""
# 1. content.markdown 확인 (최신 API 형식)
if "content" in parsed_result:
content = parsed_result["content"]
if isinstance(content, dict):
if "markdown" in content and content["markdown"]:
return content["markdown"]
if "html" in content and content["html"]:
return content["html"]
if "text" in content and content["text"]:
return content["text"]
elif isinstance(content, str):
return content
# 2. 직접 markdown/html/text 필드 확인
if "markdown" in parsed_result and parsed_result["markdown"]:
return parsed_result["markdown"]
if "html" in parsed_result and parsed_result["html"]:
return parsed_result["html"]
if "text" in parsed_result and parsed_result["text"]:
return parsed_result["text"]
# 3. elements에서 텍스트 조합
elements = self.extract_text_elements(parsed_result)
if elements:
return "\n\n".join([e["text"] for e in elements if e.get("text")])
# 4. pages 배열 확인 (일부 API 버전)
if "pages" in parsed_result:
texts = []
for page in parsed_result["pages"]:
if "text" in page:
texts.append(page["text"])
elif "content" in page:
texts.append(str(page["content"]))
if texts:
return "\n\n".join(texts)
# 5. 결과 전체를 문자열로 반환 (최후의 수단)
return str(parsed_result)
def get_markdown(self, parsed_result: Dict) -> Optional[str]:
"""파싱 결과에서 Markdown 텍스트만 추출합니다."""
if "content" in parsed_result:
content = parsed_result["content"]
if isinstance(content, dict) and "markdown" in content:
return content["markdown"]
if "markdown" in parsed_result:
return parsed_result["markdown"]
return None
def get_html(self, parsed_result: Dict) -> Optional[str]:
"""파싱 결과에서 HTML 텍스트만 추출합니다."""
if "content" in parsed_result:
content = parsed_result["content"]
if isinstance(content, dict) and "html" in content:
return content["html"]
if "html" in parsed_result:
return parsed_result["html"]
return None