APIElement

This element allows you to define a FastAPI endpoint that responds to a request, structure the type of payload it passes into your flow, and then extract the information from the Payload the APIElement recieves to build the response.

Instantiation

When creating an APIElement, you configure how the HTTP request body is constructed and emitted from your flow. You supply parameters and mappings that drive the API behavior.

Input Setup

1. input_map

A dictionary defining the expected inputs for the API. Each key creates an input port:

  • payload class: a direct Payload subclass (e.g., MessagePayload) creates a port for that type, with persist=True by default.
  • dict config: you can specify:
    • payload_type: class β€” Payload type to accept.
    • ports: list[InputPort] β€” upstream ports to connect.
    • persist: bool, default True β€” whether to clear the port after a response.

Example:

input_map = {
    'user_query': MessagePayload,
    'history': {'ports': [history_el.ports.messages_output], 'persist': False}
}

2. response_dict

Defines how to build the JSON response once inputs arrive. Keys must match input_map ports:

  • For each {port_name: {alias: attr_or_fn}}:
    • If attr_or_fn is a str, read payload.model.<attr_or_fn>.
    • If callable, invoke it (awaiting if coroutine) with the payload.

When all response_dict keys have data, the element emits a {alias: value} response and clears non-persistent inputs.

3. trigger_map (optional)

A mapping {trigger_port: (callback_fn, [required_ports])}:

  • Fires when trigger_port receives a payload and all required_ports hold data.
  • callback_fn(**payloads) returns a dict to emit as the response.

4. build_fn (optional)

A custom function (active_input_port, c, **port_payloads) β†’ dict with persistent state c:

  • Called on every input arrival (after response_dict and trigger_map).
  • Return a dict to emit as the response, or None to skip.

5. request_output_fn

A function mapping the final request dict to a Payload subclass. This parameter is required in non-test mode to construct the request payload into your flow; its signature also defines the default Pydantic model for request validation (unless overridden via request_pydantic_model).

6. Other parameters

  • endpoint: str β€” URL path (default: 'api').
  • outgoing_input_port: InputPort β€” connect api_output to a downstream element.
  • app: FastAPI β€” application to register routes on (default: shared AppRegistry).
  • test: bool β€” register only a test route if True.
  • timeout: float β€” seconds to wait for a response (default: 30s).
  • request_pydantic_model: Type[BaseModel] β€” optional override for the Pydantic model used to validate incoming requests; by default it’s inferred from the parameters of request_output_fn.

Input Ports

Port Name Payload Type Behavior
<key> As defined in input_map Receives payloads to include in the API request under key.

Output Ports

Port Name Payload Type Behavior
api_output Payload type returned by request_output_fn Emits the constructed payload into the flow when an HTTP request is received.

Response Strategies

The APIElement builds and emits the response in the following order of priority:

  1. Response Dictionary (response_dict)
  2. Trigger Map (trigger_map)
  3. Custom Build Function (build_fn)

1. Response Dictionary

When you supply a response_dict, once all keys have received payloads, the element:

  • Gathers each payload from its input port
  • For each (alias β†’ attr_or_fn) mapping:
    • If attr_or_fn is a str, reads payload.model.<attr_or_fn>
    • If attr_or_fn is callable, calls it with the payload (awaiting if necessary)
  • Returns a JSON object of {alias: value, ...} and clears those payloads

2. Trigger Map

If no response_dict is defined or not all ports are ready, but a trigger_map is provided:

  • When an input port listed in trigger_map receives a payload, and all its required_ports have payloads:
    • Calls the corresponding (callback_fn, required_ports)
    • Uses the returned dict as the API response

Trigger Semantics: - Similar to ContextBuilder, the trigger fires only when all required_ports have data. - Because input ports persist their payloads, triggers will still fire if dependencies were received earlier. - Unlike ContextBuilder, APIElement does not queue multiple trigger runs internally; each satisfying event invokes the trigger immediately (subject to HTTP-level serialization).

Concurrency Note: The APIElement processes one trigger at a time. While a previous response is pending (i.e., response_future is active), additional trigger invocations will not start new processing and incoming HTTP requests at the API endpoint will receive a 429 Too Many Requests error until the current request completes.

3. Custom Build Function

If neither response_dict nor trigger_map produces a response, and you provided a build_fn:

  • The element calls your build_fn(active_input_port, c, **port_kwargs) on every arrival
  • The returned dict (if non-None) becomes the API response

Request Serialization

The APIElement processes one request at a time to maintain consistent state. Internally, it:

  • Sets a response_future when a new request is received.
  • Blocks additional HTTP requests until the current response_future is fulfilled or times out.
  • Automatically clears the response_future on completion.

If a second request arrives before the first is resolved, the endpoint will return a 429 Too Many Requests error.