- Published on
X Algorithm Part 2: The Composable Pipeline Framework in Rust
- Authors

- Name
- Steve Tran
This is Part 2 of a 5-part series on the X recommendation algorithm. Part 1 gives the big-picture overview. Part 3 covers candidate sourcing. Part 4 dives into the ranking transformer. Part 5 covers scoring, filtering, and the final feed.
Before we get into Thunder, Phoenix, or the Grok transformer, we need to understand the skeleton that holds everything together: the CandidatePipeline framework.
This is the most underrated part of the codebase. It's a small library (~330 lines) that defines a generic, trait-based pipeline contract. Every recommendation component — filters, hydrators, scorers, sources — plugs into this framework. The orchestration logic is written once and reused everywhere.
The Core Idea: Traits as Contracts
The framework lives in candidate-pipeline/ and defines seven traits, each representing one role in the pipeline:
| Trait | Runs | Purpose |
|---|---|---|
| QueryHydrator<Q> | parallel | Enrich the query with user context |
| Source<Q, C> | parallel | Fetch candidate posts from a data source |
| Hydrator<Q, C> | parallel | Enrich candidates with additional data |
| Filter<Q, C> | sequential | Remove ineligible candidates |
| Scorer<Q, C> | sequential | Compute scores for candidates |
| Selector<Q, C> | single | Sort and select top-K |
| SideEffect<Q, C> | async / fire-and-forget | Cache, log, post-process |
Each trait is minimal. Here's Source:
candidate-pipeline/source.rs:7-22
#[async_trait]
pub trait Source<Q, C>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    // Optional gate: return false to skip this source for a given query.
    fn enable(&self, _query: &Q) -> bool { true }
    // The one method every source must implement.
    async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;
    // Short type name, used in log lines.
    fn name(&self) -> &'static str { util::short_type_name(type_name_of_val(self)) }
}
Three methods: an optional enable gate, the async get_candidates implementation, and a name for logging. That's the entire contract a candidate source must fulfill.
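To make the contract concrete, here is a minimal sketch of what implementing it could look like. It uses a synchronous stand-in trait so it compiles with the standard library alone (the real trait is async). `SimpleSource`, `StaticSource`, and the `String`/`u64` query and candidate types are illustrative assumptions, not names from the codebase.

```rust
use std::any::type_name;

/// Synchronous stand-in for the `Source` trait (assumption: the real trait is async).
pub trait SimpleSource<Q, C>: Send + Sync {
    /// Optional gate; defaults to always enabled.
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    /// The only method an implementor must write.
    fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;

    /// Short type name for logging: keeps the last `::`-separated segment.
    fn name(&self) -> &'static str {
        let full = type_name::<Self>();
        full.rsplit("::").next().unwrap_or(full)
    }
}

/// Hypothetical source that serves a fixed list of post IDs.
pub struct StaticSource {
    pub post_ids: Vec<u64>,
}

impl SimpleSource<String, u64> for StaticSource {
    fn get_candidates(&self, _query: &String) -> Result<Vec<u64>, String> {
        Ok(self.post_ids.clone())
    }
}
```

The implementor writes one method and inherits the other two, which is the property that keeps individual components small.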
Filter returns a FilterResult<C> that explicitly separates kept from removed candidates — a design detail that matters for observability, since the framework logs removal counts at each step:
candidate-pipeline/filter.rs:6-32
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

#[async_trait]
pub trait Filter<Q, C>: Any + Send + Sync { ... }
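As a rough illustration of why the kept/removed split helps, the sketch below partitions candidates with a hypothetical age filter and builds the kind of summary line a framework could log. `Post`, `filter_by_age`, and `removal_summary` are made-up names for this example, and the function is synchronous for simplicity.

```rust
/// Mirrors the shape of the `FilterResult` struct shown above.
#[derive(Debug, PartialEq)]
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Hypothetical candidate type for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct Post {
    pub id: u64,
    pub age_secs: u64,
}

/// Hypothetical age filter: keeps posts no older than `max_age_secs`.
pub fn filter_by_age(candidates: Vec<Post>, max_age_secs: u64) -> FilterResult<Post> {
    // `partition` preserves relative order within each half.
    let (kept, removed): (Vec<Post>, Vec<Post>) = candidates
        .into_iter()
        .partition(|post| post.age_secs <= max_age_secs);
    FilterResult { kept, removed }
}

/// A summary line that is only possible because `removed` is explicit.
pub fn removal_summary(name: &str, result: &FilterResult<Post>) -> String {
    format!(
        "filter={} kept={} removed={}",
        name,
        result.kept.len(),
        result.removed.len()
    )
}
```

Returning both halves costs nothing extra at the call site, and the caller never has to diff two lists to learn what a filter did.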
Scorer and Hydrator share an important constraint in their docstrings:
candidate-pipeline/scorer.rs:20-22
/// IMPORTANT: The returned vector must have the same candidates in the same order as the input.
/// Dropping candidates in a scorer is not allowed - use a filter stage instead.
async fn score(&self, query: &Q, candidates: &[C]) -> Result<Vec<C>, String>;
The same constraint exists on Hydrator (candidate-pipeline/hydrator.rs:20-22). Scorers cannot drop candidates. If you want to remove a post, you write a Filter. This separation enforces a clean invariant: after scoring, the candidate list is the same length it was before. The framework uses this guarantee to detect bugs (more on that below).
The CandidatePipeline Trait
The CandidatePipeline trait is where implementors declare which components they're using. It has ten required methods plus a provided execute():
candidate-pipeline/candidate_pipeline.rs:37-92
pub trait CandidatePipeline<Q, C>: Send + Sync {
    fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<Q>>];
    fn sources(&self) -> &[Box<dyn Source<Q, C>>];
    fn hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn scorers(&self) -> &[Box<dyn Scorer<Q, C>>];
    fn selector(&self) -> &dyn Selector<Q, C>;
    fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn post_selection_filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<Q, C>>>>;
    fn result_size(&self) -> usize;

    // execute() is provided: implementors only fill in the above
    async fn execute(&self, query: Q) -> PipelineResult<Q, C> { ... }
}
Implementing this trait is just wiring: return your list of components for each slot. The execute() method is provided by the framework and is not overridden. The orchestration logic is written once.
Here's how PhoenixCandidatePipeline in home-mixer/ wires everything together:
home-mixer/candidate_pipeline/phoenix_candidate_pipeline.rs:216-255
impl CandidatePipeline<ScoredPostsQuery, PostCandidate> for PhoenixCandidatePipeline {
    fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<ScoredPostsQuery>>] {
        &self.query_hydrators // [UserActionSeqQueryHydrator, UserFeaturesQueryHydrator]
    }
    fn sources(&self) -> &[Box<dyn Source<ScoredPostsQuery, PostCandidate>>] {
        &self.sources // [PhoenixSource, ThunderSource]
    }
    fn filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, PostCandidate>>] {
        &self.filters // [DropDuplicatesFilter, AgeFilter, SelfTweetFilter, ...]
    }
    fn scorers(&self) -> &[Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>] {
        &self.scorers // [PhoenixScorer, WeightedScorer, AuthorDiversityScorer, OONScorer]
    }
    // ...
    fn result_size(&self) -> usize { params::RESULT_SIZE }
}
The execute() orchestration is inherited. No glue code needed.
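The "declare components, inherit orchestration" pattern is easy to reproduce in miniature. The sketch below is a synchronous, two-stage toy, not the framework itself: `MiniPipeline`, `DemoPipeline`, and the closure-based components are hypothetical names, and plain closures stand in for the trait objects.

```rust
/// Synchronous, two-stage stand-in for the pipeline trait (assumption: the
/// real trait is async and has many more stages).
pub trait MiniPipeline {
    // Required: implementors only declare their components.
    fn sources(&self) -> &[Box<dyn Fn() -> Vec<i64>>];
    fn filters(&self) -> &[Box<dyn Fn(&i64) -> bool>];
    fn result_size(&self) -> usize;

    // Provided: orchestration is written once here and inherited.
    fn execute(&self) -> Vec<i64> {
        let mut candidates: Vec<i64> =
            self.sources().iter().flat_map(|source| source()).collect();
        for keep in self.filters() {
            candidates.retain(|c| keep(c));
        }
        candidates.truncate(self.result_size());
        candidates
    }
}

/// Hypothetical pipeline: all it does is hold component lists.
pub struct DemoPipeline {
    sources: Vec<Box<dyn Fn() -> Vec<i64>>>,
    filters: Vec<Box<dyn Fn(&i64) -> bool>>,
}

impl DemoPipeline {
    pub fn new() -> Self {
        DemoPipeline {
            sources: vec![Box::new(|| vec![1, 2, 3]), Box::new(|| vec![-4, 5, 6])],
            filters: vec![
                Box::new(|c: &i64| *c > 0),      // drop non-positive values
                Box::new(|c: &i64| *c % 2 == 1), // keep odd values
            ],
        }
    }
}

impl MiniPipeline for DemoPipeline {
    fn sources(&self) -> &[Box<dyn Fn() -> Vec<i64>>] { &self.sources }
    fn filters(&self) -> &[Box<dyn Fn(&i64) -> bool>] { &self.filters }
    fn result_size(&self) -> usize { 2 }
}
```

Calling `DemoPipeline::new().execute()` runs both sources, applies both filters in order, and truncates to two results, all without `DemoPipeline` containing any orchestration code of its own.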
How execute() Works
The pipeline execution is a linear sequence of await calls, each stage feeding into the next:
candidate-pipeline/candidate_pipeline.rs:53-92
async fn execute(&self, query: Q) -> PipelineResult<Q, C> {
    let hydrated_query = self.hydrate_query(query).await;
    let candidates = self.fetch_candidates(&hydrated_query).await;
    let hydrated_candidates = self.hydrate(&hydrated_query, candidates).await;
    let (kept_candidates, filtered_candidates) = self.filter(&hydrated_query, hydrated_candidates).await;
    let scored_candidates = self.score(&hydrated_query, kept_candidates).await;
    let selected_candidates = self.select(&hydrated_query, scored_candidates);
    let post_hydrated = self.hydrate_post_selection(&hydrated_query, selected_candidates).await;
    let (mut final_candidates, post_filtered) = self.filter_post_selection(&hydrated_query, post_hydrated).await;
    final_candidates.truncate(self.result_size());
    self.run_side_effects(...);
    PipelineResult { ... }
}
Stages are sequential: each one awaits completion before the next begins. Within the parallel stages (hydrate_query, fetch_candidates, hydrate), all sub-components run concurrently, while filters and scorers run one component at a time:
candidate-pipeline/candidate_pipeline.rs:129-130
// Sources run in parallel
let source_futures = sources.iter().map(|s| s.get_candidates(query));
let results = join_all(source_futures).await;
candidate-pipeline/candidate_pipeline.rs:246
// Filters run one by one
for filter in filters.iter() {
    match filter.filter(query, candidates).await { ... }
}
The distinction is intentional. Sources are independent — Thunder and Phoenix don't need each other's results. Filters are dependent — RetweetDeduplicationFilter needs to run after DropDuplicatesFilter has already removed exact duplicates.
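The two execution shapes can be sketched with the standard library alone. In the block below, `std::thread::scope` stands in for `join_all` (an assumption made so the example needs no async runtime), and every function name is hypothetical.

```rust
use std::thread;

/// Runs every source on its own thread and returns results in source order.
/// `std::thread::scope` stands in for `join_all` here (assumption: the real
/// framework awaits async futures rather than spawning OS threads).
pub fn fetch_in_parallel(sources: &[fn() -> Vec<u32>]) -> Vec<Vec<u32>> {
    thread::scope(|scope| {
        // Start all sources before waiting on any of them.
        let handles: Vec<_> = sources.iter().map(|&source| scope.spawn(source)).collect();
        // Joining in order keeps the output aligned with the input list.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("source thread panicked"))
            .collect()
    })
}

/// Applies filters strictly one after another: each sees the previous output.
pub fn filter_in_sequence(
    mut candidates: Vec<u32>,
    filters: &[fn(Vec<u32>) -> Vec<u32>],
) -> Vec<u32> {
    for filter in filters {
        candidates = filter(candidates);
    }
    candidates
}

// Hypothetical components used for illustration.
pub fn in_network() -> Vec<u32> { vec![1, 2, 2] }
pub fn out_of_network() -> Vec<u32> { vec![2, 3] }
pub fn drop_exact_duplicates(mut c: Vec<u32>) -> Vec<u32> { c.sort(); c.dedup(); c }
pub fn drop_even(c: Vec<u32>) -> Vec<u32> { c.into_iter().filter(|x| x % 2 == 1).collect() }
```

The sources never read each other's output, so starting them together is safe. Each filter consumes the list the previous one produced, so the loop is the natural shape there.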
Three Error Handling Patterns
This is where the framework gets interesting. It makes deliberate choices that prioritize availability over correctness.
Pattern 1: Failed sources return empty
If Thunder or Phoenix crashes mid-request, the source returns an error. The framework logs it and moves on with zero candidates from that source. The feed may be thinner but the request doesn't fail.
candidate-pipeline/candidate_pipeline.rs:145-152
Err(err) => {
    error!("request_id={} stage=Source component={} failed: {}", ...);
    // collected stays empty for this source
}
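A small sketch of this merge step, under the assumption that each source's result arrives tagged with its name. `merge_source_results` is a hypothetical helper, and the returned `Vec<String>` stands in for the `error!` log lines.

```rust
/// Merges per-source results, treating a failed source as an empty one.
/// Returns the merged candidates plus the log lines that would be emitted.
pub fn merge_source_results(
    results: Vec<(&'static str, Result<Vec<u64>, String>)>,
) -> (Vec<u64>, Vec<String>) {
    let mut collected = Vec::new();
    let mut errors = Vec::new();
    for (name, result) in results {
        match result {
            Ok(mut candidates) => collected.append(&mut candidates),
            Err(err) => {
                // Nothing is added for this source; the request carries on.
                errors.push(format!("stage=Source component={} failed: {}", name, err));
            }
        }
    }
    (collected, errors)
}
```

With one healthy source and one failing source, the caller still gets the healthy source's candidates and a single error line to alert on.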
Pattern 2: Failed hydrators are skipped
If GizmoduckCandidateHydrator (author info) fails, candidates continue downstream without author_screen_name or author_followers_count populated. The framework logs the error, and the other hydrators are unaffected.
candidate-pipeline/candidate_pipeline.rs:205-213
Err(err) => {
    error!("request_id={} stage=Hydrator component={} failed: {}", ...);
    // candidates unchanged: field stays None
}
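The effect on a candidate can be sketched as follows. `Candidate` and `apply_hydration` are hypothetical, and the returned `Option<String>` stands in for the logged error.

```rust
/// Hypothetical candidate with one optional, hydrator-populated field.
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub post_id: u64,
    pub author_screen_name: Option<String>, // filled in by a hydrator
}

/// Applies one hydrator's output, or leaves candidates untouched on failure.
/// Returns the message that would be logged, if any.
pub fn apply_hydration(
    candidates: &mut [Candidate],
    hydrated: Result<Vec<Option<String>>, String>,
) -> Option<String> {
    match hydrated {
        Ok(names) => {
            for (candidate, name) in candidates.iter_mut().zip(names) {
                candidate.author_screen_name = name;
            }
            None
        }
        // On failure nothing is written, so the field keeps its old value.
        Err(err) => Some(format!("stage=Hydrator failed: {}", err)),
    }
}
```

Downstream components then have to tolerate `None` in hydrated fields, which is the price of keeping the request alive.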
Pattern 3: Failed filters fall back to their input
This is the most interesting one. Before each filter runs, the framework saves a backup of the current candidate list:
candidate-pipeline/candidate_pipeline.rs:247-262
for filter in filters {
    let backup = candidates.clone();
    match filter.filter(query, candidates).await {
        Ok(result) => { candidates = result.kept; }
        Err(err) => {
            error!(...);
            candidates = backup; // restore pre-filter state
        }
    }
}
If MutedKeywordFilter crashes, the candidates it would have filtered out are preserved. The user might see a post with a muted keyword — a content correctness failure — but the request completes. In a social feed, an occasional incorrect post is far better than a blank screen.
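Here is a self-contained sketch of the same backup-and-restore loop, written synchronously with plain function pointers. `FilterFn`, `run_filters_with_fallback`, and both example filters are hypothetical.

```rust
/// Hypothetical filter signature: returns the kept candidates or an error.
pub type FilterFn = fn(Vec<String>) -> Result<Vec<String>, String>;

/// Runs filters in order; a failing filter is treated as a no-op.
pub fn run_filters_with_fallback(
    mut candidates: Vec<String>,
    filters: &[FilterFn],
) -> Vec<String> {
    for filter in filters {
        // Clone first, because the filter takes ownership of its input.
        let backup = candidates.clone();
        candidates = match filter(candidates) {
            Ok(kept) => kept,
            Err(_err) => backup, // restore the pre-filter list
        };
    }
    candidates
}

// Hypothetical filters for illustration.
pub fn drop_empty(posts: Vec<String>) -> Result<Vec<String>, String> {
    Ok(posts.into_iter().filter(|p| !p.is_empty()).collect())
}

pub fn broken_muted_keyword_filter(_posts: Vec<String>) -> Result<Vec<String>, String> {
    Err("keyword service unavailable".to_string())
}
```

The cost of this safety net is one clone of the candidate list per filter, paid even when the filter succeeds.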
The Length Mismatch Guard
For hydrators and scorers, there's a subtle correctness check. Because these stages must return exactly as many candidates as they received, the framework validates this:
candidate-pipeline/candidate_pipeline.rs:192-202
if hydrated.len() == expected_len {
    hydrator.update_all(&mut candidates, hydrated);
} else {
    warn!("skipped: length_mismatch expected={} got={}", expected_len, hydrated.len());
}
The same check applies to scorers (candidate-pipeline/candidate_pipeline.rs:282-292). If a hydrator accidentally drops or duplicates a candidate, its entire result is discarded. This prevents subtle off-by-one bugs from silently corrupting downstream stages. The warning is logged for monitoring.
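A minimal sketch of the guard, using `(post_id, score)` tuples as a stand-in candidate type. `apply_scores` is a hypothetical helper, and its return value stands in for the `warn!` line.

```rust
/// Applies `scores` to `candidates` only if the lengths line up.
/// Returns a warning message when the whole result is discarded.
pub fn apply_scores(candidates: &mut [(u64, f64)], scores: Vec<f64>) -> Option<String> {
    let expected_len = candidates.len();
    if scores.len() != expected_len {
        // Positional pairing would be unsafe, so no candidate is modified.
        return Some(format!(
            "skipped: length_mismatch expected={} got={}",
            expected_len,
            scores.len()
        ));
    }
    for (candidate, score) in candidates.iter_mut().zip(scores) {
        candidate.1 = score;
    }
    None
}
```

Note that a length check of this kind catches dropped or duplicated entries but cannot detect a result that was reordered while keeping the same length.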
The enable() Gate
Every trait has an enable(&self, query: &Q) -> bool method that defaults to true. This is a lightweight mechanism for conditional execution:
candidate-pipeline/source.rs:13-15
fn enable(&self, _query: &Q) -> bool { true } // default: always run
A component can override this to skip itself based on query context — for example, a scorer that only applies to video posts could check query.has_video_candidates() and return false otherwise. The framework filters out disabled components before running any stage:
candidate-pipeline/candidate_pipeline.rs:128-129
let sources: Vec<_> = self.sources().iter().filter(|s| s.enable(query)).collect();
This avoids conditional logic scattered across business code. Instead of if is_video { video_scorer.score(...) }, the scorer itself declares when it's relevant.
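The gate can be sketched with a small stand-in trait. `FeedQuery`, `GatedScorer`, `BaseScorer`, `VideoScorer`, and `enabled_names` are all hypothetical names chosen for this example.

```rust
/// Hypothetical query type; `has_video_candidates` is an illustrative field.
pub struct FeedQuery {
    pub has_video_candidates: bool,
}

/// Synchronous stand-in for a component trait with an `enable` gate.
pub trait GatedScorer {
    fn enable(&self, _query: &FeedQuery) -> bool {
        true // default: always run
    }
    fn name(&self) -> &'static str;
}

pub struct BaseScorer;
pub struct VideoScorer;

impl GatedScorer for BaseScorer {
    fn name(&self) -> &'static str { "BaseScorer" }
}

impl GatedScorer for VideoScorer {
    // The component itself decides when it is relevant.
    fn enable(&self, query: &FeedQuery) -> bool { query.has_video_candidates }
    fn name(&self) -> &'static str { "VideoScorer" }
}

/// Names of the components that would run for this query.
pub fn enabled_names(scorers: &[Box<dyn GatedScorer>], query: &FeedQuery) -> Vec<&'static str> {
    scorers.iter().filter(|s| s.enable(query)).map(|s| s.name()).collect()
}
```

The component list stays the same for every request; only the set of components that pass the gate changes.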
Side Effects: Fire and Forget
Side effects are the one stage that doesn't block the response. After final_candidates is assembled and ready to return, side effects are spawned into a detached Tokio task:
candidate-pipeline/candidate_pipeline.rs:319-328
fn run_side_effects(&self, input: Arc<SideEffectInput<Q, C>>) {
    let side_effects = self.side_effects();
    tokio::spawn(async move {
        let futures = side_effects.iter().map(|se| se.run(input.clone()));
        let _ = join_all(futures).await;
    });
}
tokio::spawn returns immediately, so the response is sent to the client while caching (e.g., CacheRequestInfoSideEffect) happens in the background. Errors from side effects are discarded (let _ = ...): they are best-effort operations that shouldn't block the feed.
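The fire-and-forget shape can be approximated without an async runtime. In the sketch below, `std::thread::spawn` stands in for `tokio::spawn`, and the effects run one after another on the background thread rather than concurrently; both are simplifying assumptions. `SideEffectFn`, `run_side_effects`, and `demo_effects` are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical side effect: a shareable closure that may fail.
pub type SideEffectFn = Box<dyn Fn(&str) -> Result<(), String> + Send + Sync>;

/// Starts all side effects on a background thread and returns immediately.
/// `std::thread::spawn` stands in for `tokio::spawn` (assumption). The handle
/// is returned only so a test can wait; a caller would normally drop it.
pub fn run_side_effects(effects: Arc<Vec<SideEffectFn>>, input: Arc<String>) -> JoinHandle<()> {
    thread::spawn(move || {
        for effect in effects.iter() {
            // Best-effort: errors are deliberately discarded.
            let _ = effect(input.as_str());
        }
    })
}

/// Builds two hypothetical effects: one that always fails, one that counts calls.
pub fn demo_effects(counter: Arc<AtomicUsize>) -> Arc<Vec<SideEffectFn>> {
    let counting: SideEffectFn = Box::new(move |_input: &str| {
        counter.fetch_add(1, Ordering::SeqCst);
        Ok(())
    });
    let failing: SideEffectFn = Box::new(|_input: &str| Err("cache down".to_string()));
    Arc::new(vec![failing, counting])
}
```

Because the failing effect's error is dropped, the counting effect after it still runs, which matches the best-effort behaviour described above.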
Why This Design Matters
The framework gives X three things that are hard to get together:
Safe experimentation. Adding a new filter or scorer is a two-step operation: implement the trait, add it to the list in PhoenixCandidatePipeline. No changes to orchestration code. This is critical when running hundreds of A/B experiments in parallel — each experiment can swap in a different component list without touching shared infrastructure.
Observability by default. Every component has a name() method. Every stage logs which components ran, how many candidates were kept/removed, and any errors — all tagged with request_id. Debugging a bad feed result means grepping logs, not reading code.
Graceful degradation. The three error handling patterns together mean the system degrades incrementally rather than failing hard. A flaky downstream service affects one component, not the whole request.
What's Next
With the framework understood, the next parts zoom into the components that plug into it:
- Part 3: How candidates are found — Thunder's in-memory post store and Phoenix's two-tower retrieval model.
- Part 4: How candidates are ranked — the Grok-based transformer with candidate isolation masking.
- Part 5: How the final feed is assembled — the four-scorer chain, 10 filters, and the weighted score formula.