0133. 背压机制(Backpressure)
- 1. 🎯 本节内容
- 2. 🫧 评价
- 3. 🤔 什么是背压机制,为什么流处理中需要它 ?
- 4. 🤔 desiredSize 的值如何计算,负值代表什么 ?
- 5. 🤔 highWaterMark 如何影响流的缓冲行为 ?
- 6. 🤔 背压信号如何在管道链中传播 ?
- 7. 🤔 如何在自定义流中正确响应背压 ?
- 8. 💻 demos.1 - 观察背压信号的触发时机
- 9. 💻 demos.2 - 实现一个支持背压的自定义流
- 10. 🔗 引用
1. 🎯 本节内容
- 背压机制的原理与必要性
- desiredSize 的计算公式
- highWaterMark 参数的作用
- 队列大小的动态变化
- 背压信号的传播路径
- 生产者与消费者的速率平衡
2. 🫧 评价
背压机制是 Web Streams 的核心设计之一,通过 desiredSize 和 highWaterMark 实现生产者与消费者的速率平衡。理解队列大小的计算公式、负值的含义,以及信号在管道链中的传播路径,是构建高性能流处理应用的关键。实际开发中需重点关注面对慢速消费者时生产者的暂停逻辑,避免内存堆积导致的性能问题。
3. 🤔 什么是背压机制,为什么流处理中需要它 ?
背压机制(Backpressure)是一种流量控制策略,用于防止快速生产者压垮慢速消费者,确保内存使用可控。
3.1. 背压机制的核心原理
当消费者处理速度慢于生产者时,未处理的数据会累积在队列中。背压机制通过信号通知生产者暂停或减速,避免内存溢出。
const stream = new ReadableStream({
start(controller) {
console.log('desiredSize:', controller.desiredSize) // 1(默认 highWaterMark)
},
pull(controller) {
// desiredSize > 0 时才会调用 pull()
controller.enqueue('chunk')
console.log('desiredSize:', controller.desiredSize) // 队列增加后减小
},
})
3.2. 为什么流处理需要背压
// ❌ 无背压控制:快速生产者持续入队
const uncontrolled = new ReadableStream({
start(controller) {
for (let i = 0; i < 1000000; i++) {
controller.enqueue(new Uint8Array(1024)) // 每次入队 1KB 数据
}
},
})
// 结果:1GB 数据瞬间进入队列,内存爆炸
// ✅ 有背压控制:根据 desiredSize 调整节奏
const controlled = new ReadableStream({
async pull(controller) {
if (controller.desiredSize > 0) {
const data = await fetchData()
controller.enqueue(data)
}
// desiredSize <= 0 时停止调用 pull()
},
})
3.3. 背压的三个关键要素
| 要素 | 说明 | 作用 |
|---|---|---|
| desiredSize | 期望队列剩余容量(可为负) | 生产者决策依据 |
| highWaterMark | 队列容量上限(默认 1) | 背压触发阈值 |
| 队列策略 | 计算每个块大小的规则(默认计数为 1) | 控制入队速率 |
const stream = new ReadableStream(
{
pull(controller) {
// desiredSize 告诉你还能安全入队多少
console.log('剩余容量:', controller.desiredSize)
},
},
new CountQueuingStrategy({ highWaterMark: 5 })
)
// highWaterMark: 5 表示队列最多容纳 5 个块
背压机制的本质是用信号(desiredSize)替代阻塞,让异步流保持响应性的同时实现流量控制。
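上文聚焦可读端的 desiredSize;写入端(WritableStream)通过 writer.desiredSize 和 writer.ready 暴露同一信号。下面是一个示意片段(假设运行环境提供 Web Streams 全局对象,如现代浏览器或 Node.js 18+):

```javascript
// 写入端的背压信号:writer.desiredSize 与 writer.ready
const writable = new WritableStream(
  {
    async write(chunk) {
      // 模拟慢速消费
      await new Promise((resolve) => setTimeout(resolve, 100))
    },
  },
  new CountQueuingStrategy({ highWaterMark: 2 })
)
const writer = writable.getWriter()
console.log(writer.desiredSize) // 2(队列为空)
writer.write('a')
writer.write('b')
console.log(writer.desiredSize) // 0(达到上限,背压出现)
await writer.ready // 队列出现空位后才 resolve
console.log(writer.desiredSize > 0) // true
```

writer.ready 挂起即背压出现,resolve 即背压解除,这正是 pipeTo() 内部协调所依赖的原语。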
4. 🤔 desiredSize 的值如何计算,负值代表什么 ?
desiredSize 等于 highWaterMark 减去当前队列大小,负值表示队列已超过容量限制。
4.1. 计算公式
desiredSize = highWaterMark - queueTotalSize
其中 queueTotalSize 是队列中所有块的大小总和,由队列策略的 size() 函数计算。
const stream = new ReadableStream(
{
start(controller) {
console.log(controller.desiredSize) // 3(队列为空)
controller.enqueue('a') // 块大小为 1
console.log(controller.desiredSize) // 2
controller.enqueue('b')
controller.enqueue('c')
console.log(controller.desiredSize) // 0(达到上限)
controller.enqueue('d')
console.log(controller.desiredSize) // -1(超出上限)
},
},
new CountQueuingStrategy({ highWaterMark: 3 })
)
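公式中的 size() 也可以完全自定义。下面的示意片段用块的文本长度作为块大小,展示 desiredSize 按 highWaterMark - Σsize(chunk) 变化(chunk 的 text 字段为演示假设):

```javascript
// 示意代码:自定义 size() 函数参与 desiredSize 的计算
const stream = new ReadableStream(
  {
    start(controller) {
      console.log(controller.desiredSize) // 10(队列为空)
      controller.enqueue({ text: 'abc' }) // size = 3
      console.log(controller.desiredSize) // 7
      controller.enqueue({ text: 'defgh' }) // size = 5
      console.log(controller.desiredSize) // 2
    },
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.text.length // 以文本长度作为块大小
    },
  }
)
```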
4.2. 负值的含义
desiredSize 为负表示队列过载,生产者应立即停止入队。
const stream = new ReadableStream(
{
async pull(controller) {
// pull() 只在 desiredSize > 0 时被流机制调用;以下检查是防御性的,针对单次调用内多次 enqueue 的场景
console.log('desiredSize:', controller.desiredSize)
if (controller.desiredSize <= 0) {
console.log('队列已满,跳过本次入队')
return // 主动停止生产
}
controller.enqueue(await fetchData())
},
},
new CountQueuingStrategy({ highWaterMark: 1 })
)
4.3. 不同队列策略下的计算差异
// CountQueuingStrategy:每个块大小为 1
new ReadableStream(
{
start(controller) {
controller.enqueue('any data')
console.log(controller.desiredSize) // highWaterMark - 1
},
},
new CountQueuingStrategy({ highWaterMark: 10 })
)
// ByteLengthQueuingStrategy:块大小为字节数
new ReadableStream(
{
start(controller) {
controller.enqueue(new Uint8Array(1024)) // 1KB
console.log(controller.desiredSize) // 16384 - 1024 = 15360
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 })
)
4.4. 动态变化示例
const chunks = []
const stream = new ReadableStream(
{
pull(controller) {
const chunk = `chunk${chunks.length}`
controller.enqueue(chunk)
chunks.push({
chunk,
desiredSize: controller.desiredSize,
queueSize: 2 - controller.desiredSize, // 反推队列大小
})
if (chunks.length >= 5) {
controller.close()
}
},
},
new CountQueuingStrategy({ highWaterMark: 2 })
)
await stream.pipeTo(
new WritableStream({
write(chunk) {
console.table(chunks)
},
})
)
// 输出队列大小从 0 → 1 → 2 → 1 → 0 的变化过程
流机制在 desiredSize <= 0 时会暂停调用 pull(),但 start() 或单次 pull() 内的连续 enqueue 仍可能把 desiredSize 压成负值,因此生产者应在每次入队前主动检查。
5. 🤔 highWaterMark 如何影响流的缓冲行为 ?
highWaterMark 定义队列容量上限,影响背压触发时机和内存使用量。
5.1. highWaterMark 的作用机制
// highWaterMark = 1:严格限流
const strictStream = new ReadableStream(
{
pull(controller) {
console.log('pull 调用,desiredSize:', controller.desiredSize)
controller.enqueue(Math.random())
},
},
new CountQueuingStrategy({ highWaterMark: 1 })
)
// desiredSize > 0 时才调用 pull()
// 队列只能容纳 1 个块,消费后才会再次 pull()
// highWaterMark = 10:允许缓冲
const bufferedStream = new ReadableStream(
{
pull(controller) {
controller.enqueue(Math.random())
},
},
new CountQueuingStrategy({ highWaterMark: 10 })
)
// 队列可容纳 10 个块,减少 pull() 调用频率
5.2. 不同值的影响对比
| highWaterMark | 缓冲区大小 | pull() 频率 | 适用场景 |
|---|---|---|---|
| 0 | 无缓冲 | 每次消费 | 实时数据流 |
| 1 | 单块缓冲 | 消费后调用 | 内存敏感场景 |
| 10+ | 多块缓冲 | 队列空时调用 | 网络请求、批处理 |
// 观察不同 highWaterMark 下的调用次数
async function testHighWaterMark(hwm) {
let pullCount = 0
const stream = new ReadableStream(
{
pull(controller) {
pullCount++
controller.enqueue(pullCount)
if (pullCount >= 100) controller.close()
},
},
new CountQueuingStrategy({ highWaterMark: hwm })
)
await stream.pipeTo(
new WritableStream({
write() {
// 模拟慢速消费
},
})
)
return pullCount
}
await testHighWaterMark(1) // pull 调用约 100 次
await testHighWaterMark(10) // pull 调用约 10-20 次(批量填充队列)
5.3. 字节流中的特殊处理
// 字节流的 highWaterMark 单位是字节
const byteStream = new ReadableStream(
{
type: 'bytes',
pull(controller) {
// desiredSize 表示剩余字节容量
const buffer = new Uint8Array(controller.desiredSize)
controller.enqueue(buffer)
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 64 * 1024 }) // 64KB
)2
3
4
5
6
7
8
9
10
11
12
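字节流还可以配合 BYOB(bring your own buffer)reader,由消费者提供缓冲区,队列几乎不占额外内存。以下为示意片段,假设运行环境支持 ReadableByteStreamController(现代浏览器或 Node.js 18+):

```javascript
// 示意代码:字节流 + BYOB reader,消费者提供缓冲区
const byteStream = new ReadableStream(
  {
    type: 'bytes',
    pull(controller) {
      if (controller.byobRequest) {
        // 直接写入消费者提供的缓冲区,避免额外拷贝
        const view = controller.byobRequest.view
        view[0] = 42
        controller.byobRequest.respond(1) // 声明写入了 1 个字节
      } else {
        controller.enqueue(new Uint8Array([42]))
      }
    },
  },
  { highWaterMark: 64 * 1024 } // 单位:字节
)
const reader = byteStream.getReader({ mode: 'byob' })
const { value } = await reader.read(new Uint8Array(16))
console.log(value[0], value.byteLength) // 42 1
```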
5.4. 过大或过小的问题
// ⚠️ highWaterMark 过小:频繁切换导致性能下降
const tooSmall = new ReadableStream(
{
async pull(controller) {
const data = await fetch('/api').then((r) => r.text())
controller.enqueue(data)
},
},
new CountQueuingStrategy({ highWaterMark: 1 })
)
// 每次只缓冲 1 个块,网络请求无法批量化
// ⚠️ highWaterMark 过大:内存占用过高
const tooBig = new ReadableStream(
{
pull(controller) {
controller.enqueue(new Uint8Array(1024 * 1024)) // 1MB
},
},
new CountQueuingStrategy({ highWaterMark: 1000 })
)
// 队列可容纳 1000 个 1MB 块 = 1GB 内存
5.5. 实际场景选择建议
// 文件上传:较大 highWaterMark 提升吞吐量
const uploadStream = new ReadableStream(
{
async pull(controller) {
const chunk = await readFileChunk()
controller.enqueue(chunk)
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 256 * 1024 }) // 256KB
)
// 实时日志:较小 highWaterMark 降低延迟
const logStream = new ReadableStream(
{
pull(controller) {
controller.enqueue(getLatestLog())
},
},
new CountQueuingStrategy({ highWaterMark: 3 })
)
highWaterMark 本质是在内存占用与调度效率之间的权衡,需根据数据块大小和消费速度调整。
6. 🤔 背压信号如何在管道链中传播 ?
背压信号从下游(消费者)向上游(生产者)逐级传播,由 pipeTo() / pipeThrough() 内部的协调逻辑自动完成。
6.1. 单向传播路径
// 管道链:ReadableStream → TransformStream → WritableStream
const source = new ReadableStream({
pull(controller) {
console.log('Source desiredSize:', controller.desiredSize)
controller.enqueue(Date.now())
},
})
const transform = new TransformStream({
transform(chunk, controller) {
console.log('Transform desiredSize:', controller.desiredSize)
controller.enqueue(chunk * 2)
},
})
const sink = new WritableStream({
write(chunk) {
// 慢速写入触发背压
return new Promise((resolve) => setTimeout(resolve, 1000))
},
})
await source.pipeThrough(transform).pipeTo(sink)
// 背压传播:sink 慢 → transform 队列满 → source 暂停 pull()
6.2. 传播机制原理
// 手动模拟背压传播
const reader = source.getReader()
const writer = sink.getWriter()
while (true) {
const { done, value } = await reader.read()
if (done) break
// 等待 writer 准备好(背压信号)
await writer.ready
await writer.write(value)
}
// writer.ready 在队列满时挂起,循环在 write 前暂停,下一次 reader.read() 随之推迟
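在支持 ReadableStream 异步迭代的运行时(如 Node.js、Firefox)中,上面的手动循环可以换成 for await:每轮循环体结束后才发起下一次 read(),背压语义与手动等待一致。以下为示意片段:

```javascript
// 示意代码:for await 消费 ReadableStream,背压自动成立
let produced = 0
const source = new ReadableStream({
  pull(controller) {
    produced++
    if (produced > 3) {
      controller.close()
      return
    }
    controller.enqueue(produced)
  },
})
for await (const chunk of source) {
  // 慢速消费:上游 pull() 会随之放缓
  await new Promise((resolve) => setTimeout(resolve, 100))
  console.log('consumed:', chunk)
}
```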
6.3. 多级转换链
const source = new ReadableStream(
{
pull(controller) {
controller.enqueue('data')
},
},
new CountQueuingStrategy({ highWaterMark: 2 })
)
const transform1 = new TransformStream(
{
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase())
},
},
new CountQueuingStrategy({ highWaterMark: 3 })
)
const transform2 = new TransformStream(
{
transform(chunk, controller) {
controller.enqueue(`[${chunk}]`)
},
},
new CountQueuingStrategy({ highWaterMark: 1 }) // ⚠️ 最小缓冲(构造器第二个参数作用于 writable 端)
)
const sink = new WritableStream(
{
write(chunk) {
return new Promise((resolve) => setTimeout(resolve, 500))
},
},
new CountQueuingStrategy({ highWaterMark: 4 })
)
await source.pipeThrough(transform1).pipeThrough(transform2).pipeTo(sink)
// 背压由 transform2(highWaterMark: 1)触发
// 信号传播:transform2 满 → transform1 暂停 → source 暂停
6.4. 背压失效的情况
// ❌ 绕过管道机制:背压失效
const reader = source.getReader()
const writer = sink.getWriter()
// 忽略 writer.ready,持续写入
while (true) {
const { value, done } = await reader.read()
if (done) break
writer.write(value) // 未等待,队列可能爆满
}
// ✅ 正确做法:等待 ready
while (true) {
const { value, done } = await reader.read()
if (done) break
await writer.ready // 等待背压信号
await writer.write(value)
}
6.5. 可视化传播过程
function createInstrumentedStream(name, hwm) {
return new TransformStream(
{
transform(chunk, controller) {
console.log(`[${name}] 队列大小: ${hwm - controller.desiredSize}`)
controller.enqueue(chunk)
},
},
new CountQueuingStrategy({ highWaterMark: hwm })
)
}
const pipeline = source
.pipeThrough(createInstrumentedStream('Stage1', 5))
.pipeThrough(createInstrumentedStream('Stage2', 3))
.pipeThrough(createInstrumentedStream('Stage3', 1))
.pipeTo(
new WritableStream({
write() {
return new Promise((resolve) => setTimeout(resolve, 100))
},
})
)
// 输出队列大小从下游到上游的变化
背压传播的核心是 pipeTo() 内部的协调逻辑,开发者无需手动处理信号传递。
7. 🤔 如何在自定义流中正确响应背压 ?
在 pull() 中检查 desiredSize,暂停生产;在 transform() 中控制入队时机。
7.1. ReadableStream 中的背压响应
// ✅ 正确:根据 desiredSize 调整生产速率
const stream = new ReadableStream({
async pull(controller) {
// desiredSize <= 0 时停止生产
if (controller.desiredSize <= 0) {
console.log('队列已满,暂停生产')
return
}
const data = await fetchData()
controller.enqueue(data)
},
})
// ❌ 错误:忽略 desiredSize,持续入队
const badStream = new ReadableStream({
async start(controller) {
while (true) {
controller.enqueue(await fetchData()) // 无限入队
}
},
})
7.2. 处理批量数据
// 数据库分页读取示例
function createDBStream(query) {
let offset = 0
const pageSize = 100
return new ReadableStream({
async pull(controller) {
// 只在有容量时拉取下一页
if (controller.desiredSize <= 0) {
return
}
const rows = await db.query(query, { offset, limit: pageSize })
if (rows.length === 0) {
controller.close()
return
}
// 逐行入队,每次检查容量
for (const row of rows) {
controller.enqueue(row)
if (controller.desiredSize <= 0) {
break // 队列满,下次 pull() 继续
}
}
offset += rows.length
},
})
}
7.3. TransformStream 中的背压响应
// ✅ 正确:transform() 中控制入队
const transform = new TransformStream({
transform(chunk, controller) {
const results = processChunk(chunk) // 可能产生多个输出
for (const result of results) {
if (controller.desiredSize <= 0) {
console.log('下游队列满,缓存剩余数据')
this.buffer = this.buffer || []
this.buffer.push(result)
} else {
controller.enqueue(result)
}
}
},
flush(controller) {
// 流结束时清空缓存
if (this.buffer) {
this.buffer.forEach((item) => controller.enqueue(item))
}
},
})
7.4. 避免阻塞 pull()
// ❌ 错误:在 pull() 中等待队列清空
const badStream = new ReadableStream({
async pull(controller) {
while (controller.desiredSize <= 0) {
await new Promise((resolve) => setTimeout(resolve, 100)) // 死锁!
}
controller.enqueue(data)
},
})
// pull() 返回前队列不会被消费,导致永久等待
// ✅ 正确:直接返回,等待下次调用
const goodStream = new ReadableStream({
pull(controller) {
if (controller.desiredSize <= 0) {
return // 队列满时直接返回
}
controller.enqueue(data)
},
})
7.5. 结合 async 迭代器
async function* generateData() {
for (let i = 0; i < 1000; i++) {
yield await fetchData(i)
}
}
const stream = new ReadableStream({
async start(controller) {
const generator = generateData()
// 封装生产逻辑
this.produce = async () => {
if (controller.desiredSize <= 0) return
const { value, done } = await generator.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
}
},
async pull(controller) {
await this.produce()
},
})
7.6. 实战示例:限速文件读取
function createThrottledFileStream(file, bytesPerSecond) {
let lastTime = Date.now()
return new ReadableStream(
{
async pull(controller) {
// 检查背压
if (controller.desiredSize <= 0) {
return
}
// 限速逻辑
const now = Date.now()
const elapsed = now - lastTime
const maxBytes = (bytesPerSecond * elapsed) / 1000
if (maxBytes < 1024) {
// 未达到读取间隔
return
}
const chunk = await file.read(
Math.min(maxBytes, controller.desiredSize)
)
controller.enqueue(chunk)
lastTime = now
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 64 * 1024 })
)
}
正确响应背压的关键是:never block,always check desiredSize,let pull() return。
8. 💻 demos.1 - 观察背压信号的触发时机
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>观察背压信号的触发时机</title>
<style>
body {
max-width: 1000px;
margin: 20px auto;
padding: 20px;
}
.demo {
margin: 20px 0;
padding: 15px;
border: 1px solid #ccc;
}
.log {
border: 1px solid #ddd;
padding: 10px;
margin-top: 10px;
max-height: 300px;
overflow-y: auto;
font-family: monospace;
font-size: 12px;
}
.controls {
margin: 10px 0;
}
button {
margin-right: 10px;
}
</style>
</head>
<body>
<h1>背压信号观察</h1>
<div class="demo">
<h3>Demo 1:观察 desiredSize 的变化</h3>
<div class="controls">
<label>
highWaterMark:
<input type="number" id="hwm1" value="3" min="1" />
</label>
<button onclick="demo1()">运行</button>
<button onclick="clearLog('log1')">清空日志</button>
</div>
<div class="log" id="log1"></div>
</div>
<div class="demo">
<h3>Demo 2:慢速消费触发背压</h3>
<div class="controls">
<label>
消费延迟(ms):
<input type="number" id="delay" value="500" min="0" />
</label>
<label>
highWaterMark:
<input type="number" id="hwm2" value="2" min="1" />
</label>
<button onclick="demo2()">运行</button>
<button onclick="clearLog('log2')">清空日志</button>
</div>
<div class="log" id="log2"></div>
</div>
<div class="demo">
<h3>Demo 3:管道链中的背压传播</h3>
<div class="controls">
<button onclick="demo3()">运行</button>
<button onclick="clearLog('log3')">清空日志</button>
</div>
<div class="log" id="log3"></div>
</div>
<script src="1.js"></script>
</body>
</html>
function log(id, message) {
const logEl = document.getElementById(id)
const time = new Date().toLocaleTimeString()
logEl.innerHTML += `[${time}] ${message}\n`
logEl.scrollTop = logEl.scrollHeight
}
function clearLog(id) {
document.getElementById(id).innerHTML = ''
}
// Demo 1:观察 desiredSize 的变化
async function demo1() {
clearLog('log1')
const hwm = parseInt(document.getElementById('hwm1').value)
const stream = new ReadableStream(
{
start(controller) {
log('log1', `初始 desiredSize: ${controller.desiredSize}`)
for (let i = 1; i <= 5; i++) {
controller.enqueue(`chunk${i}`)
log(
'log1',
`入队 chunk${i} 后 desiredSize: ${controller.desiredSize}`
)
}
controller.close()
},
},
new CountQueuingStrategy({ highWaterMark: hwm })
)
const reader = stream.getReader()
log('log1', '开始消费...')
while (true) {
const { done, value } = await reader.read()
if (done) break
log('log1', `读取: ${value}`)
}
log('log1', '✅ 完成')
}
// Demo 2:慢速消费触发背压
async function demo2() {
clearLog('log2')
const delay = parseInt(document.getElementById('delay').value)
const hwm = parseInt(document.getElementById('hwm2').value)
let pullCount = 0
const stream = new ReadableStream(
{
pull(controller) {
pullCount++
log(
'log2',
`🔄 pull() 第 ${pullCount} 次调用,desiredSize: ${controller.desiredSize}`
)
if (pullCount > 10) {
controller.close()
return
}
controller.enqueue(`data${pullCount}`)
log('log2', ` 入队后 desiredSize: ${controller.desiredSize}`)
},
},
new CountQueuingStrategy({ highWaterMark: hwm })
)
const reader = stream.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
log('log2', `📖 消费: ${value}`)
await new Promise((resolve) => setTimeout(resolve, delay))
}
log('log2', `✅ 完成,pull() 共调用 ${pullCount} 次`)
}
// Demo 3:管道链中的背压传播
async function demo3() {
clearLog('log3')
let sourcePulls = 0
let transform1Calls = 0
let transform2Calls = 0
const source = new ReadableStream(
{
pull(controller) {
sourcePulls++
log(
'log3',
`📤 Source pull() #${sourcePulls}, desiredSize: ${controller.desiredSize}`
)
if (sourcePulls > 15) {
controller.close()
return
}
controller.enqueue(sourcePulls)
},
},
new CountQueuingStrategy({ highWaterMark: 3 })
)
const transform1 = new TransformStream(
{
transform(chunk, controller) {
transform1Calls++
log(
'log3',
` 🔀 Transform1 #${transform1Calls}, desiredSize: ${controller.desiredSize}`
)
controller.enqueue(chunk * 10)
},
},
new CountQueuingStrategy({ highWaterMark: 2 })
)
const transform2 = new TransformStream(
{
transform(chunk, controller) {
transform2Calls++
log(
'log3',
` 🔀 Transform2 #${transform2Calls}, desiredSize: ${controller.desiredSize}`
)
controller.enqueue(`[${chunk}]`)
},
},
new CountQueuingStrategy({ highWaterMark: 1 })
)
const sink = new WritableStream({
async write(chunk) {
log('log3', ` 📥 Sink 写入: ${chunk}`)
await new Promise((resolve) => setTimeout(resolve, 300))
},
})
await source.pipeThrough(transform1).pipeThrough(transform2).pipeTo(sink)
log('log3', '---')
log(
'log3',
`统计:Source pulls=${sourcePulls}, T1=${transform1Calls}, T2=${transform2Calls}`
)
log('log3', '✅ 完成')
}
9. 💻 demos.2 - 实现一个支持背压的自定义流
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>支持背压的自定义流</title>
<style>
body {
max-width: 1000px;
margin: 20px auto;
padding: 20px;
}
.demo {
margin: 20px 0;
padding: 15px;
border: 1px solid #ccc;
}
.log {
border: 1px solid #ddd;
padding: 10px;
margin-top: 10px;
max-height: 400px;
overflow-y: auto;
font-family: monospace;
font-size: 12px;
background: #f9f9f9;
}
.controls {
margin: 10px 0;
}
button {
margin-right: 10px;
}
.stats {
background: #e8f4f8;
padding: 10px;
margin-top: 10px;
border-left: 3px solid #2196f3;
}
</style>
</head>
<body>
<h1>实现支持背压的自定义流</h1>
<div class="demo">
<h3>Demo 1:分页数据库查询流</h3>
<p>模拟从数据库分页读取数据,响应背压信号</p>
<div class="controls">
<label>
每页大小:
<input type="number" id="pageSize" value="5" min="1" />
</label>
<label>
消费延迟(ms):
<input type="number" id="delay1" value="200" min="0" />
</label>
<button onclick="demo1()">运行</button>
<button onclick="clearLog('log1')">清空</button>
</div>
<div class="log" id="log1"></div>
<div class="stats" id="stats1"></div>
</div>
<div class="demo">
<h3>Demo 2:限速文件上传流</h3>
<p>模拟限速文件读取,同时响应下游背压</p>
<div class="controls">
<label>
限速(KB/s):
<input type="number" id="speed" value="50" min="1" />
</label>
<label>
文件大小(KB):
<input type="number" id="fileSize" value="200" min="1" />
</label>
<button onclick="demo2()">运行</button>
<button onclick="clearLog('log2')">清空</button>
</div>
<div class="log" id="log2"></div>
<div class="stats" id="stats2"></div>
</div>
<div class="demo">
<h3>Demo 3:批量处理转换流</h3>
<p>将输入数据批量处理,控制输出速率</p>
<div class="controls">
<label>
批量大小:
<input type="number" id="batchSize" value="3" min="1" />
</label>
<button onclick="demo3()">运行</button>
<button onclick="clearLog('log3')">清空</button>
</div>
<div class="log" id="log3"></div>
<div class="stats" id="stats3"></div>
</div>
<script src="1.js"></script>
</body>
</html>
function log(id, message) {
const logEl = document.getElementById(id)
const time = new Date().toLocaleTimeString()
logEl.innerHTML += `[${time}] ${message}\n`
logEl.scrollTop = logEl.scrollHeight
}
function clearLog(id) {
document.getElementById(id).innerHTML = ''
const statsId = id.replace('log', 'stats')
const statsEl = document.getElementById(statsId)
if (statsEl) statsEl.innerHTML = ''
}
// Demo 1:分页数据库查询流
async function demo1() {
clearLog('log1')
const pageSize = parseInt(document.getElementById('pageSize').value)
const delay = parseInt(document.getElementById('delay1').value)
// 模拟数据库
const mockDB = Array.from({ length: 50 }, (_, i) => ({
id: i + 1,
name: `User${i + 1}`,
}))
function createDBStream() {
let offset = 0
let pullCount = 0
let skippedPulls = 0
return new ReadableStream({
async pull(controller) {
pullCount++
// 响应背压
if (controller.desiredSize <= 0) {
skippedPulls++
log(
'log1',
`⚠️ pull #${pullCount}: 队列已满 (desiredSize=${controller.desiredSize}),跳过查询`
)
return
}
log(
'log1',
`📊 pull #${pullCount}: 查询 offset=${offset}, limit=${pageSize}`
)
// 模拟数据库查询
await new Promise((resolve) => setTimeout(resolve, 50))
const rows = mockDB.slice(offset, offset + pageSize)
if (rows.length === 0) {
log('log1', '✅ 数据库无更多数据,关闭流')
controller.close()
document.getElementById('stats1').innerHTML = `
<strong>统计</strong><br>
总 pull 调用: ${pullCount} 次<br>
跳过查询: ${skippedPulls} 次(背压生效)<br>
实际查询: ${pullCount - skippedPulls} 次
`
return
}
// 入队
for (const row of rows) {
controller.enqueue(row)
if (controller.desiredSize <= 0) {
log(
'log1',
` 队列已满,本批剩余 ${
rows.length - rows.indexOf(row) - 1
} 条留待下次`
)
offset += rows.indexOf(row) + 1
return
}
}
offset += rows.length
log(
'log1',
` 入队 ${rows.length} 条,desiredSize: ${controller.desiredSize}`
)
},
})
}
const stream = createDBStream()
const reader = stream.getReader()
let count = 0
while (true) {
const { done, value } = await reader.read()
if (done) break
count++
log('log1', `📖 消费 #${count}: ${JSON.stringify(value)}`)
await new Promise((resolve) => setTimeout(resolve, delay))
}
log('log1', `🎉 完成,共消费 ${count} 条数据`)
}
// Demo 2:限速文件上传流
async function demo2() {
clearLog('log2')
const speedKBps = parseInt(document.getElementById('speed').value)
const fileSizeKB = parseInt(document.getElementById('fileSize').value)
function createThrottledStream() {
const chunkSize = 1024 // 1KB
const totalChunks = fileSizeKB
let sentChunks = 0
let lastTime = Date.now()
let throttleSkips = 0
let backpressureSkips = 0
return new ReadableStream(
{
pull(controller) {
const now = Date.now()
const elapsed = now - lastTime
// 响应背压
if (controller.desiredSize <= 0) {
backpressureSkips++
log(
'log2',
`⚠️ 背压触发,跳过本次读取 (desiredSize=${controller.desiredSize})`
)
return
}
// 限速检查
const allowedBytes = (speedKBps * 1024 * elapsed) / 1000
if (allowedBytes < chunkSize) {
throttleSkips++
return
}
if (sentChunks >= totalChunks) {
log('log2', '✅ 文件读取完成')
controller.close()
document.getElementById('stats2').innerHTML = `
<strong>统计</strong><br>
发送块数: ${sentChunks} / ${totalChunks}<br>
限速跳过: ${throttleSkips} 次<br>
背压跳过: ${backpressureSkips} 次
`
return
}
const chunk = new Uint8Array(chunkSize)
controller.enqueue(chunk)
sentChunks++
lastTime = now
const progress = ((sentChunks / totalChunks) * 100).toFixed(1)
log(
'log2',
`📤 发送块 #${sentChunks} (${progress}%), desiredSize: ${controller.desiredSize}`
)
},
},
new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 }) // 16KB 缓冲
)
}
const stream = createThrottledStream()
const startTime = Date.now()
await stream.pipeTo(
new WritableStream({
async write(chunk) {
// 模拟网络上传延迟
await new Promise((resolve) => setTimeout(resolve, 50))
},
})
)
const elapsed = ((Date.now() - startTime) / 1000).toFixed(2)
const actualSpeed = (fileSizeKB / elapsed).toFixed(2)
log('log2', `🎉 上传完成,耗时 ${elapsed}s,实际速度 ${actualSpeed} KB/s`)
}
// Demo 3:批量处理转换流
async function demo3() {
clearLog('log3')
const batchSize = parseInt(document.getElementById('batchSize').value)
class BatchTransform {
constructor(size) {
this.batchSize = size
this.buffer = []
this.batchCount = 0
this.skippedFlush = 0
}
transform(chunk, controller) {
this.buffer.push(chunk)
log(
'log3',
`📥 接收 ${chunk},缓冲区: ${this.buffer.length}/${this.batchSize}`
)
if (this.buffer.length >= this.batchSize) {
this.flushBatch(controller)
}
}
flush(controller) {
if (this.buffer.length > 0) {
log('log3', '🔚 流结束,刷新剩余缓冲')
this.flushBatch(controller)
}
document.getElementById('stats3').innerHTML = `
<strong>统计</strong><br>
输出批次: ${this.batchCount}<br>
延迟刷新: ${this.skippedFlush} 次(背压)
`
}
flushBatch(controller) {
// 检查背压
if (controller.desiredSize <= 0) {
this.skippedFlush++
log(
'log3',
`⚠️ 下游队列满 (desiredSize=${controller.desiredSize}),缓冲保留`
)
return
}
this.batchCount++
const batch = this.buffer.splice(0, this.batchSize)
const output = {
batch: this.batchCount,
items: batch,
sum: batch.reduce((a, b) => a + b, 0),
}
controller.enqueue(output)
log('log3', `📤 输出批次 #${this.batchCount}: ${JSON.stringify(output)}`)
}
}
const batchTransform = new BatchTransform(batchSize)
const transform = new TransformStream(batchTransform)
// 创建输入流
const source = new ReadableStream({
start(controller) {
for (let i = 1; i <= 20; i++) {
controller.enqueue(i)
}
controller.close()
},
})
// 创建慢速写入
const sink = new WritableStream({
async write(chunk) {
log('log3', ` 📝 处理批次: ${JSON.stringify(chunk)}`)
await new Promise((resolve) => setTimeout(resolve, 500))
},
})
await source.pipeThrough(transform).pipeTo(sink)
log('log3', '✅ 完成')
}