StructuredRouterTransformer
At a high level, this element serves two purposes. First, it dynamically generates a JSON schema for an LLM to follow. Second, it handles the resulting structured data by routing it (and optionally transforming it) out of the desired ports.
By using a routing_map, it:
- Constructs a unified Pydantic RootModel from sub-schemas for each route.
- Parses incoming JSON messages (MessagePayload) against this schema.
- Dispatches each output field to its connected ports based on the route field.
- Applies optional transform functions or default payload types (StructuredPayload).
- Supports dynamic schema updates via <route>_<field>_schema_input ports.
It leverages the FlowController for port management and routing logic.
Instantiation
Arguments:
routing_map: dict
Mapping of route names to configuration dicts. Each dict may include the following keys:
- outputs: dict
  Mapping of field names to spec dicts.
  - schema: dict
    Either {'pydantic_model': type} for static schemas or {'ports': [InputPort]} for dynamic updates.
  - ports: list[InputPort]
    InputPort instances to emit each field value.
  - payload_type: type, optional
    Payload class for emission (defaults to StructuredPayload).
  - transform: callable, optional
    Accepts raw field value and returns a Payload.
Example:
from typing import Literal
from pydantic import BaseModel
from pyllments.elements import StructuredRouterTransformer
from pyllments.payloads import MessagePayload, SchemaPayload, StructuredPayload

srt = StructuredRouterTransformer(
    routing_map={
        'reply': {
            'outputs': {
                'content': {
                    'schema': {'pydantic_model': str},  # static schema without schema input port
                    'ports': [chat_interface_el.ports.message_input],
                    'transform': lambda text: MessagePayload(content=text, role='assistant')
                }
            }
        },
        'dynamic': {
            'outputs': {
                'value': {
                    'schema': {'ports': [schema_provider_el.ports.schema_output]},  # dynamic schema via SchemaPayload
                    'ports': [consumer_el.ports.value_input],
                    'payload_type': StructuredPayload
                }
            }
        },
        'simple': {
            'outputs': {
                'number': {
                    'schema': {'pydantic_model': int},  # direct Pydantic type
                    'ports': [number_consumer_el.ports.number_input]
                }
            }
        }
    },
    incoming_output_port=llm_el.ports.output['message_output']
)
Note on schema spec:
- You can supply schema as a direct pydantic_model (no schema input port created).
- Or specify schema with ports: [InputPort] to receive schema updates at runtime.
- The schema input port always accepts SchemaPayload by default.
incoming_output_port: OutputPort, optional
Port to receive upstream MessagePayload JSON input (defaults to message_input).
flow_controller: FlowController, optional
Custom flow controller for routing and port management.
pydantic_model: BaseModel
Generated unified model for JSON input validation.
Input Ports
| Port Name | Payload Type | Behavior |
|---|---|---|
| message_input | MessagePayload | Receives a JSON payload containing a route and data fields to parse and dispatch. |
| <route>_<field>_schema_input | SchemaPayload | Receives SchemaPayload updates for the specified field; rebuilds the internal schema. |
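The naming rule for schema input ports can be illustrated with a small standalone sketch. It does not use pyllments: `schema_input_port_names` is a hypothetical helper, and a placeholder string stands in for a real port object. It assumes, following the note on schema specs, that only fields whose schema spec contains `ports` get a schema input port.

```python
def schema_input_port_names(routing_map):
    """List the <route>_<field>_schema_input names implied by a routing_map.

    Hypothetical helper for illustration only; not part of pyllments.
    """
    names = []
    for route, config in routing_map.items():
        for field, spec in config.get('outputs', {}).items():
            # Assumption: only a schema spec with a 'ports' key is dynamic.
            if 'ports' in spec.get('schema', {}):
                names.append(f'{route}_{field}_schema_input')
    return names


routing_map = {
    'reply': {'outputs': {'content': {'schema': {'pydantic_model': str}}}},
    'dynamic': {'outputs': {'value': {'schema': {'ports': ['<schema output port>']}}}},
}
print(schema_input_port_names(routing_map))
```

Under that assumption, only the `dynamic` route's `value` field yields a schema input port name here; the statically typed `reply` field does not.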
Generated Output Ports
For each route defined in the routing_map and for each field under that route's outputs, the element automatically creates an output port named <route>_<field>. When a JSON message arrives on the message_input port:
- The transformer parses the message and validates it against its pydantic_model.
- It reads the route discriminator and looks up the corresponding outputs spec.
- For each field in that outputs spec:
  - It extracts the field value from the validated object.
  - If a transform function is provided, applies it to the value to produce a payload.
  - Otherwise, wraps the raw value in StructuredPayload.
  - Emits the resulting payload on the <route>_<field> port.
You can connect multiple downstream input ports to each of these output ports, and all will receive the payload when it is emitted.
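The dispatch steps above can be sketched in plain Python. This is a simplified illustration under stated assumptions, not the element's actual implementation: `dispatch` and `record` are hypothetical names, Pydantic validation is reduced to `json.loads`, and a plain dict stands in for `StructuredPayload`.

```python
import json


def dispatch(raw_json, routing_map, emit):
    """Route one JSON message, calling emit(port_name, payload) per field.

    Hypothetical stand-in for the element's internal logic.
    """
    data = json.loads(raw_json)  # stands in for Pydantic validation
    route = data['route']
    outputs = routing_map[route]['outputs']
    for field, spec in outputs.items():
        value = data[field]
        transform = spec.get('transform')
        # Assumption: a dict stands in for StructuredPayload(data=value).
        payload = transform(value) if transform else {'data': value}
        emit(f'{route}_{field}', payload)


routing_map = {
    'reply': {'outputs': {'content': {'transform': lambda text: text.upper()}}},
    'simple': {'outputs': {'number': {}}},
}

emitted = []


def record(port_name, payload):
    """Collect emissions so they can be inspected afterwards."""
    emitted.append((port_name, payload))


dispatch('{"route": "reply", "content": "hi"}', routing_map, record)
dispatch('{"route": "simple", "number": 3}', routing_map, record)
print(emitted)
```

The `reply` field passes through its transform, while the `simple` field falls back to the default wrapper, mirroring the transform-or-wrap choice described in the list.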
Output Ports
| Port Name | Payload Type | Behavior |
|---|---|---|
| <route>_<field> | StructuredPayload (default) or Payload via transform | Emits the field value packaged as a payload; you can connect multiple input ports to this output, and all will receive the payload. For StructuredPayload, access raw data via .model.data. |
| schema_output | SchemaPayload | Emits the unified Pydantic schema when it changes; consumers can connect to receive updated schemas. |
Data and Schema Structures
Message Structure
The element receives a MessagePayload whose model.content is a JSON string matching the generated Pydantic schema. The expected JSON format is:
{
  "route": "<route_name>",
  "<field1>": <value1>,
  "<field2>": <value2>,
  ...
}
- <route_name> must be one of the keys in routing_map.
- Subsequent fields correspond to the names defined under routing_map[route]['outputs'] and will be validated by Pydantic.
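To make the expected message shape concrete, here is a minimal standalone check using only the standard library. `check_message` is a hypothetical helper that tests just the two rules listed above (known route, expected fields present). The element itself relies on Pydantic validation, which also checks field types.

```python
import json


def check_message(raw_json, routing_map):
    """Return a list of problems in a message; an empty list means it fits.

    Hypothetical helper for illustration; not part of pyllments.
    """
    data = json.loads(raw_json)
    route = data.get('route')
    if route not in routing_map:
        return [f'unknown route: {route!r}']
    expected = routing_map[route]['outputs']
    return [f'missing field: {name}' for name in expected if name not in data]


routing_map = {
    'reply': {'outputs': {'message': {}}},
    'tools': {'outputs': {'tools': {}}},
}

good = json.dumps({'route': 'reply', 'message': 'Hello'})
print(check_message(good, routing_map))
print(check_message('{"route": "tools"}', routing_map))
print(check_message('{"route": "other"}', routing_map))
```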
Schema Input Payload (<route>_<field>_schema_input)
To update a field's schema at runtime, send a SchemaPayload with a Pydantic model:
from pyllments.payloads.schema import SchemaPayload
new_schema = SchemaPayload(schema=CustomFieldModel)
srt.ports.reply_content_schema_input > schema_provider_el.ports.schema_output
- CustomFieldModel must inherit from pydantic.BaseModel or RootModel.
- Upon receipt, the element rebuilds the unified pydantic_model including this update.
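The update-then-rebuild behavior can be pictured with a small standalone sketch. It is an assumption-laden illustration, not the element's code: `SchemaRegistry` is a hypothetical class, a nested dict stands in for the unified Pydantic model, and the `on_change` callback plays the role of a downstream consumer being notified.

```python
class SchemaRegistry:
    """Hypothetical stand-in for the element's schema bookkeeping."""

    def __init__(self, on_change):
        self.field_schemas = {}  # (route, field) -> schema object
        self.on_change = on_change

    def update(self, route, field, schema):
        """Store a new field schema, rebuild the unified view, and notify."""
        self.field_schemas[(route, field)] = schema
        self.on_change(self.unified())

    def unified(self):
        """Group the stored field schemas by route."""
        routes = {}
        for (route, field), schema in self.field_schemas.items():
            routes.setdefault(route, {})[field] = schema
        return routes


seen = []
registry = SchemaRegistry(on_change=seen.append)
registry.update('reply', 'content', str)
registry.update('reply', 'content', int)  # a later update replaces the earlier one
print(seen[-1])
```

Each update triggers a full rebuild rather than patching in place, which keeps the unified view consistent with whatever field schemas are currently stored.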
Schema Output Payload (schema_output)
Whenever its unified pydantic_model changes, the element emits it as a SchemaPayload on schema_output:
- payload.model.schema is the Pydantic RootModel class representing the union of all routes.
- You can call .model_json_schema() on this class to get the JSON Schema dictionary.
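For readers unfamiliar with discriminated unions in Pydantic v2, the following sketch shows a hand-written RootModel over two route models and a call to `.model_json_schema()`. The class names `ReplyRoute`, `ToolsRoute`, and `Routes` are illustrative assumptions. The element generates its model dynamically from the routing_map, so its actual class names and schema titles will differ.

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, RootModel


class ReplyRoute(BaseModel):
    route: Literal['reply']
    message: str


class ToolsRoute(BaseModel):
    route: Literal['tools']
    tools: list[str]


class Routes(
    RootModel[Annotated[Union[ReplyRoute, ToolsRoute], Field(discriminator='route')]]
):
    """Union of all routes, selected by the 'route' field."""


# The JSON Schema dictionary that could be handed to an LLM.
schema = Routes.model_json_schema()
print(sorted(schema['$defs']))

# Validating a message picks the matching route model via the discriminator.
parsed = Routes.model_validate_json('{"route": "tools", "tools": ["a", "b"]}')
print(type(parsed.root).__name__, parsed.root.tools)
```

The `Literal` type on each `route` field is what lets Pydantic use it as the discriminator when choosing which sub-model to validate against.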
StructuredPayload Data
For output ports without a custom transform, the element wraps field values in StructuredPayload:
from pyllments.payloads.structured import StructuredPayload
payload = StructuredPayload(data={"tools": ["a","b"]})
print(payload.model.data)  # {'tools': ['a', 'b']}
- .model.data holds the raw Python object validated by Pydantic.
Routing Map & Generated Schema
This element dynamically builds a Pydantic union model from your routing_map. Below is an example mapping (taken from the mcp_flow.py recipe) and the corresponding JSON Schema snippet it produces.
Example routing_map
from pyllments.elements import StructuredRouterTransformer
from pyllments.payloads import MessagePayload, SchemaPayload, StructuredPayload

structured_router_el = StructuredRouterTransformer(
    routing_map={
        'reply': {
            'description': 'Send a chat reply back to the user',
            'outputs': {
                'message': {
                    # double quotes so the apostrophe does not end the string
                    'description': "The assistant's textual response",
                    'schema': {'pydantic_model': str},
                    'ports': [chat_interface_el.ports.message_input],
                    'transform': lambda txt: MessagePayload(content=txt, role='assistant')
                }
            }
        },
        'tools': {
            'description': 'Tools invocation route',
            'outputs': {
                'tools': {
                    'description': 'Which tools to call',
                    'schema': {'payload_type': SchemaPayload},
                    'payload_type': StructuredPayload
                }
            }
        }
    }
)
Example Generated JSON Schema
{
  "$defs": {
    "reply_route": {
      "title": "reply_route",
      "description": "Send a chat reply back to the user",
      "type": "object",
      "properties": {
        "route": { "type": "string", "const": "reply" },
        "message": {
          "type": "string",
          "description": "The assistant's textual response"
        }
      },
      "required": ["route", "message"]
    },
    "tools_route": {
      "title": "tools_route",
      "description": "Tools invocation route",
      "type": "object",
      "properties": {
        "route": { "type": "string", "const": "tools" },
        "tools": {
          "type": "array",
          "items": { "type": "string" },
          "description": "Which tools to call"
        }
      },
      "required": ["route", "tools"]
    }
  },
  "discriminator": { "propertyName": "route" },
  "oneOf": [
    {"$ref": "#/$defs/reply_route"},
    {"$ref": "#/$defs/tools_route"}
  ],
  "type": "object"
}