Freqtrade Integration
Register a Freqtrade AI trading bot with Kakunin. Cryptographic identity, pre-trade scope verification, risk-aware throttling, and webhook-triggered circuit breakers.
Freqtrade Integration Guide
Connect a Freqtrade bot to Kakunin in under 2 hours. Covers cert installation, trade event mapping, pre-trade scope checks, and risk-based throttling.
Target audience: AI trading bot operators subject to MiCA or EU AI Act compliance obligations.
Architecture
Freqtrade Bot
│
├── on startup → register agent + issue X.509 cert
├── before trade → verify_agent_scope (check permitted_actions)
├── after trade → log trade_execution event
├── on error → log authentication / data_mutation event
└── on webhook → listen for certificate.revoked → halt trading
Prerequisites
pip install requests cryptography
Set env vars:
export KAKUNIN_API_KEY=kkn_sandbox_... # start with sandbox
export KAKUNIN_BASE_URL=https://kakunin-git-staging-immortalpilot365-5309s-projects.vercel.app/api/v1
For production: swap to kkn_live_* key and https://kakunin.ai/api/v1.
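A missing or mismatched variable otherwise shows up only as a `KeyError` or an HTTP error on the first request. The sketch below checks both variables at startup. The helper name `check_kakunin_env` is hypothetical, and both the prefix rule and the key/URL pairing are assumptions inferred from the `kkn_sandbox_` and `kkn_live_` examples above, not a documented contract.

```python
import os


def check_kakunin_env(env=None) -> str:
    """Return "sandbox" or "live" based on the key prefix; raise if misconfigured.

    Hypothetical helper. The prefix rule is an assumption inferred from the
    example keys (kkn_sandbox_..., kkn_live_...) shown in this guide.
    """
    env = os.environ if env is None else env
    missing = [name for name in ("KAKUNIN_API_KEY", "KAKUNIN_BASE_URL") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

    key = env["KAKUNIN_API_KEY"]
    if key.startswith("kkn_sandbox_"):
        mode = "sandbox"
    elif key.startswith("kkn_live_"):
        mode = "live"
    else:
        raise RuntimeError("KAKUNIN_API_KEY has an unrecognised prefix")

    # Assumption: live keys pair with the production URL, sandbox keys with any other URL.
    is_prod_url = env["KAKUNIN_BASE_URL"].rstrip("/") == "https://kakunin.ai/api/v1"
    if (mode == "live") != is_prod_url:
        raise RuntimeError(f"{mode} key does not match KAKUNIN_BASE_URL")
    return mode
```

Calling `check_kakunin_env()` once before registering the agent turns a silent misconfiguration into an immediate, readable error.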
Step 1 — Register Agent on Startup
Add to freqtrade/templates/sample_strategy.py or your bot's __init__:
import os
import hashlib
import requests
import json
from pathlib import Path

KAKUNIN_BASE = os.environ["KAKUNIN_BASE_URL"]
KAKUNIN_HEADERS = {
    "Authorization": f"Bearer {os.environ['KAKUNIN_API_KEY']}",
    "Content-Type": "application/json",
}
AGENT_STATE_FILE = Path(".kakunin_agent.json")

def kakunin_register() -> dict:
    """Register this Freqtrade bot with Kakunin and issue X.509 cert."""
    # Return cached state if already registered
    if AGENT_STATE_FILE.exists():
        return json.loads(AGENT_STATE_FILE.read_text())
    # 1. Register agent
    resp = requests.post(f"{KAKUNIN_BASE}/agents", json={
        "name": "Freqtrade Bot v1.0",
        "model": "freqtrade-ml",
        "version": "1.0.0",
        "permitted_actions": [
            "trade_execution",
            "data_access",
            "api_call",
        ],
    }, headers=KAKUNIN_HEADERS, timeout=10)
    resp.raise_for_status()
    agent = resp.json()["data"]
    # 2. Issue certificate
    cert_resp = requests.post(
        f"{KAKUNIN_BASE}/agents/{agent['id']}/certify",
        headers=KAKUNIN_HEADERS, timeout=10
    )
    cert_resp.raise_for_status()
    cert = cert_resp.json()["data"]
    state = {
        "agent_id": agent["id"],
        "certificate_pem": cert["certificate_pem"],
        "serial_number": cert["serial_number"],
        "expires_at": cert["expires_at"],
    }
    # Cache locally — cert valid for 365 days
    AGENT_STATE_FILE.write_text(json.dumps(state))
    return state
Step 2 — Install Certificate for mTLS (optional)
If your exchange connection requires client certs:
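import os
import ssl
import tempfile

def get_ssl_context(agent_state: dict, keyfile: str | None = None) -> ssl.SSLContext:
    """Create SSL context with Kakunin certificate for mTLS.

    load_cert_chain also needs the matching private key: pass its path as
    keyfile unless the key is bundled in the same PEM.
    """
    ctx = ssl.create_default_context()
    # Write cert to a temp file: SSLContext requires a file path
    with tempfile.NamedTemporaryFile(suffix=".pem", delete=False) as f:
        f.write(agent_state["certificate_pem"].encode())
    try:
        # Load only after the with-block has closed (and flushed) the file
        ctx.load_cert_chain(f.name, keyfile=keyfile)
    finally:
        os.unlink(f.name)  # do not leave the cert behind in the temp dir
    return ctx
Step 3 — Pre-Trade Scope Verification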
Call before every trade to ensure the action is permitted:
def verify_scope(agent_id: str, action: str, metadata: dict | None = None) -> bool:
    """
    Check agent is permitted to execute this action.
    Returns False if scope check fails; caller should abort trade.
    metadata is accepted for call-site context but is not sent to the API.
    """
    try:
        resp = requests.get(
            f"{KAKUNIN_BASE}/agents/{agent_id}",
            headers=KAKUNIN_HEADERS, timeout=5
        )
        if not resp.ok:
            return False  # fail closed: abort on error
        agent = resp.json()["data"]
        return action in (agent.get("permitted_actions") or [])
    except (requests.RequestException, ValueError, KeyError):
        return False  # fail closed on network errors and malformed JSON
Usage in Freqtrade strategy:
import logging

logger = logging.getLogger(__name__)

def confirm_trade_entry(self, pair, order_type, amount, rate, **kwargs) -> bool:
    state = kakunin_register()
    # Scope check: abort if not permitted (or if the check itself failed)
    if not verify_scope(state["agent_id"], "trade_execution", {"pair": pair}):
        logger.warning("Kakunin: scope check failed for trade_execution, blocking %s", pair)
        return False
    return True
Step 4 — Log Trade Events
Map Freqtrade callbacks to Kakunin event types:
def log_kakunin_event(agent_id: str, action_type: str, metadata: dict):
    """Best-effort event log. Bounded by a 3-second timeout; never raises on network errors."""
    try:
        requests.post(f"{KAKUNIN_BASE}/events", json={
            "agent_id": agent_id,
            "action_type": action_type,
            "metadata": metadata,
        }, headers=KAKUNIN_HEADERS, timeout=3)
    except requests.RequestException:
        pass  # never fail trades over logging

| Freqtrade callback | action_type | Metadata to include |
|---|---|---|
| confirm_trade_entry | trade_execution | pair, side, amount, rate |
| confirm_trade_exit | trade_execution | pair, profit_ratio, exit_reason |
| bot_start | authentication | strategy, timeframe |
| process_stopped | authentication | reason |
| Exchange API call | api_call | exchange, endpoint |
| custom_stoploss | data_access | pair, current_rate |
| Signal generation | model_inference | pair, signal, confidence |
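One way to keep call sites consistent with the table above is to encode it as data and build event bodies from it. This is a minimal sketch, not part of the Kakunin API: `EVENT_MAP` and `build_event` are hypothetical names, treating the metadata column as required is an assumption, and the two rows that are not Freqtrade callbacks (exchange API calls and signal generation) are left out.

```python
# Metadata keys per Freqtrade callback, transcribed from the table above.
# Treating them as required is an assumption made for this sketch.
EVENT_MAP = {
    "confirm_trade_entry": ("trade_execution", ("pair", "side", "amount", "rate")),
    "confirm_trade_exit": ("trade_execution", ("pair", "profit_ratio", "exit_reason")),
    "bot_start": ("authentication", ("strategy", "timeframe")),
    "process_stopped": ("authentication", ("reason",)),
    "custom_stoploss": ("data_access", ("pair", "current_rate")),
}


def build_event(agent_id: str, callback: str, **fields) -> dict:
    """Build the JSON body for an event (hypothetical helper).

    Raises KeyError for an unknown callback and ValueError when a metadata
    key listed in the table is missing.
    """
    action_type, required = EVENT_MAP[callback]
    missing = [key for key in required if key not in fields]
    if missing:
        raise ValueError(f"{callback}: missing metadata {missing}")
    return {"agent_id": agent_id, "action_type": action_type, "metadata": dict(fields)}
```

The resulting dict has the same three keys that `log_kakunin_event` posts, so `event = build_event(...)` followed by `log_kakunin_event(event["agent_id"], event["action_type"], event["metadata"])` keeps the mapping in one place.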
def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
    state = kakunin_register()
    log_kakunin_event(state["agent_id"], "trade_execution", {
        "pair": pair,
        "side": "exit",
        "current_profit_ratio": float(current_profit),
        "exit_reason": "custom_exit",
        "rate": float(current_rate),
    })
    return None
Step 5 — Risk Score Check + Throttling
Check the risk band before sizing each trade. Medium risk halves the stake; high risk halts trading:
def get_risk_score(agent_id: str) -> dict:
    resp = requests.get(
        f"{KAKUNIN_BASE}/agents/{agent_id}/risk-score",
        headers=KAKUNIN_HEADERS, timeout=5
    )
    resp.raise_for_status()
    return resp.json()["data"]  # { risk_score, risk_band }

def get_position_size_multiplier(agent_id: str) -> float:
    """Return stake multiplier based on risk band. 1.0 = normal, 0.0 = halt."""
    try:
        risk = get_risk_score(agent_id)
        band = risk.get("risk_band", "low")
        return {"low": 1.0, "medium": 0.5, "high": 0.0}.get(band, 1.0)
    except Exception:
        return 1.0  # fails open: normal sizing if the risk API is unreachable

def custom_stake_amount(self, pair, current_time, current_rate, proposed_stake,
                        min_stake, max_stake, **kwargs) -> float:
    state = kakunin_register()
    multiplier = get_position_size_multiplier(state["agent_id"])
    if multiplier == 0.0:
        return 0  # halt: high risk score
    # proposed_stake is the stake Freqtrade would use by default
    return proposed_stake * multiplier
Step 6 — Webhook Listener for Revocation Alerts
Start a lightweight listener to receive Kakunin webhook events:
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import hmac
import hashlib

WEBHOOK_SECRET = os.environ.get("KAKUNIN_WEBHOOK_SECRET", "")
_trading_halted = threading.Event()

class KakuninWebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        # Verify HMAC signature; reject everything if no secret is configured
        sig = self.headers.get("X-Kakunin-Signature", "")
        expected = hmac.new(WEBHOOK_SECRET.encode(), body, hashlib.sha256).hexdigest()
        if not WEBHOOK_SECRET or not hmac.compare_digest(sig.encode(), expected.encode()):
            self.send_response(401)
            self.end_headers()
            return
        payload = json.loads(body)
        if payload.get("event") == "certificate.revoked":
            _trading_halted.set()  # signal halt to main loop
        self.send_response(200)
        self.end_headers()

    def log_message(self, *args):
        pass  # suppress access logs

def start_webhook_listener(port: int = 8765):
    server = HTTPServer(("0.0.0.0", port), KakuninWebhookHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()

def is_trading_halted() -> bool:
    return _trading_halted.is_set()
Check in your main loop:
def bot_loop_start(self, current_time, **kwargs) -> None:
    if is_trading_halted():
        raise SystemExit("Trading halted: Kakunin certificate revoked")
Register webhook in dashboard: /dashboard/webhooks → add URL https://your-bot-host:8765/webhook → select certificate.revoked. The listener above speaks plain HTTP, so an https URL assumes a TLS-terminating reverse proxy in front of it.
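To exercise the listener locally before registering the URL, a test request can be signed the same way the handler verifies it. This is a sketch under two assumptions: that the `X-Kakunin-Signature` header carries the bare hex HMAC-SHA256 of the raw body, as the handler code implies, and that a payload containing only an `event` field is enough for a test. The real payload shape is not shown in this guide, and `sign_body` and `build_test_request` are hypothetical helper names.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw body, mirroring the handler's check."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def build_test_request(secret: str, event: str = "certificate.revoked") -> tuple[dict, bytes]:
    """Return (headers, body) for a locally signed test webhook.

    Assumption: only the "event" field is needed; real payloads may carry more.
    """
    body = json.dumps({"event": event}).encode()
    headers = {
        "Content-Type": "application/json",
        "X-Kakunin-Signature": sign_body(secret, body),
    }
    return headers, body
```

With the listener running, `headers, body = build_test_request(os.environ["KAKUNIN_WEBHOOK_SECRET"])` followed by `requests.post("http://localhost:8765/webhook", data=body, headers=headers)` should flip `is_trading_halted()` to `True`, while a request signed with a different secret should get a 401.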
Full Strategy Template
from freqtrade.strategy import IStrategy

class KakuninStrategy(IStrategy):
    _kakunin_state: dict | None = None

    def bot_start(self, **kwargs) -> None:
        self._kakunin_state = kakunin_register()
        start_webhook_listener()
        log_kakunin_event(self._kakunin_state["agent_id"], "authentication", {
            "event": "bot_start",
            "strategy": self.__class__.__name__,
        })

    def confirm_trade_entry(self, pair, order_type, amount, rate, **kwargs) -> bool:
        if is_trading_halted():
            return False
        if not verify_scope(self._kakunin_state["agent_id"], "trade_execution"):
            return False
        multiplier = get_position_size_multiplier(self._kakunin_state["agent_id"])
        if multiplier == 0.0:
            return False  # high risk: halt
        log_kakunin_event(self._kakunin_state["agent_id"], "trade_execution", {
            "pair": pair, "side": "buy", "amount": float(amount), "rate": float(rate),
        })
        return True
Testing in Sandbox
# 1. Generate sandbox key
curl -X POST https://kakunin-git-staging-immortalpilot365-5309s-projects.vercel.app/api/v1/api-keys/sandbox \
  -H "Authorization: Bearer $KAKUNIN_API_KEY"
# 2. Run bot against sandbox
export KAKUNIN_API_KEY=kkn_sandbox_...
export KAKUNIN_BASE_URL=https://kakunin-git-staging-immortalpilot365-5309s-projects.vercel.app/api/v1
# 3. Reset sandbox data when done
curl -X POST "$KAKUNIN_BASE_URL/../internal/sandbox/reset" \
  -H "Authorization: Bearer $KAKUNIN_API_KEY"
Going to Production
- Generate production API key: /dashboard/api-keys → "Generate API Key"
- Update env vars: KAKUNIN_API_KEY=kkn_live_..., KAKUNIN_BASE_URL=https://kakunin.ai/api/v1
- Delete .kakunin_agent.json to force fresh registration
- Configure webhook with your public URL
- Monitor risk score in /dashboard/agents