#asynchronous #rust #connection-pooling
Вопрос:
Я пытаюсь создать пул соединений для tiberius
ящика. Проблема в том , что при вызове query
метода of tiberius::Client
затем QueryStream
заимствует изменяемую ссылку на tiberius::Client
, а затем я не могу вернуться QueryStream
из метода query
of Pool
.
Как я могу это исправить?
type ClientMSSQL = tiberius::Clientlt;tokio_util::compat::Compatlt;tokio::net::TcpStreamgt;gt;; struct MSSQLConnection { client: Arclt;Mutexlt;ClientMSSQLgt;gt; } impl Deref for MSSQLConnection { type Target = Arclt;Mutexlt;ClientMSSQLgt;gt;; fn deref(amp;self) -gt; amp;Self::Target { self.client.borrow() } } impl DerefMut for MSSQLConnection { fn deref_mut(amp;mut self) -gt; amp;mut Self::Target { self.client.borrow_mut() } } type ArrayDequeuelt;Tgt; = Arclt;Mutexlt;VecDequelt;Tgt;gt;gt;; #[derive(Clone)] struct Pool(Arclt;SharedPoolgt;); struct SharedPool { conns: Mutexlt;VecDequelt;MSSQLConnectiongt;gt;, size: AtomicU32, buff_size: usize, config: ConnectionConfig } struct DetachedConnectionlt;'agt; { pool: Pool, conn: MSSQLConnection, stream: Optionlt;QueryStreamlt;'agt;gt; } impllt;'agt; Deref for DetachedConnectionlt;'agt; { type Target = QueryStreamlt;'agt;; fn deref(amp;self) -gt; amp;Self::Target { amp;self.stream.unwrap() } } impllt;'agt; DerefMut for DetachedConnectionlt;'agt; { fn deref_mut(amp;mut self) -gt; amp;mut Self::Target { amp;mut self.stream.unwrap() } } impl SharedPool { async fn connect(amp;self) { if self.size.load(Ordering::Acquire) lt; self.buff_size as u32 { self.conns .lock() .await .push_back({ let config: Config = (amp;self.config).into(); let tcp = TcpStream::connect(config.get_addr()).await.unwrap().compat_write(); MSSQLConnection { client: Arc::new(Mutex::new(Client::connect(config, tcp).await.unwrap())) } }); self.size.fetch_add(1, Ordering::Acquire); } } } impl Pool { fn new(buffer: usize, config: ConnectionConfig) -gt; Self { let shared = Arc::new( SharedPool { conns: Mutex::new(VecDeque::with_capacity(buffer)), size: AtomicU32::new(0), buff_size: buffer, config } ); let shared_cloned = shared.clone(); tokio::task::spawn(async move { loop { shared_cloned.connect().await; tokio::time::sleep(tokio::time::Duration::new(0, 100)).await; } }); Pool(shared) } async fn query(amp;self, q: amp;str, p: amp;[amp;dyn ToSql]) -gt; Resultlt;DetachedConnectionlt;'_gt;, Stringgt; { let mut conn = self.get_connection().await; conn.stream = Some(conn.conn.lock().await.query(q, p).await.unwrap()); Ok(conn) } async fn get_connection(amp;self) -gt; DetachedConnectionlt;'_gt; { loop { if let Some(connection) = self.0.conns.lock().await.pop_back() { self.0.size.fetch_sub(1, Ordering::Acquire); return DetachedConnection { pool: self.clone(), conn: connection, stream: None } } } } }