-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkorlan.py
183 lines (160 loc) · 8.34 KB
/
korlan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
from tkinter import *
import subprocess
import can
import re
import platform
if platform.system() == 'Windows':
OS = 'Windows'
elif platform.system() == "Linux":
OS = 'Linux'
if OS == 'Windows':
import win32com.client
if OS == 'Windows':
def get_usb_ids(usb_VIDPID = "VID_0483&PID_1234"): # Korlan VID_0483&PID_1234
usb_ids = []
wmi = win32com.client.GetObject ("winmgmts:")
for usb in wmi.InstancesOf ("win32_usbcontrollerdevice"):
if usb_VIDPID in usb.Dependent:
usb_ids.append(usb.Dependent.split('\\\\')[-1][:-1])
return usb_ids
def get_bus(bus, id='D3365AFB', rate=1000000, bus_flags=0, dll_path='./usb2can.dll', filters=[]):
# vendor string example: '2.4;2.0;2.0.0;2.0.0;8devices.com'
if bus != None:
bus.shutdown()
bus = can.interface.Bus(bustype="usb2can", channel=id, bitrate=rate, dll=dll_path,
flags=bus_flags, can_filters=filters)
try:
# Function returns bytes type value (pointer to a vendor string). It needs to be decoded to string.
vs_pointer = bus.can.get_vendor_string()
vendor_string = vs_pointer.decode()
except Exception as e:
vendor_string = '2.x;2.x;2.x.x;2.x.x;8devices.com'
vendor_list = vendor_string.replace(':', ';').split(';')
explanation = ['Firmware version','Hardware version','Canal version','DLL version','Device vendor']
return bus, zip(explanation, vendor_list)
elif OS == 'Linux':
def get_usb2can_devices():
usb_devices_info = [] # Processed usb-devices data. 2D list.
global usb2can_devices
usb2can_devices = []
usb_devices_string = str(subprocess.run(["usb-devices"], stdout=subprocess.PIPE)) # usb-devices command output.
# usb-devices output devided to list, where one item describes one device.
usb_devices_list = usb_devices_string.split(r'\n\n')
for block in usb_devices_list:
new_block = []
list_of_rows = re.split(r"[TDPSCI]: ", block) # Each device information block is devided into rows.
list_of_rows = list(filter(None, list_of_rows)) # Remove empty strings from list.
for row in list_of_rows:
row = row.replace('\\n','') # Remove unwanted symbols.
new_block.append(row)
usb_devices_info.append(new_block)
usb_devices_info[0].pop(0) # Remove irrelevant string.
for block in usb_devices_info: # Find usb2can devices
if "Product=USB2CAN converter" in block[4]: # Fifth row of usb device's information block
usb2can_devices.append(block)
def get_usb_ids():
get_usb2can_devices()
# Dictionary that this function returns.
# Key - korlan's serial number. Value - socketcan interface (can0; can1; etc.)
usb2can_ids = {}
i = 0
for korlan in usb2can_devices:
usb2can_ids[korlan[5].split("SerialNumber=", 1)[1]] = 'can' + str(i)
i = i + 1
return usb2can_ids
def get_bus(bus, id='can0', new_rate=1000000, is_listen_only=False, is_loopback=False, is_one_shot=False,
filters=None):
if bus is not None:
bus.shutdown()
# if "DOWN" in str(subprocess.run(['ip', 'addr', 'ls', 'dev', id], stdout=subprocess.PIPE)):
# current_rate = 0
# else:
# current_rate = pysc.Interface(id).baud
# if int(current_rate) != new_rate:
listen_only_on_off = "on" if is_listen_only else "off"
loopback_on_off = "on" if is_loopback else "off"
oneshot_on_off = "on" if is_one_shot else "off"
subprocess.run(['ip', 'link', 'set', 'down', id])
subprocess.run(['ip', 'link', 'set', id, 'type', 'can', 'bitrate', str(new_rate), 'listen-only',
listen_only_on_off, 'loopback', loopback_on_off, 'one-shot', oneshot_on_off])
subprocess.run(['ip', 'link', 'set', 'up', id])
bus = can.interface.Bus(interface="socketcan", channel=id, can_filters=filters) # Create socketcan bus interface
vendor_list = []
try:
device = int(id.split('can', 1)[1])
vendor_list.append(usb2can_devices[device][1].split(' ')[1]) # Get hardware version
vendor_list.append(usb2can_devices[device][3].split('Manufacturer=', 1)[1]) # Get manufacturer
except Exception:
vendor_list = ['2.x', '8devices.com']
explanation = ['Hardware version', 'Manufacturer']
return bus, zip(explanation, vendor_list)
def get_raw_statistics(can_id):
# Raw statistics is taken with bash command
can_interface_statistics_string = str(subprocess.run(['ifconfig', can_id], stdout=subprocess.PIPE))
rx_frames = int(can_interface_statistics_string.split('RX packets')[1].split()[0])
rx_bytes = int(can_interface_statistics_string.split('bytes')[1].split()[0])
tx_frames = int(can_interface_statistics_string.split('TX packets')[1].split()[0])
tx_bytes = int(can_interface_statistics_string.split('bytes')[2].split()[0])
bus_overr = int(can_interface_statistics_string.split('overruns')[1].split()[0]) + \
int(can_interface_statistics_string.split('overruns')[2].split()[0])
rx_err = int(can_interface_statistics_string.split('RX errors')[1].split()[0])
tx_err = int(can_interface_statistics_string.split('TX errors')[1].split()[0])
return rx_frames, rx_bytes, tx_frames, tx_bytes, bus_overr, rx_err, tx_err
def get_statistics(can_id, rx_frames_count_starting_point, rx_bytes_count_starting_point,
tx_frames_count_starting_point, tx_bytes_count_starting_point, bus_overr_count_starting_point):
# Bash command shows total connection time statistics.
# If user wants to reset statistics, it has to be counted from the new, last marked, point.
rx_frames_raw, rx_bytes_raw, tx_frames_raw, tx_bytes_raw, bus_overr_raw, rx_err, tx_err = get_raw_statistics(can_id)
rx_frames = rx_frames_raw - rx_frames_count_starting_point
rx_bytes = rx_bytes_raw - rx_bytes_count_starting_point
tx_frames = tx_frames_raw - tx_frames_count_starting_point
tx_bytes = tx_bytes_raw - tx_bytes_count_starting_point
bus_overr = bus_overr_raw - bus_overr_count_starting_point
return rx_frames, rx_bytes, tx_frames, tx_bytes, bus_overr, rx_err, tx_err
bit_rates = [1000, 800, 500, 250, 125, 62.5, 20, 10]
bit_rates_menu = [f'{x} kbit/s' for x in bit_rates]
def rx_msgs(bus, main_thread, stop, q, obdii=False):
pid_reply = 0x7E8
try:
# print("RX startup")
while True:
if stop():
# print('RX shutdown')
break
msgl=[]
msg = bus.recv(0.5)
if msg is not None:
if msg.is_error_frame is True or obdii is True and msg.arbitration_id != pid_reply:
continue
msg_flags = f"{'X' if msg.is_extended_id else '.'}" \
f"{'R' if msg.is_remote_frame else '.'}" \
f"{'E' if msg.is_error_frame else '.'}"
msgl.append('r') # RX msg
msgl.append(msg.timestamp)
msgl.append(msg_flags)
msgl.append(msg.arbitration_id)
msgl.append(msg.dlc)
msgl.append(msg.data)
q.put(msgl)
# Generate event so that received message would be shown in a Treeview
main_thread.event_generate('<<CAN_RX_event>>', when='tail')
except Exception as e:
print(f"Korlan RX exception {e}", e)
def tx_msg(bus, q, id, rem_data, ext_id):
try:
msg = can.Message(arbitration_id = id, data=rem_data, is_extended_id=ext_id)
bus.send(msg,0.5)
# print(f"Message sent on{t} {bus.channel_info}\n{msg}")
msgl=[]
msg_flags = f"{'X' if msg.is_extended_id else '.'}" \
f"{'R' if msg.is_remote_frame else '.'}" \
f"{'E' if msg.is_error_frame else '.'}"
msgl.append('T') # TX msg
msgl.append(msg.timestamp)
msgl.append(msg_flags)
msgl.append(msg.arbitration_id)
msgl.append(msg.dlc)
msgl.append(msg.data)
q.put(msgl)
except can.CanError:
print("Message NOT sent")