-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmicroflask_async.py
More file actions
1328 lines (1076 loc) · 44.8 KB
/
microflask_async.py
File metadata and controls
1328 lines (1076 loc) · 44.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#Author : iw0雨渊(Gitee:@wth_iw0 GitHub:@iw0_wth)
#Date : 2026.3.21
#Description : MicroFlask_async is a Flask-like async web framework engineered to streamline the migration of Flask-based projects.
#Copyright © 2026 iw0雨渊. All rights reserved.
#Version : 0.0.20260321
#License : MIT License
#Contact : EMAIL:tianqi2021_001@163.com
#Github : https://github.com/iw0-wth
__version__ = '0.0.20260321'
"""
MicroFlask Async Web Framework
异步版的MicroFlask,可在mpy,py双环境运行,支持自定义动态路由规则
This project uses micropython-easyweb as reference: https://github.com/funnygeeker/micropython-easyweb
This project integrates utemplate as third-party engine: https://github.com/pfalcon/utemplate/
"""
import os
import re
import sys
import binascii
try:
from utemplate import source, compiled, recompile
UTEMPLATE_AVAILABLE = True
except ImportError:
UTEMPLATE_AVAILABLE = False
try:
import ujson as json # MicroPython
except Exception: # pragma: no cover
import json # type: ignore
try:
import uasyncio as asyncio # MicroPython
except Exception: # pragma: no cover
import asyncio # type: ignore
STRICT_MODE = True
#兼容函数
def _schedule(func, arg):
"""兼容 micropython.schedule;在 CPython 下直接调用。"""
if micropython is not None and hasattr(micropython, "schedule"):
micropython.schedule(func, arg)
else: # pragma: no cover
func(arg)
def groups(match):
i = 1
return_tuple = tuple()
while True:
try:
return_tuple += (match.group(i),)
i += 1
except Exception:
break
return return_tuple
def escape(text):
"""替代 re.escape 的手动实现"""
special_chars = r"\^$*+?.()|{}[]"
return "".join(["\\" + char if char in special_chars else char for char in text])
class ImmutableMultiDict(dict):
"""兼容低版本mpy"""
def get(self, key, default=None):
if key in self:
return super().get(key)
return default
#外置的可扩展的渲染引擎
class TemplateEngine:
"""模板引擎基类,定义统一的渲染接口"""
def render(self, template_content: str, context: dict) -> str:
"""
渲染模板的核心方法
Args:
template_content: 模板内容字符串
context: 上下文变量字典
Returns:
渲染后的字符串
"""
raise NotImplementedError("子类必须实现此方法")
def render_file(self, template_path: str, context: dict) -> str:
"""
从文件渲染模板的便捷方法
Args:
template_path: 模板文件路径
context: 上下文变量字典
Returns:
渲染后的字符串
"""
try:
# 尝试从templates目录读取
with open(f"templates/{template_path}", "rb") as f:
content = f.read().decode("utf-8")
except OSError:
# 尝试直接路径读取
with open(template_path, "rb") as f:
content = f.read().decode("utf-8")
return self.render(content, context)
class ExpressionTemplateEngine(TemplateEngine):
"""支持表达式求值的模板引擎"""
def render(self, template_content: str, context: dict) -> str:
def eval_expression(match):
expression = match.group(1).strip()
try:
env = globals()
env.update(context)
result = eval(expression, {"__builtins__": {}}, env)
return str(result)
except Exception:
# 求值失败时返回原表达式
return match.group(0)
# 使用正则替换所有 {{ 表达式 }}
return re.sub(r'{{\s*(.*?)\s*}}', eval_expression, template_content)
class SimpleTemplateEngine(TemplateEngine):
"""简单字符串替换的模板引擎"""
def render(self, template_content: str, context: dict) -> str:
"""使用字符串替换方式渲染模板"""
result = template_content
for key, value in context.items():
placeholder = "{{ %s }}" % key
result = result.replace(placeholder, str(value))
return result
class ThirdPartyTemplateAdapter(TemplateEngine):
"""第三方模板引擎适配器模板"""
def __init__(self, engine_instance, render_method_name="render"):
"""
初始化适配器
Args:
engine_instance: 第三方模板引擎实例
render_method_name: 渲染方法名,默认为'render'
"""
self.engine = engine_instance
self.render_method = getattr(engine_instance, render_method_name)
def render(self, template_content: str, context: dict) -> str:
"""调用第三方引擎的渲染方法"""
return self.render_method(template_content, **context)
def render_file(self, template_path: str, context: dict) -> str:
"""重写文件渲染方法以适配第三方引擎的文件加载方式"""
# 某些第三方引擎可能自带文件加载逻辑
if hasattr(self.engine, 'render_file'):
return self.engine.render_file(template_path, **context)
# 否则使用基类的默认实现
return super().render_file(template_path, context)
def create_template_adapter(render_func, file_render_func=None):
"""创建函数式模板引擎适配器"""
class FunctionalTemplateEngine(TemplateEngine):
def render(self, template_content: str, context: dict) -> str:
return render_func(template_content, **context)
def render_file(self, template_path: str, context: dict) -> str:
if file_render_func:
return file_render_func(template_path, **context)
# 回退到默认文件加载+渲染
with open(template_path, "r", encoding="utf-8") as f:
content = f.read()
return render_func(content, **context)
return FunctionalTemplateEngine()
class UTemplateEngine(TemplateEngine):
"""utemplate模板引擎适配器"""
def __init__(self, pkg=None, dir="templates", loader_type="source", auto_recompile=True):
"""
初始化utemplate引擎
Args:
pkg: 包名,通常为__name__
dir: 模板目录
loader_type: 加载器类型,可选"source"、"compiled"、"recompile"
auto_recompile: 是否自动重新编译(仅对recompile有效)
"""
if not UTEMPLATE_AVAILABLE:
raise ImportError("utemplate is not installed. ")
self.pkg = pkg
self.dir = dir
#由于mpy导入从根目录开始,会导致模板文件导入失败,遂加入相对目录
if self.dir not in sys.path:
sys.path.append(self.dir)
# 根据loader_type创建相应的加载器
if loader_type == "compiled":
self.loader = compiled.Loader(pkg, dir)
elif loader_type == "recompile":
self.loader = recompile.Loader(pkg, dir)
else: # source是默认的
self.loader = source.Loader(pkg, dir)
# 缓存已加载的模板函数
self._template_cache = {}
def _get_template(self, template_name):
"""获取模板函数,支持缓存"""
# 移除扩展名中的点,用下划线替换
cache_key = template_name.replace(".", "_")
if cache_key not in self._template_cache:
# 从加载器获取模板函数
render_func = self.loader.load(template_name)
self._template_cache[cache_key] = render_func
return self._template_cache[cache_key]
def render(self, template_content: str, context: dict) -> str:
"""
渲染模板内容字符串
注意:utemplate主要设计用于文件模板,这里提供一个基本实现
但utemplate的优势在于编译文件模板,所以这个方法可能不是最优的
"""
# 由于utemplate主要设计用于文件模板,对于字符串渲染我们使用简单的回退方案
result = template_content
# 简单的变量替换
for key, value in context.items():
placeholder = "{{ " + str(key) + " }}"
result = result.replace(placeholder, str(value))
return result
def render_file(self, template_path: str, context: dict) -> str:
"""
从文件渲染模板 - 这是utemplate的主要使用方式
"""
# 获取模板函数
render_func = self._get_template(template_path)
try:
# 调用模板函数,它会返回一个生成器
generator = render_func(**context)
# 从生成器收集所有片段
parts = []
for chunk in generator:
if chunk is not None:
parts.append(str(chunk))
return "".join(parts)
except Exception as e:
# 如果渲染出错,提供详细错误信息
error_msg = f"UTemplate渲染错误: {e}\n"
error_msg += f"模板: {template_path}\n"
error_msg += f"上下文键: {list(context.keys())}"
raise RuntimeError(error_msg) from e
def clear_cache(self):
"""清除模板缓存"""
self._template_cache.clear()
#外置的可扩展的动态路由规则
class BaseConverter:
"""路由转换器基类,模仿 Flask 的 werkzeug.routing.BaseConverter"""
# 转换器匹配变量时使用的正则表达式部分
# 子类可以覆盖这个属性
regex = "[^/]+"
# 转换器在URL中可接受的参数(例如 <int:num> 中的 “int” 就是名字,没有参数)
# 对于 <re(r"\d+"):var>,名字是“re”,参数是 (r"\d+",)
def __init__(self, map, *args, **kwargs):
"""
Args:
map: 保留参数,为了与Flask API兼容,这里传入的是 url_map
*args: 来自路由定义中括号内的参数,如 <re(r"\d+"):id> 中的 r"\d+"
"""
self.map = map
self.args = args
self.kwargs = kwargs
def to_python(self, value: str):
"""
将URL中的字符串值转换为Python对象(传递给视图函数)。
默认返回字符串,子类可覆盖以进行类型转换。
"""
return value
def to_url(self, value) -> str:
"""
将Python对象转换回URL中的字符串。
默认使用 str(),子类可覆盖。
"""
return str(value)
# ---------------- 默认转换器定义 ----------------
class StringConverter(BaseConverter):
"""字符串转换器,默认类型"""
pass # 使用基类的 regex 和 to_python
class IntConverter(BaseConverter):
"""整数转换器"""
regex = r"[-+]?\d+"
if STRICT_MODE:
regex = r"-?\d+"
def to_python(self, value: str):
return int(value)
class FloatConverter(BaseConverter):
"""浮点数转换器"""
regex = r"[-+]?\d+\.\d+|[+-]?\d+"
if STRICT_MODE:
regex = r"-?\d+\.\d+"
def to_python(self, value: str):
return float(value)
class PathConverter(BaseConverter):
"""路径转换器,匹配包括斜杠的剩余部分"""
regex = ".+"
def to_python(self, value: str):
return value
class PartConverter(BaseConverter):
"""part 转换器,匹配非空路径段,但不包括斜杠"""
regex = ".+?"
class ReConverter(BaseConverter):
def __init__(self, map, reg):
super().__init__(map)
self.regex = reg
class BoolConverter(BaseConverter):
"""bool转换器"""
regex = r"[tT][rR][uU][eE]|[fF][aA][lL][sS][eE]|[yY][eE][sS]|[nN][oO]|1|0"
if STRICT_MODE:
regex = r"[tT]rue|[fF]alse|[yY]es|[nN]o|1|0"
def to_python(self, value: str):
return str(value).lower() in ("true", "1", "yes")
#对照列表
FILE_TYPE = ImmutableMultiDict(
{
".txt": "text/plain",
".htm": "text/html",
".html": "text/html",
".css": "text/css",
".csv": "text/csv",
".js": "application/javascript",
".xml": "application/xml",
".xhtml": "application/xhtml+xml",
".json": "application/json",
".zip": "application/zip",
".pdf": "application/pdf",
".ts": "application/typescript",
".woff": "font/woff",
".woff2": "font/woff2",
".ttf": "font/ttf",
".otf": "font/otf",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".svg": "image/svg+xml",
".ico": "image/x-icon",
}
)
STATUS_CODE = ImmutableMultiDict(
{
100: "Continue",
101: "Switching Protocols",
102: "Processing",
200: "OK",
201: "Created",
202: "Accepted",
203: "Non-Authoritative Information",
204: "No Content",
205: "Reset Content",
206: "Partial Content",
207: "Multi-Status",
208: "Already Reported",
226: "IM Used",
300: "Multiple Choices",
301: "Moved Permanently",
302: "Found",
303: "See Other",
304: "Not Modified",
305: "Use Proxy",
307: "Temporary Redirect",
308: "Permanent Redirect",
400: "Bad Request",
401: "Unauthorized",
402: "Payment Required",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
406: "Not Acceptable",
407: "Proxy Authentication Required",
408: "Request Timeout",
409: "Conflict",
410: "Gone",
411: "Length Required",
412: "Precondition Failed",
413: "Payload Too Large",
414: "URI Too Long",
415: "Unsupported Media Type",
416: "Range Not Satisfiable",
417: "Expectation Failed",
418: "I'm a teapot",
422: "Unprocessable Entity",
423: "Locked",
424: "Failed Dependency",
426: "Upgrade Required",
428: "Precondition Required",
429: "Too Many Requests",
431: "Request Header Fields Too Large",
451: "Unavailable For Legal Reasons",
500: "Internal Server Error",
501: "Not Implemented",
502: "Bad Gateway",
503: "Service Unavailable",
504: "Gateway Timeout",
505: "HTTP Version Not Supported",
506: "Variant Also Negotiates",
507: "Insufficient Storage",
508: "Loop Detected",
510: "Not Extended",
511: "Network Authentication Required",
}
)
# ========= URL 编码/解码(照搬 easyweb.py) =========
def url_encode(url):
"""URL 编码"""
encoded_url = ""
for char in url:
if char.isalpha() or char.isdigit() or char in ("-", ".", "_", "~"):
encoded_url += char
else:
encoded_url += (
"%" + binascii.hexlify(char.encode("utf-8")).decode("utf-8").upper()
)
return encoded_url
def url_decode(encoded_url):
"""URL 解码"""
encoded_url = encoded_url.replace("+", " ")
if "%" not in encoded_url:
return encoded_url
blocks = encoded_url.split("%")
decoded_url = blocks[0]
buffer = ""
for b in blocks[1:]:
if len(b) == 2:
buffer += b[:2]
else:
decoded_url += binascii.unhexlify(buffer + b[:2]).decode("utf-8")
buffer = ""
decoded_url += b[2:]
if buffer:
decoded_url += binascii.unhexlify(buffer).decode("utf-8")
return decoded_url
def _is_generator(obj):
def _g():
yield 0
return isinstance(obj, type(_g()))
def _is_streaming_body(body):
if body is None:
return False
if isinstance(body, (bytes, str, dict)):
return False
return _is_generator(body) or hasattr(body, "__iter__")
def _to_bytes(x, charset="utf-8"):
if x is None:
return b""
if isinstance(x, bytes):
return x
if isinstance(x, str):
return x.encode(charset)
return str(x).encode(charset)
class Request:
def __init__(self, method="GET", full_path="/", protocol="HTTP/1.1", headers=None, body=b""):
self.method = method or "GET"
self.full_path = full_path or "/"
self.protocol = protocol or "HTTP/1.1"
self.headers = ImmutableMultiDict(headers or {})
self.body = body if body is not None else b""
raw_path = self.full_path.split("?", 1)[0]
self.path = url_decode(raw_path)
self.view_args = ImmutableMultiDict({})
self._url = None
self._args = None
self._form = None
self._json = None
self._cookies = None
@property
def host(self):
return self.headers.get("Host")
@property
def url(self):
if self._url is None:
host = self.host
if host:
self._url = "http://{}{}".format(host, self.full_path)
return self._url
@property
def args(self):
if self._args is None:
args = {}
try:
if "?" in self.full_path:
query_str = self.full_path.split("?", 1)[1].rstrip("&")
else:
query_str = ""
if query_str:
for arg_pair in query_str.split("&"):
if not arg_pair:
continue
if "=" in arg_pair:
key, value = arg_pair.split("=", 1)
else:
key, value = arg_pair, ""
args[url_decode(key)] = url_decode(value)
except Exception:
args = {}
self._args = ImmutableMultiDict(args)
return self._args
@property
def json(self):
if self._json is None:
try:
self._json = json.loads(self.body)
except Exception:
self._json = None
return self._json
@property
def form(self):
if self._form is None:
try:
ct = self.headers.get("Content-Type", "") or ""
if "application/x-www-form-urlencoded" not in ct:
self._form = None
return None
items = (self.body or b"").decode("utf-8").split("&")
form = {}
for item in items:
if not item:
continue
if "=" in item:
k, v = item.split("=", 1)
else:
k, v = item, ""
k = url_decode(k)
v = None if v == "" else url_decode(v)
form[k] = v
self._form = ImmutableMultiDict(form)
except Exception:
self._form = None
return self._form
@property
def cookies(self):
if self._cookies is None:
cookies = {}
try:
raw = self.headers.get("Cookie")
if raw:
for item in raw.split(";"):
item = item.strip()
if "=" in item:
k, v = item.split("=", 1)
cookies[url_decode(k)] = url_decode(v)
except Exception:
cookies = {}
self._cookies = ImmutableMultiDict(cookies)
return self._cookies
class Response:
_default_template_engine = None
def __init__(self, content=b"", status=200, headers=None, mimetype=None, content_type=None):
self.status = int(status or 200)
self.reason = STATUS_CODE.get(self.status, "Undefined")
self.headers = {}
if headers:
if isinstance(headers, dict):
self.headers.update(headers)
else:
for k, v in headers:
self.headers[k] = v
self._cookies = []
if mimetype and not content_type:
content_type = mimetype
self._explicit_content_type = content_type
self.content = content
def set_cookie(self, name, value="", max_age=None, path="/"):
if max_age is None:
max_age_part = ""
else:
max_age_part = "; Max-Age={}".format(int(max_age))
path_part = "" if path is None else "; Path={}".format(path)
line = "Set-Cookie: {}={}{}{}".format(
url_encode(str(name)), url_encode(str(value)), max_age_part, path_part
)
self._cookies.append(line)
@classmethod
def set_default_template_engine(cls, engine):
"""设置默认模板引擎"""
cls._default_template_engine = engine
@classmethod
def render_template(cls, template_path, *args,
template_engine=None, # 可传入特定引擎
**kwargs):
"""增强的模板渲染方法,支持多种引擎"""
# 确定上下文
def _ctx(*args, **kwargs):
if args and isinstance(args[0], dict):
return args[0]
return kwargs
context = _ctx(*args, **kwargs)
# 确定使用的引擎
engine = template_engine or cls._default_template_engine
# 如果没有设置引擎,使用表达式引擎作为默认
if engine is None:
engine = ExpressionTemplateEngine()
# 可以缓存默认引擎实例
cls._default_template_engine = engine
try:
# 使用引擎渲染
content = engine.render_file(template_path, context)
return cls(content, content_type="text/html; charset=utf-8")
except Exception as e:
return cls(f"Template Error: {e}", status=500)
# 添加直接使用字符串模板的方法
@classmethod
def render_string(cls, template_string, *args,
template_engine=None, **kwargs):
"""渲染字符串模板"""
# 确定上下文
def _ctx(*args, **kwargs):
if args and isinstance(args[0], dict):
return args[0]
return kwargs
context = _ctx(*args, **kwargs) if args and isinstance(args[0], dict) else kwargs
engine = template_engine or cls._default_template_engine
if engine is None:
engine = ExpressionTemplateEngine()
cls._default_template_engine = engine
try:
content = engine.render(template_string, context)
return cls(content, content_type="text/html; charset=utf-8")
except Exception as e:
return cls(f"Template Error: {e}", status=500)
def _finalize_headers(self, body_bytes=None, streaming=False):
# Content-Type 默认策略
if self._explicit_content_type:
# 如果用户明确指定了Content-Type,但没指定charset且是文本类型,则加上
ct = self._explicit_content_type.lower()
if ct.startswith('text/') and 'charset=' not in ct:
self._explicit_content_type = f"{self._explicit_content_type}; charset=utf-8"
self.headers.setdefault("Content-Type", self._explicit_content_type)
else:
# 没有明确指定,则根据内容类型设置默认值
if isinstance(self.content, str):
# 字符串默认使用HTML,强制指定UTF-8
self.headers.setdefault("Content-Type", "text/html; charset=utf-8")
elif isinstance(self.content, dict):
# JSON响应
self.headers.setdefault("Content-Type", "application/json")
elif isinstance(self.content, list):
# JSON响应
self.headers.setdefault("Content-Type", "application/json")
else:
# 其他文本内容
self.headers.setdefault("Content-Type", "text/plain; charset=utf-8")
self.headers.setdefault("Connection", "close")
if (not streaming) and (body_bytes is not None):
self.headers.setdefault("Content-Length", str(len(body_bytes)))
# 添加UTF-8编码相关的其他header
self.headers.setdefault("Content-Encoding", "identity")
def _status_line(self):
return "HTTP/1.1 {} {}\r\n".format(self.status, self.reason).encode("utf-8")
def _headers_bytes(self):
lines = []
for k, v in self.headers.items():
lines.append("{}: {}".format(k, v))
for c in self._cookies:
lines.append(c)
return ("\r\n".join(lines) + "\r\n\r\n").encode("utf-8")
def iter_chunks(self):
"""
统一的“响应字节流”迭代器:
- 非流式:status_line + headers + body
- 流式:status_line + headers + (chunks...)
其中 generator 首个 yield 若为 dict,会用于补充 headers(easyweb 风格)。
"""
streaming = _is_streaming_body(self.content)
if not streaming:
body = self.content
if isinstance(body, dict):
body = json.dumps(body)
if isinstance(body, list):
body = json.dumps(body)
body_bytes = _to_bytes(body)
self._finalize_headers(body_bytes=body_bytes, streaming=False)
yield self._status_line()
yield self._headers_bytes()
if body_bytes:
yield body_bytes
return
it = iter(self.content)
first_item = None
has_first = False
try:
first_item = next(it)
has_first = True
except Exception:
has_first = False
if has_first and isinstance(first_item, dict):
self.headers.update(first_item)
first_item = None
has_first = False
self._finalize_headers(body_bytes=None, streaming=True)
yield self._status_line()
yield self._headers_bytes()
if has_first and first_item is not None:
b = _to_bytes(first_item)
if b:
yield b
for item in it:
if item is None:
continue
b = _to_bytes(item)
if b:
yield b
def build(self):
data = b"".join([chunk for chunk in self.iter_chunks()])
return data
def make_response(content=b"", status=200, headers=None):
if isinstance(content, Response):
if status is not None:
content.status = int(status)
content.reason = STATUS_CODE.get(content.status, "Undefined")
if headers:
if isinstance(headers, dict):
content.headers.update(headers)
else:
for k, v in headers:
content.headers[k] = v
return content
if isinstance(content, tuple):
if len(content) == 2:
return Response(content[0], status=content[1])
if len(content) >= 3:
return Response(content[0], status=content[1], headers=content[2])
return Response(content, status=status, headers=headers)
class Application:
def __init__(self, static_folder="static/", static_url_path="/static/"):
self.routes = []
self.static_routes = {}
self.static_folder = static_folder
self.static_url_path = static_url_path
# 初始化URL映射和转换器字典
# 模拟 Flask 的 url_map
class UrlMap:
def __init__(self):
self.converters = {}
self.url_map = UrlMap()
self._register_default_converters()
def _register_default_converters(self):
"""注册所有默认的转换器"""
self.url_map.converters.update({
'string': StringConverter,
'int': IntConverter,
'float': FloatConverter,
'path': PathConverter,
'part': PartConverter,
're': ReConverter, # 重构后的 re 转换器
'bool': BoolConverter
})
def _compile_route(self, path: str):
"""
编译路由路径,支持Flask风格的 <转换器名(参数):变量名> 语法。
返回: (compiled_regex_pattern, converters_dict, param_names_list)
"""
# 修改后的正则表达式,使用普通捕获组替代命名捕获组
# 组1: 转换器名
# 组2: 括号内的参数
# 组3: 变量名
route_param_pattern = re.compile(
r'<(?:([a-zA-Z_][a-zA-Z0-9_]*)(?:\(([^)]*)\))?:)?([a-zA-Z_][a-zA-Z0-9_]*)>'
)
param_names = []
converters = {} # 存放 {变量名: 转换器实例}
regex_parts = []
# 按斜杠分割路径,但注意 path 转换器会吃掉后面的所有部分
segments = path.split('/')
for segment in segments:
if not segment:
regex_parts.append('') # 用于开头的空段
continue
# 尝试匹配动态段
match = route_param_pattern.match(segment)
if match:
# 获取匹配的完整字符串
matched_str = match.group(0)
# 检查是否匹配了整个段(MicroPython兼容的方法)
if matched_str == segment:
# 这是一个动态参数段
converter_name = match.group(1) # 索引1对应转换器名
args_string = match.group(2) # 索引2对应参数
variable_name = match.group(3) # 索引3对应变量名
if variable_name in param_names:
raise ValueError(f"Duplicate variable name '{variable_name}' in route '{path}'")
param_names.append(variable_name)
# 1. 确定转换器类
if converter_name is None:
converter_name = 'string' # 默认转换器
converter_class = self.url_map.converters.get(converter_name)
if converter_class is None:
raise ValueError(f"Unknown converter '{converter_name}' in route '{path}'. "+
f"Available converters: {list(self.url_map.converters.keys())}")
# 2. 解析参数(如果存在)
args = ()
kwargs = {}
if args_string:
# 这是一个简化的参数解析,主要处理 re(r'...') 这种单字符串参数
try:
# 尝试用 eval 解析元组,但在受控环境下(参数来自开发者定义的路由)
parsed_args = eval(f'({args_string},)', {}, {})
if isinstance(parsed_args, tuple):
args = parsed_args
except Exception:
# 如果 eval 失败,将整个 args_string 作为单个字符串参数
args = (args_string.strip(),)
# 3. 创建转换器实例
converter_instance = converter_class(self.url_map, *args, **kwargs)
converters[variable_name] = converter_instance
# 4. 获取该转换器的正则部分并添加到路径
regex_part = f"({converter_instance.regex})"
regex_parts.append(regex_part)
# 5. 如果遇到 'path' 转换器,它是最后一部分,跳出循环
if converter_name == 'path':
# 确保 path 是最后一个转换器
if segments.index(segment) != len(segments) - 1:
raise ValueError(f"Path converter '{segment}' must be the last part in route '{path}'")
# 对于 path 转换器,我们不再连接剩余的硬编码部分
break
continue # 动态段处理完毕,继续下一个段
# 如果到达这里,说明不是动态参数段,是静态文本段
regex_parts.append(escape(segment))
# 构建完整的正则表达式
regex_pattern = "^" + "/".join(regex_parts) + "$"