0136. Stream 的队列策略详解
- 1. 🎯 本节内容
- 2. 🫧 评价
- 3. 🤔 CountQueuingStrategy 和 ByteLengthQueuingStrategy 的主要区别是什么 ?
- 4. 🤔 如何为不同类型的数据选择合适的队列策略 ?
- 5. 🤔 自定义队列策略需要实现哪些方法 ?
- 6. 🤔 队列策略如何影响内存占用和性能 ?
- 7. 💻 demos.1 - 对比两种内置队列策略的行为差异
- 8. 💻 demos.2 - 实现一个基于优先级的自定义队列策略
- 9. 🔗 引用
1. 🎯 本节内容
- QueuingStrategy 接口定义
- CountQueuingStrategy 的使用场景
- ByteLengthQueuingStrategy 的使用场景
- size() 函数的作用
- highWaterMark 的配置方式
- 自定义队列策略的实现
2. 🫧 评价
队列策略是 Web Streams 流量控制的基础,通过 size() 函数和 highWaterMark 决定队列容量计算方式。CountQueuingStrategy 按块计数,适合固定大小数据;ByteLengthQueuingStrategy 按字节计数,适合二进制流。理解两者差异和自定义策略的实现方法,能够精准控制内存使用,优化不同场景下的流处理性能。
3. 🤔 CountQueuingStrategy 和 ByteLengthQueuingStrategy 的主要区别是什么 ?
CountQueuingStrategy 将每个块的大小计为 1,ByteLengthQueuingStrategy 根据字节长度计算块大小。
3.1. CountQueuingStrategy 的计算方式
js
const countStrategy = new CountQueuingStrategy({ highWaterMark: 5 })
const stream = new ReadableStream(
{
start(controller) {
controller.enqueue('short') // 块大小 = 1
controller.enqueue('a very long string') // 块大小 = 1
controller.enqueue({ data: [1, 2, 3] }) // 块大小 = 1
console.log(controller.desiredSize) // 5 - 3 = 2
},
},
countStrategy
)
// 无论块内容是什么,每个块都计为 11
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
3.2. ByteLengthQueuingStrategy 的计算方式
js
const byteStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 })
const stream = new ReadableStream(
{
start(controller) {
controller.enqueue(new Uint8Array(100)) // 块大小 = 100
controller.enqueue(new Uint8Array(500)) // 块大小 = 500
console.log(controller.desiredSize) // 1024 - 600 = 424
},
},
byteStrategy
)
// 根据 ArrayBufferView 的 byteLength 计算1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
3.3. 对比表格
| 特性 | CountQueuingStrategy | ByteLengthQueuingStrategy |
|---|---|---|
| size() 返回值 | 始终返回 1 | 返回 chunk.byteLength |
| highWaterMark | 表示块数量 | 表示字节数 |
| 适用数据类型 | 任意类型 | ArrayBufferView(Uint8Array 等) |
| 内存预估准确性 | 低(忽略块实际大小) | 高(精确到字节) |
| 使用场景 | 对象流、固定大小消息 | 二进制流、文件流、网络流 |
3.4. 实际行为差异
js
// CountQueuingStrategy:只关心块数量
const countStream = new ReadableStream(
{
start(controller) {
controller.enqueue(new Uint8Array(1)) // 1B
controller.enqueue(new Uint8Array(1024 * 1024)) // 1MB
console.log(controller.desiredSize) // 3 - 2 = 1
// ⚠️ 实际占用约 1MB,但队列认为只用了 2 个位置
},
},
new CountQueuingStrategy({ highWaterMark: 5 })
)
// ByteLengthQueuingStrategy:按实际字节数
const byteStream = new ReadableStream(
{
start(controller) {
controller.enqueue(new Uint8Array(1)) // 1B
controller.enqueue(new Uint8Array(1024 * 1024)) // 1MB
console.log(controller.desiredSize) // 2048 - 1048577 = -1046529
// ✅ 准确反映内存占用,触发背压
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 2048 })
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
3.5. size() 函数的实现
js
// CountQueuingStrategy 的 size() 实现
const countStrategy = new CountQueuingStrategy({ highWaterMark: 10 })
console.log(countStrategy.size()) // 1
console.log(countStrategy.size('any data')) // 1
console.log(countStrategy.size({ huge: 'object' })) // 1
// ByteLengthQueuingStrategy 的 size() 实现
const byteStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 })
console.log(byteStrategy.size(new Uint8Array(100))) // 100
console.log(byteStrategy.size(new Uint16Array(50))) // 100(50 * 2 字节)
// console.log(byteStrategy.size('string')) // TypeError: 没有 byteLength 属性1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
CountQueuingStrategy 忽略数据大小,适合块大小均匀的场景;ByteLengthQueuingStrategy 精确计量,适合二进制数据流。
4. 🤔 如何为不同类型的数据选择合适的队列策略 ?
根据数据类型、大小分布和内存敏感度选择策略。
4.1. 数据类型决策树
js
// 场景 1:固定大小的对象流 → CountQueuingStrategy
const messageStream = new ReadableStream(
{
pull(controller) {
controller.enqueue({ type: 'update', timestamp: Date.now() })
},
},
new CountQueuingStrategy({ highWaterMark: 10 })
)
// 每条消息大小相近,用块数量控制即可
// 场景 2:二进制数据流 → ByteLengthQueuingStrategy
const fileStream = file.stream() // 浏览器 File API
// 内部使用 ByteLengthQueuingStrategy,按字节控制读取
// 场景 3:混合大小的数据 → 自定义策略或 ByteLengthQueuingStrategy
const mixedStream = new ReadableStream(
{
pull(controller) {
// 可能是小对象或大二进制数据
controller.enqueue(randomData())
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 64 * 1024 })
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
4.2. 选择矩阵
| 数据特征 | 推荐策略 | 原因 |
|---|---|---|
| 固定大小消息(JSON、对象) | CountQueuingStrategy | 计数简单,性能好 |
| 可变大小文本 | CountQueuingStrategy | 除非文本长度差异巨大 |
| 二进制数据(图片、视频) | ByteLengthQueuingStrategy | 内存占用差异大,需精确控制 |
| 网络数据包 | ByteLengthQueuingStrategy | 包大小不一,按字节更准确 |
| 数据库查询结果 | CountQueuingStrategy | 行大小通常相近 |
4.3. 实际场景示例
js
// WebSocket 消息流:消息大小相近
function createWebSocketStream(ws) {
return new ReadableStream(
{
start(controller) {
ws.onmessage = (e) => controller.enqueue(e.data)
},
},
new CountQueuingStrategy({ highWaterMark: 20 })
)
}
// 文件上传流:块大小固定
function createUploadStream(file) {
const chunkSize = 64 * 1024 // 64KB
let offset = 0
return new ReadableStream(
{
async pull(controller) {
const chunk = file.slice(offset, offset + chunkSize)
const buffer = await chunk.arrayBuffer()
controller.enqueue(new Uint8Array(buffer))
offset += chunkSize
if (offset >= file.size) controller.close()
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 256 * 1024 }) // 256KB 缓冲
)
}
// HTTP 响应流:内容长度未知
async function fetchStream(url) {
const response = await fetch(url)
return response.body // 使用 ByteLengthQueuingStrategy
}
// 日志流:每条日志大小差异小
function createLogStream() {
return new ReadableStream(
{
pull(controller) {
const log = { level: 'info', message: getNextLog(), time: Date.now() }
controller.enqueue(log)
},
},
new CountQueuingStrategy({ highWaterMark: 50 })
)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
4.4. 错误选择的后果
js
// ❌ 错误:二进制流使用 CountQueuingStrategy
const badStream = new ReadableStream(
{
pull(controller) {
// 块大小从 1KB 到 10MB 不等
controller.enqueue(new Uint8Array(Math.random() * 10 * 1024 * 1024))
},
},
new CountQueuingStrategy({ highWaterMark: 5 })
)
// 问题:可能缓冲 50MB 数据,但队列认为只用了 5 个位置
// ✅ 正确:使用字节策略
const goodStream = new ReadableStream(
{
pull(controller) {
controller.enqueue(new Uint8Array(Math.random() * 10 * 1024 * 1024))
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 * 1024 }) // 16MB
)
// 精确控制内存,超过 16MB 触发背压1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
4.5. 性能考量
js
// CountQueuingStrategy:计算开销小
const count = new CountQueuingStrategy({ highWaterMark: 100 })
count.size(data) // 始终返回 1,无需访问数据
// ByteLengthQueuingStrategy:需访问 byteLength
const byte = new ByteLengthQueuingStrategy({ highWaterMark: 1024 })
byte.size(chunk) // 读取 chunk.byteLength 属性
// 高频入队场景优先 CountQueuingStrategy(如果数据大小均匀)1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
选择策略的核心原则:数据大小均匀用 Count,大小不一用 ByteLength,追求极致性能可自定义。
5. 🤔 自定义队列策略需要实现哪些方法 ?
必须实现 size(chunk) 方法并提供 highWaterMark 属性。
5.1. QueuingStrategy 接口定义
js
// 接口规范
interface QueuingStrategy {
highWaterMark: number
size(chunk: any): number
}1
2
3
4
5
2
3
4
5
5.2. 最小实现
js
// 自定义策略:根据对象属性计数
class PropertyCountStrategy {
constructor(highWaterMark) {
this.highWaterMark = highWaterMark
}
size(chunk) {
// 返回对象的属性数量
return Object.keys(chunk).length
}
}
const stream = new ReadableStream(
{
start(controller) {
controller.enqueue({ a: 1 }) // 块大小 = 1
controller.enqueue({ a: 1, b: 2, c: 3 }) // 块大小 = 3
console.log(controller.desiredSize) // 10 - 4 = 6
},
},
new PropertyCountStrategy(10)
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
5.3. 实战示例:字符串长度策略
js
class StringLengthStrategy {
constructor(options = {}) {
this.highWaterMark = options.highWaterMark || 1000
}
size(chunk) {
if (typeof chunk === 'string') {
return chunk.length
}
if (chunk && typeof chunk.toString === 'function') {
return chunk.toString().length
}
return 1 // 默认大小
}
}
const textStream = new ReadableStream(
{
start(controller) {
controller.enqueue('hi') // 大小 = 2
controller.enqueue('hello world') // 大小 = 11
console.log(controller.desiredSize) // 1000 - 13 = 987
},
},
new StringLengthStrategy({ highWaterMark: 1000 })
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
5.4. 高级示例:权重策略
js
class WeightedStrategy {
constructor(highWaterMark, weightMap = {}) {
this.highWaterMark = highWaterMark
this.weightMap = weightMap
}
size(chunk) {
// 根据数据类型分配不同权重
if (chunk.priority === 'high') return this.weightMap.high || 10
if (chunk.priority === 'low') return this.weightMap.low || 1
return this.weightMap.normal || 5
}
}
const priorityStream = new ReadableStream(
{
start(controller) {
controller.enqueue({ priority: 'low', data: 'x' }) // 权重 1
controller.enqueue({ priority: 'high', data: 'y' }) // 权重 10
console.log(controller.desiredSize) // 100 - 11 = 89
},
},
new WeightedStrategy(100, { high: 10, normal: 5, low: 1 })
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
5.5. 组合策略
js
// 综合考虑字节数和块数
class HybridStrategy {
constructor(highWaterMark) {
this.highWaterMark = highWaterMark
}
size(chunk) {
// 字节数 + 基础计数
if (chunk && chunk.byteLength !== undefined) {
return chunk.byteLength + 1 // 每块至少计 1
}
if (typeof chunk === 'string') {
return chunk.length * 2 + 1 // UTF-16 估算
}
return 1
}
}
const hybridStream = new ReadableStream(
{
start(controller) {
controller.enqueue(new Uint8Array(100)) // 100 + 1 = 101
controller.enqueue('hello') // 5 * 2 + 1 = 11
console.log(controller.desiredSize) // hwm - 112
},
},
new HybridStrategy(1024)
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
5.6. 注意事项
js
class BadStrategy {
constructor(highWaterMark) {
this.highWaterMark = highWaterMark
}
size(chunk) {
// ❌ 错误:返回负数
return -1 // 导致 desiredSize 异常增长
// ❌ 错误:返回 0
return 0 // 队列永不满,无背压
// ❌ 错误:抛出异常
throw new Error('size error') // 导致流错误
// ✅ 正确:始终返回正数
return Math.max(1, calculateSize(chunk))
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
5.7. 使用工厂函数
js
function createCustomStrategy(config) {
return {
highWaterMark: config.hwm,
size(chunk) {
return config.sizeCalculator(chunk)
},
}
}
const strategy = createCustomStrategy({
hwm: 50,
sizeCalculator: (chunk) => (chunk.important ? 10 : 1),
})
const stream = new ReadableStream({ pull(controller) {} }, strategy)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
自定义策略的 size() 方法必须同步执行,返回非负数,且应尽量高效。
6. 🤔 队列策略如何影响内存占用和性能 ?
队列策略通过 highWaterMark 和 size() 控制缓冲区大小,直接影响内存使用和调度频率。
6.1. 内存占用计算
js
// 示例:CountQueuingStrategy 的内存陷阱
const stream1 = new ReadableStream(
{
pull(controller) {
// 每次入队 1MB 数据
controller.enqueue(new Uint8Array(1024 * 1024))
},
},
new CountQueuingStrategy({ highWaterMark: 10 })
)
// 理论队列大小:10 个块
// 实际内存占用:10MB(策略未考虑)
// 改用 ByteLengthQueuingStrategy
const stream2 = new ReadableStream(
{
pull(controller) {
controller.enqueue(new Uint8Array(1024 * 1024))
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 10 * 1024 * 1024 })
)
// 理论队列大小:10MB
// 实际内存占用:≈10MB(准确)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
6.2. highWaterMark 对性能的影响
js
// 过小的 highWaterMark:频繁调度
const tooSmall = new ReadableStream(
{
async pull(controller) {
const data = await fetch('/api/data').then((r) => r.text())
controller.enqueue(data)
// pull() 会在每次消费后立即调用,无法批量化
},
},
new CountQueuingStrategy({ highWaterMark: 1 })
)
// 适中的 highWaterMark:批量处理
const balanced = new ReadableStream(
{
async pull(controller) {
const data = await fetch('/api/data').then((r) => r.text())
controller.enqueue(data)
// 队列可容纳多个块,减少网络请求频率
},
},
new CountQueuingStrategy({ highWaterMark: 10 })
)
// 过大的 highWaterMark:内存浪费
const tooBig = new ReadableStream(
{
pull(controller) {
controller.enqueue(new Uint8Array(1024 * 1024))
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 * 1024 }) // 1GB
)
// 允许缓冲 1GB 数据,对大多数应用来说过度1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
6.3. 性能测试对比
js
async function benchmarkStrategy(strategy, name) {
const startTime = performance.now()
let pullCount = 0
const stream = new ReadableStream(
{
pull(controller) {
pullCount++
controller.enqueue(new Uint8Array(1024))
if (pullCount >= 1000) controller.close()
},
},
strategy
)
await stream.pipeTo(
new WritableStream({
write() {
// 空处理
},
})
)
const elapsed = performance.now() - startTime
console.log(`${name}: ${elapsed.toFixed(2)}ms, pull 调用 ${pullCount} 次`)
}
// 测试不同策略
await benchmarkStrategy(
new CountQueuingStrategy({ highWaterMark: 1 }),
'Count(1)'
)
await benchmarkStrategy(
new CountQueuingStrategy({ highWaterMark: 10 }),
'Count(10)'
)
await benchmarkStrategy(
new ByteLengthQueuingStrategy({ highWaterMark: 10 * 1024 }),
'Byte(10KB)'
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
6.4. 内存与延迟的权衡
| highWaterMark | 内存占用 | 延迟 | 吞吐量 | 适用场景 |
|---|---|---|---|---|
| 很小(0-2) | 低 | 高 | 低 | 实时数据流 |
| 中等(5-20) | 中 | 中 | 中 | 通用场景 |
| 很大(50+) | 高 | 低 | 高 | 批量处理、离线 |
js
// 实时音频流:低延迟优先
const audioStream = new ReadableStream(
{
pull(controller) {
const audioChunk = captureAudio()
controller.enqueue(audioChunk)
},
},
new CountQueuingStrategy({ highWaterMark: 2 })
)
// 视频转码:吞吐量优先
const videoStream = new ReadableStream(
{
pull(controller) {
const frame = readVideoFrame()
controller.enqueue(frame)
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 10 * 1024 * 1024 }) // 10MB
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
6.5. 策略选择对 GC 压力的影响
js
// size() 函数的计算成本
class ExpensiveStrategy {
constructor(highWaterMark) {
this.highWaterMark = highWaterMark
}
size(chunk) {
// ❌ 错误:复杂计算导致性能下降
return JSON.stringify(chunk).length // 每次入队都序列化
}
}
class EfficientStrategy {
constructor(highWaterMark) {
this.highWaterMark = highWaterMark
}
size(chunk) {
// ✅ 正确:简单计算
if (chunk.size !== undefined) return chunk.size
return 1
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
6.6. 实际内存监控
js
async function monitorMemory(stream) {
const initial = performance.memory?.usedJSHeapSize || 0
const reader = stream.getReader()
while (true) {
const { done } = await reader.read()
if (done) break
}
const final = performance.memory?.usedJSHeapSize || 0
console.log(`内存增长: ${((final - initial) / 1024 / 1024).toFixed(2)} MB`)
}
// 对比不同策略的内存使用
const stream1 = createStreamWithStrategy(
new CountQueuingStrategy({ highWaterMark: 100 })
)
const stream2 = createStreamWithStrategy(
new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 })
)
await monitorMemory(stream1)
await monitorMemory(stream2)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
队列策略是性能调优的关键杠杆,需根据数据特征、内存限制和延迟要求综合权衡。
7. 💻 demos.1 - 对比两种内置队列策略的行为差异
html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>对比内置队列策略</title>
<style>
body {
max-width: 1200px;
margin: 20px auto;
padding: 20px;
}
.demo {
margin: 20px 0;
padding: 15px;
border: 1px solid #ccc;
}
.log {
border: 1px solid #ddd;
padding: 10px;
margin-top: 10px;
max-height: 300px;
overflow-y: auto;
font-family: monospace;
font-size: 12px;
background: #f9f9f9;
}
.comparison {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
margin-top: 20px;
}
.side {
border: 1px solid #ddd;
padding: 15px;
}
.stats {
background: #e3f2fd;
padding: 10px;
margin-top: 10px;
border-left: 3px solid #2196f3;
}
button {
margin-right: 10px;
}
</style>
</head>
<body>
<h1>队列策略对比演示</h1>
<div class="demo">
<h3>Demo 1:相同数据下的 desiredSize 差异</h3>
<button onclick="demo1()">运行对比</button>
<button onclick="clearLogs()">清空</button>
<div class="comparison">
<div class="side">
<h4>CountQueuingStrategy</h4>
<div class="log" id="log1a"></div>
</div>
<div class="side">
<h4>ByteLengthQueuingStrategy</h4>
<div class="log" id="log1b"></div>
</div>
</div>
</div>
<div class="demo">
<h3>Demo 2:内存占用对比</h3>
<p>生成 100 个随机大小的数据块,观察不同策略下的背压触发时机</p>
<button onclick="demo2()">运行对比</button>
<div class="comparison">
<div class="side">
<h4>CountQueuingStrategy</h4>
<div class="log" id="log2a"></div>
<div class="stats" id="stats2a"></div>
</div>
<div class="side">
<h4>ByteLengthQueuingStrategy</h4>
<div class="log" id="log2b"></div>
<div class="stats" id="stats2b"></div>
</div>
</div>
</div>
<div class="demo">
<h3>Demo 3:性能测试</h3>
<p>测试不同策略下 pull() 调用次数和总耗时</p>
<label>
数据块数量:
<input type="number" id="chunkCount" value="1000" min="100" />
</label>
<button onclick="demo3()">运行测试</button>
<div class="log" id="log3"></div>
</div>
<script src="1.js"></script>
</body>
</html>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
js
function log(id, message) {
const logEl = document.getElementById(id)
logEl.innerHTML += `${message}\n`
logEl.scrollTop = logEl.scrollHeight
}
function clearLogs() {
;['log1a', 'log1b', 'log2a', 'log2b', 'stats2a', 'stats2b', 'log3'].forEach(
(id) => {
const el = document.getElementById(id)
if (el) el.innerHTML = ''
}
)
}
// Demo 1:相同数据下的 desiredSize 差异
async function demo1() {
clearLogs()
// CountQueuingStrategy
const countStream = new ReadableStream(
{
start(controller) {
log('log1a', `初始 desiredSize: ${controller.desiredSize}`)
const chunks = [
new Uint8Array(10),
new Uint8Array(100),
new Uint8Array(1000),
new Uint8Array(10000),
]
chunks.forEach((chunk, i) => {
controller.enqueue(chunk)
log(
'log1a',
`入队 ${chunk.byteLength}B 后 desiredSize: ${controller.desiredSize}`
)
})
controller.close()
},
},
new CountQueuingStrategy({ highWaterMark: 5 })
)
// ByteLengthQueuingStrategy
const byteStream = new ReadableStream(
{
start(controller) {
log('log1b', `初始 desiredSize: ${controller.desiredSize}`)
const chunks = [
new Uint8Array(10),
new Uint8Array(100),
new Uint8Array(1000),
new Uint8Array(10000),
]
chunks.forEach((chunk, i) => {
controller.enqueue(chunk)
log(
'log1b',
`入队 ${chunk.byteLength}B 后 desiredSize: ${controller.desiredSize}`
)
})
controller.close()
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 2048 })
)
// 消费流
await Promise.all([consumeStream(countStream), consumeStream(byteStream)])
log('log1a', '\n✅ 完成')
log('log1b', '\n✅ 完成')
}
async function consumeStream(stream) {
const reader = stream.getReader()
while (true) {
const { done } = await reader.read()
if (done) break
}
}
// Demo 2:内存占用对比
async function demo2() {
document.getElementById('log2a').innerHTML = ''
document.getElementById('log2b').innerHTML = ''
// CountQueuingStrategy
await runMemoryTest(
'log2a',
'stats2a',
new CountQueuingStrategy({ highWaterMark: 10 }),
'CountQueuingStrategy'
)
// ByteLengthQueuingStrategy
await runMemoryTest(
'log2b',
'stats2b',
new ByteLengthQueuingStrategy({ highWaterMark: 50 * 1024 }),
'ByteLengthQueuingStrategy'
)
}
async function runMemoryTest(logId, statsId, strategy, name) {
let totalBytes = 0
let pullCount = 0
let backpressureCount = 0
const chunks = []
// 生成随机大小的数据块
for (let i = 0; i < 100; i++) {
const size = Math.floor(Math.random() * 10000) + 100
chunks.push(new Uint8Array(size))
}
const stream = new ReadableStream(
{
pull(controller) {
pullCount++
if (controller.desiredSize <= 0) {
backpressureCount++
log(
logId,
`⚠️ pull #${pullCount}: 触发背压 (desiredSize=${controller.desiredSize})`
)
return
}
if (chunks.length === 0) {
controller.close()
return
}
const chunk = chunks.shift()
totalBytes += chunk.byteLength
controller.enqueue(chunk)
if (pullCount <= 20 || pullCount % 10 === 0) {
log(
logId,
`📤 pull #${pullCount}: 入队 ${chunk.byteLength}B, desiredSize=${controller.desiredSize}`
)
}
},
},
strategy
)
await stream.pipeTo(
new WritableStream({
async write() {
await new Promise((resolve) => setTimeout(resolve, 10))
},
})
)
document.getElementById(statsId).innerHTML = `
<strong>统计结果</strong><br>
总数据量: ${(totalBytes / 1024).toFixed(2)} KB<br>
pull 调用次数: ${pullCount}<br>
背压触发次数: ${backpressureCount}<br>
背压触发率: ${((backpressureCount / pullCount) * 100).toFixed(1)}%
`
log(logId, '\n✅ 完成')
}
// Demo 3:性能测试
async function demo3() {
document.getElementById('log3').innerHTML = ''
const count = parseInt(document.getElementById('chunkCount').value)
log('log3', `开始测试,生成 ${count} 个数据块...\n`)
// 测试 CountQueuingStrategy (hwm=1)
await performanceBenchmark(
'Count(hwm=1)',
new CountQueuingStrategy({ highWaterMark: 1 }),
count,
'log3'
)
// 测试 CountQueuingStrategy (hwm=10)
await performanceBenchmark(
'Count(hwm=10)',
new CountQueuingStrategy({ highWaterMark: 10 }),
count,
'log3'
)
// 测试 ByteLengthQueuingStrategy (hwm=10KB)
await performanceBenchmark(
'Byte(hwm=10KB)',
new ByteLengthQueuingStrategy({ highWaterMark: 10 * 1024 }),
count,
'log3'
)
// 测试 ByteLengthQueuingStrategy (hwm=100KB)
await performanceBenchmark(
'Byte(hwm=100KB)',
new ByteLengthQueuingStrategy({ highWaterMark: 100 * 1024 }),
count,
'log3'
)
log('log3', '\n✅ 所有测试完成')
}
async function performanceBenchmark(name, strategy, count, logId) {
let pullCount = 0
const startTime = performance.now()
const stream = new ReadableStream(
{
pull(controller) {
pullCount++
if (pullCount > count) {
controller.close()
return
}
controller.enqueue(new Uint8Array(1024)) // 固定 1KB
},
},
strategy
)
await stream.pipeTo(
new WritableStream({
write() {
// 空处理
},
})
)
const elapsed = performance.now() - startTime
const avgTime = elapsed / count
log(
logId,
`${name.padEnd(20)} | 耗时: ${elapsed.toFixed(
2
)}ms | pull 次数: ${pullCount} | 平均: ${avgTime.toFixed(3)}ms/块`
)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
8. 💻 demos.2 - 实现一个基于优先级的自定义队列策略
html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>自定义队列策略</title>
<style>
body {
max-width: 1000px;
margin: 20px auto;
padding: 20px;
}
.demo {
margin: 20px 0;
padding: 15px;
border: 1px solid #ccc;
}
.log {
border: 1px solid #ddd;
padding: 10px;
margin-top: 10px;
max-height: 400px;
overflow-y: auto;
font-family: monospace;
font-size: 12px;
background: #f9f9f9;
}
.controls {
margin: 10px 0;
}
.stats {
background: #fff3cd;
padding: 10px;
margin-top: 10px;
border-left: 3px solid #ffc107;
}
button {
margin-right: 10px;
}
</style>
</head>
<body>
<h1>自定义队列策略示例</h1>
<div class="demo">
<h3>Demo 1:优先级队列策略</h3>
<p>高优先级消息占用更多队列空间,确保低优先级不会挤占资源</p>
<button onclick="demo1()">运行</button>
<button onclick="clearLog('log1')">清空</button>
<div class="log" id="log1"></div>
<div class="stats" id="stats1"></div>
</div>
<div class="demo">
<h3>Demo 2:字符串长度策略</h3>
<p>根据字符串长度控制队列,适合文本流处理</p>
<button onclick="demo2()">运行</button>
<button onclick="clearLog('log2')">清空</button>
<div class="log" id="log2"></div>
<div class="stats" id="stats2"></div>
</div>
<div class="demo">
<h3>Demo 3:混合策略</h3>
<p>智能识别数据类型,对不同类型应用不同的大小计算方式</p>
<button onclick="demo3()">运行</button>
<button onclick="clearLog('log3')">清空</button>
<div class="log" id="log3"></div>
<div class="stats" id="stats3"></div>
</div>
<script src="1.js"></script>
</body>
</html>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
js
function log(id, message) {
const logEl = document.getElementById(id)
logEl.innerHTML += `${message}\n`
logEl.scrollTop = logEl.scrollHeight
}
function clearLog(id) {
document.getElementById(id).innerHTML = ''
const statsId = id.replace('log', 'stats')
const statsEl = document.getElementById(statsId)
if (statsEl) statsEl.innerHTML = ''
}
// 自定义策略 1:优先级队列策略
class PriorityQueuingStrategy {
constructor(options = {}) {
this.highWaterMark = options.highWaterMark || 100
this.weights = options.weights || { high: 10, normal: 5, low: 1 }
}
size(chunk) {
if (!chunk || typeof chunk !== 'object') return 1
const priority = chunk.priority || 'normal'
const weight = this.weights[priority] || this.weights.normal
log('currentLog', ` 计算块大小: priority=${priority}, weight=${weight}`)
return weight
}
}
// Demo 1:优先级队列策略
async function demo1() {
clearLog('log1')
window.currentLog = 'log1' // 供 PriorityQueuingStrategy 使用
const messages = [
{ priority: 'low', content: 'Background task 1' },
{ priority: 'high', content: 'Critical alert!' },
{ priority: 'normal', content: 'Regular update' },
{ priority: 'low', content: 'Background task 2' },
{ priority: 'high', content: 'Urgent action required' },
{ priority: 'normal', content: 'Info message' },
]
let queueStats = { high: 0, normal: 0, low: 0 }
let index = 0
const stream = new ReadableStream(
{
pull(controller) {
if (index >= messages.length) {
controller.close()
return
}
const msg = messages[index++]
queueStats[msg.priority]++
log('log1', `\n📥 入队消息 #${index}:`)
log('log1', ` 优先级: ${msg.priority}`)
log('log1', ` 内容: ${msg.content}`)
controller.enqueue(msg)
log('log1', ` 队列状态: desiredSize=${controller.desiredSize}`)
},
},
new PriorityQueuingStrategy({
highWaterMark: 50,
weights: { high: 10, normal: 5, low: 1 },
})
)
const reader = stream.getReader()
let processedCount = 0
while (true) {
const { done, value } = await reader.read()
if (done) break
processedCount++
log(
'log1',
`\n📤 处理消息 #${processedCount}: [${value.priority}] ${value.content}`
)
await new Promise((resolve) => setTimeout(resolve, 100))
}
document.getElementById('stats1').innerHTML = `
<strong>队列统计</strong><br>
高优先级消息: ${queueStats.high} 条(权重 10)<br>
普通优先级消息: ${queueStats.normal} 条(权重 5)<br>
低优先级消息: ${queueStats.low} 条(权重 1)<br>
总权重: ${queueStats.high * 10 + queueStats.normal * 5 + queueStats.low * 1}
`
log('log1', '\n✅ 完成')
}
// 自定义策略 2:字符串长度策略
class StringLengthStrategy {
constructor(options = {}) {
this.highWaterMark = options.highWaterMark || 1000
}
size(chunk) {
if (typeof chunk === 'string') {
return chunk.length
}
if (chunk && typeof chunk.toString === 'function') {
return chunk.toString().length
}
return 1
}
}
// Demo 2:字符串长度策略
async function demo2() {
clearLog('log2')
const texts = [
'Hi',
'Hello, World!',
'This is a medium length message for testing purposes.',
'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.',
'Short',
'A very very very long text that should consume significant queue space because it has many characters.',
]
let totalLength = 0
let index = 0
const stream = new ReadableStream(
{
pull(controller) {
if (index >= texts.length) {
controller.close()
return
}
const text = texts[index++]
totalLength += text.length
log('log2', `📥 入队文本 #${index}:`)
log('log2', ` 长度: ${text.length} 字符`)
log(
'log2',
` 内容: ${text.substring(0, 50)}${text.length > 50 ? '...' : ''}`
)
log('log2', ` desiredSize: ${controller.desiredSize}\n`)
controller.enqueue(text)
},
},
new StringLengthStrategy({ highWaterMark: 200 })
)
const reader = stream.getReader()
let processedCount = 0
while (true) {
const { done, value } = await reader.read()
if (done) break
processedCount++
log('log2', `📖 读取文本 #${processedCount}: ${value.length} 字符`)
await new Promise((resolve) => setTimeout(resolve, 200))
}
document.getElementById('stats2').innerHTML = `
<strong>统计结果</strong><br>
总文本量: ${totalLength} 字符<br>
highWaterMark: 200 字符<br>
处理消息数: ${processedCount}
`
log('log2', '\n✅ 完成')
}
// 自定义策略 3:混合策略
class HybridQueuingStrategy {
constructor(options = {}) {
this.highWaterMark = options.highWaterMark || 1024
}
size(chunk) {
// 二进制数据:按字节数
if (chunk && chunk.byteLength !== undefined) {
return chunk.byteLength
}
// 字符串:按字符数 * 2(UTF-16 估算)
if (typeof chunk === 'string') {
return chunk.length * 2
}
// 对象:按属性数量 * 10
if (chunk && typeof chunk === 'object') {
return Object.keys(chunk).length * 10
}
// 其他:固定为 1
return 1
}
}
// Demo 3:混合策略
async function demo3() {
clearLog('log3')
const mixedData = [
new Uint8Array(100), // 二进制
'Hello, Stream!', // 字符串
{ id: 1, name: 'Alice', age: 30 }, // 对象
new Uint8Array(500),
'Short',
{ a: 1, b: 2, c: 3, d: 4, e: 5 },
new Uint8Array(50),
'A longer string for testing',
]
const stats = { binary: 0, string: 0, object: 0 }
let index = 0
const stream = new ReadableStream(
{
pull(controller) {
if (index >= mixedData.length) {
controller.close()
return
}
const data = mixedData[index++]
let type, size
if (data && data.byteLength !== undefined) {
type = 'binary'
size = data.byteLength
stats.binary++
} else if (typeof data === 'string') {
type = 'string'
size = data.length * 2
stats.string++
} else {
type = 'object'
size = Object.keys(data).length * 10
stats.object++
}
log('log3', `📥 入队 #${index}:`)
log('log3', ` 类型: ${type}`)
log('log3', ` 计算大小: ${size}`)
log('log3', ` desiredSize: ${controller.desiredSize}\n`)
controller.enqueue(data)
},
},
new HybridQueuingStrategy({ highWaterMark: 1000 })
)
const reader = stream.getReader()
let count = 0
while (true) {
const { done, value } = await reader.read()
if (done) break
count++
const type = value.byteLength
? 'binary'
: typeof value === 'string'
? 'string'
: 'object'
log('log3', `📖 读取 #${count}: ${type}`)
await new Promise((resolve) => setTimeout(resolve, 150))
}
document.getElementById('stats3').innerHTML = `
<strong>数据统计</strong><br>
二进制数据: ${stats.binary} 个(按 byteLength 计算)<br>
字符串数据: ${stats.string} 个(按 length * 2 计算)<br>
对象数据: ${stats.object} 个(按 keys * 10 计算)<br>
highWaterMark: 1000 字节
`
log('log3', '\n✅ 完成')
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288