Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

impl Stream cannot be unpinned

I'm trying to get data using crates_io_api. I attempted to get data from a stream, but I can not get it to work.

AsyncClient::all_crates returns an impl Stream. How do I get data from it? It would be helpful if you provide code.

I checked out the async book but it didn't work. Thank you.

Here's my current code.

use crates_io_api::{AsyncClient, Error};
use futures::stream::StreamExt;

async fn get_all(query: Option<String>) -> Result<crates_io_api::Crate, Error> {
  // Instantiate the client.
  let client = AsyncClient::new(
    "test ([email protected])",
    std::time::Duration::from_millis(10000),
  )?;

  let stream = client.all_crates(query);

  // what should I do after?
  // ERROR: `impl Stream cannot be unpinned`
  while let Some(item) = stream.next().await {
      // ...
  }
}
like image 453
Pytan Avatar asked May 07 '21 13:05

Pytan


People also ask

How do I iterate a stream that is not pinned?

The into_stream () function returns a stream that is not pinned, we must explicitly pin it in order to iterate it. A Rust value is "pinned" when it can no longer be moved in memory. A key property of a pinned value is that pointers can be taken to the pinned data and the caller can be confident the pointer stays valid.

How does the unpin trait for t work?

Implementing the Unpin trait for T lifts the restrictions of pinning off the type, which then allows moving T out of Pin<P<T>> with functions such as mem::replace. Unpin has no consequence at all for non-pinned data. In particular, mem::replace happily moves !Unpin data (it works for any &mut T, not just when T: Unpin ).

How do I manually implement a stream?

Usually, when manually implementing a Stream, it is done by composing futures and other streams. As an example, let's build off of the Delay future we implemented in Async in depth. We will convert it to a stream that yields () three times at 10 ms intervals


Video Answer


1 Answers

This looks like a mistake on the side of crates_io_api. Getting the next element of a Stream requires that the Stream is Unpin:

pub fn next(&mut self) -> Next<'_, Self> where
    Self: Unpin, 

Because Next stores a reference to Self, you must guarantee that Self is not moved during the process, or risk pointer invalidation. This is what the Unpin marker trait represents. crates_io_api does not provide this guarantee (although they could, and should be), so you must make it yourself. To convert a !Unpin type to a Unpin type, you can pin it to a heap allocation:

use futures::stream::StreamExt;

let stream = client.all_crates(query).boxed();

// boxed simply calls Box::pin
while let Some(elem) = stream.next() { ... }

Or you can pin it to the stack with the pin_mut!/pin! macro:

let stream = client.all_crates(query);
futures::pin_mut!(stream);

while let Some(elem) = stream.next() { ... }

Alternatively, you could use a combinator that does not require Unpin such as for_each:

stream.for_each(|elem| ...)
like image 99
Ibraheem Ahmed Avatar answered Oct 24 '22 03:10

Ibraheem Ahmed