@@ -44,6 +44,10 @@ type StopLoadIDs struct {
4444
4545var databasesLoadMutex sync.Mutex
4646var stopLoadIDsMutex sync.Mutex
47+ var countersRPS = make (map [string ]int )
48+ var countersQPS = make (map [string ]int )
49+ var muRPS sync.Mutex
50+ var muQPS sync.Mutex
4751
4852func main () {
4953 // Get the configuration from environment variables or .env file.
@@ -75,6 +79,9 @@ func main() {
7579 go manageAllLoad ("postgres" )
7680 }
7781
82+ // Start logging the counters
83+ startLoggingCounters ()
84+
7885 // Continuously check and update the configuration from the control panel every 10 seconds
7986 for {
8087 time .Sleep (5 * time .Second )
@@ -339,6 +346,13 @@ func runMySQL(ctx context.Context, routineId int, dbConfig map[string]string) {
339346 // Variable to store the time of the last configuration update
340347 lastUpdate := time .Now ()
341348
349+ // Local counters for this goroutine
350+ localCounterRPS := 0
351+ localCounterQPS := 0
352+
353+ // Variable to store the time of the last counter update
354+ lastCounterUpdate := time .Now ()
355+
342356 for {
343357 select {
344358 case <- ctx .Done ():
@@ -356,25 +370,39 @@ func runMySQL(ctx context.Context, routineId int, dbConfig map[string]string) {
356370 lastUpdate = time .Now ()
357371 }
358372
359- // Use localDBConfig for other operations
373+ // Use localDBConfig for other operations and count queries
360374 if localDBConfig ["switch1" ] == "true" {
361-
362- load .MySQLSwitch1 (db , routineId , localDBConfig )
375+ localCounterQPS += load .MySQLSwitch1 (db , routineId , localDBConfig )
363376 }
364377
365378 if localDBConfig ["switch2" ] == "true" {
366-
367- load .MySQLSwitch2 (db , routineId , localDBConfig )
379+ localCounterQPS += load .MySQLSwitch2 (db , routineId , localDBConfig )
368380 }
369381
370382 if localDBConfig ["switch3" ] == "true" {
371-
372- load .MySQLSwitch3 (db , routineId , localDBConfig )
383+ localCounterQPS += load .MySQLSwitch3 (db , routineId , localDBConfig )
373384 }
374385
375386 if localDBConfig ["switch4" ] == "true" {
387+ localCounterQPS += load .MySQLSwitch4 (db , routineId , localDBConfig )
388+ }
389+
390+ // Increase local RPS counter
391+ localCounterRPS ++
392+
393+ // Update global counters every 2 seconds
394+ if time .Since (lastCounterUpdate ) > 2 * time .Second {
395+ muRPS .Lock ()
396+ countersRPS [dbConfig ["id" ]] += localCounterRPS
397+ muRPS .Unlock ()
376398
377- load .MySQLSwitch4 (db , routineId , localDBConfig )
399+ muQPS .Lock ()
400+ countersQPS [dbConfig ["id" ]] += localCounterQPS
401+ muQPS .Unlock ()
402+
403+ localCounterRPS = 0
404+ localCounterQPS = 0
405+ lastCounterUpdate = time .Now ()
378406 }
379407
380408 // Check that sleep is not empty before attempting conversion
@@ -388,6 +416,7 @@ func runMySQL(ctx context.Context, routineId int, dbConfig map[string]string) {
388416 }
389417}
390418
419+ // runPostgreSQL runs the PostgreSQL database operations for a specific connection
391420func runPostgreSQL (ctx context.Context , routineId int , dbConfig map [string ]string ) {
392421 connectionString := dbConfig ["connectionString" ]
393422
@@ -408,6 +437,13 @@ func runPostgreSQL(ctx context.Context, routineId int, dbConfig map[string]strin
408437 localDBConfig [k ] = v
409438 }
410439
440+ // Local counters for this goroutine
441+ localCounterRPS := 0
442+ localCounterQPS := 0
443+
444+ // Variable to store the time of the last counter update
445+ lastCounterUpdate := time .Now ()
446+
411447 for {
412448 select {
413449 case <- ctx .Done ():
@@ -429,23 +465,41 @@ func runPostgreSQL(ctx context.Context, routineId int, dbConfig map[string]strin
429465 // Debug
430466 // newConnections, _ := strconv.Atoi(localDBConfig["connections"])
431467 // if newConnections > 0 {
432- // log.Printf("Postgres: goroutine: %d: id: %s in progress. Switches: %s, %s, %s, %s, Sleep: %s", routineId+1, dbConfig["id"], localDBConfig["switch1"], localDBConfig["switch2"], localDBConfig["switch3"], localDBConfig["switch4"], localDBConfig["sleep"])
468+ // log.Printf("Postgres: goroutine: %d: id: %s in progress. Switches: %s, %s, %s, %s, Sleep: %s", routineId+1, dbConfig["id"], localDBConfig["switch1"], localDBConfig["switch2"], localDBConfig["switch3"], localDBConfig["switch4"], localDBConfig["sleep"])
433469 // }
434470
435471 if localDBConfig ["switch1" ] == "true" {
436- load .PostgresSwitch1 (db , routineId , localDBConfig )
472+ localCounterQPS += load .PostgresSwitch1 (db , routineId , localDBConfig )
437473 }
438474
439475 if localDBConfig ["switch2" ] == "true" {
440- load .PostgresSwitch2 (db , routineId , localDBConfig )
476+ localCounterQPS += load .PostgresSwitch2 (db , routineId , localDBConfig )
441477 }
442478
443479 if localDBConfig ["switch3" ] == "true" {
444- load .PostgresSwitch3 (db , routineId , localDBConfig )
480+ localCounterQPS += load .PostgresSwitch3 (db , routineId , localDBConfig )
445481 }
446482
447483 if localDBConfig ["switch4" ] == "true" {
448- load .PostgresSwitch4 (db , routineId , localDBConfig )
484+ localCounterQPS += load .PostgresSwitch4 (db , routineId , localDBConfig )
485+ }
486+
487+ // Increase local RPS counter
488+ localCounterRPS ++
489+
490+ // Update global counters every 5 seconds
491+ if time .Since (lastCounterUpdate ) > 2 * time .Second {
492+ muRPS .Lock ()
493+ countersRPS [dbConfig ["id" ]] += localCounterRPS
494+ muRPS .Unlock ()
495+
496+ muQPS .Lock ()
497+ countersQPS [dbConfig ["id" ]] += localCounterQPS
498+ muQPS .Unlock ()
499+
500+ localCounterRPS = 0
501+ localCounterQPS = 0
502+ lastCounterUpdate = time .Now ()
449503 }
450504
451505 sleepDuration , err := strconv .Atoi (localDBConfig ["sleep" ])
@@ -456,6 +510,7 @@ func runPostgreSQL(ctx context.Context, routineId int, dbConfig map[string]strin
456510 }
457511}
458512
513+ // runMongoDB runs the MongoDB operations for a specific connection
459514func runMongoDB (ctx context.Context , routineId int , dbConfig map [string ]string ) {
460515 connectionString := dbConfig ["connectionString" ]
461516
@@ -480,6 +535,13 @@ func runMongoDB(ctx context.Context, routineId int, dbConfig map[string]string)
480535 localDBConfig [k ] = v
481536 }
482537
538+ // Local counters for this goroutine
539+ localCounterRPS := 0
540+ localCounterQPS := 0
541+
542+ // Variable to store the time of the last counter update
543+ lastCounterUpdate := time .Now ()
544+
483545 for {
484546 select {
485547 case <- ctx .Done ():
@@ -497,22 +559,40 @@ func runMongoDB(ctx context.Context, routineId int, dbConfig map[string]string)
497559
498560 lastUpdate = time .Now ()
499561 }
500- // log.Printf("MongoDB: goroutine: %d: id: %s in progress. Switches: %s, %s, %s, %s, Sleep: %s", routineId+1, dbConfig["id"], updatedDBConfig["switch1"], updatedDBConfig["switch2"], updatedDBConfig["switch3"], updatedDBConfig["switch4"], updatedDBConfig["sleep"])
501562
563+ // Execute database operations and count queries
502564 if localDBConfig ["switch1" ] == "true" {
503- load .MongoDBSwitch1 (client , db , routineId , localDBConfig )
565+ localCounterQPS += load .MongoDBSwitch1 (client , db , routineId , localDBConfig )
504566 }
505567
506568 if localDBConfig ["switch2" ] == "true" {
507- load .MongoDBSwitch2 (client , db , routineId , localDBConfig )
569+ localCounterQPS += load .MongoDBSwitch2 (client , db , routineId , localDBConfig )
508570 }
509571
510572 if localDBConfig ["switch3" ] == "true" {
511- load .MongoDBSwitch3 (client , db , routineId , localDBConfig )
573+ localCounterQPS += load .MongoDBSwitch3 (client , db , routineId , localDBConfig )
512574 }
513575
514576 if localDBConfig ["switch4" ] == "true" {
515- load .MongoDBSwitch4 (client , db , routineId , localDBConfig )
577+ localCounterQPS += load .MongoDBSwitch4 (client , db , routineId , localDBConfig )
578+ }
579+
580+ // Increase local RPS counter
581+ localCounterRPS ++
582+
583+ // Update global counters every 2 seconds
584+ if time .Since (lastCounterUpdate ) > 2 * time .Second {
585+ muRPS .Lock ()
586+ countersRPS [dbConfig ["id" ]] += localCounterRPS
587+ muRPS .Unlock ()
588+
589+ muQPS .Lock ()
590+ countersQPS [dbConfig ["id" ]] += localCounterQPS
591+ muQPS .Unlock ()
592+
593+ localCounterRPS = 0
594+ localCounterQPS = 0
595+ lastCounterUpdate = time .Now ()
516596 }
517597
518598 sleepDuration , err := strconv .Atoi (localDBConfig ["sleep" ])
@@ -705,41 +785,43 @@ func checkConnection(db map[string]string) string {
705785 result = mongodb .CheckMongoDB (connectionString )
706786 }
707787
708- // updateConnectionStatus(dbType, db["id"], result)
709-
710788 return result
711789}
712790
713- // // updateConnectionStatus updates the connection status of a specific database.
714- // func updateConnectionStatus(dbType, id, status string) {
715- // var databasesForProcess []map[string]string
716-
717- // // Lock the mutex only for the time needed to copy the reference to the data
718- // databasesLoadMutex.Lock()
719- // switch dbType {
720- // case "mysql":
721- // databasesForProcess = databasesLoad.MySQL
722- // case "postgres":
723- // databasesForProcess = databasesLoad.Postgres
724- // case "mongodb":
725- // databasesForProcess = databasesLoad.MongoDB
726- // }
727- // databasesLoadMutex.Unlock()
728-
729- // // Update the connection status outside of the mutex lock
730- // for i, db := range databasesForProcess {
731- // if db["id"] == id {
732- // databasesLoadMutex.Lock()
733- // switch dbType {
734- // case "mysql":
735- // databasesLoad.MySQL[i]["connectionStatus"] = status
736- // case "postgres":
737- // databasesLoad.Postgres[i]["connectionStatus"] = status
738- // case "mongodb":
739- // databasesLoad.MongoDB[i]["connectionStatus"] = status
740- // }
741- // databasesLoadMutex.Unlock()
742- // break
743- // }
744- // }
745- // }
791+ // startLoggingCounters starts a goroutine that logs counters every 5 seconds
792+ func startLoggingCounters () {
793+ ctx , cancel := context .WithCancel (context .Background ())
794+ go func () {
795+ defer cancel ()
796+ for {
797+ select {
798+ case <- ctx .Done ():
799+ return
800+ case <- time .After (2 * time .Second ):
801+ logAllCounters ()
802+ }
803+ }
804+ }()
805+ }
806+
807+ // logAllCounters logs the RPS and QPS for each database and resets the counters
808+ func logAllCounters () {
809+ muRPS .Lock ()
810+ defer muRPS .Unlock ()
811+ muQPS .Lock ()
812+ defer muQPS .Unlock ()
813+ for db , count := range countersRPS {
814+ rps := count / 2
815+ qps := countersQPS [db ] / 2
816+ log .Printf ("Database: %s, iterations per second (RPS): %d, queries per second (QPS): %d" , db , rps , qps )
817+
818+ // Save RPS and QPS to Valkey in a separate hash table
819+ err := valkey .AddDatabasePerformance (db , rps , qps )
820+ if err != nil {
821+ log .Printf ("Error saving to Valkey: %s" , err )
822+ }
823+
824+ countersRPS [db ] = 0 // Reset RPS counter after logging
825+ countersQPS [db ] = 0 // Reset QPS counter after logging
826+ }
827+ }
0 commit comments