Overview
The Streaming package (framework/streaming) is a core utility within Bifrost designed to handle real-time data streams from AI providers. It provides a robust and efficient mechanism for plugins like Logging, OTel, and Maxim to process, aggregate, and format streaming responses for chat completions, transcriptions, and other real-time AI interactions.
Its primary purpose is to simplify the complexity of handling chunked data, ensuring that plugins can work with complete, well-structured responses without needing to implement their own aggregation logic.
How It Works
The streaming package uses an Accumulator to manage the lifecycle of a streaming operation. This process is designed to be highly efficient, using sync.Pool to reuse objects and minimize memory allocations.
- Initialization: When a plugin that needs to process streams (like logging or otel) is initialized, it creates a new streaming.Accumulator.
- Stream Start: In the PreHook phase of a request, if the request is identified as a streaming type, the plugin calls accumulator.CreateStreamAccumulator(requestID, timestamp) to prepare a dedicated buffer for the incoming chunks of that request.
- Chunk Processing: In the PostHook phase, as each chunk of the streaming response arrives, the plugin passes it to accumulator.ProcessStreamingResponse().
  - For each delta chunk, the accumulator appends it to the buffer associated with the request ID.
  - The accumulator handles different types of streams, including chat, audio, and transcriptions, using specialized logic to correctly piece together the data. For example, it accumulates text deltas, tool call argument deltas, and other parts of the message.
- Finalization: When the final chunk of the stream is received (indicated by a finish_reason or other provider-specific signal), ProcessStreamingResponse performs the final assembly.
  - It reconstructs the complete ChatMessage or other response object from all the stored chunks.
  - It calculates total token usage, cost, and latency.
  - It returns a ProcessedStreamResponse object with StreamResponseTypeFinal and the complete, structured AccumulatedData.
- Cleanup: Once the final response is processed, the accumulator cleans up all buffered chunks for that request ID, returning them to the sync.Pool for reuse.
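The lifecycle above can be sketched in a few lines of Go. This is a simplified model and not the package's actual implementation: the accumulator, stream, and chunk types and the create and process methods are hypothetical stand-ins, and the sketch only concatenates text deltas.

```go
package main

import (
	"strings"
	"sync"
)

// chunk is a hypothetical stand-in for a pooled stream chunk.
type chunk struct {
	delta string
}

// stream holds the buffered chunks of one request.
type stream struct {
	mu     sync.Mutex
	chunks []*chunk
}

// accumulator is a simplified, hypothetical model of the lifecycle above.
type accumulator struct {
	streams sync.Map  // request ID -> *stream
	pool    sync.Pool // recycles *chunk values
}

func newAccumulator() *accumulator {
	a := &accumulator{}
	a.pool.New = func() any { return new(chunk) }
	return a
}

// create prepares a buffer for a request (the "Stream Start" step).
func (a *accumulator) create(requestID string) {
	a.streams.LoadOrStore(requestID, &stream{})
}

// process buffers one delta. On the final chunk it assembles the full text,
// returns every chunk to the pool, and removes the request's buffer.
func (a *accumulator) process(requestID, delta string, final bool) (string, bool) {
	v, ok := a.streams.Load(requestID)
	if !ok {
		return "", false
	}
	s := v.(*stream)
	s.mu.Lock()
	defer s.mu.Unlock()

	c := a.pool.Get().(*chunk)
	c.delta = delta
	s.chunks = append(s.chunks, c)
	if !final {
		return "", false
	}

	var b strings.Builder
	for _, buffered := range s.chunks {
		b.WriteString(buffered.delta)
		buffered.delta = "" // reset before recycling
		a.pool.Put(buffered)
	}
	s.chunks = nil
	a.streams.Delete(requestID)
	return b.String(), true
}
```

In this sketch a caller would invoke create once per request, then process for every chunk, and act on the result only when the second return value is true.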
Key Components
Accumulator
The central component of the package. It is a thread-safe manager that:
- Tracks stream chunks for multiple concurrent requests using a sync.Map.
- Uses sync.Pool to recycle *StreamChunk objects, reducing garbage collection overhead.
- Provides methods to add chunks (addChatStreamChunk, addAudioStreamChunk, etc.).
- Includes a periodic cleanup worker to remove stale accumulators for incomplete or orphaned requests.
ProcessStreamingResponse
This is the main entry point for plugins to process stream data. It inspects the response type and delegates to the appropriate handler:
- processChatStreamingResponse
- processAudioStreamingResponse
- processTranscriptionStreamingResponse
- processResponsesStreamingResponse
It returns a ProcessedStreamResponse, which indicates whether the chunk is a delta or the final aggregated response.
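The delegation pattern can be pictured as a switch on the stream type. The sketch below is illustrative only: streamKind, its constants, the processed struct, and dispatch are hypothetical names, and each branch stands in for one of the handlers listed above.

```go
package main

import "fmt"

// streamKind is a hypothetical tag for the type of streaming response.
type streamKind int

const (
	kindChat streamKind = iota
	kindAudio
	kindTranscription
	kindResponses
)

// processed is a hypothetical, reduced stand-in for ProcessedStreamResponse.
type processed struct {
	Handler string // which handler produced the result
	Final   bool   // true for the aggregated response, false for a delta
}

// dispatch routes a chunk to the handler for its stream kind.
func dispatch(kind streamKind, isFinal bool) (processed, error) {
	switch kind {
	case kindChat:
		return processed{Handler: "chat", Final: isFinal}, nil
	case kindAudio:
		return processed{Handler: "audio", Final: isFinal}, nil
	case kindTranscription:
		return processed{Handler: "transcription", Final: isFinal}, nil
	case kindResponses:
		return processed{Handler: "responses", Final: isFinal}, nil
	default:
		return processed{}, fmt.Errorf("unsupported stream kind %d", kind)
	}
}
```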
Stream-Specific Builders
The package includes internal logic to correctly build complete messages from chunks. For example, buildCompleteMessageFromChatStreamChunks iterates through the collected ChatStreamChunk objects, appending content deltas and assembling tool calls into a final, coherent schemas.ChatMessage.
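A minimal sketch of this kind of builder follows. It assumes that tool call deltas carry an index identifying which call they extend and that the function name arrives on the first delta only, as in OpenAI-style streams. All types and the buildMessage function are hypothetical and much smaller than the package's own ChatStreamChunk and schemas.ChatMessage.

```go
package main

import "strings"

// toolCallDelta is a hypothetical partial tool call carried by one chunk.
type toolCallDelta struct {
	Index     int    // which tool call this fragment extends (assumed)
	Name      string // assumed to be set only on the first fragment
	Arguments string // partial JSON argument text
}

// chatChunk is a hypothetical, reduced chat stream chunk.
type chatChunk struct {
	Content   string
	ToolCalls []toolCallDelta
}

type toolCall struct {
	Name      string
	Arguments string
}

type chatMessage struct {
	Content   string
	ToolCalls []toolCall
}

// buildMessage concatenates content deltas and merges tool call fragments
// by index into complete tool calls.
func buildMessage(chunks []chatChunk) chatMessage {
	var content strings.Builder
	var calls []toolCall
	for _, c := range chunks {
		content.WriteString(c.Content)
		for _, d := range c.ToolCalls {
			if d.Index < 0 {
				continue // ignore malformed fragments
			}
			for len(calls) <= d.Index {
				calls = append(calls, toolCall{}) // grow to fit the index
			}
			if d.Name != "" {
				calls[d.Index].Name = d.Name
			}
			calls[d.Index].Arguments += d.Arguments
		}
	}
	return chatMessage{Content: content.String(), ToolCalls: calls}
}
```

Merging by index rather than by arrival order lets fragments of several tool calls interleave within the same stream.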
Usage Example
The following snippet from the logging plugin shows how the streaming package is used in practice within a plugin’s PostHook.
Chunk aggregation is handled by the streaming package. This greatly simplifies plugin development and ensures consistent data handling across the framework.
