Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/68 parse extended registers #71

Merged
merged 4 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions smartmeter_datacollector/smartmeter/hdlc_dlms_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
#
import logging
from datetime import datetime, timezone
from typing import Any, List, Optional, Tuple
from typing import Any, List, Optional, Tuple, Union

from gurux_dlms import GXByteBuffer, GXDateTime, GXDLMSClient, GXReplyData
from gurux_dlms.enums import InterfaceType, ObjectType, Security
from gurux_dlms.objects import (GXDLMSCaptureObject, GXDLMSClock, GXDLMSData, GXDLMSObject, GXDLMSPushSetup,
GXDLMSRegister)
from gurux_dlms.objects import (GXDLMSCaptureObject, GXDLMSClock, GXDLMSData, GXDLMSExtendedRegister, GXDLMSObject,
GXDLMSPushSetup, GXDLMSRegister)
from gurux_dlms.secure import GXDLMSSecureClient

from .cosem import Cosem
Expand Down Expand Up @@ -116,7 +116,11 @@ def convert_dlms_bundle_to_reader_data(self, dlms_objects: List[GXDLMSObject],
obis_obj_pairs = {}
for obj in dlms_objects:
try:
obis_obj_pairs[OBISCode.from_string(str(obj.logicalName))] = obj
obis = OBISCode.from_string(str(obj.logicalName))
if obis in obis_obj_pairs:
LOGGER.debug("DLMS object with similar OBIS code '%s' skipped.", obis)
else:
obis_obj_pairs[obis] = obj
except ValueError as ex:
LOGGER.warning("Skipping unparsable DLMS object. (Reason: %s)", ex)

Expand All @@ -140,9 +144,10 @@ def convert_dlms_bundle_to_reader_data(self, dlms_objects: List[GXDLMSObject],

# Extract register data
data_points: List[MeterDataPoint] = []
for obis, obj in filter(lambda o: o[1].getObjectType() == ObjectType.REGISTER, obis_obj_pairs.items()):
for obis, obj in filter(lambda o: o[1].getObjectType() in (ObjectType.REGISTER, ObjectType.EXTENDED_REGISTER),
obis_obj_pairs.items()):
reg_type = self._cosem.get_register(obis)
if reg_type and isinstance(obj, GXDLMSRegister):
if reg_type and (isinstance(obj, GXDLMSRegister) or isinstance(obj, GXDLMSExtendedRegister)):
raw_value = self._extract_register_value(obj)
if raw_value is None:
LOGGER.warning("No value received for %s.", obis)
Expand Down Expand Up @@ -197,7 +202,7 @@ def _extract_value_from_data_object(data_object: GXDLMSData) -> Optional[Any]:
return data_object.getValues()[1]

@staticmethod
def _extract_register_value(register: GXDLMSRegister) -> Optional[Any]:
def _extract_register_value(register: Union[GXDLMSRegister, GXDLMSExtendedRegister]) -> Optional[Any]:
return register.getValues()[1]

@staticmethod
Expand Down
7 changes: 0 additions & 7 deletions smartmeter_datacollector/smartmeter/obis.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ def __str__(self) -> str:
def to_gurux_str(self) -> str:
return f"{self.a}.{self.b}.{self.c}.{self.d}.{self.e}.{self.f}"

def is_same_type(self, other: 'OBISCode') -> bool:
"""Compares only A, C, D, E parts of an OBIS code."""
return (self.a == other.a and
self.b == other.b and
self.c == other.c and
self.d == other.d)

@classmethod
def from_string(cls, obis_string: str) -> 'OBISCode':
match = cls.PATTERN.match(obis_string)
Expand Down
13 changes: 13 additions & 0 deletions tests/test_hdlc_dlms_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ def test_ignore_not_parsable_data_to_meter_data(self, unencrypted_invalid_data_l
assert isinstance(meter_data, list)
assert len(meter_data) == 5

def test_parse_dlms_data_with_extended_registers(self, unencrypted_extended_register_data: List[bytes]):
cosem = Cosem("fallbackId", register_obis_extended=[RegisterCosem(
# this is not the correct MeterDataPointType for this OBIS code (just for testing)
OBISCode(0, 1, 24, 2, 1, 255), MeterDataPointTypes.ACTIVE_POWER_N.value)])

parser = prepare_parser(unencrypted_extended_register_data, cosem)
dlms_objects = parser.parse_to_dlms_objects()
meter_data = parser.convert_dlms_bundle_to_reader_data(dlms_objects)

assert isinstance(meter_data, list)
assert len(meter_data) == 1
assert meter_data[0].value == 30545


class TestDlmsParserEncrypted:
def test_hdlc_to_dlms_objects_without_pushlist(self, encrypted_data_no_pushlist_lg: List[bytes], cosem_config_lg: Cosem):
Expand Down
10 changes: 10 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,13 @@ def encrypted_data_no_pushlist_lg() -> List[bytes]:
"7E A0 30 CE FF 03 13 86 F8 E0 C0 00 03 00 00 1F 1F FE C7 27 11 0F 74 B7 EF F4 1B 48 F7 47 B6 B6 A2 39 5B 42 BD 61 EA 18 7E D9 A0 99 8B 81 45 44 78 7E")
data = list(map(lambda frag: bytes.fromhex(frag.replace(" ", "")), data_str))
return data


@pytest.fixture
def unencrypted_extended_register_data() -> List[bytes]:
data_str: List[str] = []
data_str.append("7E A0 84 CE FF 03 13 12 8B E6 E7 00 E0 40 00 01 00 00 70 0F 00 04 15 7A 0C 07 E8 06 0B 02 10 00 00 FF 80 00 00 02 0A 01 0A 02 04 12 00 28 09 06 00 0B 19 09 00 FF 0F 02 12 00 00 02 04 12 00 28 09 06 00 0B 19 09 00 FF 0F 01 12 00 00 02 04 12 00 01 09 06 00 01 60 01 00 FF 0F 02 12 00 00 02 04 12 00 01 09 06 00 02 60 01 00 FF 0F 02 12 00 00 02 04 12 00 01 09 06 00 03 60 01 00 FF 0F 02 12 00 00 EB F5 7E")
data_str.append("7E A0 86 CE FF 03 13 9A 9D E0 40 00 02 00 00 75 02 04 12 00 01 09 06 00 04 60 01 00 FF 0F 02 12 00 00 02 04 12 00 04 09 06 00 01 18 02 01 FF 0F 02 12 00 00 02 04 12 00 04 09 06 00 02 18 02 01 FF 0F 02 12 00 00 02 04 12 00 04 09 06 00 03 18 02 01 FF 0F 02 12 00 00 02 04 12 00 04 09 06 00 04 18 02 01 FF 0F 02 12 00 00 09 06 00 0B 19 09 00 FF 09 08 32 34 35 32 31 36 36 32 09 01 00 09 01 00 09 01 00 C4 FD 7E")
data_str.append("7E A0 25 CE FF 03 13 92 6A E0 C0 00 03 00 00 14 05 00 00 77 51 05 00 00 00 00 05 00 00 00 00 05 00 00 00 00 87 59 7E")
data = list(map(lambda frag: bytes.fromhex(frag.replace(" ", "")), data_str))
return data
Loading