Plugins

Plugins are wrappers around models (like LLM, ASR, TTS), post-processors, and I/O (such as video and audio streams) that can be connected together to form pipelines in tandem with other plugin nodes. These are designed to reduce boilerplate code and quickly build up more complex agents.

A plugin receives input into a processing queue, processes it, and then sends the results out across its output channels. Each output channel represents one type of output the model produces (for example, the ChatQuery plugin exposes outputs at the token, word, and sentence level), and each output channel can be connected to an arbitrary number of other plugin nodes.

By default, plugins are threaded and run off their own queue, but they can be configured to run inline (unthreaded) by passing threaded=False to the plugin’s initializer. They can also be interrupted with the interrupt() function, which abandons the current request and any remaining data in the input queue (for example, to stop LLM generation early or to mute TTS output).

When creating new plugin types, implement the process() function to handle incoming data and return the outgoing data. You can also use simple callback functions to receive data instead of defining your own Plugin class (for example, chat_plugin.add(my_function) to receive chat output).
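The flow described above (implement process(), connect plugins with add(), or attach a plain callback) can be sketched with a small stand-in class. MiniPlugin, Upper, and Exclaim are hypothetical names invented for this illustration. They model only the inline (threaded=False) behaviour and are not the library's implementation.

```python
class MiniPlugin:
    """Hypothetical, unthreaded stand-in for a plugin (illustration only)."""

    def __init__(self, output_channels=1):
        # One list of downstream connections per output channel.
        self.outputs = [[] for _ in range(output_channels)]

    def add(self, target, channel=0):
        # The target may be another MiniPlugin or a plain callback function.
        self.outputs[channel].append(target)
        return self

    def process(self, input, **kwargs):
        raise NotImplementedError("subclasses implement process()")

    def output(self, data, channel=0):
        # Plugins and callbacks are both callable, so they are treated alike.
        for target in self.outputs[channel]:
            target(data)

    def __call__(self, input, **kwargs):
        # Inline behaviour: process now, forward the result, and return it.
        result = self.process(input, **kwargs)
        if result is not None:
            self.output(result)
        return result


class Upper(MiniPlugin):
    def process(self, input, **kwargs):
        return input.upper()


class Exclaim(MiniPlugin):
    def process(self, input, **kwargs):
        return input + "!"


received = []                 # filled in by the callback below
upper = Upper()
exclaim = Exclaim()
upper.add(exclaim)            # plugin -> plugin connection
exclaim.add(received.append)  # plugin -> callback connection
upper("hello")
```

In this sketch the callback ends up with the fully processed string, while the call on the first plugin returns only that plugin's own result.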

Plugin API

All plugins derive from the shared Plugin interface:

class Plugin(output_channels=1, relay=False, drop_inputs=False, threaded=True, **kwargs)

Bases: Thread

Base class for plugins that process incoming/outgoing data from connections with other plugins, forming a pipeline or graph. Plugins can run either single-threaded or in an independent thread that processes data out of a queue.

Frequent categories of plugins:

  • sources: text prompts, images/video

  • process: LLM queries, RAG, dynamic LLM calls, image post-processors

  • outputs: print to stdout, save images/video

Derived plugins should implement the process() function to handle incoming data.

Parameters:
  • output_channels (int) – the number of sets of output connections the plugin has

  • relay (bool) – if true, will relay any inputs as outputs after processing

  • drop_inputs (bool) – if true, only the most recent input in the queue will be used

  • threaded (bool) – if true, will spawn independent thread for processing the queue.
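As a rough illustration of the drop_inputs behaviour described above, the helper below drains a queue and keeps only the newest item. The function next_input is a hypothetical name, and the draining strategy is an assumption about how such an option could work, not the library's code.

```python
import queue


def next_input(q, drop_inputs=False):
    """Return the next queued item (hypothetical helper, illustration only).

    With drop_inputs=True, everything except the most recent item is discarded.
    """
    item = q.get()
    if drop_inputs:
        while True:
            try:
                item = q.get_nowait()  # a newer item replaces the older one
            except queue.Empty:
                break
    return item


q = queue.Queue()
for frame in ["frame0", "frame1", "frame2"]:
    q.put(frame)

latest = next_input(q, drop_inputs=True)
```

This kind of option suits live sources such as camera frames, where processing a stale input is less useful than skipping ahead to the newest one.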

process(input, **kwargs)

Abstract function that plugin instances should implement to process incoming data. Don’t call this function externally unless threaded=False, because otherwise the plugin’s internal thread dispatches from the queue.

Parameters:
  • input – input data to process from the previous plugin in the pipeline

  • kwargs – optional processing arguments that accompany this data

Returns:

Plugins should return their output data to be sent to downstream plugins. You can also call output() as opposed to returning it.

add(plugin, channel=0, **kwargs)

Connect the output queue from this plugin with the input queue of another plugin, so that this plugin sends its output data to the other one.

Parameters:
  • plugin (Plugin|callable) – either the plugin to link to, or a callback function.

  • channel (int) – the output channel of this plugin to connect the other plugin or callback to

Returns:

A reference to this plugin instance (self)

__call__(input=None, **kwargs)

Callable () operator alias for the input() function. It provides a more intuitive way of processing data: plugin(data) instead of plugin.input(data).

Parameters:
  • input – input data sent to the plugin’s process() function.

  • kwargs – additional arguments forwarded to the plugin’s process() function.

Returns:

None if the plugin is threaded, otherwise returns any outputs.

input(input=None, **kwargs)

Add data to the plugin’s processing queue and return immediately, or process it now and return the results if threaded=False.

Parameters:
  • input – input data sent to the plugin’s process() function.

  • kwargs – additional arguments forwarded to the plugin’s process() function.

Returns:

None if the plugin is threaded, otherwise returns any outputs.
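The difference in return values between the threaded and inline modes can be sketched as follows. QueuedPlugin, its results list, and its input_queue attribute are hypothetical names for this illustration. The class only mimics the documented behaviour and is not the library's Plugin.

```python
import queue
import threading


class QueuedPlugin(threading.Thread):
    """Hypothetical stand-in showing threaded vs. inline input handling."""

    def __init__(self, threaded=True):
        super().__init__(daemon=True)
        self.threaded = threaded
        self.input_queue = queue.Queue()
        self.results = []  # collected by the worker thread, for inspection

    def process(self, input, **kwargs):
        return input * 2

    def input(self, input=None, **kwargs):
        if self.threaded:
            # Enqueue and return immediately; the worker thread picks it up.
            self.input_queue.put((input, kwargs))
            return None
        # Inline mode: process right away and hand back the result.
        return self.process(input, **kwargs)

    def __call__(self, input=None, **kwargs):
        return self.input(input, **kwargs)

    def run(self):
        # Process the queue forever (the daemon thread ends with the program).
        while True:
            item, kwargs = self.input_queue.get()
            self.results.append(self.process(item, **kwargs))
            self.input_queue.task_done()


inline = QueuedPlugin(threaded=False)
inline_result = inline(21)   # processed immediately

worker = QueuedPlugin(threaded=True)
worker.start()
queued_result = worker(21)   # only enqueued at this point
worker.input_queue.join()    # wait until the worker has handled the item
```

Here the inline call returns the processed value directly, while the threaded call returns None and the result only becomes available once the worker has drained its queue.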

output(output, channel=0, **kwargs)

Output data to the next plugin(s) on the specified channel (-1 for all channels)

property num_outputs

Return the total number of output connections across all channels
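A minimal sketch of per-channel output, assuming each channel simply holds a list of callbacks. ChannelPlugin and Splitter are hypothetical names for this illustration. The channel=-1 broadcast and the num_outputs count follow the descriptions above, but the implementation itself is an assumption.

```python
class ChannelPlugin:
    """Hypothetical stand-in with multiple output channels (illustration only)."""

    def __init__(self, output_channels=1):
        self.outputs = [[] for _ in range(output_channels)]

    def add(self, callback, channel=0):
        self.outputs[channel].append(callback)
        return self  # returning self allows chained add() calls

    def output(self, data, channel=0):
        # channel=-1 sends the data to every channel.
        channels = range(len(self.outputs)) if channel < 0 else [channel]
        for c in channels:
            for callback in self.outputs[c]:
                callback(data)

    @property
    def num_outputs(self):
        # Total number of connections across all channels.
        return sum(len(connections) for connections in self.outputs)


class Splitter(ChannelPlugin):
    WORDS, SENTENCE = 0, 1

    def __init__(self):
        super().__init__(output_channels=2)

    def process(self, text):
        for word in text.split():
            self.output(word, channel=self.WORDS)
        self.output(text, channel=self.SENTENCE)


words, sentences = [], []
splitter = Splitter()
splitter.add(words.append, channel=0).add(sentences.append, channel=1)
splitter.process("good morning")
splitter.output("<eos>", channel=-1)  # broadcast to both channels
```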

start()

Start threads for all plugins in the graph that have threading enabled.

run()

Process the queue forever. This runs automatically when the plugin is created with threaded=True.

dispatch(input, **kwargs)

Invoke the process() function on incoming data

interrupt(clear_inputs=True, recursive=True, block=None)

Interrupt any ongoing/pending processing, and optionally clear the input queue along with any downstream queues, and optionally wait for processing of those requests to have finished.

Parameters:
  • clear_inputs (bool) – if True, clear any remaining inputs in this plugin’s queue.

  • recursive (bool) – if True, then any downstream plugins will also be interrupted.

  • block (bool) – if True, this function will wait until any ongoing processing has finished, so that lingering outputs don’t cascade downstream in the pipeline. If block is None, it is automatically set to True if this plugin has outputs.
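The clear_inputs and recursive options can be illustrated with a simplified, single-threaded model. Interruptible, its interrupted flag, and the generate() loop are hypothetical. The block option is left out because it only matters when a separate thread is still processing.

```python
import queue


class Interruptible:
    """Hypothetical stand-in modelling interrupt() without threads."""

    def __init__(self):
        self.input_queue = queue.Queue()
        self.downstream = []
        self.interrupted = False

    def add(self, plugin):
        self.downstream.append(plugin)
        return self

    def clear_inputs(self):
        # Drop everything still waiting in the input queue.
        while True:
            try:
                self.input_queue.get_nowait()
            except queue.Empty:
                return

    def interrupt(self, clear_inputs=True, recursive=True):
        self.interrupted = True
        if clear_inputs:
            self.clear_inputs()
        if recursive:
            for plugin in self.downstream:
                plugin.interrupt(clear_inputs=clear_inputs, recursive=True)

    def generate(self, tokens, on_token):
        # A long-running request that checks the flag between tokens.
        self.interrupted = False
        for token in tokens:
            if self.interrupted:
                break
            on_token(token)


llm, tts = Interruptible(), Interruptible()
llm.add(tts)
llm.input_queue.put("pending prompt")
tts.input_queue.put("pending sentence")

spoken = []


def on_token(token):
    spoken.append(token)
    if token == "stop":
        llm.interrupt()  # abandon the rest of this request


llm.generate(["a", "b", "stop", "c", "d"], on_token)
```

In this model the interrupt ends generation after the current token, empties the pending queue of the interrupted plugin, and does the same for the downstream plugin.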

clear_inputs()

Clear the input queue, dropping any data.

find(type)

Return the plugin with the specified type by searching for it among the pipeline graph of inputs and output connections to other plugins.
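One way such a lookup could work is a depth-first search over both input and output connections. Node, Source, LLM, and Printer are hypothetical classes for this sketch, and the traversal order is an assumption rather than the library's documented behaviour.

```python
class Node:
    """Hypothetical stand-in for a plugin in a connected graph."""

    def __init__(self):
        self.inputs = []
        self.outputs = []

    def add(self, other):
        # Record the connection in both directions so the graph can be walked.
        self.outputs.append(other)
        other.inputs.append(self)
        return self

    def find(self, type, _visited=None):
        visited = set() if _visited is None else _visited
        if id(self) in visited:
            return None  # already checked; avoids looping forever on cycles
        visited.add(id(self))
        if isinstance(self, type):
            return self
        for neighbour in self.inputs + self.outputs:
            found = neighbour.find(type, visited)
            if found is not None:
                return found
        return None


class Source(Node):
    pass


class LLM(Node):
    pass


class Printer(Node):
    pass


source, llm, printer = Source(), LLM(), Printer()
source.add(llm)
llm.add(printer)

upstream = printer.find(Source)    # searches back through the inputs
downstream = source.find(Printer)  # searches forward through the outputs
missing = source.find(dict)        # no plugin of this type in the graph
```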