Python SDK
kakunin — official Python SDK for registering agents, issuing certificates, ingesting behavioral events, and enforcing compliance across LangChain, LlamaIndex, CrewAI, AutoGen, and LangGraph.
Installation
pip install kakuninRequires Python 3.10+. Async-first (asyncio); all client methods are coroutines.
Quickstart
import asyncio
from kakunin import Kakunin
async def main():
async with Kakunin(api_key="kak_live_...") as client:
# 1. Register the agent
agent = await client.agents.create(
name="TradeBot v2.1",
model="gpt-4o",
version="v2.1.0",
model_hash="sha256:8f3c...2a91",
financial_scope={
"max_single_trade_usd": 50_000,
"permitted_instruments": ["EUR/USD", "BTC/EUR"],
"permitted_venues": ["euronext"],
"leverage_permitted": False,
},
)
# 2. Issue an X.509 certificate — private key never leaves KMS
cert = await client.agents.certify(agent.id)
print(cert.serial_number) # "c4f9-17a2-6b8e"
print(cert.expires_at) # 2027-05-20T...
# 3. Ingest a behavioral event
event = await client.events.create(
agent_id=agent.id,
action_type="transaction_initiated",
details={"amount": 840, "currency": "EUR", "venue": "euronext"},
)
print(event.risk_score) # 0.12
print(event.risk_band) # 'low'
asyncio.run(main())Configuration
from kakunin import Kakunin
client = Kakunin(
api_key="kak_live_...", # or kak_test_... for sandbox
base_url="https://api.kakunin.ai/v1", # default
timeout=10.0, # per-request timeout in seconds
)Use as an async context manager (async with) for automatic connection cleanup, or call await client.aclose() manually.
Agents
client.agents.create(**params)
agent = await client.agents.create(
name="Risk Analyser",
model="claude-sonnet-4-6",
version="v1.0.0",
model_hash="sha256:...",
)| Parameter | Type | Required | Description |
|---|---|---|---|
name | str | ✓ | Human-readable agent name |
model | str | ✓ | Model identifier |
version | str | ✓ | Semver or build tag |
model_hash | str | ✓ | SHA-256 of model weights/config. Binds cert to exact model version. |
financial_scope | dict | — | Encoded in the X.509 cert |
client.agents.certify(agent_id)
Issues an X.509 certificate via AWS KMS. Returns 409 if agent already has an active cert.
cert = await client.agents.certify(agent.id)
# cert.pem — full X.509 PEM
# cert.serial_number — use for verification
# cert.expires_at — 365 days (MiCA Art. 70)client.agents.get(agent_id)
agent = await client.agents.get("agt-8f3c2a91d4")
print(agent.status) # AgentStatus.activeclient.agents.list(**params)
agents, pagination = await client.agents.list(status="active", limit=50)client.agents.halt(agent_id, *, reason=None)
Immediately suspends the agent and revokes its active certificate. Signed halt receipt returned.
receipt = await client.agents.halt(agent.id, reason="Anomalous trade pattern detected")
print(receipt.signature)client.agents.risk(agent_id, *, window_days=7)
Rolling risk profile with drift score and event breakdown.
risk = await client.agents.risk(agent.id, window_days=30)
print(risk.risk_band) # RiskBand.high
print(risk.drift.drift_trend) # "increasing"Events
client.events.create(**params)
Ingest a behavioral event. Returns risk score synchronously (p99 200 ms).
event = await client.events.create(
agent_id=agent.id,
action_type="data_access",
chain_id="chn-xyz", # optional — group events into a decision chain
details={"resource": "portfolio_db", "rows_read": 1200},
)Action types:
| Type | Band (default) | Description |
|---|---|---|
api_call | low | Standard API invocation |
data_access | low | Read operation on data |
data_mutation | medium | Write / delete operation |
transaction_initiated | medium | Financial transaction |
transaction_anomaly | high | Transaction outside normal pattern |
authentication_failure | medium | Failed auth |
unauthorized_access_attempt | high | Scope violation attempt |
client.events.list(**params)
events, pagination = await client.events.list(agent_id=agent.id, band="high")Decision Chains
Group a sequence of agent events into a tamper-evident chain. Useful for auditing multi-step agentic workflows.
chain = await client.chains.create(name="Trade execution T-984231")
await client.events.create(agent_id=agent.id, action_type="data_access", chain_id=chain.chain_id)
await client.events.create(agent_id=agent.id, action_type="transaction_initiated", chain_id=chain.chain_id)
closed = await client.chains.close(chain.chain_id)
print(closed.chain_hash) # HMAC over ordered events — verifiable by anyoneVerify
Public endpoints — no API key required. Use these to verify counterparty agents.
# Verify a certificate serial
cert_data = await client.verify.certificate("c4f9-17a2-6b8e")
print(cert_data["status"]) # "active"
# Verify a signed message
result = await client.verify.message(
agent_id=agent.id,
payload={"order_id": "T-984231", "amount": 840},
signature="base64...",
certificate_serial="c4f9-17a2-6b8e",
)
print(result.valid) # TrueCompliance Reports
from kakunin.models import StandardsFramework
report = await client.reports.create(
agent_id=agent.id,
window_days=30,
standards_frameworks=[StandardsFramework.iso_27001, StandardsFramework.nist_csf],
)
# report.status == ReportStatus.generating — poll until readyFramework Integrations
The kakunin.integrations package provides two layers:
| Layer | Purpose | Raises on failure? |
|---|---|---|
| Scope verification | Pre-execution guard — checks agent is active and holds required scopes | ✓ ScopeViolationError |
| Behavioral event emission | Fire-and-forget monitoring — emits events at framework hook points | ✗ Never raises |
verify_agent_scope — universal decorator
Works with any Python function, any framework. Checks the agent's Kakunin status before the wrapped function runs.
from kakunin import Kakunin
from kakunin.integrations.scope import verify_agent_scope
client = Kakunin(api_key="kak_live_...")
@verify_agent_scope(client, agent_id="agt-123", required_scopes=["trade.execute"])
async def execute_trade(order: dict) -> dict:
# Only runs if agent is active and has "trade.execute" in metadata["scopes"]
...Parameters:
| Parameter | Type | Description |
|---|---|---|
kakunin | Kakunin | Authenticated client |
agent_id | str | Agent to verify |
required_scopes | list[str] | None | Scope strings checked against agent.metadata["scopes"]. Pass None to only check status. |
Works with both async def and def functions. When called from within a running event loop (e.g. inside an async framework), the sync check runs in a thread pool to avoid blocking.
verify_agent_scope raises ScopeViolationError and never swallows it — unlike the event-emission helpers. Handle it at the call site or let it propagate to your agent's error boundary.
LangChain — KakuninToolGuard
Wraps any LangChain BaseTool with a scope check before every invocation. Requires langchain-core ≥ 0.1.0.
from langchain_core.tools import tool
from kakunin import Kakunin
from kakunin.integrations.langchain import KakuninToolGuard
client = Kakunin(api_key="kak_live_...")
@tool
def trade_executor(order: str) -> str:
"""Execute a trade order."""
return execute_trade(order)
guarded = KakuninToolGuard(
kakunin=client,
agent_id="agt-123",
tool=trade_executor,
required_scopes=["trade.execute"],
)
# Drop into any LangChain agent or LCEL chain as a normal tool
agent = create_react_agent(llm, tools=[guarded], prompt=prompt)To guard an entire chain rather than a single tool, use langchain_scope_callback:
from kakunin.integrations.langchain import langchain_scope_callback
guard = langchain_scope_callback(client, agent_id="agt-123")
chain = my_chain.with_config(callbacks=[guard])
# Raises ScopeViolationError at chain start if agent is not activeLlamaIndex — KakuninFunctionToolGuard
Wraps a callable with scope verification and exposes the full LlamaIndex tool protocol (call, acall, metadata).
from kakunin import Kakunin
from kakunin.integrations.llamaindex import KakuninFunctionToolGuard
from llama_index.core.agent import ReActAgent
client = Kakunin(api_key="kak_live_...")
def portfolio_lookup(account_id: str) -> dict:
"""Fetch current portfolio positions."""
return fetch_positions(account_id)
guarded = KakuninFunctionToolGuard(
kakunin=client,
agent_id="agt-123",
fn=portfolio_lookup,
name="portfolio_lookup",
description="Fetch current portfolio positions for an account.",
required_scopes=["data.read"],
)
agent = ReActAgent.from_tools([guarded], llm=llm, verbose=True)CrewAI — KakuninCrewAgent
Subclass of crewai.Agent that emits Kakunin events at every task execution and tool call. Pass required_scopes to block task execution if the agent is not active. Requires crewai ≥ 0.28.0.
from crewai import Task, Crew
from kakunin import Kakunin
from kakunin.integrations.crewai import KakuninCrewAgent
client = Kakunin(api_key="kak_live_...")
agent = KakuninCrewAgent(
kakunin=client,
agent_id="agt-123",
required_scopes=["trade.execute"], # optional scope guard
# Standard CrewAI kwargs:
role="Risk Analyst",
goal="Assess and flag high-risk trades",
backstory="...",
tools=[...],
)
task = Task(description="Assess trade T-984231", agent=agent)
crew = Crew(agents=[agent], tasks=[task])
crew.kickoff()Every execute_task() call emits an api_call event on start and a data_access event on completion. Exceptions emit transaction_anomaly. If required_scopes are set, execution is blocked before any event is emitted.
AutoGen — KakuninConversableAgent
Subclass of autogen.ConversableAgent that emits Kakunin events on every receive() and generate_reply(). Pass required_scopes to block reply generation. Requires pyautogen ≥ 0.2.0.
from autogen import UserProxyAgent
from kakunin import Kakunin
from kakunin.integrations.autogen import KakuninConversableAgent
client = Kakunin(api_key="kak_live_...")
risk_engine = KakuninConversableAgent(
kakunin=client,
agent_id="agt-456",
required_scopes=["chat.reply"], # optional scope guard
# Standard AutoGen kwargs:
name="RiskEngine",
system_message="You are a risk analyst.",
llm_config={"model": "gpt-4o"},
)
user_proxy = UserProxyAgent(name="User")
user_proxy.initiate_chat(risk_engine, message="Assess trade T-984231")For attaching X-Kakunin-Cert-Serial to outbound httpx requests, use KakuninHttpxMixin:
from kakunin.integrations.autogen import KakuninHttpxMixin, KakuninConversableAgent
class MyAgent(KakuninHttpxMixin, KakuninConversableAgent):
pass
agent = MyAgent(
kakunin=client,
agent_id="agt-456",
cert_serial="c4f9-17a2-6b8e",
name="RiskEngine",
...
)
# Outbound calls via agent.http_client automatically include the cert serial
resp = await agent.http_client.get("https://internal-service/prices")LangGraph — kakunin_node
Decorator that wraps a LangGraph node function to emit a Kakunin event on every invocation.
from kakunin import Kakunin
from kakunin.integrations.langgraph import kakunin_node
client = Kakunin(api_key="kak_live_...")
@kakunin_node(client, agent_id="agt-123", action_type="data_access")
async def risk_engine_node(state: dict) -> dict:
# your node logic
return state
# Or wrap at graph construction time
graph.add_node("risk_engine", kakunin_node(client, agent_id="agt-123")(risk_engine_node))To add scope verification to a LangGraph node, combine the two decorators:
@verify_agent_scope(client, agent_id="agt-123", required_scopes=["data.read"])
@kakunin_node(client, agent_id="agt-123", action_type="data_access")
async def data_ingestion_node(state: dict) -> dict:
...Error handling
All SDK errors are instances of KakuninError:
from kakunin import Kakunin, KakuninError, AuthenticationError, RateLimitError
from kakunin.exceptions import ScopeViolationError
try:
await client.agents.certify(agent_id)
except RateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after}s")
except AuthenticationError:
print("Invalid API key")
except KakuninError as e:
print(f"API error: {e}")ScopeViolationError
Raised by verify_agent_scope, KakuninToolGuard, KakuninFunctionToolGuard, and the required_scopes parameter on KakuninCrewAgent / KakuninConversableAgent.
from kakunin.exceptions import ScopeViolationError
try:
await guarded_fn()
except ScopeViolationError as e:
print(e.agent_id) # "agt-123"
print(e.agent_status) # "suspended"
print(e.missing_scopes) # ["trade.execute"]| Attribute | Type | Description |
|---|---|---|
agent_id | str | None | The agent that failed verification |
agent_status | str | None | "suspended" or "retired" if status check failed |
missing_scopes | list[str] | Scopes the agent lacked (empty if status was the issue) |
Models
Key Pydantic models exported from kakunin.models:
| Model | Description |
|---|---|
Agent | Agent record — id, name, model, version, status, model_hash |
AgentStatus | Enum: active, suspended, retired |
Certificate | id, serial_number, status, pem, issued_at, expires_at |
CertificateStatus | Enum: active, revoked, expired |
BehaviorEvent | id, agent_id, action_type, risk_score, risk_band |
RiskBand | Enum: low, medium, high |
AgentRisk | Rolling risk profile with drift info |
HaltReceipt | Signed halt receipt — signature, algorithm, halted_at |
DecisionChain | chain_id, name, status, chain_hash, events |
Environment variables
# Required
KAK_API_KEY=kak_live_... # or kak_test_... for sandboxTypeScript SDK
@kakunin/sdk — official TypeScript SDK for registering agents, issuing certificates, ingesting behavioral events, and verifying identities.
Supabase Integration
Bind Kakunin X.509 certificate metadata to Supabase RLS policies — row-level data isolation enforced by cryptographic agent identity.