-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhes_datasets.py
148 lines (109 loc) · 4.3 KB
/
hes_datasets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""HES Datasets
Provides variables for connecting to HES datasets, and methods for working with these datasets.
- `hes_apc`: the main admitted patient care dataset
- `diagnoses`: the diagnoses for the `hes_apc` dataset
- `procedures`: the procedures for the `hes_apc` dataset
- `nhp_apc`: the view of `hes_apc` used for the NHP model
"""
from typing import Callable
from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
# Module-level Spark session obtained through Databricks Connect; this runs
# when the module is imported.
spark = DatabricksSession.builder.getOrCreate()
# DataFrames over the HES tables, shared by the helper functions below.
# Main admitted patient care dataset.
hes_apc = spark.read.table("hes.silver.apc")
# Diagnoses for the admitted patient care dataset (filtered by the *_diagnosis helpers).
diagnoses = spark.read.table("hes.silver.apc_diagnoses")
# Procedures for the admitted patient care dataset (filtered by the *_procedure helpers).
procedures = spark.read.table("hes.silver.apc_procedures")
# View of the admitted patient care data used for the NHP model.
nhp_apc = spark.read.table("su_data.nhp.apc")
def combine_into_regex(*args) -> str:
    """Build one start-anchored regex out of several alternatives

    The arguments are joined with ``|``, wrapped in a group and prefixed
    with ``^``. The pattern is not anchored at the end, and the arguments
    are inserted as-is (they are not escaped), so each one may itself be a
    regex fragment.

    .. code-block:: python

        combine_into_regex("a", "b", "c(1|2)")
        # "^(a|b|c(1|2))"

    :param *args: the regex fragments to use as alternatives
    :type *args: string
    :return: the combined regex pattern
    :rtype: string
    """
    alternatives = "|".join(args)
    return "^(" + alternatives + ")"
def primary_diagnosis(codes: str) -> DataFrame:
    """Select matching diagnoses recorded in the primary position

    Applies :func:`any_diagnosis` and then keeps only the rows whose
    ``diag_order`` equals 1.

    :param codes: regex that the diagnosis code has to match
    :type codes: string
    :return: the matching rows of the diagnoses DataFrame
    :rtype: DataFrame
    """
    matching = any_diagnosis(codes)
    is_primary = F.col("diag_order") == 1
    return matching.filter(is_primary)
def secondary_diagnosis(codes: str) -> DataFrame:
    """Select matching diagnoses recorded in a secondary position

    Applies :func:`any_diagnosis` and then keeps only the rows whose
    ``diag_order`` is greater than 1.

    :param codes: regex that the diagnosis code has to match
    :type codes: string
    :return: the matching rows of the diagnoses DataFrame
    :rtype: DataFrame
    """
    matching = any_diagnosis(codes)
    is_secondary = F.col("diag_order") > 1
    return matching.filter(is_secondary)
def any_diagnosis(codes: str) -> DataFrame:
    """Select diagnoses whose code matches a regex, in any position

    Filters the module-level ``diagnoses`` DataFrame on its ``diagnosis``
    column using ``rlike``.

    :param codes: regex that the diagnosis code has to match
    :type codes: string
    :return: the matching rows of the diagnoses DataFrame
    :rtype: DataFrame
    """
    code_matches = F.col("diagnosis").rlike(codes)
    return diagnoses.filter(code_matches)
def primary_procedure(codes: str) -> DataFrame:
    """Select matching procedures recorded in the primary position

    Applies :func:`any_procedure` and then keeps only the rows whose
    ``procedure_order`` equals 1.

    :param codes: regex that the procedure code has to match
    :type codes: string
    :return: the matching rows of the procedures DataFrame
    :rtype: DataFrame
    """
    matching = any_procedure(codes)
    is_primary = F.col("procedure_order") == 1
    return matching.filter(is_primary)
def secondary_procedure(codes: str) -> DataFrame:
    """Select matching procedures recorded in a secondary position

    Applies :func:`any_procedure` and then keeps only the rows whose
    ``procedure_order`` is greater than 1.

    :param codes: regex that the procedure code has to match
    :type codes: string
    :return: the matching rows of the procedures DataFrame
    :rtype: DataFrame
    """
    matching = any_procedure(codes)
    is_secondary = F.col("procedure_order") > 1
    return matching.filter(is_secondary)
def any_procedure(codes: str) -> DataFrame:
    """Select procedures whose code matches a regex, in any position

    Filters the module-level ``procedures`` DataFrame on its
    ``procedure_code`` column using ``rlike``.

    :param codes: regex that the procedure code has to match
    :type codes: string
    :return: the matching rows of the procedures DataFrame
    :rtype: DataFrame
    """
    code_matches = F.col("procedure_code").rlike(codes)
    return procedures.filter(code_matches)
def _admission_has(
    df: DataFrame, filter_function: Callable[[str], DataFrame], *args: str
) -> DataFrame:
    """Keep the rows of ``df`` that have a match in the filtered data

    The codes are merged into one regex with :func:`combine_into_regex`,
    passed to ``filter_function``, and ``df`` is then semi-joined to the
    result on ``epikey`` and ``fyear``.

    :param df: the data we want to filter
    :type df: DataFrame
    :param filter_function: function that takes the regex and returns the
        DataFrame to match against
    :type filter_function: Callable[[str], DataFrame]
    :param *args: the codes to combine into a single regex
    :type *args: str
    :return: the rows of ``df`` with at least one match
    :rtype: DataFrame
    """
    matching_rows = filter_function(combine_into_regex(*args))
    return df.join(matching_rows, on=["epikey", "fyear"], how="semi")
def _admission_not(
    df: DataFrame, filter_function: Callable[[str], DataFrame], *args: str
) -> DataFrame:
    """Keep the rows of ``df`` that have no match in the filtered data

    The codes are merged into one regex with :func:`combine_into_regex`,
    passed to ``filter_function``, and ``df`` is then anti-joined to the
    result on ``epikey`` and ``fyear``.

    :param df: the data we want to filter
    :type df: DataFrame
    :param filter_function: function that takes the regex and returns the
        DataFrame to match against
    :type filter_function: Callable[[str], DataFrame]
    :param *args: the codes to combine into a single regex
    :type *args: str
    :return: the rows of ``df`` without any match
    :rtype: DataFrame
    """
    matching_rows = filter_function(combine_into_regex(*args))
    return df.join(matching_rows, on=["epikey", "fyear"], how="anti")
# Attach the helpers to the DataFrame class so they can be called as methods,
# e.g. ``hes_apc.admission_has(primary_diagnosis, "I21", "I22")``; the
# DataFrame the method is called on is passed in as ``df``.
# NOTE(review): this patches ``pyspark.sql.DataFrame`` at import time - confirm
# that the DataFrames produced by the Databricks Connect session are instances
# of this class, otherwise the methods will not be available on them.
DataFrame.admission_has = _admission_has
DataFrame.admission_not = _admission_not