# ML Feature Pipeline
Production-grade ML feature engineering pipeline that computes real-time and batch features from raw events, stores them in a feature store, and serves them at <10ms P99 latency for model inference.
Implements the feature store pattern used at Uber (Michelangelo), LinkedIn (Feathr), and major ML platforms.
## Architecture

```
Raw Events (Kafka)
        │
        ▼
┌──────────────────────┐      ┌──────────────────────┐
│  Kafka Consumer      │─────▶│  Feature Validator   │
│  (ingestion/)        │      │  (core/validators)   │
└──────────────────────┘      └──────────┬───────────┘
                                         │
           ┌─────────────────────────────┼───────────────────────────┐
           ▼                             ▼                           ▼
┌───────────────────┐         ┌───────────────────┐       ┌───────────────────┐
│  Redis Online     │         │  PostgreSQL       │       │  Feature Registry │
│  Store (<1ms)     │         │  Offline Store    │       │  (core/registry)  │
└─────────┬─────────┘         │  (TimescaleDB)    │       └───────────────────┘
          │                   └─────────┬─────────┘
          │                             │
          └────────────┬────────────────┘
                       ▼
            ┌───────────────────┐      ┌───────────────────┐
            │  Feature Store    │◀─────│  Batch Pipeline   │
            │  (core/)          │      │  (Polars/Spark)   │
            └─────────┬─────────┘      └───────────────────┘
                      │
           ┌──────────┴──────────┐
           ▼                     ▼
    ┌──────────────┐      ┌──────────────┐
    │  REST API    │      │  gRPC API    │
    │  (FastAPI)   │      │  (50051)     │
    └──────────────┘      └──────────────┘
```
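The serving path in the diagram reads from the Redis online store first and falls back to the offline store when Redis is down. A minimal sketch of that read path follows; `OnlineReader`, `RedisUnavailable`, and the store stubs are illustrative names, not the repo's actual classes:

```python
from typing import Any


class RedisUnavailable(Exception):
    """Raised by the online store stub when Redis cannot be reached."""


class OnlineReader:
    """Illustrative read path: try the online store, fall back to offline."""

    def __init__(self, redis_store, postgres_store):
        self.redis = redis_store
        self.postgres = postgres_store

    def get_features(self, entity_id: str, names: list[str]) -> dict[str, Any]:
        try:
            return self.redis.read(entity_id, names)
        except RedisUnavailable:
            # Graceful degradation: serve from the offline store instead.
            return self.postgres.read(entity_id, names)


# Stand-in stores for demonstration only.
class DownRedis:
    def read(self, entity_id, names):
        raise RedisUnavailable()


class StubPostgres:
    def read(self, entity_id, names):
        return {n: 0.0 for n in names}


reader = OnlineReader(DownRedis(), StubPostgres())
print(reader.get_features("user_42", ["transaction_count_24h"]))
# → {'transaction_count_24h': 0.0}
```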
## Quick Start

### Prerequisites

- Docker + Docker Compose
- Python 3.10+

### 1. Start Infrastructure

```bash
cd docker
docker compose up -d
```

This starts: Kafka, Zookeeper, Redis, PostgreSQL (TimescaleDB), Prometheus, Grafana.

### 2. Install Python Dependencies

```bash
python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install -r requirements.txt
```

### 3. Set Up Kafka Topics

```bash
bash scripts/setup_kafka.sh
```

### 4. Start the API Server

```bash
uvicorn src.api.rest_api:app --reload --port 8000
```

### 5. Generate Test Events

```bash
python scripts/generate_test_data.py --events 10000 --topic transaction-events
```
## Project Structure

```
ml-feature-pipeline/
├── src/
│   ├── api/                     # REST (FastAPI) + gRPC serving
│   │   ├── rest_api.py
│   │   ├── grpc_server.py
│   │   └── proto/
│   ├── core/                    # Domain logic
│   │   ├── feature_store.py     # Main orchestrator
│   │   ├── registry.py          # Feature catalog
│   │   ├── schemas.py           # Pydantic models
│   │   ├── validators.py        # Schema validation
│   │   └── ab_testing.py        # A/B experiments
│   ├── ingestion/               # Kafka → feature pipeline
│   │   ├── kafka_consumer.py
│   │   └── transformers.py
│   ├── features/                # Feature computation
│   │   ├── realtime_features.py
│   │   ├── batch_features.py
│   │   └── definitions/         # Feature schemas
│   ├── storage/                 # Store implementations
│   │   ├── redis_store.py       # Online (<1ms reads)
│   │   ├── postgres_store.py    # Offline (TimescaleDB)
│   │   └── point_in_time.py     # PIT joins (no leakage)
│   ├── backfill/                # Historical recomputation
│   └── utils/                   # Metrics + logging
├── tests/
│   ├── unit/                    # Fast, mocked tests
│   ├── integration/             # Full pipeline (mocked stores)
│   └── performance/             # Latency + throughput
├── docker/                      # Dockerfiles + docker-compose
├── k8s/                         # Kubernetes manifests
├── config/                      # YAML configuration
└── scripts/                     # Setup + data generation
```
## API Reference

### REST API (`http://localhost:8000`)

| Method | Endpoint | Description |
|---|---|---|
| POST | `/v1/features/online` | Serve online features |
| POST | `/v1/features/batch` | Submit batch job |
| GET | `/v1/features/registry` | List feature groups |
| POST | `/v1/features/registry` | Register feature group |
| GET | `/v1/features/registry/{name}` | Get feature group |
| GET | `/v1/health` | Health check |
| GET | `/v1/metrics` | Prometheus metrics |
| POST | `/v1/experiments` | Create A/B experiment |
| GET | `/v1/experiments/{name}/variant` | Get variant for entity |
Interactive docs: http://localhost:8000/docs
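As an illustration, an online-features call from the standard library might look like the sketch below. The field names (`entity_id`, `feature_names`) are assumptions, not the actual request contract; check the interactive docs for the real schema:

```python
import json
import urllib.request

# Hypothetical request body -- see http://localhost:8000/docs for the real schema.
payload = {
    "entity_id": "user_42",
    "feature_names": ["transaction_count_24h", "avg_transaction_amount_7d"],
}
body = json.dumps(payload).encode()

req = urllib.request.Request(
    "http://localhost:8000/v1/features/online",
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)

# Requires the API server from Quick Start step 4 to be running:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```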
### gRPC API (`:50051`)

```proto
service FeatureService {
  rpc GetFeatures(GetFeaturesRequest) returns (GetFeaturesResponse);
  rpc GetFeaturesBatch(GetFeaturesBatchRequest) returns (stream GetFeaturesResponse);
}
```

Generate stubs:

```bash
python -m grpc_tools.protoc \
  -I src/api/proto \
  --python_out=src/api/proto \
  --grpc_python_out=src/api/proto \
  src/api/proto/feature_service.proto
```
## Feature Definitions

All features are defined in `config/feature_definitions.yaml` and registered via the Python API or YAML loader.
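The README does not show the YAML schema itself; a hypothetical entry for one of the features below might look like this (key names are assumptions — consult `config/feature_definitions.yaml` for the real shape):

```yaml
# Hypothetical shape, not the repo's actual schema.
feature_groups:
  - name: user_transaction_stats
    entity: user_id
    features:
      - name: transaction_count_24h
        dtype: int
        computation: realtime
        ttl: 24h
```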
### User Features

| Feature | Type | Computation | TTL |
|---|---|---|---|
| `transaction_count_24h` | int | realtime | 24h |
| `avg_transaction_amount_7d` | float | batch | 24h |
| `std_transaction_amount_7d` | float | batch | 24h |
| `transaction_velocity_1h` | float | realtime | 1h |
| `unique_merchants_30d` | int | batch | 24h |
| `user_lifetime_value` | float | batch | 7d |
| `account_age_days` | int | batch | 24h |
| `kyc_verified` | bool | realtime | ∞ |
| `typical_active_hours` | list | batch | 7d |
### Transaction Features

| Feature | Type | Computation |
|---|---|---|
| `transaction_amount` | float | realtime |
| `amount_zscore` | float | on-demand |
| `transaction_currency` | string | realtime |
| `merchant_category` | string | realtime |
| `is_online` | bool | realtime |
| `is_new_merchant` | bool | on-demand |
| `merchant_category_frequency` | float | batch |
| `is_high_risk_time` | bool | on-demand |
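`amount_zscore` is computed on-demand from the 7-day batch aggregates in the user table above. A minimal sketch of that computation (the function name and zero-std handling are illustrative, not the repo's actual code):

```python
def amount_zscore(amount: float, avg_7d: float, std_7d: float) -> float:
    """On-demand feature: how unusual is this amount vs. the user's 7-day history?"""
    if std_7d == 0:
        # Degenerate history (constant amounts): no deviation signal.
        return 0.0
    return (amount - avg_7d) / std_7d


print(amount_zscore(500.0, 100.0, 50.0))  # → 8.0
```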
## Running Tests

```bash
# Unit tests
pytest tests/unit/ -v --cov=src

# Integration tests
pytest tests/integration/ -v

# Performance / latency tests
pytest tests/performance/ -v --benchmark-sort=mean

# Full suite with coverage
pytest --cov=src --cov-report=html
open htmlcov/index.html
```
## Kubernetes Deployment

```bash
# Create namespace
kubectl create namespace feature-pipeline

# Apply configs, secrets, deployments, services
kubectl apply -f k8s/configmaps/
kubectl apply -f k8s/deployments/
kubectl apply -f k8s/services/

# Check rollout
kubectl rollout status deployment/feature-api -n feature-pipeline

# Scale API manually
kubectl scale deployment feature-api --replicas=5 -n feature-pipeline
```
HPA (Horizontal Pod Autoscaler) is pre-configured: scales API from 3β20 pods at 60% CPU.
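The policy above (3–20 pods at 60% CPU) could be expressed roughly like this; the metadata names are assumptions, and the actual manifest lives under `k8s/`:

```yaml
# Illustrative HPA matching the 3-20 pods @ 60% CPU policy described above.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: feature-api
  namespace: feature-pipeline
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: feature-api
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
```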
## Performance Targets
| Metric | Target | Notes |
|---|---|---|
| P99 read latency | < 10ms | Redis online store |
| P50 read latency | < 1ms | In-memory path |
| Write throughput | > 10,000 ops/s | Kafka + Redis |
| Cache hit rate | > 95% | Hot features |
| API QPS (per pod) | > 5,000 | async FastAPI |
| Batch job duration | < 1h | Daily aggregations |
## Monitoring

- Prometheus: http://localhost:9090
- Grafana: http://localhost:3000 (admin / admin)

Key metrics exposed at `/v1/metrics`:

- `feature_pipeline_feature_read_latency_seconds` (histogram)
- `feature_pipeline_feature_write_latency_seconds` (histogram)
- `feature_pipeline_feature_cache_hit_ratio` (gauge)
- `feature_pipeline_feature_validation_errors_total` (counter)
- `feature_pipeline_api_requests_total` (counter)
- `feature_pipeline_events_processed_total` (counter)
## Configuration

Edit `config/config.yaml` or use environment variables:

| Env var | Default | Description |
|---|---|---|
| `REDIS_HOST` | localhost | Redis hostname |
| `REDIS_PORT` | 6379 | Redis port |
| `POSTGRES_HOST` | localhost | Postgres hostname |
| `POSTGRES_PASSWORD` | (none) | Postgres password |
| `KAFKA_BOOTSTRAP_SERVERS` | localhost:9092 | Kafka brokers |
| `LOG_LEVEL` | INFO | Logging level |
| `ENV` | local | Environment name |
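Environment overrides follow a simple precedence rule: a set env var beats the documented default. This standalone sketch mirrors the table above but is not the repo's actual config loader:

```python
import os

# Defaults from the table above; os.environ wins when a variable is set.
DEFAULTS = {
    "REDIS_HOST": "localhost",
    "REDIS_PORT": "6379",
    "POSTGRES_HOST": "localhost",
    "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
    "LOG_LEVEL": "INFO",
    "ENV": "local",
}


def setting(name: str) -> str:
    """Return the env var if set, else the documented default."""
    return os.environ.get(name, DEFAULTS.get(name, ""))


print(setting("REDIS_PORT"))  # "6379" unless overridden in the environment
```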
## Key Design Decisions

1. **Point-in-Time Correctness**: `storage/point_in_time.py` ensures training features never include future data. Critical for model/serving consistency.
2. **Exactly-Once Processing**: the Kafka consumer commits offsets only after successful batch processing + store writes. Failed events go to a dead-letter queue (DLQ).
3. **Schema Versioning**: non-breaking (additive) changes are allowed automatically. Breaking changes (removals, type changes) require a new major version.
4. **A/B Testing via Consistent Hashing**: user assignment is deterministic (the same user always lands in the same bucket) across restarts and replicas, without shared state.
5. **Graceful Degradation**: if Redis is unavailable, serving falls back to PostgreSQL. Validation errors are logged and counted but don't crash the pipeline.
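Deterministic variant assignment as described above can be done with a stable hash of the experiment and entity IDs; `assign_variant` here is an illustrative sketch, not the actual `core/ab_testing.py` API:

```python
import hashlib


def assign_variant(experiment: str, entity_id: str, variants: list[str]) -> str:
    """Deterministically map an entity to a variant: same inputs, same bucket,
    on every replica, with no shared state."""
    digest = hashlib.sha256(f"{experiment}:{entity_id}".encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") % len(variants)
    return variants[bucket]


v = assign_variant("new_model", "user_42", ["control", "treatment"])
# Stable across calls, processes, and restarts:
assert v == assign_variant("new_model", "user_42", ["control", "treatment"])
```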
## License
MIT