Java WebFlux集成DeepSeek大模型:流式接入完整实现

· Java技术教程

当下开发AI对话类后端项目时,选用传统同步方式调用DeepSeek大模型接口会暴露出越来越多的问题,这类调用方式不仅容易引发线程阻塞、用户等待耗时过长的情况,还会在高并发场景下让内存占用持续攀升,根本无法满足实时交互类业务的功能与性能需求。

针对这类问题,Spring WebFlux凭借异步非阻塞编程模式、原生支持SSE服务端消息推送的核心特性,成为大模型流式输出场景下适配度极高的后端技术方案,本文依托响应式编程框架,细致讲解Java WebFlux对接DeepSeek大模型的流式接入方式,提供可直接落地使用的工程化代码,同时补充生产环境部署与调试的关键要点,助力开发者快速完成功能搭建与上线。

技术选型与核心优势

方案选型依据

接下来通过对比传统同步调用与流式响应调用两种实现方式,讲解WebFlux+DeepSeek流式技术组合的实际使用优势:

整体实现流程

这套方案整体执行流程十分清晰,前端通过SSE协议与后端搭建长久连接,再由WebFlux服务接收前端发送的请求,搭建契合DeepSeek要求的流式请求参数,分批接收大模型传回的响应数据,解析数据后实时推送给前端完成页面展示,全程实现异步非阻塞运行、数据实时回传的效果。

环境准备与依赖配置

基础运行环境

本次开发基于Spring Boot 2.7.15稳定版搭建,运行环境兼容JDK8及以上版本,更推荐使用JDK11及以上版本,部署项目前需要提前在DeepSeek官方平台申请API密钥,保障接口能够正常发起调用。

Maven核心依赖

在项目pom.xml文件中导入WebFlux及相关配套依赖,全程采用Spring官方原生组件,无需额外引入第三方工具包,以此降低项目整体依赖的复杂程度,具体导入内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<dependencies>
    <!-- Spring WebFlux 核心依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <!-- Jackson JSON解析 -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- lombok 简化代码 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

配置文件参数

在application.yml配置文件中统一配置DeepSeek接口调用的各项参数,将配置内容与业务代码拆分管理,避免硬编码导致后期维护难度上升,具体配置项如下所示:

deepseek:
  # 官方API基础路径
  base-url: https://api.deepseek.com/v1
  # 申请的API密钥
  api-key: sk-xxxxxxxxxxxxxxxxxxxxxxxx
  # 调用的模型名称
  model-name: deepseek-chat
  # 响应超时时间,大模型推理适当延长
  timeout: 30000
# WebFlux 线程池优化(生产环境建议配置)
spring:
  webflux:
    netty:
      worker-threads: 16
      connection-timeout: 5000

核心代码实现

配置类:WebClient实例化

WebFlux框架中选用WebClient替代传统RestTemplate,实现异步非阻塞的HTTP请求调用,这一部分会通过配置类创建全局单例WebClient对象,重复利用网络连接资源,提升接口调用的效率,具体实现代码如下:

package com.example.deepseek.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;

@Configuration
public class DeepSeekWebClientConfig {

    @Value("${deepseek.base-url}")
    private String baseUrl;

    @Value("${deepseek.api-key}")
    private String apiKey;

    @Value("${deepseek.timeout}")
    private Long timeout;

    @Bean
    public WebClient deepSeekWebClient() {
        // 配置HTTP客户端超时时间
        HttpClient httpClient = HttpClient.create()
                .responseTimeout(Duration.ofMillis(timeout));
        // 构建WebClient,统一配置请求头
        return WebClient.builder()
                .baseUrl(baseUrl)
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
                .build();
    
    }
}

DTO实体类:请求与响应参数

依照DeepSeek官方API接口的参数规范,创建请求报文与流式响应解析实体类,保障数据传输与解析过程不会出错,具体代码编写如下:

请求实体DeepSeekChatRequest

package com.example.deepseek.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class DeepSeekChatRequest {
    /**
     * 模型名称
     */
    private String model;
    /**
     * 对话消息列表
     */
    private List<Message> messages;
    /**
     * 开启流式输出
     */
    private Boolean stream = true;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Message {
        /**
         * 角色:user/assistant
         */
        private String role;
        /**
         * 消息内容
         */
        private String content;
    
    }
}

流式响应实体DeepSeekStreamResponse

package com.example.deepseek.entity;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import java.util.List;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class DeepSeekStreamResponse {
    /**
     * 响应结果列表
     */
    private List<Choice> choices;

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Choice {
        /**
         * 增量内容
         */
        private Delta delta;
        /**
         * 结束标识
         */
        private String finish_reason;
    
    }

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Delta {
        /**
         * 流式输出的文本内容
         */
        private String content;
    
    }
}

Service层:流式请求核心逻辑

业务逻辑层封装大模型流式调用的核心逻辑,接收前端传输的用户提问内容,搭建请求参数并调用DeepSeek官方接口,解析流式响应数据后,最终返回Flux类型的流式结果数据,核心实现代码如下:

package com.example.deepseek.service;

import com.example.deepseek.entity.DeepSeekChatRequest;
import com.example.deepseek.entity.DeepSeekStreamResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import javax.annotation.Resource;
import java.util.Collections;

@Slf4j
@Service
public class DeepSeekStreamService {

    @Resource
    private WebClient deepSeekWebClient;

    @Value("${deepseek.model-name}")
    private String modelName;

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * 流式对话接口
     * @param content 用户提问内容
     * @return 流式文本数据
     */
    public Flux<String> streamChat(String content) {
        // 构建请求参数
        DeepSeekChatRequest request = new DeepSeekChatRequest();
        request.setModel(modelName);
        request.setMessages(Collections.singletonList(
                new DeepSeekChatRequest.Message("user", content)
        ));

        // 发起流式请求
        return deepSeekWebClient.post()
                .uri("/chat/completions")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(request)
                // 接收SSE流式响应
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                // 转换为流式字符串
                .bodyToFlux(String.class)
                // 过滤空数据和结束标识
                .filter(chunk -> chunk != null && !chunk.isEmpty())
                // 解析每一块流式数据
                .map(chunk -> {
                    try {
                        // 去除SSE前缀data:
                        String jsonStr = chunk.replace("data: ", "").trim();
                        // 流结束标识,直接返回空
                        if ("[DONE]".equals(jsonStr)) {
                            return "";
                        }
                        // 解析JSON提取增量内容
                        DeepSeekStreamResponse response = objectMapper.readValue(jsonStr, DeepSeekStreamResponse.class);
                        if (response.getChoices() != null && !response.getChoices().isEmpty()) {
                            DeepSeekStreamResponse.Delta delta = response.getChoices().get(0).getDelta();
                            return delta.getContent() == null ? "" : delta.getContent();
                        }
                    } catch (Exception e) {
                        log.error("DeepSeek流式响应解析失败", e);
                    }
                    return "";
                })
                // 过滤空字符串
                .filter(result -> !result.isEmpty());
    
    }
}

Controller层:对外暴露流式接口

控制层对外提供SSE流式调用接口,方便前端搭建长连接并获取实时推送数据,接口返回数据格式需设置为TEXT_EVENT_STREAM_VALUE,确保流式推送功能正常生效,对应代码如下:

package com.example.deepseek.controller;

import com.example.deepseek.service.DeepSeekStreamService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import javax.annotation.Resource;

@Slf4j
@RestController
@RequestMapping("/api/deepseek")
public class DeepSeekStreamController {

    @Resource
    private DeepSeekStreamService deepSeekStreamService;

    /**
     * 流式对话接口
     * produces指定为SSE格式,实现流式推送
     */
    @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChat(@RequestParam("content") String content) {
        log.info("接收用户流式提问:{}", content);
        return deepSeekStreamService.streamChat(content);
    
    }
}

前端对接测试

后端接口开发完成后,前端可借助原生EventSource对象搭建SSE长连接,接收后端实时推送的流式数据,将文本逐段渲染展示在页面上,这一部分提供简易前端测试代码,方便快速验证功能是否可用:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>DeepSeek流式对话</title>
</head>
<body>
    <div id="result" style="white-space: pre-line;"></div>
    <script>
        // 建立SSE连接
        const content = "请介绍一下Java WebFlux";
        const eventSource = new EventSource(`/api/deepseek/stream/chat?content=${encodeURIComponent(content)}`);
        const resultDiv = document.getElementById('result');
        
        // 接收流式数据
        eventSource.onmessage = function (event) {
            resultDiv.innerHTML += event.data;
        };
        
        // 连接关闭
        eventSource.onclose = function () {
            console.log("流式响应结束");
        };
        
        // 异常处理
        eventSource.onerror = function () {
            console.error("流式连接异常");
            eventSource.close();
        };
    </script>
</body>
</html>

启动项目并正常运行后,打开前端测试页面,即可看到文字逐字渲染显示的效果,以此验证流式对话功能能够正常运行。

生产环境避坑与优化

常见问题解决

  1. 流式连接超时:大模型生成内容的耗时通常较长,可适度调大配置文件中的超时时长,避免请求未处理完成就被强制断开。
  2. 响应解析报错:严格剔除SSE协议data:前缀以及[DONE]流结束标记,过滤多余响应内容,避免出现JSON解析失败的问题。
  3. 并发限流问题:DeepSeek官方API设有调用频次限制,生产环境需要添加接口限流措施,防止API密钥被官方封禁。
  4. 连接异常断开:借助Reactor框架的retryWhen工具,实现延时重试机制,解决网络不稳定引发的连接中断问题。

性能优化建议

总结

WebFlux集成DeepSeek大模型流式接入方案,全程采用Spring官方原生组件,无冗余第三方依赖,代码结构简洁且后期维护便捷,这套方案既解决了传统同步调用的性能短板,又能实现大模型响应数据的实时推送,优化用户使用体验,适用于AI智能对话、智能问答、内容生成等各类业务场景。

开发者替换自身的DeepSeek API密钥后,可直接复用本文代码完成功能开发,后续也能在此基础上,新增对话上下文存储、多轮对话交互、请求参数动态配置等拓展功能。

针对方案落地过程中出现的各类问题,可通过修改配置项、完善异常处理、调整并发参数等方式逐一解决,进一步提升服务运行的稳定性。