Skip to content

Commit 64c28ef

Browse files
authored
Merge pull request #2797 from massooti/feat-timeseries
feat: Add Timeseries Collection Support
2 parents 95f8ba1 + 117664c commit 64c28ef

File tree

3 files changed

+237
-0
lines changed

3 files changed

+237
-0
lines changed

mongoengine/document.py

+23
Original file line number | Diff line number | Diff line change
@@ -220,6 +220,8 @@ def _get_collection(cls):
220220
# Get the collection, either capped or regular.
221221
if cls._meta.get("max_size") or cls._meta.get("max_documents"):
222222
cls._collection = cls._get_capped_collection()
223+
elif cls._meta.get("timeseries"):
224+
cls._collection = cls._get_timeseries_collection()
223225
else:
224226
db = cls._get_db()
225227
collection_name = cls._get_collection_name()
@@ -271,6 +273,27 @@ def _get_capped_collection(cls):
271273

272274
return db.create_collection(collection_name, **opts)
273275

276+
@classmethod
def _get_timeseries_collection(cls):
    """Create a new or get an existing timeseries PyMongo collection.

    The options are read from the ``timeseries`` entry of ``cls._meta``.
    Its ``expireAfterSeconds`` key, when present, is passed to
    ``create_collection`` as a top-level option; the remaining keys are
    passed as the ``timeseries`` argument.

    :return: the existing collection when one with this name is already
        listed in the database, otherwise the newly created collection.
    """
    db = cls._get_db()
    collection_name = cls._get_collection_name()

    if collection_name in list_collection_names(
        db, include_system_collections=True
    ):
        return db[collection_name]

    # Work on a shallow copy: popping from the dict stored in ``_meta``
    # would remove ``expireAfterSeconds`` from the document definition, so
    # any later call reading the meta would no longer see the TTL.
    timeseries_opts = dict(cls._meta.get("timeseries"))
    expire_after_seconds = timeseries_opts.pop("expireAfterSeconds", None)

    # Forward the TTL only when it is configured, rather than sending an
    # explicit ``None`` for it with every create command.
    opts = {}
    if expire_after_seconds is not None:
        opts["expireAfterSeconds"] = expire_after_seconds

    return db.create_collection(
        name=collection_name,
        timeseries=timeseries_opts,
        **opts,
    )
296+
274297
def to_mongo(self, *args, **kwargs):
275298
data = super().to_mongo(*args, **kwargs)
276299

Original file line number | Diff line number | Diff line change
@@ -0,0 +1,202 @@
1+
import unittest
2+
from datetime import datetime, timedelta
3+
4+
from mongoengine import (
5+
DateTimeField,
6+
Document,
7+
FloatField,
8+
StringField,
9+
connect,
10+
get_db,
11+
)
12+
from mongoengine.connection import disconnect
13+
from tests.utils import requires_mongodb_gte_50
14+
15+
16+
class TestTimeSeriesCollections(unittest.TestCase):
    """Tests for documents whose ``meta`` declares a ``timeseries`` collection.

    Every test gets a fresh connection to the ``mongoenginetest`` database
    and a freshly defined ``SensorData`` document class; all non-system
    collections are dropped again afterwards.
    """

    def setUp(self):
        connect(db="mongoenginetest")
        self.db = get_db()

        class SensorData(Document):
            timestamp = DateTimeField(required=True)
            temperature = FloatField()

            meta = {
                "timeseries": {
                    "timeField": "timestamp",
                    "metaField": "temperature",
                    "granularity": "seconds",
                    "expireAfterSeconds": 5,
                },
                "collection": "sensor_data",
            }

        self.SensorData = SensorData

    def tearDown(self):
        for collection_name in self.db.list_collection_names():
            if not collection_name.startswith("system."):
                self.db.drop_collection(collection_name)
        disconnect()

    def test_get_db(self):
        """Ensure that get_db returns the expected db."""
        db = self.SensorData._get_db()
        assert self.db == db

    def test_definition(self):
        """Ensure that document may be defined using fields."""
        assert ["id", "temperature", "timestamp"] == sorted(
            self.SensorData._fields.keys()
        )
        assert ["DateTimeField", "FloatField", "ObjectIdField"] == sorted(
            x.__class__.__name__ for x in self.SensorData._fields.values()
        )

    @requires_mongodb_gte_50
    def test_get_collection(self):
        """Ensure that get_collection returns the expected collection."""
        collection_name = "sensor_data"
        collection = self.SensorData._get_collection()
        assert self.db[collection_name] == collection

    @requires_mongodb_gte_50
    def test_create_timeseries_collection(self):
        """Ensure that a time-series collection can be created."""
        collection_name = self.SensorData._get_collection_name()
        collection = self.SensorData._get_collection()

        assert collection_name in self.db.list_collection_names()

        options = collection.options()

        assert options.get("timeseries") is not None
        assert options["timeseries"]["timeField"] == "timestamp"
        assert options["timeseries"]["granularity"] == "seconds"

    @requires_mongodb_gte_50
    def test_insert_document_into_timeseries_collection(self):
        """Ensure that a document can be inserted into a time-series collection."""
        collection_name = self.SensorData._get_collection_name()
        collection = self.SensorData._get_collection()
        assert collection_name in self.db.list_collection_names()

        # Insert a document and ensure it was inserted
        self.SensorData(timestamp=datetime.utcnow(), temperature=23.4).save()
        assert collection.count_documents({}) == 1

    @requires_mongodb_gte_50
    def test_timeseries_expiration(self):
        """Ensure that ``expireAfterSeconds`` from the meta is applied to the collection."""
        # Override the TTL before the collection is created on first access.
        self.SensorData._meta["timeseries"]["expireAfterSeconds"] = 1
        collection = self.SensorData._get_collection()

        options = collection.options()
        # No ``{}`` default here: with a default the check could never fail.
        assert options.get("timeseries") is not None
        assert options["expireAfterSeconds"] == 1

        self.SensorData(timestamp=datetime.utcnow(), temperature=23.4).save()
        assert collection.count_documents({}) == 1

        # NOTE: the actual removal of expired documents is not asserted.
        # It is performed by a periodic background task on the server
        # (per the MongoDB documentation), so a short sleep cannot observe
        # it reliably and would only slow the suite down.

    @requires_mongodb_gte_50
    def test_index_creation(self):
        """Test if the index defined in the meta dictionary is created properly."""

        # Define the Document with indexes
        class SensorDataWithIndex(Document):
            timestamp = DateTimeField(required=True)
            temperature = FloatField()
            location = StringField()  # Field to be indexed

            meta = {
                "timeseries": {
                    "timeField": "timestamp",
                    "metaField": "temperature",
                    "granularity": "seconds",
                    "expireAfterSeconds": 5,
                },
                "collection": "sensor_data",
                "indexes": [
                    {"fields": ["timestamp"], "name": "timestamp_index"},
                    {"fields": ["temperature"], "name": "temperature_index"},
                ],
            }

        collection = SensorDataWithIndex._get_collection()

        indexes = collection.index_information()

        assert (
            "timestamp_index" in indexes
        ), "Index on 'timestamp' field was not created"
        assert (
            "temperature_index" in indexes
        ), "Index on 'temperature' field was not created"

    @requires_mongodb_gte_50
    def test_timeseries_data_insertion_order(self):
        """Ensure that documents inserted out of order are returned sorted by time."""
        self.SensorData._get_collection()

        # Insert documents out of order
        now = datetime.utcnow()
        self.SensorData(timestamp=now, temperature=23.4).save()
        self.SensorData(timestamp=now - timedelta(seconds=5), temperature=22.0).save()
        self.SensorData(timestamp=now + timedelta(seconds=5), temperature=24.0).save()

        documents = list(self.SensorData.objects.order_by("timestamp"))

        # Check the chronological order
        assert len(documents) == 3
        assert documents[0].temperature == 22.0  # Earliest document
        assert documents[1].temperature == 23.4  # Middle document
        assert documents[2].temperature == 24.0  # Latest document

    @requires_mongodb_gte_50
    def test_timeseries_query_by_time_range(self):
        """Ensure that data can be queried by a specific time range in the time-series collection."""
        self.SensorData._get_collection()

        now = datetime.utcnow()
        self.SensorData(timestamp=now - timedelta(seconds=10), temperature=22.0).save()
        self.SensorData(timestamp=now - timedelta(seconds=5), temperature=23.0).save()
        self.SensorData(timestamp=now, temperature=24.0).save()

        # Query documents within the last 6 seconds. The explicit sort makes
        # the positional assertions below independent of the order in which
        # the server happens to return the matching documents.
        start_time = now - timedelta(seconds=6)
        documents = self.SensorData.objects(timestamp__gte=start_time).order_by(
            "timestamp"
        )

        assert len(documents) == 2
        assert documents[0].temperature == 23.0
        assert documents[1].temperature == 24.0

    @requires_mongodb_gte_50
    def test_timeseries_large_data_volume(self):
        """Ensure that the time-series collection can handle a large volume of data insertion."""
        collection = self.SensorData._get_collection()

        # Take the reference time once so that the timestamps are spaced
        # exactly one second apart regardless of how long the loop runs.
        now = datetime.utcnow()
        for i in range(10000):
            self.SensorData(
                timestamp=now - timedelta(seconds=i),
                temperature=20.0 + i % 5,
            ).save()

        assert collection.count_documents({}) == 10000
199+
200+
201+
# Run the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()

tests/utils.py

+12
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,18 @@ def requires_mongodb_gte_44(func):
4545
return _decorated_with_ver_requirement(func, (4, 4), oper=operator.ge)
4646

4747

48+
def requires_mongodb_gte_50(func):
    """Apply a "MongoDB version >= 5.0" requirement to ``func``."""
    minimum_version = (5, 0)
    return _decorated_with_ver_requirement(
        func, minimum_version, oper=operator.ge
    )
50+
51+
52+
def requires_mongodb_gte_60(func):
    """Apply a "MongoDB version >= 6.0" requirement to ``func``."""
    minimum_version = (6, 0)
    return _decorated_with_ver_requirement(
        func, minimum_version, oper=operator.ge
    )
54+
55+
56+
def requires_mongodb_gte_70(func):
    """Apply a "MongoDB version >= 7.0" requirement to ``func``."""
    minimum_version = (7, 0)
    return _decorated_with_ver_requirement(
        func, minimum_version, oper=operator.ge
    )
58+
59+
4860
def _decorated_with_ver_requirement(func, mongo_version_req, oper):
4961
"""Return a MongoDB version requirement decorator.
5062

0 commit comments

Comments
 (0)