APIElement
This element allows you to define a FastAPI endpoint that responds to a request, structure the type of payload it passes into your flow, and then extract the information from the Payload the APIElement recieves to build the response.
Instantiation
When creating an APIElement
, you configure how the HTTP request body is constructed and emitted from your flow. You supply parameters and mappings that drive the API behavior.
Input Setup
1. input_map
A dictionary defining the expected inputs for the API. Each key creates an input port:
- payload class: a direct
Payload
subclass (e.g.,MessagePayload
) creates a port for that type, withpersist=True
by default. - dict config: you can specify:
payload_type
: class β Payload type to accept.ports
: list[InputPort
] β upstream ports to connect.persist
: bool, defaultTrue
β whether to clear the port after a response.
Example:
= {
input_map 'user_query': MessagePayload,
'history': {'ports': [history_el.ports.messages_output], 'persist': False}
}
2. response_dict
Defines how to build the JSON response once inputs arrive. Keys must match input_map
ports:
- For each
{port_name: {alias: attr_or_fn}}
:- If
attr_or_fn
is astr
, readpayload.model.<attr_or_fn>
. - If callable, invoke it (awaiting if coroutine) with the payload.
- If
When all response_dict
keys have data, the element emits a {alias: value}
response and clears non-persistent inputs.
3. trigger_map
(optional)
A mapping {trigger_port: (callback_fn, [required_ports])}
:
- Fires when
trigger_port
receives a payload and allrequired_ports
hold data. callback_fn(**payloads)
returns a dict to emit as the response.
4. build_fn
(optional)
A custom function (active_input_port, c, **port_payloads) β dict
with persistent state c
:
- Called on every input arrival (after
response_dict
andtrigger_map
). - Return a dict to emit as the response, or
None
to skip.
5. request_output_fn
A function mapping the final request dict to a Payload
subclass. This parameter is required in non-test mode to construct the request payload into your flow; its signature also defines the default Pydantic model for request validation (unless overridden via request_pydantic_model
).
6. Other parameters
endpoint: str
β URL path (default:'api'
).outgoing_input_port: InputPort
β connectapi_output
to a downstream element.app: FastAPI
β application to register routes on (default: shared AppRegistry).test: bool
β register only a test route ifTrue
.timeout: float
β seconds to wait for a response (default: 30s).request_pydantic_model: Type[BaseModel]
β optional override for the Pydantic model used to validate incoming requests; by default itβs inferred from the parameters ofrequest_output_fn
.
Input Ports
Port Name | Payload Type | Behavior |
---|---|---|
<key> |
As defined in input_map |
Receives payloads to include in the API request under key . |
Output Ports
Port Name | Payload Type | Behavior |
---|---|---|
api_output |
Payload type returned by request_output_fn |
Emits the constructed payload into the flow when an HTTP request is received. |
Response Strategies
The APIElement
builds and emits the response in the following order of priority:
- Response Dictionary (
response_dict
) - Trigger Map (
trigger_map
) - Custom Build Function (
build_fn
)
1. Response Dictionary
When you supply a response_dict
, once all keys have received payloads, the element:
- Gathers each payload from its input port
- For each
(alias β attr_or_fn)
mapping:- If
attr_or_fn
is astr
, readspayload.model.<attr_or_fn>
- If
attr_or_fn
is callable, calls it with the payload (awaiting if necessary)
- If
- Returns a JSON object of
{alias: value, ...}
and clears those payloads
2. Trigger Map
If no response_dict
is defined or not all ports are ready, but a trigger_map
is provided:
- When an input port listed in
trigger_map
receives a payload, and all itsrequired_ports
have payloads:- Calls the corresponding
(callback_fn, required_ports)
- Uses the returned dict as the API response
- Calls the corresponding
Trigger Semantics: - Similar to ContextBuilder, the trigger fires only when all required_ports
have data. - Because input ports persist their payloads, triggers will still fire if dependencies were received earlier. - Unlike ContextBuilder, APIElement does not queue multiple trigger runs internally; each satisfying event invokes the trigger immediately (subject to HTTP-level serialization).
Concurrency Note: The APIElement processes one trigger at a time. While a previous response is pending (i.e., response_future
is active), additional trigger invocations will not start new processing and incoming HTTP requests at the API endpoint will receive a 429 Too Many Requests
error until the current request completes.
3. Custom Build Function
If neither response_dict
nor trigger_map
produces a response, and you provided a build_fn
:
- The element calls your
build_fn(active_input_port, c, **port_kwargs)
on every arrival - The returned dict (if non-
None
) becomes the API response
Request Serialization
The APIElement
processes one request at a time to maintain consistent state. Internally, it:
- Sets a
response_future
when a new request is received. - Blocks additional HTTP requests until the current
response_future
is fulfilled or times out. - Automatically clears the
response_future
on completion.
If a second request arrives before the first is resolved, the endpoint will return a 429 Too Many Requests
error.