From bba99ab49033854a3e5f4c132830f257cf49eca1 Mon Sep 17 00:00:00 2001 From: AN Long Date: Sat, 26 Jul 2025 18:44:13 +0900 Subject: [PATCH] =?UTF-8?q?Revert=20"=F0=9F=8F=97=EF=B8=8F=20using=20async?= =?UTF-8?q?=20client=20under=20the=20hood=20in=20sync=20client"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit d0674d41694c9e42fbd03d0701ba4d1e129d8861. --- memcache/async_memcache.py | 4 +- memcache/memcache.py | 107 ++++++++++++++++++++++++++++++------- 2 files changed, 89 insertions(+), 22 deletions(-) diff --git a/memcache/async_memcache.py b/memcache/async_memcache.py index 5c0dc2c..46bf10b 100644 --- a/memcache/async_memcache.py +++ b/memcache/async_memcache.py @@ -5,13 +5,11 @@ import hashring from .errors import MemcacheError +from .memcache import Addr from .meta_command import MetaCommand, MetaResult from .serialize import dump, load, DumpFunc, LoadFunc -Addr = Tuple[str, int] - - class AsyncConnection: def __init__( self, diff --git a/memcache/memcache.py b/memcache/memcache.py index 9239621..71dee76 100644 --- a/memcache/memcache.py +++ b/memcache/memcache.py @@ -1,4 +1,4 @@ -import asyncio +import socket import threading import queue from contextlib import contextmanager @@ -6,9 +6,12 @@ import hashring -from .async_memcache import Addr, AsyncConnection +from .errors import MemcacheError from .meta_command import MetaCommand, MetaResult -from .serialize import DumpFunc, LoadFunc, dump, load +from .serialize import dump, load, DumpFunc, LoadFunc + + +NEWLINE = b"\r\n" class Connection: @@ -21,35 +24,102 @@ def __init__( username: Optional[str] = None, password: Optional[str] = None, ): - self._loop = asyncio.new_event_loop() - self._async_connection = AsyncConnection( - addr, - load_func=load_func, - dump_func=dump_func, - username=username, - password=password, + self._addr = addr + self._load = load_func + self._dump = dump_func + self._username = username + self._password = password + self._connect() + + def _connect(self) -> None: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect(self._addr) + self.stream = self.socket.makefile(mode="rwb") + self._auth() + + def _auth(self) -> None: + if self._username is None or self._password is None: + return + auth_data = b"%s %s" % ( + self._username.encode("utf-8"), + self._password.encode("utf-8"), ) + self.stream.write(b"set auth x 0 %d\r\n" % len(auth_data)) + self.stream.write(auth_data) + self.stream.write(b"\r\n") + self.stream.flush() + response = self.stream.readline() + if response != b"STORED\r\n": + raise MemcacheError(response.rstrip(NEWLINE)) + + def close(self) -> None: + self.stream.close() + self.socket.close() def flush_all(self) -> None: - return self._loop.run_until_complete(self._async_connection.flush_all()) + self.stream.write(b"flush_all\r\n") + self.stream.flush() + response = self.stream.readline() + if response != b"OK\r\n": + raise MemcacheError(response.rstrip(NEWLINE)) def execute_meta_command(self, command: MetaCommand) -> MetaResult: - return self._loop.run_until_complete( - self._async_connection.execute_meta_command(command) - ) + try: + return self._execute_meta_command(command) + except (IndexError, ConnectionResetError, BrokenPipeError): + # This happens when connection is closed by memcached. + self._connect() + return self._execute_meta_command(command) + + def _execute_meta_command(self, command: MetaCommand) -> MetaResult: + self.stream.write(command.dump_header()) + if command.value: + self.stream.write(command.value + b"\r\n") + self.stream.flush() + return self._receive_meta_result() + + def _receive_meta_result(self) -> MetaResult: + result = MetaResult.load_header(self.stream.readline()) + + if result.rc == b"VA": + if result.datalen is None: + raise MemcacheError("invalid response: missing datalen") + result.value = self.stream.read(result.datalen) + self.stream.read(2) # read the "\r\n" + + return result def set( self, key: Union[bytes, str], value: Any, expire: Optional[int] = None ) -> None: - return self._loop.run_until_complete( - self._async_connection.set(key, value, expire) + value, client_flags = self._dump(key, value) + + flags = [b"F%d" % client_flags] + if expire: + flags.append(b"T%d" % expire) + + command = MetaCommand( + cm=b"ms", key=key, datalen=len(value), flags=flags, value=value ) + self.execute_meta_command(command) def get(self, key: Union[bytes, str]) -> Optional[Any]: - return self._loop.run_until_complete(self._async_connection.get(key)) + command = MetaCommand(cm=b"mg", key=key, flags=[b"v", b"f"]) + result = self.execute_meta_command(command) + + if result.value is None: + return None + + client_flags = int(result.flags[0][1:]) + + return self._load(key, result.value, client_flags) def delete(self, key: Union[bytes, str]) -> None: - return self._loop.run_until_complete(self._async_connection.delete(key)) + command = MetaCommand(cm=b"md", key=key, flags=[], value=None) + self.execute_meta_command(command) + + +Addr = Tuple[str, int] class Pool: @@ -111,7 +181,6 @@ class Memcache: :param username: Memcached ASCII protocol authentication username. :param password: Memcached ASCII protocol authentication password. """ - def __init__( self, addr: Union[Addr, List[Addr], None] = None,