diff --git a/libvuln/jsonblob/jsonblob.go b/libvuln/jsonblob/jsonblob.go index ff07a500e..489deb3dd 100644 --- a/libvuln/jsonblob/jsonblob.go +++ b/libvuln/jsonblob/jsonblob.go @@ -45,85 +45,43 @@ type Store struct { } // Load reads in all the records serialized in the provided [io.Reader]. -func Load(ctx context.Context, r io.Reader) (*Loader, error) { - l := Loader{ - dec: json.NewDecoder(r), - cur: uuid.Nil, +func Load(ctx context.Context, r io.Reader) (*Store, error) { + s, err := New() + if err != nil { + return nil, err } - return &l, nil -} - -// Loader is an iterator that returns a series of [Entry]. -// -// Users should call [*Loader.Next] until it reports false, then check for -// errors via [*Loader.Err]. -type Loader struct { - err error - e *Entry - - dec *json.Decoder - next *Entry - de diskEntry - cur uuid.UUID -} -// Next reports whether there's an [Entry] to be processed. -func (l *Loader) Next() bool { - if l.err != nil { - return false + l, err := NewLoader(r) + if err != nil { + return nil, err } - for l.err = l.dec.Decode(&l.de); l.err == nil; l.err = l.dec.Decode(&l.de) { - id := l.de.Ref - // If we just hit a new Entry, promote the current one. - if id != l.cur { - l.e = l.next - l.next = &Entry{} - l.next.Updater = l.de.Updater - l.next.Fingerprint = l.de.Fingerprint - l.next.Date = l.de.Date + // TODO(DO NOT MERGE): ~~This implementation might be a bit naive. Currently, + // it basically copies [OfflineImport]. We could probably do some custom + // parsing such that it basically just decodes the json into the + // appropriate [Store] fields.~~ + // Actually, this might be the way. + // [OfflineImport]: https://github.com/quay/claircore/blob/126f688bb11220fb34708719be91952dc32ff7b1/libvuln/updates.go#L17-L74 + for l.Next() { + if err := ctx.Err(); err != nil { + return nil, err } - switch l.de.Kind { - case driver.VulnerabilityKind: - vuln := claircore.Vulnerability{} - if err := json.Unmarshal(l.de.Vuln.buf, &vuln); err != nil { - l.err = err - return false + e := l.Entry() + if e.Enrichment != nil { + if _, err = s.UpdateEnrichments(ctx, e.Updater, e.Fingerprint, e.Enrichment); err != nil { + return nil, fmt.Errorf("updating enrichements: %w", err) } - l.next.Vuln = append(l.next.Vuln, &vuln) - case driver.EnrichmentKind: - en := driver.EnrichmentRecord{} - if err := json.Unmarshal(l.de.Enrichment.buf, &en); err != nil { - l.err = err - return false - } - l.next.Enrichment = append(l.next.Enrichment, en) } - // If this was an initial diskEntry, promote the ref. - if id != l.cur { - l.cur = id - // If we have an Entry ready, report that. - if l.e != nil { - return true + if e.Vuln != nil { + if _, err = s.UpdateVulnerabilities(ctx, e.Updater, e.Fingerprint, e.Vuln); err != nil { + return nil, fmt.Errorf("updating vulnerabilities: %w", err) } } } - l.e = l.next - return true -} - -// Entry returns the latest loaded [Entry]. -func (l *Loader) Entry() *Entry { - return l.e -} - -// Err is the latest encountered error. -func (l *Loader) Err() error { - // Don't report EOF as an error. - if errors.Is(l.err, io.EOF) { - return nil + if err := l.Err(); err != nil { + return nil, err } - return l.err + return s, nil } // Store writes out the contents of the receiver to the provided [io.Writer]. @@ -257,8 +215,6 @@ type diskEntry struct { // // It is unsafe for modification because it does not return a copy of the map. func (s *Store) Entries() map[uuid.UUID]*Entry { - // BUG(hank) [Store.Entries] reports seemingly-empty entries when populated - // via [Store.UpdateVulnerabilities]. s.RLock() defer s.RUnlock() return s.entry @@ -284,6 +240,7 @@ func (s *Store) UpdateVulnerabilities(ctx context.Context, updater string, finge } e := Entry{ + Vuln: vulns, vulns: buf, vulnCt: len(vulns), } @@ -413,6 +370,7 @@ func (s *Store) UpdateEnrichments(ctx context.Context, kind string, fp driver.Fi } e := Entry{ + Enrichment: es, enrichments: buf, enrichmentCt: len(es), } diff --git a/libvuln/jsonblob/jsonblob_test.go b/libvuln/jsonblob/jsonblob_test.go index 17f2061c9..fa58cc3f8 100644 --- a/libvuln/jsonblob/jsonblob_test.go +++ b/libvuln/jsonblob/jsonblob_test.go @@ -3,14 +3,14 @@ package jsonblob import ( "bytes" "context" - "io" - "testing" - + "fmt" "github.com/google/go-cmp/cmp" - "golang.org/x/sync/errgroup" - "github.com/quay/claircore" "github.com/quay/claircore/libvuln/driver" + "golang.org/x/sync/errgroup" + "io" + "testing" + "github.com/quay/claircore/test" ) @@ -69,24 +69,13 @@ func TestRoundtrip(t *testing.T) { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { defer w.Close(); return a.Store(w) }) eg.Go(func() error { - l, err := Load(ctx, io.TeeReader(r, &buf)) + s, err := Load(ctx, io.TeeReader(r, &buf)) if err != nil { - return err - } - for l.Next() { - e := l.Entry() - if e.Vuln != nil && e.Enrichment != nil { - t.Error("expecting entry to have either vulnerability or enrichment, got both") - } - if e.Vuln != nil { - got.V = append(got.V, l.Entry().Vuln...) - } - if e.Enrichment != nil { - got.E = append(got.E, l.Entry().Enrichment...) - } + return fmt.Errorf("failed to load jsonblob: %w", err) } - if err := l.Err(); err != nil { - return err + for _, e := range s.Entries() { + got.V = append(got.V, e.Vuln...) + got.E = append(got.E, e.Enrichment...) } return nil }) diff --git a/libvuln/jsonblob/loader.go b/libvuln/jsonblob/loader.go new file mode 100644 index 000000000..d2fe88f2a --- /dev/null +++ b/libvuln/jsonblob/loader.go @@ -0,0 +1,93 @@ +package jsonblob + +import ( + "encoding/json" + "errors" + "github.com/google/uuid" + "io" + + "github.com/quay/claircore" + "github.com/quay/claircore/libvuln/driver" +) + +// NewLoader creates a new Loader from the provided [io.Reader]. +func NewLoader(r io.Reader) (*Loader, error) { + l := Loader{ + dec: json.NewDecoder(r), + cur: uuid.Nil, + } + return &l, nil +} + +// Loader is an iterator that returns a series of [Entry]. +// +// Users should call [*Loader.Next] until it reports false, then check for +// errors via [*Loader.Err]. +type Loader struct { + err error + e *Entry + + dec *json.Decoder + next *Entry + de diskEntry + cur uuid.UUID +} + +// Next reports whether there's an [Entry] to be processed. +func (l *Loader) Next() bool { + if l.err != nil { + return false + } + + for l.err = l.dec.Decode(&l.de); l.err == nil; l.err = l.dec.Decode(&l.de) { + id := l.de.Ref + // If we just hit a new Entry, promote the current one. + if id != l.cur { + l.e = l.next + l.next = &Entry{} + l.next.Updater = l.de.Updater + l.next.Fingerprint = l.de.Fingerprint + l.next.Date = l.de.Date + } + switch l.de.Kind { + case driver.VulnerabilityKind: + vuln := claircore.Vulnerability{} + if err := json.Unmarshal(l.de.Vuln.buf, &vuln); err != nil { + l.err = err + return false + } + l.next.Vuln = append(l.next.Vuln, &vuln) + case driver.EnrichmentKind: + en := driver.EnrichmentRecord{} + if err := json.Unmarshal(l.de.Enrichment.buf, &en); err != nil { + l.err = err + return false + } + l.next.Enrichment = append(l.next.Enrichment, en) + } + // If this was an initial diskEntry, promote the ref. + if id != l.cur { + l.cur = id + // If we have an Entry ready, report that. + if l.e != nil { + return true + } + } + } + l.e = l.next + return true +} + +// Entry returns the latest loaded [Entry]. +func (l *Loader) Entry() *Entry { + return l.e +} + +// Err is the latest encountered error. +func (l *Loader) Err() error { + // Don't report EOF as an error. + if errors.Is(l.err, io.EOF) { + return nil + } + return l.err +} diff --git a/libvuln/jsonblob/loader_test.go b/libvuln/jsonblob/loader_test.go new file mode 100644 index 000000000..e04adeddd --- /dev/null +++ b/libvuln/jsonblob/loader_test.go @@ -0,0 +1,78 @@ +package jsonblob + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/google/go-cmp/cmp" + "golang.org/x/sync/errgroup" + + "github.com/quay/claircore" + "github.com/quay/claircore/libvuln/driver" + "github.com/quay/claircore/test" +) + +func TestLoader(t *testing.T) { + ctx := context.Background() + a, err := New() + if err != nil { + t.Fatal(err) + } + + var want, got struct { + V []*claircore.Vulnerability + E []driver.EnrichmentRecord + } + + want.V = test.GenUniqueVulnerabilities(10, "test") + ref, err := a.UpdateVulnerabilities(ctx, "test", "", want.V) + if err != nil { + t.Error(err) + } + t.Logf("ref: %v", ref) + + want.E = test.GenEnrichments(15) + ref, err = a.UpdateEnrichments(ctx, "test", "", want.E) + if err != nil { + t.Error(err) + } + t.Logf("ref: %v", ref) + + var buf bytes.Buffer + defer func() { + t.Logf("wrote:\n%s", buf.String()) + }() + r, w := io.Pipe() + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { defer w.Close(); return a.Store(w) }) + eg.Go(func() error { + l, err := NewLoader(io.TeeReader(r, &buf)) + if err != nil { + return err + } + for l.Next() { + e := l.Entry() + if e.Vuln != nil && e.Enrichment != nil { + t.Error("expecting entry to have either vulnerability or enrichment, got both") + } + if e.Vuln != nil { + got.V = append(got.V, l.Entry().Vuln...) + } + if e.Enrichment != nil { + got.E = append(got.E, l.Entry().Enrichment...) + } + } + if err := l.Err(); err != nil { + return err + } + return nil + }) + if err := eg.Wait(); err != nil { + t.Error(err) + } + if !cmp.Equal(got, want) { + t.Error(cmp.Diff(got, want)) + } +} diff --git a/libvuln/updates.go b/libvuln/updates.go index 49d6d4639..bde04f435 100644 --- a/libvuln/updates.go +++ b/libvuln/updates.go @@ -26,7 +26,7 @@ func OfflineImport(ctx context.Context, pool *pgxpool.Pool, in io.Reader) error ctx = zlog.ContextWithValues(ctx, "component", "libvuln/OfflineImporter") s := postgres.NewMatcherStore(pool) - l, err := jsonblob.Load(ctx, in) + l, err := jsonblob.NewLoader(in) if err != nil { return err }