
Python SDK

kakunin is the official Python SDK for registering agents, issuing certificates, ingesting behavioral events, and enforcing compliance across LangChain, LlamaIndex, CrewAI, AutoGen, and LangGraph.

Installation

pip install kakunin

Requires Python 3.10+. Async-first (asyncio); all client methods are coroutines.


Quickstart

import asyncio
from kakunin import Kakunin

async def main():
    async with Kakunin(api_key="kak_live_...") as client:
        # 1. Register the agent
        agent = await client.agents.create(
            name="TradeBot v2.1",
            model="gpt-4o",
            version="v2.1.0",
            model_hash="sha256:8f3c...2a91",
            financial_scope={
                "max_single_trade_usd": 50_000,
                "permitted_instruments": ["EUR/USD", "BTC/EUR"],
                "permitted_venues": ["euronext"],
                "leverage_permitted": False,
            },
        )

        # 2. Issue an X.509 certificate — private key never leaves KMS
        cert = await client.agents.certify(agent.id)
        print(cert.serial_number)   # "c4f9-17a2-6b8e"
        print(cert.expires_at)      # 2027-05-20T...

        # 3. Ingest a behavioral event
        event = await client.events.create(
            agent_id=agent.id,
            action_type="transaction_initiated",
            details={"amount": 840, "currency": "EUR", "venue": "euronext"},
        )
        print(event.risk_score)     # 0.12
        print(event.risk_band)      # 'low'

asyncio.run(main())

Configuration

from kakunin import Kakunin

client = Kakunin(
    api_key="kak_live_...",                      # or kak_test_... for sandbox
    base_url="https://api.kakunin.ai/v1",        # default
    timeout=10.0,                                # per-request timeout in seconds
)

Use as an async context manager (async with) for automatic connection cleanup, or call await client.aclose() manually.
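When the client is not used with async with, a try/finally block keeps the cleanup guarantee even if a request fails. The sketch below uses a stand-in class (StubClient, hypothetical) in place of the real client; only the aclose() name comes from this page.

```python
import asyncio


class StubClient:
    """Stand-in for the SDK client; only mimics the aclose() cleanup call."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def use_and_close(client: StubClient, fail: bool = False) -> None:
    try:
        if fail:
            # Simulate a request that raises midway through the work.
            raise RuntimeError("request failed")
    finally:
        # Runs on success and on failure, like leaving an `async with` block.
        await client.aclose()


client_ok = StubClient()
asyncio.run(use_and_close(client_ok))

client_err = StubClient()
try:
    asyncio.run(use_and_close(client_err, fail=True))
except RuntimeError:
    pass

print(client_ok.closed, client_err.closed)
```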


Agents

client.agents.create(**params)

agent = await client.agents.create(
    name="Risk Analyser",
    model="claude-sonnet-4-6",
    version="v1.0.0",
    model_hash="sha256:...",
)

Parameter        Type  Required  Description
name             str             Human-readable agent name
model            str             Model identifier
version          str             Semver or build tag
model_hash       str             SHA-256 of model weights/config. Binds cert to exact model version.
financial_scope  dict            Encoded in the X.509 cert
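
The model_hash values in the examples above follow a sha256:<hex> pattern. As a rough sketch, such a string could be computed from a weights or config file with hashlib. The helper name model_hash_from_stream and the choice of which bytes to hash are assumptions for illustration, not part of the SDK.

```python
import hashlib
import io
from typing import BinaryIO


def model_hash_from_stream(stream: BinaryIO, chunk_size: int = 1 << 20) -> str:
    """Return 'sha256:<hex digest>' for the bytes read from a binary stream.

    Hypothetical helper: reads in chunks so large weight files are not
    loaded into memory all at once.
    """
    digest = hashlib.sha256()
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return f"sha256:{digest.hexdigest()}"


# In-memory stream for demonstration; for a real file use open(path, "rb").
example = model_hash_from_stream(io.BytesIO(b"example model bytes"))
print(example)
```

Hashing a stream rather than a path keeps the helper usable for files, in-memory buffers, and downloads alike.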

client.agents.certify(agent_id)

Issues an X.509 certificate via AWS KMS. The API returns HTTP 409 if the agent already has an active certificate.

cert = await client.agents.certify(agent.id)
# cert.pem            — full X.509 PEM
# cert.serial_number  — use for verification
# cert.expires_at     — 365 days (MiCA Art. 70)
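
Since certificates expire, callers may want to know how much validity is left. The sketch below assumes cert.expires_at is a timezone-aware datetime (not confirmed on this page); days_until_expiry and expires_soon are hypothetical helpers, and the 30-day threshold is an arbitrary choice.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def days_until_expiry(expires_at: datetime, now: Optional[datetime] = None) -> int:
    """Whole days remaining before expires_at (negative once expired)."""
    now = now or datetime.now(timezone.utc)
    return (expires_at - now).days


def expires_soon(expires_at: datetime, now: Optional[datetime] = None,
                 threshold_days: int = 30) -> bool:
    """True when fewer than threshold_days remain."""
    return days_until_expiry(expires_at, now) < threshold_days


# Fixed timestamps so the example is deterministic.
issued = datetime(2026, 5, 20, tzinfo=timezone.utc)
expires = issued + timedelta(days=365)
print(days_until_expiry(expires, now=issued))
print(expires_soon(expires, now=expires - timedelta(days=10)))
```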

client.agents.get(agent_id)

agent = await client.agents.get("agt-8f3c2a91d4")
print(agent.status)   # AgentStatus.active

client.agents.list(**params)

agents, pagination = await client.agents.list(status="active", limit=50)

client.agents.halt(agent_id, *, reason=None)

Immediately suspends the agent and revokes its active certificate. Returns a signed halt receipt.

receipt = await client.agents.halt(agent.id, reason="Anomalous trade pattern detected")
print(receipt.signature)

client.agents.risk(agent_id, *, window_days=7)

Rolling risk profile with drift score and event breakdown.

risk = await client.agents.risk(agent.id, window_days=30)
print(risk.risk_band)         # RiskBand.high
print(risk.drift.drift_trend) # "increasing"

Events

client.events.create(**params)

Ingests a behavioral event and returns its risk score synchronously (p99 latency: 200 ms).

event = await client.events.create(
    agent_id=agent.id,
    action_type="data_access",
    chain_id="chn-xyz",            # optional — group events into a decision chain
    details={"resource": "portfolio_db", "rows_read": 1200},
)

Action types:

Type                         Band (default)  Description
api_call                     low             Standard API invocation
data_access                  low             Read operation on data
data_mutation                medium          Write / delete operation
transaction_initiated        medium          Financial transaction
transaction_anomaly          high            Transaction outside normal pattern
authentication_failure       medium          Failed auth
unauthorized_access_attempt  high            Scope violation attempt
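
The action types above can be mirrored in a small local lookup, for example to validate an action_type before sending it. The mapping is copied from the table; the names DEFAULT_BAND and default_band are hypothetical, and the band the service actually returns for an event may differ from the default.

```python
# Default band per action type, copied from the table above.
DEFAULT_BAND = {
    "api_call": "low",
    "data_access": "low",
    "data_mutation": "medium",
    "transaction_initiated": "medium",
    "transaction_anomaly": "high",
    "authentication_failure": "medium",
    "unauthorized_access_attempt": "high",
}


def default_band(action_type: str) -> str:
    """Return the documented default band, rejecting unknown action types."""
    try:
        return DEFAULT_BAND[action_type]
    except KeyError:
        raise ValueError(f"unknown action_type: {action_type!r}") from None


high_by_default = sorted(t for t, band in DEFAULT_BAND.items() if band == "high")
print(high_by_default)
```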

client.events.list(**params)

events, pagination = await client.events.list(agent_id=agent.id, band="high")

Decision Chains

Group a sequence of agent events into a tamper-evident chain. Useful for auditing multi-step agentic workflows.

chain = await client.chains.create(name="Trade execution T-984231")

await client.events.create(agent_id=agent.id, action_type="data_access", chain_id=chain.chain_id)
await client.events.create(agent_id=agent.id, action_type="transaction_initiated", chain_id=chain.chain_id)

closed = await client.chains.close(chain.chain_id)
print(closed.chain_hash)   # HMAC over ordered events — verifiable by anyone
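
To illustrate why a hash over ordered events is tamper-evident, here is a minimal sketch using HMAC-SHA256. The serialization (sorted-key JSON with a length prefix), the key, and the function names are assumptions; this page does not describe how the service computes chain_hash. In this sketch, checking the hash requires the same secret key.

```python
import hashlib
import hmac
import json


def chain_hash(events: list, key: bytes) -> str:
    """HMAC-SHA256 over the events in order, each serialized canonically."""
    mac = hmac.new(key, digestmod=hashlib.sha256)
    for event in events:
        payload = json.dumps(event, sort_keys=True, separators=(",", ":")).encode("utf-8")
        # Length prefix keeps event boundaries unambiguous.
        mac.update(len(payload).to_bytes(8, "big"))
        mac.update(payload)
    return mac.hexdigest()


def verify_chain(events: list, key: bytes, expected_hex: str) -> bool:
    """Constant-time comparison of the recomputed hash with the expected one."""
    return hmac.compare_digest(chain_hash(events, key), expected_hex)


demo_key = b"demo-key"  # placeholder secret for the example only
demo_events = [
    {"action_type": "data_access", "seq": 1},
    {"action_type": "transaction_initiated", "seq": 2},
]
demo_digest = chain_hash(demo_events, demo_key)

print(verify_chain(demo_events, demo_key, demo_digest))
print(verify_chain(list(reversed(demo_events)), demo_key, demo_digest))
```

Reordering, dropping, or editing any event changes the digest, which is the property the chain relies on.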

Verify

Public endpoints — no API key required. Use these to verify counterparty agents.

# Verify a certificate serial
cert_data = await client.verify.certificate("c4f9-17a2-6b8e")
print(cert_data["status"])  # "active"

# Verify a signed message
result = await client.verify.message(
    agent_id=agent.id,
    payload={"order_id": "T-984231", "amount": 840},
    signature="base64...",
    certificate_serial="c4f9-17a2-6b8e",
)
print(result.valid)         # True

Compliance Reports

from kakunin.models import StandardsFramework

report = await client.reports.create(
    agent_id=agent.id,
    window_days=30,
    standards_frameworks=[StandardsFramework.iso_27001, StandardsFramework.nist_csf],
)
# report.status == ReportStatus.generating — poll until ready
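
Polling until the report is ready could look like the sketch below. This page does not name the method that re-fetches a report, so the fetch callable is a stand-in; poll_until, the interval, and the timeout are assumptions.

```python
import asyncio
import time


async def poll_until(fetch, is_done, *, interval: float = 2.0, timeout: float = 120.0):
    """Call fetch() repeatedly until is_done(value) is true or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        value = await fetch()
        if is_done(value):
            return value
        if time.monotonic() + interval > deadline:
            raise TimeoutError("gave up waiting for the report")
        await asyncio.sleep(interval)


# Stand-in for a report fetch: "generating" twice, then "ready".
_statuses = iter(["generating", "generating", "ready"])


async def fake_fetch() -> str:
    return next(_statuses)


async def always_generating() -> str:
    return "generating"


final_status = asyncio.run(
    poll_until(fake_fetch, lambda s: s != "generating", interval=0.01, timeout=1.0)
)
print(final_status)  # ready
```

Bounding the loop with a deadline avoids waiting forever if a report never leaves the generating state.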

Framework Integrations

The kakunin.integrations package provides two layers:

Layer                      Purpose                                                                Raises on failure?
Scope verification         Pre-execution guard: checks agent is active and holds required scopes  ScopeViolationError
Behavioral event emission  Fire-and-forget monitoring: emits events at framework hook points      ✗ Never raises
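
The "never raises" behaviour of the emission layer can be pictured as a wrapper that logs failures instead of propagating them. This is a sketch of the pattern only, not the SDK's code; emit_quietly and the two stand-in emitters are hypothetical.

```python
import asyncio
import logging

logger = logging.getLogger("kakunin.sketch")


async def emit_quietly(emit, **event) -> bool:
    """Await emit(**event); report failure via the return value, never by raising."""
    try:
        await emit(**event)
        return True
    except Exception:
        # Monitoring must not break the agent, so log and carry on.
        logger.warning("event emission failed; continuing", exc_info=True)
        return False


async def ok_emit(**event) -> None:
    return None


async def failing_emit(**event) -> None:
    raise ConnectionError("monitoring backend unreachable")


async def demo() -> list:
    return [
        await emit_quietly(ok_emit, action_type="api_call"),
        await emit_quietly(failing_emit, action_type="api_call"),
    ]


results = asyncio.run(demo())
print(results)  # [True, False]
```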

verify_agent_scope — universal decorator

Works with any Python function, any framework. Checks the agent's Kakunin status before the wrapped function runs.

from kakunin import Kakunin
from kakunin.integrations.scope import verify_agent_scope

client = Kakunin(api_key="kak_live_...")

@verify_agent_scope(client, agent_id="agt-123", required_scopes=["trade.execute"])
async def execute_trade(order: dict) -> dict:
    # Only runs if agent is active and has "trade.execute" in metadata["scopes"]
    ...

Parameters:

Parameter        Type              Description
kakunin          Kakunin           Authenticated client
agent_id         str               Agent to verify
required_scopes  list[str] | None  Scope strings checked against agent.metadata["scopes"]. Pass None to only check status.

Works with both async def and def functions. When called from within a running event loop (e.g. inside an async framework), the sync check runs in a thread pool to avoid blocking.

verify_agent_scope raises ScopeViolationError and never swallows it — unlike the event-emission helpers. Handle it at the call site or let it propagate to your agent's error boundary.
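
As an illustration of how a guard of this kind can wrap both async def and def functions, here is a self-contained sketch. It is not the SDK's implementation: require_scopes, the lookup callable, and ScopeViolation are stand-ins, and the agent record is a local dict rather than a network call.

```python
import asyncio
import functools
import inspect


class ScopeViolation(Exception):
    """Stand-in for the SDK's ScopeViolationError."""

    def __init__(self, agent_id, agent_status=None, missing_scopes=None):
        super().__init__(f"agent {agent_id} failed scope verification")
        self.agent_id = agent_id
        self.agent_status = agent_status
        self.missing_scopes = list(missing_scopes or [])


def require_scopes(lookup, agent_id, required_scopes=None):
    """Decorator factory: check status and scopes before the wrapped call."""

    def check():
        agent = lookup(agent_id)
        if agent["status"] != "active":
            raise ScopeViolation(agent_id, agent_status=agent["status"])
        missing = [s for s in (required_scopes or []) if s not in agent["scopes"]]
        if missing:
            raise ScopeViolation(agent_id, missing_scopes=missing)

    def decorator(fn):
        if inspect.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                check()
                return await fn(*args, **kwargs)
            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args, **kwargs):
            check()
            return fn(*args, **kwargs)
        return sync_wrapper

    return decorator


# Local stand-in for agent records.
AGENTS = {"agt-123": {"status": "active", "scopes": ["data.read"]}}


@require_scopes(AGENTS.__getitem__, "agt-123", ["data.read"])
def read_positions():
    return "ok"


@require_scopes(AGENTS.__getitem__, "agt-123", ["trade.execute"])
async def execute_trade():
    return "executed"
```

Because the check runs inside the wrapper, a change in the agent's status takes effect on the next call rather than at decoration time.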


LangChain — KakuninToolGuard

Wraps any LangChain BaseTool with a scope check before every invocation. Requires langchain-core ≥ 0.1.0.

from langchain_core.tools import tool
from kakunin import Kakunin
from kakunin.integrations.langchain import KakuninToolGuard

client = Kakunin(api_key="kak_live_...")

@tool
def trade_executor(order: str) -> str:
    """Execute a trade order."""
    # execute_trade is your own function, defined elsewhere
    return execute_trade(order)

guarded = KakuninToolGuard(
    kakunin=client,
    agent_id="agt-123",
    tool=trade_executor,
    required_scopes=["trade.execute"],
)

# Drop into any LangChain agent or LCEL chain as a normal tool.
# create_react_agent, llm, and prompt are assumed to be imported/defined elsewhere.
agent = create_react_agent(llm, tools=[guarded], prompt=prompt)

To guard an entire chain rather than a single tool, use langchain_scope_callback:

from kakunin.integrations.langchain import langchain_scope_callback

guard = langchain_scope_callback(client, agent_id="agt-123")
chain = my_chain.with_config(callbacks=[guard])
# Raises ScopeViolationError at chain start if agent is not active

LlamaIndex — KakuninFunctionToolGuard

Wraps a callable with scope verification and exposes the full LlamaIndex tool protocol (call, acall, metadata).

from kakunin import Kakunin
from kakunin.integrations.llamaindex import KakuninFunctionToolGuard
from llama_index.core.agent import ReActAgent

client = Kakunin(api_key="kak_live_...")

def portfolio_lookup(account_id: str) -> dict:
    """Fetch current portfolio positions."""
    return fetch_positions(account_id)

guarded = KakuninFunctionToolGuard(
    kakunin=client,
    agent_id="agt-123",
    fn=portfolio_lookup,
    name="portfolio_lookup",
    description="Fetch current portfolio positions for an account.",
    required_scopes=["data.read"],
)

agent = ReActAgent.from_tools([guarded], llm=llm, verbose=True)

CrewAI — KakuninCrewAgent

Subclass of crewai.Agent that emits Kakunin events at every task execution and tool call. Pass required_scopes to block task execution if the agent is not active. Requires crewai ≥ 0.28.0.

from crewai import Task, Crew
from kakunin import Kakunin
from kakunin.integrations.crewai import KakuninCrewAgent

client = Kakunin(api_key="kak_live_...")

agent = KakuninCrewAgent(
    kakunin=client,
    agent_id="agt-123",
    required_scopes=["trade.execute"],   # optional scope guard
    # Standard CrewAI kwargs:
    role="Risk Analyst",
    goal="Assess and flag high-risk trades",
    backstory="...",
    tools=[...],
)

task = Task(description="Assess trade T-984231", agent=agent)
crew = Crew(agents=[agent], tasks=[task])
crew.kickoff()

Every execute_task() call emits an api_call event on start and a data_access event on completion. Exceptions emit transaction_anomaly. If required_scopes is set and verification fails, execution is blocked before any event is emitted.


AutoGen — KakuninConversableAgent

Subclass of autogen.ConversableAgent that emits Kakunin events on every receive() and generate_reply(). Pass required_scopes to block reply generation when the agent fails verification. Requires pyautogen ≥ 0.2.0.

from autogen import UserProxyAgent
from kakunin import Kakunin
from kakunin.integrations.autogen import KakuninConversableAgent

client = Kakunin(api_key="kak_live_...")

risk_engine = KakuninConversableAgent(
    kakunin=client,
    agent_id="agt-456",
    required_scopes=["chat.reply"],   # optional scope guard
    # Standard AutoGen kwargs:
    name="RiskEngine",
    system_message="You are a risk analyst.",
    llm_config={"model": "gpt-4o"},
)

user_proxy = UserProxyAgent(name="User")
user_proxy.initiate_chat(risk_engine, message="Assess trade T-984231")

For attaching X-Kakunin-Cert-Serial to outbound httpx requests, use KakuninHttpxMixin:

from kakunin.integrations.autogen import KakuninHttpxMixin, KakuninConversableAgent

class MyAgent(KakuninHttpxMixin, KakuninConversableAgent):
    pass

agent = MyAgent(
    kakunin=client,
    agent_id="agt-456",
    cert_serial="c4f9-17a2-6b8e",
    name="RiskEngine",
    ...
)

# Outbound calls via agent.http_client automatically include the cert serial
resp = await agent.http_client.get("https://internal-service/prices")

LangGraph — kakunin_node

Decorator that wraps a LangGraph node function to emit a Kakunin event on every invocation.

from kakunin import Kakunin
from kakunin.integrations.langgraph import kakunin_node

client = Kakunin(api_key="kak_live_...")

@kakunin_node(client, agent_id="agt-123", action_type="data_access")
async def risk_engine_node(state: dict) -> dict:
    # your node logic
    return state

# Or wrap at graph construction time
graph.add_node("risk_engine", kakunin_node(client, agent_id="agt-123")(risk_engine_node))

To add scope verification to a LangGraph node, combine the two decorators:

@verify_agent_scope(client, agent_id="agt-123", required_scopes=["data.read"])
@kakunin_node(client, agent_id="agt-123", action_type="data_access")
async def data_ingestion_node(state: dict) -> dict:
    ...

Error handling

All SDK errors are instances of KakuninError:

from kakunin import Kakunin, KakuninError, AuthenticationError, RateLimitError
from kakunin.exceptions import ScopeViolationError

try:
    await client.agents.certify(agent_id)
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except AuthenticationError:
    print("Invalid API key")
except KakuninError as e:
    print(f"API error: {e}")
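
Since RateLimitError carries retry_after, a caller can wait and retry instead of only printing. The sketch below shows that pattern with a stand-in exception class (RateLimited) so it runs without the SDK; with_rate_limit_retry and max_attempts are hypothetical. In real use the except clause would catch RateLimitError.

```python
import asyncio


class RateLimited(Exception):
    """Stand-in for the SDK's RateLimitError, which exposes retry_after."""

    def __init__(self, retry_after: float):
        super().__init__(f"rate limited, retry after {retry_after}s")
        self.retry_after = retry_after


async def with_rate_limit_retry(operation, *, max_attempts: int = 3):
    """Await operation(), sleeping for retry_after between rate-limited attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except RateLimited as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the caller
            await asyncio.sleep(exc.retry_after)


calls = {"count": 0}


async def flaky_certify() -> str:
    # Fails twice with a short retry_after, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RateLimited(retry_after=0.01)
    return "cert-issued"


outcome = asyncio.run(with_rate_limit_retry(flaky_certify))
print(outcome, calls["count"])  # cert-issued 3
```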

ScopeViolationError

Raised by verify_agent_scope, KakuninToolGuard, KakuninFunctionToolGuard, and the required_scopes parameter on KakuninCrewAgent / KakuninConversableAgent.

from kakunin.exceptions import ScopeViolationError

try:
    await guarded_fn()
except ScopeViolationError as e:
    print(e.agent_id)        # "agt-123"
    print(e.agent_status)    # "suspended"
    print(e.missing_scopes)  # ["trade.execute"]

Attribute       Type        Description
agent_id        str | None  The agent that failed verification
agent_status    str | None  "suspended" or "retired" if status check failed
missing_scopes  list[str]   Scopes the agent lacked (empty if status was the issue)

Models

Key Pydantic models exported from kakunin.models:

Model              Description
Agent              Agent record: id, name, model, version, status, model_hash
AgentStatus        Enum: active, suspended, retired
Certificate        id, serial_number, status, pem, issued_at, expires_at
CertificateStatus  Enum: active, revoked, expired
BehaviorEvent      id, agent_id, action_type, risk_score, risk_band
RiskBand           Enum: low, medium, high
AgentRisk          Rolling risk profile with drift info
HaltReceipt        Signed halt receipt: signature, algorithm, halted_at
DecisionChain      chain_id, name, status, chain_hash, events

Environment variables

# Required
KAK_API_KEY=kak_live_...    # or kak_test_... for sandbox
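
This page does not say whether the client reads KAK_API_KEY on its own, so one option is to read it explicitly and pass it as api_key. A minimal sketch, with hypothetical helper names; the prefix check is based on the kak_live_ and kak_test_ prefixes shown above.

```python
import os
from typing import Mapping, Optional

VALID_PREFIXES = ("kak_live_", "kak_test_")


def load_api_key(env: Optional[Mapping[str, str]] = None) -> str:
    """Read KAK_API_KEY from the environment and sanity-check its prefix."""
    env = os.environ if env is None else env
    key = env.get("KAK_API_KEY", "").strip()
    if not key:
        raise RuntimeError("KAK_API_KEY is not set")
    if not key.startswith(VALID_PREFIXES):
        raise RuntimeError("KAK_API_KEY should start with kak_live_ or kak_test_")
    return key


def is_sandbox_key(key: str) -> bool:
    """True for sandbox keys (kak_test_ prefix)."""
    return key.startswith("kak_test_")


# A dict stands in for os.environ so the example does not depend on the shell.
demo_key_value = load_api_key({"KAK_API_KEY": "kak_test_example"})
print(is_sandbox_key(demo_key_value))  # True
```

The returned value can then be passed to Kakunin(api_key=...).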
