3232import io .pixelsdb .pixels .common .physical .StorageFactory ;
3333import io .pixelsdb .pixels .common .retina .RetinaService ;
3434import io .pixelsdb .pixels .common .utils .ConfigFactory ;
35- import io .pixelsdb .pixels .common .utils .DateUtil ;
35+ import io .pixelsdb .pixels .common .utils .NetUtils ;
36+ import io .pixelsdb .pixels .common .utils .PixelsFileNameUtils ;
3637import io .pixelsdb .pixels .core .compactor .CompactLayout ;
3738import io .pixelsdb .pixels .core .compactor .PixelsCompactor ;
3839import net .sourceforge .argparse4j .inf .Namespace ;
3940
4041import java .io .IOException ;
4142import java .util .ArrayList ;
4243import java .util .Iterator ;
44+ import java .util .LinkedHashMap ;
4345import java .util .List ;
46+ import java .util .Map ;
4447import java .util .concurrent .ConcurrentLinkedQueue ;
4548import java .util .concurrent .ExecutorService ;
4649import java .util .concurrent .Executors ;
@@ -114,16 +117,52 @@ public void execute(Namespace ns, String command) throws Exception
114117 }
115118 }
116119
120+ // Issue #1305: obtain local hostname for the unified compact file naming.
121+ String hostName = NetUtils .getLocalHostName ();
122+
123+ /**
124+ * Issue #1305:
125+ * Group files by virtualNodeId before compaction.
126+ * GC-eligible files (retina/ordered/compact) carry a real virtualNodeId and must
127+ * not be mixed across groups to preserve create_ts/delete_ts monotonicity.
128+ * single-type files fall into the VNODE_ID_NONE (-1) bucket and are compacted freely.
129+ * copy-type files and unrecognised-format files are skipped.
130+ */
131+ Map <Integer , List <Status >> groupedStatuses = new LinkedHashMap <>();
132+ for (Status status : statuses )
133+ {
134+ String path = status .getPath ();
135+ PixelsFileNameUtils .PxlFileType fileType = PixelsFileNameUtils .extractFileType (path );
136+ if (fileType == null )
137+ {
138+ System .err .println ("Skipping file with unrecognized naming format: " + path );
139+ continue ;
140+ }
141+ if (fileType == PixelsFileNameUtils .PxlFileType .COPY )
142+ {
143+ System .err .println ("Skipping copy file (test/benchmark data, not for compaction): " + path );
144+ continue ;
145+ }
146+ int vNodeId = PixelsFileNameUtils .extractVirtualNodeId (path );
147+ groupedStatuses .computeIfAbsent (vNodeId , k -> new ArrayList <>()).add (status );
148+ }
149+
117150 List <Path > targetPaths = layout .getCompactPaths ();
118151 ConcurrentLinkedQueue <File > compactFiles = new ConcurrentLinkedQueue <>();
119152 ConcurrentLinkedQueue <Path > compactPaths = new ConcurrentLinkedQueue <>();
120153 int targetPathId = 0 ;
121154
122- // compact
155+ // Issue #1305: iterate over each virtualNodeId group independently to preserve
156+ // the monotonicity invariants required by Storage GC.
123157 long startTime = System .currentTimeMillis ();
124- for (int i = 0 , thdId = 0 ; i < statuses .size (); i += numRowGroupInBlock , ++thdId )
158+ int thdId = 0 ;
159+ for (Map .Entry <Integer , List <Status >> entry : groupedStatuses .entrySet ())
125160 {
126- if (i + numRowGroupInBlock > statuses .size ())
161+ int vNodeId = entry .getKey ();
162+ List <Status > groupStatuses = entry .getValue ();
163+ int groupSize = groupStatuses .size ();
164+
165+ for (int i = 0 ; i < groupSize ; ++thdId )
127166 {
128167 /**
129168 * Issue #160:
@@ -136,34 +175,38 @@ public void execute(Namespace ns, String command) throws Exception
136175 * and rebuild a pure compactLayout for the tail files as the
137176 * compactLayout in metadata does not work for the tail files.
138177 */
139- numRowGroupInBlock = statuses .size () - i ;
140- compactLayout = CompactLayout .buildPure (numRowGroupInBlock , numColumn );
141- }
178+ // Issue #1305: use local batchSize/batchLayout instead of mutating
179+ // numRowGroupInBlock/compactLayout, so they stay unchanged across batches and groups.
180+ int batchSize = Math .min (numRowGroupInBlock , groupSize - i );
181+ CompactLayout batchLayout = (batchSize < numRowGroupInBlock )
182+ ? CompactLayout .buildPure (batchSize , numColumn )
183+ : compactLayout ;
142184
143- List <String > sourcePaths = new ArrayList <>();
144- for (int j = 0 ; j < numRowGroupInBlock ; ++j )
145- {
146- if (!statuses .get (i +j ).getPath ().endsWith ("/" ))
185+ List <String > sourcePaths = new ArrayList <>();
186+ for (int j = 0 ; j < batchSize ; ++j )
147187 {
148- sourcePaths .add (statuses .get (i + j ).getPath ());
188+ if (!groupStatuses .get (i + j ).getPath ().endsWith ("/" ))
189+ {
190+ sourcePaths .add (groupStatuses .get (i + j ).getPath ());
191+ }
149192 }
150- }
151193
152- Path targetPath = targetPaths .get (targetPathId ++);
153- String targetDirPath = targetPath .getUri ();
154- targetPathId %= targetPaths .size ();
155- if (!targetDirPath .endsWith ("/" ))
156- {
157- targetDirPath += "/" ;
158- }
159- String targetFileName = DateUtil .getCurTime () + "_compact.pxl" ;
160- String targetFilePath = targetDirPath + targetFileName ;
194+ Path targetPath = targetPaths .get (targetPathId ++);
195+ String targetDirPath = targetPath .getUri ();
196+ targetPathId %= targetPaths .size ();
197+ if (!targetDirPath .endsWith ("/" ))
198+ {
199+ targetDirPath += "/" ;
200+ }
201+ // Issue #1305: use unified naming format (hostName + vNodeId) instead of DateUtil timestamp only.
202+ String targetFileName = PixelsFileNameUtils .buildCompactFileName (hostName , vNodeId );
203+ String targetFilePath = targetDirPath + targetFileName ;
161204
162- System .out .println ("(" + thdId + ") " + sourcePaths .size () +
163- " ordered files to be compacted into '" + targetFilePath + "'." );
205+ System .out .println ("(" + thdId + ") vNodeId=" + vNodeId + ", " + sourcePaths .size () +
206+ " ordered files to be compacted into '" + targetFilePath + "'." );
164207
165- PixelsCompactor .Builder compactorBuilder = PixelsCompactor .newBuilder ()
166- .setSourcePaths (sourcePaths )
208+ PixelsCompactor .Builder compactorBuilder = PixelsCompactor .newBuilder ()
209+ .setSourcePaths (sourcePaths )
167210 /**
168211 * Issue #192:
169212 * No need to deep copy compactLayout as it is never modified in-place
@@ -173,40 +216,46 @@ public void execute(Namespace ns, String command) throws Exception
173216 *
174217 * Deep copy it if it is in-place modified in the future.
175218 */
176- .setCompactLayout (compactLayout )
177- .setInputStorage (orderStorage )
178- .setOutputStorage (compactStorage )
179- .setPath (targetFilePath )
180- .setBlockSize (blockSize )
181- .setReplication (replication )
182- .setBlockPadding (false )
183- .setHasHiddenColumn (true );
184-
185- long threadStart = System .currentTimeMillis ();
186- compactExecutor .execute (() -> {
187- // Issue #192: run compaction in threads.
188- try
189- {
190- // build() spends some time to read file footers and should be called inside sub-thread.
191- PixelsCompactor pixelsCompactor = compactorBuilder .build ();
192- pixelsCompactor .compact ();
193- pixelsCompactor .close ();
194- File compactFile = new File ();
195- compactFile .setName (targetFileName );
196- compactFile .setType (File .Type .REGULAR );
197- compactFile .setNumRowGroup (pixelsCompactor .getNumRowGroup ());
198- compactFile .setPathId (targetPath .getId ());
199- compactFiles .offer (compactFile );
200- compactPaths .offer (targetPath );
201- } catch (IOException e )
202- {
203- System .err .println ("write compact file '" + targetFilePath + "' failed" );
204- e .printStackTrace ();
205- return ;
206- }
207- System .out .println ("Compact file '" + targetFilePath + "' is built in " +
208- ((System .currentTimeMillis () - threadStart ) / 1000.0 ) + "s" );
209- });
219+ .setCompactLayout (batchLayout )
220+ .setInputStorage (orderStorage )
221+ .setOutputStorage (compactStorage )
222+ .setPath (targetFilePath )
223+ .setBlockSize (blockSize )
224+ .setReplication (replication )
225+ .setBlockPadding (false )
226+ .setHasHiddenColumn (true );
227+
228+ final String finalTargetFileName = targetFileName ;
229+ final String finalTargetFilePath = targetFilePath ;
230+ final Path finalTargetPath = targetPath ;
231+ long threadStart = System .currentTimeMillis ();
232+
233+ compactExecutor .execute (() -> {
234+ // Issue #192: run compaction in threads.
235+ try
236+ {
237+ PixelsCompactor pixelsCompactor = compactorBuilder .build ();
238+ pixelsCompactor .compact ();
239+ pixelsCompactor .close ();
240+ File compactFile = new File ();
241+ compactFile .setName (finalTargetFileName );
242+ compactFile .setType (File .Type .REGULAR );
243+ compactFile .setNumRowGroup (pixelsCompactor .getNumRowGroup ());
244+ compactFile .setPathId (finalTargetPath .getId ());
245+ compactFiles .offer (compactFile );
246+ compactPaths .offer (finalTargetPath );
247+ } catch (IOException e )
248+ {
249+ System .err .println ("write compact file '" + finalTargetFilePath + "' failed" );
250+ e .printStackTrace ();
251+ return ;
252+ }
253+ System .out .println ("Compact file '" + finalTargetFilePath + "' is built in " +
254+ ((System .currentTimeMillis () - threadStart ) / 1000.0 ) + "s" );
255+ });
256+
257+ i += batchSize ;
258+ }
210259 }
211260
212261 // Issue #192: wait for the compaction to complete.
0 commit comments