forked from trytonus/trytond-shipping-dpd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dpd_client.py
executable file
·149 lines (129 loc) · 4.44 KB
/
dpd_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# -*- coding: utf-8 -*-
from suds import WebFault
from suds.client import Client
class DPDException(WebFault):
    """
    Error raised by :class:`DPDClient` when a DPD web service call fails.

    Subclasses ``suds.WebFault`` so API consumers can catch a single,
    DPD-specific exception type while existing ``WebFault`` handlers keep
    working.
    """
class DPDClient(object):
    """
    A DPD client API

    Each web service gets its own ``suds`` client, which is created lazily
    on first access and then cached on the instance. The authentication
    token is likewise fetched once, on first use, and reused afterwards.

    :param login_wsdl: WSDL handed to ``Client`` for the login service
    :param shipment_service_wsdl: WSDL handed to ``Client`` for the
                                  shipment service
    :param depot_data_service_wsdl: WSDL handed to ``Client`` for the
                                    depot data service
    :param parcel_shop_finder_service_wsdl: WSDL handed to ``Client`` for
                                            the parcel shop finder service
    :param username: DPD Username (Also known as DelisId)
    :param password: DPD Password
    :param message_language: Language code sent with the login request and
                             in the SOAP headers (defaults to ``en_US``)
    """

    # Caches for properties. These class-level ``None`` values are only
    # defaults; the properties below store the real objects on the instance.
    _login_service_client = None
    _shipment_service_client = None
    _depot_data_service_client = None
    _parcel_shop_finder_client = None
    _token = None

    def __init__(self, login_wsdl, shipment_service_wsdl,
                 depot_data_service_wsdl, parcel_shop_finder_service_wsdl,
                 username, password,
                 message_language='en_US'):
        self.login_wsdl = login_wsdl
        self.shipment_service_wsdl = shipment_service_wsdl
        self.depot_data_service_wsdl = depot_data_service_wsdl
        self.parcel_shop_finder_service_wsdl = parcel_shop_finder_service_wsdl

        self.username = username
        self.password = password
        self.message_language = message_language

    @property
    def login_service_client(self):
        """
        Returns a login service client (created on first access)
        """
        if self._login_service_client is None:
            self._login_service_client = Client(self.login_wsdl)
        return self._login_service_client

    @property
    def shipment_service_client(self):
        """
        Returns a shipment service client (created on first access)
        """
        if self._shipment_service_client is None:
            self._shipment_service_client = Client(self.shipment_service_wsdl)
        return self._shipment_service_client

    @property
    def depot_data_service_client(self):    # pragma: no cover
        """
        Returns a depot data service client (created on first access)
        """
        if self._depot_data_service_client is None:
            self._depot_data_service_client = Client(
                self.depot_data_service_wsdl
            )
        return self._depot_data_service_client

    @property
    def parcel_shop_finder_client(self):    # pragma: no cover
        """
        Returns a parcel shop finder client (created on first access)
        """
        if self._parcel_shop_finder_client is None:
            self._parcel_shop_finder_client = Client(
                self.parcel_shop_finder_service_wsdl
            )
        return self._parcel_shop_finder_client

    @property
    def token(self):
        """
        Returns the authentication token as a soap response

        The token is requested through :meth:`get_auth` on first access
        and cached; later accesses return the same response object.
        """
        if self._token is None:
            self._token = self.get_auth()
        return self._token

    @property
    def soap_headers(self):
        """
        Returns the SOAP headers for requests which use token

        Accessing this property triggers a login if no token is cached yet.
        """
        return [{
            'delisId': self.token.delisId,
            'authToken': self.token.authToken,
            'messageLanguage': self.message_language,
        }]

    def get_auth(self):
        """
        Creates an authentication token for the committed user
        if user name and password are valid.

        The authentication token is needed for accessing other
        DPD Web Services.

        Returns the login service response unchanged; ``soap_headers``
        reads its ``delisId`` and ``authToken`` attributes.

        :raises DPDException: if the login service answers with a fault
        """
        try:
            response = self.login_service_client.service.getAuth(
                self.username, self.password, self.message_language
            )
        # ``as`` form is valid on Python 2.6+ and Python 3; the old
        # ``except WebFault, exc`` spelling is a SyntaxError on Python 3.
        except WebFault as exc:
            raise DPDException(exc.fault, exc.document)
        return response

    def store_orders(self, print_options, shipment_service_data):
        """
        Call storeOrders service

        :param print_options: Passed through as the first argument of
                              ``storeOrders``
        :param shipment_service_data: Passed through as the second argument
                                      of ``storeOrders``
        :return: The service response, unchanged
        :raises DPDException: if the shipment service answers with a fault
        """
        # Token headers have to be attached before the call is made.
        self.shipment_service_client.set_options(soapheaders=self.soap_headers)

        try:
            response = self.shipment_service_client.service.storeOrders(
                print_options, shipment_service_data
            )
        except WebFault as exc:     # pragma: no cover
            raise DPDException(exc.fault, exc.document)
        return response

    def get_depot_data(self, country_code=None, depot=None, zip=None):
        """
        Call getDepotData service

        :param country_code: Passed through as the first argument of
                             ``getDepotData``
        :param depot: Passed through as the second argument of
                      ``getDepotData``
        :param zip: Passed through as the third argument of
                    ``getDepotData``. The name shadows the builtin but is
                    kept so existing keyword callers continue to work.
        :return: The service response, unchanged
        :raises DPDException: if the depot data service answers with a fault
        """
        # Token headers have to be attached before the call is made.
        self.depot_data_service_client.set_options(
            soapheaders=self.soap_headers
        )

        try:
            response = self.depot_data_service_client.service.getDepotData(
                country_code, depot, zip
            )
        except WebFault as exc:
            raise DPDException(exc.fault, exc.document)
        return response