Files
appRobotDriver/sendTest_Client.py
2026-02-01 13:25:03 +01:00

88 lines
2.2 KiB
Python
Executable File

import websocket, json
import time
import can
def on_message(ws, message):
    """Translate one WebSocket command into a CAN frame and print any replies.

    ``message`` is either JSON text or an already-decoded value:

    * a list is processed element by element (recursively);
    * a dict must contain ``id`` and ``cmd`` and may contain ``data``
      (additional payload bytes following the command byte);
    * anything else is reported and ignored.

    The frame payload is ``[cmd, *data, checksum]`` where the checksum is the
    low byte of ``id + cmd + sum(data)``. After sending, replies are polled
    for roughly one millisecond and printed.

    Args:
        ws: The WebSocket object that delivered the message. It is only
            passed through to the recursive calls.
        message: JSON string, or a decoded list/dict.

    Note:
        Uses the module-level ``bus`` that the script entry point opens.
    """
    payload = json.loads(message) if isinstance(message, str) else message

    if isinstance(payload, list):
        for item in payload:
            on_message(ws, item)
        return

    print(payload)
    if not isinstance(payload, dict):
        # Scalars and null would make the key checks below raise TypeError.
        print("Message ist kein JSON-Objekt. Wird ignoriert.")
        return
    if 'id' not in payload:
        print("Message ohne ID empfangen. Wird ignoriert.")
        return
    if 'cmd' not in payload:
        print("Message ohne CMD empfangen. Wird ignoriert.")
        return

    can_id = payload['id']
    frame = [payload['cmd'], *payload.get("data", ())]
    # Checksum byte: low 8 bits of the arbitration id plus every payload byte.
    frame.append((can_id + sum(frame)) & 0xFF)

    msg = can.Message(
        arbitration_id=can_id,
        dlc=len(frame),
        data=frame,
        is_extended_id=False,
    )
    try:
        bus.send(msg)
    except can.CanError as err:
        # Report the caught instance; the exception class itself carries
        # neither the message nor an ``error_code`` attribute.
        print("CAN Message NOT sent: " + str(err))
        print("CAN Message NOT sent: " + str(getattr(err, "error_code", None)))

    # Poll for replies during a short window. A monotonic clock keeps the
    # timeout unaffected by wall-clock adjustments.
    deadline = time.monotonic() + 0.001
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        reply = bus.recv(remaining)
        if reply is None:
            continue
        print("CAN Reply is received:")
        print(reply)
def on_error(ws, error):
    """Print a WebSocket error.

    Args:
        ws: The WebSocket object that reported the error (unused).
        error: The error to report. It may be an exception object rather
            than a string, so it is converted explicitly before being
            concatenated.
    """
    print("WebSocket Error:" + str(error))
def on_close(a, b, c):
    """Print a notice that the WebSocket was closed.

    All three arguments are ignored. They are presumably the WebSocket
    object, the close status code and the close message supplied by the
    websocket library -- confirm against the installed version.
    """
    closed_banner = "### WebSocket closed ###"
    print(closed_banner)
def on_open(ws):
    """Print a notice that the WebSocket connection is open.

    Args:
        ws: The WebSocket object that was opened (unused).
    """
    opened_notice = "WebSocket Connection open"
    print(opened_notice)
if __name__ == "__main__":
    # Turn on the websocket library's trace output.
    websocket.enableTrace(True)

    # All callbacks of the client, wired up in one place.
    callbacks = {
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
        "on_open": on_open,
    }
    ws = websocket.WebSocketApp("ws://01-KW-S0139:9875", **callbacks)

    # ``bus`` must stay a module-level name: on_message() reads it as a global.
    # NOTE(review): newer python-can releases appear to prefer ``interface=``
    # over ``bustype=`` -- confirm against the installed version before changing.
    with can.interface.Bus(channel='can0', bustype='socketcan') as bus:
        ws.run_forever()