Freqtrade Integration

Register a Freqtrade AI trading bot with Kakunin to get cryptographic identity, pre-trade scope verification, risk-aware throttling, and webhook-triggered circuit breakers.

Freqtrade Integration Guide

Connect a Freqtrade bot to Kakunin in under 2 hours. Covers cert installation, trade event mapping, pre-trade scope checks, and risk-based throttling.

Target audience: AI trading bot operators subject to MiCA or EU AI Act compliance obligations.


Architecture

Freqtrade Bot

    ├── on startup      → register agent + issue X.509 cert
    ├── before trade    → verify_scope (check permitted_actions)
    ├── after trade     → log trade_execution event
    ├── on error        → log authentication / data_mutation event
    └── on webhook      → listen for certificate.revoked → halt trading

Prerequisites

pip install requests cryptography

Set env vars:

export KAKUNIN_API_KEY=kkn_sandbox_...   # start with sandbox
export KAKUNIN_BASE_URL=https://kakunin-git-staging-immortalpilot365-5309s-projects.vercel.app/api/v1

For production, switch to a kkn_live_* key and set KAKUNIN_BASE_URL to https://kakunin.ai/api/v1.
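
Pairing a sandbox key with the production host (or the reverse) is an easy mistake when switching environments. The sketch below checks the pairing at startup. The function name check_kakunin_env is hypothetical, and the rules are inferred from the kkn_sandbox_ / kkn_live_ naming and the production URL above rather than from a documented Kakunin contract.

```python
from urllib.parse import urlparse

PRODUCTION_HOST = "kakunin.ai"  # host taken from the production URL above


def check_kakunin_env(env: dict) -> list:
    """Return a list of configuration problems; an empty list means OK."""
    problems = []
    key = env.get("KAKUNIN_API_KEY", "")
    base_url = env.get("KAKUNIN_BASE_URL", "")

    if not key.startswith(("kkn_sandbox_", "kkn_live_")):
        problems.append("KAKUNIN_API_KEY has an unrecognised prefix")
    if not base_url:
        problems.append("KAKUNIN_BASE_URL is not set")
        return problems

    # Assumption: only the production host accepts live keys
    host = urlparse(base_url).hostname or ""
    is_production_host = host == PRODUCTION_HOST
    if key.startswith("kkn_live_") and not is_production_host:
        problems.append("live key used with a non-production base URL")
    if key.startswith("kkn_sandbox_") and is_production_host:
        problems.append("sandbox key used with the production base URL")
    return problems
```

Calling check_kakunin_env(dict(os.environ)) before registration and refusing to start on a non-empty result keeps a misconfigured bot from registering in the wrong environment.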


Step 1 — Register Agent on Startup

Add to your strategy module (for example, a copy of freqtrade/templates/sample_strategy.py) so that it runs when the bot starts:

import os
import hashlib
import requests
import json
from pathlib import Path

KAKUNIN_BASE = os.environ["KAKUNIN_BASE_URL"]
KAKUNIN_HEADERS = {
    "Authorization": f"Bearer {os.environ['KAKUNIN_API_KEY']}",
    "Content-Type": "application/json",
}
AGENT_STATE_FILE = Path(".kakunin_agent.json")


def kakunin_register() -> dict:
    """Register this Freqtrade bot with Kakunin and issue X.509 cert."""
    # Return cached state if already registered
    if AGENT_STATE_FILE.exists():
        return json.loads(AGENT_STATE_FILE.read_text())

    # 1. Register agent
    resp = requests.post(f"{KAKUNIN_BASE}/agents", json={
        "name": "Freqtrade Bot v1.0",
        "model": "freqtrade-ml",
        "version": "1.0.0",
        "permitted_actions": [
            "trade_execution",
            "data_access",
            "api_call",
        ],
    }, headers=KAKUNIN_HEADERS, timeout=10)
    resp.raise_for_status()
    agent = resp.json()["data"]

    # 2. Issue certificate
    cert_resp = requests.post(
        f"{KAKUNIN_BASE}/agents/{agent['id']}/certify",
        headers=KAKUNIN_HEADERS, timeout=10
    )
    cert_resp.raise_for_status()
    cert = cert_resp.json()["data"]

    state = {
        "agent_id": agent["id"],
        "certificate_pem": cert["certificate_pem"],
        "serial_number": cert["serial_number"],
        "expires_at": cert["expires_at"],
    }

    # Cache locally — cert valid for 365 days
    AGENT_STATE_FILE.write_text(json.dumps(state))
    return state
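
Because kakunin_register returns the cached state without looking at expires_at, a bot that runs longer than the certificate's lifetime would keep using an expired certificate. The sketch below checks the cached expiry. The name cert_needs_renewal and the 14-day margin are hypothetical, and it assumes expires_at is an ISO 8601 string, which the response format shown above does not confirm.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def cert_needs_renewal(state: dict, now: Optional[datetime] = None,
                       margin_days: int = 14) -> bool:
    """True if the cached certificate expires within margin_days.

    Assumes state["expires_at"] is an ISO 8601 string (an assumption about
    the API response); unparseable or missing values count as needing renewal.
    """
    now = now or datetime.now(timezone.utc)
    raw = str(state.get("expires_at", ""))
    try:
        # fromisoformat only accepts a trailing "Z" from Python 3.11 on
        expires = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except ValueError:
        return True
    if expires.tzinfo is None:
        expires = expires.replace(tzinfo=timezone.utc)  # treat naive as UTC
    return expires - now <= timedelta(days=margin_days)
```

One way to use it is to delete the state file and register again when it returns True. Whether an existing agent can instead be re-certified through the same /certify endpoint is not stated here, so treat that as something to confirm.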

Step 2 — Install Certificate for mTLS (optional)

If your exchange connection requires client certs:

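import os
import ssl
import tempfile

def get_ssl_context(agent_state: dict, key_path: str) -> ssl.SSLContext:
    """Create SSL context with Kakunin certificate for mTLS.

    key_path is the path to the private key matching the certificate.
    The cached state holds only the certificate, and load_cert_chain
    needs both.
    """
    ctx = ssl.create_default_context()
    # Write cert to a temp file: load_cert_chain takes file paths
    with tempfile.NamedTemporaryFile(suffix=".pem", delete=False) as f:
        f.write(agent_state["certificate_pem"].encode())
    try:
        # Load after the file is closed so its contents are on disk
        ctx.load_cert_chain(f.name, keyfile=key_path)
    finally:
        os.unlink(f.name)  # do not leave the temp file behind
    return ctx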

Step 3 — Pre-Trade Scope Verification

Call before every trade to ensure the action is permitted:

def verify_scope(agent_id: str, action: str, metadata: dict | None = None) -> bool:
    """
    Check agent is permitted to execute this action.
    Returns False if the scope check fails; the caller should abort the trade.
    metadata is accepted for call-site context but is not sent to the API.
    """
    try:
        resp = requests.get(
            f"{KAKUNIN_BASE}/agents/{agent_id}",
            headers=KAKUNIN_HEADERS, timeout=5
        )
        if not resp.ok:
            return False  # fail closed: abort on error
        agent = resp.json()["data"]
        return action in (agent.get("permitted_actions") or [])
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError):
        return False  # fail closed on network errors and malformed responses

Usage in Freqtrade strategy:

import logging

logger = logging.getLogger(__name__)


def confirm_trade_entry(self, pair, order_type, amount, rate, **kwargs) -> bool:
    state = kakunin_register()

    # Scope check: abort if not permitted (or if the check itself failed)
    if not verify_scope(state["agent_id"], "trade_execution", {"pair": pair}):
        logger.warning("Kakunin: scope check failed for trade_execution, blocking %s", pair)
        return False

    return True
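
A scope check on every trade adds one HTTP round trip to each entry. If that latency matters, recent positive answers can be cached for a short time. The sketch below is one way to do that; ScopeCache and its 30-second default are hypothetical, not part of Kakunin or Freqtrade. Only positive results are cached, so denials and errors are always re-checked and the fail-closed behaviour is preserved.

```python
import time
from typing import Callable, Dict, Tuple


class ScopeCache:
    """Cache positive scope decisions for a short time (hypothetical helper).

    `check` is any callable shaped like verify_scope(agent_id, action) -> bool.
    """

    def __init__(self, check: Callable[[str, str], bool],
                 ttl_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self._check = check
        self._ttl = ttl_seconds
        self._clock = clock
        self._allowed_until: Dict[Tuple[str, str], float] = {}

    def is_permitted(self, agent_id: str, action: str) -> bool:
        key = (agent_id, action)
        now = self._clock()
        if self._allowed_until.get(key, float("-inf")) > now:
            return True  # recent positive answer is still valid
        allowed = self._check(agent_id, action)
        if allowed:
            self._allowed_until[key] = now + self._ttl
        else:
            self._allowed_until.pop(key, None)  # never cache a denial
        return allowed
```

The trade-off is that a permission removed on the server can remain effective locally for up to ttl_seconds, so keep the TTL short or skip the cache if that window is unacceptable.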

Step 4 — Log Trade Events

Map Freqtrade callbacks to Kakunin event types:

def log_kakunin_event(agent_id: str, action_type: str, metadata: dict):
    """Best-effort event log. Errors are swallowed; the call waits at most 3 s."""
    try:
        requests.post(f"{KAKUNIN_BASE}/events", json={
            "agent_id": agent_id,
            "action_type": action_type,
            "metadata": metadata,
        }, headers=KAKUNIN_HEADERS, timeout=3)
    except requests.RequestException:
        pass  # never fail trades over logging

Freqtrade callback     action_type        Metadata to include
---------------------  -----------------  -------------------------------
confirm_trade_entry    trade_execution    pair, side, amount, rate
confirm_trade_exit     trade_execution    pair, profit_ratio, exit_reason
bot_start              authentication     strategy, timeframe
process_stopped        authentication     reason
Exchange API call      api_call           exchange, endpoint
custom_stoploss        data_access        pair, current_rate
Signal generation      model_inference    pair, signal, confidence

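def confirm_trade_exit(self, pair, trade, order_type, amount, rate,
                       time_in_force, exit_reason, current_time, **kwargs) -> bool:
    # Log here rather than in custom_exit, which Freqtrade calls on every
    # loop iteration for each open trade, not only when an exit happens.
    state = kakunin_register()
    log_kakunin_event(state["agent_id"], "trade_execution", {
        "pair": pair,
        "side": "exit",
        "profit_ratio": float(trade.calc_profit_ratio(rate)),
        "exit_reason": exit_reason,
        "rate": float(rate),
    })
    return True  # logging never vetoes the exit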
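
log_kakunin_event swallows errors, but the calling thread still waits for the HTTP request, up to the 3-second timeout. If event logging should stay off the trade path entirely, events can be queued and sent from a worker thread. The sketch below shows one way; BackgroundEventLogger is a hypothetical helper, and dropping events when the queue is full is a design choice you may not want for audit-grade logs.

```python
import queue
import threading
from typing import Callable


class BackgroundEventLogger:
    """Queue events and send them from a worker thread (hypothetical helper).

    `send` is any callable shaped like
    log_kakunin_event(agent_id, action_type, metadata).
    """

    def __init__(self, send: Callable[[str, str, dict], None], max_queue: int = 1000):
        self._send = send
        self._queue = queue.Queue(maxsize=max_queue)
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def log(self, agent_id: str, action_type: str, metadata: dict) -> bool:
        """Enqueue an event without blocking; returns False if it was dropped."""
        try:
            self._queue.put_nowait((agent_id, action_type, metadata))
            return True
        except queue.Full:
            return False

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            try:
                self._send(*item)
            except Exception:
                pass  # logging must never take the bot down
            finally:
                self._queue.task_done()

    def flush(self) -> None:
        """Block until every queued event has been handed to `send`."""
        self._queue.join()
```

A strategy would create one instance in bot_start, for example BackgroundEventLogger(log_kakunin_event), call its log method from the callbacks, and call flush during shutdown so queued events are not lost with the daemon thread.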

Step 5 — Risk Score Check + Throttling

Check the risk band before sizing each trade. In the mapping below, a medium band halves the stake and a high band halts new entries:

def get_risk_score(agent_id: str) -> dict:
    resp = requests.get(
        f"{KAKUNIN_BASE}/agents/{agent_id}/risk-score",
        headers=KAKUNIN_HEADERS, timeout=5
    )
    resp.raise_for_status()
    return resp.json()["data"]  # { risk_score, risk_band }


def get_position_size_multiplier(agent_id: str) -> float:
    """Return stake multiplier based on risk band. 1.0 = normal, 0.0 = halt."""
    try:
        risk = get_risk_score(agent_id)
        band = risk.get("risk_band", "low")
        return {"low": 1.0, "medium": 0.5, "high": 0.0}.get(band, 1.0)
    except Exception:
        return 1.0  # default to normal on error


def custom_stake_amount(self, pair, current_time, current_rate, proposed_stake,
                        min_stake, max_stake, **kwargs) -> float:
    state = kakunin_register()
    multiplier = get_position_size_multiplier(state["agent_id"])
    if multiplier == 0.0:
        return 0  # halt: high risk score
    # proposed_stake is the amount Freqtrade would otherwise use
    return proposed_stake * multiplier
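
Halving a stake can push it below the exchange minimum, which Freqtrade passes to custom_stake_amount as min_stake. The sketch below works through the arithmetic as a pure function. scale_stake is a hypothetical helper; it reuses the multipliers from get_position_size_multiplier but, unlike that function, treats an unknown band as a halt, which is a stricter assumption of this example.

```python
from typing import Optional

# Multipliers copied from get_position_size_multiplier above
BAND_MULTIPLIERS = {"low": 1.0, "medium": 0.5, "high": 0.0}


def scale_stake(proposed_stake: float, risk_band: str,
                min_stake: Optional[float] = None) -> float:
    """Scale a stake by risk band; return 0.0 when the trade should be skipped.

    Unknown bands are treated like "high" (assumption of this sketch), and
    a scaled stake below the exchange minimum is skipped, not rounded up.
    """
    multiplier = BAND_MULTIPLIERS.get(risk_band, 0.0)
    stake = proposed_stake * multiplier
    if stake <= 0:
        return 0.0
    if min_stake is not None and stake < min_stake:
        return 0.0  # too small to place; do not bump it back up
    return stake
```

For example, a proposed stake of 15 in the medium band scales to 7.5; with a minimum stake of 10 the function returns 0.0 and the entry is skipped, so a throttled bot never trades a larger amount than the risk band allows.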

Step 6 — Webhook Listener for Revocation Alerts

Start a lightweight listener to receive Kakunin webhook events:

from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import hmac
import hashlib

WEBHOOK_SECRET = os.environ.get("KAKUNIN_WEBHOOK_SECRET", "")
_trading_halted = threading.Event()


class KakuninWebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)

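        # Verify HMAC signature; reject everything if no secret is configured
        sig = self.headers.get("X-Kakunin-Signature", "")
        expected = hmac.new(WEBHOOK_SECRET.encode(), body, hashlib.sha256).hexdigest()
        # Compare as bytes: compare_digest rejects non-ASCII str input
        if not WEBHOOK_SECRET or not hmac.compare_digest(sig.encode(), expected.encode()):
            self.send_response(401)
            self.end_headers()
            return

        try:
            payload = json.loads(body)
        except ValueError:
            self.send_response(400)  # signed but not valid JSON
            self.end_headers()
            return
        if isinstance(payload, dict) and payload.get("event") == "certificate.revoked":
            _trading_halted.set()  # signal halt to main loop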

        self.send_response(200)
        self.end_headers()

    def log_message(self, *args): pass  # suppress access logs


def start_webhook_listener(port: int = 8765):
    server = HTTPServer(("0.0.0.0", port), KakuninWebhookHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()


def is_trading_halted() -> bool:
    return _trading_halted.is_set()

Check in your main loop:

def bot_loop_start(self, current_time, **kwargs) -> None:
    if is_trading_halted():
        raise SystemExit("Trading halted: Kakunin certificate revoked")

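Register the webhook in the dashboard: go to /dashboard/webhooks, add the URL https://your-bot-host:8765/webhook, and select the certificate.revoked event. The listener above serves plain HTTP and accepts POSTs on any path, so an https:// URL requires a TLS-terminating reverse proxy in front of it. KAKUNIN_WEBHOOK_SECRET must match the secret Kakunin uses to sign deliveries.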
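
Before relying on the circuit breaker, it helps to send the listener a signed test delivery from your own machine. The sketch below builds one using the scheme the handler checks: a hex HMAC-SHA256 of the raw body in the X-Kakunin-Signature header. The helper names are hypothetical, and the payload contains only the event field the handler reads; real Kakunin deliveries presumably carry more fields.

```python
import hashlib
import hmac
import json
from typing import Dict, Tuple


def sign_payload(secret: str, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw body, matching the handler's check."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def build_test_webhook(secret: str, event: str) -> Tuple[bytes, Dict[str, str]]:
    """Return (body, headers) for a locally generated test delivery."""
    body = json.dumps({"event": event}).encode()
    headers = {
        "Content-Type": "application/json",
        "X-Kakunin-Signature": sign_payload(secret, body),
    }
    return body, headers
```

With the listener running, requests.post("http://localhost:8765/webhook", data=body, headers=headers) using the output of build_test_webhook(WEBHOOK_SECRET, "certificate.revoked") should return 200, and is_trading_halted() should then report True. Sign and send the exact same bytes, since re-serialising the JSON changes the signature.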


Full Strategy Template

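from freqtrade.strategy import IStrategy


class KakuninStrategy(IStrategy):
    # Kakunin hooks only; add your usual populate_indicators,
    # populate_entry_trend and populate_exit_trend methods.
    _kakunin_state: dict | None = None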

    def bot_start(self, **kwargs) -> None:
        self._kakunin_state = kakunin_register()
        start_webhook_listener()
        log_kakunin_event(self._kakunin_state["agent_id"], "authentication", {
            "event": "bot_start",
            "strategy": self.__class__.__name__,
        })

    def confirm_trade_entry(self, pair, order_type, amount, rate, **kwargs) -> bool:
        if is_trading_halted():
            return False
        if not verify_scope(self._kakunin_state["agent_id"], "trade_execution"):
            return False
        multiplier = get_position_size_multiplier(self._kakunin_state["agent_id"])
        if multiplier == 0.0:
            return False  # high risk — halt
        log_kakunin_event(self._kakunin_state["agent_id"], "trade_execution", {
            "pair": pair, "side": "buy", "amount": float(amount), "rate": float(rate),
        })
        return True

Testing in Sandbox

# 1. Generate sandbox key
curl -X POST https://kakunin-git-staging-immortalpilot365-5309s-projects.vercel.app/api/v1/api-keys/sandbox \
  -H "Authorization: Bearer $KAKUNIN_API_KEY"

# 2. Run bot against sandbox
export KAKUNIN_API_KEY=kkn_sandbox_...
export KAKUNIN_BASE_URL=https://kakunin-git-staging-immortalpilot365-5309s-projects.vercel.app/api/v1

# 3. Reset sandbox data when done
curl -X POST $KAKUNIN_BASE_URL/../internal/sandbox/reset \
  -H "Authorization: Bearer $KAKUNIN_API_KEY"

Going to Production

  1. Generate production API key: /dashboard/api-keys → "Generate API Key"
  2. Update env vars: KAKUNIN_API_KEY=kkn_live_..., KAKUNIN_BASE_URL=https://kakunin.ai/api/v1
  3. Delete .kakunin_agent.json to force fresh registration
  4. Configure webhook with your public URL
  5. Monitor risk score in /dashboard/agents
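
Step 3 exists because the cached .kakunin_agent.json holds a sandbox agent ID that production will not recognise. An alternative to deleting the file by hand is to derive the cache filename from the base URL, so each environment gets its own file. The sketch below shows the idea; state_file_for is a hypothetical helper and the filename pattern is an assumption of this example.

```python
import hashlib
from pathlib import Path


def state_file_for(base_url: str, directory: Path = Path(".")) -> Path:
    """Return a cache path that is unique per Kakunin base URL.

    A short SHA-256 suffix keeps sandbox and production registrations in
    separate files, so changing KAKUNIN_BASE_URL never reuses an agent ID
    issued by the other environment.
    """
    # Normalise so a trailing slash or letter case does not create a new file
    normalized = base_url.strip().rstrip("/").lower()
    digest = hashlib.sha256(normalized.encode()).hexdigest()[:12]
    return directory / f".kakunin_agent.{digest}.json"
```

Setting AGENT_STATE_FILE = state_file_for(KAKUNIN_BASE) in the registration module would make the switch to production trigger a fresh registration automatically, while the sandbox state stays available for later test runs.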

Next: AI Agent Quickstart · Webhooks · Sandbox
