forked from jgrewe/fishbook
[dataserver/cmd] add first steps towards a data server
This commit is contained in:
parent
38b73303c0
commit
3a09e301c0
@ -0,0 +1,10 @@
|
||||
import argparse
|
||||
import json
|
||||
|
||||
from IPython import embed
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
embed()
|
0
fishbook/cmd/runserver.py
Normal file
0
fishbook/cmd/runserver.py
Normal file
0
fishbook/cmd/test_client.py
Normal file
0
fishbook/cmd/test_client.py
Normal file
86
fishbook/dataserver/client.py
Executable file
86
fishbook/dataserver/client.py
Executable file
@ -0,0 +1,86 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import socket
|
||||
import selectors
|
||||
import traceback
|
||||
|
||||
import libclient
|
||||
|
||||
from IPython import embed
|
||||
sel = selectors.DefaultSelector()
|
||||
|
||||
|
||||
def create_request(action, value):
|
||||
if action == "search":
|
||||
return dict(
|
||||
type="text/json",
|
||||
encoding="utf-8",
|
||||
content=dict(action=action, value=value),
|
||||
)
|
||||
elif action == "data":
|
||||
print("send data request")
|
||||
return dict(
|
||||
type="binary/custom-client-binary-type",
|
||||
encoding="binary",
|
||||
content=dict(action=action, value=value),
|
||||
)
|
||||
else:
|
||||
return dict(
|
||||
type="binary/custom-client-binary-type",
|
||||
encoding="binary",
|
||||
content=bytes(action + value, encoding="utf-8"),
|
||||
)
|
||||
|
||||
|
||||
def start_connection(host, port, request):
|
||||
addr = (host, port)
|
||||
print("starting connection to", addr)
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.setblocking(False)
|
||||
sock.connect_ex(addr)
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
message = libclient.Message(sel, sock, addr, request)
|
||||
sel.register(sock, events, data=message)
|
||||
|
||||
|
||||
def send_request(host, port, request):
|
||||
start_connection(host, port, request)
|
||||
message = None
|
||||
try:
|
||||
while True:
|
||||
events = sel.select(timeout=1)
|
||||
for key, mask in events:
|
||||
message = key.data
|
||||
try:
|
||||
message.process_events(mask)
|
||||
except Exception:
|
||||
print(
|
||||
"main: error: exception for",
|
||||
f"{message.addr}:\n{traceback.format_exc()}",
|
||||
)
|
||||
message.close()
|
||||
# Check for a socket being monitored to continue.
|
||||
if not sel.get_map():
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
print("caught keyboard interrupt, exiting")
|
||||
finally:
|
||||
sel.close()
|
||||
return message.payload if message else "no response"
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) != 5:
|
||||
print("usage:", sys.argv[0], "<host> <port> <action> <value>")
|
||||
sys.exit(1)
|
||||
|
||||
host, port = sys.argv[1], int(sys.argv[2])
|
||||
action, value = sys.argv[3], sys.argv[4]
|
||||
request = create_request(action, value)
|
||||
|
||||
response = send_request(host, port, request)
|
||||
print(response.shape)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
194
fishbook/dataserver/libclient.py
Normal file
194
fishbook/dataserver/libclient.py
Normal file
@ -0,0 +1,194 @@
|
||||
import sys
|
||||
import selectors
|
||||
import json
|
||||
import io
|
||||
import struct
|
||||
import numpy as np
|
||||
|
||||
from IPython.terminal.embed import embed
|
||||
|
||||
class Message:
|
||||
def __init__(self, selector, sock, addr, request):
|
||||
self.selector = selector
|
||||
self.sock = sock
|
||||
self.addr = addr
|
||||
self.request = request
|
||||
self._recv_buffer = b""
|
||||
self._send_buffer = b""
|
||||
self._request_queued = False
|
||||
self._jsonheader_len = None
|
||||
self.jsonheader = None
|
||||
self.response = None
|
||||
self.payload = None
|
||||
|
||||
def _set_selector_events_mask(self, mode):
|
||||
"""Set selector to listen for events: mode is 'r', 'w', or 'rw'."""
|
||||
if mode == "r":
|
||||
events = selectors.EVENT_READ
|
||||
elif mode == "w":
|
||||
events = selectors.EVENT_WRITE
|
||||
elif mode == "rw":
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
else:
|
||||
raise ValueError(f"Invalid events mask mode {repr(mode)}.")
|
||||
self.selector.modify(self.sock, events, data=self)
|
||||
|
||||
def _read(self):
|
||||
try:
|
||||
# Should be ready to read
|
||||
data = self.sock.recv(4096)
|
||||
except BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
else:
|
||||
if data:
|
||||
self._recv_buffer += data
|
||||
else:
|
||||
raise RuntimeError("Peer closed.")
|
||||
|
||||
def _write(self):
|
||||
if self._send_buffer:
|
||||
print("sending", repr(self._send_buffer), "to", self.addr)
|
||||
try:
|
||||
# Should be ready to write
|
||||
sent = self.sock.send(self._send_buffer)
|
||||
except BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
else:
|
||||
self._send_buffer = self._send_buffer[sent:]
|
||||
|
||||
def _json_encode(self, obj, encoding):
|
||||
return json.dumps(obj, ensure_ascii=False).encode(encoding)
|
||||
|
||||
def _json_decode(self, json_bytes, encoding):
|
||||
tiow = io.TextIOWrapper(
|
||||
io.BytesIO(json_bytes), encoding=encoding, newline=""
|
||||
)
|
||||
obj = json.load(tiow)
|
||||
tiow.close()
|
||||
return obj
|
||||
|
||||
def _create_message(self, *, content_bytes, content_type, content_encoding):
|
||||
jsonheader = {
|
||||
"byteorder": sys.byteorder,
|
||||
"content-type": content_type,
|
||||
"content-encoding": content_encoding,
|
||||
"content-length": len(content_bytes),
|
||||
}
|
||||
jsonheader_bytes = self._json_encode(jsonheader, "utf-8")
|
||||
message_hdr = struct.pack(">H", len(jsonheader_bytes))
|
||||
message = message_hdr + jsonheader_bytes + content_bytes
|
||||
return message
|
||||
|
||||
def _process_response_json_content(self):
|
||||
content = self.response
|
||||
result = content.get("result")
|
||||
print(f"got result: {result}")
|
||||
|
||||
def _process_response_binary_content(self):
|
||||
content = self.response
|
||||
shape = tuple(map(int, self.jsonheader["content_shape"].strip("()").split(",")))
|
||||
self.payload = np.frombuffer(content, dtype=self.jsonheader["content_dtype"]).reshape(shape)
|
||||
|
||||
|
||||
def process_events(self, mask):
|
||||
if mask & selectors.EVENT_READ:
|
||||
self.read()
|
||||
if mask & selectors.EVENT_WRITE:
|
||||
self.write()
|
||||
|
||||
def read(self):
|
||||
self._read()
|
||||
|
||||
if self._jsonheader_len is None:
|
||||
self.process_protoheader()
|
||||
|
||||
if self._jsonheader_len is not None:
|
||||
if self.jsonheader is None:
|
||||
self.process_jsonheader()
|
||||
|
||||
if self.jsonheader:
|
||||
if self.response is None:
|
||||
self.process_response()
|
||||
|
||||
def write(self):
|
||||
if not self._request_queued:
|
||||
self.queue_request()
|
||||
|
||||
self._write()
|
||||
|
||||
if self._request_queued:
|
||||
if not self._send_buffer:
|
||||
# Set selector to listen for read events, we're done writing.
|
||||
self._set_selector_events_mask("r")
|
||||
|
||||
def close(self):
|
||||
print("closing connection to", self.addr)
|
||||
try:
|
||||
self.selector.unregister(self.sock)
|
||||
except Exception as e:
|
||||
print(
|
||||
"error: selector.unregister() exception for",
|
||||
f"{self.addr}: {repr(e)}",
|
||||
)
|
||||
|
||||
try:
|
||||
self.sock.close()
|
||||
except OSError as e:
|
||||
print(
|
||||
"error: socket.close() exception for",
|
||||
f"{self.addr}: {repr(e)}",
|
||||
)
|
||||
finally:
|
||||
# Delete reference to socket object for garbage collection
|
||||
self.sock = None
|
||||
|
||||
def queue_request(self):
|
||||
content = self.request["content"]
|
||||
content_type = self.request["type"]
|
||||
content_encoding = self.request["encoding"]
|
||||
req = {
|
||||
"content_bytes": self._json_encode(content, "utf-8"),
|
||||
"content_type": content_type,
|
||||
"content_encoding": content_encoding,
|
||||
}
|
||||
message = self._create_message(**req)
|
||||
self._send_buffer += message
|
||||
self._request_queued = True
|
||||
|
||||
def process_protoheader(self):
|
||||
hdrlen = 2
|
||||
if len(self._recv_buffer) >= hdrlen:
|
||||
self._jsonheader_len = struct.unpack(
|
||||
">H", self._recv_buffer[:hdrlen]
|
||||
)[0]
|
||||
self._recv_buffer = self._recv_buffer[hdrlen:]
|
||||
|
||||
def process_jsonheader(self):
|
||||
hdrlen = self._jsonheader_len
|
||||
if len(self._recv_buffer) >= hdrlen:
|
||||
self.jsonheader = self._json_decode(
|
||||
self._recv_buffer[:hdrlen], "utf-8"
|
||||
)
|
||||
self._recv_buffer = self._recv_buffer[hdrlen:]
|
||||
for reqhdr in ("byteorder", "content-length", "content-type", "content-encoding",):
|
||||
if reqhdr not in self.jsonheader:
|
||||
raise ValueError(f'Missing required header "{reqhdr}".')
|
||||
|
||||
def process_response(self):
|
||||
content_len = self.jsonheader["content-length"]
|
||||
if not len(self._recv_buffer) >= content_len:
|
||||
return
|
||||
data = self._recv_buffer[:content_len]
|
||||
self._recv_buffer = self._recv_buffer[content_len:]
|
||||
|
||||
# Binary or unknown content-type
|
||||
self.response = data
|
||||
print(
|
||||
f'received {self.jsonheader["content-type"]} response from',
|
||||
self.addr,
|
||||
)
|
||||
self._process_response_binary_content()
|
||||
# Close when response has been processed
|
||||
self.close()
|
206
fishbook/dataserver/libserver.py
Normal file
206
fishbook/dataserver/libserver.py
Normal file
@ -0,0 +1,206 @@
|
||||
import sys
|
||||
import selectors
|
||||
import json
|
||||
import io
|
||||
import struct
|
||||
import numpy as np
|
||||
from IPython import embed
|
||||
|
||||
|
||||
class Message:
|
||||
def __init__(self, selector, sock, addr):
|
||||
self.selector = selector
|
||||
self.sock = sock
|
||||
self.addr = addr
|
||||
self._recv_buffer = b""
|
||||
self._send_buffer = b""
|
||||
self._jsonheader_len = None
|
||||
self.jsonheader = None
|
||||
self.request = None
|
||||
self.response_created = False
|
||||
|
||||
def _set_selector_events_mask(self, mode):
|
||||
"""Set selector to listen for events: mode is 'r', 'w', or 'rw'."""
|
||||
if mode == "r":
|
||||
events = selectors.EVENT_READ
|
||||
elif mode == "w":
|
||||
events = selectors.EVENT_WRITE
|
||||
elif mode == "rw":
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
else:
|
||||
raise ValueError(f"Invalid events mask mode {repr(mode)}.")
|
||||
self.selector.modify(self.sock, events, data=self)
|
||||
|
||||
def _read(self):
|
||||
try:
|
||||
# Should be ready to read
|
||||
data = self.sock.recv(4096)
|
||||
except BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
else:
|
||||
if data:
|
||||
self._recv_buffer += data
|
||||
else:
|
||||
raise RuntimeError("Peer closed.")
|
||||
|
||||
def _write(self):
|
||||
if self._send_buffer:
|
||||
print("sending response to", self.addr)
|
||||
try:
|
||||
# Should be ready to write
|
||||
sent = self.sock.send(self._send_buffer)
|
||||
except BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
else:
|
||||
self._send_buffer = self._send_buffer[sent:]
|
||||
# Close when the buffer is drained. The response has been sent.
|
||||
if sent and not self._send_buffer:
|
||||
self.close()
|
||||
|
||||
def _json_encode(self, obj, encoding):
|
||||
return json.dumps(obj, ensure_ascii=False).encode(encoding)
|
||||
|
||||
def _json_decode(self, json_bytes, encoding):
|
||||
tiow = io.TextIOWrapper(
|
||||
io.BytesIO(json_bytes), encoding=encoding, newline=""
|
||||
)
|
||||
obj = json.load(tiow)
|
||||
tiow.close()
|
||||
return obj
|
||||
|
||||
def _create_message(self, *, content_bytes, content_type, content_encoding, content_dtype, content_shape):
|
||||
jsonheader = {
|
||||
"byteorder": sys.byteorder,
|
||||
"content-type": content_type,
|
||||
"content-encoding": content_encoding,
|
||||
"content-length": len(content_bytes),
|
||||
"content_dtype": content_dtype,
|
||||
"content_shape": content_shape,
|
||||
}
|
||||
jsonheader_bytes = self._json_encode(jsonheader, "utf-8")
|
||||
message_hdr = struct.pack(">H", len(jsonheader_bytes))
|
||||
message = message_hdr + jsonheader_bytes + content_bytes
|
||||
return message
|
||||
|
||||
"""
|
||||
def _create_response_json_content(self):
|
||||
action = self.request.get("action")
|
||||
if action == "search":
|
||||
query = self.request.get("value")
|
||||
answer = request_search.get(query) or f'No match for "{query}".'
|
||||
content = {"result": answer}
|
||||
else:
|
||||
content = {"result": f'Error: invalid action "{action}".'}
|
||||
content_encoding = "utf-8"
|
||||
response = {
|
||||
"content_bytes": self._json_encode(content, content_encoding),
|
||||
"content_type": "text/json",
|
||||
"content_encoding": content_encoding,
|
||||
}
|
||||
return response
|
||||
"""
|
||||
|
||||
def _create_response_binary_content(self):
|
||||
embed()
|
||||
data = np.random.randn(100, 10)
|
||||
response = {
|
||||
"content_bytes": data.tobytes(),
|
||||
"content_type": "binary/custom-server-binary-type",
|
||||
"content_encoding": "binary",
|
||||
"content_dtype": data.dtype.str,
|
||||
"content_shape": str(data.shape),
|
||||
}
|
||||
return response
|
||||
|
||||
def process_events(self, mask):
|
||||
if mask & selectors.EVENT_READ:
|
||||
self.read()
|
||||
if mask & selectors.EVENT_WRITE:
|
||||
self.write()
|
||||
|
||||
def read(self):
|
||||
self._read()
|
||||
print("reading a message!")
|
||||
if self._jsonheader_len is None:
|
||||
self.process_protoheader()
|
||||
print(self._jsonheader_len)
|
||||
if self._jsonheader_len is not None:
|
||||
if self.jsonheader is None:
|
||||
self.process_jsonheader()
|
||||
print(self.jsonheader)
|
||||
if self.jsonheader:
|
||||
if self.request is None:
|
||||
self.process_request()
|
||||
|
||||
def write(self):
|
||||
if self.request:
|
||||
if not self.response_created:
|
||||
self.create_response()
|
||||
|
||||
self._write()
|
||||
|
||||
def close(self):
|
||||
print("closing connection to", self.addr)
|
||||
try:
|
||||
self.selector.unregister(self.sock)
|
||||
except Exception as e:
|
||||
print(
|
||||
"error: selector.unregister() exception for",
|
||||
f"{self.addr}: {repr(e)}",
|
||||
)
|
||||
|
||||
try:
|
||||
self.sock.close()
|
||||
except OSError as e:
|
||||
print(
|
||||
"error: socket.close() exception for",
|
||||
f"{self.addr}: {repr(e)}",
|
||||
)
|
||||
finally:
|
||||
# Delete reference to socket object for garbage collection
|
||||
self.sock = None
|
||||
|
||||
def process_protoheader(self):
|
||||
hdrlen = 2
|
||||
if len(self._recv_buffer) >= hdrlen:
|
||||
self._jsonheader_len = struct.unpack(">H", self._recv_buffer[:hdrlen])[0]
|
||||
self._recv_buffer = self._recv_buffer[hdrlen:]
|
||||
|
||||
def process_jsonheader(self):
|
||||
hdrlen = self._jsonheader_len
|
||||
if len(self._recv_buffer) >= hdrlen:
|
||||
self.jsonheader = self._json_decode(
|
||||
self._recv_buffer[:hdrlen], "utf-8"
|
||||
)
|
||||
self._recv_buffer = self._recv_buffer[hdrlen:]
|
||||
for reqhdr in ("byteorder", "content-length", "content-type", "content-encoding"):
|
||||
if reqhdr not in self.jsonheader:
|
||||
raise ValueError(f'Missing required header "{reqhdr}".')
|
||||
|
||||
def process_request(self):
|
||||
content_len = self.jsonheader["content-length"]
|
||||
if not len(self._recv_buffer) >= content_len:
|
||||
return
|
||||
data = self._recv_buffer[:content_len]
|
||||
self._recv_buffer = self._recv_buffer[content_len:]
|
||||
if self.jsonheader["content-type"] == "text/json":
|
||||
encoding = self.jsonheader["content-encoding"]
|
||||
self.request = self._json_decode(data, encoding)
|
||||
print("received request", repr(self.request), "from", self.addr)
|
||||
else:
|
||||
# Binary or unknown content-type
|
||||
self.request = data
|
||||
print(
|
||||
f'received {self.jsonheader["content-type"]} request from',
|
||||
self.addr,
|
||||
)
|
||||
# Set selector to listen for write events, we're done reading.
|
||||
self._set_selector_events_mask("w")
|
||||
|
||||
def create_response(self):
|
||||
response = self._create_response_binary_content()
|
||||
message = self._create_message(**response)
|
||||
self.response_created = True
|
||||
self._send_buffer += message
|
57
fishbook/dataserver/server.py
Executable file
57
fishbook/dataserver/server.py
Executable file
@ -0,0 +1,57 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import socket
|
||||
import selectors
|
||||
import traceback
|
||||
|
||||
import libserver
|
||||
|
||||
sel = selectors.DefaultSelector()
|
||||
|
||||
|
||||
def accept_wrapper(sock):
|
||||
conn, addr = sock.accept() # Should be ready to read
|
||||
print("accepted connection from", addr)
|
||||
conn.setblocking(False)
|
||||
message = libserver.Message(sel, conn, addr)
|
||||
sel.register(conn, selectors.EVENT_READ, data=message)
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) != 3:
|
||||
print("usage:", sys.argv[0], "<host> <port>")
|
||||
sys.exit(1)
|
||||
|
||||
host, port = sys.argv[1], int(sys.argv[2])
|
||||
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# Avoid bind() exception: OSError: [Errno 48] Address already in use
|
||||
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
lsock.bind((host, port))
|
||||
lsock.listen()
|
||||
print("listening on", (host, port))
|
||||
lsock.setblocking(False)
|
||||
sel.register(lsock, selectors.EVENT_READ, data=None)
|
||||
|
||||
try:
|
||||
while True:
|
||||
events = sel.select(timeout=None)
|
||||
for key, mask in events:
|
||||
if key.data is None:
|
||||
accept_wrapper(key.fileobj)
|
||||
else:
|
||||
message = key.data
|
||||
try:
|
||||
message.process_events(mask)
|
||||
except Exception:
|
||||
print("main: error: exception for",
|
||||
f"{message.addr}:\n{traceback.format_exc()}",)
|
||||
message.close()
|
||||
except KeyboardInterrupt:
|
||||
print("caught keyboard interrupt, exiting")
|
||||
finally:
|
||||
sel.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user