Server-sent event (SSE) timing delays

I'm trying to implement an AI chat interface with a typewriter effect like ChatGPT. My backend API is written in Go and uses sashabaranov/go-openai for interfacing with OpenAI. Here's the controller, which is served at the route /answer/api/v1/chat/completion

package controller

import (
	"os"
	"strings"

	"github.com/apache/incubator-answer/internal/base/handler"
	"github.com/gin-gonic/gin"
	"github.com/sashabaranov/go-openai"
	"github.com/segmentfault/pacman/errors"
)

// ChatController chat controller
type ChatController struct{}

// NewChatController creates a new ChatController
func NewChatController() *ChatController {
	return &ChatController{}
}

// ChatCompletion godoc
// @Summary Get Chat Completion
// @Description Get Chat Completion
// @Tags api-answer
// @Accept  json
// @Produce  json
// @Param prompt query string true "Prompt for the chat completion"
// @Router /answer/api/v1/chat/completion [get]
// @Success 200 {object} map[string]interface{}
func (cc *ChatController) ChatCompletion(ctx *gin.Context) {
	prompt := ctx.Query("prompt")
	if prompt == "" {
		handler.HandleResponse(ctx, gin.Error{Err: errors.New(400, "prompt is required")}, nil)
		return
	}

	OPENAI_API_KEY := os.Getenv("OPENAI_API_KEY")

	client := openai.NewClient(OPENAI_API_KEY)
	stream, err := client.CreateChatCompletionStream(ctx, openai.ChatCompletionRequest{
		Model: "gpt-4",
		Messages: []openai.ChatCompletionMessage{
			{Role: "system", Content: "You are a helpful assistant."},
			{Role: "user", Content: prompt},
		},
	})
	if err != nil {
		handler.HandleResponse(ctx, err, nil)
		return
	}
	defer stream.Close()

	ctx.Writer.Header().Set("Content-Type", "text/event-stream")
	ctx.Writer.Header().Set("Cache-Control", "no-cache")
	ctx.Writer.Header().Set("Connection", "keep-alive")

	const NEWLINE = "$NEWLINE$"

	for {
		select {
		case <-ctx.Done():
			return
		default:
			response, err := stream.Recv()
			if err != nil {
				handler.HandleResponse(ctx, err, nil)
				return
			}
			if response.Choices[0].Delta.Content != "" {
				contentWithNewlines := strings.ReplaceAll(response.Choices[0].Delta.Content, "\n", NEWLINE)
				ctx.Writer.Write([]byte("event: token\n"))
				ctx.Writer.Write([]byte("data: " + contentWithNewlines + "\n\n"))
				ctx.Writer.Flush()
			}
		}
	}
}

On the frontend I have a React page with an EventSource with a listener that should be outputting each token as it's received:

/* eslint-disable */
import React, { useEffect, useState, useCallback } from 'react';


const Chats = () => {
  const [output, setOutput] = useState<string>("");
  const [isStarted, setIsStarted] = useState(false);
  const [isStreamFinished, setIsStreamFinished] = useState<boolean>(false);
  const prompt = 'mic check'; // Replace with your actual prompt
  const NEWLINE = '$NEWLINE$'
  
  
  const startChat = useCallback(() => {
    setIsStarted(true);
    setOutput("");
    
    const eventSource = new EventSource(
      `/answer/api/v1/chat/completion?prompt=${encodeURIComponent(prompt)}`,
    );
    
    eventSource.addEventListener('error', () => eventSource.close());
    
    eventSource.addEventListener("token", (e) => {
      // avoid newlines getting messed up
      const token = e.data.replaceAll(NEWLINE, "\n");
      console.log(token);
      setOutput((prevOutput) => prevOutput + token);
    });
    
    eventSource.addEventListener("finished", (e) => {
      console.log("finished", e);
      eventSource.close();
      setIsStreamFinished(true);
    });
    
    return () => {
      eventSource.close();
    };
  }, [prompt]);
  

  
  return (
    <div>
    <p>Prompt: {prompt}</p>
    {!isStarted && <button onClick={startChat}>Start</button>}
    <div>{output}</div>
    </div>
  );
};

export default Chats;

What actually happens when I hit the Start button is that nothing appears until the completion finishes, and then all the tokens are output at once. With a prompt that generates a longer response, the upfront pause is longer, but the tokens still all appear in one burst. I'm not sure whether the delay is on the frontend or the backend.

2 Answers

The problem you're experiencing is that the tokens are being buffered somewhere and delivered to the client all at once, rather than being flushed individually as they are generated. That points to buffering either in your Go backend or in something sitting in front of it (middleware or a reverse proxy). Here's a more detailed walkthrough and potential fixes.

Issues and Explanations

  1. HTTP Response Headers:
    Make sure the response headers are set before anything is written to the body; once the first byte has gone out, header changes are ignored. In Gin you set them on ctx.Writer and can push them out immediately:

    ctx.Writer.Header().Set("Content-Type", "text/event-stream")
    ctx.Writer.Header().Set("Cache-Control", "no-cache")
    ctx.Writer.Header().Set("Connection", "keep-alive")
    ctx.Writer.Flush() // flush once so the headers go out right away and the browser opens the stream

    Your handler already sets these headers before the first write, so the headers themselves are unlikely to be the culprit.

  2. Event Loop and Buffering:
    Go's http.ResponseWriter buffers output, so small writes sit in memory until the buffer fills or the handler returns, unless you flush explicitly. Calling ctx.Writer.Flush() after each event (which your handler already does) should push the bytes out, so if the browser still sees nothing until the end, something between the handler and the browser — compression middleware, a dev-server proxy, or a reverse proxy — is buffering the stream; see the sketch after this list.

  3. Use of strings.ReplaceAll:
    The $NEWLINE$ substitution is cheap and adds no buffering, so it is not the source of the delay. It exists because a raw \n would terminate the SSE data field; an alternative is to split the content across multiple data: lines, as the revised handler below does.
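
The most common in-process culprit is compression middleware: gzip collects output until it has a full block to compress, which defeats per-token flushing. Below is a minimal sketch of excluding the SSE route, assuming the project uses gin-contrib/gzip — an assumption, so adjust it to whatever middleware is actually registered; NewRouter is just a hypothetical setup function to show where the option goes.

package router

import (
    "github.com/gin-contrib/gzip"
    "github.com/gin-gonic/gin"
)

// NewRouter is a hypothetical setup function, shown only to illustrate
// where the exclusion option belongs.
func NewRouter() *gin.Engine {
    r := gin.Default()

    // Compress everything except the SSE endpoint; compressed output is
    // buffered by the middleware and arrives in large chunks.
    r.Use(gzip.Gzip(gzip.DefaultCompression,
        gzip.WithExcludedPaths([]string{"/answer/api/v1/chat/completion"})))

    return r
}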

Possible Solutions

Solution 1: Ensure Streaming Is Properly Handled

Make sure you're flushing the response after every token:

for {
    select {
    case <-ctx.Done():
        return
    default:
        response, err := stream.Recv()
        if err != nil {
            handler.HandleResponse(ctx, err, nil)
            return
        }
        if response.Choices[0].Delta.Content != "" {
            // Keep the $NEWLINE$ placeholder from your original handler here;
            // Solution 3 below shows how to drop it safely.
            contentWithNewlines := strings.ReplaceAll(response.Choices[0].Delta.Content, "\n", NEWLINE)
            ctx.Writer.Write([]byte("event: token\n"))
            ctx.Writer.Write([]byte("data: " + contentWithNewlines + "\n\n"))
            ctx.Writer.Flush()
        }
    }
}

Here, ctx.Writer.Flush() ensures the written data is sent to the client immediately.
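
If you want to confirm that flushing works in your environment without involving OpenAI at all, a standalone net/http handler that emits a few fake tokens is a quick test. This is a throwaway sketch on an arbitrary port, not part of the Answer codebase; the http.Flusher assertion is explicit here because there is no Gin wrapper:

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

func sse(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    for i := 0; i < 5; i++ {
        // Each event is pushed out as it is written; without the Flush,
        // all five would arrive together when the handler returns.
        fmt.Fprintf(w, "event: token\ndata: tick %d\n\n", i)
        flusher.Flush()
        time.Sleep(500 * time.Millisecond)
    }
}

func main() {
    http.HandleFunc("/sse", sse)
    log.Fatal(http.ListenAndServe(":8081", nil))
}

Pointing the page's EventSource at /sse (or running curl -N against it) shows within a couple of seconds whether events trickle in or land as a single block.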

Solution 2: Disable Any Middleware Buffers

Ensure that no middleware or reverse proxies are buffering responses. For example:

  • In your Gin setup, make sure no middleware (compression, custom response writers) buffers the body before it reaches the network.
  • If using nginx or another reverse proxy, disable buffering for this location; you can also request this per response with the X-Accel-Buffering header, as sketched below:
    proxy_buffering off;
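
If changing the nginx config is awkward, the handler can set the X-Accel-Buffering header itself before the first write; nginx honours it and passes the response through unbuffered, and it is harmless if nothing in front of the backend recognises it. A sketch of the header block with that line added:

ctx.Writer.Header().Set("Content-Type", "text/event-stream")
ctx.Writer.Header().Set("Cache-Control", "no-cache")
ctx.Writer.Header().Set("Connection", "keep-alive")
// Ask nginx (and some other proxies) not to buffer this particular response.
ctx.Writer.Header().Set("X-Accel-Buffering", "no")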

Solution 3: Content-Replacement Check

The strings.ReplaceAll call itself does not introduce any delay, but you can drop the $NEWLINE$ placeholder entirely by splitting the content across multiple data: lines, which is how SSE represents multi-line payloads. The revised handler below does that, and the existing frontend keeps working because its replaceAll simply finds nothing to replace.

Example Full Revised Handler:

// ChatCompletion streams chat completion tokens to the client as server-sent events
func (cc *ChatController) ChatCompletion(ctx *gin.Context) {
    prompt := ctx.Query("prompt")
    if prompt == "" {
        handler.HandleResponse(ctx, gin.Error{Err: errors.New(400, "prompt is required")}, nil)
        return
    }

    OPENAI_API_KEY := os.Getenv("OPENAI_API_KEY")

    client := openai.NewClient(OPENAI_API_KEY)
    stream, err := client.CreateChatCompletionStream(ctx, openai.ChatCompletionRequest{
        Model: "gpt-4",
        Messages: []openai.ChatCompletionMessage{
            {Role: "system", Content: "You are a helpful assistant."},
            {Role: "user", Content: prompt},
        },
    })
    if err != nil {
        handler.HandleResponse(ctx, err, nil)
        return
    }
    defer stream.Close()

    ctx.Writer.Header().Set("Content-Type", "text/event-stream")
    ctx.Writer.Header().Set("Cache-Control", "no-cache")
    ctx.Writer.Header().Set("Connection", "keep-alive")
    ctx.Writer.Flush() // gin.ResponseWriter implements http.Flusher; this sends the headers immediately

    for {
        response, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                // The data field is required here: EventSource discards an
                // event with no data, so "event: finished\n\n" on its own
                // would never trigger the frontend's finished listener.
                ctx.Writer.Write([]byte("event: finished\ndata: \n\n"))
                ctx.Writer.Flush()
                return
            }
            handler.HandleResponse(ctx, err, nil)
            return
        }
        content := response.Choices[0].Delta.Content
        if content != "" {
            ctx.Writer.Write([]byte("event: token\n"))
            // A raw newline would end the SSE data field, so emit one
            // "data:" line per line of content; EventSource joins multiple
            // data lines back together with "\n" on the client.
            for _, line := range strings.Split(content, "\n") {
                ctx.Writer.Write([]byte("data: " + line + "\n"))
            }
            ctx.Writer.Write([]byte("\n"))
            ctx.Writer.Flush()
        }
    }
}

React Frontend Adjustments

The frontend code is fine as far as streaming goes: it appends each token event as it arrives and closes the connection on finished. Once the backend actually flushes per token and nothing in between buffers, it will render the typewriter effect without changes.

Final Notes:

  1. If the backend streams correctly when you hit it directly but buffers when deployed, the culprit is usually whatever sits in front of it: a reverse proxy such as nginx, or, during development, the dev server that proxies your React app's API calls.
  2. To pin down where the latency is introduced, time the stream on both sides: log when each token is written on the backend and when each token event fires in the browser. The sketch below does the receiving half with a bare Go client.
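
To see which side introduces the delay, you can take the browser out of the picture entirely and watch the raw stream with a small Go client that timestamps every line it receives. This is a sketch: the URL, port, and prompt are assumptions, so point it at wherever your backend actually listens.

package main

import (
    "bufio"
    "fmt"
    "log"
    "net/http"
    "time"
)

func main() {
    // Assumed local address; adjust host, port, and prompt to your setup.
    url := "http://localhost:8080/answer/api/v1/chat/completion?prompt=mic+check"

    resp, err := http.Get(url)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    start := time.Now()
    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        // If the backend streams correctly, lines arrive spread out over the
        // whole completion; if they all share roughly one timestamp, the
        // buffering happens on the backend or in a proxy, not in React.
        fmt.Printf("%7.3fs  %s\n", time.Since(start).Seconds(), scanner.Text())
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}

If this prints lines incrementally but the browser still shows one burst, look at whatever serves the frontend (for example a dev-server proxy) rather than at the Go code.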

With these adjustments, your Go backend should properly stream data to your React frontend, avoiding the initial pause and streaming each token as it is generated.

I googled and found this issue: https://github.com/gin-gonic/gin/issues/1589, but when I tried adding the X-Accel-Buffering: no header it still didn't work.

The issue you're experiencing is likely due to the way the frontend and backend are interacting. Let's break it down:

Frontend (React):

  1. When you click the "Start" button, the startChat function is called, which sets isStarted to true and resets the output.
  2. An EventSource is created. This opens a one-way, server-to-client channel over a plain HTTP connection (server-sent events, not WebSocket; there is no bi-directional messaging here).
  3. As the server sends messages, the frontend receives them and updates the output.

Backend (Go):

  1. When the frontend requests the /answer/api/v1/chat/completion endpoint, it establishes a streaming connection.
  2. The server writes each token as its own SSE frame: an event: token line, a data: line, and a blank line that terminates the event.
  3. The server does not wait for the whole completion; it writes and flushes each token individually, so in principle the frontend should receive them one at a time.

Now, let's discuss the problem:

From your description, the tokens are not arriving in real time; they all show up at once. EventSource itself does not buffer events until the stream closes — it dispatches each event as soon as its bytes reach the browser — so if everything appears in one burst, the whole response is arriving in one burst, and something between the handler and the browser is holding it back.

You can switch from EventSource to fetch with a ReadableStream reader if you want lower-level control over how chunks are parsed, but that alone will not fix anything: if the response is being buffered before it reaches the browser, fetch will see it arrive all at once too.

If you prefer to stay with EventSource, note that it has no buffering option to configure — the only constructor option is withCredentials — and none is needed, since it already parses the stream line by line as data arrives. No client-side change is required.

On the server side, the text/event-stream content type is what tells the browser to treat the response as an event stream and process it incrementally; that is why the handler sets ctx.Writer.Header().Set("Content-Type", "text/event-stream").

Here is your modified backend handler:

func (cc *ChatController) ChatCompletion(ctx *gin.Context) {
	prompt := ctx.Query("prompt")
	if prompt == "" {
		handler.HandleResponse(ctx, gin.Error{Err: errors.New(400, "prompt is required")}, nil)
		return
	}

	OPENAI_API_KEY := os.Getenv("OPENAI_API_KEY")

	client := openai.NewClient(OPENAI_API_KEY)
	stream, err := client.CreateChatCompletionStream(ctx, openai.ChatCompletionRequest{
		Model: "gpt-4",
		Messages: []openai.ChatCompletionMessage{
			{Role: "system", Content: "You are a helpful assistant."},
			{Role: "user", Content: prompt},
		},
	})
	if err != nil {
		handler.HandleResponse(ctx, err, nil)
		return
	}
	defer stream.Close()

	ctx.Writer.Header().Set("Content-Type", "text/event-stream")
	ctx.Writer.Header().Set("Cache-Control", "no-cache")
	ctx.Writer.Header().Set("Connection", "keep-alive")

	const NEWLINE = "$NEWLINE$"

	for {
		select {
		case <-ctx.Done():
			return
		default:
			response, err := stream.Recv()
			if err != nil {
				handler.HandleResponse(ctx, err, nil)
				return
			}
			if response.Choices[0].Delta.Content != "" {
				contentWithNewlines := strings.ReplaceAll(response.Choices[0].Delta.Content, "\n", NEWLINE)
				// Assemble the whole SSE frame first, then write and flush it in one go.
				buf := &bytes.Buffer{}
				io.WriteString(buf, "event: token\n")
				io.WriteString(buf, "data: "+contentWithNewlines+"\n\n")
				ctx.Writer.Write(buf.Bytes())
				ctx.Writer.Flush()
			}
		}
	}
}
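
One gap worth noting: this handler never detects the end of the OpenAI stream, so the finished event the frontend listens for is never sent — the connection simply drops, the error handler closes the EventSource, and isStreamFinished stays false. Below is a sketch of the missing branch, relying on stream.Recv() returning io.EOF when the completion is done (which is what sashabaranov/go-openai does); it slots in where the existing error check sits inside the loop.

			response, err := stream.Recv()
			if err != nil {
				if err == io.EOF {
					// Tell the client we are done. The data field matters:
					// EventSource discards events that carry no data at all.
					ctx.Writer.Write([]byte("event: finished\ndata: \n\n"))
					ctx.Writer.Flush()
					return
				}
				handler.HandleResponse(ctx, err, nil)
				return
			}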

This will allow the browser to process the response chunk-by-chunk, reducing the delay and improving the experience.