Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of security ID check #1489

Open
fcarli3 opened this issue Oct 24, 2022 · 5 comments
Open

Implementation of security ID check #1489

fcarli3 opened this issue Oct 24, 2022 · 5 comments

Comments

@fcarli3
Copy link

fcarli3 commented Oct 24, 2022

I'm using python opcua server and client and I'm testing all the possible security features about authentication and communication. As the specs says, there is no implementation about security ID check, then even if I call the method set_security_ID, anonymous connections are still accepted.
Is there a way to implement security ID check on a python opcua server ? Any suggestions ?

@AndreasHeine
Copy link
Member

image

but years ago i wrote something, but its not for production use, so no warranties for that old deprecated stuff:

try:
    from opcua import ua, uamethod, Server
    from opcua.server.user_manager import UserManager
    from time import sleep
except ImportError as e:
    print(e)

users_db =  {
                'user1': 'pw1'
            }

def user_manager(isession, username, password):
    isession.user = UserManager.User
    return username in users_db and password == users_db[username]

@uamethod
def myMethod(parent, rfid):
    print("method call with parameters: ", rfid)
    Out1 = rfid
    Out2 = 12345
    return  (
                ua.Variant(Out1, ua.VariantType.Int64),
                ua.Variant(Out2, ua.VariantType.Int64)
            )

if __name__ == "__main__":
    """
    OPC-UA-Server Setup
    """
    server = Server()

    endpoint = "opc.tcp://127.0.0.1:4840"
    server.set_endpoint(endpoint)

    servername = "Python-OPC-UA"
    server.set_server_name(servername)
    address_space = server.register_namespace("http://andreas-heine.net/UA")
    
    uri = "urn:opcua:python:server"
    server.set_application_uri(uri)
    
    server.load_certificate("certificate.pem")
    server.load_private_key("key.pem")
    server.set_security_policy([
                                    # ua.SecurityPolicyType.NoSecurity,
                                    # ua.SecurityPolicyType.Basic128Rsa15_Sign,
                                    # ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
                                    # ua.SecurityPolicyType.Basic256Sha256_Sign,
                                    ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt
                                ])
    policyIDs = ["Username"]
    server.set_security_IDs(policyIDs)
    server.user_manager.set_user_manager(user_manager)

    """
    OPC-UA-Modeling
    """
    root_node = server.get_root_node()
    object_node = server.get_objects_node()
    server_node = server.get_server_node()

    try:
        server.import_xml("custom_nodes.xml")
    except FileNotFoundError:
        pass
    except Exception as e:
        print(e)

    servicelevel_node = server.get_node("ns=0;i=2267") #Service-Level Node
    servicelevel_value = 255 #0-255 [Byte]
    servicelevel_dv = ua.DataValue(ua.Variant(servicelevel_value, ua.VariantType.Byte))
    servicelevel_node.set_value(servicelevel_dv)

    parameter_obj = server.nodes.objects.add_object(address_space, "Parameter")
    token_node = parameter_obj.add_variable(address_space, "token", ua.Variant(0, ua.VariantType.UInt32))
    token_node.set_writable() #if clients should be able to write

    myobj = server.nodes.objects.add_object(address_space, "Methods")
    multiply_node = myobj.add_method(   address_space, 
                                        "myMethod", 
                                        myMethod, 
                                        [
                                            #Input-Arguments:
                                            ua.VariantType.Int64
                                        ], 
                                        [
                                            #Output-Arguments:
                                            ua.VariantType.Int64,
                                            ua.VariantType.Int64
                                        ]
                                    )

    """
    OPC-UA-Server Start
    """
    server.start()

    try:
        while 1:
            sleep(1)
    except KeyboardInterrupt:
        server.stop()

@fcarli3
Copy link
Author

fcarli3 commented Oct 24, 2022

I already tried this solution and it doesn’t work properly, because anonymous connection is stil possibile in this way.

@AndreasHeine
Copy link
Member

can you describe a little more detailed how you achieve that?

@schroeder-
Copy link
Contributor

Was fixed in #1458, but it think there was no new pip release, so you have to use the current master.

@fcarli3
Copy link
Author

fcarli3 commented Oct 24, 2022

can you describe a little more detailed how you achieve that?

This my server:

from opcua import Server, ua
from opcua.server.user_manager import UserManager
import sys
from time import sleep
sys.path.insert(0, "..")


users_db = {
    'user1': 'psw1'
}

endpoint = 'opc.tcp://127.0.0.1:49580'


def parsing_parameters():
    l = len(sys.argv)
    port = 49580
    proto = "opc.tcp"
    address = ''

    # Script with no parameters
    if (l == 1):
        print('''\n	Usage:

            python3 server_usr_psw.py -a <Server_Address> -p <Server_Port>

            -a
                opcua server IP address

            -p
                opcua server port (default 49580)

            ''')
        exit()

    for i in range(1, l):
        if (sys.argv[i] == '-a' and i < l):
            server_addr = sys.argv[i+1]
        elif (sys.argv[i] == '-p' and i < l):
            port = sys.argv[i+1]
        elif ((sys.argv[i] == '--help' or sys.argv[i] == '-h') and i <= l):
            print('''\n	Usage:

                    python3 server_usr_psw.py -a <Server_Address> -p <Server_Port>

                -a
                    opcua server IP address

                -p
                    opcua server port (default 49580)

                ''')
            exit()
        else:
            param = sys.argv[i].split('-')
            if (param is not None and len(param) > 1):
                print('''\n	Wrong parameters! Use --help or -h for help\n
                    Usage:

                        python3 server_usr_psw.py -a <Server_Address> -p <Server_Port>

                    -a
                        opcua server IP address

                    -p
                        opcua server port (default 49580)

                    ''')
                exit()
            else:
                pass

    address = f'{proto}://{server_addr}:{port}'

    return address


def create_nodes(server_objects):
    # Some variables to manage the nodes
    node_list_1_lvl = []
    node_list_2_lvl = []
    node_list_3_lvl = []

    # Creating 10 1st level nodes
    for i in range(0, 10):
        new_name = f'node_lvl1_{i}'
        new_node = server_objects.add_object(f'ns=2;s="{new_name}"', new_name)
        node_list_1_lvl.append(new_node)

    # Creating 20 node for each 10 fist level node
    w = 1
    for node_lvl1 in node_list_1_lvl:
        for j in range(0, 5):
            w = w + j + 1
            new_name = f'node_lvl2_{w}'
            new_node = node_lvl1.add_object(f'ns=2;s="{new_name}"', new_name)
            node_list_2_lvl.append(new_node)

    # Creating 40 node for each 20 fist level node
    p = 1
    for node_lvl2 in node_list_2_lvl:
        for k in range(0, 5):
            p = p + k + 1
            new_name = f'node_lvl3_{p}'
            new_node = node_lvl2.add_object(f'ns=2;s="{new_name}"', new_name)
            node_list_3_lvl.append(new_node)

    # Adding variables to level 3 nodes
    s = 0
    for node in node_list_3_lvl:
        name = f'var_{s}'
        node.add_variable(f'ns=2;s="{name}"', name, 16)
        s = s + 1


def user_manager(isession, username, password):
    isession.user = UserManager.User

    return username in users_db and password == users_db[username]


def main():

    # UserManager handles the authentication between the server and the client
    # In this case, the server uses username and password to authenticate the client

    endpoint = parsing_parameters()

    server = Server()

    server.set_endpoint(endpoint)
    server.register_namespace("2")

    servername = "Python-OPC-UA"
    server.set_server_name(servername)

    # Load server certificate and private key.
    server.load_certificate("cert/server_certificate.der")
    server.load_private_key("cert/server_private_key.pem")

    # Security policies for message communication (e.g. plaintext, sign, sign and encrypt)
    # This method set the security policies of the server regarding the communication with the client
    # This server accepts only signed and encrypted communications
    server.set_security_policy([
        ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt, ua.SecurityPolicyType.Basic256Sha256_Sign])

    # Policies for authentication (e.g. anonymous, username and password, certificate)
    # This server only accepts username and password as way to authenticate the client
    policyIDs = ["Username"]
    server.set_security_IDs(policyIDs)
    server.user_manager.set_user_manager(user_manager)

    # Populating address space
    object = server.get_objects_node()
    create_nodes(object)

    server.start()

    try:
        while 1:
            sleep(1)
    except KeyboardInterrupt:
        print("Server stopped")
        server.stop()


if __name__ == "__main__":
    main()

This is the client:

from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
from opcua import Client
import sys
sys.path.insert(0, "..")

client_cert = "cert/client_certificate.der"
server_cert = "cert/server_certificate.der"
client_private_key = "cert/client_private_key.pem"
url = ''


def parsing_parameters():
    l = len(sys.argv)
    port = 49580
    proto = "opc.tcp"
    address = ''

    # Script with no parameters
    if (l == 1):
        print('''\n	Usage:

            python3 client_usr_psw.py -a <Server_Address> -p <Server_Port>

            -a
                opcua server IP address 

            -p
                opcua server port (default 49580)

            ''')
        exit()

    for i in range(1, l):
        if (sys.argv[i] == '-a' and i < l):
            server_addr = sys.argv[i+1]
        elif (sys.argv[i] == '-p' and i < l):
            port = sys.argv[i+1]
        elif ((sys.argv[i] == '--help' or sys.argv[i] == '-h') and i <= l):
            print('''\n	Usage:

                    python3 client_usr_psw.py -a <Server_Address> -p <Server_Port>

                -a
                    opcua server IP address 

                -p
                    opcua server port (default 49580)

                ''')
            exit()
        else:
            param = sys.argv[i].split('-')
            if (param is not None and len(param) > 1):
                print('''\n	Wrong parameters! Use --help or -h for help\n
                    Usage:

                        python3 client_usr_psw.py -a <Server_Address> -p <Server_Port>

                    -a
                        opcua server IP address 

                    -p
                        opcua server port (default 49580)

                    ''')
                exit()
            else:
                pass

    address = f'{proto}://{server_addr}:{port}'

    return address


def browse_server(level, node):

    for i in range(1, level):
        print("\t", end='')

    print(node.get_browse_name().Name, end='')
    print(" ---> ", end='')
    print("Node ID: ", node, end='')

    # If the node is a variable, print the value
    if (node.get_node_class() == 2):
        print(" TYPE:", str(node.get_data_type_as_variant_type()).strip(
            "VariantType."), "VALUE: ", node.get_value())
        print("\n")
    else:
        print("\n")

    for child in node.get_children():
        browse_server(level + 1, child)


# Function that show all the node of opcua server
def variable_browsing(object_root):

    childs = object_root.get_children()
    dimc = len(childs)

    # Start from index 1 because 0 is the Server node
    for i in range(1, dimc):
        browse_server(1, childs[i])
        print("_"*100, '\n')


def main():
    url = parsing_parameters()
    client = Client(url=url)

    # This method specifies the client security policy that will be used in the communication with the server
    # If the client wants to communicate in plaintext and if the server supports this mode, this method is not necessary
    client.set_security_string(
        f"Basic256Sha256,SignAndEncrypt,{client_cert},{client_private_key},{server_cert}")
    client.set_user("user1")
    client.set_password("psw1")

    print("Connecting to server at {}".format(url))

    client.connect()

    root = client.get_root_node()
    print("Object node is: ", root)
    print('Children of root are ===> ', root.get_children())
    objects = client.get_objects_node()
    children = objects.get_children()
    print('Children of objects are ===> ', children)
    print()

    variable_browsing(objects)

    client.disconnect()


if __name__ == "__main__":
    main()

I tried to start the server as it is, with only Username as security ID, and then I started the client without setting username and password. I think the client shouldn't be able to connect to the server in this way, but it connects in any case and get all the server information it requires.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants