StructuredRouterTransformer
At a high level, this element has two purposes. First, it dynamically generates a JSON schema for an LLM to follow. Second, it handles the resulting structured data by routing it (and optionally transforming it) out of the desired ports.
By using a routing_map, it:
- Constructs a unified Pydantic RootModel from sub-schemas for each route.
- Parses incoming JSON messages (MessagePayload) against this schema.
- Dispatches each output field to its connected ports based on the route field.
- Applies optional transform functions or default payload types (StructuredPayload).
- Supports dynamic schema updates via <route>_<field>_schema_input ports.
It leverages the FlowController for port management and routing logic.
Instantiation
Arguments:
routing_map: dict
Mapping of route names to configuration dicts. Each dict may include the following keys:
- outputs: dict
  Mapping of field names to spec dicts. Each spec dict may include:
  - schema: dict
    Either {'pydantic_model': type} for static schemas or {'ports': [InputPort]} for dynamic updates.
  - ports: list[InputPort]
    InputPort instances to emit each field value.
  - payload_type: type, optional
    Payload class for emission (defaults to StructuredPayload).
  - transform: callable, optional
    Accepts raw field value and returns a Payload.
Example:
from typing import Literal
from pydantic import BaseModel
from pyllments.elements import StructuredRouterTransformer
from pyllments.payloads import MessagePayload, SchemaPayload, StructuredPayload
srt = StructuredRouterTransformer(
    routing_map={
        'reply': {
            'outputs': {
                'content': {
                    'schema': {'pydantic_model': str},  # static schema without schema input port
                    'ports': [chat_interface_el.ports.message_input],
                    'transform': lambda text: MessagePayload(content=text, role='assistant')
                }
            }
        },
        'dynamic': {
            'outputs': {
                'value': {
                    'schema': {'ports': [schema_provider_el.ports.schema_output]},  # dynamic schema via SchemaPayload
                    'ports': [consumer_el.ports.value_input],
                    'payload_type': StructuredPayload
                }
            }
        },
        'simple': {
            'outputs': {
                'number': {
                    'schema': {'pydantic_model': int},  # direct Pydantic type
                    'ports': [number_consumer_el.ports.number_input]
                }
            }
        }
    },
    incoming_output_port=llm_el.ports.output['message_output']
)
Note on schema spec:
- You can supply schema as a direct pydantic_model (no schema input port created).
- Or specify schema with ports: [InputPort] to receive schema updates at runtime.
- The schema input port always accepts SchemaPayload by default.
incoming_output_port: OutputPort, optional
Port to receive upstream MessagePayload JSON input (defaults to message_input).
flow_controller: FlowController, optional
Custom flow controller for routing and port management.
pydantic_model: BaseModel
Generated unified model for JSON input validation.
Input Ports
Port Name | Payload Type | Behavior |
---|---|---|
message_input | MessagePayload | Receives a JSON payload containing a route and data fields to parse and dispatch. |
<route>_<field>_schema_input | SchemaPayload | Receives SchemaPayload updates for the specified field; rebuilds the internal schema. |
Generated Output Ports
For each route defined in the routing_map and for each field under that route's outputs, the element automatically creates an output port named <route>_<field>. When a JSON message arrives on the message_input port:
- The transformer parses the message and validates it against its pydantic_model.
- It reads the route discriminator and looks up the corresponding outputs spec.
- For each field in that outputs spec:
  - It extracts the field value from the validated object.
  - If a transform function is provided, it applies the function to the value to produce a payload.
  - Otherwise, it wraps the raw value in StructuredPayload.
  - It emits the resulting payload on the <route>_<field> port.
You can connect multiple downstream input ports to each of these output ports, and all will receive the payload when it is emitted.
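The dispatch steps above can be sketched in plain Python. This is only an illustration of the control flow, not the element's implementation: StubStructuredPayload, dispatch, and the emit callback are hypothetical names, and plain json.loads stands in for the Pydantic validation step.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class StubStructuredPayload:
    """Hypothetical stand-in for pyllments' StructuredPayload."""
    data: Any


def dispatch(message_json: str, routing_map: dict,
             emit: Callable[[str, Any], None]) -> None:
    """Parse a message, pick the route, and emit one payload per field."""
    message = json.loads(message_json)       # the real element validates with Pydantic
    route = message["route"]                 # the route discriminator
    outputs = routing_map[route]["outputs"]  # KeyError if the route is unknown
    for field, spec in outputs.items():
        value = message[field]
        transform = spec.get("transform")
        if transform is not None:
            payload = transform(value)       # a custom transform builds the payload
        else:
            payload_type = spec.get("payload_type", StubStructuredPayload)
            payload = payload_type(data=value)
        emit(f"{route}_{field}", payload)    # port name follows <route>_<field>


routing_map = {
    "reply": {"outputs": {"content": {
        "transform": lambda text: {"role": "assistant", "content": text}}}},
    "tools": {"outputs": {"tools": {}}},
}

emitted = []


def record(port_name: str, payload: Any) -> None:
    """Collect emissions instead of sending them to real ports."""
    emitted.append((port_name, payload))


dispatch('{"route": "reply", "content": "hi"}', routing_map, record)
dispatch('{"route": "tools", "tools": ["a", "b"]}', routing_map, record)
```

Only the fields of the selected route are emitted, so a message on the reply route never touches the ports of the tools route.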
Output Ports
Port Name | Payload Type | Behavior |
---|---|---|
<route>_<field> | StructuredPayload (default) or Payload via transform | Emits the field value packaged as a payload; you can connect multiple input ports to this output, and all will receive the payload. For StructuredPayload, access raw data via .model.data. |
schema_output | SchemaPayload | Emits the unified Pydantic schema when it changes; consumers can connect to receive updated schemas. |
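As a rough illustration of the naming convention used in the port tables, the following sketch derives port names from a routing_map. The helper port_names is hypothetical, and it assumes, based on the note on schema specs, that a schema input port is created only when a field's schema is not a static pydantic_model.

```python
def port_names(routing_map: dict) -> tuple[list[str], list[str]]:
    """Return (output port names, schema input port names) for a routing_map."""
    output_ports: list[str] = []
    schema_input_ports: list[str] = []
    for route, route_spec in routing_map.items():
        for field, field_spec in route_spec.get("outputs", {}).items():
            output_ports.append(f"{route}_{field}")
            # Assumption: a static pydantic_model means no schema input port.
            if "pydantic_model" not in field_spec.get("schema", {}):
                schema_input_ports.append(f"{route}_{field}_schema_input")
    return output_ports, schema_input_ports


example_map = {
    "reply": {"outputs": {"content": {"schema": {"pydantic_model": str}}}},
    "dynamic": {"outputs": {"value": {"schema": {"ports": ["<upstream port>"]}}}},
}

output_ports, schema_input_ports = port_names(example_map)
print(output_ports)
print(schema_input_ports)
```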
Data and Schema Structures
Message Structure
The element receives a MessagePayload whose model.content is a JSON string matching the generated Pydantic schema. The expected JSON format is:
{
"route": "<route_name>",
"<field1>": <value1>,
"<field2>": <value2>,
...
}
- <route_name> must be one of the keys in routing_map.
- Subsequent fields correspond to the names defined under routing_map[route]['outputs'] and will be validated by Pydantic.
Schema Input Payload (<route>_<field>_schema_input)
To update a field's schema at runtime, send a SchemaPayload with a Pydantic model:
from pyllments.payloads.schema import SchemaPayload
new_schema = SchemaPayload(schema=CustomFieldModel)
schema_provider_el.ports.schema_output > srt.ports.reply_content_schema_input
- CustomFieldModel must inherit from pydantic.BaseModel or RootModel.
- Upon receipt, the element rebuilds the unified pydantic_model including this update.
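The rebuild-on-update behavior can be approximated with plain JSON Schema dictionaries. This is a simplified sketch and not the element's code: the real element rebuilds a Pydantic model, whereas the hypothetical build_unified_schema helper below only assembles a dictionary shaped like the generated schema shown in this document.

```python
def build_unified_schema(field_schemas: dict) -> dict:
    """Assemble a oneOf schema with one sub-schema per route."""
    defs = {}
    for route, fields in field_schemas.items():
        properties = {"route": {"type": "string", "const": route}}
        properties.update(fields)
        defs[f"{route}_route"] = {
            "title": f"{route}_route",
            "type": "object",
            "properties": properties,
            "required": ["route", *fields],
        }
    return {
        "$defs": defs,
        "discriminator": {"propertyName": "route"},
        "oneOf": [{"$ref": f"#/$defs/{name}"} for name in defs],
    }


# Per-field schemas keyed by route; the tools field starts without constraints.
field_schemas = {
    "reply": {"message": {"type": "string"}},
    "tools": {"tools": {}},
}
before = build_unified_schema(field_schemas)

# Simulate a schema update arriving for the tools route's tools field,
# then rebuild the unified schema so it reflects the change.
field_schemas["tools"]["tools"] = {"type": "array", "items": {"type": "string"}}
after = build_unified_schema(field_schemas)
```

Rebuilding the whole union on every update keeps the sketch simple; whether the element rebuilds everything or only the affected route is not stated here.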
Schema Output Payload (schema_output)
The element emits its unified pydantic_model whenever it changes via a SchemaPayload on schema_output:
- payload.model.schema is the Pydantic RootModel class representing the union of all routes.
- You can call .model_json_schema() on this class to get the JSON Schema dictionary.
StructuredPayload Data
For output ports without a custom transform, the element wraps field values in StructuredPayload:
from pyllments.payloads.structured import StructuredPayload
payload = StructuredPayload(data={"tools": ["a","b"]})
print(payload.model.data)  # {'tools': ['a', 'b']}
- .model.data holds the raw Python object validated by Pydantic.
Routing Map & Generated Schema
This element dynamically builds a Pydantic union model from your routing_map. Below is an example mapping (taken from the mcp_flow.py recipe) and the corresponding JSON Schema snippet it produces.
Example routing_map
from pyllments.elements import StructuredRouterTransformer
from pyllments.payloads import MessagePayload, SchemaPayload, StructuredPayload
structured_router_el = StructuredRouterTransformer(
    routing_map={
        'reply': {
            'description': 'Send a chat reply back to the user',
            'outputs': {
                'message': {
                    'description': "The assistant's textual response",
                    'schema': {'pydantic_model': str},
                    'ports': [chat_interface_el.ports.message_input],
                    'transform': lambda txt: MessagePayload(content=txt, role='assistant')
                }
            }
        },
        'tools': {
            'description': 'Tools invocation route',
            'outputs': {
                'tools': {
                    'description': 'Which tools to call',
                    'schema': {'payload_type': SchemaPayload},
                    'payload_type': StructuredPayload
                }
            }
        }
    }
)
Example Generated JSON Schema
{
"$defs": {
"reply_route": {
"title": "reply_route",
"description": "Send a chat reply back to the user",
"type": "object",
"properties": {
"route": { "type": "string", "const": "reply" },
"message": {
"type": "string",
"description": "The assistant's textual response"
}
},
"required": ["route", "message"]
},
"tools_route": {
"title": "tools_route",
"description": "Tools invocation route",
"type": "object",
"properties": {
"route": { "type": "string", "const": "tools" },
"tools": {
"type": "array",
"items": { "type": "string" },
"description": "Which tools to call"
}
},
"required": ["route", "tools"]
}
},
"discriminator": { "propertyName": "route" },
"oneOf": [
{"$ref": "#/$defs/reply_route"},
{"$ref": "#/$defs/tools_route"}
],
"type": "object"
}
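A schema of this general shape can be reproduced with plain Pydantic v2, which may help when inspecting what the LLM is asked to follow. This is a hand-written sketch and not the element's internal code: the class names ReplyRoute, ToolsRoute, and Routes are hypothetical, so the titles and $defs keys will differ from the generated names above.

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, RootModel


class ReplyRoute(BaseModel):
    """Send a chat reply back to the user"""
    route: Literal["reply"]
    message: str = Field(description="The assistant's textual response")


class ToolsRoute(BaseModel):
    """Tools invocation route"""
    route: Literal["tools"]
    tools: list[str] = Field(description="Which tools to call")


class Routes(RootModel):
    """Union of all routes, discriminated on the route field."""
    root: Annotated[Union[ReplyRoute, ToolsRoute], Field(discriminator="route")]


# Validate a JSON message; the discriminator selects the matching sub-model.
parsed = Routes.model_validate_json('{"route": "tools", "tools": ["a", "b"]}')

# Export the JSON Schema dictionary for the union.
schema = Routes.model_json_schema()
```

A message whose route value matches none of the Literal values fails validation with a pydantic.ValidationError, which mirrors the requirement that the route name be one of the routing_map keys.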