This guide builds a production-ready MiCA-compliant trading bot in Python. It uses the Kakunin Python SDK with either Freqtrade or raw CCXT for exchange connectivity.
Time: ~25 minutes
Prerequisites: Python 3.11+, pip, Kakunin account
Stack: Python · Kakunin Python SDK · Freqtrade or CCXT · Docker
For the TypeScript/Kubernetes version, see MiCA Trading Bot Quickstart.
pip install kakunin freqtrade ccxt python-dotenv pydantic pydantic-settings
Or with a requirements file:
# requirements.txt
kakunin>=1.0.0
ccxt>=4.3.0
python-dotenv>=1.0.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
# .env
KAKUNIN_API_KEY=sk_live_xxxxxxxxxxxxxxxx
KAKUNIN_PROJECT_ID=proj_1234567890
EXCHANGE_API_KEY=xxxxxxxxxxxxxxxxxx
EXCHANGE_SECRET=xxxxxxxxxxxxxxxxxx
AWS_REGION=eu-west-1
# config.py
try:
    # pydantic v2 moved BaseSettings into the separate `pydantic-settings`
    # package; importing it from `pydantic` raises an import error there.
    from pydantic_settings import BaseSettings
except ImportError:
    # pydantic v1 still ships BaseSettings in the core package.
    from pydantic import BaseSettings


class Settings(BaseSettings):
    """Runtime configuration for the bot, populated from the environment / `.env`.

    Fields without a default are required; `aws_region` falls back to
    "eu-west-1" when it is not provided.
    """

    kakunin_api_key: str
    kakunin_project_id: str
    exchange_api_key: str
    exchange_secret: str
    aws_region: str = "eu-west-1"

    class Config:
        # Also read values from a local .env file.
        env_file = ".env"


# Instantiated at import time so a missing required variable fails fast.
settings = Settings()
# mica_policy.py
from dataclasses import dataclass
from typing import List
@dataclass
class TradingHours:
    """Daily trading window.

    `start` and `end` are "HH:MM" strings in UTC (for example "08:00" and
    "17:00"); `exclude_weekends` defaults to True.
    """

    start: str
    end: str
    exclude_weekends: bool = True
@dataclass
class MiCAScope:
    """Limits that bound what the trading agent is allowed to do.

    Monetary limits are expressed in EUR: `max_trade_size` per trade and
    `max_daily_volume` per day. `allowed_markets` holds pair symbols such as
    "EUR/USDT" or "BTC/EUR"; `allowed_regions` holds region names such as
    "eu-west-1".
    """

    max_trade_size: float
    max_daily_volume: float
    allowed_markets: List[str]
    allowed_regions: List[str]
    trading_hours: TradingHours
@dataclass
class MiCAPolicy:
    """Top-level policy: who operates the agent and the scope it trades within.

    `operator_regulator_id` is the operator's registration number with its
    financial regulator. The two thresholds are anomaly scores:
    `kill_switch_threshold` is the score at which the agent is meant to be
    auto-revoked, and `circuit_breaker_threshold` is the lower score used as
    a circuit breaker.
    """

    agent_name: str
    operator_name: str
    operator_regulator_id: str
    scope: MiCAScope
    kill_switch_threshold: float = 0.85
    circuit_breaker_threshold: float = 0.75
# Example policy used throughout this guide — replace the operator details
# and limits with your own. Threshold and weekend defaults are spelled out.
POLICY = MiCAPolicy(
    agent_name="algo_trader_py_v1",
    operator_name="Acme Capital GmbH",
    operator_regulator_id="BAFIN-2026-00142",
    scope=MiCAScope(
        max_trade_size=25_000,
        max_daily_volume=500_000,
        allowed_markets=["EUR/USDT", "BTC/EUR", "ETH/EUR"],
        allowed_regions=["eu-west-1"],
        trading_hours=TradingHours(
            start="08:00",
            end="17:00",
            exclude_weekends=True,
        ),
    ),
    kill_switch_threshold=0.85,
    circuit_breaker_threshold=0.75,
)
# agent_bootstrap.py
import os
from kakunin import KakuninClient
from kakunin.models import AgentMetadata, CertificateScope
from mica_policy import POLICY
from datetime import date
# Module-level Kakunin client; both credentials are read from the environment
# at import time, so a missing variable raises KeyError immediately.
client = KakuninClient(
    api_key=os.environ["KAKUNIN_API_KEY"],
    project_id=os.environ["KAKUNIN_PROJECT_ID"],
)
def bootstrap_agent():
    """Register the trading agent with Kakunin and issue its scoped certificate.

    Returns:
        The ``(agent, cert)`` pair produced by the Kakunin client.
    """
    # NOTE(review): only `agents.create` is called here; whether it returns an
    # existing agent on a repeated run depends on the SDK — confirm.
    agent_metadata = AgentMetadata(
        version="1.0.0",
        model="rule-based",  # or LLM model name
        framework="freqtrade",
        operator=POLICY.operator_name,
        regulator_id=POLICY.operator_regulator_id,
        annex_iii_category="financial_services",
        board_approved_date=str(date.today()),
    )
    agent = client.agents.create(name=POLICY.agent_name, metadata=agent_metadata)
    print(f"✓ Agent registered: {agent.id}")

    # The certificate carries the same trade-size, volume and market limits
    # as the policy.
    limits = POLICY.scope
    certificate_scope = CertificateScope(
        actions=["trade.execute", "market.read", "position.query"],
        max_trade_size=limits.max_trade_size,
        max_daily_volume=limits.max_daily_volume,
        allowed_markets=limits.allowed_markets,
    )
    cert = client.agents.get_certificate(
        agent_id=agent.id,
        validity_days=365,
        scope=certificate_scope,
    )
    print(f"✓ Certificate issued: {cert.serial_number}")
    print(f" KMS ARN: {cert.kms_key_arn}")

    return agent, cert
if __name__ == "__main__":
    # Store agent.id and cert.serial_number in your secrets manager afterwards.
    # Never log cert.certificate_pem to stdout in production.
    agent, cert = bootstrap_agent()
# trading_engine.py
import hashlib, json
from datetime import datetime, time
from kakunin import KakuninClient
from kakunin.integrations.scope import verify_agent_scope
from kakunin.exceptions import KakuninScopeError
import ccxt
from mica_policy import POLICY
import os
# Kakunin client used for signing, audit logging and pausing the agent.
client = KakuninClient(api_key=os.environ["KAKUNIN_API_KEY"])

# Spot-market Binance connection; credentials come from the environment.
exchange = ccxt.binance(
    {
        "apiKey": os.environ["EXCHANGE_API_KEY"],
        "secret": os.environ["EXCHANGE_SECRET"],
        "options": {"defaultType": "spot"},
    }
)
def _eur_to_base_amount(market: str, size_eur: float) -> float:
    """Convert a EUR notional into the base-currency amount ccxt expects.

    ccxt's ``create_order`` takes ``amount`` in units of the pair's base
    currency. For ``EUR/...`` pairs the EUR size already is the base amount;
    for ``.../EUR`` pairs it is divided by the latest traded price.

    Raises:
        ValueError: if the pair has no EUR leg, so a EUR size cannot be mapped.
        RuntimeError: if the exchange returns no usable reference price.
    """
    base, _, quote = market.partition("/")
    if base == "EUR":
        return size_eur
    if quote != "EUR":
        raise ValueError(f"Cannot size {market} in EUR: pair has no EUR leg")

    ticker = exchange.fetch_ticker(market)
    price = ticker.get("last") or ticker.get("close")
    if not price:
        raise RuntimeError(f"No reference price available for {market}")
    # Round to the exchange's amount precision so the order is not rejected.
    return float(exchange.amount_to_precision(market, size_eur / price))


@verify_agent_scope(required_scope="trade.execute")
async def execute_trade(
    agent_id: str,
    agent_cert: str,
    market: str,
    side: str,
    size_eur: float,
) -> dict:
    """Execute a single MiCA-compliant market order.

    Args:
        agent_id: Kakunin agent the trade is signed and logged for.
        agent_cert: Agent certificate; not read in this function body
            (presumably consumed by the ``verify_agent_scope`` decorator —
            confirm against the SDK).
        market: Pair symbol such as "BTC/EUR"; must be in the policy's
            allowed markets.
        side: "buy" or "sell" (case-insensitive).
        size_eur: Trade notional in EUR; must be positive and within the
            policy's per-trade limit.

    Returns:
        The order dict returned by the exchange.

    Raises:
        KakuninScopeError: market, size or time of day is outside the policy.
        ValueError: ``side`` or ``size_eur`` is invalid, or the pair cannot
            be sized in EUR.
    """
    # 0. Reject malformed input before anything is signed or sent.
    if side.lower() not in ("buy", "sell"):
        raise ValueError(f"Unsupported side {side!r}; expected 'buy' or 'sell'")
    if size_eur <= 0:
        raise ValueError(f"Trade size must be positive, got {size_eur}")

    # 1. Pre-trade scope check (redundant if decorator used — defence in depth)
    if market not in POLICY.scope.allowed_markets:
        raise KakuninScopeError(f"Market {market} not in allowed scope")
    if size_eur > POLICY.scope.max_trade_size:
        raise KakuninScopeError(
            f"Trade size {size_eur} EUR exceeds limit {POLICY.scope.max_trade_size} EUR"
        )
    if not _within_trading_hours():
        raise KakuninScopeError("Outside MiCA-permitted trading hours")

    # Resolve the order amount before signing, so a pricing failure does not
    # leave a signed action that was never submitted.
    base_amount = _eur_to_base_amount(market, size_eur)

    # 2. Build trade payload and hash it (never log raw payload in production)
    trade_payload = {"market": market, "side": side, "size_eur": size_eur}
    payload_hash = hashlib.sha256(
        json.dumps(trade_payload, sort_keys=True).encode()
    ).hexdigest()

    # 3. Sign the trade with agent identity via Kakunin
    signed = client.actions.sign(
        agent_id=agent_id,
        payload_hash=payload_hash,
        metadata={"action_type": "trade.execute", "timestamp": datetime.utcnow().isoformat()},
    )
    print(f"✓ Trade signed (sig: {signed.signature[:20]}...)")

    # 4. Submit to exchange. `amount` is in base-currency units, not EUR.
    order = exchange.create_order(
        symbol=market,
        type="market",
        side=side.lower(),
        amount=base_amount,
    )

    # 5. Log to the Kakunin audit trail (the guide maps this to MiCA Art. 72 —
    #    verify the article reference for your jurisdiction).
    event = client.events.ingest(
        agent_id=agent_id,
        action_type="trade.execute",
        payload_hash=payload_hash,
        metadata={
            "order_id": order["id"],
            "market": market,
            "side": side,
            "size_eur": size_eur,
            # Market orders often have no `price`; `average` is the fill price.
            "execution_price": order.get("average") or order.get("price"),
            "signature": signed.signature[:50],
        },
    )

    # 6. Check post-trade risk score and pause the agent at the circuit breaker
    if event.risk_score >= POLICY.circuit_breaker_threshold:
        print(f"⚠️ Anomaly score {event.risk_score:.2f} — pausing for review")
        client.agents.pause(agent_id, reason="anomaly_threshold_exceeded")

    print(f"✓ Trade executed: {order['id']}")
    return order
def _within_trading_hours(now=None, hours=None) -> bool:
    """Return True when `now` falls inside the permitted trading window.

    Args:
        now: Moment to test. Defaults to the current UTC time. A naive
            datetime is treated as UTC; an aware one is converted to UTC.
        hours: Object with `start`/`end` ("HH:MM" strings, UTC) and
            `exclude_weekends`. Defaults to `POLICY.scope.trading_hours`, so
            the window follows the policy instead of hard-coded values.

    Returns:
        True if trading is allowed at `now`; both window bounds are inclusive.
    """
    from datetime import timezone  # local import: the file only imports datetime/time

    if hours is None:
        hours = POLICY.scope.trading_hours
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is not None:
        now = now.astimezone(timezone.utc)

    # Saturday = 5, Sunday = 6
    if hours.exclude_weekends and now.weekday() >= 5:
        return False

    window_start = time.fromisoformat(hours.start)
    window_end = time.fromisoformat(hours.end)
    # `.time()` drops tzinfo, so the comparison is between naive times.
    return window_start <= now.time() <= window_end
If you're using Freqtrade, run the Kakunin checks from the strategy's `confirm_trade_entry` callback so that every entry is gated on the agent's status:
# freqtrade_kakunin_plugin.py
from freqtrade.strategy import IStrategy
from kakunin import KakuninClient
from kakunin.integrations.scope import verify_agent_scope
import os
# Kakunin client used by the strategy's pre-trade checks; the API key is read
# from the environment at import time.
client = KakuninClient(api_key=os.environ["KAKUNIN_API_KEY"])
# ID of the agent whose status is checked; a missing variable raises KeyError.
AGENT_ID = os.environ["KAKUNIN_AGENT_ID"]
class KakuninCompliantStrategy(IStrategy):
    """
    Base strategy that gates every trade entry on the agent's Kakunin status.

    Subclass this instead of IStrategy to inherit the entry check in
    confirm_trade_entry(). To gate exits as well, add an equivalent check in
    Freqtrade's confirm_trade_exit() callback.
    """

    # Entries are blocked once the agent's anomaly score reaches this value.
    max_anomaly_score: float = 0.75

    def confirm_trade_entry(
        self, pair, order_type, amount, rate, time_in_force, current_time, entry_tag, **kwargs
    ) -> bool:
        """Allow the entry only if the certificate is active and risk is low.

        Returns:
            True when the agent's certificate status is "active" and its
            anomaly score is below `max_anomaly_score`; False otherwise,
            including when the Kakunin lookup itself fails.
        """
        # IStrategy has no `log` method, so the original `self.log(...)` calls
        # raised AttributeError — also inside the except handler, which
        # defeated the fail-closed return. Use the standard logging module.
        import logging

        logger = logging.getLogger(__name__)

        try:
            # Verify agent cert is still valid before every trade
            status = client.agents.get_status(AGENT_ID)
            if status.certificate_status != "active":
                logger.warning("Trade blocked — cert status: %s", status.certificate_status)
                return False

            # Check risk score before entering trade
            if status.anomaly_score >= self.max_anomaly_score:
                logger.warning("Trade blocked — anomaly score: %s", status.anomaly_score)
                return False

            return True
        except Exception as exc:
            # Fail closed — block trade if compliance check errors
            logger.exception("Kakunin check failed: %s", exc)
            return False
# kya_baseline.py
from kakunin import KakuninClient
import os
# Kakunin client used to submit the behavioural baseline; the API key is read
# from the environment at import time.
client = KakuninClient(api_key=os.environ["KAKUNIN_API_KEY"])
def set_baseline(agent_id: str):
    """
    Submit the agent's normal-behaviour profile to Kakunin monitoring.

    Call once after the 7-day paper trading period; anomaly detection only
    arms after the baseline is set. The figures below are the guide's example
    values. Returns whatever `client.monitoring.set_baseline` returns.
    """
    normal_profile = {
        "trades_per_hour_p50": 4,
        "trades_per_hour_p95": 10,
        "trade_size_eur_p50": 12_000,
        "trade_size_eur_p99": 24_000,
        "preferred_markets": ["EUR/USDT", "BTC/EUR"],
        "active_hours_utc": {"start": 8, "end": 17},
        "weekend_activity_expected": False,
    }
    result = client.monitoring.set_baseline(
        agent_id=agent_id,
        baseline=normal_profile,
    )
    print(f"✓ KYA baseline set — anomaly detection active (agent: {agent_id})")
    return result
# Dockerfile
# Slim Python 3.11 base image.
FROM python:3.11-slim
WORKDIR /app
# Copy and install dependencies before the source so this layer is cached
# until requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application source into /app.
COPY . .
# Container entry point: run the trading engine module.
CMD ["python", "trading_engine.py"]
# docker-compose.yml
version: '3.9'

services:
  trading-bot:
    build: .
    # Credentials are loaded from the local .env file.
    env_file: .env
    environment:
      # Agent ID is taken from the shell that runs `docker compose`.
      KAKUNIN_AGENT_ID: "${KAKUNIN_AGENT_ID}"
    restart: unless-stopped
    volumes:
      - audit-logs:/var/audit

volumes:
  audit-logs:
    driver: local
# Build (if needed) and start the bot in the background.
docker compose up -d
# Follow the bot's log output.
docker compose logs -f trading-bot
# Check agent status
# Reads KAKUNIN_API_KEY and KAKUNIN_AGENT_ID from the current shell environment.
python -c "
from kakunin import KakuninClient; import os
c = KakuninClient(api_key=os.environ['KAKUNIN_API_KEY'])
s = c.agents.get_status(os.environ['KAKUNIN_AGENT_ID'])
print(f'Status: {s.certificate_status}')
print(f'Anomaly: {s.anomaly_score}')
print(f'Expires: {s.certificate_expires_at}')
"
# Expected:
# Status: active
# Anomaly: 0.04
# Expires: 2027-05-28T00:00:00Z
| Article | Implementation |
|---|---|
| Art. 67 — CASP registration | operator_regulator_id in agent metadata |
| Art. 68 — Conduct of business | Scope: max_trade_size, allowed_markets, trading hours |
| Art. 70 — Record keeping | WORM audit log via events.ingest() |
| Art. 72 — Transaction reporting | Every trade logged with signature + order ID |
| Art. 73 — Prudential requirements | Anomaly scoring; circuit breaker at 0.75 |