Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ impl AuthSettings {
self.authorized_subgraphs.is_empty() || self.authorized_subgraphs.contains(subgraph)
}

/// Check if any of the deployment's subgraphs is authorized.
///
/// If the set of authorized subgraphs is empty, then any deployment is authorized (including
/// orphaned deployments with no parent subgraphs).
pub fn is_any_deployment_subgraph_authorized(&self, subgraphs: &[SubgraphId]) -> bool {
subgraphs
.iter()
.any(|subgraph| self.is_subgraph_authorized(subgraph))
self.authorized_subgraphs.is_empty()
|| subgraphs
.iter()
.any(|subgraph| self.is_subgraph_authorized(subgraph))
}
}

Expand Down
189 changes: 125 additions & 64 deletions src/network/pre_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,73 +16,21 @@ use crate::network::{
pub fn into_internal_indexers_raw_info<'a>(
data: impl Iterator<Item = &'a subgraph_client::types::Subgraph>,
) -> HashMap<IndexerId, IndexerRawInfo> {
let mut indexer_indexing_largest_allocation: HashMap<
(IndexerId, DeploymentId),
(AllocationId, u128),
> = HashMap::new();

data.flat_map(|subgraph| {
subgraph
.versions
.iter()
.map(|version| (&subgraph.id, version))
})
.fold(HashMap::new(), |mut acc, (subgraph_id, version)| {
for allocation in &version.subgraph_deployment.allocations {
let indexer_id = allocation.indexer.id;
let deployment_id = version.subgraph_deployment.id;

// If the indexer info is not present, insert it if it is valid
let indexer = match acc.entry(indexer_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => match try_into_indexer_raw_info(&allocation.indexer) {
Ok(info) => entry.insert(info),
Err(err) => {
// Log the error and skip the indexer
tracing::info!(
subgraph_id=%subgraph_id,
version=%version.version,
deployment_id=%deployment_id,
allocation_id=%allocation.id,
indexer_id=%indexer_id,
"invalid indexer info: {err}"
);
continue;
}
},
};

// Update the indexer's indexings largest allocations table
let indexing_largest_allocation = match indexer_indexing_largest_allocation
.entry((indexer_id, deployment_id))
{
Entry::Vacant(entry) => {
entry.insert((allocation.id, allocation.allocated_tokens));
allocation.id
}
Entry::Occupied(entry) => {
let (largest_allocation_address, largest_allocation_amount) = entry.into_mut();
if allocation.allocated_tokens > *largest_allocation_amount {
*largest_allocation_address = allocation.id;
*largest_allocation_amount = allocation.allocated_tokens;
}
*largest_allocation_address
}
};
let mut indexers = HashMap::new();
let mut largest_allocations = HashMap::new();

// Update the indexer's indexings info
let indexing = indexer
.indexings
.entry(deployment_id)
.or_insert(IndexingRawInfo {
largest_allocation: allocation.id,
});

indexing.largest_allocation = indexing_largest_allocation;
for subgraph in data {
for version in &subgraph.versions {
process_allocations_into_indexers(
&version.subgraph_deployment.allocations,
version.subgraph_deployment.id,
&mut indexers,
&mut largest_allocations,
);
}
}

acc
})
indexers
}

/// Convert from the fetched subgraphs information into the internal representation.
Expand Down Expand Up @@ -235,3 +183,116 @@ fn try_into_indexer_raw_info(
indexings: Default::default(),
})
}

/// Convert orphaned deployments (not linked to any active subgraph) into internal representation.
///
/// Orphaned deployments have `activeSubgraphCount: 0`. This function filters out deployments
/// without a valid manifest or without active allocations.
pub fn into_orphaned_deployments_raw_info(
data: impl Iterator<Item = subgraph_client::types::SubgraphDeployment>,
) -> HashMap<DeploymentId, DeploymentRawInfo> {
data.filter_map(|deployment| {
// Validate manifest exists
let manifest = deployment.manifest.as_ref()?;
let network = manifest.network.as_ref()?;

// Skip if no active allocations
if deployment.allocations.is_empty() {
return None;
}

let allocations = deployment
.allocations
.iter()
.map(|a| AllocationInfo {
indexer: a.indexer.id,
})
.collect();

Some((
deployment.id,
DeploymentRawInfo {
id: deployment.id,
manifest_network: network.clone(),
manifest_start_block: manifest.start_block,
subgraphs: Default::default(), // Empty - orphaned
allocations,
},
))
})
.collect()
}

/// Extract indexer information from orphaned deployments.
pub fn into_indexers_raw_info_from_orphaned_deployments<'a>(
data: impl Iterator<Item = &'a subgraph_client::types::SubgraphDeployment>,
) -> HashMap<IndexerId, IndexerRawInfo> {
let mut indexers = HashMap::new();
let mut largest_allocations = HashMap::new();

for deployment in data {
process_allocations_into_indexers(
&deployment.allocations,
deployment.id,
&mut indexers,
&mut largest_allocations,
);
}

indexers
}

/// Process allocations and update indexer maps.
///
/// For each allocation, validates the indexer info and tracks the largest allocation
/// per (indexer, deployment) pair.
fn process_allocations_into_indexers(
allocations: &[subgraph_client::types::Allocation],
deployment_id: DeploymentId,
indexers: &mut HashMap<IndexerId, IndexerRawInfo>,
largest_allocations: &mut HashMap<(IndexerId, DeploymentId), (AllocationId, u128)>,
) {
for allocation in allocations {
let indexer_id = allocation.indexer.id;

let indexer = match indexers.entry(indexer_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => match try_into_indexer_raw_info(&allocation.indexer) {
Ok(info) => entry.insert(info),
Err(err) => {
tracing::info!(
%deployment_id,
allocation_id = %allocation.id,
%indexer_id,
"invalid indexer info: {err}"
);
continue;
}
},
};

let indexing_largest_allocation =
match largest_allocations.entry((indexer_id, deployment_id)) {
Entry::Vacant(entry) => {
entry.insert((allocation.id, allocation.allocated_tokens));
allocation.id
}
Entry::Occupied(entry) => {
let (addr, amount) = entry.into_mut();
if allocation.allocated_tokens > *amount {
*addr = allocation.id;
*amount = allocation.allocated_tokens;
}
*addr
}
};

indexer
.indexings
.entry(deployment_id)
.or_insert(IndexingRawInfo {
largest_allocation: allocation.id,
})
.largest_allocation = indexing_largest_allocation;
}
}
30 changes: 23 additions & 7 deletions src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,23 +250,39 @@ fn spawn_updater_task(
/// Fetch the subgraphs information from the graph network subgraph and performs pre-processing
/// steps, i.e., validation and conversion into the internal representation.
///
/// 1. Fetch the subgraphs information from the graph network subgraph.
/// 2. Validate and convert the subgraphs fetched info into the internal representation.
/// 1. Fetch the subgraphs and orphaned deployments from the graph network subgraph.
/// 2. Validate and convert the fetched info into the internal representation.
/// 3. Merge orphaned deployments into the deployments map.
///
/// If the fetch fails or the response is empty, an error is returned.
///
/// Invalid info is filtered out before converting into the internal representation.
pub async fn fetch_and_preprocess_subgraph_info(
client: &mut SubgraphClient,
) -> anyhow::Result<PreprocessedNetworkInfo> {
// Fetch the subgraphs information from the graph network subgraph
// Fetch the subgraphs and orphaned deployments from the graph network subgraph
let data = client.fetch().await?;
anyhow::ensure!(!data.is_empty(), "empty subgraph response");
anyhow::ensure!(!data.subgraphs.is_empty(), "empty subgraph response");

// Pre-process (validate and convert) the fetched subgraphs information
let indexers = pre_processing::into_internal_indexers_raw_info(data.iter());
let subgraphs = pre_processing::into_internal_subgraphs_raw_info(data.into_iter());
let deployments = pre_processing::into_internal_deployments_raw_info(subgraphs.values());
let mut indexers = pre_processing::into_internal_indexers_raw_info(data.subgraphs.iter());
let subgraphs = pre_processing::into_internal_subgraphs_raw_info(data.subgraphs.into_iter());
let mut deployments = pre_processing::into_internal_deployments_raw_info(subgraphs.values());

// Pre-process orphaned deployments and merge them
let orphaned_indexers = pre_processing::into_indexers_raw_info_from_orphaned_deployments(
data.orphaned_deployments.iter(),
);
let orphaned_deployments =
pre_processing::into_orphaned_deployments_raw_info(data.orphaned_deployments.into_iter());

for (id, indexer) in orphaned_indexers {
indexers.entry(id).or_insert(indexer);
}

for (id, deployment) in orphaned_deployments {
deployments.entry(id).or_insert(deployment);
}

let subgraphs = subgraph_processing::process_subgraph_info(subgraphs);
let deployments = subgraph_processing::process_deployments_info(deployments);
Expand Down
Loading
Loading