About GraphAI Streaming
With GraphAI, you can seamlessly handle streaming processes for LLM prompts through server-side, client-side, or server-client integrations.
The term "seamless" here refers to the ability to use GraphAI's default Express middleware or HTTP/Streaming Agent Filters to operate without explicitly managing server-client or streaming processes.
Once implemented, processes can be migrated from the client to the server, or server configurations adjusted, simply by changing settings, with minimal code modification. This flexibility allows developers to focus on business logic without worrying about the complexities of implementing streaming processes.
- Execute solely in a browser or Node.js.
- Execute collaboratively between browser and server:
  - Run agents on the server while operating GraphAI in the browser.
  - Run the entire graph on the server by posting graph data from the browser.
These combinations allow transparent streaming regardless of setup.
Overview of Streaming Processes
Sequential Data Transmission: During an agent's execution, data is passed to the Agent Filter via a callback function as it is generated. Instead of processing data in bulk, the callback function is invoked sequentially to handle each data unit.
Processing within Callback Functions: The callback function receives information such as `nodeId`, `agentId`, and `data` from the context and processes the data individually for different environments.

In Browsers: Data received through the callback function is displayed in real time in the browser, enabling live updates.
In Express (Web Servers): Data received through the callback function is processed and returned as an HTTP response. This enables immediate responses in scenarios where APIs are utilized.
This mechanism facilitates real-time data processing, display, and responses during agent execution.
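As a concrete illustration of this mechanism, the following self-contained sketch (all names hypothetical, not part of the GraphAI API) shows an agent handing tokens to a callback one at a time instead of returning them in bulk:

```typescript
// Hypothetical sketch of sequential data transmission: the "agent" invokes
// the callback once per generated token, then still returns the full result.
type TokenCallback = (token: string) => void;

const fakeAgent = async (onToken: TokenCallback): Promise<string> => {
  const tokens = ["Hello", ", ", "world", "!"];
  let content = "";
  for (const token of tokens) {
    onToken(token); // delivered immediately, as each unit is generated
    content += token;
  }
  return content; // the complete result remains available at the end
};

const received: string[] = [];
const result = await fakeAgent((token) => received.push(token));
// received: ["Hello", ", ", "world", "!"]; result: "Hello, world!"
```

The caller decides what each token delivery means: rendering in a browser, writing to an HTTP response, or simply logging.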
Passing Data to Agent Filters within Agents
```typescript
for await (const message of chatStream) {
  const token = message.choices[0].delta.content;
  // Hand each token to the Agent Filter as soon as it is generated.
  if (filterParams && filterParams.streamTokenCallback && token) {
    filterParams.streamTokenCallback(token);
  }
}
```
agentFilter

This section explains how to use the `streamAgentFilterGenerator` function to create an `agentFilter` for streaming processes. By specifying a callback function, users can obtain an `agentFilter` capable of processing data in real time.
```typescript
export const streamAgentFilterGenerator = <T>(callback: (context: AgentFunctionContext, data: T) => void) => {
  const streamAgentFilter: AgentFilterFunction = async (context, next) => {
    if (context.debugInfo.isResult) {
      context.filterParams.streamTokenCallback = (data: T) => {
        callback(context, data);
      };
    }
    return next(context);
  };
  return streamAgentFilter;
};
```
Usage
- Define a Callback Function: Create a callback function that takes `context` and `data` as arguments. This function is invoked each time the agent receives data, allowing real-time processing.
```typescript
const myCallback = (context, data) => {
  console.log("Data received:", data);
  // Implement necessary processing here
};
```
- Obtain the streamAgentFilter: Pass the callback function to `streamAgentFilterGenerator` to generate an `agentFilter` that processes data sequentially. This `agentFilter` handles real-time data processing during agent execution.
```typescript
const myAgentFilter = streamAgentFilterGenerator(myCallback);
```
This completes the setup for `agentFilter` processing using `streamAgentFilterGenerator`. Pass this `agentFilter` to the `agentFilters` parameter of the GraphAI constructor to build a mechanism for sequential data processing through callback functions.
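To see the generated `agentFilter` in action without a full GraphAI setup, the following self-contained sketch stubs the GraphAI types with minimal assumed shapes and drives the filter with a mock agent (in real code you would instead pass the filter to the `agentFilters` option of the GraphAI constructor):

```typescript
// Minimal stand-ins for the GraphAI types used by the filter (assumed shapes).
type AgentFunctionContext = {
  debugInfo: { isResult: boolean };
  filterParams: { streamTokenCallback?: (data: string) => void };
};
type AgentFilterFunction = (
  context: AgentFunctionContext,
  next: (context: AgentFunctionContext) => Promise<unknown>,
) => Promise<unknown>;

// Same logic as the generator shown above, specialized to string tokens.
const streamAgentFilterGenerator = (callback: (context: AgentFunctionContext, data: string) => void) => {
  const streamAgentFilter: AgentFilterFunction = async (context, next) => {
    if (context.debugInfo.isResult) {
      context.filterParams.streamTokenCallback = (data) => callback(context, data);
    }
    return next(context);
  };
  return streamAgentFilter;
};

// A mock "agent" that emits tokens through the injected callback.
const mockAgent = async (context: AgentFunctionContext) => {
  for (const token of ["a", "b", "c"]) {
    context.filterParams.streamTokenCallback?.(token);
  }
  return { text: "abc" };
};

const tokens: string[] = [];
const filter = streamAgentFilterGenerator((_context, data) => tokens.push(data));
const context: AgentFunctionContext = { debugInfo: { isResult: true }, filterParams: {} };
const result = (await filter(context, mockAgent)) as { text: string };
// tokens: ["a", "b", "c"]; result: { text: "abc" }
```

Note how the filter only injects `streamTokenCallback` when `debugInfo.isResult` is true, so intermediate nodes do not stream.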
About Streaming Processes
1. Direct Usage of GraphAI (Browser)
- Receive streaming data through a callback function via the Agent Filter.
- Obtain the overall result from `graphai.run()`.
- Streaming and result processing can be controlled on the implementation side, without needing to consider delimiters or data formats.
2. Express Usage
- Sequential strings are sent due to HTTP mechanisms.
- By default, tokens (strings) stream sequentially, and the result (content) is returned as a JSON string after an `__END__` delimiter.
- Token processing, delimiters, and content handling can be customized by passing a callback function to Express.
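As a hedged sketch of what consuming that default format involves (delimiter and shape as described above; the stream itself is simulated here rather than fetched over HTTP):

```typescript
// Hypothetical parsing of the default Express response shape: streamed tokens,
// then the "__END__" delimiter, then the final result as a JSON string.
const parseStreamedResponse = (raw: string) => {
  const delimiter = "__END__";
  const index = raw.indexOf(delimiter);
  const streamedText = raw.slice(0, index);
  const finalResult = JSON.parse(raw.slice(index + delimiter.length));
  return { streamedText, finalResult };
};

// Simulated accumulated stream (in real code you would append each chunk
// from the response body as it arrives, displaying tokens immediately).
const raw = "Hello, world!" + "__END__" + JSON.stringify({ content: "Hello, world!" });
const { streamedText, finalResult } = parseStreamedResponse(raw);
// streamedText: "Hello, world!"; finalResult.content: "Hello, world!"
```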
Express Control
Express provides middleware to support streaming servers, non-streaming servers, and servers supporting both. With the dual-mode middleware, the client can control via an HTTP header whether a response streams, even when the agent itself supports streaming.
Specific determination is made based on the presence of the following HTTP header: `Content-Type` set to `text/event-stream`.
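Assuming the behavior described, the server-side decision reduces to a header check along these lines (a sketch, not the actual middleware code):

```typescript
// Hypothetical check mirroring the description above: stream only when the
// request carries Content-Type: text/event-stream.
const isStreamingRequest = (headers: Record<string, string | undefined>): boolean =>
  (headers["content-type"] ?? "").toLowerCase().startsWith("text/event-stream");

console.log(isStreamingRequest({ "content-type": "text/event-stream" })); // true
console.log(isStreamingRequest({ "content-type": "application/json" })); // false
```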
Server-Client Model Addendum
When operating GraphAI in the browser and agents on the server, you need to use both `streamAgentFilter` and `httpAgentFilter` together. The `httpAgentFilter` bypasses browser-side execution and runs the agent on the server. If the agent does not exist in the browser, skip agent validation using `bypassAgentIds`.
```typescript
const agentFilters = [
  {
    name: "streamAgentFilter",
    agent: streamAgentFilter,
    agentIds: streamAgents,
  },
  {
    name: "httpAgentFilter",
    agent: httpAgentFilter,
    filterParams: {
      server: {
        baseUrl: "http://localhost:8085/agents",
      },
    },
    agentIds: serverAgentIds,
  },
];

const graphai = new GraphAI(selectedGraph.value, agents, { agentFilters, bypassAgentIds: serverAgentIds });
```
Reference Sources
- Example of an Express callback function: https://github.com/receptron/graphai-utils/blob/b302835d978ce1017c6e105898431eda28adcbd4/packages/express/src/agents.ts#L122-L135
- Implementation of Express middleware: https://github.com/receptron/graphai-utils/tree/main/packages/express/src
- Implementation of streamAgentFilterGenerator: https://github.com/receptron/graphai/blob/main/packages/agent_filters/src/filters/stream.ts
- Implementation of httpAgentFilter (the HTTP client form of GraphAI's Agent Filter): https://github.com/receptron/graphai/blob/main/packages/agent_filters/src/filters/http_client.ts
- Original document (Japanese): https://zenn.dev/singularity/articles/graphai-streaming