๐ŸŽฏ

rust-async-pattern

๐ŸŽฏSkill

from huiali/rust-skills

VibeIndex|
What it does

Expertly manages complex Rust async patterns, solving Stream implementation, zero-copy, tokio::spawn lifecycle, and plugin system scheduling challenges.

๐Ÿ“ฆ

Part of

huiali/rust-skills(30 items)

rust-async-pattern

Installation

๐Ÿ“‹ No install commands found in docs. Showing default command. Check GitHub for actual instructions.
Quick InstallInstall with npx
npx skills add huiali/rust-skills --skill rust-async-pattern
3Installs
-
AddedFeb 4, 2026

Skill Details

SKILL.md

"้ซ˜็บงๅผ‚ๆญฅๆจกๅผไธ“ๅฎถใ€‚ๅค„็† Stream ๅฎž็Žฐ, ้›ถๆ‹ท่ด, tokio::spawn ็”Ÿๅ‘ฝๅ‘จๆœŸ, ๆ’ไปถ็ณป็ปŸ่ฐƒๅบฆ, tonic ๆตๅผๅ“ๅบ”็ญ‰้—ฎ้ข˜ใ€‚่งฆๅ‘่ฏ๏ผšasync, Stream, tokio::spawn, ้›ถๆ‹ท่ด, ๆ’ไปถ็ณป็ปŸ, tonic, ๆตๅผ, BorrowedMessage, ๅผ‚ๆญฅ่ฐƒๅบฆ"

Overview

# ้ซ˜็บงๅผ‚ๆญฅๆจกๅผ

ๆ ธๅฟƒ้—ฎ้ข˜

ๅผ‚ๆญฅไปฃ็ ็š„็”Ÿๅ‘ฝๅ‘จๆœŸๆ€Žไนˆ่ฟ™ไนˆ้šพ็ฎก๏ผŸ

async ่ฎฉ็”Ÿๅ‘ฝๅ‘จๆœŸ้—ฎ้ข˜ๆ›ดๅคๆ‚ใ€‚

---

Stream + ่‡ชๅผ•็”จ็ผ“ๅ†ฒๅŒบ

้—ฎ้ข˜ไปฃ็ 

```rust

// โŒ Stream ๅฎž็Žฐไธญ่ฟ”ๅ›žๅ€Ÿ็”จๅ†…้ƒจ็ผ“ๅ†ฒๅŒบ็š„ slice

pub struct SessionStream<'buf> {

buf: Vec,

cache: Vec>,

}

impl Stream for SessionStream<'buf> {

type Item = Result, Status>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {

// โŒ ่ฟ”ๅ›ž็š„ CachedResponse<'buf> ็”Ÿๅ‘ฝๅ‘จๆœŸไพ่ต–ไบŽ self.buf

// ไฝ† Stream trait ็š„ Item ๅฟ…้กป่ƒฝๅœจไปปๆ„ๆ—ถๅˆป่ขซไฝฟ็”จ

}

}

```

้”™่ฏฏไฟกๆฏ

```

error[E0700]: hidden type for impl futures_core::Stream captures lifetime that does not appear in bounds

error[E0310]: the parameter type may not live long enough

```

ๅŽŸๅ› 

  • Stream ็š„ Item ๅฏไปฅ่ขซไปปๆ„ๆŒๆœ‰
  • 'buf ๅ’Œ self ็ป‘ๅฎšๅœจไธ€่ตท
  • ่ฟ”ๅ›ž็š„ Item ้€ƒ้€ธไบ† self ็š„็”Ÿๅ‘ฝๅ‘จๆœŸ

่งฃๅ†ณ๏ผšWorker + Channel ๆจกๅผ

```rust

// โœ… ๅ†…้ƒจ worker ๆŒๆœ‰็ผ“ๅ†ฒๅŒบ๏ผŒๅฏนๅค–ๅชๅ‘ owned snapshot

pub struct SessionWorker {

rx_events: Receiver,

tx_snapshots: Sender,

buf: Vec,

}

impl SessionWorker {

pub async fn run(&mut self) {

while let Some(event) = self.rx_events.recv().await {

let snapshot = self.process_event(event);

self.tx_snapshots.send(snapshot).await;

}

}

fn process_event(&mut self, event: Bytes) -> SnapshotResponse {

// ๅ†…้ƒจๅฏไปฅๅ€Ÿ็”จ self.buf

let start = self.buf.len();

self.buf.extend_from_slice(&event);

// ไฝ†ๅฏนๅค–ๅ‘็š„ๆ˜ฏ owned SnapshotResponse

SnapshotResponse {

id: self.next_id,

payload: Bytes::copy_from_slice(&self.buf[start..]),

}

}

}

// โœ… Stream ๅช่ฏป channel๏ผŒๅ‘็š„้ƒฝๆ˜ฏ owned

pub struct SessionStream {

rx_snapshots: Receiver,

}

impl Stream for SessionStream {

type Item = Result;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {

// ่ฟ™้‡Œๅ‘็š„้ƒฝๆ˜ฏ SnapshotResponse (owned)๏ผŒๆฒก้—ฎ้ข˜

}

}

```

---

tokio::spawn + ้ž 'static ็”Ÿๅ‘ฝๅ‘จๆœŸ

้—ฎ้ข˜ไปฃ็ 

```rust

// โŒ tokio::spawn ่ฆๆฑ‚ 'static๏ผŒไฝ† BorrowedMessage<'a> ไธๆ˜ฏ

pub struct BorrowedMessage<'a> {

pub raw: &'a [u8],

pub meta: MessageMeta,

}

pub trait Plugin: Send + Sync {

fn handle<'a>(&'a self, msg: BorrowedMessage<'a>)

-> Pin> + Send + 'a>>;

}

fn dispatch_to_plugins(msg: BorrowedMessage<'a>) {

for p in &plugins {

let fut = p.handle(msg);

tokio::spawn(fut); // โŒ fut ไธๆ˜ฏ 'static

}

}

```

ๅŽŸๅ› 

  • tokio::spawn ไธ็Ÿฅ้“ไปปๅŠกไฝ•ๆ—ถๅฎŒๆˆ
  • ๅฆ‚ๆžœไปปๅŠกๆŒๆœ‰ 'a ๅผ•็”จ๏ผŒ'a ๅฏ่ƒฝๅทฒ่ฟ‡ๆœŸ

่งฃๅ†ณ๏ผšไบ‹ไปถๅพช็Žฏ + Actor ๆจกๅผ

```rust

// โœ… ไธ spawn๏ผŒๆฏไธชๆ’ไปถๆ˜ฏไธ€ไธช้•ฟๆœŸๅญ˜ๅœจ็š„ actor

struct PluginActor {

plugin: M,

queue: Receiver,

arena: MessageArena,

}

impl PluginActor {

pub async fn run(&mut self) {

while let Some(msg) = self.queue.recv().await {

// ๅœจ arena ๅŸŸๅ†…ๅค„็†ๆถˆๆฏ

self.arena.with_message(msg, |msg_ref| {

self.plugin.handle(msg_ref);

});

}

}

}

// โœ… ็”จ็ดขๅผ•ไปฃๆ›ฟ็›ดๆŽฅๅ€Ÿ็”จ

pub struct MessageRef {

index: usize,

generation: u64,

}

struct MessageArena {

buffers: Vec>,

}

impl MessageArena {

pub fn get(&self, ref: MessageRef) -> Option<&[u8]> {

// ้€š่ฟ‡็ดขๅผ•ๅฎ‰ๅ…จ่Žทๅ–

self.buffers.get(ref.index)?.get(ref.generation)

}

}

```

---

ๆ’ไปถ็ณป็ปŸ่ฐƒๅบฆๆจกๅผ

็บฆๆŸ

  1. ้›ถๆ‹ท่ด็ผ“ๅ†ฒๅŒบๅค็”จ
  2. ๆ’ไปถ็ƒญๆ’ๆ‹”
  3. ๅผ‚ๆญฅ handler
  4. ๅฏ้‡่ฏ•/ๅปถๅŽ ack

ๆœ€็ปˆๆžถๆž„

```

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”

โ”‚ Decode Layer โ”‚ ๆŒๆœ‰็ผ“ๅ†ฒๅŒบ

โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค

โ”‚ MessageArena โ”‚ ็ผ“ๅ†ฒๅŒบ็ฎก็†

โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค

โ”‚ Event Loop โ”‚ ๅไฝœๅผ่ฐƒๅบฆ

โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค

โ”‚ Plugin Actor โ”‚ ๆฏไธชๆ’ไปถไธ€ไธช

โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

โ”‚

โ†“ API ๅฑ‚ๅช็œ‹ๅˆฐ owned ๆ•ฐๆฎ

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”

โ”‚ GraphQL / gRPC โ”‚ 'static ่ฆๆฑ‚

โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

```

ๅ…ณ้”ฎ่ฎพ่ฎก

```rust

// 1. ็ผ“ๅ†ฒๅŒบ็ฎก็†ๅŸŸ

struct MessageArena {

buffers: Vec>,

free_list: Vec,

}

impl MessageArena {

// ๅˆ†้…ๆ—ถ่ฟ”ๅ›ž็ดขๅผ•๏ผŒไธๆ˜ฏๅผ•็”จ

fn alloc(&mut self, data: &[u8]) -> MessageRef {

let idx = self.buffers.len();

self.buffers.push(Arc::new(data.to_vec()));

MessageRef { index: idx, generation: 0 }

}

}

// 2. API ๅฑ‚ๅชๆšด้œฒ owned

pub trait Plugin: Send + Sync {

async fn handle(&self, msg: OwnedMessage); // owned

}

```

---

ๅธธ่ง้—ฎ้ข˜้€ŸๆŸฅ

| ้—ฎ้ข˜ | ๅŽŸๅ›  | ่งฃๅ†ณ |

|-----|------|-----|

| Stream ่ฟ”ๅ›žๅ€Ÿ็”จ | Item ็”Ÿๅ‘ฝๅ‘จๆœŸ้€ƒ้€ธ | Worker + Channel |

| tokio::spawn ้ž 'static | ไปปๅŠกๅฏ่ƒฝๆŒๆœ‰ไธดๆ—ถๅผ•็”จ | ไบ‹ไปถๅพช็Žฏๆจกๅผ |

| ๆ’ไปถ handler ็”Ÿๅ‘ฝๅ‘จๆœŸ | ๆ’ไปถๆŒๆœ‰ๆถˆๆฏ | Actor + ็ดขๅผ• |

| async-graphql + GAT | 'static ่ฆๆฑ‚ | owned DTO |

| tonic Stream ่‡ชๅผ•็”จ | ็ผ“ๅ†ฒๅŒบๅค็”จๅ†ฒ็ช | Snapshot ๆจกๅผ |

---

ไฝ•ๆ—ถ็”จ spawn๏ผŒไฝ•ๆ—ถ็”จ actor

| ๅœบๆ™ฏ | ๆ–นๆกˆ |

|-----|-----|

| ็‹ฌ็ซ‹ไปปๅŠก๏ผŒๅฏๅนถ่กŒ | tokio::spawn |

| ้œ€่ฆๅไฝœ่ฐƒๅบฆ | Event Loop |

| ๆ’ไปถ็ณป็ปŸ | Actor ๆจกๅผ |

| ้•ฟๆœŸ่ฟ่กŒ็š„็Šถๆ€ful | Actor |

| ็Ÿญๅ‘ฝไปปๅŠก | spawn |

| ้œ€่ฆ่ƒŒๅŽ‹ๆŽงๅˆถ | Channel + actor |

More from this repository10

๐ŸŽฏ
rust-skill๐ŸŽฏSkill

Provides expert Rust programming assistance, solving compilation errors, ownership, lifetimes, concurrency, and performance optimization challenges.

๐ŸŽฏ
rust-skill-index๐ŸŽฏSkill

Indexes and provides quick navigation for 35 Rust skills across core, advanced, and expert categories.

๐ŸŽฏ
rust-async๐ŸŽฏSkill

Handles advanced Rust async patterns like Stream processing, backpressure, select, cancellation, and concurrency management with Tokio.

๐ŸŽฏ
rust-error๐ŸŽฏSkill

Expertly handles Rust error scenarios by providing comprehensive guidance on Result, Option, error types, propagation, and panic strategies.

๐ŸŽฏ
rust-anti-pattern๐ŸŽฏSkill

Identifies and helps refactor Rust anti-patterns like unnecessary cloning, unwrapping, and inefficient iterations to improve code quality.

๐ŸŽฏ
rust-linear-type๐ŸŽฏSkill

Manages Rust linear type resources with precise ownership, ensuring single-use semantics and preventing resource leaks or double-free errors.

๐ŸŽฏ
rust-concurrency๐ŸŽฏSkill

Expertly handles Rust concurrency challenges by safely managing threads, async operations, and preventing race conditions and deadlocks.

๐ŸŽฏ
rust-ownership๐ŸŽฏSkill

Skill

๐ŸŽฏ
rust-middleware๐ŸŽฏSkill

Implements comprehensive Rust web middleware for request tracing, CORS configuration, rate limiting, and advanced middleware patterns.

๐ŸŽฏ
rust-mutability๐ŸŽฏSkill

Expertly manages Rust mutability challenges, resolving borrowing conflicts and providing safe interior mutability strategies across different contexts.