Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion memcache/async_memcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import hashring

from .errors import MemcacheError
from .memcache import Addr
from .meta_command import MetaCommand, MetaResult
from .serialize import dump, load, DumpFunc, LoadFunc


Addr = Tuple[str, int]


class AsyncConnection:
def __init__(
self,
Expand Down
107 changes: 19 additions & 88 deletions memcache/memcache.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import socket
import asyncio
import threading
import queue
from contextlib import contextmanager
from typing import Any, Callable, Iterator, List, Optional, Tuple, Union

import hashring

from .errors import MemcacheError
from .async_memcache import Addr, AsyncConnection
from .meta_command import MetaCommand, MetaResult
from .serialize import dump, load, DumpFunc, LoadFunc


NEWLINE = b"\r\n"
from .serialize import DumpFunc, LoadFunc, dump, load


class Connection:
Expand All @@ -24,102 +21,35 @@ def __init__(
username: Optional[str] = None,
password: Optional[str] = None,
):
self._addr = addr
self._load = load_func
self._dump = dump_func
self._username = username
self._password = password
self._connect()

def _connect(self) -> None:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(self._addr)
self.stream = self.socket.makefile(mode="rwb")
self._auth()

def _auth(self) -> None:
if self._username is None or self._password is None:
return
auth_data = b"%s %s" % (
self._username.encode("utf-8"),
self._password.encode("utf-8"),
self._loop = asyncio.new_event_loop()
self._async_connection = AsyncConnection(
addr,
load_func=load_func,
dump_func=dump_func,
username=username,
password=password,
)
self.stream.write(b"set auth x 0 %d\r\n" % len(auth_data))
self.stream.write(auth_data)
self.stream.write(b"\r\n")
self.stream.flush()
response = self.stream.readline()
if response != b"STORED\r\n":
raise MemcacheError(response.rstrip(NEWLINE))

def close(self) -> None:
self.stream.close()
self.socket.close()

def flush_all(self) -> None:
self.stream.write(b"flush_all\r\n")
self.stream.flush()
response = self.stream.readline()
if response != b"OK\r\n":
raise MemcacheError(response.rstrip(NEWLINE))
return self._loop.run_until_complete(self._async_connection.flush_all())

def execute_meta_command(self, command: MetaCommand) -> MetaResult:
try:
return self._execute_meta_command(command)
except (IndexError, ConnectionResetError, BrokenPipeError):
# This happens when connection is closed by memcached.
self._connect()
return self._execute_meta_command(command)

def _execute_meta_command(self, command: MetaCommand) -> MetaResult:
self.stream.write(command.dump_header())
if command.value:
self.stream.write(command.value + b"\r\n")
self.stream.flush()
return self._receive_meta_result()

def _receive_meta_result(self) -> MetaResult:
result = MetaResult.load_header(self.stream.readline())

if result.rc == b"VA":
if result.datalen is None:
raise MemcacheError("invalid response: missing datalen")
result.value = self.stream.read(result.datalen)
self.stream.read(2) # read the "\r\n"

return result
return self._loop.run_until_complete(
self._async_connection.execute_meta_command(command)
)

def set(
self, key: Union[bytes, str], value: Any, expire: Optional[int] = None
) -> None:
value, client_flags = self._dump(key, value)

flags = [b"F%d" % client_flags]
if expire:
flags.append(b"T%d" % expire)

command = MetaCommand(
cm=b"ms", key=key, datalen=len(value), flags=flags, value=value
return self._loop.run_until_complete(
self._async_connection.set(key, value, expire)
)
self.execute_meta_command(command)

def get(self, key: Union[bytes, str]) -> Optional[Any]:
command = MetaCommand(cm=b"mg", key=key, flags=[b"v", b"f"])
result = self.execute_meta_command(command)

if result.value is None:
return None

client_flags = int(result.flags[0][1:])

return self._load(key, result.value, client_flags)
return self._loop.run_until_complete(self._async_connection.get(key))

def delete(self, key: Union[bytes, str]) -> None:
command = MetaCommand(cm=b"md", key=key, flags=[], value=None)
self.execute_meta_command(command)


Addr = Tuple[str, int]
return self._loop.run_until_complete(self._async_connection.delete(key))


class Pool:
Expand Down Expand Up @@ -181,6 +111,7 @@ class Memcache:
:param username: Memcached ASCII protocol authentication username.
:param password: Memcached ASCII protocol authentication password.
"""

def __init__(
self,
addr: Union[Addr, List[Addr], None] = None,
Expand Down
Loading