Skip to content

Commit e7e52fa

Browse files
Merge pull request #760 from Steinbeck-Lab/fix-pipeline-scoping
Fix pipeline scoping
2 parents 6d71344 + a4c26c6 commit e7e52fa

7 files changed

Lines changed: 128 additions & 42 deletions

File tree

app/Console/Commands/SubmissionsAutoProcess/ClassifyAuto.php

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class ClassifyAuto extends Command
1616
/**
1717
* The name and signature of the console command.
1818
*/
19-
protected $signature = 'coconut:npclassify {collection_id : The ID of the collection to process}';
19+
protected $signature = 'coconut:npclassify {collection_id? : The ID of the collection to process} {--all : Process all collections}';
2020

2121
/**
2222
* The console command description.
@@ -30,40 +30,66 @@ public function handle()
3030
{
3131
$collection_id = $this->argument('collection_id');
3232

33-
$collection = Collection::find($collection_id);
34-
if (! $collection) {
35-
Log::error("Collection with ID {$collection_id} not found.");
33+
if (! $collection_id && ! $this->option('all')) {
34+
Log::error('Please specify either a collection_id or use --all flag');
3635

3736
return 1;
3837
}
3938

40-
Log::info("Classifying molecules using NPClassifier for collection ID: {$collection_id}");
39+
if ($collection_id !== null) {
40+
$collection = Collection::find($collection_id);
41+
if (! $collection) {
42+
Log::error("Collection with ID {$collection_id} not found.");
43+
44+
return 1;
45+
}
46+
}
47+
48+
$collectionLabel = $collection_id !== null ? "collection ID: {$collection_id}" : 'all collections';
49+
50+
Log::info("Classifying molecules using NPClassifier for {$collectionLabel}");
4151

4252
// Use raw query to avoid ambiguous column issues
43-
$sql = '
44-
SELECT DISTINCT molecules.id, molecules.canonical_smiles
45-
FROM molecules
46-
INNER JOIN entries ON entries.molecule_id = molecules.id
47-
INNER JOIN properties ON properties.molecule_id = molecules.id
53+
$conditions = '
54+
WHERE molecules.active = true
55+
AND properties.np_classifier_pathway IS NULL
56+
AND properties.np_classifier_superclass IS NULL
57+
AND properties.np_classifier_class IS NULL
58+
AND properties.np_classifier_is_glycoside IS NULL
59+
';
60+
61+
$bindings = [];
62+
if ($collection_id !== null) {
63+
$conditions = '
4864
WHERE entries.collection_id = ?
4965
AND molecules.active = true
5066
AND properties.np_classifier_pathway IS NULL
5167
AND properties.np_classifier_superclass IS NULL
5268
AND properties.np_classifier_class IS NULL
5369
AND properties.np_classifier_is_glycoside IS NULL
70+
';
71+
$bindings[] = $collection_id;
72+
}
73+
74+
$sql = '
75+
SELECT DISTINCT molecules.id, molecules.canonical_smiles
76+
FROM molecules
77+
INNER JOIN entries ON entries.molecule_id = molecules.id
78+
INNER JOIN properties ON properties.molecule_id = molecules.id
79+
'.$conditions.'
5480
ORDER BY molecules.id
5581
';
5682

57-
$molecules = DB::select($sql, [$collection_id]);
83+
$molecules = DB::select($sql, $bindings);
5884

5985
$totalCount = count($molecules);
6086
if ($totalCount === 0) {
61-
Log::info("No molecules found to classify in collection {$collection_id}.");
87+
Log::info("No molecules found to classify in {$collectionLabel}.");
6288

6389
return 0;
6490
}
6591

66-
Log::info("Starting NPClassifier for {$totalCount} molecules in collection {$collection_id}");
92+
Log::info("Starting NPClassifier for {$totalCount} molecules in {$collectionLabel}");
6793

6894
// Chunk the results manually
6995
$chunks = array_chunk($molecules, 1000);
@@ -72,23 +98,23 @@ public function handle()
7298
$moleculeIds = array_map(fn ($row) => $row->id, $chunk);
7399
$moleculeCount = count($moleculeIds);
74100

75-
Log::info("Processing batch of {$moleculeCount} molecules for classification in collection {$collection_id}");
101+
Log::info("Processing batch of {$moleculeCount} molecules for classification in {$collectionLabel}");
76102

77103
$batchJobs = [];
78104
$batchJobs[] = new ClassifyMoleculeBatch($moleculeIds);
79105

80106
Bus::batch($batchJobs)
81-
->catch(function (Batch $batch, Throwable $e) use ($collection_id) {
82-
Log::error("NPClassifier batch failed for collection {$collection_id}: ".$e->getMessage());
107+
->catch(function (Batch $batch, Throwable $e) use ($collectionLabel) {
108+
Log::error("NPClassifier batch failed for {$collectionLabel}: ".$e->getMessage());
83109
})
84-
->name("NPClassifier Batch Auto Collection {$collection_id}")
110+
->name('NPClassifier Batch Auto '.ucfirst($collectionLabel))
85111
->allowFailures()
86112
->onConnection('redis')
87113
->onQueue('default')
88114
->dispatch();
89115
}
90116

91-
Log::info("All classification jobs have been dispatched for collection {$collection_id}!");
117+
Log::info("All classification jobs have been dispatched for {$collectionLabel}!");
92118

93119
return 0;
94120
}

app/Console/Commands/SubmissionsAutoProcess/FetchCASNumbersAuto.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class FetchCASNumbersAuto extends Command
1717
*
1818
* @var string
1919
*/
20-
protected $signature = 'coconut:fetch-cas-numbers {collection_id? : The ID of the collection to fetch CAS numbers for}';
20+
protected $signature = 'coconut:fetch-cas-numbers {collection_id? : The ID of the collection to fetch CAS numbers for} {--all : Process all collections}';
2121

2222
/**
2323
* The console command description.
@@ -40,6 +40,13 @@ public function handle()
4040
{
4141
$collection_id = $this->argument('collection_id');
4242

43+
if (! $collection_id && ! $this->option('all')) {
44+
Log::error('Please specify either a collection_id or use --all flag');
45+
$this->error('Please specify either a collection_id or use --all flag');
46+
47+
return 1;
48+
}
49+
4350
if ($collection_id) {
4451
$collection = Collection::find($collection_id);
4552
if (! $collection) {

app/Console/Commands/SubmissionsAutoProcess/GenerateCoordinates.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class GenerateCoordinates extends Command
1515
*
1616
* @var string
1717
*/
18-
protected $signature = 'coconut:generate-coordinates-auto {collection_id?}';
18+
protected $signature = 'coconut:generate-coordinates-auto {collection_id?} {--all : Process all collections}';
1919

2020
/**
2121
* The console command description.
@@ -31,6 +31,12 @@ public function handle()
3131
{
3232
$collectionId = $this->argument('collection_id');
3333

34+
if (! $collectionId && ! $this->option('all')) {
35+
Log::error('Please specify either a collection_id or use --all flag');
36+
37+
return 1;
38+
}
39+
3440
$scriptPath = app_path('Scripts/generate_coordinates.py');
3541
$tmpCsv = storage_path('app/public/coordinates_input'.($collectionId ? '_'.$collectionId : '').'.csv');
3642
$outputJson = 'coordinates'.($collectionId ? '_'.$collectionId : '').'.json';

app/Console/Commands/SubmissionsAutoProcess/GenerateProperties.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class GenerateProperties extends Command
1515
*
1616
* @var string
1717
*/
18-
protected $signature = 'coconut:generate-properties-auto {collection_id?}';
18+
protected $signature = 'coconut:generate-properties-auto {collection_id?} {--all : Process all collections}';
1919

2020
/**
2121
* The console command description.
@@ -31,6 +31,12 @@ public function handle()
3131
{
3232
$collectionId = $this->argument('collection_id');
3333

34+
if (! $collectionId && ! $this->option('all')) {
35+
Log::error('Please specify either a collection_id or use --all flag');
36+
37+
return 1;
38+
}
39+
3440
$outputTSV = 'properties'.($collectionId ? '_'.$collectionId : '').'.tsv';
3541
$scriptPath = app_path('Scripts/generate_properties.py');
3642

app/Console/Commands/SubmissionsAutoProcess/ImportPubChemNamesAuto.php

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class ImportPubChemNamesAuto extends Command
1818
*
1919
* @var string
2020
*/
21-
protected $signature = 'coconut:import-pubchem-data {collection_id : The ID of the collection to process} {--retry-failed : Retry previously failed entries}';
21+
protected $signature = 'coconut:import-pubchem-data {collection_id? : The ID of the collection to process} {--all : Process all collections} {--retry-failed : Retry previously failed entries}';
2222

2323
/**
2424
* The console command description.
@@ -35,15 +35,24 @@ public function handle()
3535
$collection_id = $this->argument('collection_id');
3636
$retryFailed = $this->option('retry-failed');
3737

38-
$collection = Collection::find($collection_id);
39-
if (! $collection) {
40-
Log::error("Collection with ID {$collection_id} not found.");
38+
if (! $collection_id && ! $this->option('all')) {
39+
Log::error('Please specify either a collection_id or use --all flag');
4140

4241
return 1;
4342
}
43+
44+
if ($collection_id !== null) {
45+
$collection = Collection::find($collection_id);
46+
if (! $collection) {
47+
Log::error("Collection with ID {$collection_id} not found.");
48+
49+
return 1;
50+
}
51+
}
52+
4453
$query = Molecule::select('molecules.id')
4554
->join('entries', 'entries.molecule_id', '=', 'molecules.id')
46-
->where('entries.collection_id', $collection_id)
55+
->when($collection_id !== null, fn ($q) => $q->where('entries.collection_id', $collection_id))
4756
->where(function ($query) {
4857
$query->whereNull('molecules.name')
4958
->orWhere('molecules.name', '=', '');
@@ -67,36 +76,38 @@ public function handle()
6776
}
6877

6978
// Count the total number of molecules to process
79+
$collectionLabel = $collection_id !== null ? "collection {$collection_id}" : 'all collections';
80+
7081
$totalCount = $query->count();
7182
if ($totalCount === 0) {
72-
Log::info("No molecules found that require PubChem data import for collection {$collection_id}.");
83+
Log::info("No molecules found that require PubChem data import for {$collectionLabel}.");
7384

7485
return 0;
7586
}
7687

77-
Log::info("Starting PubChem data import for {$totalCount} molecules in collection {$collection_id}.");
88+
Log::info("Starting PubChem data import for {$totalCount} molecules in {$collectionLabel}.");
7889

7990
// Use chunk to process large sets of molecules
80-
$query->chunkById(10000, function ($mols) use ($collection_id) {
91+
$query->chunkById(10000, function ($mols) use ($collectionLabel) {
8192
$moleculeCount = count($mols);
82-
Log::info("Processing batch of {$moleculeCount} molecules for collection {$collection_id}");
93+
Log::info("Processing batch of {$moleculeCount} molecules for {$collectionLabel}");
8394

8495
// Prepare batch jobs
8596
$batchJobs = [];
8697
$batchJobs[] = new ImportPubChemBatch($mols->pluck('id')->toArray());
8798

8899
// Dispatch as a batch
89100
Bus::batch($batchJobs)
90-
->catch(function (Batch $batch, Throwable $e) use ($collection_id) {
91-
Log::error("PubChem import batch failed for collection {$collection_id}: ".$e->getMessage());
101+
->catch(function (Batch $batch, Throwable $e) use ($collectionLabel) {
102+
Log::error("PubChem import batch failed for {$collectionLabel}: ".$e->getMessage());
92103
})
93-
->name("Import PubChem Auto Batch Collection {$collection_id}")
104+
->name('Import PubChem Auto Batch '.ucfirst($collectionLabel))
94105
->allowFailures()
95106
->onConnection('redis')
96107
->onQueue('default')
97108
->dispatch();
98-
});
109+
}, 'molecules.id', 'id');
99110

100-
Log::info("All PubChem import jobs dispatched for collection {$collection_id}!");
111+
Log::info("All PubChem import jobs dispatched for {$collectionLabel}!");
101112
}
102113
}

app/Jobs/ImportPubChemAuto.php

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Illuminate\Queue\SerializesModels;
1212
use Illuminate\Support\Facades\Http;
1313
use Illuminate\Support\Facades\Log;
14+
use Illuminate\Support\Facades\Redis;
1415

1516
class ImportPubChemAuto implements ShouldBeUnique, ShouldQueue
1617
{
@@ -30,6 +31,20 @@ class ImportPubChemAuto implements ShouldBeUnique, ShouldQueue
3031
*/
3132
public $timeout = 120;
3233

34+
/**
35+
* The number of times the job may be attempted.
36+
*
37+
* @var int
38+
*/
39+
public $tries = 3;
40+
41+
/**
42+
* The number of seconds to wait before each successive retry of the job (one entry per retry).
43+
*
44+
* @var array
45+
*/
46+
public $backoff = [30, 60];
47+
3348
/**
3449
* Create a new job instance.
3550
*/
@@ -76,6 +91,14 @@ public function handle(): void
7691
'batch_id' => $this->batch()?->id,
7792
]);
7893
}
94+
} catch (\Illuminate\Http\Client\ConnectionException $e) {
95+
// Transient network error — log and rethrow so the queue retries the job (bounded by $tries, delayed by $backoff)
96+
Log::warning('PubChem connection timeout, will retry', [
97+
'molecule_id' => $this->molecule->id,
98+
'attempt' => $this->attempts(),
99+
'error_message' => $e->getMessage(),
100+
]);
101+
throw $e;
79102
} catch (\Throwable $e) {
80103
// Only actual system errors should be treated as job failures
81104
updateCurationStatus($this->molecule->id, $this->stepName, 'failed', $e->getMessage());
@@ -122,14 +145,23 @@ public function failed(\Throwable $exception): void
122145
}
123146

124147
/**
125-
* Make a throttled HTTP GET request and sleep for 200ms afterward.
148+
* Make a throttled HTTP GET request, respecting a global 5 req/s limit
149+
* enforced via Redis across all workers.
126150
*/
127151
private function throttledGet(string $url)
128152
{
129-
$response = Http::get($url);
130-
usleep(200000); // Sleep for 200 milliseconds to limit to 5 requests per second
131-
132-
return $response;
153+
while (true) {
154+
try {
155+
return Redis::throttle('pubchem-api')
156+
->allow(5)
157+
->every(1)
158+
->block(0)
159+
->then(fn () => Http::timeout(30)->connectTimeout(10)->get($url));
160+
} catch (\Illuminate\Contracts\Redis\LimiterTimeoutException $e) {
161+
// No slot available yet; wait 200ms before trying again
162+
usleep(200000);
163+
}
164+
}
133165
}
134166

135167
public function fetchIUPACNameFromPubChem()

app/Jobs/ImportPubChemBatch.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
use Illuminate\Foundation\Bus\Dispatchable;
1010
use Illuminate\Queue\InteractsWithQueue;
1111
use Illuminate\Queue\SerializesModels;
12-
use Illuminate\Support\Facades\Log;
1312

1413
class ImportPubChemBatch implements ShouldQueue
1514
{
@@ -39,7 +38,6 @@ public function handle(): void
3938

4039
$batchJobs = [];
4140
foreach ($molecules as $molecule) {
42-
Log::info('Importing PubChem data for molecule ID: '.$molecule->id);
4341
array_push($batchJobs, new ImportPubChemAuto($molecule));
4442
}
4543
$this->batch()->add($batchJobs);

0 commit comments

Comments
 (0)