Skip to content

Commit

Permalink
moved utility/helper functions to utils.py file
Browse files Browse the repository at this point in the history
  • Loading branch information
akaila-splunk committed Aug 21, 2023
1 parent 2ee5000 commit 21323ea
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 53 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Version 2.0.0-beta

### Feature updates
* `ensure_binary`, `ensure_str`, `ensure_text` and `assert_regex` utility methods have been migrated from `six.py` to `splunklib/__init__.py`
* `ensure_binary`, `ensure_str`, `ensure_text` and `assert_regex` utility methods have been migrated from `six.py` to `splunklib/utils.py`

### Major changes
* Removed Code specific to Python2
Expand Down
44 changes: 0 additions & 44 deletions splunklib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,49 +30,5 @@ def setup_logging(level, log_format=DEFAULT_LOG_FORMAT, date_format=DEFAULT_DATE
datefmt=date_format)


def ensure_binary(s, encoding='utf-8', errors='strict'):
"""
- `str` -> encoded to `bytes`
- `bytes` -> `bytes`
"""
if isinstance(s, str):
return s.encode(encoding, errors)

if isinstance(s, bytes):
return s

raise TypeError(f"not expecting type '{type(s)}'")


def ensure_str(s, encoding='utf-8', errors='strict'):
"""
- `str` -> `str`
- `bytes` -> decoded to `str`
"""
if isinstance(s, bytes):
return s.decode(encoding, errors)

if isinstance(s, str):
return s

raise TypeError(f"not expecting type '{type(s)}'")


def ensure_text(s, encoding='utf-8', errors='strict'):
"""
- `str` -> `str`
- `bytes` -> decoded to `str`
"""
if isinstance(s, bytes):
return s.decode(encoding, errors)
if isinstance(s, str):
return s
raise TypeError(f"not expecting type '{type(s)}'")


def assertRegex(self, *args, **kwargs):
return getattr(self, "assertRegex")(*args, **kwargs)


__version_info__ = (2, 0, 0)
__version__ = ".".join(map(str, __version_info__))
2 changes: 1 addition & 1 deletion splunklib/modularinput/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from io import TextIOBase
import xml.etree.ElementTree as ET

from splunklib import ensure_text
from splunklib.utils import ensure_text


class Event:
Expand Down
2 changes: 1 addition & 1 deletion splunklib/modularinput/event_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import sys

from splunklib import ensure_str
from splunklib.utils import ensure_str
from .event import ET


Expand Down
8 changes: 5 additions & 3 deletions splunklib/searchcommands/search_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from urllib.parse import urlsplit
from warnings import warn
from xml.etree import ElementTree
from splunklib.utils import ensure_str


# Relative imports
import splunklib
Expand Down Expand Up @@ -888,7 +890,7 @@ def _read_chunk(istream):
if not header:
return None

match = SearchCommand._header.match(splunklib.ensure_str(header))
match = SearchCommand._header.match(ensure_str(header))

if match is None:
raise RuntimeError(f'Failed to parse transport header: {header}')
Expand All @@ -905,7 +907,7 @@ def _read_chunk(istream):
decoder = MetadataDecoder()

try:
metadata = decoder.decode(splunklib.ensure_str(metadata))
metadata = decoder.decode(ensure_str(metadata))
except Exception as error:
raise RuntimeError(f'Failed to parse metadata of length {metadata_length}: {error}')

Expand All @@ -919,7 +921,7 @@ def _read_chunk(istream):
except Exception as error:
raise RuntimeError(f'Failed to read body of length {body_length}: {error}')

return metadata, splunklib.ensure_str(body,errors="replace")
return metadata, ensure_str(body,errors="replace")

_header = re.compile(r'chunked\s+1.0\s*,\s*(\d+)\s*,\s*(\d+)\s*\n')

Expand Down
60 changes: 60 additions & 0 deletions splunklib/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright © 2011-2023 Splunk, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"): you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""The **splunklib.utils** File for utility functions.
"""


def ensure_binary(s, encoding='utf-8', errors='strict'):
"""
- `str` -> encoded to `bytes`
- `bytes` -> `bytes`
"""
if isinstance(s, str):
return s.encode(encoding, errors)

if isinstance(s, bytes):
return s

raise TypeError(f"not expecting type '{type(s)}'")


def ensure_str(s, encoding='utf-8', errors='strict'):
"""
- `str` -> `str`
- `bytes` -> decoded to `str`
"""
if isinstance(s, bytes):
return s.decode(encoding, errors)

if isinstance(s, str):
return s

raise TypeError(f"not expecting type '{type(s)}'")


def ensure_text(s, encoding='utf-8', errors='strict'):
"""
- `str` -> `str`
- `bytes` -> decoded to `str`
"""
if isinstance(s, bytes):
return s.decode(encoding, errors)
if isinstance(s, str):
return s
raise TypeError(f"not expecting type '{type(s)}'")


def assertRegex(self, *args, **kwargs):
return getattr(self, "assertRegex")(*args, **kwargs)
7 changes: 4 additions & 3 deletions tests/searchcommands/test_search_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@
from splunklib.searchcommands.decorators import ConfigurationSetting, Option
from splunklib.searchcommands.search_command import SearchCommand
from splunklib.client import Service
from splunklib.utils import ensure_binary

from io import StringIO, BytesIO


def build_command_input(getinfo_metadata, execute_metadata, execute_body):
input = (f'chunked 1.0,{len(splunklib.ensure_binary(getinfo_metadata))},0\n{getinfo_metadata}' +
f'chunked 1.0,{len(splunklib.ensure_binary(execute_metadata))},{len(splunklib.ensure_binary(execute_body))}\n{execute_metadata}{execute_body}')
input = (f'chunked 1.0,{len(ensure_binary(getinfo_metadata))},0\n{getinfo_metadata}' +
f'chunked 1.0,{len(ensure_binary(execute_metadata))},{len(ensure_binary(execute_body))}\n{execute_metadata}{execute_body}')

ifile = BytesIO(splunklib.ensure_binary(input))
ifile = BytesIO(ensure_binary(input))

ifile = TextIOWrapper(ifile)

Expand Down

0 comments on commit 21323ea

Please sign in to comment.