-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathat_commands.py
485 lines (402 loc) · 15.6 KB
/
at_commands.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
###########################################################################
# Sint Wind PI
# Copyright 2012 by Tonino Tarsi <[email protected]>
# Modem comunications based on Slawek Ligus pyhumod-0.03 module
#
# Please refer to the LICENSE file for conditions
# Visit http://www.vololiberomontecucco.it
#
##########################################################################
"""Classes and methods for handling AT commands."""
import re
import errors
from warnings import warn
from TTLib import *
# Deprecated decorator.
def deprecated(dep_func):
"""Decorator used to mark functions as deprecated."""
def warn_and_run(*args, **kwargs):
"""Print warning and run the function."""
warn('%r is deprecated and scheduled for removal.' % \
dep_func.__name__)
return dep_func(*args, **kwargs)
return warn_and_run
class Command(object):
"""Class defining generic perations performed on AT commands."""
def __init__(self, modem, cmd, prefixed=True):
"""Constructor for Command class."""
self.cmd = cmd
self.modem = modem
self.prefixed = prefixed
def run(self):
r"""Send the AT command followed by the '\r' character to the modem.
Returns:
List of strings.
"""
self.modem.ctrl_port.read_waiting()
return self.modem.ctrl_port.send_at(self.cmd, '', self.prefixed)
def get(self):
r"""Send the AT command followed by the '?\r' characters to the modem.
Returns:
List of strings.
"""
self.modem.ctrl_port.read_waiting()
return self.modem.ctrl_port.send_at(self.cmd, '?', self.prefixed)
def set(self, value):
r"""Send the 'AT<+CMD>=<value>\r' string to the modem.
Returns:
List of strings.
"""
self.modem.ctrl_port.read_waiting()
return self.modem.ctrl_port.send_at(self.cmd, '=%s' % value,
self.prefixed)
def dsc(self):
r"""Send the AT command followed by the '=?\r' characters to the modem.
Returns:
List of strings.
"""
self.modem.ctrl_port.read_waiting()
return self.modem.ctrl_port.send_at(self.cmd, '=?', self.prefixed)
def _common_run(modem, at_cmd, prefixed=True):
"""Boilerplate for most methods based on Command.run()."""
info_cmd = Command(modem, at_cmd, prefixed)
modem.ctrl_lock.acquire()
try:
data = info_cmd.run()
return data
finally:
modem.ctrl_lock.release()
def _common_get(modem, at_cmd, prefixed=True):
"""Boilerplate for most methods based on Command.get()."""
data_cmd = Command(modem, at_cmd, prefixed)
modem.ctrl_lock.acquire()
try:
data = data_cmd.get()
return data
finally:
modem.ctrl_lock.release()
def _common_dsc(modem, at_cmd, prefixed=True):
"""Boilerplate for most methods based on Command.dsc()."""
data_cmd = Command(modem, at_cmd, prefixed)
modem.ctrl_lock.acquire()
try:
data = data_cmd.dsc()
return data
finally:
modem.ctrl_lock.release()
def _common_set(modem, at_cmd, value, prefixed=True):
"""Boilerplate for most methods based on Command.set()."""
modem.ctrl_lock.acquire()
try:
data = Command(modem, at_cmd, prefixed).set(value)
return data
finally:
modem.ctrl_lock.release()
class InteractiveCommands(object):
"""SIM interactive commands."""
ctrl_lock = None
ctrl_port = None
def sms_send(self, number, contents):
"""Send a text message from the modem.
Arguments:
number -- string with reciepent number,
contents -- text message body.
Returns:
Sent text message number since last counter reset.
"""
self.ctrl_lock.acquire()
try:
self.ctrl_port.write('AT+CMGS="%s"\r\n' % number)
# Perform a SIM test first.
self.ctrl_port.write(contents+chr(26))
result = self.ctrl_port.return_data()
# A text number is an integer number, returned in the
# last returned entry of the result, just after the ": " part.
text_number = int(result[-1].split(': ')[1])
return text_number
finally:
self.ctrl_lock.release()
def sms_list(self, message_type='ALL'):
"""List messages by type.
Arguments:
message_type -- one of the following strings:
'ALL' -- all messages,
'REC READ' -- read messages,
'REC UNREAD' -- unread messages,
'STO SENT' -- stored sent messages,
'STO UNSENT' -- stored unsent messages.
Returns:
list of string lists representing message headers.
"""
self.ctrl_lock.acquire()
try:
message_lister = Command(self, '+CMGL')
messages_data = message_lister.set('"%s"' % message_type)
return _enlist_data(messages_data, 4)
finally:
self.ctrl_lock.release()
def sms_read(self, message_num):
"""Read one message from the SIM.
Arguments:
message_num -- number of a message to read.
Returns:
message body (string) or None if the message isn't found.
"""
self.ctrl_lock.acquire()
try:
message_reader = Command(self, '+CMGR', prefixed=False)
message = message_reader.set(message_num)
# Slicing out the header.
return '\n'.join(message[1:])
finally:
self.ctrl_lock.release()
def sms_del(self, message_num):
"""Delete message from the SIM."""
msg_num_str = '%d' % message_num
_common_set(self, '+CMGD', msg_num_str)
@deprecated
def del_message(self, message_num):
"""Deprecated equivalent of sms_del."""
return self.sms_del(message_num)
@deprecated
def read_message(self, message_num):
"""Deprecated equivalent of sms_read."""
return self.sms_read(message_num)
@deprecated
def send_text(self, number, contents):
"""Deprecated equivalent of sms_send."""
return self.sms_send(number, contents)
@deprecated
def list_messages(self, message_type='ALL'):
"""Deprecated equivalent of sms_list."""
return self.sms_list(message_type)
def hangup(self):
"""Hang up."""
log( "Hang up")
_common_run(self, '+CHUP', prefixed=False)
def answer (self):
"""answer up."""
log( "answering ...")
_common_run(self, 'A', prefixed=False)
def pbent_read(self, start_index, end_index=None):
"""Read phonebook entries."""
return_range = True
if not end_index:
end_index = start_index
return_range = False
index_range = '%d,%d' % (start_index, end_index)
entries = _common_set(self, '+CPBR', index_range)
if start_index > end_index:
entries.reverse()
entries_list = _enlist_data(entries)
if return_range:
return entries_list
return entries_list[0]
def pbent_find(self, query=''):
"""Find phonebook entries matching a query string."""
entries = _common_set(self, '+CPBF', '"%s"' % query)
return _enlist_data(entries)
def pbent_write(self, index, number, text, numtype=145):
"""Write a phonebook entry."""
param = '%d,"%s",%d,"%s"' % (index, number, numtype, text)
_common_set(self, '+CPBW', param)
def pbent_del(self, index):
"""Clear out a phonebook entry."""
_common_set(self, '+CPBW', '%d' % index)
@deprecated
def find_pbent(self, query=''):
"""Deprecated equivalent of pbent_find."""
return self.pbent_find(query)
@deprecated
def read_pbent(self, start_index, end_index=None):
"""Deprecated equivalent of pbent_list."""
return self.pbent_read(start_index, end_index)
@deprecated
def del_pbent(self, index):
"""Deprecated equivalent of pbent_del."""
return self.pbent_del(index)
@deprecated
def write_pbent(self, index, number, text, numtype=145):
"""Deprecated equivalent of pbent_write."""
return self.pbent_write(index, number, text, numtype)
class ShowCommands(object):
"""Show methods extract static read-only data."""
def show_status(self):
"""Show Activity status of the mobile phone."""
return _common_run(self, '+CPAS', prefixed=True)[0]
def show_imei(self):
"""Show IMEI serial number."""
return _common_run(self, '+GSN', prefixed=False)[0]
def show_sn(self):
"""Show serial number."""
return _common_run(self, '^SN', prefixed=True)[0]
def show_manufacturer(self):
"""Show manufacturer name."""
return _common_run(self, '+GMI', prefixed=False)[0]
def show_model(self):
"""Show device model name."""
return _common_run(self, '+GMM', prefixed=False)[0]
def show_revision(self):
"""Show device revision."""
return _common_run(self, '+GMR', prefixed=False)[0]
def show_hardcoded_operators(self):
"""List operators hardcoded on the device."""
hard_ops_list = _common_run(self, '+COPN')
data = dict()
for entry in hard_ops_list:
num, op_name = [item[1:-1] for item in entry.split(',', 1)]
data[num] = op_name
return data
def show_who_locked(self):
"""Show which network operator has locked the device."""
locker_info = _common_dsc(self, '^CARDLOCK', prefixed=True)
if locker_info:
# Slice brackets off.
locker_info = locker_info[0][1:-1].split(',')
return locker_info
class SetCommands(object):
"""Set methods write user settings that are kept permanently."""
# pylint: disable-msg=R0913
def set_pdp_context(self, num, proto='IP', apn='', ip_addr='', d_comp=0,
h_comp=0):
"""Set Packet Data Protocol context."""
pdp_context_str = '%d,"%s","%s","%s",%d,%d' % (num, proto, apn,
ip_addr, d_comp, h_comp)
_common_set(self, '+CGDCONT', pdp_context_str)
def set_destination_port(self,port):
_common_set(self, '^DDSETEX', port)
def set_service_center(self, sca, tosca=145):
"""Set Service Center address and type.
Args:
sca -- String with service center address,
tosca -- Integer with SC type.
Raises:
AtCommandError -- if tosca contains an unknown value for SC type,
TypeError -- if SC type is not an integer.
"""
# Possible type of SC values:
# 128: unknown
# 129: national
# 145: international
# 161: national
if tosca not in (128, 129, 145, 161):
raise errors.AtCommandError('Unknown SC type: %i.' % tosca)
sca_str = '"%s",%i' % (sca, tosca)
_common_set(self, '+CSCA', sca_str)
class EnterCommands(object):
"""Enter methods write user settings that are kept until modem restarts."""
def enter_pin(self, pin, new_pin=None):
"""Enter or set new PIN."""
if new_pin:
set_arg = '"%d","%d"' % (pin, new_pin)
else:
set_arg = '"%d"' % pin
return _common_set(self, '+CPIN', set_arg)
def _common_enable(self, command, active, inactive, status,
active_set=None, inactive_set=None):
"""Enable, disable or check status of a setting."""
if not active_set:
active_set = active
if not inactive_set:
inactive_set = inactive
if status is None:
result = _common_get(self, command)[0]
return result == active
if status is True:
_common_set(self, command, active_set)
else:
_common_set(self, command, inactive_set)
def enable_nmi(self, status=None):
"""Enable, disable or check status on new message indications."""
return self._common_enable('+CNMI', '2,1,0,2,1', '0,0,0,0,0', status)
def enable_clip(self, status=None):
"""Enable, disable or check status of calling line identification."""
return self._common_enable('+CLIP', '1,1', '0,1', status, '1', '0')
def enable_textmode(self, status=None):
"""Enable, disable or find out about current mode."""
return self._common_enable('+CMGF', '1', '0', status)
@deprecated
def enter_text_mode(self):
"""Enter text mode."""
_common_set(self, '+CMGF', '1')
@deprecated
def enter_pdu_mode(self):
"""Enter PDU mode."""
_common_set(self, '+CMGF', '0')
class GetCommands(object):
"""Get methods read dynamic or user-set data."""
def get_networks(self):
"""Scan for networks."""
active_ops = _common_dsc(self, '+COPS')
bracket_group = re.compile('\(.+?\)')
if active_ops:
data = list()
network_data_list = bracket_group.findall(active_ops[0])
for network_data_set in network_data_list:
unbracketed_set = network_data_set[1:-1]
items = unbracketed_set.split(',')
if len(items) == 5:
transformed_set = [_transform(ni) for ni in items]
data.append(transformed_set)
return data
def get_clock(self):
"""Return internal modem clock."""
return _common_get(self, '+CCLK')[0]
def get_service_center(self):
"""Show service center number."""
sc_data = _common_get(self, '+CSCA')[0].split(',', 1)
service_center, sc_type_num = [_transform(item) for item in sc_data]
return service_center, sc_type_num
def get_detailed_error(self):
"""Print detailed error message."""
return _common_run(self, '+CEER')[0]
def get_rssi(self):
"""Show RSSI level."""
rssi_info = _common_run(self, '+CSQ')[0]
rssi = rssi_info.split(',', 1)
return int(rssi[0])
def get_pin_status(self):
"""Inform about PIN status.
Returns:
'READY' -- sim card ready to use,
'SIM PIN' -- PIN required,
'SIM PUK' -- PUK required.
"""
pin_info = _common_get(self, '+CPIN')[0]
return pin_info
def get_pdp_context(self):
"""Read PDP context entries."""
pdp_context_data = _common_get(self, '+CGDCONT')
data = _enlist_data(pdp_context_data)
return data
@deprecated
def get_mode(self):
"""Get current mode.
Returns:
0 -- PDU mode,
1 -- Text mode.
"""
current_mode = _common_get(self, '+CMGF')[0]
return int(current_mode)
def _transform(pdp_item):
"""Return a string if pdp_item starts with quotes or integer otherwise."""
if pdp_item:
if pdp_item.startswith('"'):
return pdp_item[1:-1]
else:
return int(pdp_item)
else:
return ''
def _enlist_data(string_list, max_split=None):
"""Transform data strings into data lists and return them."""
entries_list = list()
if max_split:
for entry in string_list:
entry_list = [_transform(item) for item
in entry.split(',', max_split)]
entries_list.append(entry_list)
else:
for entry in string_list:
entry_list = [_transform(item) for item in entry.split(',')]
entries_list.append(entry_list)
return entries_list