
Hexagonal Architecture in Python


Introduction {#introduction}

Hexagonal Architecture, also known as Ports and Adapters, is a software design pattern that promotes separation of concerns, testability, and maintainability. It is closely related to Clean Architecture, and this article uses the two names largely interchangeably. The pattern places the business logic at the center (the "hexagon") and isolates it from external dependencies through well-defined interfaces.

Project Goal: API-to-PostgreSQL Data Integration

This project demonstrates how to implement a robust financial market data pipeline that fetches data from the AlphaVantage API and stores it in PostgreSQL, while maintaining:

Why Clean Architecture for Data Integration?

Traditional data pipelines often become tightly coupled monoliths where business logic is mixed with API calls, database queries, and infrastructure concerns. This makes them:

Clean Architecture addresses these problems by drawing explicit boundaries between concerns, so the API client, the database code, and the business rules can each change without forcing changes in the others.

Design Principles {#design}

  1. Port: Think of a port as a "contract" or interface that defines what your application needs from the outside world, without caring about the specific details of how those needs are met. It's like a USB port on your computer: it defines the shape and electrical specifications, but doesn't care whether you plug in a mouse, keyboard, or external drive.

  2. Adapter: An adapter is the actual implementation that fulfills the port's contract. It's the specific piece of code that handles the technical details of connecting to databases, web APIs, file systems, or user interfaces. Using the USB analogy, the adapter would be the actual USB cable and device that plugs into the port.

  3. Application layer (use cases): The application logic is a thin layer that “glues together” the other layers; it’s also known as “use cases”. If you read this code and can’t tell what database it uses or what URL it calls, that’s a good sign. Sometimes it’s very short, and that’s fine. Think of it as an orchestrator.

  4. Domain layer: The domain layer contains the core business logic and rules that remain constant regardless of external changes. It includes:
     • Entities: Core business objects with identity and lifecycle (e.g., MarketData, ApiLog)
     • Value Objects: Immutable objects defined by their attributes (e.g., Price, Symbol, Timestamp)
     • Domain Services: Business logic that doesn't naturally fit in entities (e.g., price validation rules)

  5. Repositories: Repositories abstract data persistence, providing a collection-like interface for accessing domain entities. They hide whether data comes from PostgreSQL, MongoDB, or an in-memory store.

  6. Domain services: Services in the domain layer encapsulate business rules and complex operations that involve multiple entities or external calculations.

  7. Entities vs Value Objects:
     • Entities have identity and lifecycle (a MarketData record with ID=123 is the same entity even if its price changes)
     • Value Objects are immutable and defined by their attributes (Price($100) equals any other Price($100))

  8. Dependency Inversion: High-level modules (domain) should not depend on low-level modules (database). Both should depend on abstractions (interfaces/ports). This enables testing business logic without real databases and swapping implementations without changing core logic.

  9. The Dependency Rule: Dependencies can only point inward toward the domain. The domain layer knows nothing about the application layer, which knows nothing about the infrastructure layer. This creates a protective boundary around your business logic.
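The principles above fit in a few dozen lines. This is a minimal, self-contained sketch, not code from the project: `Quote`, `QuoteRepositoryPort`, `InMemoryQuoteRepository`, and `SaveQuoteUseCase` are hypothetical names. The value object enforces its own rules, the use case depends only on the abstract port, and a dict-backed adapter stands in for PostgreSQL.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass(frozen=True)
class Quote:
    """Hypothetical value object: immutable and compared by its attributes."""

    symbol: str
    price: Decimal

    def __post_init__(self) -> None:
        # Domain rules live in the domain object, not in the use case or adapter.
        if not self.symbol.strip():
            raise ValueError("Symbol is required")
        if self.price <= 0:
            raise ValueError("Price must be positive")


class QuoteRepositoryPort(ABC):
    """Outbound port: what the application needs from persistence."""

    @abstractmethod
    def save(self, quote: Quote) -> None: ...

    @abstractmethod
    def get(self, symbol: str) -> Optional[Quote]: ...


class InMemoryQuoteRepository(QuoteRepositoryPort):
    """Adapter: fulfils the port with a dict; a PostgreSQL adapter would issue SQL."""

    def __init__(self) -> None:
        self._rows: dict[str, Quote] = {}

    def save(self, quote: Quote) -> None:
        self._rows[quote.symbol] = quote

    def get(self, symbol: str) -> Optional[Quote]:
        return self._rows.get(symbol)


class SaveQuoteUseCase:
    """Application layer: orchestrates the domain and ports; names no database."""

    def __init__(self, repository: QuoteRepositoryPort) -> None:
        self._repository = repository  # depends on the abstraction only

    def execute(self, symbol: str, price: Decimal) -> Quote:
        quote = Quote(symbol=symbol.strip().upper(), price=price)
        self._repository.save(quote)
        return quote


# Composition root: the one place that chooses a concrete adapter.
repository = InMemoryQuoteRepository()
use_case = SaveQuoteUseCase(repository)
use_case.execute("aapl", Decimal("189.50"))  # illustrative price
print(repository.get("AAPL"))  # Quote(symbol='AAPL', price=Decimal('189.50'))
```

Replacing `InMemoryQuoteRepository` with a PostgreSQL adapter would change only the composition root; `SaveQuoteUseCase` and `Quote` stay untouched, which is the practical payoff of dependency inversion.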

Project implementation {#project-implementation}

Prerequisites

Project structure

myproject
├── mypackage
│   ├── adapters
│   │   ├── inbound
│   │   │   └── cli
│   │   └── outbound
│   │       ├── external_apis
│   │       ├── messaging
│   │       └── persistence
│   │           ├── file
│   │           └── postgres
│   ├── application
│   │   ├── containers
│   │   └── use_cases
│   ├── domains
│   │   └── market_data
│   │       ├── entities
│   │       ├── service
│   │       └── value_objects
│   ├── infrastructure
│   │   ├── configuration
│   │   ├── database
│   │   │   └── service
│   │   └── logging
│   └── ports
│       └── outbound
├── resources
├── logs
└── main.py
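Because each layer is its own package, the Dependency Rule can be checked mechanically. The sketch below is a hypothetical helper, not part of the project: it uses the standard-library `ast` module to list imports in a module's source that point at outer layers. It assumes the package is named `mypackage`, as in the tree, and that the domain must not import `adapters`, `application`, or `infrastructure`.

```python
import ast

# Hypothetical rule: modules under mypackage.domains must not import these layers.
FORBIDDEN = ("mypackage.adapters", "mypackage.application", "mypackage.infrastructure")


def outward_imports(source: str, forbidden: tuple[str, ...] = FORBIDDEN) -> list[str]:
    """Return imported module names in `source` that point at a forbidden layer."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module:
            # Relative imports are not resolved: "from . import x" has no module.
            names = [node.module]
        else:
            continue
        for name in names:
            # Match the layer package itself or any of its submodules.
            if any(name == f or name.startswith(f + ".") for f in forbidden):
                violations.append(name)
    return violations


domain_module = """
from decimal import Decimal
from mypackage.domains.market_data.value_objects import price
from mypackage.adapters.outbound.persistence.postgres import repo
"""
print(outward_imports(domain_module))  # ['mypackage.adapters.outbound.persistence.postgres']
```

A test that reads every `.py` file under `mypackage/domains` and asserts the returned list is empty would fail the build as soon as an outward import creeps in.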

Clean Architecture Flow

[Diagram: Clean Architecture flow]

Example entity - MarketData:

# domains/market_data/entities/market_data.py

from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Any, Dict, List, Optional


class DataSource(Enum):
    API = "API"
    CSV = "CSV"
    MANUAL = "MANUAL"


class DataStatus(Enum):
    PENDING = "PENDING"
    VALIDATED = "VALIDATED"
    FAILED = "FAILED"
    SAVED = "SAVED"


@dataclass
class MarketData:
    """Domain Entity for API market data"""

    id: Optional[int] = None
    symbol: str = ""
    price: Decimal = Decimal("0.00")
    volume: int = 0
    market_cap: Optional[Decimal] = None
    pe_ratio: Optional[Decimal] = None
    data_timestamp: Optional[datetime] = None
    source: DataSource = DataSource.API
    status: DataStatus = DataStatus.PENDING
    raw_data: Dict[str, Any] = field(default_factory=dict)
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

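    def __post_init__(self) -> None:
        # Keep timestamps supplied by the caller (e.g. when loading from the database)
        if self.created_at is None:
            self.created_at = datetime.now()
        if self.updated_at is None:
            self.updated_at = self.created_at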

    def validate(self) -> List[str]:
        """Business validation rules"""
        errors = []

        if not self.symbol.strip():
            errors.append("Symbol is required")

        if self.price <= 0:
            errors.append("Price must be positive")

        if self.volume < 0:
            errors.append("Volume cannot be negative")

        if self.market_cap is not None and self.market_cap <= 0:
            errors.append("Market cap must be positive if provided")

        if self.pe_ratio is not None and self.pe_ratio <= 0:
            errors.append("PE ratio must be positive if provided")

        if self.data_timestamp is None:
            errors.append("Data timestamp is required")

        return errors

    def is_valid(self) -> bool:
        return len(self.validate()) == 0

    def mark_as_validated(self) -> None:
        """Business behavior"""
        if self.is_valid():
            self.status = DataStatus.VALIDATED
            self.updated_at = datetime.now()
        else:
            raise ValueError(f"Cannot validate: {self.validate()}")

    def mark_as_saved(self) -> None:
        """Business behavior"""
        if self.status == DataStatus.VALIDATED:
            self.status = DataStatus.SAVED
            self.updated_at = datetime.now()
        else:
            raise ValueError("Only validated data can be marked as saved")
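The entity above keeps `symbol` as a plain `str` and re-checks it in `validate()`. The value objects listed under the design principles could instead enforce such rules once, at construction time. A minimal sketch of a hypothetical `Symbol` value object (the project's own `value_objects` package may look different; the allowed character set is an assumption):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Symbol:
    """Hypothetical value object: immutable, normalized, compared by value."""

    value: str

    def __post_init__(self) -> None:
        normalized = self.value.strip().upper()
        if not normalized:
            raise ValueError("Symbol is required")
        # Assumed rule: letters and digits, optionally separated by dots (e.g. BRK.B)
        if not normalized.replace(".", "").isalnum():
            raise ValueError(f"Invalid symbol: {self.value!r}")
        # frozen=True blocks normal assignment, so bypass it once during construction
        object.__setattr__(self, "value", normalized)

    def __str__(self) -> str:
        return self.value


print(Symbol(" aapl ") == Symbol("AAPL"))  # True
```

Because the dataclass is frozen and compares by value, two `Symbol` instances with the same normalized text are interchangeable and hash identically, so they can serve as dictionary keys.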

Example Port

from abc import ABC, abstractmethod

from selene.domains.market_data.value_objects.api_response import APIResponse


class MarketDataAPIPort(ABC):
    """Port for market data API operations"""

    @abstractmethod
    def get_market_data(self, symbol: str) -> APIResponse:
        """Fetch market data for a given symbol."""

    @abstractmethod
    def get_bulk_market_data(self, symbols: list[str]) -> APIResponse:
        """Fetch market data for multiple symbols."""
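Because use cases depend on `MarketDataAPIPort` rather than on AlphaVantage, a test can plug in a fake adapter. In this self-contained sketch `APIResponse` is a hypothetical stand-in (the project's real value object is not shown in this article), the port is repeated so the block runs on its own, and the payloads are illustrative.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class APIResponse:
    """Hypothetical stand-in for the project's APIResponse value object."""

    success: bool
    data: dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None


class MarketDataAPIPort(ABC):
    """Same contract as the port above."""

    @abstractmethod
    def get_market_data(self, symbol: str) -> APIResponse: ...

    @abstractmethod
    def get_bulk_market_data(self, symbols: list[str]) -> APIResponse: ...


class FakeMarketDataAPI(MarketDataAPIPort):
    """Test adapter: serves canned payloads instead of calling AlphaVantage."""

    def __init__(self, quotes: dict[str, dict[str, Any]]) -> None:
        self._quotes = quotes
        self.calls: list[str] = []  # lets a test assert how the port was used

    def get_market_data(self, symbol: str) -> APIResponse:
        self.calls.append(symbol)
        if symbol not in self._quotes:
            return APIResponse(success=False, error=f"Unknown symbol: {symbol}")
        return APIResponse(success=True, data=self._quotes[symbol])

    def get_bulk_market_data(self, symbols: list[str]) -> APIResponse:
        self.calls.extend(symbols)
        found = {s: self._quotes[s] for s in symbols if s in self._quotes}
        missing = [s for s in symbols if s not in self._quotes]
        error = f"Unknown symbols: {missing}" if missing else None
        return APIResponse(success=not missing, data=found, error=error)


api: MarketDataAPIPort = FakeMarketDataAPI({"AAPL": {"price": "189.50"}})
print(api.get_market_data("AAPL"))
print(api.get_bulk_market_data(["AAPL", "XYZ"]).error)  # Unknown symbols: ['XYZ']
```

The real AlphaVantage adapter and this fake satisfy the same abstract methods, so a use case under test cannot tell them apart, and no network access or API key is needed.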

Example API configuration

api:
  name: alphavantage
  description: Alpha Vantage API for financial data
  website: "https://www.alphavantage.co"
  base_url: "https://www.alphavantage.co/query"
  timeout_seconds: 30
  retry_attempts: 3
  rate_limit_per_minute: 60
  default_endpoint: global_quote # Specify which endpoint to use by default
  endpoints:
    - name: global_quote
      method: GET
      params:
        - name: function
          required: true
          type: string
          default: "GLOBAL_QUOTE"
        - name: symbol
          required: true
          type: string
        - name: apikey
          required: true
          type: string
      schema:
        price_path: ["Global Quote", "05. price"]
        volume_path: ["Global Quote", "06. volume"]
        market_cap_path: null
        pe_ratio_path: null
        timestamp_path: ["Global Quote", "07. latest trading day"]
        validation_keys: ["Global Quote"]
  symbols:
    - "AAPL"
    - "GOOGL"
    - "MSFT"
    - "TSLA"

# Database connection settings
database:
  host: localhost
  port: 5432
  database: selene
  user: postgres
  min_connection: 1
  max_connection: 10
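The `schema` section maps each field to a list of keys into the JSON response, which keeps AlphaVantage's field names out of the domain code. Below is a minimal sketch of how an adapter might follow those paths. It assumes the YAML has already been parsed into a dict (for example with PyYAML's `yaml.safe_load`) and that the trading day is an ISO `YYYY-MM-DD` date; `extract` and `to_market_fields` are hypothetical helpers, and the payload values are made up.

```python
from datetime import datetime
from decimal import Decimal
from typing import Any, Optional

# The `schema` block of the global_quote endpoint, as a dict (mirrors the YAML above).
SCHEMA = {
    "price_path": ["Global Quote", "05. price"],
    "volume_path": ["Global Quote", "06. volume"],
    "market_cap_path": None,
    "pe_ratio_path": None,
    "timestamp_path": ["Global Quote", "07. latest trading day"],
    "validation_keys": ["Global Quote"],
}


def extract(payload: dict[str, Any], path: Optional[list[str]]) -> Any:
    """Follow `path` key by key; return None if the path is null or missing."""
    if path is None:
        return None
    node: Any = payload
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def to_market_fields(payload: dict[str, Any], schema: dict[str, Any]) -> dict[str, Any]:
    """Convert a raw response into typed values for the MarketData entity."""
    missing = [k for k in schema["validation_keys"] if k not in payload]
    if missing:
        # e.g. an error message returned instead of a quote
        raise ValueError(f"Unexpected response, missing keys: {missing}")
    price = extract(payload, schema["price_path"])
    volume = extract(payload, schema["volume_path"])
    day = extract(payload, schema["timestamp_path"])
    return {
        "price": Decimal(price) if price is not None else None,
        "volume": int(volume) if volume is not None else None,
        "market_cap": extract(payload, schema["market_cap_path"]),
        "pe_ratio": extract(payload, schema["pe_ratio_path"]),
        "data_timestamp": datetime.strptime(day, "%Y-%m-%d") if day else None,
    }


# Illustrative payload shaped like a GLOBAL_QUOTE response (values are made up).
sample = {
    "Global Quote": {
        "01. symbol": "AAPL",
        "05. price": "189.5000",
        "06. volume": "1000",
        "07. latest trading day": "2024-01-05",
    }
}
print(to_market_fields(sample, SCHEMA))
```

Supporting a different endpoint or provider then means adding another `schema` block in YAML rather than editing the parsing code.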

Handle Secrets with .env

# postgres
DB_NAME=my_dbname
DB_USER=my_user
DB_PASSWORD=my_password
DB_HOST=localhost
DB_PORT=5432
DB_SSLMODE=prefer
DB_CONNECT_TIMEOUT=30
APP_NAME=my_app_name

# alpha_vantage
ALPHA_VANTAGE_API_KEY=your_api_key_here
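At runtime these values are read from the environment, so only the `.env` file, which should be kept out of version control, holds secrets. The sketch below shows a hypothetical `DatabaseSettings` object built from the `DB_*` variables above using only `os.environ`; in practice a loader such as python-dotenv's `load_dotenv()` would first copy `.env` into the environment. The defaults are assumptions that mirror the sample values.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class DatabaseSettings:
    """Hypothetical settings object populated from DB_* environment variables."""

    name: str
    user: str
    password: str
    host: str = "localhost"
    port: int = 5432
    sslmode: str = "prefer"
    connect_timeout: int = 30
    application_name: str = ""

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "DatabaseSettings":
        missing = [k for k in ("DB_NAME", "DB_USER", "DB_PASSWORD") if not env.get(k)]
        if missing:
            raise RuntimeError(f"Missing required environment variables: {missing}")
        return cls(
            name=env["DB_NAME"],
            user=env["DB_USER"],
            password=env["DB_PASSWORD"],
            host=env.get("DB_HOST", "localhost"),
            port=int(env.get("DB_PORT", "5432")),
            sslmode=env.get("DB_SSLMODE", "prefer"),
            connect_timeout=int(env.get("DB_CONNECT_TIMEOUT", "30")),
            application_name=env.get("APP_NAME", ""),
        )

    def connect_kwargs(self) -> dict[str, object]:
        """libpq connection parameters, e.g. for psycopg2.connect(**kwargs)."""
        return {
            "dbname": self.name,
            "user": self.user,
            "password": self.password,
            "host": self.host,
            "port": self.port,
            "sslmode": self.sslmode,
            "connect_timeout": self.connect_timeout,
            "application_name": self.application_name,
        }

    def __repr__(self) -> str:
        # Keep the password out of logs and tracebacks.
        return (f"DatabaseSettings(name={self.name!r}, user={self.user!r}, "
                f"host={self.host!r}, port={self.port})")


settings = DatabaseSettings.from_env(
    {"DB_NAME": "my_dbname", "DB_USER": "my_user", "DB_PASSWORD": "my_password"}
)
print(settings)  # DatabaseSettings(name='my_dbname', user='my_user', host='localhost', port=5432)
```

Passing a plain dict to `from_env` keeps the settings testable without touching the real process environment.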

Pros and Cons {#pros_and_cons}

Pros: When Clean Architecture Shines

Large & Long-term Projects

Complex Business Logic

Enterprise Requirements

Cons: When It Becomes Over-Engineering

Small & Ad-hoc Projects

Rapid Prototyping

Resource Constraints

Decision Framework: When to Use Clean Architecture

✅ Use Clean Architecture When:

❌ Avoid Clean Architecture When:

Conclusion

Complexity Trade-off

Simple Script Approach (50 lines):

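import requests
import psycopg2

symbol = "AAPL"

# Fetch data (placeholder endpoint; the response format is assumed)
response = requests.get(f"https://api.example.com/stock/{symbol}", timeout=30)
response.raise_for_status()
data = response.json()
price = data["price"]  # field name depends on the API

# Save to database
conn = psycopg2.connect("postgresql://...")
cursor = conn.cursor()
cursor.execute(
    "INSERT INTO market_data (symbol, price) VALUES (%s, %s)", (symbol, price)
)
conn.commit()
cursor.close()
conn.close()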

Clean Architecture Approach (500+ lines):

The Trade-off: roughly 10x more code, in exchange for business logic that can be tested in isolation and moved to a new API or database without being rewritten.

Get the Code

Explore the complete implementation with all the Clean Architecture patterns discussed in this article:

Selene - Market Data Pipeline

Production-ready Python implementation featuring:

  • Complete Hexagonal Architecture implementation
  • AlphaVantage API integration with rate limiting
  • PostgreSQL repository with connection pooling
  • Real market data validation and processing
  • CLI interface with dependency injection
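The `rate_limit_per_minute` setting in the configuration needs something to enforce it. One common technique is a sliding-window limiter. The sketch below is hypothetical (the repository's own implementation is not shown in this article) and takes the clock and sleep functions as parameters so it can be tested without waiting in real time.

```python
import time
from collections import deque
from typing import Callable


class SlidingWindowRateLimiter:
    """Hypothetical helper: allow at most `per_minute` calls in any 60 s window."""

    def __init__(
        self,
        per_minute: int,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        if per_minute <= 0:
            raise ValueError("per_minute must be positive")
        self.per_minute = per_minute
        self._clock = clock  # injectable so tests need not wait in real time
        self._sleep = sleep
        self._calls: deque[float] = deque()

    def acquire(self) -> None:
        """Block until another request is allowed, then record it."""
        while True:
            now = self._clock()
            # Drop timestamps that have left the 60-second window.
            while self._calls and now - self._calls[0] >= 60.0:
                self._calls.popleft()
            if len(self._calls) < self.per_minute:
                self._calls.append(now)
                return
            # Wait until the oldest recorded call leaves the window.
            self._sleep(60.0 - (now - self._calls[0]))


class FakeClock:
    """Test double: time only advances when sleep() is called."""

    def __init__(self) -> None:
        self.now = 0.0
        self.slept: list[float] = []

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.slept.append(seconds)
        self.now += seconds


clock = FakeClock()
limiter = SlidingWindowRateLimiter(per_minute=5, clock=clock.time, sleep=clock.sleep)
for _ in range(6):
    limiter.acquire()  # an adapter would call this before each HTTP request
print(clock.slept)  # [60.0]
```

The sixth call waits until the first five fall out of the window. An API adapter would call `acquire()` before every request, keeping throttling out of the use cases.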

🔗 View on GitHub →

Quick Start:

git clone https://github.com/miroslawsteblik/selene.git
cd selene
pip install .
selene fetch --config resources/fetch_api.yaml