From 7fac27aa9da49331951dea44fc4602d28d1e857e Mon Sep 17 00:00:00 2001 From: AN Long Date: Wed, 9 Jul 2025 22:41:07 +0900 Subject: [PATCH] =?UTF-8?q?=F0=9F=8F=97=EF=B8=8F=20using=20async=20client?= =?UTF-8?q?=20under=20the=20hood=20in=20sync=20client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- memcache/async_memcache.py | 4 +- memcache/memcache.py | 107 +++++++------------------------------ 2 files changed, 22 insertions(+), 89 deletions(-) diff --git a/memcache/async_memcache.py b/memcache/async_memcache.py index 46bf10b..5c0dc2c 100644 --- a/memcache/async_memcache.py +++ b/memcache/async_memcache.py @@ -5,11 +5,13 @@ import hashring from .errors import MemcacheError -from .memcache import Addr from .meta_command import MetaCommand, MetaResult from .serialize import dump, load, DumpFunc, LoadFunc +Addr = Tuple[str, int] + + class AsyncConnection: def __init__( self, diff --git a/memcache/memcache.py b/memcache/memcache.py index 71dee76..9239621 100644 --- a/memcache/memcache.py +++ b/memcache/memcache.py @@ -1,4 +1,4 @@ -import socket +import asyncio import threading import queue from contextlib import contextmanager @@ -6,12 +6,9 @@ import hashring -from .errors import MemcacheError +from .async_memcache import Addr, AsyncConnection from .meta_command import MetaCommand, MetaResult -from .serialize import dump, load, DumpFunc, LoadFunc - - -NEWLINE = b"\r\n" +from .serialize import DumpFunc, LoadFunc, dump, load class Connection: @@ -24,102 +21,35 @@ def __init__( username: Optional[str] = None, password: Optional[str] = None, ): - self._addr = addr - self._load = load_func - self._dump = dump_func - self._username = username - self._password = password - self._connect() - - def _connect(self) -> None: - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect(self._addr) - self.stream = self.socket.makefile(mode="rwb") - self._auth() - - def _auth(self) -> None: - if self._username is None or self._password is None: - return - auth_data = b"%s %s" % ( - self._username.encode("utf-8"), - self._password.encode("utf-8"), + self._loop = asyncio.new_event_loop() + self._async_connection = AsyncConnection( + addr, + load_func=load_func, + dump_func=dump_func, + username=username, + password=password, ) - self.stream.write(b"set auth x 0 %d\r\n" % len(auth_data)) - self.stream.write(auth_data) - self.stream.write(b"\r\n") - self.stream.flush() - response = self.stream.readline() - if response != b"STORED\r\n": - raise MemcacheError(response.rstrip(NEWLINE)) - - def close(self) -> None: - self.stream.close() - self.socket.close() def flush_all(self) -> None: - self.stream.write(b"flush_all\r\n") - self.stream.flush() - response = self.stream.readline() - if response != b"OK\r\n": - raise MemcacheError(response.rstrip(NEWLINE)) + return self._loop.run_until_complete(self._async_connection.flush_all()) def execute_meta_command(self, command: MetaCommand) -> MetaResult: - try: - return self._execute_meta_command(command) - except (IndexError, ConnectionResetError, BrokenPipeError): - # This happens when connection is closed by memcached. - self._connect() - return self._execute_meta_command(command) - - def _execute_meta_command(self, command: MetaCommand) -> MetaResult: - self.stream.write(command.dump_header()) - if command.value: - self.stream.write(command.value + b"\r\n") - self.stream.flush() - return self._receive_meta_result() - - def _receive_meta_result(self) -> MetaResult: - result = MetaResult.load_header(self.stream.readline()) - - if result.rc == b"VA": - if result.datalen is None: - raise MemcacheError("invalid response: missing datalen") - result.value = self.stream.read(result.datalen) - self.stream.read(2) # read the "\r\n" - - return result + return self._loop.run_until_complete( + self._async_connection.execute_meta_command(command) + ) def set( self, key: Union[bytes, str], value: Any, expire: Optional[int] = None ) -> None: - value, client_flags = self._dump(key, value) - - flags = [b"F%d" % client_flags] - if expire: - flags.append(b"T%d" % expire) - - command = MetaCommand( - cm=b"ms", key=key, datalen=len(value), flags=flags, value=value + return self._loop.run_until_complete( + self._async_connection.set(key, value, expire) ) - self.execute_meta_command(command) def get(self, key: Union[bytes, str]) -> Optional[Any]: - command = MetaCommand(cm=b"mg", key=key, flags=[b"v", b"f"]) - result = self.execute_meta_command(command) - - if result.value is None: - return None - - client_flags = int(result.flags[0][1:]) - - return self._load(key, result.value, client_flags) + return self._loop.run_until_complete(self._async_connection.get(key)) def delete(self, key: Union[bytes, str]) -> None: - command = MetaCommand(cm=b"md", key=key, flags=[], value=None) - self.execute_meta_command(command) - - -Addr = Tuple[str, int] + return self._loop.run_until_complete(self._async_connection.delete(key)) class Pool: @@ -181,6 +111,7 @@ class Memcache: :param username: Memcached ASCII protocol authentication username. :param password: Memcached ASCII protocol authentication password. """ + def __init__( self, addr: Union[Addr, List[Addr], None] = None,