Working with Streams
Developers can choose to receive LLM output incrementally, as it is generated, using Embabel's streaming capabilities.
In addition to the raw text output of the LLM, Embabel streams can carry the LLM's reasoning events (so-called “thinking”) and a stream of objects created by the LLM. This fits well with Embabel's focus on an object-oriented programming model.
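To make the idea of a mixed stream concrete, here is a small plain-Java sketch that runs without an LLM. `ToyStreamingEvent` and `Month` are hypothetical stand-ins, not Embabel's `StreamingEvent` or the example's `MonthItem`. They only mirror the `isThinking`/`getThinking`/`isObject`/`getObject` accessors that the examples on this page call, so the branching logic can be seen in isolation. The month names are sample values only.

```java
import java.util.ArrayList;
import java.util.List;

public class MixedStreamDemo {

    // Hypothetical stand-in for a mixed stream element: either "thinking" text or a typed object.
    // Not Embabel's StreamingEvent; it only mirrors the accessor names used in the examples.
    static final class ToyStreamingEvent<T> {
        private final String thinking; // set only for thinking events
        private final T object;        // set only for object events

        private ToyStreamingEvent(String thinking, T object) {
            this.thinking = thinking;
            this.object = object;
        }

        static <T> ToyStreamingEvent<T> thinking(String text) {
            return new ToyStreamingEvent<>(text, null);
        }

        static <T> ToyStreamingEvent<T> object(T value) {
            return new ToyStreamingEvent<>(null, value);
        }

        boolean isThinking() { return thinking != null; }
        boolean isObject()   { return object != null; }
        String getThinking() { return thinking; }
        T getObject()        { return object; }
    }

    // Hypothetical domain type standing in for the example's MonthItem.
    record Month(String name) {}

    // Branches on the event kind, as the doOnNext callback in the first example does.
    static List<String> describe(List<ToyStreamingEvent<Month>> events) {
        List<String> out = new ArrayList<>();
        for (ToyStreamingEvent<Month> event : events) {
            if (event.isThinking()) {
                out.add("THINKING: " + event.getThinking());
            } else if (event.isObject()) {
                out.add("OBJECT: " + event.getObject().name());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<ToyStreamingEvent<Month>> events = List.of(
                ToyStreamingEvent.<Month>thinking("The user asks about summer heat..."),
                ToyStreamingEvent.object(new Month("July")),
                ToyStreamingEvent.object(new Month("August")));
        describe(events).forEach(System.out::println);
    }
}
```

Keeping thinking and objects in one element type lets a single subscriber preserve their relative order, which is what makes it possible to show reasoning and results as they interleave.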
Concepts
- StreamingEvent - wraps either Thinking content or a user object
- StreamingPromptRunnerBuilder - a runner with streaming capabilities
- Spring AI ChatClient, with Spring reactive programming support, as the underlying infrastructure
- All reactive callbacks, such as doOnNext and doOnComplete, are available to the developer
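The examples below use Reactor `Flux` callbacks (`doOnNext`, `doOnError`, `doOnComplete`). As a dependency-free illustration of that callback lifecycle, here is a rough analogue built on the JDK's `java.util.concurrent.Flow` and `SubmissionPublisher`. The class and method names (`FlowCallbacksDemo`, `Recorder`, `run`) are hypothetical, and this is only an analogy: in Reactor, `doOnNext` and friends are side-effect operators in a pipeline rather than methods of a subscriber you implement.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class FlowCallbacksDemo {

    // Records what each callback observed, like receivedEvents/errorOccurred/completionCalled in the examples.
    static final class Recorder implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final AtomicReference<Throwable> error = new AtomicReference<>();
        final AtomicBoolean completed = new AtomicBoolean(false);
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand
        }

        @Override
        public void onNext(String item) {          // roughly doOnNext
            received.add(item);
        }

        @Override
        public void onError(Throwable throwable) { // roughly doOnError
            error.set(throwable);
            done.countDown();
        }

        @Override
        public void onComplete() {                 // roughly doOnComplete
            completed.set(true);
            done.countDown();
        }
    }

    // Publishes the chunks, then waits up to timeoutSeconds for a terminal signal (loosely like blockLast(Duration)).
    static Recorder run(List<String> chunks, long timeoutSeconds) {
        Recorder recorder = new Recorder();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(recorder);
            chunks.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items have been delivered
        try {
            if (!recorder.done.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Stream did not finish within " + timeoutSeconds + "s");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the stream", e);
        }
        return recorder;
    }

    public static void main(String[] args) {
        Recorder r = run(List.of("Streaming ", "text ", "chunks"), 5);
        System.out.println(r.received + " completed=" + r.completed.get());
    }
}
```

The thread-safe collectors matter because the callbacks run on a publisher thread, not the thread that waits for completion.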
Example - Simple Thinking and Object Streaming with Callbacks
// JDK and Reactor imports; Embabel imports (PromptRunner, StreamingPromptRunnerBuilder, StreamingEvent) omitted
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

PromptRunner runner = ai.withDefaultLlm() // Example uses qwen3:latest
        .withToolObject(Tooling.class);

String prompt = "What are exactly the two hottest months in Florida, and what are their respective highest temperatures?";

// Use StreamingPromptRunnerBuilder instead of the Kotlin extension function
Flux<StreamingEvent<MonthItem>> results = new StreamingPromptRunnerBuilder(runner)
        .streaming()
        .withPrompt(prompt)
        .createObjectStreamWithThinking(MonthItem.class);

// Thread-safe collectors updated by the callbacks below
List<String> receivedEvents = new CopyOnWriteArrayList<>();
AtomicReference<Throwable> errorOccurred = new AtomicReference<>();
AtomicBoolean completionCalled = new AtomicBoolean(false);

// Subscribe with reactive callbacks, chained in builder style
results
        .timeout(Duration.ofSeconds(150))
        .doOnSubscribe(subscription -> logger.info("Stream subscription started"))
        .doOnNext(event -> {
            if (event.isThinking()) {
                String content = event.getThinking();
                receivedEvents.add("THINKING: " + content);
                logger.info("Integration test received thinking: {}", content);
            } else if (event.isObject()) {
                MonthItem obj = event.getObject();
                receivedEvents.add("OBJECT: " + obj.getName());
                logger.info("Integration test received object: {}", obj.getName());
            }
        })
        .doOnError(error -> {
            errorOccurred.set(error);
            logger.error("Integration test stream error: {}", error.getMessage());
        })
        .doOnComplete(() -> {
            completionCalled.set(true);
            logger.info("Integration test stream completed successfully");
        })
        .blockLast(Duration.ofSeconds(6000));

Example - Simple Raw Text Streaming with Callbacks
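// JDK and Reactor imports; Embabel imports (PromptRunner, StreamingPromptRunnerBuilder) omitted
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

PromptRunner runner = ai.withDefaultLlm(); // Example uses qwen3:latest

String prompt = "What is the highest building in Paris?";

// Use StreamingPromptRunnerBuilder instead of the Kotlin extension function
Flux<String> results = new StreamingPromptRunnerBuilder(runner)
        .streaming()
        .withPrompt(prompt)
        .generateStream();

// Thread-safe collectors updated by the callbacks below
List<String> receivedTextChunks = new CopyOnWriteArrayList<>();
AtomicReference<Throwable> errorOccurred = new AtomicReference<>();
AtomicBoolean completionCalled = new AtomicBoolean(false);

// Subscribe with reactive callbacks, chained in builder style
results
        .timeout(Duration.ofSeconds(150))
        .doOnSubscribe(subscription -> logger.info("Stream subscription started"))
        .doOnNext(content -> {
            receivedTextChunks.add(content);
            logger.info("Integration test received text chunk: {}", content);
        })
        .doOnError(error -> {
            errorOccurred.set(error);
            logger.error("Integration test stream error: {}", error.getMessage());
        })
        .doOnComplete(() -> {
            completionCalled.set(true);
            logger.info("Integration test stream completed successfully");
        })
        .blockLast(Duration.ofSeconds(6000));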
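Raw text chunks from a streamed response are typically fragments rather than whole sentences or lines, so code that renders or logs them line by line needs to buffer. The sketch below shows one way to do that in plain Java. `ChunkLineBuffer` is a hypothetical helper, not part of Embabel or Reactor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns arbitrarily split text chunks into complete lines.
public class ChunkLineBuffer {
    private final StringBuilder pending = new StringBuilder();

    // Appends a chunk and returns every line completed by it (newline characters removed).
    public List<String> accept(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            lines.add(pending.substring(0, newline));
            pending.delete(0, newline + 1);
        }
        return lines;
    }

    // Returns and clears the trailing partial line; call once the stream has completed.
    public String flush() {
        String rest = pending.toString();
        pending.setLength(0);
        return rest;
    }

    public static void main(String[] args) {
        ChunkLineBuffer buffer = new ChunkLineBuffer();
        for (String chunk : List.of("first li", "ne\nsecond", " line\nthird")) {
            buffer.accept(chunk).forEach(line -> System.out.println("LINE: " + line));
        }
        System.out.println("LAST: " + buffer.flush());
        // Prints: "LINE: first line", "LINE: second line", "LAST: third"
    }
}
```

In the raw text example, `accept` would be called inside `doOnNext` and `flush` inside `doOnComplete`. Because Reactive Streams delivers `onNext` signals serially, one buffer per subscription should not need extra locking.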