Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/mertricstest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:

metricstest:
runs-on: ubuntu-latest
container: golang:1.24
container: golang:1.26
needs: branchtest

services:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/statictest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
statictest:
runs-on: ubuntu-latest
container: golang:1.24
container: golang:1.26
steps:
- name: Checkout code
uses: actions/checkout@v2
Expand Down
10 changes: 10 additions & 0 deletions cmd/server/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ func parseFlags(cfg *server.Config, defaultServerAddress string, defaultStoreInt
var restore bool
var databaseDSN string
var key string
var auditFile string
var auditURL string

flag.StringVar(&serverAddress, "a", defaultServerAddress, "address and port to run server")
flag.IntVar(&storeInterval, "i", defaultStoreInterval, "interval to store aggregator data")
flag.StringVar(&fileStoragePath, "f", defaultFileStoragePath, "file path to store aggregator data")
flag.BoolVar(&restore, "r", defaultRestore, "restore aggregator data from file on startup")
flag.StringVar(&databaseDSN, "d", "", "database DSN")
flag.StringVar(&key, "k", "", "key for signature")
flag.StringVar(&auditFile, "audit-file", "", "file to store audit data")
flag.StringVar(&auditURL, "audit-url", "", "url to send audit data")

flag.Parse()

Expand All @@ -44,4 +48,10 @@ func parseFlags(cfg *server.Config, defaultServerAddress string, defaultStoreInt
if cfg.Key == "" {
cfg.Key = key
}
if cfg.AuditFile == "" {
cfg.AuditFile = auditFile
}
if cfg.AuditURL == "" {
cfg.AuditURL = auditURL
}
}
27 changes: 21 additions & 6 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ package main
import (
"context"
"database/sql"
"log"
"net/http"
"time"

"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/iliaonishchenko/aggreg8"
"github.com/iliaonishchenko/aggreg8/internal/audit"
"github.com/iliaonishchenko/aggreg8/internal/config/server"
"github.com/iliaonishchenko/aggreg8/internal/handler"
"github.com/iliaonishchenko/aggreg8/internal/logger"
Expand All @@ -19,9 +24,6 @@ import (
"github.com/iliaonishchenko/aggreg8/internal/signature"
_ "github.com/jackc/pgx/v5/stdlib"
"go.uber.org/zap"
"log"
"net/http"
"time"
)

func initStorage(cfg *server.Config) (service.MetricStorage, *repository.MetricsRepository, context.CancelFunc) {
Expand Down Expand Up @@ -68,6 +70,18 @@ func initStorage(cfg *server.Config) (service.MetricStorage, *repository.Metrics
return storage, repo, cancelFunc
}

func initNotifier(cfg *server.Config) audit.Notifier {
notifier := audit.NewNotifier()

if cfg.AuditFile != "" {
notifier.Register(audit.NewFileObserver(cfg.AuditFile))
}
if cfg.AuditURL != "" {
notifier.Register(audit.NewHTTPObserver(cfg.AuditURL, http.Client{}))
}
return notifier
}

func main() {
defaultServerAddress := "localhost:8080"
defaultStoreInterval := 300
Expand All @@ -87,20 +101,21 @@ func main() {
}

storage, repo, cancelFunc := initStorage(cfg)
notifier := initNotifier(cfg)

if err := run(*cfg, storage, repo); err != nil {
if err := run(*cfg, storage, repo, notifier); err != nil {
if cancelFunc != nil {
cancelFunc()
}
logger.Log.Fatal("error starting server", zap.Error(err))
}
}

func run(cfg server.Config, storage service.MetricStorage, repo *repository.MetricsRepository) error {
func run(cfg server.Config, storage service.MetricStorage, repo *repository.MetricsRepository, notifier audit.Notifier) error {

r := chi.NewRouter()

updateHandler := handler.NewUpdateHandler(storage)
updateHandler := handler.NewUpdateHandler(storage, notifier)
allHandler := handler.NewAllMetricsHandler(storage)
getMetricHandler := handler.NewGetMetricHandler(storage)
pingHandler := handler.NewPingHandler(repo)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/iliaonishchenko/aggreg8

go 1.24.10
go 1.26

require (
github.com/caarlos0/env/v11 v11.3.1
Expand Down
11 changes: 11 additions & 0 deletions internal/audit/audit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package audit

type AuditEvent struct {
TS int64 `json:"ts"`
Metrics []string `json:"metrics"`
IPAddress string `json:"ip_address"`
}

type Observer interface {
Notify(event AuditEvent) error
}
41 changes: 41 additions & 0 deletions internal/audit/file_observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package audit

import (
"encoding/json"
"fmt"
"os"
"sync"
)

type FileObserver struct {
filePath string
mu sync.Mutex
}

func NewFileObserver(filePath string) *FileObserver {
return &FileObserver{filePath: filePath}
}

func (f *FileObserver) Notify(event AuditEvent) error {
f.mu.Lock()
defer f.mu.Unlock()

data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("could not marshal audit event to JSON: %w", err)
}

file, err := os.OpenFile(f.filePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return fmt.Errorf("could not open audit file: %w", err)
}
defer file.Close()

data = append(data, '\n')
_, err = file.Write(data)
if err != nil {
return fmt.Errorf("could not write audit event to file: %w", err)
}

return nil
}
55 changes: 55 additions & 0 deletions internal/audit/file_observer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package audit

import (
"encoding/json"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestFileObserver_Notify(t *testing.T) {
tmpFile, err := os.CreateTemp("", "audit_test_*.log")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
tmpFile.Close()

observer := NewFileObserver(tmpFile.Name())

event1 := AuditEvent{
TS: 1000,
Metrics: []string{"Alloc", "Frees"},
IPAddress: "192.168.0.1",
}
event2 := AuditEvent{
TS: 2000,
Metrics: []string{"HeapSys"},
IPAddress: "10.0.0.1",
}

require.NoError(t, observer.Notify(event1))
require.NoError(t, observer.Notify(event2))

data, err := os.ReadFile(tmpFile.Name())
require.NoError(t, err)

lines := strings.Split(strings.TrimSpace(string(data)), "\n")
assert.Len(t, lines, 2)

var got1 AuditEvent
require.NoError(t, json.Unmarshal([]byte(lines[0]), &got1))
assert.Equal(t, event1, got1)

var got2 AuditEvent
require.NoError(t, json.Unmarshal([]byte(lines[1]), &got2))
assert.Equal(t, event2, got2)
}

func TestFileObserver_Notify_InvalidPath(t *testing.T) {
observer := NewFileObserver("/nonexistent/dir/audit.log")

err := observer.Notify(AuditEvent{TS: 1000, Metrics: []string{"Alloc"}, IPAddress: "127.0.0.1"})
assert.Error(t, err)
}
36 changes: 36 additions & 0 deletions internal/audit/http_observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package audit

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)

type HTTPObserver struct {
url string
client http.Client
}

func NewHTTPObserver(url string, client http.Client) *HTTPObserver {
return &HTTPObserver{
url: url,
client: client,
}
}

func (o *HTTPObserver) Notify(event AuditEvent) error {
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
if err := enc.Encode(event); err != nil {
return fmt.Errorf("could not marshal audit event to JSON: %w", err)
}

resp, err := o.client.Post(o.url, "application/json", buf)
if err != nil {
return fmt.Errorf("could not POST audit event: %w", err)
}
defer resp.Body.Close()

return nil
}
46 changes: 46 additions & 0 deletions internal/audit/http_observer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package audit

import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHTTPObserver_Notify(t *testing.T) {
var receivedEvent AuditEvent
var receivedContentType string

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
receivedContentType = r.Header.Get("Content-Type")
body, _ := io.ReadAll(r.Body)
json.Unmarshal(body, &receivedEvent)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

observer := NewHTTPObserver(server.URL, http.Client{})

event := AuditEvent{
TS: 12345678,
Metrics: []string{"Alloc", "Frees"},
IPAddress: "192.168.0.42",
}

err := observer.Notify(event)
require.NoError(t, err)

assert.Equal(t, "application/json", receivedContentType)
assert.Equal(t, event, receivedEvent)
}

func TestHTTPObserver_Notify_ServerUnavailable(t *testing.T) {
observer := NewHTTPObserver("http://127.0.0.1:1", http.Client{})

err := observer.Notify(AuditEvent{TS: 1000, Metrics: []string{"Alloc"}, IPAddress: "127.0.0.1"})
assert.Error(t, err)
}
49 changes: 49 additions & 0 deletions internal/audit/mocks/audit_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions internal/audit/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package audit

type Notifier interface {
NotifyAll(event AuditEvent) []error
}

type NotifierService struct {
observers []Observer
}

func NewNotifier() *NotifierService {
return &NotifierService{
observers: make([]Observer, 0),
}
}

func (n *NotifierService) Register(observer Observer) {
n.observers = append(n.observers, observer)
}

func (n *NotifierService) NotifyAll(event AuditEvent) []error {
errors := make([]error, 0)

for _, observer := range n.observers {
err := observer.Notify(event)
if err != nil {
errors = append(errors, err)
}
}
return errors
}
Loading
Loading