diff --git a/README.md b/README.md index 9b94b84..bc6c6e8 100644 --- a/README.md +++ b/README.md @@ -59,10 +59,10 @@ GPQ is a concurrency safe, embeddable priority queue that can be used in a varie ## Benchmarks Due to the fact that most operations are done in constant time `O(1)` or logarithmic time `O(log n)`, with the exception of the prioritize function which happens in linear time `O(n)`, all GPQ operations are extremely fast. A single GPQ can handle a few million transactions a second and can be tuned depending on your work load. I have included some basic benchmarks using C++, Rust, Zig, Python, and Go to measure GPQ's performance against the standard implementations of other languages that can be found here at: [pq-bench](https://github.com/JustinTimperio/pq-bench) -| | | -|-------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------| -| ![Time-Spent](https://github.com/JustinTimperio/pq-bench/blob/master/docs/Time-Spent-vs-Implementation.png) | ![Queue-Speed-WITHOUT-Reprioritize](./docs/Queue-Speed-Without-Prioritize.png) | -| ![Queue-Speed-WITH-Reprioritize](./docs/Queue-Speed-With-Prioritize.png) | ![Average Time to Send and Receive VS Bucket Count](./docs/Time-to-Send-and-Receive-VS-Bucket-Count.png) | +| | | +|--------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------| +| ![Time-Spent](./docs/Real-Time-vs-Linear-Time.png) | ![Queue-Speed-WITHOUT-Reprioritize](./docs/Queue-Speed-Without-Prioritize.png) | +| ![Queue-Speed-WITH-Reprioritize](./docs/Queue-Speed-With-Prioritize.png) | ![Average Time to Send and Receive VS Bucket Count](./docs/Time-to-Send-and-Receive-VS-Bucket-Count.png) | ## Usage GPQ at the core is a embeddable priority queue meant to be used at the core of critical workloads that require complex queueing and delivery order guarantees. The best way to use it is just to import it. @@ -77,7 +77,7 @@ For this you will need Go >= `1.22` and gpq itself uses [hashmap](https://github ### API Reference - `NewGPQ[d any](options schema.GPQOptions) (uint, *GPQ[d], error)` - Creates a new GPQ with the specified options and returns the number of restored items, the GPQ, and an error if one occurred. - `ItemsInQueue() uint` - Returns the number of items in the queue. - - `ItemsInDB() uint` - Returns the number of items in the database. + - `ItemsInDB() (uint, error)` - Returns the number of items in the database. - `ActiveBuckets() uint` - Returns the number of active buckets. - `Enqueue(item schema.Item[d]) error` - Enqueues an item into the queue. - `EnqueueBatch(items []schema.Item[d]) error` - Enqueues a batch of items into the queue. diff --git a/disk/disk.go b/disk/disk.go index 105006c..43f33bf 100644 --- a/disk/disk.go +++ b/disk/disk.go @@ -2,6 +2,7 @@ package disk import ( "errors" + "fmt" "github.com/JustinTimperio/gpq/schema" @@ -16,7 +17,7 @@ type Disk[T any] struct { func NewDiskCache[T any](bLogger badger.Logger, options schema.GPQOptions) (*Disk[T], error) { if options.DiskCachePath == "" { - return nil, errors.New("Error creating disk cache: path is empty") + return nil, errors.New("disk cache path cannot be empty") } opts := badger.DefaultOptions(options.DiskCachePath) @@ -29,7 +30,7 @@ func NewDiskCache[T any](bLogger badger.Logger, options schema.GPQOptions) (*Dis } db, err := badger.Open(opts) if err != nil { - return nil, errors.New("Error opening disk cache: " + err.Error()) + return nil, fmt.Errorf("failed to open disk cache at %s: %w", options.DiskCachePath, err) } return &Disk[T]{diskCache: db}, nil @@ -40,10 +41,11 @@ func (d *Disk[T]) Close() error { return d.diskCache.Close() } -func (d *Disk[T]) ItemsInDB() uint { +func (d *Disk[T]) ItemsInDB() (uint, error) { var count uint - _ = d.diskCache.View(func(txn *badger.Txn) error { + err := d.diskCache.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false it := txn.NewIterator(opts) defer it.Close() @@ -53,7 +55,10 @@ func (d *Disk[T]) ItemsInDB() uint { return nil }) - return count + if err != nil { + return 0, fmt.Errorf("failed to count items in disk cache: %w", err) + } + return count, nil } func (d *Disk[T]) ProcessBatch(batch []*schema.Item[T]) error { @@ -72,7 +77,9 @@ func (d *Disk[T]) ProcessBatch(batch []*schema.Item[T]) error { return err } txn = d.diskCache.NewTransaction(true) - txn.Set(entry.DiskUUID, b) + if err := txn.Set(entry.DiskUUID, b); err != nil { + return fmt.Errorf("failed to write item after transaction split: %w", err) + } } else if err != nil { return err @@ -98,7 +105,9 @@ func (d *Disk[T]) DeleteBatch(batch []*schema.DeleteMessage) error { return err } txn = d.diskCache.NewTransaction(true) - txn.Delete(entry.DiskUUID) + if err := txn.Delete(entry.DiskUUID); err != nil { + return fmt.Errorf("failed to delete item after transaction split: %w", err) + } } else if err != nil { return err } @@ -156,11 +165,13 @@ func (d *Disk[T]) RestoreFromDisk() ([]*schema.Item[T], error) { }) if len(value) == 0 { - return errors.New("Error reading from disk cache: value is empty") + return fmt.Errorf("empty value found in disk cache for key=%x", key) } obj := new(schema.Item[T]) - obj.FromBytes(value) + if err := obj.FromBytes(value); err != nil { + return fmt.Errorf("corrupted data found in disk cache (key=%x): %w", key, err) + } obj.WasRestored = true items = append(items, obj) @@ -169,8 +180,8 @@ func (d *Disk[T]) RestoreFromDisk() ([]*schema.Item[T], error) { return nil }) if err != nil { - return nil, errors.New("Error reading from disk cache: " + err.Error()) + return nil, fmt.Errorf("failed to restore items from disk cache: %w", err) } - return items, err + return items, nil } diff --git a/docs/Queue-Speed-With-Prioritize.png b/docs/Queue-Speed-With-Prioritize.png index 45bacac..65be0e4 100644 Binary files a/docs/Queue-Speed-With-Prioritize.png and b/docs/Queue-Speed-With-Prioritize.png differ diff --git a/docs/Queue-Speed-Without-Prioritize.png b/docs/Queue-Speed-Without-Prioritize.png index 19d2d8b..9b6e8c4 100644 Binary files a/docs/Queue-Speed-Without-Prioritize.png and b/docs/Queue-Speed-Without-Prioritize.png differ diff --git a/docs/Real-Time-vs-Linear-Time.png b/docs/Real-Time-vs-Linear-Time.png new file mode 100644 index 0000000..579a81f Binary files /dev/null and b/docs/Real-Time-vs-Linear-Time.png differ diff --git a/docs/Time-to-Send-and-Receive-VS-Bucket-Count.png b/docs/Time-to-Send-and-Receive-VS-Bucket-Count.png index 5e6ec37..9d067fe 100644 Binary files a/docs/Time-to-Send-and-Receive-VS-Bucket-Count.png and b/docs/Time-to-Send-and-Receive-VS-Bucket-Count.png differ diff --git a/docs/bench-report-no-repro.csv b/docs/bench-report-no-repro.csv index 6d12e52..e766c06 100644 --- a/docs/bench-report-no-repro.csv +++ b/docs/bench-report-no-repro.csv @@ -1,201 +1,201 @@ Total Items,Buckets,Removed,Escalated,Time Elapsed,Time to Send,Time to Receive -1000000,5,0,0,0.480352,0.124661,0.355690 -1000000,10,0,0,0.456221,0.123449,0.332772 -1000000,15,0,0,0.450382,0.123358,0.327024 -1000000,20,0,0,0.401607,0.121166,0.280440 -1000000,25,0,0,0.391769,0.120379,0.271390 -1000000,30,0,0,0.378708,0.127517,0.251192 -1000000,35,0,0,0.414640,0.124497,0.290143 -1000000,40,0,0,0.361109,0.122513,0.238597 -1000000,45,0,0,0.370787,0.130278,0.240508 -1000000,50,0,0,0.386700,0.120075,0.266625 -1000000,55,0,0,0.377624,0.123339,0.254285 -1000000,60,0,0,0.363803,0.124509,0.239294 -1000000,65,0,0,0.373651,0.130343,0.243308 -1000000,70,0,0,0.358304,0.129457,0.228846 -1000000,75,0,0,0.356953,0.129481,0.227471 -1000000,80,0,0,0.372278,0.136249,0.236029 -1000000,85,0,0,0.354502,0.131480,0.223022 -1000000,90,0,0,0.355176,0.128582,0.226594 -1000000,95,0,0,0.368184,0.134018,0.234166 -1000000,100,0,0,0.367679,0.127886,0.239793 -2000000,5,0,0,1.080151,0.270605,0.809546 -2000000,10,0,0,0.992683,0.242134,0.750549 -2000000,15,0,0,0.954768,0.247791,0.706977 -2000000,20,0,0,0.977516,0.244314,0.733203 -2000000,25,0,0,0.913640,0.241858,0.671782 -2000000,30,0,0,0.895809,0.249782,0.646027 -2000000,35,0,0,0.811538,0.245463,0.566074 -2000000,40,0,0,0.820050,0.250800,0.569249 -2000000,45,0,0,0.771446,0.248302,0.523144 -2000000,50,0,0,0.815834,0.242626,0.573208 -2000000,55,0,0,0.775734,0.252302,0.523432 -2000000,60,0,0,0.759699,0.251128,0.508572 -2000000,65,0,0,0.789265,0.260698,0.528567 -2000000,70,0,0,0.755390,0.262109,0.493281 -2000000,75,0,0,0.759171,0.256032,0.503139 -2000000,80,0,0,0.798577,0.261225,0.537352 -2000000,85,0,0,0.741509,0.261255,0.480254 -2000000,90,0,0,0.734015,0.263220,0.470795 -2000000,95,0,0,0.734381,0.259012,0.475370 -2000000,100,0,0,0.785392,0.252216,0.533176 -3000000,5,0,0,1.554099,0.356306,1.197792 -3000000,10,0,0,1.542341,0.358731,1.183610 -3000000,15,0,0,1.526235,0.363726,1.162509 -3000000,20,0,0,1.493434,0.362188,1.131246 -3000000,25,0,0,1.501360,0.363820,1.137541 -3000000,30,0,0,1.438228,0.374326,1.063902 -3000000,35,0,0,1.359310,0.373921,0.985390 -3000000,40,0,0,1.279624,0.371083,0.908540 -3000000,45,0,0,1.280505,0.375016,0.905489 -3000000,50,0,0,1.308245,0.366860,0.941384 -3000000,55,0,0,1.222253,0.384495,0.837757 -3000000,60,0,0,1.265391,0.379666,0.885725 -3000000,65,0,0,1.258930,0.391462,0.867469 -3000000,70,0,0,1.239573,0.389759,0.849813 -3000000,75,0,0,1.211239,0.381575,0.829664 -3000000,80,0,0,1.215992,0.395794,0.820198 -3000000,85,0,0,1.164770,0.384545,0.780225 -3000000,90,0,0,1.182612,0.390766,0.791847 -3000000,95,0,0,1.175426,0.404903,0.770524 -3000000,100,0,0,1.228954,0.381821,0.847133 -4000000,5,0,0,2.132397,0.480412,1.651985 -4000000,10,0,0,2.128339,0.476332,1.652007 -4000000,15,0,0,2.126882,0.498357,1.628524 -4000000,20,0,0,2.127416,0.488123,1.639293 -4000000,25,0,0,2.074483,0.493737,1.580746 -4000000,30,0,0,2.022730,0.489689,1.533041 -4000000,35,0,0,1.863314,0.501425,1.361889 -4000000,40,0,0,1.860621,0.501323,1.359299 -4000000,45,0,0,1.748524,0.495773,1.252750 -4000000,50,0,0,1.861381,0.489861,1.371520 -4000000,55,0,0,1.715253,0.497953,1.217300 -4000000,60,0,0,1.764435,0.498660,1.265775 -4000000,65,0,0,1.767213,0.516821,1.250392 -4000000,70,0,0,1.738856,0.513258,1.225598 -4000000,75,0,0,1.798200,0.523680,1.274520 -4000000,80,0,0,1.734383,0.517710,1.216673 -4000000,85,0,0,1.624824,0.529243,1.095581 -4000000,90,0,0,1.628978,0.516347,1.112631 -4000000,95,0,0,1.610213,0.536970,1.073243 -4000000,100,0,0,1.679644,0.501370,1.178274 -5000000,5,0,0,2.738972,0.602219,2.136753 -5000000,10,0,0,2.716369,0.602615,2.113755 -5000000,15,0,0,2.720195,0.603787,2.116407 -5000000,20,0,0,2.698476,0.609594,2.088882 -5000000,25,0,0,2.743277,0.608929,2.134348 -5000000,30,0,0,2.641273,0.617008,2.024265 -5000000,35,0,0,2.470825,0.629333,1.841492 -5000000,40,0,0,2.417312,0.628503,1.788810 -5000000,45,0,0,2.285351,0.620693,1.664658 -5000000,50,0,0,2.405780,0.607240,1.798540 -5000000,55,0,0,2.228662,0.621444,1.607218 -5000000,60,0,0,2.303424,0.623592,1.679832 -5000000,65,0,0,2.334081,0.651338,1.682743 -5000000,70,0,0,2.319969,0.646172,1.673797 -5000000,75,0,0,2.314917,0.646436,1.668481 -5000000,80,0,0,2.225577,0.644466,1.581111 -5000000,85,0,0,2.191136,0.653857,1.537280 -5000000,90,0,0,2.155392,0.655350,1.500042 -5000000,95,0,0,2.038051,0.660276,1.377775 -5000000,100,0,0,2.196845,0.630068,1.566777 -6000000,5,0,0,3.327895,0.762550,2.565346 -6000000,10,0,0,3.298723,0.734865,2.563858 -6000000,15,0,0,3.354630,0.732273,2.622357 -6000000,20,0,0,3.286660,0.739868,2.546792 -6000000,25,0,0,3.372318,0.732436,2.639882 -6000000,30,0,0,3.259724,0.740645,2.519079 -6000000,35,0,0,3.054829,0.743409,2.311419 -6000000,40,0,0,2.973856,0.738804,2.235052 -6000000,45,0,0,2.823585,0.742369,2.081216 -6000000,50,0,0,2.859516,0.734220,2.125296 -6000000,55,0,0,2.755520,0.761987,1.993533 -6000000,60,0,0,2.983694,0.767813,2.215880 -6000000,65,0,0,2.925565,0.789701,2.135864 -6000000,70,0,0,2.909070,0.782410,2.126660 -6000000,75,0,0,2.914689,0.771284,2.143406 -6000000,80,0,0,2.796472,0.772394,2.024078 -6000000,85,0,0,2.689242,0.793284,1.895957 -6000000,90,0,0,2.664311,0.795375,1.868936 -6000000,95,0,0,2.529320,0.790332,1.738988 -6000000,100,0,0,2.675950,0.757688,1.918262 -7000000,5,0,0,3.996704,0.924851,3.071853 -7000000,10,0,0,3.898082,0.844846,3.053236 -7000000,15,0,0,3.967545,0.881168,3.086377 -7000000,20,0,0,3.932736,0.866162,3.066574 -7000000,25,0,0,4.035503,0.855663,3.179841 -7000000,30,0,0,3.853679,0.858375,2.995304 -7000000,35,0,0,3.700756,0.892373,2.808384 -7000000,40,0,0,3.593856,0.857586,2.736270 -7000000,45,0,0,3.363648,0.878067,2.485581 -7000000,50,0,0,3.437134,0.850130,2.587004 -7000000,55,0,0,3.334999,0.877300,2.457699 -7000000,60,0,0,3.505047,0.907076,2.597971 -7000000,65,0,0,3.484218,0.915663,2.568555 -7000000,70,0,0,3.564678,0.910495,2.654184 -7000000,75,0,0,3.470754,0.900429,2.570326 -7000000,80,0,0,3.385602,0.902801,2.482802 -7000000,85,0,0,3.108434,0.925589,2.182845 -7000000,90,0,0,3.219396,0.924434,2.294962 -7000000,95,0,0,3.124498,0.910731,2.213767 -7000000,100,0,0,3.177028,0.885023,2.292005 -8000000,5,0,0,4.766666,1.096688,3.669978 -8000000,10,0,0,4.513911,0.982807,3.531104 -8000000,15,0,0,4.560598,0.971304,3.589294 -8000000,20,0,0,4.612083,0.984183,3.627900 -8000000,25,0,0,4.660397,0.970289,3.690108 -8000000,30,0,0,4.621589,0.995142,3.626447 -8000000,35,0,0,4.320914,0.986778,3.334136 -8000000,40,0,0,4.246617,0.985245,3.261372 -8000000,45,0,0,4.091949,0.993188,3.098761 -8000000,50,0,0,4.125746,0.973301,3.152445 -8000000,55,0,0,3.946131,1.012447,2.933684 -8000000,60,0,0,4.003785,0.998628,3.005157 -8000000,65,0,0,4.093362,1.044400,3.048962 -8000000,70,0,0,4.121861,1.032051,3.089810 -8000000,75,0,0,4.069124,1.030064,3.039060 -8000000,80,0,0,4.064636,1.064685,2.999951 -8000000,85,0,0,3.734827,1.033935,2.700892 -8000000,90,0,0,3.785738,1.036601,2.749137 -8000000,95,0,0,3.677863,1.073646,2.604217 -8000000,100,0,0,3.874679,1.020020,2.854659 -9000000,5,0,0,5.235279,1.212156,4.023123 -9000000,10,0,0,5.083309,1.092832,3.990477 -9000000,15,0,0,5.104618,1.091969,4.012649 -9000000,20,0,0,5.237394,1.097292,4.140102 -9000000,25,0,0,5.403803,1.096931,4.306872 -9000000,30,0,0,5.105784,1.122448,3.983336 -9000000,35,0,0,4.815739,1.119876,3.695863 -9000000,40,0,0,4.855958,1.126481,3.729477 -9000000,45,0,0,4.629845,1.125421,3.504425 -9000000,50,0,0,4.619241,1.078884,3.540357 -9000000,55,0,0,4.460644,1.125915,3.334729 -9000000,60,0,0,4.554042,1.105754,3.448288 -9000000,65,0,0,4.710019,1.191131,3.518888 -9000000,70,0,0,4.715202,1.163294,3.551908 -9000000,75,0,0,4.704559,1.150023,3.554536 -9000000,80,0,0,4.710057,1.219878,3.490179 -9000000,85,0,0,4.513430,1.195610,3.317819 -9000000,90,0,0,4.398654,1.203555,3.195100 -9000000,95,0,0,4.153370,1.206831,2.946539 -9000000,100,0,0,4.138215,1.138724,2.999492 -10000000,5,0,0,5.791414,1.291686,4.499728 -10000000,10,0,0,5.724184,1.220067,4.504117 -10000000,15,0,0,5.765862,1.215561,4.550301 -10000000,20,0,0,5.850229,1.234341,4.615889 -10000000,25,0,0,5.904642,1.235793,4.668849 -10000000,30,0,0,5.771626,1.240559,4.531067 -10000000,35,0,0,5.592988,1.265301,4.327687 -10000000,40,0,0,5.453986,1.259612,4.194374 -10000000,45,0,0,5.226291,1.245989,3.980302 -10000000,50,0,0,5.254268,1.228019,4.026249 -10000000,55,0,0,5.031442,1.276297,3.755144 -10000000,60,0,0,5.079279,1.243382,3.835896 -10000000,65,0,0,5.308688,1.297778,4.010910 -10000000,70,0,0,5.352717,1.295163,4.057554 -10000000,75,0,0,5.394536,1.307947,4.086589 -10000000,80,0,0,5.174465,1.290116,3.884350 -10000000,85,0,0,5.036778,1.329402,3.707376 -10000000,90,0,0,4.893278,1.301761,3.591517 -10000000,95,0,0,4.616589,1.314193,3.302397 -10000000,100,0,0,5.059062,1.274316,3.784747 +1000000,5,0,0,0.299611,0.102242,0.197369 +1000000,10,0,0,0.272608,0.087199,0.185408 +1000000,15,0,0,0.259758,0.095315,0.164443 +1000000,20,0,0,0.260309,0.092148,0.168160 +1000000,25,0,0,0.280387,0.094975,0.185412 +1000000,30,0,0,0.257914,0.095800,0.162114 +1000000,35,0,0,0.255534,0.100729,0.154805 +1000000,40,0,0,0.256493,0.097555,0.158938 +1000000,45,0,0,0.254273,0.102262,0.152011 +1000000,50,0,0,0.277551,0.096780,0.180772 +1000000,55,0,0,0.253127,0.104084,0.149044 +1000000,60,0,0,0.260744,0.101133,0.159611 +1000000,65,0,0,0.261073,0.117157,0.143916 +1000000,70,0,0,0.259572,0.104614,0.154957 +1000000,75,0,0,0.282809,0.105830,0.176978 +1000000,80,0,0,0.262717,0.104822,0.157895 +1000000,85,0,0,0.250385,0.109471,0.140913 +1000000,90,0,0,0.260973,0.107594,0.153379 +1000000,95,0,0,0.254907,0.109761,0.145146 +1000000,100,0,0,0.289890,0.104555,0.185335 +2000000,5,0,0,0.609668,0.177877,0.431791 +2000000,10,0,0,0.594733,0.177587,0.417146 +2000000,15,0,0,0.546340,0.186682,0.359658 +2000000,20,0,0,0.551118,0.187527,0.363591 +2000000,25,0,0,0.581696,0.189962,0.391734 +2000000,30,0,0,0.533315,0.192504,0.340811 +2000000,35,0,0,0.523529,0.201488,0.322041 +2000000,40,0,0,0.542160,0.196062,0.346098 +2000000,45,0,0,0.533204,0.208643,0.324562 +2000000,50,0,0,0.585641,0.194380,0.391261 +2000000,55,0,0,0.524163,0.209812,0.314350 +2000000,60,0,0,0.549808,0.208447,0.341361 +2000000,65,0,0,0.537853,0.222556,0.315297 +2000000,70,0,0,0.541593,0.215260,0.326333 +2000000,75,0,0,0.568471,0.213114,0.355356 +2000000,80,0,0,0.541440,0.209655,0.331786 +2000000,85,0,0,0.550112,0.233570,0.316543 +2000000,90,0,0,0.558724,0.226444,0.332280 +2000000,95,0,0,0.539893,0.221556,0.318337 +2000000,100,0,0,0.598759,0.209752,0.389007 +3000000,5,0,0,0.948781,0.265301,0.683480 +3000000,10,0,0,0.929375,0.263156,0.666219 +3000000,15,0,0,0.860883,0.280835,0.580048 +3000000,20,0,0,0.892551,0.282804,0.609747 +3000000,25,0,0,0.919328,0.288656,0.630673 +3000000,30,0,0,0.838356,0.289914,0.548442 +3000000,35,0,0,0.814004,0.303397,0.510607 +3000000,40,0,0,0.844759,0.299385,0.545374 +3000000,45,0,0,0.814474,0.310257,0.504216 +3000000,50,0,0,0.899913,0.290924,0.608989 +3000000,55,0,0,0.815715,0.315817,0.499898 +3000000,60,0,0,0.827638,0.305507,0.522131 +3000000,65,0,0,0.822629,0.330428,0.492201 +3000000,70,0,0,0.823191,0.317380,0.505811 +3000000,75,0,0,0.878452,0.320445,0.558007 +3000000,80,0,0,0.847718,0.334119,0.513600 +3000000,85,0,0,0.818522,0.332896,0.485626 +3000000,90,0,0,0.827470,0.322782,0.504689 +3000000,95,0,0,0.828608,0.335430,0.493178 +3000000,100,0,0,0.896185,0.316362,0.579823 +4000000,5,0,0,1.275384,0.353573,0.921811 +4000000,10,0,0,1.259566,0.349613,0.909953 +4000000,15,0,0,1.185705,0.373355,0.812350 +4000000,20,0,0,1.229322,0.371926,0.857396 +4000000,25,0,0,1.248511,0.391796,0.856715 +4000000,30,0,0,1.155563,0.387860,0.767703 +4000000,35,0,0,1.116387,0.405818,0.710569 +4000000,40,0,0,1.132215,0.391490,0.740726 +4000000,45,0,0,1.100944,0.415700,0.685244 +4000000,50,0,0,1.224761,0.389984,0.834778 +4000000,55,0,0,1.099957,0.418699,0.681258 +4000000,60,0,0,1.127526,0.406180,0.721346 +4000000,65,0,0,1.117332,0.449128,0.668204 +4000000,70,0,0,1.124202,0.422037,0.702166 +4000000,75,0,0,1.205904,0.430433,0.775471 +4000000,80,0,0,1.129890,0.420716,0.709175 +4000000,85,0,0,1.112421,0.439064,0.673357 +4000000,90,0,0,1.141982,0.453779,0.688203 +4000000,95,0,0,1.112918,0.442525,0.670394 +4000000,100,0,0,1.243148,0.430701,0.812447 +5000000,5,0,0,1.665526,0.456298,1.209229 +5000000,10,0,0,1.641645,0.435357,1.206288 +5000000,15,0,0,1.545291,0.468748,1.076544 +5000000,20,0,0,1.602258,0.464675,1.137583 +5000000,25,0,0,1.633946,0.476872,1.157074 +5000000,30,0,0,1.469274,0.482805,0.986469 +5000000,35,0,0,1.465147,0.515793,0.949354 +5000000,40,0,0,1.513597,0.491676,1.021921 +5000000,45,0,0,1.433914,0.525108,0.908805 +5000000,50,0,0,1.563045,0.486934,1.076110 +5000000,55,0,0,1.401496,0.525712,0.875784 +5000000,60,0,0,1.439167,0.507205,0.931962 +5000000,65,0,0,1.433756,0.551929,0.881827 +5000000,70,0,0,1.418836,0.529648,0.889187 +5000000,75,0,0,1.538966,0.535782,1.003183 +5000000,80,0,0,1.490979,0.527986,0.962993 +5000000,85,0,0,1.396409,0.556990,0.839419 +5000000,90,0,0,1.431437,0.536418,0.895019 +5000000,95,0,0,1.408862,0.557475,0.851387 +5000000,100,0,0,1.551802,0.524031,1.027771 +6000000,5,0,0,1.994865,0.531572,1.463293 +6000000,10,0,0,1.981524,0.518084,1.463440 +6000000,15,0,0,1.913569,0.576628,1.336941 +6000000,20,0,0,1.981721,0.575521,1.406201 +6000000,25,0,0,2.007833,0.574681,1.433152 +6000000,30,0,0,1.862922,0.593602,1.269320 +6000000,35,0,0,1.825437,0.607342,1.218095 +6000000,40,0,0,1.847371,0.587649,1.259721 +6000000,45,0,0,1.777238,0.625635,1.151603 +6000000,50,0,0,1.902538,0.596186,1.306351 +6000000,55,0,0,1.747671,0.630350,1.117321 +6000000,60,0,0,1.777941,0.609656,1.168285 +6000000,65,0,0,1.761853,0.669869,1.091983 +6000000,70,0,0,1.747498,0.649966,1.097532 +6000000,75,0,0,1.823992,0.653060,1.170932 +6000000,80,0,0,1.719642,0.633770,1.085871 +6000000,85,0,0,1.720246,0.657722,1.062524 +6000000,90,0,0,1.747043,0.663781,1.083262 +6000000,95,0,0,1.723295,0.666883,1.056412 +6000000,100,0,0,1.853117,0.626701,1.226417 +7000000,5,0,0,2.359387,0.637331,1.722056 +7000000,10,0,0,2.343500,0.614716,1.728783 +7000000,15,0,0,2.299718,0.656829,1.642890 +7000000,20,0,0,2.268033,0.666258,1.601775 +7000000,25,0,0,2.398852,0.673614,1.725239 +7000000,30,0,0,2.209104,0.674182,1.534922 +7000000,35,0,0,2.185483,0.726943,1.458540 +7000000,40,0,0,2.121128,0.686498,1.434630 +7000000,45,0,0,2.101207,0.734819,1.366388 +7000000,50,0,0,2.258733,0.693813,1.564920 +7000000,55,0,0,2.051698,0.734172,1.317526 +7000000,60,0,0,2.088576,0.716862,1.371713 +7000000,65,0,0,2.044326,0.776927,1.267399 +7000000,70,0,0,2.050183,0.737032,1.313151 +7000000,75,0,0,2.194141,0.745370,1.448771 +7000000,80,0,0,2.059469,0.734786,1.324683 +7000000,85,0,0,2.004764,0.767040,1.237724 +7000000,90,0,0,2.038187,0.767716,1.270472 +7000000,95,0,0,2.023765,0.775980,1.247785 +7000000,100,0,0,2.192352,0.737693,1.454659 +8000000,5,0,0,2.726530,0.710556,2.015974 +8000000,10,0,0,2.698604,0.693520,2.005084 +8000000,15,0,0,2.607604,0.744898,1.862705 +8000000,20,0,0,2.638865,0.756051,1.882814 +8000000,25,0,0,2.708467,0.765876,1.942591 +8000000,30,0,0,2.567731,0.781442,1.786289 +8000000,35,0,0,2.525473,0.806957,1.718515 +8000000,40,0,0,2.584270,0.799242,1.785028 +8000000,45,0,0,2.403296,0.828001,1.575295 +8000000,50,0,0,2.626148,0.790325,1.835822 +8000000,55,0,0,2.334984,0.843020,1.491964 +8000000,60,0,0,2.444903,0.809100,1.635804 +8000000,65,0,0,2.368305,0.876018,1.492287 +8000000,70,0,0,2.337441,0.854867,1.482574 +8000000,75,0,0,2.528316,0.860259,1.668057 +8000000,80,0,0,2.418564,0.859025,1.559539 +8000000,85,0,0,2.317383,0.888274,1.429109 +8000000,90,0,0,2.305787,0.854597,1.451190 +8000000,95,0,0,2.323595,0.892250,1.431345 +8000000,100,0,0,2.512742,0.837625,1.675117 +9000000,5,0,0,3.088118,0.801795,2.286323 +9000000,10,0,0,3.016664,0.773738,2.242926 +9000000,15,0,0,2.962284,0.848278,2.114006 +9000000,20,0,0,3.038835,0.840462,2.198373 +9000000,25,0,0,3.132462,0.860516,2.271946 +9000000,30,0,0,2.935913,0.869216,2.066697 +9000000,35,0,0,2.850381,0.909132,1.941249 +9000000,40,0,0,2.821658,0.882031,1.939627 +9000000,45,0,0,2.740740,0.926575,1.814165 +9000000,50,0,0,2.992404,0.889602,2.102802 +9000000,55,0,0,2.657494,0.947951,1.709543 +9000000,60,0,0,2.755548,0.920515,1.835033 +9000000,65,0,0,2.697776,0.994813,1.702962 +9000000,70,0,0,2.735703,0.958497,1.777206 +9000000,75,0,0,2.878154,0.964515,1.913638 +9000000,80,0,0,2.701406,0.948898,1.752508 +9000000,85,0,0,2.627427,0.986885,1.640542 +9000000,90,0,0,2.692035,0.968686,1.723349 +9000000,95,0,0,2.628978,1.001966,1.627012 +9000000,100,0,0,2.911288,0.946697,1.964591 +10000000,5,0,0,3.373120,0.894031,2.479089 +10000000,10,0,0,3.343354,0.859347,2.484007 +10000000,15,0,0,3.254257,0.935829,2.318427 +10000000,20,0,0,3.367681,0.930169,2.437512 +10000000,25,0,0,3.329759,0.957379,2.372380 +10000000,30,0,0,3.183146,0.961188,2.221958 +10000000,35,0,0,3.089384,1.005245,2.084139 +10000000,40,0,0,3.269105,0.993396,2.275709 +10000000,45,0,0,3.125804,1.032544,2.093261 +10000000,50,0,0,3.288299,0.983132,2.305166 +10000000,55,0,0,3.055545,1.045616,2.009929 +10000000,60,0,0,3.114188,1.018425,2.095763 +10000000,65,0,0,3.031843,1.111115,1.920729 +10000000,70,0,0,2.989296,1.050623,1.938673 +10000000,75,0,0,3.142924,1.071917,2.071007 +10000000,80,0,0,3.025720,1.066684,1.959036 +10000000,85,0,0,2.929269,1.091306,1.837963 +10000000,90,0,0,2.949445,1.080172,1.869273 +10000000,95,0,0,2.931059,1.101077,1.829982 +10000000,100,0,0,3.207558,1.056457,2.151101 diff --git a/docs/bench-report-repro.csv b/docs/bench-report-repro.csv index e85302b..a3cdf88 100644 --- a/docs/bench-report-repro.csv +++ b/docs/bench-report-repro.csv @@ -1,201 +1,201 @@ Total Items,Buckets,Removed,Escalated,Time Elapsed,Time to Send,Time to Receive -1000000,5,0,0,0.614072,0.252223,0.361849 -1000000,10,0,0,0.487301,0.166346,0.320954 -1000000,15,0,0,0.465424,0.161509,0.303915 -1000000,20,0,0,0.429060,0.160433,0.268626 -1000000,25,0,0,0.422653,0.151694,0.270959 -1000000,30,0,0,0.412588,0.153395,0.259193 -1000000,35,0,0,0.411344,0.146609,0.264735 -1000000,40,0,0,0.372737,0.144614,0.228123 -1000000,45,0,0,0.381298,0.149845,0.231453 -1000000,50,0,0,0.421840,0.157114,0.264726 -1000000,55,0,0,0.367223,0.134796,0.232427 -1000000,60,0,0,0.373999,0.136299,0.237700 -1000000,65,0,0,0.390020,0.145616,0.244404 -1000000,70,0,0,0.364162,0.136035,0.228127 -1000000,75,0,0,0.368686,0.143142,0.225544 -1000000,80,0,0,0.377908,0.146060,0.231848 -1000000,85,0,0,0.383652,0.148589,0.235063 -1000000,90,0,0,0.376831,0.145537,0.231294 -1000000,95,0,0,0.379045,0.150784,0.228262 -1000000,100,0,0,0.381100,0.154579,0.226521 -2000000,5,0,8780,1.172073,0.389456,0.782617 -2000000,10,0,0,1.010092,0.260968,0.749124 -2000000,15,0,0,0.968993,0.257443,0.711549 -2000000,20,0,0,0.954110,0.265092,0.689018 -2000000,25,0,0,0.971114,0.265140,0.705974 -2000000,30,0,0,0.895227,0.261474,0.633753 -2000000,35,0,0,0.851089,0.282734,0.568355 -2000000,40,0,0,0.833119,0.276862,0.556257 -2000000,45,0,0,0.789324,0.271361,0.517963 -2000000,50,0,0,0.863200,0.296498,0.566703 -2000000,55,0,0,0.808548,0.285560,0.522989 -2000000,60,0,0,0.823831,0.283021,0.540809 -2000000,65,0,0,0.825488,0.302481,0.523006 -2000000,70,0,0,0.840942,0.318196,0.522745 -2000000,75,0,0,0.816378,0.308826,0.507552 -2000000,80,0,0,0.799257,0.281457,0.517800 -2000000,85,0,0,0.755920,0.262167,0.493753 -2000000,90,0,0,0.742541,0.272914,0.469628 -2000000,95,0,0,0.753119,0.275580,0.477539 -2000000,100,0,0,0.809352,0.269598,0.539754 -3000000,5,0,525147,1.897074,0.512351,1.384723 -3000000,10,0,432326,1.782897,0.448884,1.334013 -3000000,15,0,585831,1.836801,0.471124,1.365677 -3000000,20,0,700310,1.894317,0.472125,1.422191 -3000000,25,0,761735,1.947429,0.510474,1.436955 -3000000,30,0,523822,1.749668,0.441676,1.307992 -3000000,35,0,292013,1.520458,0.380246,1.140212 -3000000,40,0,194007,1.451599,0.384171,1.067428 -3000000,45,0,90511,1.322008,0.398315,0.923692 -3000000,50,0,97903,1.333236,0.365696,0.967540 -3000000,55,0,134423,1.356082,0.396705,0.959377 -3000000,60,0,174357,1.398783,0.408589,0.990194 -3000000,65,0,165502,1.356271,0.391172,0.965099 -3000000,70,0,207399,1.387111,0.403218,0.983893 -3000000,75,0,158505,1.338503,0.402033,0.936470 -3000000,80,0,117314,1.275055,0.391559,0.883496 -3000000,85,0,80291,1.230854,0.405083,0.825771 -3000000,90,0,67175,1.220653,0.414606,0.806047 -3000000,95,0,60570,1.241768,0.417398,0.824370 -3000000,100,0,59876,1.285382,0.397571,0.887811 -4000000,5,0,2052180,2.730309,0.622339,2.107969 -4000000,10,0,1386031,2.485446,0.505215,1.980231 -4000000,15,0,1665887,2.540234,0.500777,2.039457 -4000000,20,0,1862791,2.602652,0.510908,2.091744 -4000000,25,0,1963693,2.630047,0.509664,2.120383 -4000000,30,0,1584274,2.424148,0.509078,1.915070 -4000000,35,0,1094349,2.209760,0.503182,1.706578 -4000000,40,0,923556,2.137053,0.522911,1.614142 -4000000,45,0,555130,1.956664,0.509600,1.447064 -4000000,50,0,550534,1.973188,0.520544,1.452644 -4000000,55,0,627424,1.944976,0.506746,1.438230 -4000000,60,0,806981,2.040783,0.524901,1.515882 -4000000,65,0,1011482,2.061003,0.535786,1.525217 -4000000,70,0,1170764,2.151871,0.532845,1.619026 -4000000,75,0,1233862,2.208219,0.540444,1.667775 -4000000,80,0,1268491,2.120460,0.537724,1.582736 -4000000,85,0,867423,1.932836,0.525573,1.407263 -4000000,90,0,723756,1.886115,0.538142,1.347973 -4000000,95,0,516079,1.787550,0.571406,1.216145 -4000000,100,0,324528,1.791376,0.529586,1.261790 -5000000,5,0,4013480,3.484860,0.744488,2.740372 -5000000,10,0,3723893,3.463461,0.625551,2.837910 -5000000,15,0,4392646,3.611652,0.653881,2.957771 -5000000,20,0,4432753,3.624666,0.635287,2.989379 -5000000,25,0,5158887,3.839290,0.632956,3.206334 -5000000,30,0,4327634,3.506346,0.639295,2.867051 -5000000,35,0,3280483,3.081920,0.628154,2.453766 -5000000,40,0,3261591,3.128934,0.649785,2.479149 -5000000,45,0,2203708,2.704719,0.651279,2.053440 -5000000,50,0,1829039,2.565912,0.641004,1.924908 -5000000,55,0,1919433,2.596977,0.632209,1.964768 -5000000,60,0,2568018,2.815486,0.662384,2.153103 -5000000,65,0,3082863,2.940506,0.676709,2.263797 -5000000,70,0,3296707,3.072556,0.648120,2.424436 -5000000,75,0,3646389,3.071129,0.672348,2.398780 -5000000,80,0,3241118,2.970847,0.675929,2.294918 -5000000,85,0,2377000,2.677772,0.658825,2.018947 -5000000,90,0,2398246,2.733234,0.684645,2.048589 -5000000,95,0,1738847,2.509009,0.703938,1.805071 -5000000,100,0,1130174,2.332412,0.656768,1.675644 -6000000,5,0,7874560,4.660791,0.862922,3.797869 -6000000,10,0,6538673,4.350979,0.756016,3.594964 -6000000,15,0,7671927,4.683158,0.753397,3.929761 -6000000,20,0,7866072,4.793124,0.755119,4.038005 -6000000,25,0,8244137,4.895965,0.752644,4.143321 -6000000,30,0,7854925,4.768136,0.763280,4.004856 -6000000,35,0,6347304,4.200039,0.758310,3.441730 -6000000,40,0,5246034,3.884311,0.753668,3.130643 -6000000,45,0,4674411,3.618269,0.787706,2.830563 -6000000,50,0,4375865,3.581855,0.764940,2.816915 -6000000,55,0,4593503,3.577487,0.768592,2.808895 -6000000,60,0,5368031,3.872888,0.778848,3.094041 -6000000,65,0,5902275,3.999542,0.779623,3.219919 -6000000,70,0,6479259,4.157115,0.826905,3.330210 -6000000,75,0,6707107,4.237924,0.798023,3.439901 -6000000,80,0,6068588,3.949625,0.777362,3.172263 -6000000,85,0,5511508,3.704121,0.813528,2.890594 -6000000,90,0,4921397,3.567229,0.805057,2.762172 -6000000,95,0,4071941,3.284678,0.823504,2.461175 -6000000,100,0,3307903,3.140478,0.757465,2.383013 -7000000,5,0,10942236,5.697644,1.619642,4.078001 -7000000,10,0,10270453,5.512758,0.885017,4.627741 -7000000,15,51163,11869644,6.008075,0.911135,5.096941 -7000000,20,0,11636610,6.075285,0.873067,5.202218 -7000000,25,477954,12231702,6.407728,0.890165,5.517562 -7000000,30,96719,12021027,6.172390,0.882817,5.289573 -7000000,35,0,10750107,5.529346,0.930081,4.599266 -7000000,40,0,9268594,5.031654,0.874133,4.157521 -7000000,45,0,8346097,4.697675,0.914095,3.783580 -7000000,50,0,7876007,4.542001,0.855008,3.686992 -7000000,55,0,8023334,4.528474,0.913997,3.614478 -7000000,60,0,8419225,4.699484,0.875165,3.824319 -7000000,65,0,10571310,5.321135,0.971543,4.349592 -7000000,70,0,10196521,5.205932,0.949261,4.256671 -7000000,75,3087,11225914,5.737270,0.896307,4.840963 -7000000,80,0,10444634,5.338575,0.948241,4.390334 -7000000,85,0,9412025,4.922358,0.950560,3.971799 -7000000,90,0,8181797,4.566340,0.949068,3.617272 -7000000,95,0,6492478,4.068910,0.927911,3.140998 -7000000,100,0,6460742,4.072999,0.926043,3.146956 -8000000,5,1547679,13974990,6.611841,1.782974,4.828866 -8000000,10,1349023,13819521,6.522988,1.848314,4.674674 -8000000,15,1250438,14768979,7.003629,2.103466,4.900163 -8000000,20,1944856,13511558,6.956602,2.188037,4.768566 -8000000,25,3385314,13241958,7.170449,2.433094,4.737355 -8000000,30,2981814,13113729,6.890043,2.399469,4.490574 -8000000,35,484684,14308929,6.553872,2.096342,4.457529 -8000000,40,490274,13323517,6.151847,1.925466,4.226381 -8000000,45,120202,12008897,5.750389,1.760990,3.989398 -8000000,50,66021,11455779,5.452624,1.621572,3.831053 -8000000,55,0,11529740,5.543609,1.761989,3.781620 -8000000,60,0,12567514,5.892941,1.912218,3.980723 -8000000,65,169118,13824651,6.379142,2.144917,4.234224 -8000000,70,265625,13805616,6.537384,2.167549,4.369835 -8000000,75,1279731,13029862,6.451706,2.322877,4.128829 -8000000,80,467329,13615753,6.470692,2.228395,4.242297 -8000000,85,0,12772989,5.942393,2.074334,3.868059 -8000000,90,0,11531293,5.427606,1.925593,3.502014 -8000000,95,0,10702063,5.198019,1.819703,3.378316 -8000000,100,0,9408805,4.834727,1.479196,3.355530 -9000000,5,3037646,16685247,7.339630,1.977169,5.362461 -9000000,10,2628172,16768556,7.522570,1.967070,5.555500 -9000000,15,3772889,14626740,7.591316,2.319846,5.271470 -9000000,20,3825503,14272986,7.727109,2.297578,5.429531 -9000000,25,4504975,14511430,7.784240,2.518145,5.266096 -9000000,30,4258811,14570970,7.631955,2.542294,5.089661 -9000000,35,2876804,14663941,7.512055,2.261533,5.250523 -9000000,40,1875466,16302585,7.138127,2.045630,5.092497 -9000000,45,1616716,15099770,6.686282,1.942530,4.743752 -9000000,50,1475924,14625713,6.340199,1.755059,4.585140 -9000000,55,1278398,14987535,6.506340,1.964767,4.541573 -9000000,60,1850110,16260263,7.177482,2.055182,5.122300 -9000000,65,1847096,14951605,7.119351,2.290011,4.829341 -9000000,70,2639700,14665600,7.203971,2.357735,4.846236 -9000000,75,3854553,14674516,7.514475,2.570441,4.944033 -9000000,80,2654872,14337311,7.131127,2.354073,4.777055 -9000000,85,1418711,15076827,7.112464,2.283614,4.828851 -9000000,90,1085103,15473669,6.721262,2.121744,4.599519 -9000000,95,470511,14010402,6.074357,1.911429,4.162928 -9000000,100,362220,13206922,5.821822,1.605431,4.216391 -10000000,5,4380122,19328044,8.223477,2.138010,6.085467 -10000000,10,4032912,18410204,8.479764,2.137043,6.342721 -10000000,15,5134573,15854102,8.336251,2.409270,5.926981 -10000000,20,5372727,15739193,8.478422,2.512243,5.966179 -10000000,25,5530225,16266760,8.678549,2.709208,5.969340 -10000000,30,5569506,16286334,8.637668,2.668475,5.969193 -10000000,35,4335864,15901036,8.064952,2.387045,5.677907 -10000000,40,3746996,16632197,8.214258,2.261024,5.953234 -10000000,45,2959391,18071628,7.543156,2.018661,5.524495 -10000000,50,3050989,17761921,7.281090,1.926665,5.354425 -10000000,55,2446851,17872804,7.314149,2.056313,5.257835 -10000000,60,3281209,17727995,7.993614,2.200846,5.792768 -10000000,65,3397266,16003254,7.593191,2.398930,5.194261 -10000000,70,3815474,15875834,7.805462,2.413468,5.391994 -10000000,75,4746834,15989428,8.195779,2.663577,5.532202 -10000000,80,4105434,15890037,7.991832,2.515989,5.475843 -10000000,85,3263554,15789517,7.571546,2.372554,5.198992 -10000000,90,3259600,17271474,7.993723,2.224401,5.769322 -10000000,95,2122795,17905826,7.228344,2.135265,5.093080 -10000000,100,1662459,16302933,6.542334,1.763724,4.778610 +1000000,5,0,0,0.304796,0.119714,0.185083 +1000000,10,0,0,0.269756,0.099647,0.170109 +1000000,15,0,0,0.259180,0.105990,0.153191 +1000000,20,0,0,0.264456,0.103014,0.161443 +1000000,25,0,0,0.269950,0.102658,0.167292 +1000000,30,0,0,0.253448,0.101945,0.151502 +1000000,35,0,0,0.246981,0.102416,0.144565 +1000000,40,0,0,0.252654,0.099290,0.153364 +1000000,45,0,0,0.245097,0.103019,0.142077 +1000000,50,0,0,0.273276,0.102957,0.170319 +1000000,55,0,0,0.234855,0.098946,0.135909 +1000000,60,0,0,0.247402,0.097298,0.150103 +1000000,65,0,0,0.243134,0.106300,0.136834 +1000000,70,0,0,0.248846,0.103436,0.145410 +1000000,75,0,0,0.269639,0.104829,0.164810 +1000000,80,0,0,0.241489,0.103680,0.137810 +1000000,85,0,0,0.247136,0.111698,0.135439 +1000000,90,0,0,0.249959,0.106639,0.143320 +1000000,95,0,0,0.243798,0.110067,0.133731 +1000000,100,0,0,0.271632,0.102914,0.168718 +2000000,5,0,0,0.591920,0.184654,0.407266 +2000000,10,0,0,0.567438,0.176850,0.390589 +2000000,15,0,0,0.536086,0.192238,0.343848 +2000000,20,0,0,0.542417,0.188070,0.354347 +2000000,25,0,0,0.558974,0.189228,0.369746 +2000000,30,0,0,0.523176,0.196181,0.326994 +2000000,35,0,0,0.513190,0.204328,0.308862 +2000000,40,0,0,0.520199,0.196777,0.323423 +2000000,45,0,0,0.508297,0.205507,0.302790 +2000000,50,0,0,0.570015,0.203260,0.366756 +2000000,55,0,0,0.520087,0.212588,0.307499 +2000000,60,0,0,0.532947,0.208120,0.324827 +2000000,65,0,0,0.518420,0.219160,0.299260 +2000000,70,0,0,0.522839,0.212609,0.310230 +2000000,75,0,0,0.571909,0.212966,0.358943 +2000000,80,0,0,0.534415,0.211290,0.323125 +2000000,85,0,0,0.509377,0.211505,0.297872 +2000000,90,0,0,0.518628,0.201433,0.317196 +2000000,95,0,0,0.495041,0.207214,0.287827 +2000000,100,0,0,0.570585,0.199427,0.371157 +3000000,5,0,0,0.943249,0.289898,0.653351 +3000000,10,0,0,0.888884,0.271484,0.617400 +3000000,15,0,0,0.858038,0.291150,0.566887 +3000000,20,0,0,0.870721,0.286143,0.584578 +3000000,25,0,0,0.913674,0.304130,0.609543 +3000000,30,0,0,0.834930,0.322482,0.512448 +3000000,35,0,0,0.814788,0.298290,0.516497 +3000000,40,0,0,0.839414,0.299988,0.539426 +3000000,45,0,0,0.797161,0.298722,0.498439 +3000000,50,0,0,0.908197,0.281542,0.626655 +3000000,55,0,0,0.783287,0.305768,0.477518 +3000000,60,0,0,0.807669,0.293050,0.514619 +3000000,65,0,0,0.793853,0.313138,0.480716 +3000000,70,0,0,0.787560,0.304600,0.482960 +3000000,75,0,0,0.873677,0.307252,0.566425 +3000000,80,0,0,0.807961,0.303729,0.504232 +3000000,85,0,0,0.788310,0.315988,0.472323 +3000000,90,0,0,0.812653,0.313140,0.499513 +3000000,95,0,0,0.798832,0.319401,0.479432 +3000000,100,0,0,0.900047,0.305609,0.594438 +4000000,5,0,0,1.312452,0.354414,0.958038 +4000000,10,0,32992,1.293093,0.346123,0.946971 +4000000,15,0,49476,1.213818,0.364399,0.849419 +4000000,20,0,41400,1.209091,0.365644,0.843447 +4000000,25,0,34360,1.220894,0.371361,0.849533 +4000000,30,0,8676,1.132169,0.374630,0.757539 +4000000,35,0,0,1.096359,0.398387,0.697972 +4000000,40,0,27148,1.119752,0.386061,0.733691 +4000000,45,0,0,1.087228,0.400773,0.686456 +4000000,50,0,40183,1.213417,0.382192,0.831225 +4000000,55,0,0,1.088623,0.413009,0.675613 +4000000,60,0,15504,1.090491,0.392868,0.697622 +4000000,65,0,22576,1.110595,0.427184,0.683411 +4000000,70,0,20230,1.127521,0.421596,0.705925 +4000000,75,0,45125,1.265487,0.420677,0.844810 +4000000,80,0,15228,1.166686,0.424777,0.741909 +4000000,85,0,30407,1.131428,0.444573,0.686855 +4000000,90,0,2490,1.149576,0.436162,0.713414 +4000000,95,0,29242,1.137965,0.441198,0.696767 +4000000,100,0,50576,1.272764,0.414959,0.857805 +5000000,5,0,162973,1.719990,0.448206,1.271784 +5000000,10,0,253004,1.712342,0.436731,1.275610 +5000000,15,0,137642,1.651285,0.464859,1.186427 +5000000,20,0,252541,1.645313,0.472497,1.172815 +5000000,25,0,240395,1.697715,0.477384,1.220331 +5000000,30,0,126376,1.611649,0.486436,1.125213 +5000000,35,0,127226,1.549424,0.501555,1.047870 +5000000,40,0,142578,1.612192,0.487048,1.125144 +5000000,45,0,100790,1.506562,0.510956,0.995606 +5000000,50,0,107424,1.697497,0.487615,1.209881 +5000000,55,0,97499,1.507024,0.527339,0.979685 +5000000,60,0,232640,1.524929,0.507852,1.017077 +5000000,65,0,112141,1.505165,0.550458,0.954707 +5000000,70,0,116809,1.488755,0.524087,0.964668 +5000000,75,0,278950,1.685253,0.544767,1.140487 +5000000,80,0,154473,1.563309,0.535400,1.027909 +5000000,85,0,107231,1.494527,0.545171,0.949356 +5000000,90,0,117542,1.506748,0.534769,0.971979 +5000000,95,0,119753,1.508129,0.548399,0.959730 +5000000,100,0,93336,1.646859,0.538526,1.108333 +6000000,5,0,422312,2.092982,0.540849,1.552133 +6000000,10,0,404027,2.128459,0.537490,1.590969 +6000000,15,0,366277,2.060288,0.560244,1.500044 +6000000,20,0,377486,2.091494,0.559321,1.532173 +6000000,25,0,259797,2.108112,0.575710,1.532403 +6000000,30,0,437778,2.063193,0.589612,1.473581 +6000000,35,0,399060,1.910304,0.602500,1.307804 +6000000,40,0,395199,2.006413,0.585060,1.421354 +6000000,45,0,523849,1.946533,0.621970,1.324562 +6000000,50,0,319729,2.032188,0.583699,1.448488 +6000000,55,0,482565,1.963444,0.625372,1.338072 +6000000,60,0,493936,1.985351,0.606744,1.378607 +6000000,65,0,375453,1.920874,0.654684,1.266190 +6000000,70,0,516915,1.991413,0.629187,1.362226 +6000000,75,0,597299,2.174931,0.652623,1.522309 +6000000,80,0,556514,1.979182,0.627021,1.352162 +6000000,85,0,516565,1.973487,0.655650,1.317837 +6000000,90,0,616404,1.990341,0.649117,1.341224 +6000000,95,0,620854,1.980581,0.664450,1.316132 +6000000,100,0,341078,2.127766,0.629442,1.498324 +7000000,5,0,1715163,2.785417,0.620942,2.164475 +7000000,10,0,1623859,2.779911,0.602702,2.177209 +7000000,15,0,1707400,2.813195,0.675379,2.137816 +7000000,20,0,1654045,2.767437,0.650758,2.116679 +7000000,25,0,1205396,2.578133,0.676151,1.901982 +7000000,30,0,1229706,2.525530,0.673630,1.851900 +7000000,35,0,1192320,2.514677,0.704380,1.810297 +7000000,40,0,1565473,2.760732,0.704620,2.056112 +7000000,45,0,1810532,2.823445,0.751221,2.072224 +7000000,50,0,1211119,2.532261,0.685132,1.847130 +7000000,55,0,1192700,2.531494,0.734813,1.796680 +7000000,60,0,1347609,2.600764,0.713706,1.887058 +7000000,65,0,2166934,3.050198,0.791947,2.258251 +7000000,70,0,1970005,2.866356,0.777477,2.088880 +7000000,75,0,1606261,2.708065,0.751587,1.956477 +7000000,80,0,1412201,2.613379,0.750682,1.862697 +7000000,85,0,895859,2.354906,0.774489,1.580418 +7000000,90,0,1626724,2.824015,0.777956,2.046059 +7000000,95,0,1086859,2.502839,0.783295,1.719544 +7000000,100,0,1622737,2.874956,0.787285,2.087671 +8000000,5,0,2971813,3.582397,0.766665,2.815732 +8000000,10,0,2562712,3.244056,0.703172,2.540885 +8000000,15,0,2580540,3.205176,0.742386,2.462790 +8000000,20,0,2698420,3.324515,0.747439,2.577076 +8000000,25,0,2386356,3.229281,0.772734,2.456547 +8000000,30,0,2503484,3.147290,0.770162,2.377127 +8000000,35,0,2263415,3.018989,0.799203,2.219785 +8000000,40,0,2583245,3.195799,0.796715,2.399083 +8000000,45,0,2234810,2.995317,0.825657,2.169659 +8000000,50,0,2218835,3.078903,0.794090,2.284812 +8000000,55,0,2275886,2.943430,0.845031,2.098399 +8000000,60,0,2387623,3.083187,0.812817,2.270371 +8000000,65,0,2270354,2.998703,0.878881,2.119822 +8000000,70,0,2318495,3.031083,0.849870,2.181212 +8000000,75,0,2524549,3.164104,0.851482,2.312622 +8000000,80,0,2248457,2.962297,0.841574,2.120723 +8000000,85,0,2213049,2.913182,0.882536,2.030645 +8000000,90,0,2152819,2.927023,0.855132,2.071891 +8000000,95,0,2226148,2.942292,0.889671,2.052622 +8000000,100,0,2179430,2.981433,0.841704,2.139729 +9000000,5,0,3621809,3.827629,0.819313,3.008316 +9000000,10,0,3912368,3.847340,0.780610,3.066730 +9000000,15,0,3624982,3.755970,0.839363,2.916606 +9000000,20,0,3698569,3.759158,0.844351,2.914807 +9000000,25,0,3513082,3.760925,0.861112,2.899812 +9000000,30,0,3997236,3.940646,0.891574,3.049072 +9000000,35,0,3787101,3.759526,0.907295,2.852231 +9000000,40,0,3867230,3.802762,0.881266,2.921495 +9000000,45,0,3798215,3.750207,0.930276,2.819931 +9000000,50,0,3216568,3.533120,0.891711,2.641409 +9000000,55,0,3812435,3.680286,0.947674,2.732612 +9000000,60,0,3839867,3.653675,0.923681,2.729994 +9000000,65,0,3689074,3.595896,0.982576,2.613320 +9000000,70,0,3700752,3.639629,0.950929,2.688700 +9000000,75,0,4270346,4.073362,0.997443,3.075919 +9000000,80,0,4009464,3.868769,0.975652,2.893117 +9000000,85,0,3955479,3.724603,0.989258,2.735345 +9000000,90,0,3919529,3.756801,0.968999,2.787802 +9000000,95,0,3727932,3.650245,0.991285,2.658960 +9000000,100,0,3470715,3.696968,0.950463,2.746505 +10000000,5,0,6475725,4.771627,0.897402,3.874225 +10000000,10,0,6040749,4.768925,0.876224,3.892701 +10000000,15,0,5917721,4.622284,0.935065,3.687218 +10000000,20,0,6285857,4.787759,0.953275,3.834484 +10000000,25,0,5362312,4.422276,0.964333,3.457943 +10000000,30,0,6007325,4.678531,0.970952,3.707579 +10000000,35,0,5716497,4.540345,1.698371,2.841973 +10000000,40,0,6043263,4.592433,0.997314,3.595119 +10000000,45,0,5281712,4.360967,1.702299,2.658668 +10000000,50,0,5257216,4.308344,0.983483,3.324861 +10000000,55,0,5328835,4.345625,1.724349,2.621276 +10000000,60,0,5870849,4.561192,1.734647,2.826545 +10000000,65,0,5169657,4.356449,1.751664,2.604785 +10000000,70,0,4714519,4.091589,1.712368,2.379220 +10000000,75,0,5274712,4.360359,1.736360,2.623999 +10000000,80,0,4854622,4.100874,1.740089,2.360785 +10000000,85,0,4536249,4.002162,1.737434,2.264728 +10000000,90,0,4707352,4.086841,1.730413,2.356427 +10000000,95,0,4514431,3.986423,1.746495,2.239928 +10000000,100,0,4732642,4.028987,1.447545,2.581441 diff --git a/ftime/ftime.go b/ftime/ftime.go deleted file mode 100644 index 4f6cd9f..0000000 --- a/ftime/ftime.go +++ /dev/null @@ -1,151 +0,0 @@ -/* - MIT License - - Copyright (c) 2018 Yusuke Kato - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. -*/ - -package ftime - -import ( - "context" - "sync" - "sync/atomic" - "time" -) - -type Ftime interface { - IsDaemonRunning() bool - GetFormat() string - SetFormat(format string) Ftime - GetLocation() *time.Location - SetLocation(location *time.Location) Ftime - Now() time.Time - Stop() - UnixNow() int64 - UnixUNow() uint32 - UnixNanoNow() int64 - UnixUNanoNow() uint32 - FormattedNow() []byte - Since(t time.Time) time.Duration - StartTimerD(ctx context.Context, dur time.Duration) Ftime -} - -// Fastime is fastime's base struct, it's stores atomic time object -type fastime struct { - uut uint32 - uunt uint32 - dur int64 - ut int64 - unt int64 - correctionDur time.Duration - mu sync.Mutex - wg sync.WaitGroup - running atomic.Bool - t atomic.Pointer[time.Time] - ft atomic.Pointer[[]byte] - format atomic.Pointer[string] - formatValid atomic.Bool - location atomic.Pointer[time.Location] -} - -const ( - bufSize = 64 - bufMargin = 10 -) - -var ( - once sync.Once - instance Ftime -) - -func init() { - once.Do(func() { - instance = New().StartTimerD(context.Background(), time.Millisecond*5) - }) -} - -func IsDaemonRunning() (running bool) { - return instance.IsDaemonRunning() -} - -func GetLocation() (loc *time.Location) { - return instance.GetLocation() -} - -func GetFormat() (form string) { - return instance.GetFormat() -} - -// SetLocation replaces time location -func SetLocation(location *time.Location) (ft Ftime) { - return instance.SetLocation(location) -} - -// SetFormat replaces time format -func SetFormat(format string) (ft Ftime) { - return instance.SetFormat(format) -} - -// Now returns current time -func Now() (now time.Time) { - return instance.Now() -} - -// Since returns the time elapsed since t. -// It is shorthand for fastime.Now().Sub(t). -func Since(t time.Time) (dur time.Duration) { - return instance.Since(t) -} - -// Stop stops stopping time refresh daemon -func Stop() { - instance.Stop() -} - -// UnixNow returns current unix time -func UnixNow() (now int64) { - return instance.UnixNow() -} - -// UnixUNow returns current unix time -func UnixUNow() (now uint32) { - return instance.UnixUNow() -} - -// UnixNanoNow returns current unix nano time -func UnixNanoNow() (now int64) { - return instance.UnixNanoNow() -} - -// UnixUNanoNow returns current unix nano time -func UnixUNanoNow() (now uint32) { - return instance.UnixUNanoNow() -} - -// FormattedNow returns formatted byte time -func FormattedNow() (now []byte) { - return instance.FormattedNow() -} - -// StartTimerD provides time refresh daemon -func StartTimerD(ctx context.Context, dur time.Duration) (ft Ftime) { - return instance.StartTimerD(ctx, dur) -} diff --git a/ftime/helpers.go b/ftime/helpers.go deleted file mode 100644 index 7f24725..0000000 --- a/ftime/helpers.go +++ /dev/null @@ -1,247 +0,0 @@ -/* - MIT License - - # Copyright (c) 2018 Yusuke Kato - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. -*/ - -package ftime - -import ( - "context" - "math" - "sync/atomic" - "syscall" - "time" - "unsafe" -) - -func (f *fastime) fetchSysTime() (now time.Time) { - var tv syscall.Timeval - err := syscall.Gettimeofday(&tv) - loc := f.GetLocation() - if err != nil { - now = time.Now() - if loc != nil { - return now.In(loc) - } - return now - } - now = time.Unix(0, syscall.TimevalToNsec(tv)) - if loc != nil { - return now.In(loc) - } - return now -} - -func New() (f Ftime) { - return newFtime() -} - -func newFtime() (f *fastime) { - f = &fastime{ - ut: math.MaxInt64, - unt: math.MaxInt64, - uut: math.MaxUint32, - uunt: math.MaxUint32, - correctionDur: time.Millisecond * 100, - } - - form := time.RFC3339 - f.format.Store(&form) - loc := func() (loc *time.Location) { - tz, ok := syscall.Getenv("TZ") - if ok && tz != "" { - var err error - loc, err = time.LoadLocation(tz) - if err == nil { - return loc - } - } - return new(time.Location) - }() - - f.location.Store(loc) - - buf := f.newBuffer(len(form) + bufMargin) - f.ft.Store(&buf) - - return f.refresh() -} - -func (f *fastime) update() (ft *fastime) { - return f.store(f.Now().Add(time.Duration(atomic.LoadInt64(&f.dur)))) -} - -func (f *fastime) refresh() (ft *fastime) { - return f.store(f.fetchSysTime()) -} - -func (f *fastime) newBuffer(max int) (b []byte) { - if max < bufSize { - var buf [bufSize]byte - b = buf[:0] - } else { - b = make([]byte, 0, max) - } - return b -} - -func (f *fastime) store(t time.Time) (ft *fastime) { - f.t.Store(&t) - f.formatValid.Store(false) - ut := t.Unix() - unt := t.UnixNano() - atomic.StoreInt64(&f.ut, ut) - atomic.StoreInt64(&f.unt, unt) - atomic.StoreUint32(&f.uut, *(*uint32)(unsafe.Pointer(&ut))) - atomic.StoreUint32(&f.uunt, *(*uint32)(unsafe.Pointer(&unt))) - return f -} - -func (f *fastime) IsDaemonRunning() (running bool) { - return f.running.Load() -} - -func (f *fastime) GetLocation() (loc *time.Location) { - loc = f.location.Load() - if loc == nil { - return nil - } - return loc -} - -func (f *fastime) GetFormat() (form string) { - return *f.format.Load() -} - -// SetLocation replaces time location -func (f *fastime) SetLocation(loc *time.Location) (ft Ftime) { - if loc == nil { - return f - } - f.location.Store(loc) - f.refresh() - return f -} - -// SetFormat replaces time format -func (f *fastime) SetFormat(format string) (ft Ftime) { - f.format.Store(&format) - f.formatValid.Store(false) - f.refresh() - return f -} - -// Now returns current time -func (f *fastime) Now() (t time.Time) { - return *f.t.Load() -} - -// Stop stops stopping time refresh daemon -func (f *fastime) Stop() { - f.mu.Lock() - f.stop() - f.mu.Unlock() -} - -func (f *fastime) stop() { - if f.IsDaemonRunning() { - atomic.StoreInt64(&f.dur, 0) - } - f.wg.Wait() -} - -func (f *fastime) Since(t time.Time) (dur time.Duration) { - return f.Now().Sub(t) -} - -// UnixNow returns current unix time -func (f *fastime) UnixNow() (now int64) { - return atomic.LoadInt64(&f.ut) -} - -// UnixNow returns current unix time -func (f *fastime) UnixUNow() (now uint32) { - return atomic.LoadUint32(&f.uut) -} - -// UnixNanoNow returns current unix nano time -func (f *fastime) UnixNanoNow() (now int64) { - return atomic.LoadInt64(&f.unt) -} - -// UnixNanoNow returns current unix nano time -func (f *fastime) UnixUNanoNow() (now uint32) { - return atomic.LoadUint32(&f.uunt) -} - -// FormattedNow returns formatted byte time -func (f *fastime) FormattedNow() (now []byte) { - // only update formatted value on swap - if f.formatValid.CompareAndSwap(false, true) { - form := f.GetFormat() - buf := f.Now().AppendFormat(f.newBuffer(len(form)+bufMargin), form) - f.ft.Store(&buf) - } - return *f.ft.Load() -} - -// StartTimerD provides time refresh daemon -func (f *fastime) StartTimerD(ctx context.Context, dur time.Duration) (ft Ftime) { - f.mu.Lock() - defer f.mu.Unlock() - // if the daemon was already running, restart - if f.IsDaemonRunning() { - f.stop() - } - f.running.Store(true) - f.dur = math.MaxInt64 - atomic.StoreInt64(&f.dur, dur.Nanoseconds()) - ticker := time.NewTicker(time.Duration(atomic.LoadInt64(&f.dur))) - lastCorrection := f.fetchSysTime() - f.wg.Add(1) - f.refresh() - - go func() { - // daemon cleanup - defer func() { - f.running.Store(false) - ticker.Stop() - f.wg.Done() - }() - for atomic.LoadInt64(&f.dur) > 0 { - t := <-ticker.C - // rely on ticker for approximation - if t.Sub(lastCorrection) < f.correctionDur { - f.update() - } else { // correct the system time at a fixed interval - select { - case <-ctx.Done(): - return - default: - } - f.refresh() - lastCorrection = t - } - } - }() - return f -} diff --git a/gpq.go b/gpq.go index 24823cd..c0d99bc 100644 --- a/gpq.go +++ b/gpq.go @@ -1,15 +1,15 @@ package gpq import ( - "errors" + "context" "fmt" "sync" "time" "github.com/JustinTimperio/gpq/disk" - "github.com/JustinTimperio/gpq/ftime" "github.com/JustinTimperio/gpq/queues" "github.com/JustinTimperio/gpq/schema" + "github.com/JustinTimperio/gpq/timekeeper" "github.com/google/uuid" ) @@ -38,6 +38,11 @@ type GPQ[d any] struct { batchHandler *batchHandler[d] // batchCounter is used to keep track the current batch number batchCounter *batchCounter + + // ctx is the context for graceful shutdown + ctx context.Context + // cancel is the cancel function for the context + cancel context.CancelFunc } // NewGPQ creates a new GPQ with the given number of buckets @@ -63,6 +68,8 @@ func NewGPQ[d any](Options schema.GPQOptions) (uint, *GPQ[d], error) { } } + ctx, cancel := context.WithCancel(context.Background()) + gpq := &GPQ[d]{ queue: queues.NewCorePriorityQueue[d](Options, diskCache, receiver), options: Options, @@ -74,6 +81,9 @@ func NewGPQ[d any](Options schema.GPQOptions) (uint, *GPQ[d], error) { lazyDiskDeleteChan: receiver, batchHandler: newBatchHandler(diskCache), batchCounter: newBatchCounter(Options.LazyDiskBatchSize), + + ctx: ctx, + cancel: cancel, } var restored uint @@ -104,7 +114,7 @@ func (g *GPQ[d]) ItemsInQueue() uint { } // ItemsInDB returns the total number of items currently commit to disk -func (g *GPQ[d]) ItemsInDB() uint { +func (g *GPQ[d]) ItemsInDB() (uint, error) { return g.diskCache.ItemsInDB() } @@ -117,9 +127,9 @@ func (g *GPQ[d]) ActiveBuckets() uint { func (g *GPQ[d]) Enqueue(item schema.Item[d]) error { if item.Priority > uint(g.options.MaxPriority) { - return errors.New("Priority bucket does not exist") + return fmt.Errorf("priority %d exceeds maximum allowed priority %d", item.Priority, g.options.MaxPriority) } - item.SubmittedAt = ftime.Now() + item.SubmittedAt = timekeeper.Now() item.LastEscalated = item.SubmittedAt if g.options.DiskCacheEnabled && !item.WasRestored { @@ -131,7 +141,12 @@ func (g *GPQ[d]) Enqueue(item schema.Item[d]) error { if g.options.LazyDiskCacheEnabled { item.BatchNumber = g.batchCounter.increment() - g.lazyDiskSendChan <- item + select { + case g.lazyDiskSendChan <- item: + // Successfully sent + case <-g.ctx.Done(): + return fmt.Errorf("queue is closing: %w", g.ctx.Err()) + } } else { err = g.diskCache.WriteSingle(key, item) if err != nil { @@ -170,7 +185,13 @@ func (g *GPQ[d]) EnqueueBatch(items []schema.Item[d]) []error { if g.options.LazyDiskCacheEnabled { items[i].BatchNumber = g.batchCounter.increment() - g.lazyDiskSendChan <- items[i] + select { + case g.lazyDiskSendChan <- items[i]: + // Successfully sent + case <-g.ctx.Done(): + errors = append(errors, fmt.Errorf("queue is closing: %w", g.ctx.Err())) + continue + } } else { err = g.diskCache.WriteSingle(items[i].DiskUUID, items[i]) if err != nil { @@ -255,6 +276,8 @@ func (g *GPQ[d]) Prioritize() (escalated, removed uint, err error) { // Close performs a safe shutdown of the GPQ and the disk cache preventing data loss func (g *GPQ[d]) Close() { + // Signal all goroutines to stop + g.cancel() if g.options.DiskCacheEnabled { if g.options.LazyDiskCacheEnabled { @@ -318,6 +341,7 @@ func (g *GPQ[d]) lazyDiskWriter(maxDelay time.Duration) { go func() { defer wg.Done() + defer ticker.Stop() for { select { case <-ticker.C: @@ -353,7 +377,7 @@ func (g *GPQ[d]) lazyDiskDeleter() { for i, v := range batch { g.batchHandler.deleteBatch(v, i, false) - batch[item.BatchNumber] = batch[item.BatchNumber][:0] + delete(batch, i) } return } diff --git a/gpq_e2e_test.go b/gpq_e2e_test.go index 9380941..978045d 100644 --- a/gpq_e2e_test.go +++ b/gpq_e2e_test.go @@ -14,7 +14,7 @@ import ( func TestE2E(t *testing.T) { var ( - total uint64 = 10_000_000 + total uint64 = 1_000_000 syncToDisk bool = true lazySync bool = true maxBuckets uint = 10 @@ -38,7 +38,7 @@ func TestE2E(t *testing.T) { DiskCacheCompression: true, DiskEncryptionEnabled: true, DiskEncryptionKey: []byte("12345678901234567890123456789012"), - LazyDiskCacheChannelSize: 1_000_000, + LazyDiskCacheChannelSize: 500_000, DiskWriteDelay: time.Duration(time.Second * 5), LazyDiskCacheEnabled: lazySync, diff --git a/gpq_parallel_test.go b/gpq_parallel_test.go index 8807145..44bd593 100644 --- a/gpq_parallel_test.go +++ b/gpq_parallel_test.go @@ -175,6 +175,7 @@ func TestBatchParallel(t *testing.T) { if queue.ItemsInQueue() != 0 { t.Fatal("Items in queue:", queue.ItemsInQueue()) } + t.Log("Received", received, "messages!") queue.Close() t.Log("Batch Parallel Test Passed") diff --git a/helpers.go b/helpers.go index 81ec1e3..7fcb50d 100644 --- a/helpers.go +++ b/helpers.go @@ -45,14 +45,17 @@ func (bh *batchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumbe return } - bh.syncedBatches[batchNumber] = false - bh.deletedBatches[batchNumber] = true - - if _, ok := bh.syncedBatches[batchNumber]; ok { + // Check if this batch was already synced to disk + if alreadySynced, ok := bh.syncedBatches[batchNumber]; ok && alreadySynced { + // Batch is on disk, safe to delete bh.diskCache.DeleteBatch(batch) + delete(bh.syncedBatches, batchNumber) + delete(bh.deletedBatches, batchNumber) return } + // Batch not synced yet, just mark for deletion + bh.deletedBatches[batchNumber] = true } type batchCounter struct { diff --git a/queues/cpq.go b/queues/cpq.go index da5c9e1..bb3722b 100644 --- a/queues/cpq.go +++ b/queues/cpq.go @@ -6,9 +6,9 @@ import ( "sync" "github.com/JustinTimperio/gpq/disk" - "github.com/JustinTimperio/gpq/ftime" "github.com/JustinTimperio/gpq/queues/gheap" "github.com/JustinTimperio/gpq/schema" + "github.com/JustinTimperio/gpq/timekeeper" "github.com/cornelk/hashmap" "github.com/tidwall/btree" @@ -61,7 +61,7 @@ func (cpq *CorePriorityQueue[T]) Enqueue(data *schema.Item[T]) error { bucket, ok := cpq.buckets.Get(data.Priority) if !ok { - return errors.New("Core Priority Queue Error: Priority not found") + return fmt.Errorf("priority bucket %d not found in queue", data.Priority) } cpq.bpq.Insert(data.Priority) @@ -133,7 +133,7 @@ func (cpq *CorePriorityQueue[T]) DequeueBatch(batchSize uint) ([]*schema.Item[T] defer cpq.mux.Unlock() if cpq.bpq.Len() == 0 { - return nil, []error{errors.New("Core Priority Queue Error: No items found in the queue")} + return nil, []error{errors.New("queue is empty, no items to dequeue")} } batch := make([]*schema.Item[T], 0, batchSize) @@ -145,7 +145,15 @@ func (cpq *CorePriorityQueue[T]) DequeueBatch(batchSize uint) ([]*schema.Item[T] bucket, ok := cpq.buckets.Get(priority) if !ok { - return batch, []error{errors.New("Core Priority Queue Error: Internal error, priority not found")} + // Internal corruption detected - rollback items already dequeued + for _, item := range batch { + cpq.bpq.Insert(item.Priority) + if b, found := cpq.buckets.Get(item.Priority); found { + gheap.Enqueue[T](b, item) + cpq.itemsInQueue++ + } + } + return nil, []error{fmt.Errorf("Core Priority Queue Error: Internal corruption - priority %d in btree but not in hashmap (attempted to dequeue %d items before error)", priority, len(batch))} } item, err := gheap.Dequeue[T](bucket) @@ -165,18 +173,33 @@ func (cpq *CorePriorityQueue[T]) DequeueBatch(batchSize uint) ([]*schema.Item[T] } func (cpq *CorePriorityQueue[T]) Prioritize() (removed uint, escalated uint, err error) { - cpq.mux.Lock() - defer cpq.mux.Unlock() - + // Collect bucket keys with read lock + var bucketKeys []uint + cpq.mux.RLock() cpq.buckets.Range(func(key uint, bucket *priorityQueue[T]) bool { - // Iterate through the bucket and remove items that have been waiting too long + bucketKeys = append(bucketKeys, key) + return true + }) + cpq.mux.RUnlock() + + // Process each bucket with individual locks + for _, key := range bucketKeys { + cpq.mux.Lock() + + bucket, ok := cpq.buckets.Get(key) + if !ok { + cpq.mux.Unlock() + continue // Bucket was removed, skip + } + + // Process timeouts in this bucket var len = bucket.Len() var currentIndex uint for i := 0; i < len; i++ { item := bucket.items[currentIndex] if item.CanTimeout { - currentTime := ftime.Now() + currentTime := timekeeper.Now() if currentTime.Sub(item.SubmittedAt) > item.Timeout { if cpq.options.DiskCacheEnabled { @@ -196,7 +219,8 @@ func (cpq *CorePriorityQueue[T]) Prioritize() (removed uint, escalated uint, err _, e := gheap.Remove[T](bucket, item) if e != nil { err = fmt.Errorf("Core Priority Queue Error: %w", err) - return false + cpq.mux.Unlock() + return removed, escalated, err } cpq.itemsInQueue-- removed++ @@ -209,54 +233,57 @@ func (cpq *CorePriorityQueue[T]) Prioritize() (removed uint, escalated uint, err currentIndex++ } } - return true - }) - // Iterate through the buckets and remove empty buckets - cpq.buckets.Range(func(key uint, bucket *priorityQueue[T]) bool { - if bucket.Len() == 0 { + cpq.mux.Unlock() + } + + // Clean up empty buckets + cpq.mux.Lock() + for _, key := range bucketKeys { + bucket, ok := cpq.buckets.Get(key) + if ok && bucket.Len() == 0 { cpq.bpq.Delete(key) } - return true - }) - - if err != nil { - return removed, escalated, err } + cpq.mux.Unlock() + + // Process escalations with per-bucket locking + for _, key := range bucketKeys { + cpq.mux.Lock() + + bucket, ok := cpq.buckets.Get(key) + if !ok { + cpq.mux.Unlock() + continue + } - // This is a very basic but fast algorithm that iterates from the front to the back of the queue. - // If the item can escalate and has reached its ticker, then we check if the last item was escalated, - // and that we are not first in the queue. This strategy means that messages can only push up the queue, - // if other messages are also not being prioritized. In this model, messages not being escalated, - // can be impacted by other high priority messages allowing for fairly complex queue strategies. - // I think in the future this can allow for more advanced features but seems fine for now. - cpq.buckets.Range(func(key uint, bucket *priorityQueue[T]) bool { var lastItemWasEscalated bool var len = bucket.Len() for i := 0; i < len; i++ { item := bucket.items[i] - if item.ShouldEscalate { - currentTime := ftime.Now() - if currentTime.Sub(item.LastEscalated) > item.EscalationRate { + // Reset flag at start of each iteration + wasEscalatedThisRound := false - if !lastItemWasEscalated && i != 0 { - item.LastEscalated = currentTime - bucket.UpdatePriority(item, i-1) - escalated++ - } - // We don't need to update lastItemWasEscalated here because we just swapped - // the current cursor index, with cursor index - 1. The previous index must have - // not been escalated so we don't need to update lastItemWasEscalated + if item.ShouldEscalate { + currentTime := timekeeper.Now() + readyToEscalate := currentTime.Sub(item.LastEscalated) > item.EscalationRate + canMove := !lastItemWasEscalated && i != 0 + + if readyToEscalate && canMove { + item.LastEscalated = currentTime + bucket.UpdatePriority(item, i-1) + escalated++ + wasEscalatedThisRound = true } - } else { - lastItemWasEscalated = false } + + lastItemWasEscalated = wasEscalatedThisRound } - return true - }) + cpq.mux.Unlock() + } return removed, escalated, nil } diff --git a/timekeeper/timekeeper.go b/timekeeper/timekeeper.go new file mode 100644 index 0000000..f7b3b0a --- /dev/null +++ b/timekeeper/timekeeper.go @@ -0,0 +1,116 @@ +// Package timekeeper provides a high-performance cached time implementation +// optimized for high-throughput applications that need fast time access +// without the overhead of syscalls on every time.Now() call. +// +// This implementation uses atomic operations and periodic updates to provide +// microsecond-precision cached time values with configurable refresh rates. +package timekeeper + +import ( + "sync/atomic" + "time" +) + +// Cache holds the cached time value and provides thread-safe access +type Cache struct { + // Cached time value stored as nanoseconds since epoch + cachedNanos atomic.Int64 + + // Ticker for periodic updates + ticker *time.Ticker + + // Channel to signal shutdown + done chan struct{} +} + +var ( + // globalCache is the default cache instance used by package-level functions + globalCache *Cache +) + +func init() { + // Initialize with 50ms refresh rate + globalCache = NewCache(50 * time.Millisecond) + globalCache.Start() +} + +// NewCache creates a new time cache with the specified refresh interval. +// A smaller interval provides more accurate time but uses more CPU. +// Recommended intervals: +// - 1ms for high-precision requirements +// - 5ms for balanced performance (default) +// - 10-50ms for maximum throughput with lower precision needs +func NewCache(refreshInterval time.Duration) *Cache { + c := &Cache{ + ticker: time.NewTicker(refreshInterval), + done: make(chan struct{}), + } + + // Initialize with current time + c.refresh() + + return c +} + +// Start begins the background refresh goroutine +func (c *Cache) Start() { + go func() { + for { + select { + case <-c.ticker.C: + c.refresh() + case <-c.done: + c.ticker.Stop() + return + } + } + }() +} + +// Stop halts the background refresh goroutine +func (c *Cache) Stop() { + close(c.done) +} + +// refresh updates the cached time with the current system time +func (c *Cache) refresh() { + now := time.Now().UnixNano() + c.cachedNanos.Store(now) +} + +// Now returns the cached current time +// This is an extremely fast operation using only an atomic load +func (c *Cache) Now() time.Time { + nanos := c.cachedNanos.Load() + return time.Unix(0, nanos) +} + +// SetRefreshInterval changes the refresh interval dynamically +func (c *Cache) SetRefreshInterval(interval time.Duration) { + c.ticker.Reset(interval) +} + +// Package-level convenience functions using the global cache + +// Now returns the cached current time using the global cache instance. +// This is the primary function for high-throughput time access. +func Now() time.Time { + return globalCache.Now() +} + +// Stop stops the global cache refresh goroutine. +// This should typically only be called during application shutdown. +func Stop() { + globalCache.Stop() +} + +// SetRefreshInterval changes the global cache refresh interval. +// Use this to tune performance vs precision for your specific workload. +func SetRefreshInterval(interval time.Duration) { + globalCache.SetRefreshInterval(interval) +} + +// Since returns the time elapsed since t using the cached current time. +func Since(t time.Time) time.Duration { + return Now().Sub(t) +}