APIElement
This element lets you define a FastAPI endpoint that responds to a request, specify the type of payload it passes into your flow, and extract information from the payloads the APIElement receives to build the response.
Instantiation
When creating an APIElement, you configure how the HTTP request body is constructed and emitted from your flow. You supply parameters and mappings that drive the API behavior.
Input Setup
1. input_map
A dictionary defining the expected inputs for the API. Each key creates an input port:
- payload class: a direct Payload subclass (e.g., MessagePayload) creates a port for that type, with persist=True by default.
- dict config: you can specify:
  - payload_type (class): Payload type to accept.
  - ports (list[InputPort]): upstream ports to connect.
  - persist (bool, default True): whether the port keeps its payload after a response; non-persistent ports are cleared.
Example:
input_map = {
    'user_query': MessagePayload,
    'history': {'ports': [history_el.ports.messages_output], 'persist': False}
}
2. response_dict
Defines how to build the JSON response once inputs arrive. Keys must match input_map ports:
- For each {port_name: {alias: attr_or_fn}}:
  - If attr_or_fn is a str, read payload.model.<attr_or_fn>.
  - If callable, invoke it (awaiting if coroutine) with the payload.
When all response_dict keys have data, the element emits a {alias: value} response and clears non-persistent inputs.
3. trigger_map (optional)
A mapping {trigger_port: (callback_fn, [required_ports])}:
- Fires when trigger_port receives a payload and all required_ports hold data.
- callback_fn(**payloads) returns a dict to emit as the response.
4. build_fn (optional)
A custom function (active_input_port, c, **port_payloads) -> dict with persistent state c:
- Called on every input arrival (after response_dict and trigger_map).
- Return a dict to emit as the response, or None to skip.
5. request_output_fn
A function mapping the final request dict to a Payload subclass. This parameter is required in non-test mode, where it builds the request payload that is emitted into your flow; its signature also defines the default Pydantic model for request validation (unless overridden via request_pydantic_model).
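To make the idea of a validation model derived from a function signature concrete, here is a minimal stdlib-only sketch. It is not the library's implementation: it swaps in dataclasses.make_dataclass for Pydantic, and model_from_signature and the example request_output_fn (which returns a plain dict rather than a Payload subclass) are hypothetical names used only for illustration.

```python
import inspect
from dataclasses import field, make_dataclass


def model_from_signature(fn, name="RequestModel"):
    """Build a dataclass whose fields mirror fn's parameters.

    Stand-in for a Pydantic model; handles plain named parameters only.
    """
    fields = []
    for param in inspect.signature(fn).parameters.values():
        annotation = param.annotation
        if annotation is inspect.Parameter.empty:
            annotation = object  # unannotated parameters accept anything
        if param.default is inspect.Parameter.empty:
            fields.append((param.name, annotation))
        else:
            fields.append((param.name, annotation, field(default=param.default)))
    return make_dataclass(name, fields)


# Hypothetical request_output_fn: a real one would return a Payload subclass.
def request_output_fn(message: str, role: str = "user"):
    return {"message": message, "role": role}


RequestModel = model_from_signature(request_output_fn)
request = RequestModel(message="hello")          # role falls back to its default
payload = request_output_fn(**vars(request))     # request fields become the arguments
```

The point of the sketch is the direction of the mapping: parameter names become request fields, and parameter defaults become optional fields.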
6. Other parameters
- endpoint (str): URL path (default: 'api').
- outgoing_input_port (InputPort): connect api_output to a downstream element.
- app (FastAPI): application to register routes on (default: shared AppRegistry).
- test (bool): register only a test route if True.
- timeout (float): seconds to wait for a response (default: 30s).
- request_pydantic_model (Type[BaseModel]): optional override for the Pydantic model used to validate incoming requests; by default it's inferred from the parameters of request_output_fn.
Input Ports
| Port Name | Payload Type | Behavior |
|---|---|---|
| <key> | As defined in input_map | Receives payloads to include in the API request under key. |
Output Ports
| Port Name | Payload Type | Behavior |
|---|---|---|
| api_output | Payload type returned by request_output_fn | Emits the constructed payload into the flow when an HTTP request is received. |
Response Strategies
The APIElement builds and emits the response in the following order of priority:
- Response Dictionary (response_dict)
- Trigger Map (trigger_map)
- Custom Build Function (build_fn)
1. Response Dictionary
When you supply a response_dict, once all keys have received payloads, the element:
- Gathers each payload from its input port
- For each (alias -> attr_or_fn) mapping:
  - If attr_or_fn is a str, reads payload.model.<attr_or_fn>
  - If attr_or_fn is callable, calls it with the payload (awaiting if necessary)
- Returns a JSON object of {alias: value, ...} and clears the payloads on non-persistent ports
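The resolution steps above can be sketched in plain Python. This is an illustration of the described behavior, not the library's code: build_response is a hypothetical helper, and the payloads are SimpleNamespace stand-ins for real Payload objects (any object with a .model attribute works here).

```python
import asyncio
import inspect
from types import SimpleNamespace


async def build_response(response_dict, payloads):
    """Resolve {port: {alias: attr_or_fn}} against the payload held by each port."""
    response = {}
    for port_name, alias_map in response_dict.items():
        payload = payloads[port_name]
        for alias, attr_or_fn in alias_map.items():
            if isinstance(attr_or_fn, str):
                # String: read the named attribute from payload.model.
                value = getattr(payload.model, attr_or_fn)
            else:
                # Callable: call it with the payload, awaiting coroutine results.
                value = attr_or_fn(payload)
                if inspect.isawaitable(value):
                    value = await value
            response[alias] = value
    return response


# Stand-in payloads; attribute names are made up for this example.
payloads = {
    "user_query": SimpleNamespace(model=SimpleNamespace(content="hi")),
    "history": SimpleNamespace(model=SimpleNamespace(messages=["a", "b"])),
}


async def count_messages(payload):
    return len(payload.model.messages)


response_dict = {
    "user_query": {"query": "content"},
    "history": {"history_length": count_messages},
}

result = asyncio.run(build_response(response_dict, payloads))
```

Here result combines one value read by attribute name and one produced by an async callable.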
2. Trigger Map
If no response_dict is defined or not all ports are ready, but a trigger_map is provided:
- When an input port listed in trigger_map receives a payload, and all its required_ports have payloads:
  - Calls the corresponding callback_fn with the payloads from its required_ports
  - Uses the returned dict as the API response
Trigger Semantics:
- Similar to ContextBuilder, the trigger fires only when all required_ports have data.
- Because input ports persist their payloads, triggers will still fire if dependencies were received earlier.
- Unlike ContextBuilder, APIElement does not queue multiple trigger runs internally; each satisfying event invokes the trigger immediately (subject to HTTP-level serialization).
Concurrency Note: The APIElement processes one trigger at a time. While a previous response is pending (i.e., response_future is active), additional trigger invocations will not start new processing, and incoming HTTP requests at the API endpoint will receive a 429 Too Many Requests error until the current request completes.
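The firing rule for triggers can be modeled in a few lines. The sketch below is a simplified stand-in, not the element's internals: maybe_fire and answer are hypothetical names, payloads are SimpleNamespace objects, and it assumes the callback receives keyword arguments named after the entries in required_ports.

```python
from types import SimpleNamespace


def maybe_fire(trigger_map, stored, port_name, payload):
    """Store the payload; return the callback's dict if port_name is a ready trigger."""
    stored[port_name] = payload  # ports keep their latest payload
    if port_name not in trigger_map:
        return None  # not a trigger port: nothing fires
    callback_fn, required_ports = trigger_map[port_name]
    if not all(name in stored for name in required_ports):
        return None  # a dependency has not arrived yet
    return callback_fn(**{name: stored[name] for name in required_ports})


def answer(user_query, history):
    return {"reply": f"{user_query.text} (after {len(history.items)} turns)"}


trigger_map = {"user_query": (answer, ["user_query", "history"])}
stored = {}

# Trigger port arrives first, but 'history' is missing: no response.
first = maybe_fire(trigger_map, stored, "user_query", SimpleNamespace(text="hi"))
# 'history' is not a trigger port: it is stored, and nothing fires.
second = maybe_fire(trigger_map, stored, "history", SimpleNamespace(items=[1, 2]))
# Trigger port arrives again; the earlier 'history' payload satisfies the dependency.
third = maybe_fire(trigger_map, stored, "user_query", SimpleNamespace(text="again"))
```

The third call shows the persistence point: a dependency received earlier still counts when the trigger port next receives a payload.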
3. Custom Build Function
If neither response_dict nor trigger_map produces a response, and you provided a build_fn:
- The element calls your build_fn(active_input_port, c, **port_kwargs) on every arrival
- The returned dict (if non-None) becomes the API response
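As an illustration, a build_fn might look like the sketch below. Several details are assumptions not stated in this section: that c behaves like a dict, that ports without a payload are either omitted from the keyword arguments or passed as None, and the attribute names on the stand-in payloads. The calls at the bottom simulate what the element would do on each arrival.

```python
from types import SimpleNamespace


def build_fn(active_input_port, c, **port_payloads):
    """Count arrivals in c and respond only once both ports hold a payload."""
    c["arrivals"] = c.get("arrivals", 0) + 1
    query = port_payloads.get("user_query")
    history = port_payloads.get("history")
    if query is None or history is None:
        return None  # skip: nothing is emitted for this arrival
    return {
        "query": query.text,
        "turns": len(history.items),
        "arrivals": c["arrivals"],
    }


c = {}  # assumption: the persistent state behaves like a dict
port = SimpleNamespace(name="user_query")  # stand-in for the active input port

# First arrival: only 'user_query' holds data, so the function skips.
skipped = build_fn(port, c, user_query=SimpleNamespace(text="hi"))

# Second arrival: both ports hold data, so a response dict is returned.
emitted = build_fn(
    port,
    c,
    user_query=SimpleNamespace(text="hi"),
    history=SimpleNamespace(items=[1, 2, 3]),
)
```

Returning None on the first call and a dict on the second mirrors the skip-or-emit contract, while c carries the arrival count between calls.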
Request Serialization
The APIElement processes one request at a time to maintain consistent state. Internally, it:
- Sets a response_future when a new request is received.
- Blocks additional HTTP requests until the current response_future is fulfilled or times out.
- Automatically clears the response_future on completion.
If a second request arrives before the first is resolved, the endpoint will return a 429 Too Many Requests error.
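The one-request-at-a-time behavior can be approximated with a single asyncio future. This is a rough model rather than the element's actual code: SingleFlightGate, handle_request, and resolve are hypothetical names, and the 504 status returned on timeout is an assumption, since this section does not say what the endpoint returns when the timeout elapses.

```python
import asyncio


class SingleFlightGate:
    """Allow one pending request; reject others with 429 until it resolves or times out."""

    def __init__(self, timeout=30.0):
        self.timeout = timeout
        self.response_future = None

    async def handle_request(self):
        if self.response_future is not None:
            return 429, {"detail": "Too Many Requests"}
        self.response_future = asyncio.get_running_loop().create_future()
        try:
            body = await asyncio.wait_for(self.response_future, self.timeout)
            return 200, body
        except asyncio.TimeoutError:
            # Assumed status code; the real timeout response is not documented here.
            return 504, {"detail": "Timed out waiting for a response"}
        finally:
            self.response_future = None  # cleared on completion or timeout

    def resolve(self, body):
        """Called by the flow side when a response dict is ready."""
        if self.response_future is not None and not self.response_future.done():
            self.response_future.set_result(body)


async def demo():
    gate = SingleFlightGate(timeout=1.0)
    first = asyncio.create_task(gate.handle_request())
    await asyncio.sleep(0)  # let the first request register its future
    second = await gate.handle_request()  # arrives while the first is pending
    gate.resolve({"answer": "done"})
    return await first, second


first_result, second_result = asyncio.run(demo())
```

In the demo, the second request is rejected immediately while the first is still pending, and the first completes once the flow side resolves the future.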