
ML Feature Pipeline

Production-grade ML feature engineering pipeline that computes real-time and batch features from raw events, stores them in a feature store, and serves them at <10ms P99 latency for model inference.

Implements the feature store pattern used at Uber (Michelangelo), LinkedIn (Feathr), and major ML platforms.


Architecture


Raw Events (Kafka)
      │
      ▼
┌─────────────────────┐       ┌─────────────────────┐
│  Kafka Consumer     │──────▶│  Feature Validator  │
│  (ingestion/)       │       │  (core/validators)  │
└─────────────────────┘       └──────────┬──────────┘
                                         │
              ┌──────────────────────────┼──────────────────────────┐
              ▼                          ▼                          ▼
    ┌───────────────────┐      ┌───────────────────┐      ┌───────────────────┐
    │   Redis Online    │      │  PostgreSQL       │      │  Feature Registry │
    │   Store (<1ms)    │      │  Offline Store    │      │  (core/registry)  │
    └─────────┬─────────┘      │  (TimescaleDB)    │      └───────────────────┘
              │                └─────────┬─────────┘
              │                          │
              └─────────┬────────────────┘
                        ▼
              ┌───────────────────┐      ┌───────────────────┐
              │   Feature Store   │◀─────│  Batch Pipeline   │
              │   (core/)         │      │  (Polars/Spark)   │
              └─────────┬─────────┘      └───────────────────┘
                        │
             ┌──────────┴──────────┐
             ▼                     ▼
      ┌─────────────┐       ┌─────────────┐
      │  REST API   │       │  gRPC API   │
      │  (FastAPI)  │       │  (50051)    │
      └─────────────┘       └─────────────┘

Quick Start

Prerequisites

  • Docker + Docker Compose
  • Python 3.10+

1. Start Infrastructure

cd docker
docker compose up -d

This starts: Kafka, Zookeeper, Redis, PostgreSQL (TimescaleDB), Prometheus, Grafana.

2. Install Python Dependencies

python -m venv .venv
source .venv/bin/activate        # Windows: .venv\Scripts\activate
pip install -r requirements.txt

3. Set Up Kafka Topics

bash scripts/setup_kafka.sh

4. Start the API Server

uvicorn src.api.rest_api:app --reload --port 8000

5. Generate Test Events

python scripts/generate_test_data.py --events 10000 --topic transaction-events
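The generator script's exact output isn't documented here, but a synthetic transaction event along these lines is what the pipeline consumes (field names are illustrative; the authoritative schema lives in `core/schemas.py`):

```python
import json
import random
import time
import uuid

def make_transaction_event() -> dict:
    """Build one synthetic transaction event (illustrative schema only)."""
    return {
        "event_id": str(uuid.uuid4()),
        "user_id": f"user_{random.randint(1, 1000)}",
        "amount": round(random.uniform(1.0, 500.0), 2),
        "currency": "USD",
        "merchant_category": random.choice(["grocery", "travel", "electronics"]),
        "is_online": random.choice([True, False]),
        "timestamp": time.time(),
    }

event = make_transaction_event()
payload = json.dumps(event).encode("utf-8")  # what a Kafka producer would send
```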

Project Structure

ml-feature-pipeline/
├── src/
│   ├── api/               # REST (FastAPI) + gRPC serving
│   │   ├── rest_api.py
│   │   ├── grpc_server.py
│   │   └── proto/
│   ├── core/              # Domain logic
│   │   ├── feature_store.py   # Main orchestrator
│   │   ├── registry.py        # Feature catalog
│   │   ├── schemas.py         # Pydantic models
│   │   ├── validators.py      # Schema validation
│   │   └── ab_testing.py      # A/B experiments
│   ├── ingestion/         # Kafka → feature pipeline
│   │   ├── kafka_consumer.py
│   │   └── transformers.py
│   ├── features/          # Feature computation
│   │   ├── realtime_features.py
│   │   ├── batch_features.py
│   │   └── definitions/       # Feature schemas
│   ├── storage/           # Store implementations
│   │   ├── redis_store.py     # Online (<1ms reads)
│   │   ├── postgres_store.py  # Offline (TimescaleDB)
│   │   └── point_in_time.py   # PIT joins (no leakage)
│   ├── backfill/          # Historical recomputation
│   └── utils/             # Metrics + logging
├── tests/
│   ├── unit/              # Fast, mocked tests
│   ├── integration/       # Full pipeline (mocked stores)
│   └── performance/       # Latency + throughput
├── docker/                # Dockerfiles + docker-compose
├── k8s/                   # Kubernetes manifests
├── config/                # YAML configuration
└── scripts/               # Setup + data generation

API Reference

REST API (http://localhost:8000)

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /v1/features/online | Serve online features |
| POST | /v1/features/batch | Submit batch job |
| GET | /v1/features/registry | List feature groups |
| POST | /v1/features/registry | Register feature group |
| GET | /v1/features/registry/{name} | Get feature group |
| GET | /v1/health | Health check |
| GET | /v1/metrics | Prometheus metrics |
| POST | /v1/experiments | Create A/B experiment |
| GET | /v1/experiments/{name}/variant | Get variant for entity |

Interactive docs: http://localhost:8000/docs
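For a quick smoke test from Python, an online-features request might look like this (the body's field names here are assumptions; check the interactive docs for the actual request schema):

```python
import json
import urllib.request

# Hypothetical request body -- "entity_id" and "feature_names" are assumed
# field names, not confirmed against the real API schema.
body = {
    "entity_id": "user_42",
    "feature_names": ["transaction_count_24h", "avg_transaction_amount_7d"],
}

req = urllib.request.Request(
    "http://localhost:8000/v1/features/online",
    data=json.dumps(body).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# With the server running:
# with urllib.request.urlopen(req) as resp:
#     features = json.load(resp)
```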

gRPC API (:50051)

service FeatureService {
  rpc GetFeatures(GetFeaturesRequest) returns (GetFeaturesResponse);
  rpc GetFeaturesBatch(GetFeaturesBatchRequest) returns (stream GetFeaturesResponse);
}

Generate stubs:

python -m grpc_tools.protoc \
  -I src/api/proto \
  --python_out=src/api/proto \
  --grpc_python_out=src/api/proto \
  src/api/proto/feature_service.proto

Feature Definitions

All features are defined in config/feature_definitions.yaml and registered via the Python API or YAML loader.

User Features

| Feature | Type | Computation | TTL |
|---------|------|-------------|-----|
| transaction_count_24h | int | realtime | 24h |
| avg_transaction_amount_7d | float | batch | 24h |
| std_transaction_amount_7d | float | batch | 24h |
| transaction_velocity_1h | float | realtime | 1h |
| unique_merchants_30d | int | batch | 24h |
| user_lifetime_value | float | batch | 7d |
| account_age_days | int | batch | 24h |
| kyc_verified | bool | realtime | — |
| typical_active_hours | list | batch | 7d |
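As a rough illustration of how a realtime windowed feature like transaction_count_24h can be maintained, here is an in-process sliding-window sketch (the production path presumably uses Redis rather than local memory, so this is the idea, not the implementation):

```python
from collections import deque

class SlidingWindowCounter:
    """Count events inside a trailing time window by keeping their
    timestamps in a deque and evicting expired ones on read."""

    def __init__(self, window_seconds: float):
        self.window = window_seconds
        self.events = deque()  # timestamps, oldest first

    def record(self, ts: float) -> None:
        self.events.append(ts)

    def count(self, now: float) -> int:
        # Evict everything that has fallen out of the window.
        while self.events and self.events[0] <= now - self.window:
            self.events.popleft()
        return len(self.events)

counter = SlidingWindowCounter(window_seconds=24 * 3600)
counter.record(ts=1_000.0)
counter.record(ts=2_000.0)
counter.count(now=3_000.0)  # -> 2, both events are inside the 24h window
```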

Transaction Features

| Feature | Type | Computation |
|---------|------|-------------|
| transaction_amount | float | realtime |
| amount_zscore | float | on-demand |
| transaction_currency | string | realtime |
| merchant_category | string | realtime |
| is_online | bool | realtime |
| is_new_merchant | bool | on-demand |
| merchant_category_frequency | float | batch |
| is_high_risk_time | bool | on-demand |
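The on-demand amount_zscore combines the incoming amount with stored 7-day aggregates. A minimal sketch, assuming the store serves avg_transaction_amount_7d and std_transaction_amount_7d:

```python
def amount_zscore(amount: float, mean_7d: float, std_7d: float) -> float:
    """How unusual this amount is relative to the user's 7-day history.
    Guard against a zero std for new or single-transaction users."""
    if std_7d == 0:
        return 0.0
    return (amount - mean_7d) / std_7d

amount_zscore(250.0, mean_7d=50.0, std_7d=40.0)  # -> 5.0
```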

Running Tests

# Unit tests
pytest tests/unit/ -v --cov=src

# Integration tests
pytest tests/integration/ -v

# Performance / latency tests
pytest tests/performance/ -v --benchmark-sort=mean

# Full suite with coverage
pytest --cov=src --cov-report=html
open htmlcov/index.html

Kubernetes Deployment

# Create namespace
kubectl create namespace feature-pipeline

# Apply configs, secrets, deployments, services
kubectl apply -f k8s/configmaps/
kubectl apply -f k8s/deployments/
kubectl apply -f k8s/services/

# Check rollout
kubectl rollout status deployment/feature-api -n feature-pipeline

# Scale API manually
kubectl scale deployment feature-api --replicas=5 -n feature-pipeline

HPA (Horizontal Pod Autoscaler) is pre-configured: scales the API from 3 → 20 pods at 60% CPU.
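That HPA corresponds to a manifest along these lines (a sketch only; the resource names and the actual file in k8s/ are assumptions):

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: feature-api
  namespace: feature-pipeline
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: feature-api
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
```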


Performance Targets

| Metric | Target | Notes |
|--------|--------|-------|
| P99 read latency | < 10ms | Redis online store |
| P50 read latency | < 1ms | In-memory path |
| Write throughput | > 10,000 ops/s | Kafka + Redis |
| Cache hit rate | > 95% | Hot features |
| API QPS (per pod) | > 5,000 | async FastAPI |
| Batch job duration | < 1h | Daily aggregations |

Monitoring

  • Prometheus: http://localhost:9090
  • Grafana: http://localhost:3000 (admin / admin)

Key metrics exposed at /v1/metrics:

  • feature_pipeline_feature_read_latency_seconds (histogram)
  • feature_pipeline_feature_write_latency_seconds (histogram)
  • feature_pipeline_feature_cache_hit_ratio (gauge)
  • feature_pipeline_feature_validation_errors_total (counter)
  • feature_pipeline_api_requests_total (counter)
  • feature_pipeline_events_processed_total (counter)

Configuration

Edit config/config.yaml or use environment variables:

| Env var | Default | Description |
|---------|---------|-------------|
| REDIS_HOST | localhost | Redis hostname |
| REDIS_PORT | 6379 | Redis port |
| POSTGRES_HOST | localhost | Postgres hostname |
| POSTGRES_PASSWORD | — | Postgres password |
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka brokers |
| LOG_LEVEL | INFO | Logging level |
| ENV | local | Environment name |

Key Design Decisions

  1. Point-in-Time Correctness: storage/point_in_time.py ensures training features never include future data. Critical for model/serving consistency.

  2. Exactly-Once Processing: Kafka consumer commits offsets only after successful batch processing + store writes. Failed events go to a dead-letter queue (DLQ).

  3. Schema Versioning: Non-breaking changes (additive) are allowed automatically. Breaking changes (removals, type changes) require a new major version.

  4. A/B Testing via Consistent Hashing: User assignment is deterministic (same user always in same bucket) across restarts and replicas, without shared state.

  5. Graceful Degradation: If Redis is unavailable, serving falls back to PostgreSQL. Validation errors are logged + counted but don't crash the pipeline.
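Decision 1 can be illustrated with a minimal point-in-time lookup (a sketch of the idea, not the actual interface of storage/point_in_time.py):

```python
from bisect import bisect_right

def point_in_time_value(history: list, as_of: float):
    """Return the latest feature value observed at or before `as_of`.
    `history` is a list of (timestamp, value) pairs sorted by timestamp.
    Restricting the join to rows with ts <= as_of is what prevents
    future data from leaking into training features."""
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, as_of)
    if idx == 0:
        return None  # feature did not exist yet at as_of
    return history[idx - 1][1]

history = [(100.0, 1.0), (200.0, 2.0), (300.0, 3.0)]
point_in_time_value(history, as_of=250.0)  # -> 2.0, never the future 3.0
```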

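Decision 4 boils down to hashing the (experiment, entity) pair onto a bucket; a sketch of the idea, not the actual core/ab_testing.py interface:

```python
import hashlib

def assign_variant(experiment: str, entity_id: str, variants: list) -> str:
    """Deterministic A/B assignment: hash (experiment, entity) and map the
    digest onto a variant. The same entity gets the same variant on every
    replica and across restarts, with no shared state."""
    digest = hashlib.sha256(f"{experiment}:{entity_id}".encode()).hexdigest()
    bucket = int(digest, 16) % len(variants)
    return variants[bucket]

assign_variant("new-model", "user_42", ["control", "treatment"])
```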

License

MIT
