8 changes: 8 additions & 0 deletions README.md
@@ -196,6 +196,14 @@ $ manage.py test tests.test_additional_data_tsgorgtype

_see `manage.py test --help` for more info_

### Updating the test data fixture

Note that the OrgInfoCache entries for the test funders/recipients also need to be included in the test data fixture.

```shell
./manage.py dumpdata --output db/fixtures/test_data.json db additional_data.OrgInfoCache
```

# Updating requirements

We target python3.12 for our requirements.
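As a companion to the README addition above: Django tests pick the regenerated fixture up via the `fixtures` attribute, so the OrgInfoCache rows dumped by the command are available to every test that declares it. A minimal sketch, assuming a standard Django `TestCase` (class and test names are illustrative):

```python
from django.test import TestCase

# Assumed model location: the dumpdata target "additional_data.OrgInfoCache"
# implies the model lives in the additional_data app.
from additional_data.models import OrgInfoCache


class OrgInfoCacheFixtureExample(TestCase):
    # Django resolves this name against db/fixtures/test_data.json.
    fixtures = ["test_data.json"]

    def test_fixture_includes_org_info_cache(self):
        # If the fixture were regenerated without additional_data.OrgInfoCache,
        # org lookups for the test funders/recipients would come back empty.
        self.assertTrue(OrgInfoCache.objects.exists())
```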
4 changes: 2 additions & 2 deletions datastore/api/dashboard/api.py
@@ -47,7 +47,7 @@ class Publishers(generics.ListAPIView):
]

def get_queryset(self):
return db.Publisher.objects.filter(getter_run=db.GetterRun.latest())
return db.Publisher.objects.all()


class Publisher(generics.RetrieveAPIView):
@@ -56,7 +56,7 @@ class Publisher(generics.RetrieveAPIView):
serializer_class = serializers.PublisherSerializer

def get_queryset(self):
return db.Publisher.objects.filter(getter_run=db.GetterRun.latest())
return db.Publisher.objects.all()


class Sources(generics.ListAPIView):
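The substance of this change (mirrored in `org/api.py`, `org/models.py`, and `quality_data.py` below) is that publisher querysets are no longer scoped to the latest `GetterRun`:

```python
# Before: only Publisher rows attached to the most recent GetterRun were exposed.
db.Publisher.objects.filter(getter_run=db.GetterRun.latest())

# After: the whole Publisher table is used.
db.Publisher.objects.all()
```

Presumably this is safe because the loader commands further down in this diff no longer create a Publisher row per GetterRun, so the table is no longer a set of per-run snapshots.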
5 changes: 1 addition & 4 deletions datastore/api/org/api.py
@@ -25,7 +25,6 @@ def get_queryset(self):
return (
# Empty order_by to cancel default sort
db.Publisher.objects.order_by()
.filter(getter_run__in=db.GetterRun.objects.in_use())
.values(*fields)
.union(db.Funder.objects.all().values(*fields))
.union(db.Recipient.objects.all().values(*fields))
@@ -77,9 +76,7 @@ def get_object(self):
recipient_queryset = self.filter_queryset(db.Recipient.objects.all())
publisher_queryset = self.filter_queryset(
# Empty order_by to cancel default sort
db.Publisher.objects.order_by().filter(
getter_run__in=db.GetterRun.objects.in_use()
)
db.Publisher.objects.order_by().all()
)

try:
17 changes: 7 additions & 10 deletions datastore/api/org/models.py
@@ -5,6 +5,7 @@
from django.db.models.query import QuerySet, Q

import db.models as db
from db.models import Publisher


@dataclass
@@ -62,9 +63,7 @@ def exists(

if publisher_queryset is None:
# Empty order_by to cancel default sort
publisher_queryset = db.Publisher.objects.order_by().filter(
getter_run__in=db.GetterRun.objects.in_use()
)
publisher_queryset = db.Publisher.objects.order_by()

id_query = Q(org_id=org_id) | Q(non_primary_org_ids__contains=[org_id])

@@ -102,9 +101,7 @@ def get(

if publisher_queryset is None:
# Empty order_by to cancel default sort
publisher_queryset = db.Publisher.objects.order_by().filter(
getter_run__in=db.GetterRun.objects.in_use()
)
publisher_queryset = db.Publisher.objects.order_by().all()

name = None
primary_org_id = org_id
@@ -150,13 +147,13 @@ def get(

# is org a Publisher?
try:
publisher = publisher_queryset.filter(org_id=org_id).order_by(
"-getter_run__datetime"
)[0]
publisher = Publisher.get_most_recent(
org_id=org_id, queryset=publisher_queryset
)
name = publisher.name
# Publishers take precedence over Funders / Recipients when it comes to primary vs non-primary ID priority
primary_org_id = publisher.org_id
except IndexError:
except Publisher.DoesNotExist:
publisher = None

if funder is None and recipient is None and publisher is None:
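`Publisher.get_most_recent` and `Publisher.DoesNotExist` replace the old slice-and-`IndexError` pattern, but the classmethod's definition is not part of this section of the diff. A rough sketch of what it might look like, offered purely as an assumption (the ordering key is a guess; the real implementation lives in `db/models.py`):

```python
from django.db import models


class Publisher(models.Model):
    # ... existing fields ...

    @classmethod
    def get_most_recent(cls, org_id, queryset=None):
        """Return the newest Publisher for org_id, or raise Publisher.DoesNotExist.

        Hypothetical sketch: ordering by "-id" is an assumption standing in for
        whatever "most recent" means in the real model.
        """
        qs = queryset if queryset is not None else cls.objects.all()
        publisher = qs.filter(org_id=org_id).order_by("-id").first()
        if publisher is None:
            raise cls.DoesNotExist(f"No Publisher found for org_id={org_id!r}")
        return publisher
```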
167 changes: 95 additions & 72 deletions datastore/data_quality/management/commands/rewrite_quality_data.py
@@ -1,3 +1,4 @@
from typing import Optional, Literal
from django.core.management.base import BaseCommand
from django.core.cache import cache
from django.db import connection
@@ -54,97 +55,119 @@ def add_arguments(self, parser):
"--threads",
type=int,
default=8,
help="Number of threads to use for processing quality data",
help="Number of threads to use for processing quality data. Set to 0 to disable threading.",
)

def handle(self, *args, **options):
if "latest" in options["getter_run"]:
source_files = db.Latest.objects.get(
series=db.Latest.CURRENT
).sourcefile_set.all()
else:
source_files = db.SourceFile.objects.filter(
getter_run=options["getter_run"]
)
getter_run: str = options["getter_run"]
publisher_only: bool = options["publisher_only"]
sourcefile_only: bool = options["sourcefile_only"]
publisher_prefix: str | None = options.get("publisher")
threads: int = options["threads"]

rewrite_quality_data(
getter_run=getter_run,
publisher_only=publisher_only,
sourcefile_only=sourcefile_only,
publisher_prefix=publisher_prefix,
threads=threads,
)

if options.get("publisher"):
source_files = source_files.filter(
data__publisher__prefix=options["publisher"]
)

publisher_objs_for_update = []
sourcefile_objs_for_update = []

if not options["publisher_only"]:
print("Processing sourcefile data")
process_sf_list = []
for source_file in source_files:
process_sf_list.append(
{
"pk": source_file.pk,
"grants": list(
source_file.grant_set.values_list("data", flat=True)
),
}
)
def rewrite_quality_data(
getter_run: str | Literal["latest"] = "latest",
publisher_only: bool = False,
sourcefile_only: bool = False,
publisher_prefix: Optional[str] = None,
threads: int = 0,
):
if getter_run == "latest":
source_files = db.Latest.objects.get(
series=db.Latest.CURRENT
).sourcefile_set.all()
else:
source_files = db.SourceFile.objects.filter(getter_run=getter_run)

if publisher_prefix:
source_files = source_files.filter(data__publisher__prefix=publisher_prefix)

publisher_objs_for_update = []
sourcefile_objs_for_update = []

if not publisher_only:
print("Processing sourcefile quality data")
process_sf_list = []
for source_file in source_files:
process_sf_list.append(
{
"pk": source_file.pk,
"grants": list(
source_file.grant_set.values_list("data", flat=True)
),
}
)

with Pool(options.get("threads")) as process_pool:
if not threads:
for sf_ in process_sf_list:
process_source_file(sf_)
else:
with Pool(threads) as process_pool:
try:
source_file_results = process_pool.map(
process_source_file, process_sf_list
)
except Exception as e:
print(f"Error generating quality data {e}")

for source_file_result in source_file_results:
if source_file_result == None:
continue
for source_file_result in source_file_results:
if source_file_result is None:
continue

sf = db.SourceFile.objects.get(pk=source_file_result["pk"])
sf.quality = source_file_result["quality"]
sf.aggregate = source_file_result["aggregate"]
sourcefile_objs_for_update.append(sf)

db.SourceFile.objects.bulk_update(
sourcefile_objs_for_update, ["quality", "aggregate"], batch_size=10000
)
sf = db.SourceFile.objects.get(pk=source_file_result["pk"])
sf.quality = source_file_result["quality"]
sf.aggregate = source_file_result["aggregate"]
sourcefile_objs_for_update.append(sf)

def process_publishers(source_file):
"""Updates the publisher data with aggregates and quality data relating to their source files"""

# We want to store the quality and aggregate data against the latest version of the publisher
# object rather than the version from the getter_run that this source file came from
# This is so that when we serialise the latest publishers we get the latest aggregate and
# quality data regardless of when the source file entered the system.
publisher = db.Publisher.objects.get(
getter_run=db.GetterRun.latest(),
prefix=source_file.data["publisher"]["prefix"],
)
db.SourceFile.objects.bulk_update(
sourcefile_objs_for_update, ["quality", "aggregate"], batch_size=10000
)

print(publisher)

try:
(
publisher.quality,
publisher.aggregate,
) = quality_data.create_publisher_stats(publisher)
publisher_objs_for_update.append(publisher)
except Exception as e:
print("Could not create publisher quality data for %s" % str(publisher))
print(e)
connection.close()

if not options["sourcefile_only"]:
print("Processing publisher data")
with dummy.Pool(4) as process_pool:
def process_publishers(source_file_: db.SourceFile):
"""Updates the publisher data with aggregates and quality data relating to their source files"""
publisher = db.Publisher.objects.get(
prefix=source_file_.data["publisher"]["prefix"]
)
print(f"Processing Publisher Quality for {publisher.prefix}")

try:
(
publisher.quality,
publisher.aggregate,
) = quality_data.create_publisher_stats(publisher)
publisher_objs_for_update.append(publisher)
except Exception as e:
print("Could not create publisher quality data for %s" % str(publisher))
print(e)
if threads > 0:
connection.close() # ????

if not sourcefile_only:
print(
f"Processing publisher quality data ({source_files.distinct('data__publisher__prefix').count()})"
)
if not threads:
for sf_ in source_files.distinct("data__publisher__prefix"):
process_publishers(sf_)
else:
with dummy.Pool(threads) as process_pool:
process_pool.starmap(
process_publishers,
zip(source_files.distinct("data__publisher__prefix")),
)

db.Publisher.objects.bulk_update(
publisher_objs_for_update, ["quality", "aggregate"], batch_size=10000
)
db.Publisher.objects.bulk_update(
publisher_objs_for_update, ["quality", "aggregate"], batch_size=10000
)

# Clear all caches - data has changed
cache.clear()
# Clear all caches - data has changed
cache.clear()
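Moving the body of `handle()` into a module-level `rewrite_quality_data()` function means the same logic is callable outside the management command. A usage sketch under that assumption (the import path mirrors the file location; the prefix value is illustrative):

```python
from django.core.management import call_command

from data_quality.management.commands.rewrite_quality_data import rewrite_quality_data

# Via the management command, as before:
call_command("rewrite_quality_data", "latest", threads=4)

# Or directly, e.g. from a test or a one-off script. threads=0 skips the
# process pools and runs everything in-process, which is easier to debug.
rewrite_quality_data(
    getter_run="latest",
    publisher_prefix="example-prefix",  # illustrative value
    threads=0,
)
```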
2 changes: 1 addition & 1 deletion datastore/data_quality/quality_data.py
@@ -384,7 +384,7 @@ def get_publisher_quality_grants(self):
def get_pc_quality_publishers(self):
ret = {}

publishers = db.Publisher.objects.filter(getter_run=db.GetterRun.latest())
publishers = db.Publisher.objects.all()

total_publishers_all = publishers.count()

2 changes: 1 addition & 1 deletion datastore/db/fixtures/test_data.json

Large diffs are not rendered by default.

11 changes: 0 additions & 11 deletions datastore/db/management/commands/load_data_package.py
@@ -23,16 +23,6 @@ def load_data(self):
getter_run = db.GetterRun.objects.create()

for ob in dataset:
prefix = ob["publisher"]["prefix"]
publisher, c = db.Publisher.objects.get_or_create(
getter_run=getter_run,
prefix=prefix,
data=ob["publisher"],
org_id=ob["publisher"].get("org_id", "unknown"),
name=ob["publisher"]["name"],
source=db.Entity.PUBLISHER,
)

source_file = db.SourceFile.objects.create(data=ob, getter_run=getter_run)

grant_data = self.load_grant_data(ob["datagetter_metadata"]["json"])
@@ -49,7 +39,6 @@ def load_data(self):
grant_bulk_insert.append(
db.Grant.from_data(
source_file=source_file,
publisher=publisher,
data=grant,
additional_data=additional_data,
getter_run=getter_run,
15 changes: 2 additions & 13 deletions datastore/db/management/commands/load_datagetter_data.py
@@ -138,16 +138,6 @@ def load_data(self):
getter_run = db.GetterRun.objects.create()

for ob in dataset:
prefix = ob["publisher"]["prefix"]
publisher, c = db.Publisher.objects.get_or_create(
getter_run=getter_run,
prefix=prefix,
data=ob["publisher"],
org_id=ob["publisher"].get("org_id", "unknown"),
name=ob["publisher"]["name"],
source=db.Entity.PUBLISHER,
)

source_file = db.SourceFile.objects.create(data=ob, getter_run=getter_run)

try:
@@ -176,7 +166,6 @@
grant_bulk_insert.append(
db.Grant.from_data(
source_file=source_file,
publisher=publisher,
data=grant,
additional_data=additional_data,
getter_run=getter_run,
@@ -215,10 +204,10 @@ def handle(self, *args, **options):
print("Updating Latest", file=self.stdout)
db.Latest.update()

# Update entities data for publisher, funders and recipients
call_command("manage_entities_data", "--update")
print("Updating quality data", file=self.stdout)
call_command("rewrite_quality_data", "latest")
# Update entities data for funders and recipients
call_command("manage_entities_data", "--update")

# Clear all cached objects - The latest data as well as new data has been added
cache.clear()