EAI
TL;DR - Kafka Integration: Async Messaging
APIs are useful for application integration, but do not deal with the reality that the receiving system might be down.
Message Brokers like Kafka address this with guaranteed async delivery of messages. The Broker stores the message, delivering it (possibly later) when the receiver is up.
Message Brokers also support multi-cast: you publish a message to a "topic", and other systems subscribe. This is often casually described as "pub/sub".
GenAI-Logic provides AI-driven support for both directions:
- Publish — rules trigger outbound Kafka messages when row state changes
- Subscribe — inbound messages are consumed, validated, and committed using the same reusable logic as your APIs
Overview
APIs and Messages
GenAI-Logic provides multiple approaches for integrations:
- Ad hoc access via automatically created self-serve APIs
- Custom APIs
- Kafka integrations — both outbound publication and inbound message subscription (pub/sub).

These approaches are typically more efficient than ETL-based integrations:
| Requirement | Poor Practice | Good Practice | Best Practice | Ideal |
|---|---|---|---|---|
| Ad Hoc Integration | ETL | APIs | Self-Serve APIs | Automated Self-Serve APIs |
| Custom APIs | Coded APIs | Create with AI | | |
| Messages | Coded Pub/Sub | Create with AI | | |
| Logic | Logic in UI | Reusable Logic - any source | Declarative Rules | Extensible with Python |
- APIs should be self-serve, not requiring continuing server development — they avoid the overhead of nightly Extract, Transform and Load (ETL)
- Logic should be reused over the UI and API transaction sources — logic embedded in UI controls cannot be shared with APIs and message consumers
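The reuse point can be sketched in plain Python. This is an illustrative example with hypothetical names; GenAI-Logic expresses such rules declaratively via LogicBank rather than as hand-written functions:

```python
def check_credit(order: dict) -> None:
    """Shared business rule: reject orders that exceed the credit limit."""
    if order["amount_total"] > order["credit_limit"]:
        raise ValueError("credit limit exceeded")

def handle_api_post(payload: dict) -> str:
    check_credit(payload)   # same rule on the API path
    return "committed"

def handle_kafka_message(payload: dict) -> str:
    check_credit(payload)   # same rule on the messaging path
    return "committed"

print(handle_api_post({"amount_total": 50, "credit_limit": 100}))  # committed
```

Because the rule lives in one place, a change to the credit policy applies to every transaction source at once.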
AI-based creation
Traditional API/Kafka integrations are fiddly — extensive hand-coding or complex metadata. With AI, you provide a message shape and name the exceptions; the system maps the rest:
- Sample message — AI reads the JSON shape and auto-maps obvious fields silently; no entry is needed for direct name matches
- `FIELD_EXCEPTIONS` — exceptions only: specify renames, dot-notation joins (`Customer.name`), and child collections; everything else is inferred
- `_unresolved` guard — fields AI can't map block server start; no silent failures
For full working examples, see Sample-Basic-EAI.md.
Publish — Outbound Kafka Messages
GenAI-Logic supports two publish patterns:
| Pattern | What's sent | How |
|---|---|---|
| Key only | `{"id": 42}` | Default — describe topic + trigger condition |
| By-example | Shaped message: selected fields, joins, child collections | Describe desired JSON sample → AI auto-matches obvious fields, adds `# TODO` on uncertain ones, blocks server start on unresolved |
By-example
If all sample fields map with no ambiguity, no `_unresolved` list is generated and `FIELD_EXCEPTIONS` may be empty — the file is just `SAMPLE` plus the rule.
Key Only — Event Notification
The default publish sends just the primary key; the consumer calls back to fetch current data.
🤖 AI generates `logic/logic_discovery/order_shipping.py`:

```python
import integration.kafka.kafka_producer as kafka_producer
from logic_bank.exec_row_logic.logic_row import LogicRow
from logic_bank.logic_bank import Rule
import models

def send_order_to_kafka(row: models.Order, old_row: models.Order, logic_row: LogicRow):
    if row.date_shipped is not None and row.date_shipped != old_row.date_shipped:
        kafka_producer.publish_kafka_message(
            topic="order_shipping",
            logic_row=logic_row)
        # → publishes: {"id": 42}

Rule.after_flush_row_event(on_class=models.Order, calling=send_order_to_kafka)
```
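On the receiving side, a key-only consumer uses the published key to call back for current state. A minimal sketch of that callback step, assuming a JSON:API-style endpoint (the URL path and port are illustrative, not a documented GenAI-Logic contract):

```python
import json

def callback_url(message_value: bytes, base: str = "http://localhost:5656/api") -> str:
    """Given a key-only event like {"id": 42}, build the URL a consumer
    would GET to fetch the current Order state."""
    key = json.loads(message_value)
    return f"{base}/Order/{key['id']}"

print(callback_url(b'{"id": 42}'))
# → http://localhost:5656/api/Order/42
```

This is the trade-off of the key-only pattern: messages are tiny and never stale, but the consumer must make a network call to get the data.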
By-Example — Event-Carried State Transfer
For B2B or cross-org scenarios, publish a compound message so the consumer is self-contained.
```
Publish to Kafka topic 'order_shipping' when an Order is shipped (date_shipped is set).
The outbound message should look like this example:

{
  "order_id": 1,
  "order_date": "2026-04-06",
  "customer_name": "Alfreds Futterkiste",
  "total": 100.00,
  "items": [
    {
      "quantity": 2,
      "product_name": "Chai",
      "unit_price": 25.00
    }
  ]
}
```
🤖 AI generates two files:
- `integration/kafka/kafka_publish_discovery/order_shipping.py` — mapper config (`SAMPLE`, `FIELD_EXCEPTIONS`, `_unresolved` guard)
- `logic/logic_discovery/place_order/app_integration.py` — adds the `Rule.after_flush_row_event` that calls it
The AI matches sample fields to model columns by name. Three outcomes:
| Outcome | What AI does | Server starts? |
|---|---|---|
| ✅ Obvious match | Silent — no entry in `FIELD_EXCEPTIONS` | ✅ |
| ⚠️ Plausible guess | Entry in `FIELD_EXCEPTIONS` with `# TODO: verify` | ✅ — verify at test time, remove TODO when satisfied |
| ❌ No candidate | Added to `_unresolved` list — server will not start | ❌ — must fix first |
AI also reports in chat immediately at generation time:
✅ Generated `integration/kafka/kafka_publish_discovery/order_shipping.py` — 1 must-resolve: `region` (no matching column found in Order, Customer, or Item). Fix `_unresolved` before starting the server.
```python
from integration.system.EaiPublishMapper import serialize_row

# ✅ High-confidence auto-matches (no entry needed):
#    order_id   → Order.id
#    order_date → Order.order_date
#    quantity   → Item.quantity
#    unit_price → Item.unit_price

FIELD_EXCEPTIONS = {
    "customer_name": "Customer.name",  # TODO: verify — join via Customer relationship
    "total": "amount_total",           # TODO: verify — rename from amount_total
    "items": "ItemList",               # child collection: relationship name
    "product_name": "Product.name",    # TODO: verify — join via Product from Item
}

# ❌ MUST RESOLVE — server will not start until this list is empty.
# For each entry: add correct mapping to FIELD_EXCEPTIONS above, then remove from here.
_unresolved = [
    "'region': no matching column found in Order, Customer, or Item",
]
if _unresolved:
    raise NotImplementedError(
        "order_shipping.py: resolve before use:\n  " + "\n  ".join(_unresolved))

SAMPLE = {
    "order_id": None,
    "order_date": None,
    "customer_name": None,
    "total": None,
    "items": [
        {"quantity": None, "product_name": None, "unit_price": None}
    ]
}

def row_to_dict(row):
    return serialize_row(row, sample=SAMPLE, exceptions=FIELD_EXCEPTIONS)
```
```python
from integration.kafka.kafka_publish_discovery import order_shipping
import integration.kafka.kafka_producer as kafka_producer
from logic_bank.exec_row_logic.logic_row import LogicRow
from logic_bank.logic_bank import Rule
import models

def send_order_to_kafka(row: models.Order, old_row: models.Order, logic_row: LogicRow):
    if row.date_shipped is not None and row.date_shipped != old_row.date_shipped:
        kafka_producer.publish_kafka_message(
            topic="order_shipping",
            logic_row=logic_row,
            mapper=order_shipping)

Rule.after_flush_row_event(on_class=models.Order, calling=send_order_to_kafka)
```
Note
Publish mapper files are named after the topic (`order_shipping.py`) and live in `integration/kafka/kafka_publish_discovery/`, symmetric to `kafka_subscribe_discovery/` for subscribe. `publish_kafka_message` with no mapper sends key-only. The `EaiPublishMapper` engine handles dot-notation joins (`Customer.name`), child collection traversal, type serialization, and None-safe access.
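To make the mapper behavior concrete, here is a simplified stand-in for `serialize_row`: it walks the `SAMPLE` shape, applies `FIELD_EXCEPTIONS` (renames, dot-notation joins, child collections), and falls back to same-name attributes. This is a sketch of the idea, not the real `EaiPublishMapper` engine; the `SimpleNamespace` rows stand in for SQLAlchemy model instances:

```python
from types import SimpleNamespace as NS

def serialize_row_sketch(row, sample, exceptions):
    """Walk the sample shape; resolve each field via exceptions or same-name attribute."""
    out = {}
    for field, shape in sample.items():
        path = exceptions.get(field, field)
        if isinstance(shape, list):              # child collection: recurse per child
            children = getattr(row, path, None) or []
            out[field] = [serialize_row_sketch(c, shape[0], exceptions)
                          for c in children]
        else:
            target = row
            for part in path.split("."):         # dot-notation join, None-safe
                target = getattr(target, part, None)
                if target is None:
                    break
            out[field] = target
    return out

order = NS(order_id=1, Customer=NS(name="Alfreds Futterkiste"),
           ItemList=[NS(quantity=2, Product=NS(name="Chai"))])
SAMPLE = {"order_id": None, "customer_name": None,
          "items": [{"quantity": None, "product_name": None}]}
EXCEPTIONS = {"customer_name": "Customer.name", "items": "ItemList",
              "product_name": "Product.name"}
print(serialize_row_sketch(order, SAMPLE, EXCEPTIONS))
# → {'order_id': 1, 'customer_name': 'Alfreds Futterkiste', 'items': [{'quantity': 2, 'product_name': 'Chai'}]}
```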
Subscribe — Inbound Kafka Messages
Inbound Kafka messages are consumed, parsed, and committed using the same reusable business logic as your APIs. This means credit checks, derivations, and constraints all fire — regardless of whether the transaction originates from a UI, an API call, or a Kafka message.
What You Do
Describe the topic, the expected JSON payload, the target tables, and the field mappings:
```
Subscribe to Kafka topic `order_b2b` (JSON format).
The payload is a single order with items:

{
  "Account": "Alice",
  "Notes": "Kafka order from sales",
  "Items": [
    { "Name": "Widget", "QuantityOrdered": 1 },
    { "Name": "Gadget", "QuantityOrdered": 2 }
  ]
}

Target tables: Order, Item (from models.py).

Field mappings:
- `Account` → look up Customer by Customer.name, set Order.customer_id
- `Notes` → Order.notes
- `Items` array → Item rows: `Name` → look up Product by Product.name, set Item.product_id; `QuantityOrdered` → Item.quantity
```
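The lookup mappings above resolve natural keys in the payload (`Account`, `Name`) into foreign keys. A minimal sketch of that resolution step, using plain dict lookups in place of the SQLAlchemy queries the generated mapper would issue (the id values are invented for illustration):

```python
def resolve_order(payload: dict, customers_by_name: dict, products_by_name: dict):
    """Resolve natural keys (Account, Name) to foreign keys (customer_id, product_id)."""
    order = {
        "customer_id": customers_by_name[payload["Account"]],
        "notes": payload["Notes"],
    }
    items = [
        {"product_id": products_by_name[item["Name"]],
         "quantity": item["QuantityOrdered"]}
        for item in payload["Items"]
    ]
    return order, items

# Hypothetical lookup tables standing in for Customer/Product queries:
order, items = resolve_order(
    {"Account": "Alice", "Notes": "Kafka order from sales",
     "Items": [{"Name": "Widget", "QuantityOrdered": 1},
               {"Name": "Gadget", "QuantityOrdered": 2}]},
    customers_by_name={"Alice": 17},
    products_by_name={"Widget": 3, "Gadget": 4},
)
print(order)   # {'customer_id': 17, 'notes': 'Kafka order from sales'}
```

A missing Customer or Product name raises `KeyError` here; the generated consumer surfaces such failures through the retryable blob row described below rather than dropping the message.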
What the System Does — The Two-Message Pattern
A single-transaction consumer loses data if parsing fails mid-flush — the raw payload is gone. Instead, the system generates a two-message pattern for reliability:
```
topic: order_b2b
  → Consumer 1: save raw JSON blob → OrderB2bMessage            (Tx 1 — always commits)
      → row_event: blob insert → publish to order_b2b_processed
          → Consumer 2: parse → Order + Items, resolve FKs, LogicBank rules   (Tx 2)
```
Parse failures leave `is_processed = False` on the blob row — queryable and retryable. The raw message is never lost.
The AI generates these key files:
| File | Role |
|---|---|
| `integration/kafka/kafka_subscribe_discovery/order_b2b.py` | Consumer logic — design details, field mapping, registration |
| `logic/logic_discovery/place_order/order_b2b_consume.py` | row_event bridge — publishes blob to `order_b2b_processed` (no inline parse) |
| `integration/OrderB2bMapper.py` | JSON → Order + Items (3-tier mapping contract) |
| `api/api_discovery/order_b2b_kafka_consume_debug.py` | `/consume_debug/order_b2b` — test without Kafka |
| `integration/kafka/message_formats/order_b2b.json` | Message format spec / test fixture |
| `test/order_b2b_reset.sh` | Reset Kafka topics + log between runs |
Testing
Because the subscriber is wired to a debug endpoint, no running Kafka instance is needed for initial development and testing.
```bash
# Debug path (no Kafka required)
# APILOGICPROJECT_CONSUME_DEBUG=true must be set in config/default.env
python api_logic_server_run.py
curl 'http://localhost:5656/consume_debug/order_b2b?file=integration/kafka/message_formats/order_b2b.json'
sqlite3 database/db.sqlite "SELECT * FROM order_b2b_message; SELECT * FROM 'order'; SELECT * FROM item;"
```

```bash
# Full Kafka path
docker compose -f integration/kafka/dockercompose_start_kafka.yml up -d
# Enable KAFKA_CONSUMER + KAFKA_PRODUCER in config/default.env
bash test/order_b2b_reset.sh   # recreates topics + clears log
# Start server; publish sample JSON to order_b2b topic
```
Shared Logic
The subscribe consumer and your API endpoints use the same underlying logic rules. There is no need to duplicate validation or derived-field logic for the messaging path.