diff --git a/src/auth.rs b/src/auth.rs index d54e47a7..f84a4346 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -25,10 +25,15 @@ impl AuthSettings { self.authorized_subgraphs.is_empty() || self.authorized_subgraphs.contains(subgraph) } + /// Check if any of the deployment's subgraphs is authorized. + /// + /// If the set of authorized subgraphs is empty, then any deployment is authorized (including + /// orphaned deployments with no parent subgraphs). pub fn is_any_deployment_subgraph_authorized(&self, subgraphs: &[SubgraphId]) -> bool { - subgraphs - .iter() - .any(|subgraph| self.is_subgraph_authorized(subgraph)) + self.authorized_subgraphs.is_empty() + || subgraphs + .iter() + .any(|subgraph| self.is_subgraph_authorized(subgraph)) } } diff --git a/src/network/pre_processing.rs b/src/network/pre_processing.rs index dc7b64a3..b019a54a 100644 --- a/src/network/pre_processing.rs +++ b/src/network/pre_processing.rs @@ -16,73 +16,21 @@ use crate::network::{ pub fn into_internal_indexers_raw_info<'a>( data: impl Iterator, ) -> HashMap { - let mut indexer_indexing_largest_allocation: HashMap< - (IndexerId, DeploymentId), - (AllocationId, u128), - > = HashMap::new(); - - data.flat_map(|subgraph| { - subgraph - .versions - .iter() - .map(|version| (&subgraph.id, version)) - }) - .fold(HashMap::new(), |mut acc, (subgraph_id, version)| { - for allocation in &version.subgraph_deployment.allocations { - let indexer_id = allocation.indexer.id; - let deployment_id = version.subgraph_deployment.id; - - // If the indexer info is not present, insert it if it is valid - let indexer = match acc.entry(indexer_id) { - Entry::Occupied(entry) => entry.into_mut(), - Entry::Vacant(entry) => match try_into_indexer_raw_info(&allocation.indexer) { - Ok(info) => entry.insert(info), - Err(err) => { - // Log the error and skip the indexer - tracing::info!( - subgraph_id=%subgraph_id, - version=%version.version, - deployment_id=%deployment_id, - allocation_id=%allocation.id, - indexer_id=%indexer_id, - "invalid indexer info: {err}" - ); - continue; - } - }, - }; - - // Update the indexer's indexings largest allocations table - let indexing_largest_allocation = match indexer_indexing_largest_allocation - .entry((indexer_id, deployment_id)) - { - Entry::Vacant(entry) => { - entry.insert((allocation.id, allocation.allocated_tokens)); - allocation.id - } - Entry::Occupied(entry) => { - let (largest_allocation_address, largest_allocation_amount) = entry.into_mut(); - if allocation.allocated_tokens > *largest_allocation_amount { - *largest_allocation_address = allocation.id; - *largest_allocation_amount = allocation.allocated_tokens; - } - *largest_allocation_address - } - }; + let mut indexers = HashMap::new(); + let mut largest_allocations = HashMap::new(); - // Update the indexer's indexings info - let indexing = indexer - .indexings - .entry(deployment_id) - .or_insert(IndexingRawInfo { - largest_allocation: allocation.id, - }); - - indexing.largest_allocation = indexing_largest_allocation; + for subgraph in data { + for version in &subgraph.versions { + process_allocations_into_indexers( + &version.subgraph_deployment.allocations, + version.subgraph_deployment.id, + &mut indexers, + &mut largest_allocations, + ); } + } - acc - }) + indexers } /// Convert from the fetched subgraphs information into the internal representation. @@ -235,3 +183,116 @@ fn try_into_indexer_raw_info( indexings: Default::default(), }) } + +/// Convert orphaned deployments (not linked to any active subgraph) into internal representation. +/// +/// Orphaned deployments have `activeSubgraphCount: 0`. This function filters out deployments +/// without a valid manifest or without active allocations. +pub fn into_orphaned_deployments_raw_info( + data: impl Iterator, +) -> HashMap { + data.filter_map(|deployment| { + // Validate manifest exists + let manifest = deployment.manifest.as_ref()?; + let network = manifest.network.as_ref()?; + + // Skip if no active allocations + if deployment.allocations.is_empty() { + return None; + } + + let allocations = deployment + .allocations + .iter() + .map(|a| AllocationInfo { + indexer: a.indexer.id, + }) + .collect(); + + Some(( + deployment.id, + DeploymentRawInfo { + id: deployment.id, + manifest_network: network.clone(), + manifest_start_block: manifest.start_block, + subgraphs: Default::default(), // Empty - orphaned + allocations, + }, + )) + }) + .collect() +} + +/// Extract indexer information from orphaned deployments. +pub fn into_indexers_raw_info_from_orphaned_deployments<'a>( + data: impl Iterator, +) -> HashMap { + let mut indexers = HashMap::new(); + let mut largest_allocations = HashMap::new(); + + for deployment in data { + process_allocations_into_indexers( + &deployment.allocations, + deployment.id, + &mut indexers, + &mut largest_allocations, + ); + } + + indexers +} + +/// Process allocations and update indexer maps. +/// +/// For each allocation, validates the indexer info and tracks the largest allocation +/// per (indexer, deployment) pair. +fn process_allocations_into_indexers( + allocations: &[subgraph_client::types::Allocation], + deployment_id: DeploymentId, + indexers: &mut HashMap, + largest_allocations: &mut HashMap<(IndexerId, DeploymentId), (AllocationId, u128)>, +) { + for allocation in allocations { + let indexer_id = allocation.indexer.id; + + let indexer = match indexers.entry(indexer_id) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => match try_into_indexer_raw_info(&allocation.indexer) { + Ok(info) => entry.insert(info), + Err(err) => { + tracing::info!( + %deployment_id, + allocation_id = %allocation.id, + %indexer_id, + "invalid indexer info: {err}" + ); + continue; + } + }, + }; + + let indexing_largest_allocation = + match largest_allocations.entry((indexer_id, deployment_id)) { + Entry::Vacant(entry) => { + entry.insert((allocation.id, allocation.allocated_tokens)); + allocation.id + } + Entry::Occupied(entry) => { + let (addr, amount) = entry.into_mut(); + if allocation.allocated_tokens > *amount { + *addr = allocation.id; + *amount = allocation.allocated_tokens; + } + *addr + } + }; + + indexer + .indexings + .entry(deployment_id) + .or_insert(IndexingRawInfo { + largest_allocation: allocation.id, + }) + .largest_allocation = indexing_largest_allocation; + } +} diff --git a/src/network/service.rs b/src/network/service.rs index a944b8f0..de165668 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -250,8 +250,9 @@ fn spawn_updater_task( /// Fetch the subgraphs information from the graph network subgraph and performs pre-processing /// steps, i.e., validation and conversion into the internal representation. /// -/// 1. Fetch the subgraphs information from the graph network subgraph. -/// 2. Validate and convert the subgraphs fetched info into the internal representation. +/// 1. Fetch the subgraphs and orphaned deployments from the graph network subgraph. +/// 2. Validate and convert the fetched info into the internal representation. +/// 3. Merge orphaned deployments into the deployments map. /// /// If the fetch fails or the response is empty, an error is returned. /// @@ -259,14 +260,29 @@ fn spawn_updater_task( pub async fn fetch_and_preprocess_subgraph_info( client: &mut SubgraphClient, ) -> anyhow::Result { - // Fetch the subgraphs information from the graph network subgraph + // Fetch the subgraphs and orphaned deployments from the graph network subgraph let data = client.fetch().await?; - anyhow::ensure!(!data.is_empty(), "empty subgraph response"); + anyhow::ensure!(!data.subgraphs.is_empty(), "empty subgraph response"); // Pre-process (validate and convert) the fetched subgraphs information - let indexers = pre_processing::into_internal_indexers_raw_info(data.iter()); - let subgraphs = pre_processing::into_internal_subgraphs_raw_info(data.into_iter()); - let deployments = pre_processing::into_internal_deployments_raw_info(subgraphs.values()); + let mut indexers = pre_processing::into_internal_indexers_raw_info(data.subgraphs.iter()); + let subgraphs = pre_processing::into_internal_subgraphs_raw_info(data.subgraphs.into_iter()); + let mut deployments = pre_processing::into_internal_deployments_raw_info(subgraphs.values()); + + // Pre-process orphaned deployments and merge them + let orphaned_indexers = pre_processing::into_indexers_raw_info_from_orphaned_deployments( + data.orphaned_deployments.iter(), + ); + let orphaned_deployments = + pre_processing::into_orphaned_deployments_raw_info(data.orphaned_deployments.into_iter()); + + for (id, indexer) in orphaned_indexers { + indexers.entry(id).or_insert(indexer); + } + + for (id, deployment) in orphaned_deployments { + deployments.entry(id).or_insert(deployment); + } let subgraphs = subgraph_processing::process_subgraph_info(subgraphs); let deployments = subgraph_processing::process_deployments_info(deployments); diff --git a/src/network/subgraph_client.rs b/src/network/subgraph_client.rs index 1a442bee..fae83850 100644 --- a/src/network/subgraph_client.rs +++ b/src/network/subgraph_client.rs @@ -11,7 +11,7 @@ use serde_json::json; use serde_with::serde_as; use thegraph_core::alloy::primitives::{BlockHash, BlockNumber, BlockTimestamp}; use thegraph_graphql_http::http::response::Error as GqlError; -use types::Subgraph; +use types::{Subgraph, SubgraphDeployment}; use url::Url; use crate::{ @@ -47,7 +47,7 @@ pub mod types { #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub struct SubgraphVersion { - pub version: u32, + pub _version: u32, pub subgraph_deployment: SubgraphDeployment, } @@ -128,9 +128,17 @@ pub struct Client { pub max_lag_seconds: u64, } +/// Result of fetching network subgraph data. +pub struct FetchResult { + /// Active subgraphs with their versions and deployments. + pub subgraphs: Vec, + /// Orphaned deployments (not linked to any active subgraph but with active allocations). + pub orphaned_deployments: Vec, +} + impl Client { - /// Fetch the list of subgraphs (and deployments) from the network subgraph. - pub async fn fetch(&mut self) -> anyhow::Result> { + /// Fetch the list of subgraphs and orphaned deployments from the network subgraph. + pub async fn fetch(&mut self) -> anyhow::Result { for indexer in &self.indexers.clone() { match self.fetch_from_indexer(indexer).await { Ok(results) => return Ok(results), @@ -148,10 +156,10 @@ impl Client { async fn fetch_from_indexer( &mut self, indexer: &TrustedIndexer, - ) -> anyhow::Result> { + ) -> anyhow::Result { // ref: 9936786a-e286-45f3-9190-8409d8389e88 let query = r#" - query ($block: Block_height!, $first: Int!, $last: String!) { + query ($block: Block_height!, $first: Int!, $last: String!, $lastOrphaned: String!) { meta: _meta(block: $block) { block { number hash timestamp } } results: subgraphs( block: $block @@ -189,6 +197,34 @@ impl Client { } } } + orphanedDeployments: subgraphDeployments( + block: $block + orderBy: id, orderDirection: asc + first: $first + where: { + id_gt: $lastOrphaned + activeSubgraphCount: 0 + } + ) { + ipfsHash + manifest { + network + startBlock + } + indexerAllocations( + first: 100 + orderBy: allocatedTokens, orderDirection: desc + where: { status: Active } + ) { + id + allocatedTokens + indexer { + id + url + stakedTokens + } + } + } }"#; #[derive(Debug, Deserialize)] @@ -198,9 +234,11 @@ impl Client { errors: Vec, } #[derive(Debug, Deserialize)] + #[serde(rename_all = "camelCase")] pub struct QueryData { meta: Meta, results: Vec, + orphaned_deployments: Vec, } #[derive(Debug, Deserialize)] pub struct Meta { @@ -216,8 +254,17 @@ impl Client { debug_assert!(self.page_size > 0); let mut query_block: Option = None; let mut last_id: Option = None; + let mut last_orphaned_id: Option = None; + let mut subgraphs_done = false; + let mut orphaned_done = false; let mut results: Vec = Default::default(); + let mut orphaned_results: Vec = Default::default(); + // Pagination uses independent cursors for subgraphs and orphaned deployments. Both + // subqueries are included in every request, even after one completes. When one finishes, + // its cursor remains at the final value causing subsequent queries to return empty results + // for that subquery. This avoids the complexity of dynamically constructing the query string + // and dealing with multiple result types. loop { let block_height = match &query_block { Some(block) => BlockHeight::Hash(block.hash), @@ -230,7 +277,8 @@ impl Client { "variables": { "block": block_height, "first": self.page_size, - "last": last_id.unwrap_or_default(), + "last": last_id.clone().unwrap_or_default(), + "lastOrphaned": last_orphaned_id.clone().unwrap_or_default(), }, }); let response = self @@ -277,16 +325,36 @@ impl Client { ); query_block = Some(block); } - last_id = data.results.last().map(|entry| entry.id.to_string()); - let page_len = data.results.len(); - results.append(&mut data.results); - if page_len < self.page_size { + + if !subgraphs_done { + last_id = data.results.last().map(|entry| entry.id.to_string()); + if data.results.len() < self.page_size { + subgraphs_done = true; + } + results.append(&mut data.results); + } + + if !orphaned_done { + last_orphaned_id = data + .orphaned_deployments + .last() + .map(|entry| entry.id.to_string()); + if data.orphaned_deployments.len() < self.page_size { + orphaned_done = true; + } + orphaned_results.append(&mut data.orphaned_deployments); + } + + if subgraphs_done && orphaned_done { break; } } self.latest_block = Some(query_block.unwrap()); - Ok(results) + Ok(FetchResult { + subgraphs: results, + orphaned_deployments: orphaned_results, + }) } }