Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,825 changes: 929 additions & 896 deletions Cargo.lock

Large diffs are not rendered by default.

36 changes: 18 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
[package]
name = "melba"
version = "0.1.0"
edition = "2021"
edition = "2024"
rust-version = "1.85.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio-postgres = "0.7.10"
tokio = {version = "1.37.0", features = ["full"]}
sqlx = {version = "0.7.4", features = ["runtime-async-std-native-tls","postgres", "migrate"]}
reqwest = { version = "0.12.5", features = ["json"] }
serde_json = "1.0.116"
tokio-postgres = "0.7.13"
tokio = {version = "1.44.1", features = ["full"]}
sqlx = {version = "0.8.3", default-features = false, features = ["runtime-tokio", "tls-native-tls", "migrate", "derive", "macros", "chrono", "postgres"]}
reqwest = { version = "0.12.15", features = ["json"] }
serde_json = "1.0.140"
linkify = "0.10.0"
mb-rs = {git = "https://github.com/yellowHatpro/Rust-Playground.git"}
serde = { version = "1.0.198", features = ["derive"] }
chrono = { version = "0.4.38" , features = ["serde"] }
dotenv = "0.15.0"
clap = {version = "4.5.7", features = ["color", "derive"]}
serde = { version = "1.0.219", features = ["derive"] }
chrono = { version = "0.4.40", features = ["serde"] }
clap = {version = "4.5.32", features = ["color", "derive"]}
colorize = "0.1.0"
thiserror = "1.0.61"
config = "0.14.0"
once_cell = "1.19.0"
sentry = "0.34.0"
thiserror = "2.0.12"
config = "0.15.11"
once_cell = "1.21.1"
sentry = "0.36.0"
prometheus = {version = "0.13.4", features = ["push"]}
log = "0.4.14"
env_logger = "0.11.5"
log = "0.4.26"
env_logger = "0.11.7"
dotenvy = "0.15.7"

[lib]
path = "src/lib.rs"
Expand All @@ -34,4 +34,4 @@ name = "melba"
path = "src/main.rs"

[dev-dependencies]
mockito = "1.4.0"
mockito = "1.7.0"
2 changes: 1 addition & 1 deletion src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn spawn_poller_task(db_pool: PgPool) -> JoinHandle<()> {
.expect("[POLLER] Could not find rows in edit rows to start poller");

tokio::spawn(async move {
if let Err(e) = poller.poll().await {
if let Err(e) = poller.run().await {
error!("[POLLER] Task Failed, Error: {}", e);
sentry::capture_error(&e);
}
Expand Down
3 changes: 2 additions & 1 deletion src/cli/utils.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::archival::archival_response::ArchivalStatusResponse;
use crate::archival::error::ArchivalError;
use crate::archival::utils::make_archival_status_request;
use crate::models::musicbrainz_db::EditData;
use crate::models::musicbrainz_db::EditNote;
use crate::poller;
use crate::poller::utils::should_insert_url_to_internet_archive_urls;
use colorize::AnsiColor;
use mb_rs::schema::{EditData, EditNote};
use sqlx::{Error, PgPool, Row};

//TODO: Currently I am returning the internet_archive_urls row id when I insert any URL. Now there might be URLs which are already saved, hence instead of row id, show how many URLs are still there unprocessed, and is before the currently inserted one.
Expand Down
2 changes: 1 addition & 1 deletion src/configuration/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use config::{Config, ConfigError, File};
use dotenv::dotenv;
use dotenvy::dotenv;
use env_logger::Builder;
use log::LevelFilter;
use once_cell::sync::Lazy;
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// Lib target intended for integration tests.
// TODO: Move tests to the main crate

pub mod app;
pub mod archival;
pub mod configuration;
pub mod metrics;
pub mod models;
pub mod poller;
pub mod structs;
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
mod app;
mod archival;
mod cli;
mod models;

Check failure on line 8 in src/main.rs

View workflow job for this annotation

GitHub Actions / Test for ubuntu-latest

file is loaded as a module multiple times: `src/models/mod.rs`
mod poller;
mod structs;
mod models;

Check failure on line 11 in src/main.rs

View workflow job for this annotation

GitHub Actions / Test for ubuntu-latest

the name `models` is defined multiple times

Check failure on line 11 in src/main.rs

View workflow job for this annotation

GitHub Actions / Test for ubuntu-latest

the name `models` is defined multiple times

Check failure on line 11 in src/main.rs

View workflow job for this annotation

GitHub Actions / Test for ubuntu-latest

the name `models` is defined multiple times

mod configuration;
mod metrics;
Expand Down
3 changes: 3 additions & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! This module contains all the database models used in the app

pub mod musicbrainz_db;

Check failure on line 3 in src/models/mod.rs

View workflow job for this annotation

GitHub Actions / Test for ubuntu-latest

file is loaded as a module multiple times: `src/models/musicbrainz_db.rs`
20 changes: 20 additions & 0 deletions src/models/musicbrainz_db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//! All the relevant models from the MusicBrainz database

/// An edit note attached to a MusicBrainz edit.
///
/// Mirrors a row of the `edit_note` table; fetched via `sqlx::query_as`,
/// so field names must match the table's column names.
#[derive(sqlx::FromRow, Debug)]
pub struct EditNote {
/// Primary key of the edit-note row.
pub id: i32,
/// Id of the editor who wrote the note — presumably a foreign key into the editor table; confirm against schema.
pub editor: i32,
/// Id of the edit this note belongs to.
pub edit: i32,
/// Free-form note text; URLs are extracted from it by the poller.
pub text: String,

// Kept for completeness of the row mapping even though the app does not read it yet.
#[allow(dead_code)]
pub post_time: Option<chrono::DateTime<chrono::Utc>>,
}

/// The raw data payload of a MusicBrainz edit.
///
/// Mirrors a row of the `edit_data` table; fetched via `sqlx::query_as`.
#[derive(sqlx::FromRow, Debug)]
pub struct EditData {
/// Id of the edit this payload belongs to.
pub edit: i32,
/// JSON body of the edit; URLs are extracted from it by the poller.
pub data: serde_json::Value,
}
98 changes: 69 additions & 29 deletions src/poller/looper.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::metrics::Metrics;
use crate::models::musicbrainz_db::EditData;
use crate::models::musicbrainz_db::EditNote;
use crate::poller::utils::{
extract_url_from_edit_data, extract_url_from_edit_note, save_url_to_internet_archive_urls,
};
use log::{info, warn};
use mb_rs::schema::{EditData, EditNote};
use sqlx::{Error, PgPool};

/// Function which runs on each poll and thus is responsible for:
Expand All @@ -22,6 +23,24 @@ pub async fn poll_db(
edit_note_start_idx, edit_data_start_idx
);
let metrics = Metrics::new().await;

// Handle Edits
let next_edit_id = poll_and_save_edit_data(edit_data_start_idx, pool).await?;

// Handle Edits Notes
let next_edit_note_id = poll_and_save_edit_note_data(edit_note_start_idx, pool).await?;

// Perf Note: use join! on the two calls

metrics.db_poll_counter.inc();
metrics.push_metrics().await;

// Return the next ids of the last edit and notes for the next poll
Ok((next_edit_id, next_edit_note_id))
}

/// Poll the edit data from the database and save it
async fn poll_and_save_edit_data(start_id: i32, pool: &PgPool) -> Result<Option<i32>, Error> {
let edits = sqlx::query_as::<_, EditData>(
r#"
SELECT DISTINCT ON (edit)
Expand All @@ -32,57 +51,78 @@ pub async fn poll_db(
LIMIT 10;
"#,
)
.bind(edit_data_start_idx)
.bind(start_id)
.fetch_all(pool)
.await?;

// Perf Note: use a stream
for edit in &edits {
let urls = extract_url_from_edit_data(edit, pool).await;

// Perf Note: use a stream
for url in urls {
let save_edit_data_url_result =
save_url_to_internet_archive_urls(url.as_str(), "edit_data", edit.edit, pool).await;

match save_edit_data_url_result {
// The url got saved
Ok(true) => info!("[POLLER] ADDED: Edit ID `{}` URL `{}`", edit.edit, url),

// The url didn't need to get saved
Ok(false) => {}

// Couldn't save the URL
Err(e) => warn!(
"[POLLER] Error saving an URL from edit id `{}`: {}",
edit.edit, e
),
}
}
}

Ok(edits.last().map(|edit| edit.edit + 1))
}

/// Poll the edit note data from the database and save it
async fn poll_and_save_edit_note_data(start_id: i32, pool: &PgPool) -> Result<Option<i32>, Error> {
let notes = sqlx::query_as::<_, EditNote>(
r#"
SELECT DISTINCT ON (id)
SELECT DISTINCT ON (id)
*
FROM edit_note
WHERE id >= $1
ORDER BY id
LIMIT 10;
"#,
)
.bind(edit_note_start_idx)
.bind(start_id)
.fetch_all(pool)
.await?;

for edit in &edits {
let urls = extract_url_from_edit_data(edit, pool).await;
for url in urls {
let save_edit_data_url_result =
save_url_to_internet_archive_urls(url.as_str(), "edit_data", edit.edit, pool).await;
if let Ok(true) = save_edit_data_url_result {
info!("[POLLER] ADDED: Edit Data {} URL {}", edit.edit, url);
} else if let Err(e) = save_edit_data_url_result {
warn!("[POLLER] Error saving URL from edit: {}: {}", edit.edit, e)
}
}
}
// Perf Note: use a stream
for note in &notes {
let urls = extract_url_from_edit_note(note, pool).await;

// Perf Note: use a stream
for url in urls {
let save_edit_note_url_result =
save_url_to_internet_archive_urls(url.as_str(), "edit_note", note.id, pool).await;
if let Ok(true) = save_edit_note_url_result {
info!("[POLLER] ADDED: Edit Note ID {} URL {}", note.id, url);
} else if let Err(e) = save_edit_note_url_result {
warn!(
"[POLLER] Error saving URL from edit note: {}: {}",

match save_edit_note_url_result {
// The url got saved
Ok(true) => info!("[POLLER] ADDED: Edit Note ID `{}` URL `{}`", note.id, url),

// The url didn't need to get saved
Ok(false) => {}

// Couldn't save the URL
Err(e) => warn!(
"[POLLER] Error saving an URL from edit note id `{}`: {}",
note.id, e
)
),
}
}
}
metrics.db_poll_counter.inc();
metrics.push_metrics().await;

// Return the next ids of the last edit and notes for the next poll
Ok((
edits.last().map(|edit| edit.edit + 1),
notes.last().map(|note| note.id + 1),
))
Ok(notes.last().map(|note| note.id + 1))
}
69 changes: 36 additions & 33 deletions src/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,25 @@ use tokio::time::interval;

/// Polls the MusicBrainz database for new `edit_note` and `edit_data` rows.
pub struct Poller {
/// Time between polls, in seconds.
poll_interval: u64,

/// Connection pool used for all poller queries.
pool: sqlx::PgPool,

/// Id of the next `edit_note` row to poll from (advanced after each poll).
edit_note_start_idx: i32,

/// Id of the next `edit_data` row to poll from (advanced after each poll).
edit_data_start_idx: i32,
}

impl Poller {
pub async fn new(poll_interval: u64, pool: sqlx::PgPool) -> Result<Self, Error> {
// Fetch the start ids for the poller from the DB
let (edit_data_start_idx, edit_note_start_idx) =
get_edit_data_and_note_start_id(&pool).await?;

Ok(Poller {
poll_interval,
pool,
Expand All @@ -27,42 +36,36 @@ impl Poller {
})
}

/// Polls the `edit_data` and `edit_note` tables continuously
pub async fn poll(&mut self) -> Result<(), Error> {
/// This function is the main loop of the poller. Each loop polls the `edit_data` and `edit_note` tables for new changes
pub async fn run(&mut self) -> Result<(), Error> {
let mut interval = interval(Duration::from_secs(self.poll_interval));

loop {
interval.tick().await;
match looper::poll_db(
&self.pool,
self.edit_data_start_idx,
self.edit_note_start_idx,
)
.await
{
Ok((edit_data_id, edit_note_id)) => {
if edit_data_id.is_some() {
self.edit_data_start_idx = edit_data_id.unwrap();
update_last_unprocessed_rows(
"edit_data",
edit_data_id.unwrap(),
&self.pool,
)
.await?;
}
if edit_note_id.is_some() {
self.edit_note_start_idx = edit_note_id.unwrap();
update_last_unprocessed_rows(
"edit_note",
edit_note_id.unwrap(),
&self.pool,
)
.await?;
}
}
Err(e) => {
return Err(e);
}
}

self.poll().await?;
}
}

/// Execute a poll action.
/// Run one poll pass: fetch the next batch of edit data and edit notes,
/// then advance the in-memory cursors and persist them to the database.
async fn poll(&mut self) -> Result<(), Error> {
let next_ids = looper::poll_db(
&self.pool,
self.edit_data_start_idx,
self.edit_note_start_idx,
)
.await?;

let (next_data_id, next_note_id) = next_ids;

if let Some(id) = next_data_id {
// Remember where to resume and checkpoint the cursor in the DB.
self.edit_data_start_idx = id;
update_last_unprocessed_rows("edit_data", id, &self.pool).await?;
}

if let Some(id) = next_note_id {
// Same checkpointing for the edit-note cursor.
self.edit_note_start_idx = id;
update_last_unprocessed_rows("edit_note", id, &self.pool).await?;
}

Ok(())
}
}
2 changes: 2 additions & 0 deletions src/poller/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::models::musicbrainz_db::EditNote;

use super::*;

#[test]
Expand Down Expand Up @@ -505,7 +507,7 @@
async fn test_should_insert_url_to_internet_archive_urls(pool: PgPool) -> Result<(), Error> {
let url1 = "www.musicbrainz.org";
let url2 = "www.example.com";
let url3 = "www.example2.com";

Check warning on line 510 in src/poller/tests/utils.rs

View workflow job for this annotation

GitHub Actions / Test for ubuntu-latest

Diff in /home/runner/work/melba/melba/src/poller/./tests/utils.rs
let url4 = "www.example.com";

assert!(!should_insert_url_to_internet_archive_urls(url1, &pool)
Expand All @@ -522,7 +524,7 @@
(id, url, job_id, from_table, from_table_id, created_at, retry_count, status, status_message)
VALUES (1, 'www.example.com', NULL, 'edit_note', 71024937, '2024-08-25 14:18:47.891132+00', 0, 1, NULL);
")
.execute(&pool)

Check warning on line 527 in src/poller/tests/utils.rs

View workflow job for this annotation

GitHub Actions / Test for ubuntu-latest

Diff in /home/runner/work/melba/melba/src/poller/./tests/utils.rs
.await?;

assert!(!should_insert_url_to_internet_archive_urls(url4, &pool)
Expand Down
Loading
Loading