@@ -12,10 +12,16 @@ import signal
import socket
import sys
import multiprocessing
+import logging
from .connection import StreamConnection, WebsocketConnection
from .exceptions import ClientError, ServerError, ConnectionClosedError
+class ClientLoggerAdapter(logging.LoggerAdapter):
+ def process(self, msg, kwargs):
+ return f"[Client {self.extra['address']}] {msg}", kwargs
+
+
class AsyncServerConnection(object):
# If a handler returns this object (e.g. `return self.NO_RESPONSE`), no
# return message will be automatically be sent back to the client
@@ -27,7 +33,12 @@ class AsyncServerConnection(object):
self.handlers = {
"ping": self.handle_ping,
}
- self.logger = logger
+ self.logger = ClientLoggerAdapter(
+ logger,
+ {
+ "address": socket.address,
+ },
+ )
async def close(self):
await self.socket.close()
@@ -242,16 +253,20 @@ class AsyncServer(object):
self.server = WebsocketsServer(host, port, self._client_handler, self.logger)
async def _client_handler(self, socket):
+ address = socket.address
try:
client = self.accept_client(socket)
await client.process_requests()
except Exception as e:
import traceback
- self.logger.error("Error from client: %s" % str(e), exc_info=True)
+ self.logger.error(
+ "Error from client %s: %s" % (address, str(e)), exc_info=True
+ )
traceback.print_exc()
+ finally:
+ self.logger.debug("Client %s disconnected", address)
await socket.close()
- self.logger.debug("Client disconnected")
@abc.abstractmethod
def accept_client(self, socket):
@@ -207,7 +207,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
async def dispatch_message(self, msg):
for k in self.handlers.keys():
if k in msg:
- logger.debug('Handling %s' % k)
+ self.logger.debug('Handling %s' % k)
if 'stream' in k:
return await self.handlers[k](msg[k])
else:
@@ -351,7 +351,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
break
(method, taskhash) = l.split()
- #logger.debug('Looking up %s %s' % (method, taskhash))
+ #self.logger.debug('Looking up %s %s' % (method, taskhash))
cursor = self.db.cursor()
try:
row = self.query_equivalent(cursor, method, taskhash)
@@ -360,7 +360,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
if row is not None:
msg = row['unihash']
- #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
+ #self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
elif self.upstream_client is not None:
upstream = await self.upstream_client.get_unihash(method, taskhash)
if upstream:
@@ -480,8 +480,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
row = self.query_equivalent(cursor, data['method'], data['taskhash'])
if row['unihash'] == data['unihash']:
- logger.info('Adding taskhash equivalence for %s with unihash %s',
- data['taskhash'], row['unihash'])
+ self.logger.info('Adding taskhash equivalence for %s with unihash %s',
+ data['taskhash'], row['unihash'])
d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
Adds a logging adaptor to the asyncrpc clients that prefixes log messages with the client remote address to aid in debugging Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- lib/bb/asyncrpc/serv.py | 21 ++++++++++++++++++--- lib/hashserv/server.py | 10 +++++----- 2 files changed, 23 insertions(+), 8 deletions(-)