Commit d3f06a03 authored by Yori 'AGy' Fournier's avatar Yori 'AGy' Fournier
Browse files

add some idea of tests for the serverInterface

add the serverInterface
modified server
modified __init__

under testing phase
parent b8e02dec
......@@ -63,6 +63,9 @@ if D_HIERARCHY in ('CLIENT', 'client', 'LOCAL', 'local'):
from matplotlib.figure import Figure
from matplotlib import is_interactive
from matplotlib import use
from .config import D_HOST, D_PORT
from socket import socket, AF_INET, SOCK_STREAM
elif D_HIERARCHY in ('SERVER', 'server'):
print(INFO+'Import myplotlib as a server.')
......@@ -99,6 +102,9 @@ if D_HIERARCHY in ('CLIENT', 'client'):
from .myFig import MyFig_client as MyFig
elif(D_HIERARCHY in ('SERVER', 'server')):
# for testing purpose
from .test.myIOs import readStupidData
# MyAxes: Overlay on matplotlib.Axes class
from .myAxes_server import MyAxes_server as MyAxes
......
......@@ -33,12 +33,50 @@ class Server():
conn.close()
break
else:
self.treatAnswer()
answer = self.treatAnswer()
self.soc.sendall(answer)
def treatAnswer(self):
answer = None
try:
exec(str(self.answer))
(header, content) = exec(str(self.answer))
except:
print(INFO+"I could not execute the command")
print(INFO+'Received: '+str(self.answer))
print(WARN+"I could not execute the command")
print(SPCE+'Received: '+str(self.answer))
print(SPCE+"I expect '(header, content)'")
answer = ('readData', None, 'answer not correct format')
if header in ('readData', 'READATA', 'readdata'):
try:
(dataName, functionName, args, kwargs) = exec(str(content))
except:
print(WARN+"The content of readData could not be extracted")
print(SPCE+'Received: '+str(content))
print(SPCE+"I expect '(dataName, functionName, args, kwargs)'")
answer = ('readData', None, 'could not extract content')
try:
if args is not None:
if kwargs is not None:
G_RAWDATA[dataName] = exec(str(functionName)+'('+str(args)+', '+str(kwargs)+')')
else:
G_RAWDATA[dataName] = exec(str(functionName)+'('+str(args)+')')
else:
if kwargs is not None:
G_RAWDATA[dataName] = exec(str(functionName)+'('+str(kwargs)+')')
else:
G_RAWDATA[dataName] = exec(str(functionName)+'()')
answer = ('readData', 'dataName', 'no error')
except:
print(WARN+"The read function could not be executed")
answer = ('readData', None, 'function could not be executed')
else:
print(WARN+"I don't know ths signal, sorry")
return(answer)
from socket import socket, AF_INET, SOCK_STREAM
class ServerInterface(object):
def __init__(self, ip, port):
self.ip = ip
self.port = port
# just a list of names
self.currentRemoteData = []
def readData(self, ioFunction, dataName, *args, **kwargs):
content = '['+str(dataName)+', '+ioFunction.__name__+', '+str(args)+', '+str(kwargs)+']'
signal = "("+'readData'+', '+str(content)+')'
# create signal
sock = self.sendSignal(signal)
(answer, errmsg) = self.waitForAnswer(sock)
dataName = answer
# add data in the currentSyncData interface
if dataName is None:
print(SEVR+errmsg)
status=False
else:
self.currentSyncData.append(dataName)
status=True
return(True)
def newSyncFigure(self, figClass, symbolicRawdata, **kwargs):
syncFigure = figClass(symbolicRawdata, **kwargs)
# create a socket
# connect to ip:port
# send signal with parameters
# wait for answer:
# response is [ID, errormsg]
# ID = -1 no figure were created
# maybe error msg?
ID = 1
# if ok then create syncFigure.
# add the identifier of the server-side figure
syncFigure.syncID = ID
# return the syncFigure
return(syncFigure)
def sendSignal(self, signal):
sock = socket(AF_INET, SOCK_STREAM)
sock.connect((self.ip, self.port))
sock.sendall(str(signal))
return(sock)
def waitForAnswer(self, sock):
answer = sock.recv(1024)
return(answer)
from .. import os
from .. import D_OFORMAT, D_OPATH
from .. import MyData
from .. import SEVR, WARN, DBUG, SPCE
# TESTER
def myTest(function, expected, failed, debug):
if debug:
status = function(debug)
if status is not expected:
failed.append(str(function.__name__))
return(False)
else:
return(True)
else:
try:
status = function(debug)
if status is not expected:
failed.append(str(function.__name__))
return(False)
else:
return(True)
except:
print(WARN + str(function.__name__) + " caught exception.")
failed.append(str(function.__name__))
return(False)
return(True)
# TEST 1.* for testing MyFig_server
# TEST 2.* for testing MyAxes_server
# TEST 1.00: Test server.requestServerInterface
def test100(debug):
status = True
wrongIP = 15.12.12.12.42
wrongPort = 22
correctIP = 127.0.0.1
correctPort = 50824
server = requestServerInterface(wrongIP, correctPORT)
if (server is None) or (not isinstance(server, ServerInterface)):
status = True
else:
status = False
server = requestServerInterface(correctIP, wrongPORT)
if (server is None) or (not isinstance(server, ServerInterface)):
status = status and True
else:
status = False
server = requestServerInterface(correctIP, correctPORT)
if (server is None) or (not isinstance(server, ServerInterface)):
status = False
else:
status = status and True
return(status)
# Test readData
def test101(debug):
status = True
IP = 127.0.0.1
Port = 50824
server = requestServerInterface(IP, PORT)
status = server.readData(readStupidData, 'data-name', 42, option=True)
return(status)
# Test listCurrentData
def test102(debug):
status = True
IP = 127.0.0.1
Port = 50824
server = requestServerInterface(IP, PORT)
server.readData(readStupidData, 'data-name', 42, option=True)
listOfData = server.listCurrentData()
if listOfData != ['data-name',]:
status = False
return(status)
# test newFigure
def test103(debug):
status = True
IP = 127.0.0.1
Port = 50824
server = requestServerInterface(IP, PORT)
server.readData(readStupidData, 'data-name', 42, option=True)
listOfData = server.listCurrentData()
myTestFig = server.newFigure(FigTestRemote, ('data-name', ), xRange = [2.,3.])
return(status)
# test listCurrentFigures
def test104(debug):
status = True
IP = 127.0.0.1
Port = 50824
server = requestServerInterface(IP, PORT)
server.readData(readStupidData, 'data-name', 42, option=True)
listOfData = server.listCurrentData()
mySyncFig = newFigure(FigTestRemote, ('data-name', ), xRange = [2.,3.])
listOfCurrentFigures = server.listCurrentFigures()
if listOfCurrentFigures != [mySyncFig,]:
status = False
return(status)
# test plotting
def test105(debug):
status = True
IP = 127.0.0.1
Port = 50824
server = requestServerInterface(IP, PORT)
server.readData(readStupidData, 'data-name', 42, option=True)
listOfData = server.listCurrentData()
mySyncFig = newFigure(FigTestRemote, ('data-name', ), xRange = [2.,3.])
listOfCurrentFigures = server.listCurrentFigures()
status = server.listCurrentFigures()[0].plot()
# this last command should be equivalent to
# mySyncFig.plot()
return(status)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment