I finally found a way to make this work, so I'm posting my solution here in case someone is facing the same problem.

Because we need to send some special headers to the Azure service when creating the WebSocket connection, we need a proxy server (the native WebSocket in the browser cannot send custom headers).
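
For context, the browser's WebSocket constructor only accepts a URL and optional subprotocols, while the ws package in Node.js also accepts custom headers on the handshake, which is what makes the proxy approach below work (the URL and key here are placeholders):

// browser: new WebSocket(url, protocols?) — no way to attach headers

// Node.js "ws" (same style as server.ts below): headers are supported
import * as WebSocket from "ws";
const remote = new WebSocket.WebSocket("wss://example.com", {
  headers: { "ocp-apim-subscription-key": "<your_key>" },
});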

server.ts:

import http from "http";
import * as WebSocket from "ws";
import crypto from "crypto";
import fs from "fs";
import path from "path";

// Azure TTS
const URL =
  "wss://<your_azure_service_origin>.tts.speech.microsoft.com/cognitiveservices/websocket/v2";
const KEY = "your_azure_service_key";

const server = http.createServer((req, res) => {
  res.end("Server is Running");
});

server.on("upgrade", (req, socket, head) => {
  const remote = new WebSocket.WebSocket(URL, {
    headers: {
      "ocp-apim-subscription-key": KEY,
      "x-connectionid": crypto.randomUUID().replace(/-/g, ""),
    },
  });
  remote.on("open", () => {
    console.log("remote open");

    const requestId = crypto.randomUUID().replace(/-/g, "");
    const now = new Date().toISOString();
    // send speech.config
    remote.send(
      [
        `X-Timestamp:${now}`,
        "Path:speech.config",
        "",
        `${JSON.stringify({})}`,
      ].join("\r\n"),
    );

    // send synthesis.context
    remote.send(
      [
        `X-Timestamp:${now}`,
        "Path:synthesis.context",
        `X-RequestId:${requestId}`,
        "",
        `${JSON.stringify({
          synthesis: {
            audio: {
              // outputFormat: "audio-16khz-32kbitrate-mono-mp3",
              outputFormat: "raw-16khz-16bit-mono-pcm",
              metadataOptions: {
                visemeEnabled: false,
                bookmarkEnabled: false,
                wordBoundaryEnabled: false,
                punctuationBoundaryEnabled: false,
                sentenceBoundaryEnabled: false,
                sessionEndEnabled: true,
              },
            },
            language: { autoDetection: false },
            input: {
              bidirectionalStreamingMode: true,
              voiceName: "zh-CN-YunxiNeural",
              language: "",
            },
          },
        })}`,
      ].join("\r\n"),
    );

    // complete the client handshake only after the remote connection is ready
    const client = new WebSocket.WebSocketServer({ noServer: true });
    client.handleUpgrade(req, socket, head, (clientWs) => {
      clientWs.on("message", (data: Buffer) => {
        const json = JSON.parse(data.toString("utf8")) as {
          type: "data" | "end";
          data?: string;
        };
        console.log("Client:", json);
        remote.send(
          [
            `X-Timestamp:${new Date().toISOString()}`,
            `Path:text.${json.type === "data" ? "piece" : "end"}`,
            "Content-Type:text/plain",
            `X-RequestId:${requestId}`,
            "", // empty line
            json.data || "",
          ].join("\r\n"),
        );
      });

      const file = createWAVFile(`speech/${Date.now()}.wav`);
      remote.on("message", (data: Buffer, isBinary) => {
        // console.log("Remote, isBinary:", isBinary);
        const { headers, content } = parseChunk(data);
        console.log({ headers });
        if (isBinary) {
          if (headers.Path === "audio") {
            // parseChunk returns the payload starting right after the
            // header block's trailing \r\n, so `content` is the raw audio
            if (content.length) {
              file.write(content);
              clientWs.send(content);
            }
          }
        } else if (headers.Path === "turn.end") {
          file.end();
        }
      });

      clientWs.on("close", () => {
        console.log("client close");
        remote.close();
      });
      clientWs.on("error", (error) => {
        console.log("client error", error);
      });
    });
    remote.on("close", (code, reason) => {
      console.log("remote close", reason.toString());
    });
    remote.on("error", (error) => {
      console.log("remote error", error);
    });
  });
});

function parseChunk(buffer: Buffer) {
  const len = buffer.length;
  const headers: string[][] = [];
  // Binary messages from the service carry a 2-byte big-endian header-size
  // prefix, so we skip it. (Text messages have no such prefix; skipping two
  // bytes there only mangles the first header name, which this parser never reads.)
  let i = 2;
  let temp: number[] = [];
  let curr: string[] = [];
  let contentPosition = 0;
  for (; i < len; i++) {
    if (buffer[i] === 0x3a) {
      // :
      curr.push(Buffer.from(temp).toString());
      temp = [];
    } else if (buffer[i] === 0x0d && buffer[i + 1] === 0x0a) {
      // \r\n
      // maybe empty line
      if (temp.length) {
        curr.push(Buffer.from(temp).toString());
        temp = [];
        headers.push(curr);
        curr = [];
      }
      i += 1; // move past the \n
      contentPosition = i + 1; // the body starts right after this \r\n
      if (headers.at(-1)?.[0] === "Path") {
        // the service sends `Path` as the last header, so the rest is the body
        break;
      }
    } else {
      temp.push(buffer[i]);
    }
  }

  const obj: Record<string, string> = {};
  for (const [key, value] of headers) {
    obj[key] = value;
  }

  const content = buffer.subarray(contentPosition);

  return { headers: obj, content };
}

// for test
function createWAVFile(
  filename: string,
  sampleRate = 16000,
  bitDepth = 16,
  channels = 1,
) {
  let dataLength = 0;
  let data = Buffer.alloc(0);
  return {
    write(chunk: Buffer) {
      dataLength += chunk.length;
      data = Buffer.concat([data, chunk]);
    },
    end() {
      const byteRate = sampleRate * (bitDepth / 8) * channels;
      const blockAlign = (bitDepth / 8) * channels;

      // WAV head
      const buffer = Buffer.alloc(44);
      buffer.write("RIFF", 0); // ChunkID
      buffer.writeUInt32LE(36 + dataLength, 4); // ChunkSize
      buffer.write("WAVE", 8); // Format
      buffer.write("fmt ", 12); // Subchunk1ID
      buffer.writeUInt32LE(16, 16); // Subchunk1Size (16 for PCM)
      buffer.writeUInt16LE(1, 20); // AudioFormat (1 = PCM)
      buffer.writeUInt16LE(channels, 22); // Channels
      buffer.writeUInt32LE(sampleRate, 24); // SampleRate
      buffer.writeUInt32LE(byteRate, 28); // ByteRate
      buffer.writeUInt16LE(blockAlign, 32); // BlockAlign
      buffer.writeUInt16LE(bitDepth, 34); // BitsPerSample
      buffer.write("data", 36); // Subchunk2ID
      buffer.writeUInt32LE(dataLength, 40); // Subchunk2Size

      // make sure the output directory (e.g. `speech/`) exists
      fs.mkdirSync(path.dirname(filename), { recursive: true });
      const stream = fs.createWriteStream(filename);
      stream.write(buffer);
      stream.write(data);
      stream.end();
      console.log(`write to file ${filename}`);
    },
  };
}

server.listen(8080);
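
All the messages sent to the service above share the same shape: header lines, a blank line, then the body, joined with \r\n. A small helper (not part of the original code, just a possible refactor) could factor that out:

// hypothetical helper: builds the text frames used in server.ts
function frame(headers: Record<string, string>, body: string): string {
  return [
    ...Object.entries(headers).map(([k, v]) => `${k}:${v}`),
    "", // blank line separating headers from body
    body,
  ].join("\r\n");
}

// e.g. the speech.config message above is equivalent to:
// remote.send(frame({ "X-Timestamp": now, Path: "speech.config" }, "{}"));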

player.ts:

type StreamingAudioPlayerOptions = {
  autoPlay: boolean;
};

export class StreamingAudioPlayer {
  private context = new AudioContext();
  private chunks: Blob[] = [];
  private decodeChunkIndex = 0;
  private buffers: AudioBuffer[] = [];
  private duration = 0;
  private decoding = false;
  private scheduleIndex = 0;
  private currentDuration = 0; // rough played-duration counter; for display only, not for playback control
  private state: "play" | "stop" = "stop";
  private isPlaying = false; // whether audio is actually playing
  // scheduled start time of the next buffer
  private nextScheduledTime = 0;
  // audio sources that have been created and not yet finished
  private activeSources: AudioBufferSourceNode[] = [];
  private sourceSchedule = new WeakMap<AudioBufferSourceNode, [number]>();
  private beginOffset = 0;
  private timer: number | null = null;

  constructor(private readonly options: StreamingAudioPlayerOptions) {}

  private async decodeAudioChunks() {
    if (this.decoding || this.chunks.length === 0) {
      return;
    }

    this.decoding = true;
    while (this.decodeChunkIndex < this.chunks.length) {
      const originBuffer =
        await this.chunks[this.decodeChunkIndex].arrayBuffer();

      // Step 1: reinterpret the bytes as Int16 samples
      const int16 = new Int16Array(originBuffer);

      // Step 2: convert to Float32
      const float32 = new Float32Array(int16.length);
      for (let i = 0; i < int16.length; i++) {
        float32[i] = int16[i] / 32768; // Normalize to [-1.0, 1.0]
      }

      // Step 3: create an AudioBuffer (mono)
      const audioBuffer = this.context.createBuffer(
        1, // mono
        float32.length,
        16000, // sampleRate
      );

      audioBuffer.copyToChannel(float32, 0);
      this.buffers.push(audioBuffer);
      this.duration += audioBuffer.duration;
      console.log(
        `chunk ${this.decodeChunkIndex} decoded, total buffer duration: ${this.duration}`,
      );
      this.decodeChunkIndex++;

      if (this.state === "play" && !this.isPlaying) {
        console.log("ready to play");
        this._play();
      } else if (this.state === "stop" && this.options.autoPlay) {
        this.play();
      }
    }
    this.decoding = false;
  }

  async append(chunk: Blob) {
    this.chunks.push(chunk);
    if (!this.decoding) {
      this.decodeAudioChunks();
    }
  }

  private scheduleBuffers() {
    while (this.scheduleIndex < this.buffers.length) {
      if (this.nextScheduledTime - this.context.currentTime > 10) {
        // keep roughly 10s of audio scheduled ahead
        break;
      }
      const buffer = this.buffers[this.scheduleIndex];
      const source = this.context.createBufferSource();
      source.buffer = buffer;
      // record and advance the scheduled start time
      const startTime = this.nextScheduledTime;
      this.nextScheduledTime += buffer.duration;

      source.connect(this.context.destination);
      if (this.beginOffset !== 0) {
        source.start(startTime, this.beginOffset);
        this.beginOffset = 0;
      } else {
        source.start(startTime);
      }
      this.sourceSchedule.set(source, [startTime]);
      console.log(`schedule chunk ${this.scheduleIndex}`);
      this.activeSources.push(source);
      const index = this.scheduleIndex;
      this.scheduleIndex++;

      // listen for playback end to maintain state
      source.addEventListener("ended", () => {
        // remove the finished source
        this.activeSources = this.activeSources.filter((s) => s !== source);
        if (this.state !== "play") {
          return;
        }
        console.log(`chunk ${index} play finish`);
        if (this.scheduleIndex < this.buffers.length) {
          // keep scheduling the chunks that have not played yet
          this.scheduleBuffers();
        } else if (this.activeSources.length === 0) {
          // if no sources remain, stop playback
          this._stop();
        }
      });
    }
  }

  private _play() {
    // use an animation-frame timer to roughly track the played duration
    // TODO: what happens if playback stalls?
    const updatePlayDuration = (timestamp1: number) => {
      return (timestamp2: number) => {
        this.currentDuration += timestamp2 - timestamp1;
        this.timer = requestAnimationFrame(updatePlayDuration(timestamp2));
      };
    };
    this.timer = requestAnimationFrame(updatePlayDuration(performance.now()));
    // initialize the scheduled time to the current context time
    this.nextScheduledTime = this.context.currentTime;
    this.isPlaying = true;
    this.scheduleBuffers();
  }

  private _stop() {
    if (this.state !== "play") {
      return;
    }

    // stop all active audio sources
    this.activeSources.forEach((source, index) => {
      if (index === 0) {
        // current playing source
        const offset =
          this.context.currentTime - this.sourceSchedule.get(source)![0];
        console.log("offset:", offset);
      }
      source.stop();
    });

    cancelAnimationFrame(this.timer!);
    this.timer = null;

    this.activeSources = [];
    // note: not all audio chunks may have been decoded yet
    this.state = "stop";
    this.isPlaying = false;
    console.log(`played duration: ${this.currentDuration}`);
  }

  resume() {
    // resuming is based on the played duration, because the played duration
    // could later be adjusted via a seek bar (not implemented yet)
    this.scheduleIndex = 0;
    let d = 0;
    for (; this.scheduleIndex < this.buffers.length; this.scheduleIndex++) {
      const buffer = this.buffers[this.scheduleIndex];
      if (d + buffer.duration * 1000 > this.currentDuration) {
        break;
      }
      d += buffer.duration * 1000;
    }
    this.state = "play";
    this.beginOffset = (this.currentDuration - d) / 1000;
    console.log("resume offset", this.beginOffset);
    this._play();
  }

  play() {
    if (this.state === "play") {
      return;
    }
    this.state = "play";
    this.duration = this.buffers.reduce((total, buffer) => {
      return total + buffer.duration;
    }, 0);
    if (this.duration === 0) {
      console.warn("waiting buffer");
      return;
    }

    this.currentDuration = 0;
    this.scheduleIndex = 0;
    console.log(this);
    this._play();
  }

  pause() {
    this._stop();
  }
}
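
One note on the manual decode in decodeAudioChunks: it is needed because the proxy requests headerless raw PCM ("raw-16khz-16bit-mono-pcm"), which AudioContext.decodeAudioData cannot parse, so the samples are converted by hand and the createBuffer parameters are hard-coded to match the server's outputFormat. If the format were made configurable, the two sides would have to stay in sync (the type below is an assumption, not in the original code):

// hypothetical: one source of truth for the PCM parameters on both sides
type PcmFormat = { sampleRate: number; bitDepth: number; channels: number };

// "raw-16khz-16bit-mono-pcm" corresponds to:
const RAW_16K_MONO: PcmFormat = { sampleRate: 16000, bitDepth: 16, channels: 1 };

// player.ts would use createBuffer(channels, length, sampleRate), and
// server.ts would pass the same values to createWAVFile.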

index.js:

// something like:
const player = new StreamingAudioPlayer({ autoPlay: true });
const ws = new WebSocket("xxx"); // e.g. ws://localhost:8080 for the proxy above
ws.addEventListener("open", () => {
  // sending before the connection is open would throw
  ws.send('{"type":"data","data":"hello"}');
  ws.send('{"type":"data","data":" world"}');
  ws.send('{"type":"end"}');
});
ws.addEventListener("message", (e) => {
  // binary frames arrive as Blob by default in the browser
  player.append(e.data);
});

The code is for reference only. If anyone has any better suggestions, please feel free to share your thoughts.
