From 342e1668d39adf56ac29de4b1113420ba0def982 Mon Sep 17 00:00:00 2001 From: Avi Basnet Date: Thu, 27 Nov 2025 01:07:00 +0000 Subject: [PATCH 1/6] dedup working --- nemo_curator_semantic_dedup/README.md | 175 ++++++++++ .../__pycache__/helper.cpython-312.pyc | Bin 0 -> 15239 bytes nemo_curator_semantic_dedup/helper.py | 327 ++++++++++++++++++ .../image_dedup_example.py | 301 ++++++++++++++++ 4 files changed, 803 insertions(+) create mode 100644 nemo_curator_semantic_dedup/README.md create mode 100644 nemo_curator_semantic_dedup/__pycache__/helper.cpython-312.pyc create mode 100644 nemo_curator_semantic_dedup/helper.py create mode 100644 nemo_curator_semantic_dedup/image_dedup_example.py diff --git a/nemo_curator_semantic_dedup/README.md b/nemo_curator_semantic_dedup/README.md new file mode 100644 index 0000000..2cda472 --- /dev/null +++ b/nemo_curator_semantic_dedup/README.md @@ -0,0 +1,175 @@ +# Semantic Deduplication with NeMo Curator + +This example demonstrates how to perform GPU-accelerated semantic deduplication on large text datasets using [NVIDIA NeMo Curator](https://github.com/NVIDIA-NeMo/Curator) on Anyscale. + +## What is Semantic Deduplication? + +Unlike exact or fuzzy deduplication that matches text patterns, **semantic deduplication** identifies documents that are conceptually similar even if they use different words. This is achieved by: + +1. **Computing embeddings**: Converting text into dense vector representations using neural network models +2. **Clustering**: Grouping similar embeddings using GPU-accelerated k-means +3. **Similarity matching**: Identifying near-duplicates within clusters based on cosine similarity + +This approach is particularly effective for: +- Removing paraphrased content +- Identifying translated duplicates +- Cleaning datasets with rephrased information +- Improving LLM training data quality + +## Performance + +NeMo Curator leverages NVIDIA RAPIDS™ libraries (cuDF, cuML, cuGraph) for GPU acceleration: + +- **16× faster** fuzzy deduplication compared to CPU-based alternatives +- **40% lower** total cost of ownership (TCO) +- **Near-linear scaling** across multiple GPUs + +## Install the Anyscale CLI + +```bash +pip install -U anyscale +anyscale login +``` + +## Submit the Job + +Clone the example from GitHub: + +```bash +git clone https://github.com/anyscale/examples.git +cd examples/nemo_curator_semantic_dedup +``` + +Submit the job: + +```bash +anyscale job submit -f job.yaml +``` + +### Using Your Own Data + +To process your own dataset, set the `INPUT_DATA_PATH` environment variable: + +```bash +anyscale job submit -f job.yaml \ + --env INPUT_DATA_PATH=s3://your-bucket/your-data/ \ + --env OUTPUT_DATA_PATH=s3://your-bucket/output/ +``` + +Your input data should be in Parquet or JSONL format with at least two columns: +- `id`: Unique document identifier +- `text`: The text content to deduplicate + +## Configuration Options + +You can customize the pipeline behavior via environment variables: + +| Variable | Default | Description | +|----------|---------|-------------| +| `INPUT_DATA_PATH` | `/mnt/cluster_storage/semantic_dedup/input` | Path to input dataset | +| `OUTPUT_DATA_PATH` | `/mnt/cluster_storage/semantic_dedup/output` | Path for output data | +| `EMBEDDING_MODEL` | `sentence-transformers/all-MiniLM-L6-v2` | HuggingFace model for embeddings | +| `EMBEDDING_BATCH_SIZE` | `64` | Batch size per GPU for embedding computation | +| `NUM_CLUSTERS` | `1000` | Number of k-means clusters | +| `SIMILARITY_THRESHOLD` | `0.8` | Cosine similarity 
threshold (0.0-1.0) | + +### Embedding Model Options + +| Model | Quality | Speed | Use Case | +|-------|---------|-------|----------| +| `sentence-transformers/all-MiniLM-L6-v2` | Good | Fast | Quick experiments, large datasets | +| `intfloat/e5-large-v2` | Better | Medium | Production workloads | +| `BAAI/bge-large-en-v1.5` | Best | Slower | High-quality deduplication | + +### Tuning the Similarity Threshold + +- **Higher threshold (e.g., 0.9)**: Stricter matching, fewer duplicates removed +- **Lower threshold (e.g., 0.7)**: Looser matching, more duplicates removed + +Start with 0.8 and adjust based on your quality requirements. + +## Understanding the Example + +### Pipeline Architecture + +``` +┌─────────────────┐ +│ Input Data │ Parquet/JSONL files +└────────┬────────┘ + │ + ▼ +┌─────────────────┐ +│ Embedding │ GPU-accelerated transformer inference +│ Creation │ (sentence-transformers, E5, BGE, etc.) +└────────┬────────┘ + │ + ▼ +┌─────────────────┐ +│ Clustering │ GPU-accelerated k-means (cuML) +│ (k-means) │ Groups similar embeddings +└────────┬────────┘ + │ + ▼ +┌─────────────────┐ +│ Duplicate │ Pairwise similarity within clusters +│ Extraction │ Identifies semantic duplicates +└────────┬────────┘ + │ + ▼ +┌─────────────────┐ +│ Deduplicated │ Original data minus duplicates +│ Output │ +└─────────────────┘ +``` + +### Key Components + +- **EmbeddingCreator**: Computes dense vector embeddings using pre-trained models +- **ClusteringModel**: GPU-accelerated k-means clustering with cuML +- **SemanticClusterLevelDedup**: Finds duplicates within clusters using cosine similarity + +### Scaling Considerations + +- **Number of clusters**: Should be roughly √(n_documents) for balanced cluster sizes +- **Memory**: Each GPU should have enough memory for the embedding model (~4GB for MiniLM, ~8GB for larger models) +- **Batch size**: Increase for faster processing, decrease if running out of GPU memory + +## Output + +The pipeline produces: + +1. **Deduplicated dataset**: Parquet files in `{OUTPUT_DATA_PATH}/{timestamp}/deduplicated/` +2. **Cache files**: Intermediate embeddings and clusters for debugging + +Example output log: + +``` +============================================================ +Semantic Deduplication Complete! +============================================================ +Original documents: 1,000,000 +Duplicates removed: 127,543 +Final documents: 872,457 +Reduction: 12.75% +Output saved to: /mnt/cluster_storage/semantic_dedup/output/20250115T143022Z/deduplicated +============================================================ +``` + +## Monitoring + +View your job progress in the [Anyscale Console](https://console.anyscale.com/jobs). The Dask dashboard link will be printed in the logs for detailed task monitoring. + +## Cost Optimization Tips + +1. **Use spot instances**: The job.yaml is configured with `market_type: PREFER_SPOT` for cost savings +2. **Start small**: Test with a subset of your data before running on the full dataset +3. **Choose the right GPU**: A10G instances offer a good balance of cost and performance +4. 
**Tune batch size**: Larger batches = better GPU utilization = faster processing + +## Learn More + +- [NeMo Curator Documentation](https://docs.nvidia.com/nemo/curator/latest/) +- [Semantic Deduplication Guide](https://docs.nvidia.com/nemo/curator/latest/curate-text/process-data/deduplication/index.html) +- [NeMo Curator GitHub](https://github.com/NVIDIA-NeMo/Curator) +- [Anyscale Documentation](https://docs.anyscale.com/) + diff --git a/nemo_curator_semantic_dedup/__pycache__/helper.cpython-312.pyc b/nemo_curator_semantic_dedup/__pycache__/helper.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..b0e10dfe70c3ed15c17633f6b4060378992a8e3a GIT binary patch literal 15239 zcmbt*YjhJ=nqZYwl1gvM?>B{CSb%NJBY+_R;|CbKZ4yWb4vCI%l|e?9oGKZxB6mBT zKHK&zXAslb!I?A@XFKOa*|6K*=V51GBq4Ly&g@w!5|O6j>|uBEW7so4oQ63`(|gYD z_uW#Jgi&w0cgyy@U)_7Z?>_3g-~ArfpV{pe3Z9=VZ1VrAgQEThU(`ph0Q7@PG(}yb zc#5Y3)D%5N(+KMVx-lKe^<#RHGh+sCX$2sPL+!%0Am5h~;I#Zx@s%)%`z-*v=s(h@RrUbiagLdXX#Z={3 zC8@Ins-~*PssXl+)$mn(HE;WgVXT&~;q5| zD*$TY+jtLO2~g{-&a?SGeg+RMTYbS`DC`USLqTZ--c8d!G3@sR0A!AZLYUi6oI2L; z?LE@pJ2W_Q7z&Jm(Al$s2#{fL%6C@4w&7R#Dafs)VD!wSFdl|i3wb@{3y+_>e+N1o z&wYKu1IQz9P!#>(?{pM3MA5(vVc-en;pwQ3rz7z4A=Kj2N4=b+s5!lvic*t>bd+i; zN{04hwn*JXc4AdMGWhU7?o(hEtPvhJ9 zHi{CrKr6dim(M@Ufo}je6RM0tbsV`Ge$;;7fcL^)X=+?YQ^%>Oo~O_E0sXeHQ>8lT zQlWg#8>0HRftRB+q^9B{ew(^(RA)@M`2X5}R3FuyhqadI%l|rDuJ$xpr9%0fr&X%j zEOp8(Y3g!{p}uG2=BN(FY{8LVXH-fwHB0TLF5iN`Yw))Se?v`F7e&^Ifv-c>aW`dv z@m++KItBa!y}five~P+DH&J0|qdb!(DwNN|Why*bp+fn5Lf1@*?Q^iAxwUWjo*5Lb zJsUMVzRss$mFEmUG+h2b$uH6uDP@!}r$70p`ir#3I&vQw@jiax4`3ICVF%()Gll(A zLTDx&F~8Ik4hvJ$VJ=esGB@D!2LzrAhd4fTF&GH>crN1T=8l{=ag38-@6JfvBV9Ka zVR`}qK;V1|uEb3!h!Y`^^VnsB=nI||Wc^toESr4((7AAUTDJEF{6a8%Lg`pGD)eL% z68DY{$&4uYc+ms{l?}XbX6CGHRWSNRF(d*VX*M|S56QM11*O;^1q5N*W0XyjAW0xw zWc`d7kWHcx7X5+*Xvz>}RurUZ5IurS3leTxjy#VJcAg7O37w*Ewv!hod^3S?r*O$P zH60M7&Y&=>1<`gwb!n`w!GtCnKIk@ zliT|597=B+jlGtsJeICJmN;=bTUvYVz|{lqo&CJDJ-gBKe*L@kzu1te-S(}4s(O}& zW~s_!YeuS~e(BXed1-+)_%2pnNF27W^9c~TjN?aRpxnEM;WX?9e#Z{wsXl8 zn@w=LR>oJJPq_D`nf+O-bFCH$eRr>R+jnbZK;I=Qk9`NDg5PB2vEM$lQ0}UXt10Pf zdhcYWWpA=&Z^GIAYf6VxcUCO9|LC<|n;uF35mxobrTshUKi$%|LH7|&_vzS=bb5phoBAwOS7*NMqWel6x7{X$t4!E(LsNgJ;Zt2x|7OFdElBgz%?2## zWB`8bWHQ5a$jMX%ESWo*ph$TpbLWxzj`^f^^5jdSw=RnC%xxPFgi9b z`{0~)tGM}GJ>k?b!nr&`QKw;5=FB9u%vqyW-dqhRb2fl11hV6K{x%ae^VYYS>$*8d z)E>1(9lY)5`Z*_@U#0mTR4D3H`=F^PTdYO?oQj&FcHVx&@nKFg$YWkU1}9RbO0(E` z7G-%S@AA`sOl#+vstcgLSnSl)&bx4}M!8Of@;RI+ZgstPQFHG6+WyF;jeMUz`qD4*wP-6&H>O;h1cwM7(o06Bs$$5icivP-Sc=X`}) zJ6HC5%&zjW*_9W}u1uv9E&txxm4DCdG+K|%ZihPR$(<^c&(#^Lqxhb=R{S1wt$J*( zl?8Lv)+}22y>qSnp1EqY3g;T5FP~!2;&^Olz!%=F&UkW<3gz=?UZz!rwcOMjqooopBr~F~rK{79>Cf;))DeTdSsHDo~@lY@b zmJ%pZ%!&RNPssY`pBo0EJ9&>@+=wkq}ChMDk0`yL$erSKy|$D|YrM%WH8 z`Cgq7!kz|M7m{RV+7~`2Gm{~IP()=}MA-@YUGK48ZL~S9@pw+TGYi3)DFGBRL1z77 z0W`yll5F%%PYXewXkB7G(lDL{$^s~juwFPUnF&D)+2WV{LC`ycotc`JWZg6`oBMoW z-+%~9AzQ*BuR75h=m%|5*#gS;^i0^x`$gGIio7rr*#-mjVqdt3vUy4f`*p01g7e9hnh8wWn~K)!OFe z=ih(j-B(gAyS}J>YH?_xXYo>uj_t@+R4=t&J-=``TT&hqu5MoFy;t6_B*jLTdzQzS zcPC1=KuK*weD}>A$h-%NG+BoiY7GBURV*&VjcM ztn656SqZP$(zQ>=*j27+dEgh;82hF@>ngowy=q-La^uV|CqJ0HS&`~E7_+8bJ%G8f zd3iAIdv|!HI@!E0#qG~g5gr~67^F|f2Zt?3{#P0Dq_*QO!L}t8khYyYc*8qVaUiw_zipo`2E~@^wGEQF+2%#+y9bY z2d?~aPv@|O{-}!{+F<->yB^_PhYUk)`cJy)p+@5;`xt}|SfS#!g&t~QZd+>*ZfP2N zM*pcoKz{Yhi z!)Z-y6~mkntRg0AgdZJx8*8Wu-BsOXH$%;t=AgDB0N8DPlfr?o!T1%;- zpk?zqg7BKBiJBk&pigyNKWCgXT%=I%d(3wjRdu{D9*yJyKVZw8HLO`Zlbh9=e9k+g z7Wm$-sP%FGCGq=Ne`uWry?74tyx-5`9ytN37%N=zOJQ&5e8e;iK0NT~MQS+ky`e{p zn-D`&Ty76@sCtMfdm`53(*b{&^9RBC7qN0=b4y%kf{WNSRKYiX4xESPoFot=P!UmP 
zMBK-e!`>e}o5K`CP!6I7*;8hCK^m8tnII^r`ta0rq&n|t6v4-KLExa21Ko%dk!w8_ z;xmZHPWjFYP%4SfU4VtT+rcVlO7>c@0EhEveQOUA{8t2s( zMAq>WiUfTHTN{;S1WgwL95OR86C4MfbZQ#aKoMW7S8IwWX^2)q7DmLli=A zXD&k13y5=pMoIh7Mj1>fVPlr%6L*=-a2i`|S2iqeSgK2zTjvk0GUkQDZ}u+fes(yv zH&ORg!m>Nf?0Lvi1N12US2Ky{Ur4|BQsVS0sTaKo-*js9)pWo3P_J_s|GHoN=ciXr z-J%z&V~1j|E_E&aC|TQ)DC_*3-InMV*Yw~M{Dy*t(4)b!F!Xc%(!kr+*xd5aiZju9 z@RmJc8A&svI3CB6{jRw+-n{CpNLOyXS(A45Eih1FwqHJxwYjbgE)K@{rGb>KX%)1w ziJxuBx|_4^s)uHS!}5?an62Qdv}}4SVhVKOP}kj zNcigYUC>vN@=5L3&3E%*4g`t}Qp_K_#c8Ob9NHUlBXH!YsgPJ3)faOp0?i_=Rhr=X z(;ufi#>m!Vq5uL?;8b&r6NT}R$a5Ip@C7;I>F2=I1%tZ7qnCAlUbcor|5<;~7x2Q1 zU3(!GuG1$-V2)z!Kms~oAqJ(1i#gaVOq~&U9weu1*FcaDQyw#zxFn_k6fFvbhXBf*S;2+z%Eq8@5eH}q1$fB4;~k;@i;Sb@-G1DCRfH( zn>5uXOq;T8U75E1$+rD~YD=;$q^>4us!5ny?`_(a+0>of)ScPXlibvk+SCh;tVy%y?fm8WWpV!(zEa8 zVA3;?YB>y_-26G)m^C|Ndo!giN%%Fl6t){kdU{hWeScs(I87|A3X5o;8V01Xh%qPM zi%Iv_XY%Y(R3g*GuvBIN8&C`BL^X4z(7c(nz)03TBn%G0q5QQ32={0P4unLU>k!~o zmCS0-QG-{L>A+H&agoPF^Co}Y0Tg3;mIg*?Lq<_VCTcRsCZIPF&6D#_aJ~x=I`*9j z2puCL`gKKg>5AQucr2>eLp%|Cm(8fXf)CU;D?4)$SKtpN=z16thcHn@Aw-`q_yRM6 z_ya6MTiRn3#}OuMw*-(X8>jsN*{X<1#VweVx99{b2o@{KnVYbu(i%F4p^5JUtvm21 z1t1}OsR^#H%c`%9UL9Q?%rx&wHt$K5?OiahRTNkW%*_~@{ifwz63Guil zO2)4N_`~dJP%rew;){GCqiO6{?1(3f1~IoAnuQ9CrXBlvYa5tJQ!9X(im+eQkhg4$ z-LuHkx?xo9Ltr_RhmABK!yP?7I+7Ph1w7YrA4LsVF-CuoM`a#mulO`38t098EaGu2 zIf2P@m^{v0d6p7KvGy1wDs!PY6BXtnj#R?D=djQZ$rIex_Y@v6BXA3x78%f?PM53-IF=h<{(hNU2@6Vj2 zAg0U`?@dna2fskM(kH0%P{UhpSXF1K>W1|3Bf|G<*kq zXJnqwj^2# zJ0ExypSiNAa}urSbx%}_oPx(`lUf;dMFCsIg&L1mQvw*dwTJR8TDdD)R)}$Vj6@?D z9)1eix2x?ZJ5(s2=WiCwl}F2q#be<%f#Zg=fFABb@_^@u-*eP=?L?vOS1#PPe`qelb`mK(#kkRBAQSg7DA#&JS%k*2npuSbqvD^Qa3jY{vPye73o58jrpgAfIld3 zO2kehlMCRX3D(W^`N3F%XzDC@(qIG^A=n5$taC!)5T$dWnE=n75jY8^#A}2k<#rkI zAd!K0b0afg1d2GSTqVxd9~=+N@B-uZoXcY5iVK+AjgP9h_{S$tH-wF#2H~zp9lmrpIbud%a z#?7s-AC$ZxRQM$htjd|GAR&4}lz^2Z?h%*{ep2!oaXCgJ)&id)_!x85ivO`hTL%ba zD>EQoN;tMH`B@WX4y=fO23^Q{v@xCnNG&KQ4cD*r0FW@?$$x3a1&#m5&woZ3j#oP2 zWO|!S^AV%MSP@noN5sK#?c9l+-U>G$OYr@e63!+o+0BU#oNAfn_{=y4J||`Zfm!9# zqeC@6{fH^|O(G?uBq)rIPlcPhIZrileTuUsX ztp>Ov-L;K4KS3qNFfw{b!+wnA=e?bZ{&EB<^de;|nEq2|fXl%PKn!`YAL~$$B}SHr zN(k|~iZ37p=NEWoWjmRL64HgsmDYkiA-kX8Yw}^lw*Xak`vTw@A>ItdZz7QplemJF zEMZX`32~a*a<7D6WLM5rtwqU2REvm57UI_sJtr;}mue!aP7J~uvPBsakiL{x`ia3Q zTQv-RLc)U-Ma7ekqVm@x%PQknV)vk`6?kU<643t}{7DDl6amj{qw1NpF7tPpE#Pse zu1zrIU%DF>3|Y21!*WTMTP}aU=G~gR>{hTQOE!FCq-^!8tUck_me^FP9f``$_*~-Yk;MMd#HcS}Ig@6_zpmrn8GLK-J^ua4cPA5Fhf=M*sk*)dQw`m@ z-1A3P?Ip3UH^&n-TT&HU6CDRK_Ja#>;j}I8j(35ZHo4_sqOxbf1{O%&miV4qmUQh< zf~{JuY+b%|^XQ%abmemkw$;k&YZtFxycWG0T^>qw^`t5fL194@)*3&ZD(zliq0hPo zKnH4$@++qoPcOAWjN6VeZqN3W{a<(vCS5Ht%hKT2)y>(mx@=iPc7rFo`2cu&Yf56q zhc2pV-_5lp|fZ|@uTtmE3d{kB)GjdD{kv=4W|x1n>g@X;<>SeWVemxZ&3qlv5WooxCqB9{naHlgLXcdQ!Dx>n5Hy*u$TtXEpnNSn6*|O!0d&QG z35iU1d;lRF*{Pw!T@etR%C`O`w2)8{gFQnwiTH035tHq3luH}{;1sk`&W0u5vSle6 z?^^OE?CtSq<5FBmSa&9vor(nlwubf~3`FS%+W|lqIm}KU0ns!1k92^ z;BWw+8QhBJ^*DPn>!*4Z!Il%~z95KpatLsPeG(i|aIpF~h(y6XB+icpARcgH$9;60 z8t-Oq~cVFdyOW3@@`maMqsnho$wFo%k0>5tDtm9yjk(40t~!Vb(|PSPs+-g-{9F^r_yR31s)KdbN4CKotqXz} z=8g>xE4mcwEV;{4Ii-y_%t1z@rqO{Wx1zNvM>&%>o#EaU$X7@O;XK3)--8}yn{rwx z7_#xq%*2EsKF)Ce7}|-L9Kr=gH5{yEyYtG};@Cet4a>^de>(E|NbITQt@99&>dZ10 z5-y{`Cbqige?V3iBAEDPJgmq;7AepDw?c>VuLrqnXUaV*^jRu5;`H$C@pctni;LKZ z$B2t^BcY%Gbik~kBDQJ1165UWBTTthLD0vL#uu1mAdwk@VIXmJ69+K7d<~ne>t{WdxQdPPRP!Usx73S& z4UO(26OTYl_TNDsabf7=r9o5`h_&#t`0tSrng4%4h|C`qLmaic)7YNO1KUR|{!mA5b9pw&7O!_=2Dne~dPNYI6uHr(>?h*4`Zrki3?_#m z0k0_pp2HA=mPlwELOyUw(4PxhgDF6w<`77loyHgR%#r(;vT+*3CtOB+9*N-p;?0ON z%75tSVb>FntP)!Ky%lwnGj;JsTrULhiDVsPd_NL(7PVg80md$Ra8S$wxTjy 
zzj4j_EKPT0OPkj82;6g*tr-viI#oc2fGh>2W&*-L bytes | None: + for attempt in range(1, retries + 1): + try: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response: + if response.status == HTTP_OK: + return await response.read() + elif attempt > 1: + logger.debug(f"[Attempt {attempt}] Failed to download {url}: HTTP status {response.status}") + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + if attempt > 1: + logger.debug(f"[Attempt {attempt}] Failed to download {url}: {e}") + + if attempt < retries: + await asyncio.sleep(1) + + logger.debug(f"All {retries} attempts failed for {url}") + return None + + +async def process_batch(batch: pd.DataFrame, output_dir: str, batch_num: int) -> None: + tar_filename = os.path.join(output_dir, f"{batch_num:05d}.tar") + + metadatas = [] + # Set timeout and connection limits for the session + timeout = aiohttp.ClientTimeout(total=15) + connector = aiohttp.TCPConnector(limit=256, limit_per_host=16) + + async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: + tasks = [] + for i, (_, row) in enumerate(batch.iterrows()): + caption = row["TEXT"] + url = row["URL"] + + key = f"{batch_num:05d}{i:04d}" + + meta = {"url": url, "caption": caption, "key": key} + metadatas.append(meta) + + tasks.append(fetch_image_bytes(session, url, retries=3)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + with tarfile.open(tar_filename, "w") as tar: + for i, result in enumerate(results): + # Only proceed for successful downloads (bytes) + if isinstance(result, bytes) and result: + key = f"{batch_num:05d}{i:04d}" + + # Add image bytes + jpg_info = tarfile.TarInfo(name=f"{key}.jpg") + jpg_info.size = len(result) + tar.addfile(jpg_info, fileobj=io.BytesIO(result)) + + # Add caption text + caption_bytes = str(metadatas[i]["caption"]).encode("utf-8") + txt_info = tarfile.TarInfo(name=f"{key}.txt") + txt_info.size = len(caption_bytes) + tar.addfile(txt_info, fileobj=io.BytesIO(caption_bytes)) + + # Add JSON metadata + json_bytes = json.dumps(metadatas[i]).encode("utf-8") + json_info = tarfile.TarInfo(name=f"{key}.json") + json_info.size = len(json_bytes) + tar.addfile(json_info, fileobj=io.BytesIO(json_bytes)) + + # Write parquet + meta_df = pd.DataFrame(metadatas) + parquet_path = os.path.join(output_dir, f"{batch_num:05d}.parquet") + meta_df.to_parquet(parquet_path) + + +def process_parquet_chunk(chunk: tuple[int, pd.DataFrame], output_dir: str) -> None: + batch_num, batch = chunk + + asyncio.run(process_batch(batch, output_dir, batch_num)) + + +def download_webdataset( + parquet_path: str, + output_dir: str, + entries_per_tar: int = 10000, + num_processes: int = 2, +) -> None: + os.makedirs(output_dir, exist_ok=True) + + # Read the parquet file + df = pd.read_parquet(parquet_path) + print(f"Loaded {len(df)} entries from parquet file") + + # Split the dataframe into chunks for multiprocessing + chunks = [ + (batch_num, df[i : i + entries_per_tar]) for batch_num, i in enumerate(range(0, len(df), entries_per_tar)) + ] + print(f"Split into {len(chunks)} chunks of {entries_per_tar} entries each") + + # Use multiprocessing to process chunks in parallel with progress tracking + with Pool(processes=num_processes) as pool: + func = partial(process_parquet_chunk, output_dir=output_dir) + + # Use tqdm to track progress of chunk processing + list(tqdm( + pool.imap(func, chunks), + total=len(chunks), + desc="Processing chunks", + unit="chunk" + )) + + # Best-effort cleanup of legacy tmp dir from previous versions 
+ tmp_dir = os.path.join(output_dir, "tmp") + try: + if os.path.isdir(tmp_dir) and not os.listdir(tmp_dir): + os.rmdir(tmp_dir) + except OSError as e: + logger.debug(f"Failed to remove tmp dir {tmp_dir}: {e}") + + +def _prepare_metadata_record( + image_obj: ImageObject, + new_id: str, + old_id_col: str | None, +) -> dict: + """Prepare metadata record for an image object.""" + metadata_record = { + "id": new_id, + "original_id": image_obj.image_id, + "original_path": image_obj.image_path, + } + + # Preserve original ID in specified column if requested + if old_id_col: + metadata_record[old_id_col] = image_obj.image_id + + # Add scores and embeddings to metadata + if image_obj.aesthetic_score is not None: + metadata_record["aesthetic_score"] = image_obj.aesthetic_score + if image_obj.nsfw_score is not None: + metadata_record["nsfw_score"] = image_obj.nsfw_score + if image_obj.embedding is not None: + # Convert embedding to list for JSON serialization + metadata_record["embedding"] = image_obj.embedding.tolist() + metadata_record["embedding_dim"] = len(image_obj.embedding) + + # Add original metadata + if image_obj.metadata: + metadata_record.update(image_obj.metadata) + + return metadata_record + + +def _add_caption_to_metadata(image_obj: ImageObject, metadata_record: dict) -> None: + """Add caption/text to metadata record.""" + if "caption" in image_obj.metadata: + metadata_record["caption"] = str(image_obj.metadata["caption"]) + elif "text" in image_obj.metadata: + metadata_record["caption"] = str(image_obj.metadata["text"]) + elif "TEXT" in image_obj.metadata: + metadata_record["caption"] = str(image_obj.metadata["TEXT"]) + + +def _add_image_to_tar(tar: tarfile.TarFile, image_obj: ImageObject, new_id: str) -> None: + """Add image data to tar file if available.""" + if image_obj.image_data is not None: + # Convert numpy array to PIL Image and save as bytes + image_pil = Image.fromarray(image_obj.image_data) + image_bytes = _image_to_bytes(image_pil) + + # Add image to tar + image_info = tarfile.TarInfo(name=f"{new_id}.jpg") + image_info.size = len(image_bytes.getvalue()) + tar.addfile(image_info, fileobj=image_bytes) + + +def _add_json_to_tar(tar: tarfile.TarFile, metadata_record: dict, new_id: str) -> None: + """Add JSON metadata to tar file.""" + json_data = json.dumps(metadata_record, indent=2) + json_bytes = json_data.encode("utf-8") + json_info = tarfile.TarInfo(name=f"{new_id}.json") + json_info.size = len(json_bytes) + tar.addfile(json_info, fileobj=io.BytesIO(json_bytes)) + + +def save_imagebatch_to_webdataset( + image_batches: list[ImageBatch], + output_path: str, + samples_per_shard: int = 10000, + max_shards: int = 5, + old_id_col: str | None = None, +) -> None: + """ + Save ImageBatch objects to WebDataset format with resharding. 
+ + Args: + image_batches: List of ImageBatch objects from pipeline output + output_path: Directory path where the WebDataset should be saved + samples_per_shard: Number of samples to include in each tar file + max_shards: Order of magnitude of max shards (for zero-padding filenames) + old_id_col: If specified, will preserve the original image_id in this column + """ + os.makedirs(output_path, exist_ok=True) + + # Flatten all ImageObjects from all batches + all_image_objects = [] + for batch in image_batches: + all_image_objects.extend(batch.data) + + if not all_image_objects: + print("No images to save") + return + + print(f"Processing {len(all_image_objects)} images into {samples_per_shard} samples per shard") + + max_samples_per_shard = math.ceil(math.log10(samples_per_shard)) + + # Process images in shards + shard_id = 0 + for i in range(0, len(all_image_objects), samples_per_shard): + shard_images = all_image_objects[i:i + samples_per_shard] + + # Create output file paths + parquet_filename = _name_partition(shard_id, max_shards=max_shards) + tar_filename = _name_partition(shard_id, max_shards=max_shards, ext="tar") + parquet_path = os.path.join(output_path, parquet_filename) + tar_path = os.path.join(output_path, tar_filename) + + # Prepare metadata for parquet + metadata_records = [] + + # Create tar file with images and metadata + with tarfile.open(tar_path, "w") as tar: + for sample_idx, image_obj in enumerate(shard_images): + # Generate new ID combining shard and sample indices + new_id = _combine_id( + shard_id, + sample_idx, + max_shards=max_shards, + max_samples_per_shard=max_samples_per_shard + ) + + # Prepare metadata record for parquet + metadata_record = _prepare_metadata_record(image_obj, new_id, old_id_col) + metadata_records.append(metadata_record) + + # Save image data if available and requested + _add_image_to_tar(tar, image_obj, new_id) + + # Store caption/text in metadata (no separate .txt file) + _add_caption_to_metadata(image_obj, metadata_record) + + # Add JSON metadata to tar + _add_json_to_tar(tar, metadata_record, new_id) + + # Save metadata to parquet + metadata_df = pd.DataFrame(metadata_records) + metadata_df.to_parquet(parquet_path, index=False) + + print(f"✓ Saved shard {shard_id:0{max_shards}d} with {len(shard_images)} samples") + print(f" - Tar file: {tar_filename}") + print(f" - Parquet file: {parquet_filename}") + + shard_id += 1 + + print(f"\nSuccessfully saved {len(all_image_objects)} images to {shard_id} shards") + print(f"Output directory: {output_path}") + + +def _name_partition( + partition_index: int, + max_shards: int = 5, + ext: str = "parquet", +) -> str: + """Generate partition filename with proper zero-padding.""" + return f"{partition_index:0{max_shards}d}.{ext}" + + +def _combine_id(shard_id: int, sample_id: int, max_shards: int = 5, max_samples_per_shard: int = 4) -> str: + """Combine shard and sample IDs into a unique identifier.""" + int_id = sample_id + (10**max_samples_per_shard) * shard_id + n_digits = max_samples_per_shard + max_shards + return f"{int_id:0{n_digits}d}" + + +def _image_to_bytes(image_pil: Image.Image, image_format: str = "JPEG") -> io.BytesIO: + """Convert PIL Image to BytesIO object for tarfile.""" + buffer = io.BytesIO() + image_pil.save(buffer, format=image_format) + buffer.seek(0) + return buffer \ No newline at end of file diff --git a/nemo_curator_semantic_dedup/image_dedup_example.py b/nemo_curator_semantic_dedup/image_dedup_example.py new file mode 100644 index 0000000..474f465 --- /dev/null +++ 
b/nemo_curator_semantic_dedup/image_dedup_example.py @@ -0,0 +1,301 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os +import time + +from helper import download_webdataset + +from nemo_curator.core.client import RayClient +from nemo_curator.pipeline import Pipeline +from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow +from nemo_curator.stages.file_partitioning import FilePartitioningStage +from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage +from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage +from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage +from nemo_curator.stages.image.io.image_reader import ImageReaderStage +from nemo_curator.stages.image.io.image_writer import ImageWriterStage +from nemo_curator.stages.text.io.writer.parquet import ParquetWriter + + +def create_image_embedding_pipeline(args: argparse.Namespace) -> Pipeline: + """Create image curation pipeline with file partitioning, image reading, embedding, deduplication.""" + + # Define pipeline + pipeline = Pipeline(name="image_curation", description="Curate images with embeddings and quality scoring") + + # Stage 0: Partition tar files for parallel processing + pipeline.add_stage(FilePartitioningStage( + file_paths=args.input_wds_dataset_dir, + files_per_partition=args.tar_files_per_partition, + file_extensions=[".tar"], + )) + + # Stage 1: Read images from webdataset tar files (now runs in parallel) + pipeline.add_stage(ImageReaderStage( + task_batch_size=args.batch_size, + verbose=args.verbose, + num_threads=16, # More threads for I/O + num_gpus_per_worker=0.25, + )) + + # Stage 2: Generate CLIP embeddings for images + pipeline.add_stage(ImageEmbeddingStage( + model_dir=args.model_dir, + num_gpus_per_worker=args.embedding_gpus_per_worker, + model_inference_batch_size=args.embedding_batch_size, + verbose=args.verbose, + )) + + # Stage 3: Convert embeddings to document batch + pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(fields=["image_id", "embedding"])) + + # Stage 4: Save embeddings to parquet file + pipeline.add_stage(ParquetWriter( + path=args.embeddings_dir, + )) + + return pipeline + +def create_embedding_deduplication_workflow(args: argparse.Namespace) -> Pipeline: + """Create image deduplication pipeline with embedding deduplication.""" + return SemanticDeduplicationWorkflow( + input_path=args.embeddings_dir, + output_path=args.removal_parquets_dir, + id_field="image_id", + embedding_field="embedding", + n_clusters=100, + eps=0.01, + read_kwargs={"storage_options": {}}, + write_kwargs={"storage_options": {}}, + verbose=args.verbose, + ) + +def create_image_deduplication_pipeline(args: argparse.Namespace) -> Pipeline: + """Create image deduplication pipeline with image deduplication.""" + # Define pipeline + pipeline = Pipeline(name="image_deduplication", description="Deduplicate 
images with image deduplication") + + # Stage 0: Partition tar files for parallel processing + pipeline.add_stage(FilePartitioningStage( + file_paths=args.input_wds_dataset_dir, + files_per_partition=args.tar_files_per_partition, + file_extensions=[".tar"], + )) + + # Stage 1: Read images from webdataset tar files (now runs in parallel) + pipeline.add_stage(ImageReaderStage( + task_batch_size=args.batch_size, + verbose=args.verbose, + num_threads=16, # More threads for I/O + num_gpus_per_worker=0.25, + )) + + # Stage 2: Read removal list from parquet file and filter images + pipeline.add_stage(ImageDuplicatesRemovalStage( + removal_parquets_dir=args.removal_parquets_dir + "/duplicates", + duplicate_id_field="id", + verbose=args.verbose, + )) + + # Stage 3: Write filtered images to disk + pipeline.add_stage(ImageWriterStage( + output_dir=args.output_dataset_dir, + remove_image_data=True, + verbose=args.verbose, + )) + + return pipeline + + +def main(args: argparse.Namespace) -> None: + """Main execution function for image curation pipeline.""" + + ray_client = RayClient() + ray_client.start() + + print("Starting image curation pipeline...") + print(f"Input parquet file: {args.input_parquet}") + print(f"Input webdataset directory: {args.input_wds_dataset_dir}") + print(f"Output webdataset directory: {args.output_dataset_dir}") + print(f"Model directory: {args.model_dir}") + print(f"Tar files per partition: {args.tar_files_per_partition}") + print(f"Task batch size: {args.batch_size}") + print("\n" + "=" * 50 + "\n") + + # Step 1: Download and prepare webdataset from parquet file + if not args.skip_download: + print("Step 1: Downloading webdataset from parquet file...") + download_start = time.time() + + # Create output directory if it doesn't exist + os.makedirs(args.input_wds_dataset_dir, exist_ok=True) + + # Download webdataset using helper function + download_webdataset( + parquet_path=args.input_parquet, + output_dir=args.input_wds_dataset_dir, + num_processes=args.download_processes, + entries_per_tar=args.entries_per_tar, + ) + + download_time = time.time() - download_start + print(f"✓ Dataset download completed in {download_time:.2f} seconds") + print(f"✓ Webdataset saved to: {args.input_wds_dataset_dir}") + print("\n" + "=" * 50 + "\n") + else: + print("Step 1: Skipping download (using existing dataset)") + print(f"Using existing dataset at: {args.input_wds_dataset_dir}") + print("\n" + "=" * 50 + "\n") + + # Step 2: Create and run curation pipelines + # Step 2.1: Create image embedding pipeline + print("Step 2.1: Running image embedding pipeline...") + start_time = time.time() + pipeline = create_image_embedding_pipeline(args) + print(pipeline.describe()) + print("\n" + "=" * 50 + "\n") + pipeline.run() + + # Step 2.2: Create image deduplication pipeline (pairwise executor is XennaExecutor by default) + print("Step 2.2: Running image deduplication pipeline...") + start_time = time.time() + pipeline = create_embedding_deduplication_workflow(args) + print("\n" + "=" * 50 + "\n") + pipeline.run() + + # Step 2.3: Create image deduplication pipeline + print("Step 2.3: Running image deduplication pipeline...") + start_time = time.time() + pipeline = create_image_deduplication_pipeline(args) + print(pipeline.describe()) + print("\n" + "=" * 50 + "\n") + pipeline.run() + + end_time = time.time() + + # Calculate and print execution time + execution_time = end_time - start_time + hours, remainder = divmod(execution_time, 3600) + minutes, seconds = divmod(remainder, 60) + + print("\nImage 
curation pipeline completed!") + print(f"Total execution time: {int(hours):02d}:{int(minutes):02d}:{seconds:.2f}") + print(f"Total execution time: {execution_time:.2f} seconds") + print(f"\nProcessed dataset available at: {args.output_dataset_dir}") + + ray_client.stop() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Image curation pipeline with embedding generation and quality scoring" + ) + + # Dataset arguments + parser.add_argument( + "--input-parquet", + type=str, + required=False, + default=None, + help="Path to input parquet file containing image URLs and metadata" + ) + parser.add_argument( + "--input-wds-dataset-dir", + type=str, + required=True, + help="Directory to save the downloaded webdataset" + ) + parser.add_argument( + "--output-dataset-dir", + type=str, + required=True, + help="Directory to save the resulting webdataset" + ) + parser.add_argument( + "--embeddings-dir", + type=str, + required=True, + help="Directory to save the embeddings" + ) + parser.add_argument( + "--removal-parquets-dir", + type=str, + required=True, + help="Directory to save the remove parquets" + ) + parser.add_argument( + "--download-processes", + type=int, + default=8, + help="Number of parallel processes for downloading images" + ) + parser.add_argument( + "--entries-per-tar", + type=int, + default=1000, + help="Number of entries per tar shard during download" + ) + parser.add_argument( + "--skip-download", + action="store_true", + default=False, + help="Skip dataset download and use existing webdataset" + ) + + # Image reader arguments + parser.add_argument( + "--tar-files-per-partition", + type=int, + default=1, + help="Number of tar files to process per partition (controls parallelism) for FilePartitioningStage" + ) + parser.add_argument( + "--batch-size", + type=int, + default=100, + help="Number of images per ImageBatch for the reader stage" + ) + + # General arguments + parser.add_argument( + "--model-dir", + type=str, + required=True, + help="Path to model directory containing all model weights" + ) + parser.add_argument( + "--verbose", + action="store_true", + default=False, + help="Enable verbose logging for all stages" + ) + + # Embedding stage arguments + parser.add_argument( + "--embedding-batch-size", + type=int, + default=32, + help="Batch size for embedding generation" + ) + parser.add_argument( + "--embedding-gpus-per-worker", + type=float, + default=0.25, + help="GPU allocation per worker for embedding generation" + ) + + args = parser.parse_args() + main(args) \ No newline at end of file From a812e2cad5f740126343b673f6e3c0b0b2158f5f Mon Sep 17 00:00:00 2001 From: Avi Basnet Date: Sun, 30 Nov 2025 00:52:08 +0000 Subject: [PATCH 2/6] working job submit version --- nemo_curator_semantic_dedup/Dockerfile | 64 +++++++ nemo_curator_semantic_dedup/README.md | 168 +++--------------- .../image_dedup_example.py | 119 ++++++++++--- nemo_curator_semantic_dedup/job.yaml | 66 +++++++ 4 files changed, 248 insertions(+), 169 deletions(-) create mode 100644 nemo_curator_semantic_dedup/Dockerfile create mode 100644 nemo_curator_semantic_dedup/job.yaml diff --git a/nemo_curator_semantic_dedup/Dockerfile b/nemo_curator_semantic_dedup/Dockerfile new file mode 100644 index 0000000..200ef69 --- /dev/null +++ b/nemo_curator_semantic_dedup/Dockerfile @@ -0,0 +1,64 @@ +# NeMo Curator Image Deduplication Example +# Uses CUDA 12.8 for GPU-accelerated processing +FROM anyscale/ray:2.52.0-slim-py312-cu128 + +# Install system dependencies +RUN sudo apt-get update && \ + 
sudo apt-get install -y --no-install-recommends \ + build-essential \ + unzip \ + wget \ + curl && \ + sudo apt-get clean && \ + sudo rm -rf /var/lib/apt/lists/* + +# Install uv for fast package management +RUN curl -LsSf https://astral.sh/uv/install.sh | sh + +# Install Python dependencies +# NeMo Curator with CUDA 12 support for image processing +RUN uv pip install --system "nemo-curator[image_cuda12]" + +# Additional dependencies for image downloading and processing +RUN uv pip install --system \ + loguru \ + Pillow \ + aiohttp \ + tqdm \ + pandas \ + pyarrow \ + huggingface_hub \ + transformers + +# Pre-download CLIP model weights to avoid runtime downloads +# This makes job startup faster and more reliable +RUN python -c "\ +from huggingface_hub import snapshot_download; \ +import os; \ +model_dir = '/home/ray/model_weights/openai/clip-vit-large-patch14'; \ +os.makedirs(model_dir, exist_ok=True); \ +snapshot_download('openai/clip-vit-large-patch14', local_dir=model_dir)" + +# Set environment variable for model directory +ENV MODEL_DIR=/home/ray/model_weights + +# Download and prepare the example dataset from HuggingFace +# Downloads MS COCO parquet, deduplicates URLs, and truncates to 100k rows +RUN mkdir -p /home/ray/data && \ + curl -L https://huggingface.co/datasets/ChristophSchuhmann/MS_COCO_2017_URL_TEXT/resolve/main/mscoco.parquet \ + -o /home/ray/data/mscoco.parquet && \ + python -c "\ +import pandas as pd; \ +df = pd.read_parquet('/home/ray/data/mscoco.parquet'); \ +deduped = df[~df['URL'].duplicated()]; \ +truncated = deduped[:100000]; \ +truncated.to_parquet('/home/ray/data/truncated_100k_mscoco.parquet'); \ +print(f'Created truncated dataset with {len(truncated)} rows')" && \ + rm /home/ray/data/mscoco.parquet + +# Create output directories +RUN mkdir -p /home/ray/data/webdataset \ + /home/ray/data/results \ + /home/ray/data/embeddings \ + /home/ray/data/removal_ids + diff --git a/nemo_curator_semantic_dedup/README.md b/nemo_curator_semantic_dedup/README.md index 2cda472..84b872e 100644 --- a/nemo_curator_semantic_dedup/README.md +++ b/nemo_curator_semantic_dedup/README.md @@ -1,28 +1,8 @@ -# Semantic Deduplication with NeMo Curator +# Image Semantic Deduplication with NeMo Curator -This example demonstrates how to perform GPU-accelerated semantic deduplication on large text datasets using [NVIDIA NeMo Curator](https://github.com/NVIDIA-NeMo/Curator) on Anyscale. +This example uses [NVIDIA NeMo Curator](https://github.com/NVIDIA-NeMo/Curator) to perform GPU-accelerated semantic deduplication on image datasets. -## What is Semantic Deduplication? - -Unlike exact or fuzzy deduplication that matches text patterns, **semantic deduplication** identifies documents that are conceptually similar even if they use different words. This is achieved by: - -1. **Computing embeddings**: Converting text into dense vector representations using neural network models -2. **Clustering**: Grouping similar embeddings using GPU-accelerated k-means -3. 
**Similarity matching**: Identifying near-duplicates within clusters based on cosine similarity - -This approach is particularly effective for: -- Removing paraphrased content -- Identifying translated duplicates -- Cleaning datasets with rephrased information -- Improving LLM training data quality - -## Performance - -NeMo Curator leverages NVIDIA RAPIDS™ libraries (cuDF, cuML, cuGraph) for GPU acceleration: - -- **16× faster** fuzzy deduplication compared to CPU-based alternatives -- **40% lower** total cost of ownership (TCO) -- **Near-linear scaling** across multiple GPUs +NeMo Curator is a scalable data curation library that leverages NVIDIA RAPIDS™ for GPU acceleration. This example downloads images from a parquet file, generates CLIP embeddings, and removes near-duplicate images based on semantic similarity. ## Install the Anyscale CLI @@ -31,145 +11,47 @@ pip install -U anyscale anyscale login ``` -## Submit the Job +## Run the job -Clone the example from GitHub: +Clone the example from GitHub. ```bash git clone https://github.com/anyscale/examples.git cd examples/nemo_curator_semantic_dedup ``` -Submit the job: +Submit the job. ```bash anyscale job submit -f job.yaml ``` -### Using Your Own Data - -To process your own dataset, set the `INPUT_DATA_PATH` environment variable: +## Understanding the example -```bash -anyscale job submit -f job.yaml \ - --env INPUT_DATA_PATH=s3://your-bucket/your-data/ \ - --env OUTPUT_DATA_PATH=s3://your-bucket/output/ -``` +- The [Dockerfile](./Dockerfile) builds a custom image with NeMo Curator CUDA dependencies (`nemo-curator[image_cuda12]`), downloads the MS COCO sample dataset from HuggingFace, and pre-downloads the CLIP model weights to speed up job startup. -Your input data should be in Parquet or JSONL format with at least two columns: -- `id`: Unique document identifier -- `text`: The text content to deduplicate +- The entrypoint defined in [job.yaml](./job.yaml) runs `image_dedup_example.py` which executes a 3-step pipeline: + 1. **Download WebDataset**: Fetches images from URLs in the parquet file and saves them as WebDataset tar files to `/mnt/cluster_storage/nemo_curator/webdataset` + 2. **Generate CLIP embeddings**: Uses OpenAI's CLIP ViT-L/14 model to create 768-dimensional embeddings for each image + 3. **Semantic deduplication**: Clusters embeddings with k-means and removes near-duplicates based on cosine similarity -## Configuration Options +- The `/mnt/cluster_storage/` directory is an ephemeral shared filesystem attached to the cluster for the duration of the job. All outputs (embeddings, duplicate IDs, and deduplicated images) are saved here. 
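+
+  As a quick sanity check of what the pipeline writes there, a snippet like the following could be run as a final step on the cluster. This is a minimal sketch, not part of the example code: it assumes the default paths from [job.yaml](./job.yaml), that both stages write plain `.parquet` files directly into those directories, and that the duplicate files expose an `id` column (the `duplicate_id_field` configured in `image_dedup_example.py`).
+
+  ```python
+  import glob
+
+  import pandas as pd
+
+  # IDs flagged as near-duplicates by the semantic deduplication workflow
+  dup_files = glob.glob("/mnt/cluster_storage/nemo_curator/removal_ids/duplicates/*.parquet")
+  duplicates = pd.concat([pd.read_parquet(f) for f in dup_files])
+  print(f"Near-duplicate images flagged for removal: {duplicates['id'].nunique()}")
+
+  # CLIP embeddings written by the embedding pipeline (image_id + embedding columns)
+  emb_files = glob.glob("/mnt/cluster_storage/nemo_curator/embeddings/*.parquet")
+  embeddings = pd.concat([pd.read_parquet(f) for f in emb_files])
+  print(f"Images embedded: {len(embeddings)}")
+  ```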
-You can customize the pipeline behavior via environment variables: +- To use your own data, prepare a parquet file with `URL` and `TEXT` columns, upload it to cluster storage, and override the `INPUT_PARQUET` environment variable: + ```bash + anyscale job submit -f job.yaml \ + --env INPUT_PARQUET=/mnt/cluster_storage/your_data.parquet \ + --env OUTPUT_DIR=/mnt/cluster_storage/your_results + ``` -| Variable | Default | Description | -|----------|---------|-------------| -| `INPUT_DATA_PATH` | `/mnt/cluster_storage/semantic_dedup/input` | Path to input dataset | -| `OUTPUT_DATA_PATH` | `/mnt/cluster_storage/semantic_dedup/output` | Path for output data | -| `EMBEDDING_MODEL` | `sentence-transformers/all-MiniLM-L6-v2` | HuggingFace model for embeddings | -| `EMBEDDING_BATCH_SIZE` | `64` | Batch size per GPU for embedding computation | -| `NUM_CLUSTERS` | `1000` | Number of k-means clusters | -| `SIMILARITY_THRESHOLD` | `0.8` | Cosine similarity threshold (0.0-1.0) | +- The [helper.py](./helper.py) module provides utilities for downloading images in parallel and converting them to [WebDataset](https://github.com/webdataset/webdataset) format, which is optimized for streaming large-scale image datasets. -### Embedding Model Options +## View the job -| Model | Quality | Speed | Use Case | -|-------|---------|-------|----------| -| `sentence-transformers/all-MiniLM-L6-v2` | Good | Fast | Quick experiments, large datasets | -| `intfloat/e5-large-v2` | Better | Medium | Production workloads | -| `BAAI/bge-large-en-v1.5` | Best | Slower | High-quality deduplication | - -### Tuning the Similarity Threshold - -- **Higher threshold (e.g., 0.9)**: Stricter matching, fewer duplicates removed -- **Lower threshold (e.g., 0.7)**: Looser matching, more duplicates removed - -Start with 0.8 and adjust based on your quality requirements. - -## Understanding the Example - -### Pipeline Architecture - -``` -┌─────────────────┐ -│ Input Data │ Parquet/JSONL files -└────────┬────────┘ - │ - ▼ -┌─────────────────┐ -│ Embedding │ GPU-accelerated transformer inference -│ Creation │ (sentence-transformers, E5, BGE, etc.) -└────────┬────────┘ - │ - ▼ -┌─────────────────┐ -│ Clustering │ GPU-accelerated k-means (cuML) -│ (k-means) │ Groups similar embeddings -└────────┬────────┘ - │ - ▼ -┌─────────────────┐ -│ Duplicate │ Pairwise similarity within clusters -│ Extraction │ Identifies semantic duplicates -└────────┬────────┘ - │ - ▼ -┌─────────────────┐ -│ Deduplicated │ Original data minus duplicates -│ Output │ -└─────────────────┘ -``` +View the job in the [jobs tab](https://console.anyscale.com/jobs) of the Anyscale console. -### Key Components - -- **EmbeddingCreator**: Computes dense vector embeddings using pre-trained models -- **ClusteringModel**: GPU-accelerated k-means clustering with cuML -- **SemanticClusterLevelDedup**: Finds duplicates within clusters using cosine similarity - -### Scaling Considerations - -- **Number of clusters**: Should be roughly √(n_documents) for balanced cluster sizes -- **Memory**: Each GPU should have enough memory for the embedding model (~4GB for MiniLM, ~8GB for larger models) -- **Batch size**: Increase for faster processing, decrease if running out of GPU memory - -## Output - -The pipeline produces: - -1. **Deduplicated dataset**: Parquet files in `{OUTPUT_DATA_PATH}/{timestamp}/deduplicated/` -2. 
**Cache files**: Intermediate embeddings and clusters for debugging - -Example output log: - -``` -============================================================ -Semantic Deduplication Complete! -============================================================ -Original documents: 1,000,000 -Duplicates removed: 127,543 -Final documents: 872,457 -Reduction: 12.75% -Output saved to: /mnt/cluster_storage/semantic_dedup/output/20250115T143022Z/deduplicated -============================================================ -``` - -## Monitoring - -View your job progress in the [Anyscale Console](https://console.anyscale.com/jobs). The Dask dashboard link will be printed in the logs for detailed task monitoring. - -## Cost Optimization Tips - -1. **Use spot instances**: The job.yaml is configured with `market_type: PREFER_SPOT` for cost savings -2. **Start small**: Test with a subset of your data before running on the full dataset -3. **Choose the right GPU**: A10G instances offer a good balance of cost and performance -4. **Tune batch size**: Larger batches = better GPU utilization = faster processing - -## Learn More +## Learn more - [NeMo Curator Documentation](https://docs.nvidia.com/nemo/curator/latest/) -- [Semantic Deduplication Guide](https://docs.nvidia.com/nemo/curator/latest/curate-text/process-data/deduplication/index.html) -- [NeMo Curator GitHub](https://github.com/NVIDIA-NeMo/Curator) -- [Anyscale Documentation](https://docs.anyscale.com/) - +- [NeMo Curator Image Tutorials](https://github.com/NVIDIA-NeMo/Curator/tree/main/tutorials/image/getting-started) +- [Anyscale Jobs Documentation](https://docs.anyscale.com/platform/jobs/) diff --git a/nemo_curator_semantic_dedup/image_dedup_example.py b/nemo_curator_semantic_dedup/image_dedup_example.py index 474f465..077b94e 100644 --- a/nemo_curator_semantic_dedup/image_dedup_example.py +++ b/nemo_curator_semantic_dedup/image_dedup_example.py @@ -16,6 +16,7 @@ import os import time +import ray from helper import download_webdataset from nemo_curator.core.client import RayClient @@ -199,9 +200,44 @@ def main(args: argparse.Namespace) -> None: ray_client.stop() +def get_env_or_arg(env_var: str, arg_value, default=None): + """Get value from environment variable or command-line argument.""" + env_value = os.environ.get(env_var) + if env_value is not None: + return env_value + if arg_value is not None: + return arg_value + return default + + +def get_env_bool(env_var: str, arg_value: bool, default: bool = False) -> bool: + """Get boolean value from environment variable or command-line argument.""" + env_value = os.environ.get(env_var) + if env_value is not None: + return env_value.lower() in ("true", "1", "yes") + return arg_value if arg_value is not None else default + + +def get_env_int(env_var: str, arg_value: int, default: int) -> int: + """Get integer value from environment variable or command-line argument.""" + env_value = os.environ.get(env_var) + if env_value is not None: + return int(env_value) + return arg_value if arg_value is not None else default + + +def get_env_float(env_var: str, arg_value: float, default: float) -> float: + """Get float value from environment variable or command-line argument.""" + env_value = os.environ.get(env_var) + if env_value is not None: + return float(env_value) + return arg_value if arg_value is not None else default + + if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Image curation pipeline with embedding generation and quality scoring" + description="Image curation pipeline with 
embedding generation and quality scoring. " + "Arguments can also be set via environment variables (see job.yaml)." ) # Dataset arguments @@ -210,71 +246,76 @@ def main(args: argparse.Namespace) -> None: type=str, required=False, default=None, - help="Path to input parquet file containing image URLs and metadata" + help="Path to input parquet file containing image URLs and metadata (env: INPUT_PARQUET)" ) parser.add_argument( "--input-wds-dataset-dir", type=str, - required=True, - help="Directory to save the downloaded webdataset" + required=False, + default=None, + help="Directory to save the downloaded webdataset (env: INPUT_WDS_DIR)" ) parser.add_argument( "--output-dataset-dir", type=str, - required=True, - help="Directory to save the resulting webdataset" + required=False, + default=None, + help="Directory to save the resulting webdataset (env: OUTPUT_DIR)" ) parser.add_argument( "--embeddings-dir", type=str, - required=True, - help="Directory to save the embeddings" + required=False, + default=None, + help="Directory to save the embeddings (env: EMBEDDINGS_DIR)" ) parser.add_argument( "--removal-parquets-dir", type=str, - required=True, - help="Directory to save the remove parquets" + required=False, + default=None, + help="Directory to save the remove parquets (env: REMOVAL_DIR)" ) parser.add_argument( "--download-processes", type=int, - default=8, - help="Number of parallel processes for downloading images" + default=None, + help="Number of parallel processes for downloading images (env: DOWNLOAD_PROCESSES)" ) parser.add_argument( "--entries-per-tar", type=int, - default=1000, - help="Number of entries per tar shard during download" + default=None, + help="Number of entries per tar shard during download (env: ENTRIES_PER_TAR)" ) parser.add_argument( "--skip-download", action="store_true", - default=False, - help="Skip dataset download and use existing webdataset" + default=None, + help="Skip dataset download and use existing webdataset (env: SKIP_DOWNLOAD)" ) # Image reader arguments parser.add_argument( "--tar-files-per-partition", type=int, - default=1, - help="Number of tar files to process per partition (controls parallelism) for FilePartitioningStage" + default=None, + help="Number of tar files to process per partition (env: TAR_FILES_PER_PARTITION)" ) parser.add_argument( "--batch-size", type=int, - default=100, - help="Number of images per ImageBatch for the reader stage" + default=None, + help="Number of images per ImageBatch for the reader stage (env: BATCH_SIZE)" ) # General arguments parser.add_argument( "--model-dir", type=str, - required=True, - help="Path to model directory containing all model weights" + required=False, + default=None, + help="Path to model directory containing all model weights (env: MODEL_DIR)" ) parser.add_argument( "--verbose", @@ -287,15 +328,41 @@ def main(args: argparse.Namespace) -> None: parser.add_argument( "--embedding-batch-size", type=int, - default=32, - help="Batch size for embedding generation" + default=None, + help="Batch size for embedding generation (env: EMBEDDING_BATCH_SIZE)" ) parser.add_argument( "--embedding-gpus-per-worker", type=float, - default=0.25, + default=None, help="GPU allocation per worker for embedding generation" ) - args = parser.parse_args() + cli_args = parser.parse_args() + + # Resolve arguments from environment variables or command-line args + args = argparse.Namespace( + input_parquet=get_env_or_arg("INPUT_PARQUET", cli_args.input_parquet), + input_wds_dataset_dir=get_env_or_arg("INPUT_WDS_DIR", 
cli_args.input_wds_dataset_dir), + output_dataset_dir=get_env_or_arg("OUTPUT_DIR", cli_args.output_dataset_dir), + embeddings_dir=get_env_or_arg("EMBEDDINGS_DIR", cli_args.embeddings_dir), + removal_parquets_dir=get_env_or_arg("REMOVAL_DIR", cli_args.removal_parquets_dir), + model_dir=get_env_or_arg("MODEL_DIR", cli_args.model_dir, "/home/ray/model_weights"), + download_processes=get_env_int("DOWNLOAD_PROCESSES", cli_args.download_processes, 8), + entries_per_tar=get_env_int("ENTRIES_PER_TAR", cli_args.entries_per_tar, 1000), + skip_download=get_env_bool("SKIP_DOWNLOAD", cli_args.skip_download, False), + tar_files_per_partition=get_env_int("TAR_FILES_PER_PARTITION", cli_args.tar_files_per_partition, 1), + batch_size=get_env_int("BATCH_SIZE", cli_args.batch_size, 100), + embedding_batch_size=get_env_int("EMBEDDING_BATCH_SIZE", cli_args.embedding_batch_size, 32), + embedding_gpus_per_worker=get_env_float("EMBEDDING_GPUS_PER_WORKER", cli_args.embedding_gpus_per_worker, 0.25), + verbose=cli_args.verbose, + ) + + # Validate required arguments + required_args = ["input_wds_dataset_dir", "output_dataset_dir", "embeddings_dir", "removal_parquets_dir"] + missing = [arg for arg in required_args if getattr(args, arg) is None] + if missing: + parser.error(f"Missing required arguments: {', '.join(missing)}. " + "Set them via command-line or environment variables.") + main(args) \ No newline at end of file diff --git a/nemo_curator_semantic_dedup/job.yaml b/nemo_curator_semantic_dedup/job.yaml new file mode 100644 index 0000000..78fbd5c --- /dev/null +++ b/nemo_curator_semantic_dedup/job.yaml @@ -0,0 +1,66 @@ +# NeMo Curator Image Semantic Deduplication Job +# View the docs: https://docs.anyscale.com/reference/job-api#jobconfig + +name: nemo-curator-image-dedup + +# Build custom image with NeMo Curator CUDA dependencies +containerfile: ./Dockerfile + +# Use named compute config with L40S GPU +compute_config: "nemo-compute-config" + +# Working directory - upload only the example code, not data +working_dir: . 
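+
+# Any value set in env_vars below takes precedence over the matching command-line
+# flag in image_dedup_example.py, and can itself be overridden at submit time,
+# for example (illustrative invocation, reusing the flags shown in the README):
+#   anyscale job submit -f job.yaml \
+#     --env INPUT_PARQUET=/mnt/cluster_storage/your_data.parquet \
+#     --env SKIP_DOWNLOAD=true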
+ +# Environment variables for job configuration +# Override these when submitting to use your own data paths +env_vars: + # Input parquet file with image URLs (TEXT and URL columns) + # This file is copied into the Docker image during build + INPUT_PARQUET: "/home/ray/data/truncated_100k_mscoco.parquet" + + # Directory for WebDataset tar files (created from parquet) + # Use /mnt/cluster_storage for persistence, or /home/ray/data for ephemeral + INPUT_WDS_DIR: "/mnt/cluster_storage/nemo_curator/webdataset" + + # Output directory for deduplicated images + OUTPUT_DIR: "/mnt/cluster_storage/nemo_curator/results" + + # Directory to store CLIP embeddings + EMBEDDINGS_DIR: "/mnt/cluster_storage/nemo_curator/embeddings" + + # Directory for duplicate removal parquets + REMOVAL_DIR: "/mnt/cluster_storage/nemo_curator/removal_ids" + + # Model weights directory (pre-downloaded in Docker image) + MODEL_DIR: "/home/ray/model_weights" + + # Processing settings + BATCH_SIZE: "32" + EMBEDDING_BATCH_SIZE: "32" + TAR_FILES_PER_PARTITION: "10" + DOWNLOAD_PROCESSES: "8" + ENTRIES_PER_TAR: "1000" + + # Set to "true" to skip downloading (use existing WebDataset) + # WebDataset already exists from previous run + SKIP_DOWNLOAD: "false" + + # Ray memory settings to avoid OOM + RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION: "0.5" + + # Increase Ray API server limit for cosmos_xenna monitoring + RAY_MAX_LIMIT_FROM_API_SERVER: "100000" + +# When empty, uses the default Anyscale Cloud +cloud: + +# The entrypoint script +entrypoint: python image_dedup_example.py + +# Don't retry on failure - easier to debug +max_retries: 0 + +# Kill after 4 hours to control costs (adjust based on dataset size) +timeout_s: 14400 + From a5cd5b37d1c77b4fc76fd11f3c13119fe7087ad2 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 30 Nov 2025 15:34:16 -0800 Subject: [PATCH 3/6] remove unnecessary file --- .../__pycache__/helper.cpython-312.pyc | Bin 15239 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 nemo_curator_semantic_dedup/__pycache__/helper.cpython-312.pyc diff --git a/nemo_curator_semantic_dedup/__pycache__/helper.cpython-312.pyc b/nemo_curator_semantic_dedup/__pycache__/helper.cpython-312.pyc deleted file mode 100644 index b0e10dfe70c3ed15c17633f6b4060378992a8e3a..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 15239 zcmbt*YjhJ=nqZYwl1gvM?>B{CSb%NJBY+_R;|CbKZ4yWb4vCI%l|e?9oGKZxB6mBT zKHK&zXAslb!I?A@XFKOa*|6K*=V51GBq4Ly&g@w!5|O6j>|uBEW7so4oQ63`(|gYD z_uW#Jgi&w0cgyy@U)_7Z?>_3g-~ArfpV{pe3Z9=VZ1VrAgQEThU(`ph0Q7@PG(}yb zc#5Y3)D%5N(+KMVx-lKe^<#RHGh+sCX$2sPL+!%0Am5h~;I#Zx@s%)%`z-*v=s(h@RrUbiagLdXX#Z={3 zC8@Ins-~*PssXl+)$mn(HE;WgVXT&~;q5| zD*$TY+jtLO2~g{-&a?SGeg+RMTYbS`DC`USLqTZ--c8d!G3@sR0A!AZLYUi6oI2L; z?LE@pJ2W_Q7z&Jm(Al$s2#{fL%6C@4w&7R#Dafs)VD!wSFdl|i3wb@{3y+_>e+N1o z&wYKu1IQz9P!#>(?{pM3MA5(vVc-en;pwQ3rz7z4A=Kj2N4=b+s5!lvic*t>bd+i; zN{04hwn*JXc4AdMGWhU7?o(hEtPvhJ9 zHi{CrKr6dim(M@Ufo}je6RM0tbsV`Ge$;;7fcL^)X=+?YQ^%>Oo~O_E0sXeHQ>8lT zQlWg#8>0HRftRB+q^9B{ew(^(RA)@M`2X5}R3FuyhqadI%l|rDuJ$xpr9%0fr&X%j zEOp8(Y3g!{p}uG2=BN(FY{8LVXH-fwHB0TLF5iN`Yw))Se?v`F7e&^Ifv-c>aW`dv z@m++KItBa!y}five~P+DH&J0|qdb!(DwNN|Why*bp+fn5Lf1@*?Q^iAxwUWjo*5Lb zJsUMVzRss$mFEmUG+h2b$uH6uDP@!}r$70p`ir#3I&vQw@jiax4`3ICVF%()Gll(A zLTDx&F~8Ik4hvJ$VJ=esGB@D!2LzrAhd4fTF&GH>crN1T=8l{=ag38-@6JfvBV9Ka zVR`}qK;V1|uEb3!h!Y`^^VnsB=nI||Wc^toESr4((7AAUTDJEF{6a8%Lg`pGD)eL% z68DY{$&4uYc+ms{l?}XbX6CGHRWSNRF(d*VX*M|S56QM11*O;^1q5N*W0XyjAW0xw zWc`d7kWHcx7X5+*Xvz>}RurUZ5IurS3leTxjy#VJcAg7O37w*Ewv!hod^3S?r*O$P 
zH60M7&Y&=>1<`gwb!n`w!GtCnKIk@ zliT|597=B+jlGtsJeICJmN;=bTUvYVz|{lqo&CJDJ-gBKe*L@kzu1te-S(}4s(O}& zW~s_!YeuS~e(BXed1-+)_%2pnNF27W^9c~TjN?aRpxnEM;WX?9e#Z{wsXl8 zn@w=LR>oJJPq_D`nf+O-bFCH$eRr>R+jnbZK;I=Qk9`NDg5PB2vEM$lQ0}UXt10Pf zdhcYWWpA=&Z^GIAYf6VxcUCO9|LC<|n;uF35mxobrTshUKi$%|LH7|&_vzS=bb5phoBAwOS7*NMqWel6x7{X$t4!E(LsNgJ;Zt2x|7OFdElBgz%?2## zWB`8bWHQ5a$jMX%ESWo*ph$TpbLWxzj`^f^^5jdSw=RnC%xxPFgi9b z`{0~)tGM}GJ>k?b!nr&`QKw;5=FB9u%vqyW-dqhRb2fl11hV6K{x%ae^VYYS>$*8d z)E>1(9lY)5`Z*_@U#0mTR4D3H`=F^PTdYO?oQj&FcHVx&@nKFg$YWkU1}9RbO0(E` z7G-%S@AA`sOl#+vstcgLSnSl)&bx4}M!8Of@;RI+ZgstPQFHG6+WyF;jeMUz`qD4*wP-6&H>O;h1cwM7(o06Bs$$5icivP-Sc=X`}) zJ6HC5%&zjW*_9W}u1uv9E&txxm4DCdG+K|%ZihPR$(<^c&(#^Lqxhb=R{S1wt$J*( zl?8Lv)+}22y>qSnp1EqY3g;T5FP~!2;&^Olz!%=F&UkW<3gz=?UZz!rwcOMjqooopBr~F~rK{79>Cf;))DeTdSsHDo~@lY@b zmJ%pZ%!&RNPssY`pBo0EJ9&>@+=wkq}ChMDk0`yL$erSKy|$D|YrM%WH8 z`Cgq7!kz|M7m{RV+7~`2Gm{~IP()=}MA-@YUGK48ZL~S9@pw+TGYi3)DFGBRL1z77 z0W`yll5F%%PYXewXkB7G(lDL{$^s~juwFPUnF&D)+2WV{LC`ycotc`JWZg6`oBMoW z-+%~9AzQ*BuR75h=m%|5*#gS;^i0^x`$gGIio7rr*#-mjVqdt3vUy4f`*p01g7e9hnh8wWn~K)!OFe z=ih(j-B(gAyS}J>YH?_xXYo>uj_t@+R4=t&J-=``TT&hqu5MoFy;t6_B*jLTdzQzS zcPC1=KuK*weD}>A$h-%NG+BoiY7GBURV*&VjcM ztn656SqZP$(zQ>=*j27+dEgh;82hF@>ngowy=q-La^uV|CqJ0HS&`~E7_+8bJ%G8f zd3iAIdv|!HI@!E0#qG~g5gr~67^F|f2Zt?3{#P0Dq_*QO!L}t8khYyYc*8qVaUiw_zipo`2E~@^wGEQF+2%#+y9bY z2d?~aPv@|O{-}!{+F<->yB^_PhYUk)`cJy)p+@5;`xt}|SfS#!g&t~QZd+>*ZfP2N zM*pcoKz{Yhi z!)Z-y6~mkntRg0AgdZJx8*8Wu-BsOXH$%;t=AgDB0N8DPlfr?o!T1%;- zpk?zqg7BKBiJBk&pigyNKWCgXT%=I%d(3wjRdu{D9*yJyKVZw8HLO`Zlbh9=e9k+g z7Wm$-sP%FGCGq=Ne`uWry?74tyx-5`9ytN37%N=zOJQ&5e8e;iK0NT~MQS+ky`e{p zn-D`&Ty76@sCtMfdm`53(*b{&^9RBC7qN0=b4y%kf{WNSRKYiX4xESPoFot=P!UmP zMBK-e!`>e}o5K`CP!6I7*;8hCK^m8tnII^r`ta0rq&n|t6v4-KLExa21Ko%dk!w8_ z;xmZHPWjFYP%4SfU4VtT+rcVlO7>c@0EhEveQOUA{8t2s( zMAq>WiUfTHTN{;S1WgwL95OR86C4MfbZQ#aKoMW7S8IwWX^2)q7DmLli=A zXD&k13y5=pMoIh7Mj1>fVPlr%6L*=-a2i`|S2iqeSgK2zTjvk0GUkQDZ}u+fes(yv zH&ORg!m>Nf?0Lvi1N12US2Ky{Ur4|BQsVS0sTaKo-*js9)pWo3P_J_s|GHoN=ciXr z-J%z&V~1j|E_E&aC|TQ)DC_*3-InMV*Yw~M{Dy*t(4)b!F!Xc%(!kr+*xd5aiZju9 z@RmJc8A&svI3CB6{jRw+-n{CpNLOyXS(A45Eih1FwqHJxwYjbgE)K@{rGb>KX%)1w ziJxuBx|_4^s)uHS!}5?an62Qdv}}4SVhVKOP}kj zNcigYUC>vN@=5L3&3E%*4g`t}Qp_K_#c8Ob9NHUlBXH!YsgPJ3)faOp0?i_=Rhr=X z(;ufi#>m!Vq5uL?;8b&r6NT}R$a5Ip@C7;I>F2=I1%tZ7qnCAlUbcor|5<;~7x2Q1 zU3(!GuG1$-V2)z!Kms~oAqJ(1i#gaVOq~&U9weu1*FcaDQyw#zxFn_k6fFvbhXBf*S;2+z%Eq8@5eH}q1$fB4;~k;@i;Sb@-G1DCRfH( zn>5uXOq;T8U75E1$+rD~YD=;$q^>4us!5ny?`_(a+0>of)ScPXlibvk+SCh;tVy%y?fm8WWpV!(zEa8 zVA3;?YB>y_-26G)m^C|Ndo!giN%%Fl6t){kdU{hWeScs(I87|A3X5o;8V01Xh%qPM zi%Iv_XY%Y(R3g*GuvBIN8&C`BL^X4z(7c(nz)03TBn%G0q5QQ32={0P4unLU>k!~o zmCS0-QG-{L>A+H&agoPF^Co}Y0Tg3;mIg*?Lq<_VCTcRsCZIPF&6D#_aJ~x=I`*9j z2puCL`gKKg>5AQucr2>eLp%|Cm(8fXf)CU;D?4)$SKtpN=z16thcHn@Aw-`q_yRM6 z_ya6MTiRn3#}OuMw*-(X8>jsN*{X<1#VweVx99{b2o@{KnVYbu(i%F4p^5JUtvm21 z1t1}OsR^#H%c`%9UL9Q?%rx&wHt$K5?OiahRTNkW%*_~@{ifwz63Guil zO2)4N_`~dJP%rew;){GCqiO6{?1(3f1~IoAnuQ9CrXBlvYa5tJQ!9X(im+eQkhg4$ z-LuHkx?xo9Ltr_RhmABK!yP?7I+7Ph1w7YrA4LsVF-CuoM`a#mulO`38t098EaGu2 zIf2P@m^{v0d6p7KvGy1wDs!PY6BXtnj#R?D=djQZ$rIex_Y@v6BXA3x78%f?PM53-IF=h<{(hNU2@6Vj2 zAg0U`?@dna2fskM(kH0%P{UhpSXF1K>W1|3Bf|G<*kq zXJnqwj^2# zJ0ExypSiNAa}urSbx%}_oPx(`lUf;dMFCsIg&L1mQvw*dwTJR8TDdD)R)}$Vj6@?D z9)1eix2x?ZJ5(s2=WiCwl}F2q#be<%f#Zg=fFABb@_^@u-*eP=?L?vOS1#PPe`qelb`mK(#kkRBAQSg7DA#&JS%k*2npuSbqvD^Qa3jY{vPye73o58jrpgAfIld3 zO2kehlMCRX3D(W^`N3F%XzDC@(qIG^A=n5$taC!)5T$dWnE=n75jY8^#A}2k<#rkI zAd!K0b0afg1d2GSTqVxd9~=+N@B-uZoXcY5iVK+AjgP9h_{S$tH-wF#2H~zp9lmrpIbud%a z#?7s-AC$ZxRQM$htjd|GAR&4}lz^2Z?h%*{ep2!oaXCgJ)&id)_!x85ivO`hTL%ba 

Date: Mon, 1 Dec 2025 09:07:24 +0000
Subject: [PATCH 4/6] changed custom compute config

---
 nemo_curator_semantic_dedup/job.yaml | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/nemo_curator_semantic_dedup/job.yaml b/nemo_curator_semantic_dedup/job.yaml
index 78fbd5c..23b3ab6 100644
--- a/nemo_curator_semantic_dedup/job.yaml
+++ b/nemo_curator_semantic_dedup/job.yaml
@@ -6,8 +6,12 @@ name: nemo-curator-image-dedup
 # Build custom image with NeMo Curator CUDA dependencies
 containerfile: ./Dockerfile
 
-# Use named compute config with L40S GPU
-compute_config: "nemo-compute-config"
+# Compute configuration with L40S GPU for CUDA-accelerated image processing
+compute_config:
+  head_node:
+    instance_type: g6.8xlarge  # 1x L40S GPU, 32 vCPUs, 128GB RAM (AWS)
+    # For GCP, use: g2-standard-32 (1x L4 GPU)
+  worker_nodes: []  # Run entirely on head node
 
 # Working directory - upload only the example code, not data
 working_dir: .
@@ -52,9 +56,6 @@ env_vars: # Increase Ray API server limit for cosmos_xenna monitoring RAY_MAX_LIMIT_FROM_API_SERVER: "100000" -# When empty, uses the default Anyscale Cloud -cloud: - # The entrypoint script entrypoint: python image_dedup_example.py From 89019bad6ee49c47db3c3bf68b0e16b25a084940 Mon Sep 17 00:00:00 2001 From: Avi Basnet Date: Tue, 9 Dec 2025 11:02:37 +0000 Subject: [PATCH 5/6] working version --- nemo_curator_semantic_dedup/Dockerfile | 4 + .../image_dedup_example.py | 101 ++++++++++++++++++ nemo_curator_semantic_dedup/job.yaml | 28 +++-- 3 files changed, 126 insertions(+), 7 deletions(-) diff --git a/nemo_curator_semantic_dedup/Dockerfile b/nemo_curator_semantic_dedup/Dockerfile index 200ef69..4d4b88c 100644 --- a/nemo_curator_semantic_dedup/Dockerfile +++ b/nemo_curator_semantic_dedup/Dockerfile @@ -42,6 +42,10 @@ snapshot_download('openai/clip-vit-large-patch14', local_dir=model_dir)" # Set environment variable for model directory ENV MODEL_DIR=/home/ray/model_weights +# Required by cosmos_xenna (NeMo Curator backend) - must be set before Ray cluster starts +# This allows cosmos_xenna to manage GPU allocation instead of Ray's default CUDA_VISIBLE_DEVICES handling +ENV RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES=0 + # Download and prepare the example dataset from HuggingFace # Downloads MS COCO parquet, deduplicates URLs, and truncates to 100k rows RUN mkdir -p /home/ray/data && \ diff --git a/nemo_curator_semantic_dedup/image_dedup_example.py b/nemo_curator_semantic_dedup/image_dedup_example.py index 077b94e..c5850c0 100644 --- a/nemo_curator_semantic_dedup/image_dedup_example.py +++ b/nemo_curator_semantic_dedup/image_dedup_example.py @@ -19,6 +19,62 @@ import ray from helper import download_webdataset + +def wait_for_workers(min_cpus: int = 1, min_gpus: int = 1, timeout: int = 600, poll_interval: int = 10, stability_checks: int = 3): + """Wait for Ray cluster to have minimum required resources with stability verification. + + Args: + min_cpus: Minimum CPUs required + min_gpus: Minimum GPUs required + timeout: Maximum seconds to wait + poll_interval: Seconds between polls + stability_checks: Number of consecutive successful checks required + """ + # Connect to Ray cluster + if not ray.is_initialized(): + ray.init(address="auto", ignore_reinit_error=True) + + print(f"Waiting for cluster resources (min {min_cpus} CPUs, {min_gpus} GPUs)...") + start_time = time.time() + consecutive_success = 0 + + while True: + elapsed = time.time() - start_time + if elapsed > timeout: + raise TimeoutError(f"Cluster did not reach required resources within {timeout}s") + + try: + resources = ray.cluster_resources() + cpus = resources.get("CPU", 0) + gpus = resources.get("GPU", 0) + + print(f" [{elapsed:.0f}s] Available: {cpus:.0f} CPUs, {gpus:.0f} GPUs (check {consecutive_success + 1}/{stability_checks})") + + if cpus >= min_cpus and gpus >= min_gpus: + consecutive_success += 1 + if consecutive_success >= stability_checks: + print(f"✓ Cluster stable with {cpus:.0f} CPUs and {gpus:.0f} GPUs") + # Log full resources for debugging + print(f" Full resources: {resources}") + # Add delay to ensure Ray GCS state is fully propagated + print(" Waiting 5s for resource state to stabilize...") + time.sleep(5) + # Verify one more time + final_resources = ray.cluster_resources() + if "CPU" not in final_resources: + print(f" WARNING: CPU key missing after delay! 
Resources: {final_resources}") + consecutive_success = 0 + continue + print(f" Final verification: {final_resources.get('CPU', 0):.0f} CPUs, {final_resources.get('GPU', 0):.0f} GPUs") + return + else: + consecutive_success = 0 + except Exception as e: + print(f" [{elapsed:.0f}s] Waiting for cluster... ({e})") + consecutive_success = 0 + + time.sleep(poll_interval) + from nemo_curator.core.client import RayClient from nemo_curator.pipeline import Pipeline from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow @@ -127,6 +183,12 @@ def main(args: argparse.Namespace) -> None: ray_client = RayClient() ray_client.start() + # Wait for all cluster nodes to be ready (head + workers) + # Read expected resources from environment or use defaults + expected_cpus = int(os.environ.get("EXPECTED_CPUS", "4")) + expected_gpus = int(os.environ.get("EXPECTED_GPUS", "1")) + wait_for_workers(min_cpus=expected_cpus, min_gpus=expected_gpus, timeout=600, poll_interval=5, stability_checks=3) + print("Starting image curation pipeline...") print(f"Input parquet file: {args.input_parquet}") print(f"Input webdataset directory: {args.input_wds_dataset_dir}") @@ -164,6 +226,45 @@ def main(args: argparse.Namespace) -> None: # Step 2: Create and run curation pipelines # Step 2.1: Create image embedding pipeline print("Step 2.1: Running image embedding pipeline...") + + # Re-check cluster resources before running GPU pipeline + # This ensures workers are still connected after the download phase + # Use aggressive checking: 1s intervals, 5 consecutive checks required + print("Verifying cluster resources before GPU pipeline...") + wait_for_workers(min_cpus=expected_cpus, min_gpus=expected_gpus, timeout=300, poll_interval=1, stability_checks=5) + + # Extra verification: Query ray.nodes() to ensure node info is available + print("Verifying Ray nodes...") + nodes = ray.nodes() + print(f" Found {len(nodes)} Ray nodes:") + for node in nodes: + node_resources = node.get("Resources", {}) + alive = node.get("Alive", False) + print(f" - Node {node.get('NodeID', 'unknown')[:8]}: Alive={alive}, CPUs={node_resources.get('CPU', 0)}, GPUs={node_resources.get('GPU', 0)}") + + # Check both cluster_resources and available_resources + # cosmos_xenna might use available_resources which could be different + print("\nResource comparison:") + cluster_res = ray.cluster_resources() + avail_res = ray.available_resources() + print(f" cluster_resources(): CPU={cluster_res.get('CPU', 'MISSING')}, GPU={cluster_res.get('GPU', 'MISSING')}") + print(f" available_resources(): CPU={avail_res.get('CPU', 'MISSING')}, GPU={avail_res.get('GPU', 'MISSING')}") + + # Wait for resources to stabilize before cosmos_xenna runs + print("\nWaiting 10s for Ray state to fully stabilize before cosmos_xenna...") + time.sleep(10) + + # Final check right before pipeline + print("Final resource check:") + cluster_res = ray.cluster_resources() + avail_res = ray.available_resources() + print(f" cluster_resources(): CPU={cluster_res.get('CPU', 'MISSING')}, GPU={cluster_res.get('GPU', 'MISSING')}") + print(f" available_resources(): CPU={avail_res.get('CPU', 'MISSING')}, GPU={avail_res.get('GPU', 'MISSING')}") + + if 'CPU' not in cluster_res or 'GPU' not in cluster_res: + print("WARNING: cluster_resources missing CPU or GPU key!") + print(f" Full cluster_resources: {cluster_res}") + start_time = time.time() pipeline = create_image_embedding_pipeline(args) print(pipeline.describe()) diff --git a/nemo_curator_semantic_dedup/job.yaml 
b/nemo_curator_semantic_dedup/job.yaml index 23b3ab6..80d5e85 100644 --- a/nemo_curator_semantic_dedup/job.yaml +++ b/nemo_curator_semantic_dedup/job.yaml @@ -6,12 +6,18 @@ name: nemo-curator-image-dedup # Build custom image with NeMo Curator CUDA dependencies containerfile: ./Dockerfile -# Compute configuration with L40S GPU for CUDA-accelerated image processing +# Compute configuration with L4 GPU for CUDA-accelerated image processing +# Head + worker nodes for distributed processing compute_config: head_node: - instance_type: g6.8xlarge # 1x L40S GPU, 32 vCPUs, 128GB RAM (AWS) - # For GCP, use: g2-standard-32 (1x L4 GPU) - worker_nodes: [] # Run entirely on head node + instance_type: g6.8xlarge # 1x L4 GPU, 32 vCPUs, 128GB RAM + # Ensure Ray reports CPU resources on the head node for cosmos_xenna + resources: + CPU: 32 + worker_nodes: + - instance_type: g6.8xlarge # 1x L4 GPU per worker + min_nodes: 2 + max_nodes: 2 # Working directory - upload only the example code, not data working_dir: . @@ -46,15 +52,23 @@ env_vars: DOWNLOAD_PROCESSES: "8" ENTRIES_PER_TAR: "1000" - # Set to "true" to skip downloading (use existing WebDataset) - # WebDataset already exists from previous run - SKIP_DOWNLOAD: "false" + + SKIP_DOWNLOAD: "false" # Always keep false # Ray memory settings to avoid OOM RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION: "0.5" # Increase Ray API server limit for cosmos_xenna monitoring RAY_MAX_LIMIT_FROM_API_SERVER: "100000" + + # Required by cosmos_xenna (NeMo Curator backend) - must be set before Ray starts + # This allows cosmos_xenna to manage GPU allocation instead of Ray + RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "0" + + # Expected cluster resources (head + 1 worker) + # With 1 head (32 CPUs, 1 GPU) + 1 worker (32 CPUs, 1 GPU) = 64 CPUs, 2 GPUs + EXPECTED_CPUS: "60" + EXPECTED_GPUS: "2" # The entrypoint script entrypoint: python image_dedup_example.py From c22c1da6641a4b1236d33e352da0309f8ee96142 Mon Sep 17 00:00:00 2001 From: Avi Basnet Date: Tue, 9 Dec 2025 19:59:05 +0000 Subject: [PATCH 6/6] minimal working version --- nemo_curator_semantic_dedup/Dockerfile | 4 - .../image_dedup_example.py | 101 ------------------ nemo_curator_semantic_dedup/job.yaml | 9 -- 3 files changed, 114 deletions(-) diff --git a/nemo_curator_semantic_dedup/Dockerfile b/nemo_curator_semantic_dedup/Dockerfile index 4d4b88c..200ef69 100644 --- a/nemo_curator_semantic_dedup/Dockerfile +++ b/nemo_curator_semantic_dedup/Dockerfile @@ -42,10 +42,6 @@ snapshot_download('openai/clip-vit-large-patch14', local_dir=model_dir)" # Set environment variable for model directory ENV MODEL_DIR=/home/ray/model_weights -# Required by cosmos_xenna (NeMo Curator backend) - must be set before Ray cluster starts -# This allows cosmos_xenna to manage GPU allocation instead of Ray's default CUDA_VISIBLE_DEVICES handling -ENV RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES=0 - # Download and prepare the example dataset from HuggingFace # Downloads MS COCO parquet, deduplicates URLs, and truncates to 100k rows RUN mkdir -p /home/ray/data && \ diff --git a/nemo_curator_semantic_dedup/image_dedup_example.py b/nemo_curator_semantic_dedup/image_dedup_example.py index c5850c0..077b94e 100644 --- a/nemo_curator_semantic_dedup/image_dedup_example.py +++ b/nemo_curator_semantic_dedup/image_dedup_example.py @@ -19,62 +19,6 @@ import ray from helper import download_webdataset - -def wait_for_workers(min_cpus: int = 1, min_gpus: int = 1, timeout: int = 600, poll_interval: int = 10, stability_checks: int = 3): - """Wait for Ray cluster to 
have minimum required resources with stability verification. - - Args: - min_cpus: Minimum CPUs required - min_gpus: Minimum GPUs required - timeout: Maximum seconds to wait - poll_interval: Seconds between polls - stability_checks: Number of consecutive successful checks required - """ - # Connect to Ray cluster - if not ray.is_initialized(): - ray.init(address="auto", ignore_reinit_error=True) - - print(f"Waiting for cluster resources (min {min_cpus} CPUs, {min_gpus} GPUs)...") - start_time = time.time() - consecutive_success = 0 - - while True: - elapsed = time.time() - start_time - if elapsed > timeout: - raise TimeoutError(f"Cluster did not reach required resources within {timeout}s") - - try: - resources = ray.cluster_resources() - cpus = resources.get("CPU", 0) - gpus = resources.get("GPU", 0) - - print(f" [{elapsed:.0f}s] Available: {cpus:.0f} CPUs, {gpus:.0f} GPUs (check {consecutive_success + 1}/{stability_checks})") - - if cpus >= min_cpus and gpus >= min_gpus: - consecutive_success += 1 - if consecutive_success >= stability_checks: - print(f"✓ Cluster stable with {cpus:.0f} CPUs and {gpus:.0f} GPUs") - # Log full resources for debugging - print(f" Full resources: {resources}") - # Add delay to ensure Ray GCS state is fully propagated - print(" Waiting 5s for resource state to stabilize...") - time.sleep(5) - # Verify one more time - final_resources = ray.cluster_resources() - if "CPU" not in final_resources: - print(f" WARNING: CPU key missing after delay! Resources: {final_resources}") - consecutive_success = 0 - continue - print(f" Final verification: {final_resources.get('CPU', 0):.0f} CPUs, {final_resources.get('GPU', 0):.0f} GPUs") - return - else: - consecutive_success = 0 - except Exception as e: - print(f" [{elapsed:.0f}s] Waiting for cluster... 
({e})") - consecutive_success = 0 - - time.sleep(poll_interval) - from nemo_curator.core.client import RayClient from nemo_curator.pipeline import Pipeline from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow @@ -183,12 +127,6 @@ def main(args: argparse.Namespace) -> None: ray_client = RayClient() ray_client.start() - # Wait for all cluster nodes to be ready (head + workers) - # Read expected resources from environment or use defaults - expected_cpus = int(os.environ.get("EXPECTED_CPUS", "4")) - expected_gpus = int(os.environ.get("EXPECTED_GPUS", "1")) - wait_for_workers(min_cpus=expected_cpus, min_gpus=expected_gpus, timeout=600, poll_interval=5, stability_checks=3) - print("Starting image curation pipeline...") print(f"Input parquet file: {args.input_parquet}") print(f"Input webdataset directory: {args.input_wds_dataset_dir}") @@ -226,45 +164,6 @@ def main(args: argparse.Namespace) -> None: # Step 2: Create and run curation pipelines # Step 2.1: Create image embedding pipeline print("Step 2.1: Running image embedding pipeline...") - - # Re-check cluster resources before running GPU pipeline - # This ensures workers are still connected after the download phase - # Use aggressive checking: 1s intervals, 5 consecutive checks required - print("Verifying cluster resources before GPU pipeline...") - wait_for_workers(min_cpus=expected_cpus, min_gpus=expected_gpus, timeout=300, poll_interval=1, stability_checks=5) - - # Extra verification: Query ray.nodes() to ensure node info is available - print("Verifying Ray nodes...") - nodes = ray.nodes() - print(f" Found {len(nodes)} Ray nodes:") - for node in nodes: - node_resources = node.get("Resources", {}) - alive = node.get("Alive", False) - print(f" - Node {node.get('NodeID', 'unknown')[:8]}: Alive={alive}, CPUs={node_resources.get('CPU', 0)}, GPUs={node_resources.get('GPU', 0)}") - - # Check both cluster_resources and available_resources - # cosmos_xenna might use available_resources which could be different - print("\nResource comparison:") - cluster_res = ray.cluster_resources() - avail_res = ray.available_resources() - print(f" cluster_resources(): CPU={cluster_res.get('CPU', 'MISSING')}, GPU={cluster_res.get('GPU', 'MISSING')}") - print(f" available_resources(): CPU={avail_res.get('CPU', 'MISSING')}, GPU={avail_res.get('GPU', 'MISSING')}") - - # Wait for resources to stabilize before cosmos_xenna runs - print("\nWaiting 10s for Ray state to fully stabilize before cosmos_xenna...") - time.sleep(10) - - # Final check right before pipeline - print("Final resource check:") - cluster_res = ray.cluster_resources() - avail_res = ray.available_resources() - print(f" cluster_resources(): CPU={cluster_res.get('CPU', 'MISSING')}, GPU={cluster_res.get('GPU', 'MISSING')}") - print(f" available_resources(): CPU={avail_res.get('CPU', 'MISSING')}, GPU={avail_res.get('GPU', 'MISSING')}") - - if 'CPU' not in cluster_res or 'GPU' not in cluster_res: - print("WARNING: cluster_resources missing CPU or GPU key!") - print(f" Full cluster_resources: {cluster_res}") - start_time = time.time() pipeline = create_image_embedding_pipeline(args) print(pipeline.describe()) diff --git a/nemo_curator_semantic_dedup/job.yaml b/nemo_curator_semantic_dedup/job.yaml index 80d5e85..1cca312 100644 --- a/nemo_curator_semantic_dedup/job.yaml +++ b/nemo_curator_semantic_dedup/job.yaml @@ -60,15 +60,6 @@ env_vars: # Increase Ray API server limit for cosmos_xenna monitoring RAY_MAX_LIMIT_FROM_API_SERVER: "100000" - - # Required by cosmos_xenna (NeMo 
Curator backend) - must be set before Ray starts - # This allows cosmos_xenna to manage GPU allocation instead of Ray - RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "0" - - # Expected cluster resources (head + 1 worker) - # With 1 head (32 CPUs, 1 GPU) + 1 worker (32 CPUs, 1 GPU) = 64 CPUs, 2 GPUs - EXPECTED_CPUS: "60" - EXPECTED_GPUS: "2" # The entrypoint script entrypoint: python image_dedup_example.py