Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 44 additions & 18 deletions app/Console/Commands/SubmissionsAutoProcess/ClassifyAuto.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ClassifyAuto extends Command
/**
* The name and signature of the console command.
*/
protected $signature = 'coconut:npclassify {collection_id : The ID of the collection to process}';
protected $signature = 'coconut:npclassify {collection_id? : The ID of the collection to process} {--all : Process all collections}';

/**
* The console command description.
Expand All @@ -30,40 +30,66 @@ public function handle()
{
$collection_id = $this->argument('collection_id');

$collection = Collection::find($collection_id);
if (! $collection) {
Log::error("Collection with ID {$collection_id} not found.");
if (! $collection_id && ! $this->option('all')) {
Log::error('Please specify either a collection_id or use --all flag');

return 1;
}

Log::info("Classifying molecules using NPClassifier for collection ID: {$collection_id}");
if ($collection_id !== null) {
$collection = Collection::find($collection_id);
if (! $collection) {
Log::error("Collection with ID {$collection_id} not found.");

return 1;
}
}

$collectionLabel = $collection_id !== null ? "collection ID: {$collection_id}" : 'all collections';

Log::info("Classifying molecules using NPClassifier for {$collectionLabel}");

// Use raw query to avoid ambiguous column issues
$sql = '
SELECT DISTINCT molecules.id, molecules.canonical_smiles
FROM molecules
INNER JOIN entries ON entries.molecule_id = molecules.id
INNER JOIN properties ON properties.molecule_id = molecules.id
$conditions = '
WHERE molecules.active = true
AND properties.np_classifier_pathway IS NULL
AND properties.np_classifier_superclass IS NULL
AND properties.np_classifier_class IS NULL
AND properties.np_classifier_is_glycoside IS NULL
';

$bindings = [];
if ($collection_id !== null) {
$conditions = '
WHERE entries.collection_id = ?
AND molecules.active = true
AND properties.np_classifier_pathway IS NULL
AND properties.np_classifier_superclass IS NULL
AND properties.np_classifier_class IS NULL
AND properties.np_classifier_is_glycoside IS NULL
';
$bindings[] = $collection_id;
}

$sql = '
SELECT DISTINCT molecules.id, molecules.canonical_smiles
FROM molecules
INNER JOIN entries ON entries.molecule_id = molecules.id
INNER JOIN properties ON properties.molecule_id = molecules.id
'.$conditions.'
ORDER BY molecules.id
';

$molecules = DB::select($sql, [$collection_id]);
$molecules = DB::select($sql, $bindings);

$totalCount = count($molecules);
if ($totalCount === 0) {
Log::info("No molecules found to classify in collection {$collection_id}.");
Log::info("No molecules found to classify in {$collectionLabel}.");

return 0;
}

Log::info("Starting NPClassifier for {$totalCount} molecules in collection {$collection_id}");
Log::info("Starting NPClassifier for {$totalCount} molecules in {$collectionLabel}");

// Chunk the results manually
$chunks = array_chunk($molecules, 1000);
Expand All @@ -72,23 +98,23 @@ public function handle()
$moleculeIds = array_map(fn ($row) => $row->id, $chunk);
$moleculeCount = count($moleculeIds);

Log::info("Processing batch of {$moleculeCount} molecules for classification in collection {$collection_id}");
Log::info("Processing batch of {$moleculeCount} molecules for classification in {$collectionLabel}");

$batchJobs = [];
$batchJobs[] = new ClassifyMoleculeBatch($moleculeIds);

Bus::batch($batchJobs)
->catch(function (Batch $batch, Throwable $e) use ($collection_id) {
Log::error("NPClassifier batch failed for collection {$collection_id}: ".$e->getMessage());
->catch(function (Batch $batch, Throwable $e) use ($collectionLabel) {
Log::error("NPClassifier batch failed for {$collectionLabel}: ".$e->getMessage());
})
->name("NPClassifier Batch Auto Collection {$collection_id}")
->name('NPClassifier Batch Auto '.ucfirst($collectionLabel))
->allowFailures()
->onConnection('redis')
->onQueue('default')
->dispatch();
}

Log::info("All classification jobs have been dispatched for collection {$collection_id}!");
Log::info("All classification jobs have been dispatched for {$collectionLabel}!");

return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class FetchCASNumbersAuto extends Command
*
* @var string
*/
protected $signature = 'coconut:fetch-cas-numbers {collection_id? : The ID of the collection to fetch CAS numbers for}';
protected $signature = 'coconut:fetch-cas-numbers {collection_id? : The ID of the collection to fetch CAS numbers for} {--all : Process all collections}';

/**
* The console command description.
Expand All @@ -40,6 +40,13 @@ public function handle()
{
$collection_id = $this->argument('collection_id');

if (! $collection_id && ! $this->option('all')) {
Log::error('Please specify either a collection_id or use --all flag');
$this->error('Please specify either a collection_id or use --all flag');

return 1;
}

if ($collection_id) {
$collection = Collection::find($collection_id);
if (! $collection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class GenerateCoordinates extends Command
*
* @var string
*/
protected $signature = 'coconut:generate-coordinates-auto {collection_id?}';
protected $signature = 'coconut:generate-coordinates-auto {collection_id?} {--all : Process all collections}';

/**
* The console command description.
Expand All @@ -31,6 +31,12 @@ public function handle()
{
$collectionId = $this->argument('collection_id');

if (! $collectionId && ! $this->option('all')) {
Log::error('Please specify either a collection_id or use --all flag');

return 1;
}

$scriptPath = app_path('Scripts/generate_coordinates.py');
$tmpCsv = storage_path('app/public/coordinates_input'.($collectionId ? '_'.$collectionId : '').'.csv');
$outputJson = 'coordinates'.($collectionId ? '_'.$collectionId : '').'.json';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class GenerateProperties extends Command
*
* @var string
*/
protected $signature = 'coconut:generate-properties-auto {collection_id?}';
protected $signature = 'coconut:generate-properties-auto {collection_id?} {--all : Process all collections}';

/**
* The console command description.
Expand All @@ -31,6 +31,12 @@ public function handle()
{
$collectionId = $this->argument('collection_id');

if (! $collectionId && ! $this->option('all')) {
Log::error('Please specify either a collection_id or use --all flag');

return 1;
}

$outputTSV = 'properties'.($collectionId ? '_'.$collectionId : '').'.tsv';
$scriptPath = app_path('Scripts/generate_properties.py');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ImportPubChemNamesAuto extends Command
*
* @var string
*/
protected $signature = 'coconut:import-pubchem-data {collection_id : The ID of the collection to process} {--retry-failed : Retry previously failed entries}';
protected $signature = 'coconut:import-pubchem-data {collection_id? : The ID of the collection to process} {--all : Process all collections} {--retry-failed : Retry previously failed entries}';

/**
* The console command description.
Expand All @@ -35,15 +35,24 @@ public function handle()
$collection_id = $this->argument('collection_id');
$retryFailed = $this->option('retry-failed');

$collection = Collection::find($collection_id);
if (! $collection) {
Log::error("Collection with ID {$collection_id} not found.");
if (! $collection_id && ! $this->option('all')) {
Log::error('Please specify either a collection_id or use --all flag');

return 1;
}

if ($collection_id !== null) {
$collection = Collection::find($collection_id);
if (! $collection) {
Log::error("Collection with ID {$collection_id} not found.");

return 1;
}
}

$query = Molecule::select('molecules.id')
->join('entries', 'entries.molecule_id', '=', 'molecules.id')
->where('entries.collection_id', $collection_id)
->when($collection_id !== null, fn ($q) => $q->where('entries.collection_id', $collection_id))
->where(function ($query) {
$query->whereNull('molecules.name')
->orWhere('molecules.name', '=', '');
Expand All @@ -67,36 +76,38 @@ public function handle()
}

// Count the total number of molecules to process
$collectionLabel = $collection_id !== null ? "collection {$collection_id}" : 'all collections';

$totalCount = $query->count();
if ($totalCount === 0) {
Log::info("No molecules found that require PubChem data import for collection {$collection_id}.");
Log::info("No molecules found that require PubChem data import for {$collectionLabel}.");

return 0;
}

Log::info("Starting PubChem data import for {$totalCount} molecules in collection {$collection_id}.");
Log::info("Starting PubChem data import for {$totalCount} molecules in {$collectionLabel}.");

// Use chunk to process large sets of molecules
$query->chunkById(10000, function ($mols) use ($collection_id) {
$query->chunkById(10000, function ($mols) use ($collectionLabel) {
$moleculeCount = count($mols);
Log::info("Processing batch of {$moleculeCount} molecules for collection {$collection_id}");
Log::info("Processing batch of {$moleculeCount} molecules for {$collectionLabel}");

// Prepare batch jobs
$batchJobs = [];
$batchJobs[] = new ImportPubChemBatch($mols->pluck('id')->toArray());

// Dispatch as a batch
Bus::batch($batchJobs)
->catch(function (Batch $batch, Throwable $e) use ($collection_id) {
Log::error("PubChem import batch failed for collection {$collection_id}: ".$e->getMessage());
->catch(function (Batch $batch, Throwable $e) use ($collectionLabel) {
Log::error("PubChem import batch failed for {$collectionLabel}: ".$e->getMessage());
})
->name("Import PubChem Auto Batch Collection {$collection_id}")
->name('Import PubChem Auto Batch '.ucfirst($collectionLabel))
->allowFailures()
->onConnection('redis')
->onQueue('default')
->dispatch();
});
}, 'molecules.id', 'id');

Log::info("All PubChem import jobs dispatched for collection {$collection_id}!");
Log::info("All PubChem import jobs dispatched for {$collectionLabel}!");
}
}
42 changes: 37 additions & 5 deletions app/Jobs/ImportPubChemAuto.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;

class ImportPubChemAuto implements ShouldBeUnique, ShouldQueue
{
Expand All @@ -30,6 +31,20 @@ class ImportPubChemAuto implements ShouldBeUnique, ShouldQueue
*/
public $timeout = 120;

/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 3;

/**
* The number of seconds to wait before retrying the job.
*
* @var array
*/
public $backoff = [30, 60];

/**
* Create a new job instance.
*/
Expand Down Expand Up @@ -76,6 +91,14 @@ public function handle(): void
'batch_id' => $this->batch()?->id,
]);
}
} catch (\Illuminate\Http\Client\ConnectionException $e) {
// Transient network error — release back to queue for retry
Log::warning('PubChem connection timeout, will retry', [
'molecule_id' => $this->molecule->id,
'attempt' => $this->attempts(),
'error_message' => $e->getMessage(),
]);
throw $e;
} catch (\Throwable $e) {
// Only actual system errors should be treated as job failures
updateCurationStatus($this->molecule->id, $this->stepName, 'failed', $e->getMessage());
Expand Down Expand Up @@ -122,14 +145,23 @@ public function failed(\Throwable $exception): void
}

/**
 * Make a throttled HTTP GET request, respecting a global 5 req/s limit
 * enforced via Redis across all workers.
 *
 * Each pass asks the shared "pubchem-api" limiter for a slot without
 * blocking (block(0)); when no slot is free the limiter raises
 * LimiterTimeoutException, the method sleeps 200ms and asks again.
 * The loop itself has no upper bound, so the wait ends only when a slot
 * is obtained or the request callback throws.
 *
 * The request is issued with a 30s response timeout and a 10s connect
 * timeout. Exceptions raised by the HTTP call are not caught here and
 * propagate to the caller.
 *
 * @param  string  $url  URL to fetch.
 * @return mixed The value returned by the HTTP client's get() call
 *               (presumably an HTTP client response object — confirm).
 */
private function throttledGet(string $url)
{
    while (true) {
        try {
            return Redis::throttle('pubchem-api')
                ->allow(5)
                ->every(1)
                ->block(0)
                ->then(fn () => Http::timeout(30)->connectTimeout(10)->get($url));
        } catch (\Illuminate\Contracts\Redis\LimiterTimeoutException $e) {
            // No slot available yet; wait 200ms before trying again
            usleep(200000);
        }
    }
}

public function fetchIUPACNameFromPubChem()
Expand Down
2 changes: 0 additions & 2 deletions app/Jobs/ImportPubChemBatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class ImportPubChemBatch implements ShouldQueue
{
Expand Down Expand Up @@ -39,7 +38,6 @@ public function handle(): void

$batchJobs = [];
foreach ($molecules as $molecule) {
Log::info('Importing PubChem data for molecule ID: '.$molecule->id);
array_push($batchJobs, new ImportPubChemAuto($molecule));
}
$this->batch()->add($batchJobs);
Expand Down
Loading