StructuredRouterTransformer

At a high level, this element serves two purposes. First, it dynamically generates a JSON schema for an LLM to follow. Second, it handles the resulting structured data by routing it (and optionally transforming it) out of the desired ports.

By using a routing_map, it:

  1. Constructs a unified Pydantic RootModel from sub-schemas for each route.
  2. Parses incoming JSON messages (MessagePayload) against this schema.
  3. Dispatches each output field to its connected ports based on the route field.
  4. Applies optional transform functions or default payload types (StructuredPayload).
  5. Supports dynamic schema updates via <route>_<field>_schema_input ports.

It leverages the FlowController for port management and routing logic.

Instantiation

Arguments:

routing_map: dict
Mapping of route names to configuration dicts. Each dict may include the following keys:

  outputs: dict
  Mapping of field names to spec dicts. Each spec dict may include the following keys:

    schema: dict
    Either {'pydantic_model': type} for static schemas or {'ports': [InputPort]} for dynamic updates.

    ports: list[InputPort]
    InputPort instances to emit each field value.

    payload_type: type, optional
    Payload class for emission (defaults to StructuredPayload).

    transform: callable, optional
    Accepts raw field value and returns a Payload.

Example:

from typing import Literal
from pydantic import BaseModel
from pyllments.elements import StructuredRouterTransformer
from pyllments.payloads import MessagePayload, SchemaPayload, StructuredPayload


srt = StructuredRouterTransformer(
    routing_map={
        'reply': {
            'outputs': {
                'content': {
                    'schema': {'pydantic_model': str},  # static schema without schema input port
                    'ports': [chat_interface_el.ports.message_input],
                    'transform': lambda text: MessagePayload(content=text, role='assistant')
                }
            }
        },
        'dynamic': {
            'outputs': {
                'value': {
                    'schema': {'ports': [schema_provider_el.ports.schema_output]},  # dynamic schema via SchemaPayload
                    'ports': [consumer_el.ports.value_input],
                    'payload_type': StructuredPayload
                }
            }
        },
        'simple': {
            'outputs': {
                'number': {
                    'schema': {'pydantic_model': int},    # direct Pydantic type
                    'ports': [number_consumer_el.ports.number_input]
                }
            }
        }
    },
    incoming_output_port=llm_el.ports.output['message_output']
)

Note on schema spec:
- You can supply schema as a direct pydantic_model (no schema input port created).
- Or specify schema with ports: [InputPort] to receive schema updates at runtime.
- The schema input port accepts SchemaPayload by default.
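
As a rough illustration of the note above, the sketch below walks a routing_map-shaped dict and lists the schema input port names one would expect. It assumes that a static schema is marked by the 'pydantic_model' key and that any other schema spec is dynamic. The helper schema_input_port_names is hypothetical and not part of pyllments, and placeholder strings stand in for real port objects.

```python
def schema_input_port_names(routing_map: dict) -> list[str]:
    """List the <route>_<field>_schema_input names expected for dynamic fields."""
    names = []
    for route, route_cfg in routing_map.items():
        for field, spec in route_cfg.get("outputs", {}).items():
            schema_spec = spec.get("schema", {})
            # Assumption: a static schema is identified by the 'pydantic_model' key,
            # so any other non-empty schema spec is treated as dynamic.
            if schema_spec and "pydantic_model" not in schema_spec:
                names.append(f"{route}_{field}_schema_input")
    return names


# Simplified copy of the example above; strings replace the real port objects.
routing_map = {
    "reply": {"outputs": {"content": {"schema": {"pydantic_model": str}}}},
    "dynamic": {"outputs": {"value": {"schema": {"ports": ["<schema_output port>"]}}}},
    "simple": {"outputs": {"number": {"schema": {"pydantic_model": int}}}},
}

print(schema_input_port_names(routing_map))  # ['dynamic_value_schema_input']
```

Under these assumptions only the 'dynamic' route yields a schema input port, which matches the first bullet of the note: static pydantic_model specs create none.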

incoming_output_port: OutputPort, optional
Upstream output port that supplies the MessagePayload JSON input. It is connected to this element's message_input port.

flow_controller: FlowController, optional
Custom flow controller for routing and port management.

pydantic_model: BaseModel
Generated unified model for JSON input validation.

Input Ports

Port Name | Payload Type | Behavior
--- | --- | ---
message_input | MessagePayload | Receives a JSON payload containing a route and data fields to parse and dispatch.
<route>_<field>_schema_input | SchemaPayload | Receives SchemaPayload updates for the specified field; rebuilds the internal schema.

Generated Output Ports

For each route defined in the routing_map and for each field under that route’s outputs, the element automatically creates an output port named <route>_<field>. When a JSON message arrives on the message_input port:

  1. The transformer parses the message and validates it against its pydantic_model.
  2. It reads the route discriminator and looks up the corresponding outputs spec.
  3. For each field in that outputs spec:
    • It extracts the field value from the validated object.
    • If a transform function is provided, applies it to the value to produce a payload.
    • Otherwise, wraps the raw value in the configured payload_type (StructuredPayload by default).
    • Emits the resulting payload on the <route>_<field> port.

You can connect multiple downstream input ports to each of these output ports, and all will receive the payload when it is emitted.
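
The dispatch steps above can be sketched in plain Python. This is a simplified model of the flow, not the element's actual implementation: validation is reduced to json.loads, StructuredStandIn is a hypothetical stand-in for StructuredPayload, and emit is an assumed callback that represents sending a payload out of a named port.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class StructuredStandIn:
    """Hypothetical stand-in for StructuredPayload; it just holds the raw value."""
    data: Any


def dispatch(message_json: str, routing_map: dict,
             emit: Callable[[str, Any], None]) -> None:
    obj = json.loads(message_json)            # step 1: parse (Pydantic validation omitted)
    route = obj["route"]                      # step 2: read the route discriminator
    outputs = routing_map[route]["outputs"]
    for field, spec in outputs.items():       # step 3: handle each field of that route
        value = obj[field]
        transform = spec.get("transform")
        # Use the transform if one is given, otherwise wrap the raw value.
        payload = transform(value) if transform else StructuredStandIn(value)
        emit(f"{route}_{field}", payload)


emitted = []
routing_map = {
    "reply": {"outputs": {"content": {"transform": str.upper}}},
    "simple": {"outputs": {"number": {}}},
}


def record(port_name: str, payload: Any) -> None:
    emitted.append((port_name, payload))


dispatch('{"route": "reply", "content": "hi"}', routing_map, record)
dispatch('{"route": "simple", "number": 3}', routing_map, record)
print(emitted)
# [('reply_content', 'HI'), ('simple_number', StructuredStandIn(data=3))]
```

Here str.upper plays the role of a transform function; in the element a transform would return a Payload such as MessagePayload.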

Output Ports

Port Name | Payload Type | Behavior
--- | --- | ---
<route>_<field> | StructuredPayload (default) or the Payload returned by transform | Emits the field value packaged as a payload; you can connect multiple input ports to this output, and all will receive the payload. For StructuredPayload, access raw data via .model.data.
schema_output | SchemaPayload | Emits the unified Pydantic schema when it changes; consumers can connect to receive updated schemas.

Data and Schema Structures

Message Structure

The element receives a MessagePayload whose model.content is a JSON string matching the generated Pydantic schema. The expected JSON format is:

{
  "route": "<route_name>",
  "<field1>": <value1>,
  "<field2>": <value2>,
  ...
}
  • <route_name> must be one of the keys in routing_map.
  • Subsequent fields correspond to the names defined under routing_map[route]['outputs'] and will be validated by Pydantic.
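
To make the expected shape concrete, here is a small structural check written with the standard library only. It is an illustrative sketch: the element itself relies on Pydantic for validation, and check_message is a hypothetical helper that only compares the route and field names against a routing_map-shaped dict.

```python
import json


def check_message(content: str, routing_map: dict) -> dict:
    """Parse a JSON message and check its route and field names."""
    obj = json.loads(content)
    route = obj.get("route")
    if route not in routing_map:
        raise ValueError(f"unknown route: {route!r}")
    expected = set(routing_map[route]["outputs"])
    received = set(obj) - {"route"}
    if received != expected:
        raise ValueError(
            f"route {route!r} expects {sorted(expected)}, got {sorted(received)}"
        )
    return obj


# Simplified routing map: only the field names matter for this check.
routing_map = {
    "reply": {"outputs": {"content": {}}},
    "simple": {"outputs": {"number": {}}},
}

content = json.dumps({"route": "simple", "number": 42})
print(check_message(content, routing_map))  # {'route': 'simple', 'number': 42}
```

A message whose route is not a key of routing_map, or whose fields differ from that route's outputs, raises ValueError in this sketch.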

Schema Input Payload (<route>_<field>_schema_input)

To update a field’s schema at runtime, send a SchemaPayload with a Pydantic model:

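from pyllments.payloads.schema import SchemaPayload

# CustomFieldModel is a Pydantic model class defined elsewhere
new_schema = SchemaPayload(schema=CustomFieldModel)
# Connections run from an output port to an input port. The schema input port
# exists only for fields whose schema spec is dynamic.
schema_provider_el.ports.schema_output > srt.ports.reply_content_schema_input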
  • CustomFieldModel must inherit from pydantic.BaseModel or RootModel.
  • Upon receipt, the element rebuilds the unified pydantic_model including this update.
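
The rebuild step can be approximated with plain Pydantic v2. The following is a minimal sketch under stated assumptions, not the element's internal code: build_unified is a hypothetical helper, field_types is a simplified stand-in for the routing map, and the fields of CustomFieldModel are invented for illustration.

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, RootModel, ValidationError, create_model


def build_unified(field_types: dict) -> type[RootModel]:
    """Build a discriminated-union RootModel from {route: {field: type}}."""
    models = [
        create_model(
            f"{route}_route",
            route=(Literal[route], ...),  # the route name doubles as the discriminator
            **{name: (typ, ...) for name, typ in fields.items()},
        )
        for route, fields in field_types.items()
    ]
    return RootModel[Annotated[Union[tuple(models)], Field(discriminator="route")]]


class CustomFieldModel(BaseModel):  # hypothetical replacement schema
    text: str
    priority: int


field_types = {"reply": {"content": str}, "simple": {"number": int}}
Unified = build_unified(field_types)

# Simulate a schema update arriving for reply.content, then rebuild the union.
field_types["reply"]["content"] = CustomFieldModel
Unified = build_unified(field_types)

msg = Unified.model_validate_json(
    '{"route": "reply", "content": {"text": "hi", "priority": 1}}'
)
print(msg.root.content.priority)  # 1

try:
    Unified.model_validate_json('{"route": "reply", "content": "hi"}')
except ValidationError:
    print("plain-string content is rejected after the update")
```

Rebuilding the whole union on each update keeps the sketch simple; whether the element rebuilds everything or only the affected route is not stated in this document.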

Schema Output Payload (schema_output)

The element emits its unified pydantic_model whenever it changes via a SchemaPayload on schema_output:

  • payload.model.schema is the Pydantic RootModel class representing the union of all routes.
  • You can call .model_json_schema() on this class to get the JSON Schema dictionary.
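
As an illustration of the second point, the sketch below calls .model_json_schema() on a hand-written RootModel union. ReplyRoute and ToolsRoute are hypothetical stand-ins for the route models the element generates; a class received via payload.model.schema would be used the same way.

```python
import json
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, RootModel


class ReplyRoute(BaseModel):  # hypothetical route model
    route: Literal["reply"]
    message: str


class ToolsRoute(BaseModel):  # hypothetical route model
    route: Literal["tools"]
    tools: list[str]


# Union of all routes, discriminated by the 'route' field.
Unified = RootModel[
    Annotated[Union[ReplyRoute, ToolsRoute], Field(discriminator="route")]
]

json_schema = Unified.model_json_schema()  # plain dict, JSON-serializable
print(json.dumps(json_schema, indent=2))
```

The resulting dictionary contains a oneOf list with one entry per route and a discriminator on route, so it can be serialized and handed to whatever component instructs the LLM on the expected output format.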

StructuredPayload Data

For output ports without a custom transform, the element wraps field values in StructuredPayload:

from pyllments.payloads.structured import StructuredPayload

payload = StructuredPayload(data={"tools": ["a","b"]})
print(payload.model.data)  # {'tools': ['a', 'b']}
  • .model.data holds the raw Python object validated by Pydantic.

Routing Map & Generated Schema

This element dynamically builds a Pydantic union model from your routing_map. Below is an example mapping (taken from the mcp_flow.py recipe) and the corresponding JSON Schema snippet it produces.

Example routing_map

from pyllments.elements import StructuredRouterTransformer
from pyllments.payloads import MessagePayload, SchemaPayload, StructuredPayload

structured_router_el = StructuredRouterTransformer(
    routing_map={
        'reply': {
            'description': 'Send a chat reply back to the user',
            'outputs': {
                'message': {
                    'description': "The assistant's textual response",
                    'schema': {'pydantic_model': str},
                    'ports': [chat_interface_el.ports.message_input],
                    'transform': lambda txt: MessagePayload(content=txt, role='assistant')
                }
            }
        },
        'tools': {
            'description': 'Tools invocation route',
            'outputs': {
                'tools': {
                    'description': 'Which tools to call',
                    'schema': {'payload_type': SchemaPayload},
                    'payload_type': StructuredPayload
                }
            }
        }
    }
)

Example Generated JSON Schema

{
  "$defs": {
    "reply_route": {
      "title": "reply_route",
      "description": "Send a chat reply back to the user",
      "type": "object",
      "properties": {
        "route": { "type": "string", "const": "reply" },
        "message": {
          "type": "string",
          "description": "The assistant's textual response"
        }
      },
      "required": ["route", "message"]
    },
    "tools_route": {
      "title": "tools_route",
      "description": "Tools invocation route",
      "type": "object",
      "properties": {
        "route": { "type": "string", "const": "tools" },
        "tools": {
          "type": "array",
          "items": { "type": "string" },
          "description": "Which tools to call"
        }
      },
      "required": ["route", "tools"]
    }
  },
  "discriminator": { "propertyName": "route" },
  "oneOf": [
    {"$ref": "#/$defs/reply_route"},
    {"$ref": "#/$defs/tools_route"}
  ],
  "type": "object"
}