Skip to content

Commit fd156d5

Browse files
authored
Merge pull request #103 from PowerLoom/feat/cron_restart_cleanup
Feat/cron restart cleanup
2 parents 47474c6 + 54b6e9e commit fd156d5

File tree

10 files changed

+201
-29
lines changed

10 files changed

+201
-29
lines changed

build.sh

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ echo "🏗️ Building image with tag ${IMAGE_TAG}"
2222

2323
# Run collector test
2424
if [ "$NO_COLLECTOR" = "true" ]; then
25-
echo "�️ Skipping collector check (--no-collector flag)"
25+
echo "🤔 Skipping collector check (--no-collector flag)"
2626
COLLECTOR_PROFILE_STRING=""
2727
else
2828
./collector_test.sh --env-file ".env-${FULL_NAMESPACE}"
@@ -43,12 +43,33 @@ fi
4343
PROJECT_NAME="snapshotter-lite-v2-${SLOT_ID}-${FULL_NAMESPACE}"
4444
PROJECT_NAME_LOWER=$(echo "$PROJECT_NAME" | tr '[:upper:]' '[:lower:]')
4545
FULL_NAMESPACE_LOWER=$(echo "$FULL_NAMESPACE" | tr '[:upper:]' '[:lower:]')
46+
export CRON_RESTART=${CRON_RESTART:-false}
4647

4748
# Export the lowercase version for docker-compose
4849
export FULL_NAMESPACE_LOWER
4950

50-
# Run deployment with the correct env file
51+
# Check if running in Windows Subsystem for Linux (WSL)
52+
check_wsl() {
    # Detects Windows Subsystem for Linux by looking for "microsoft"
    # (case-insensitive) in /proc/version.
    # Returns 0 (shell "true") on a match, 1 otherwise.
    #
    # /proc/version is not present on every platform (e.g. macOS); grep's
    # stderr is silenced so a missing file is simply treated as "not WSL"
    # instead of printing an error during the build.
    if grep -qi microsoft /proc/version 2>/dev/null; then
        echo "🐧🪆 Running in WSL environment"
        return 0  # true in shell
    fi
    return 1  # false in shell
}
59+
60+
# Configure Docker Compose profiles based on WSL environment
61+
if check_wsl; then
62+
# WSL environment - disable autoheal
63+
COMPOSE_PROFILES="--profile local-collector"
64+
export AUTOHEAL_LABEL=""
65+
else
66+
# Non-WSL environment - enable autoheal
67+
COMPOSE_PROFILES="--profile local-collector --profile autoheal"
68+
export AUTOHEAL_LABEL="autoheal=true"
69+
fi
70+
71+
# Modify the deploy-services call to use the profiles
5172
./deploy-services.sh --env-file ".env-${FULL_NAMESPACE}" \
5273
--project-name "$PROJECT_NAME_LOWER" \
53-
--collector-profile "$COLLECTOR_PROFILE_STRING" \
74+
--collector-profile "$COMPOSE_PROFILES" \
5475
--image-tag "$IMAGE_TAG"

configure-environment.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ while [[ $# -gt 0 ]]; do
2626
NO_COLLECTOR=true
2727
shift
2828
;;
29+
--cron-restart)
30+
CRON_RESTART_FLAG=true
31+
shift
32+
;;
2933
*)
3034
shift
3135
;;
@@ -96,6 +100,7 @@ export PROST_CHAIN_ID=7865
96100
export POWERLOOM_CHAIN=mainnet
97101
export SOURCE_CHAIN=ETH
98102
export FULL_NAMESPACE="${POWERLOOM_CHAIN}-${NAMESPACE}-${SOURCE_CHAIN}"
103+
export CRON_RESTART=${CRON_RESTART_FLAG:-false}
99104

100105
# Environment file management
101106
if [ ! -f ".env-${FULL_NAMESPACE}" ]; then
@@ -130,6 +135,7 @@ if [ ! -f ".env-${FULL_NAMESPACE}" ]; then
130135
sed -i".backup" "s#<prost-rpc-url>#$PROST_RPC_URL#" ".env-${FULL_NAMESPACE}"
131136
sed -i".backup" "s#<prost-chain-id>#$PROST_CHAIN_ID#" ".env-${FULL_NAMESPACE}"
132137
sed -i".backup" "s#<docker-network-name>#$DOCKER_NETWORK_NAME#" ".env-${FULL_NAMESPACE}"
138+
sed -i".backup" "s#^CRON_RESTART=.*#CRON_RESTART=$CRON_RESTART#" ".env-${FULL_NAMESPACE}"
133139
echo "🟢 .env-${FULL_NAMESPACE} file created successfully."
134140
else
135141
echo "🟢 .env-${FULL_NAMESPACE} file found."

docker-compose.yaml

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
11
services:
2+
autoheal:
3+
profiles: ["autoheal"]
4+
image: willfarrell/autoheal
5+
container_name: autoheal
6+
restart: always
7+
environment:
8+
- AUTOHEAL_CONTAINER_LABEL=autoheal
9+
volumes:
10+
# Needs access to docker socket to monitor containers
11+
- /var/run/docker.sock:/var/run/docker.sock
212
snapshotter-lite-local-collector:
313
image: ghcr.io/powerloom/snapshotter-lite-local-collector:${IMAGE_TAG}
414
container_name: snapshotter-lite-local-collector-${SLOT_ID}-${FULL_NAMESPACE}
@@ -65,18 +75,26 @@ services:
6575
- SNAPSHOT_CONFIG_REPO_BRANCH=$SNAPSHOT_CONFIG_REPO_BRANCH
6676
- SNAPSHOTTER_COMPUTE_REPO=$SNAPSHOTTER_COMPUTE_REPO
6777
- SNAPSHOTTER_COMPUTE_REPO_BRANCH=$SNAPSHOTTER_COMPUTE_REPO_BRANCH
78+
- CRON_RESTART=$CRON_RESTART
6879
healthcheck:
6980
test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
7081
interval: 10s
7182
timeout: 5s
72-
retries: 3
83+
retries: 2
7384
start_period: 30s
74-
command:
75-
bash -c "bash init_docker.sh"
85+
command: >
86+
bash -c "
87+
rm -f /app/last_successful_submission.txt;
88+
trap 'rm -f /app/last_successful_submission.txt' SIGTERM;
89+
bash init_docker.sh;
90+
"
7691
extra_hosts:
7792
- "host.docker.internal:host-gateway"
7893
networks:
7994
- custom_network
95+
restart: on-failure:10
96+
labels:
97+
- "${AUTOHEAL_LABEL:-}"
8098
networks:
8199
custom_network:
82100
name: ${DOCKER_NETWORK_NAME}

env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ SUBNET_THIRD_OCTET=1
2222
MAX_STREAM_POOL_SIZE=2
2323
STREAM_POOL_HEALTH_CHECK_INTERVAL=30
2424
DATA_MARKET_IN_REQUEST=true
25+
CRON_RESTART=false
2526
# Optional
2627
IPFS_URL=
2728
IPFS_API_KEY=

pm2.config.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// this means if app restart {MAX_RESTART} times in 1 min then it stops
22
const NODE_ENV = process.env.NODE_ENV || 'development';
3-
3+
const CRON_RESTART = process.env.CRON_RESTART || 'false';
44
const MAX_RESTART = 3;
55
const MIN_UPTIME = 60000;
66

@@ -29,8 +29,13 @@ module.exports = {
2929
env: {
3030
NODE_ENV: NODE_ENV,
3131
},
32-
cron_restart: "0 * * * *",
33-
autorestart: true
32+
...(CRON_RESTART === 'true' ? { cron_restart: "0 * * * *" } : {}),
33+
autorestart: true,
34+
kill_timeout: 5000,
35+
stop_exit_codes: [0, 143],
36+
treekill: true,
37+
listen_timeout: 10000,
3438
},
3539
]
3640
}
41+

snapshotter/core_api.py

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
from fastapi import FastAPI
23
from fastapi import Request
34
from fastapi import Response
@@ -7,13 +8,19 @@
78
from ipfs_client.main import AsyncIPFSClientSingleton
89
from pydantic import Field
910
from web3 import Web3
11+
import asyncio
12+
import time
13+
from pathlib import Path
14+
from httpx import AsyncClient, Limits, Timeout, AsyncHTTPTransport
1015

1116
from snapshotter.settings.config import settings
17+
from snapshotter.utils.callback_helpers import send_telegram_notification_async
1218
from snapshotter.utils.data_utils import get_project_epoch_snapshot
1319
from snapshotter.utils.data_utils import get_project_finalized_cid
1420
from snapshotter.utils.default_logger import logger
1521
from snapshotter.utils.file_utils import read_json_file
16-
from snapshotter.utils.models.data_models import TaskStatusRequest
22+
from snapshotter.utils.models.data_models import SnapshotterIssue, SnapshotterReportState, SnapshotterStatus, TaskStatusRequest
23+
from snapshotter.utils.models.message_models import TelegramSnapshotterReportMessage
1724
from snapshotter.utils.rpc import RpcHelper
1825

1926

@@ -44,6 +51,56 @@
4451
)
4552

4653

54+
async def check_last_submission():
    """
    Background watchdog that marks the API unhealthy when submissions go stale.

    Runs forever (until the task is cancelled). Every 10 seconds it reads
    ``last_successful_submission.txt`` from the current working directory; the
    file is expected to contain a Unix timestamp in integer seconds. When that
    timestamp is more than 5 minutes old:

    * ``app.state.healthy`` is set to ``False`` (the ``/health`` endpoint then
      answers with HTTP 500), and
    * a Telegram report is sent, provided both ``telegram_url`` and
      ``telegram_chat_id`` are configured.

    The health flag is flipped *before* the notification is attempted so that a
    failure while building or sending the Telegram message can never leave the
    service reporting healthy. The notification is sent only on the
    healthy -> unhealthy transition to avoid re-sending the same alert on every
    polling cycle.

    A missing file is not an error: nothing has been submitted yet, so the
    check is skipped for that cycle. Any other error is logged and the loop
    continues after the usual delay.
    """
    stale_after_seconds = 300  # 5 minutes without a submission counts as stale
    poll_interval_seconds = 10
    submission_file = Path('last_successful_submission.txt')

    while True:
        try:
            try:
                raw_timestamp = submission_file.read_text()
            except FileNotFoundError:
                # No submission recorded yet (or the file was just removed).
                raw_timestamp = None

            if raw_timestamp is not None:
                last_timestamp = int(raw_timestamp.strip())
                current_time = int(time.time())

                # If more than 5 minutes have passed since last submission
                if current_time - last_timestamp > stale_after_seconds:
                    last_submission_str = time.strftime(
                        '%Y-%m-%d %H:%M:%S', time.localtime(last_timestamp),
                    )
                    rest_logger.error(
                        'No successful submission in the last 5 minutes. Last submission: {}',
                        last_submission_str,
                    )

                    # Record the unhealthy state first; the notification below
                    # is best-effort and must not be able to prevent it.
                    was_healthy = getattr(app.state, 'healthy', True)
                    app.state.healthy = False

                    # Send Telegram notification (once per transition)
                    if (
                        was_healthy and
                        settings.reporting.telegram_url and
                        settings.reporting.telegram_chat_id
                    ):
                        notification_message = SnapshotterIssue(
                            instanceID=settings.instance_id,
                            issueType=SnapshotterReportState.UNHEALTHY_EPOCH_PROCESSING.value,
                            projectID='',
                            epochId='',
                            timeOfReporting=str(time.time()),
                            extra=json.dumps({
                                'issueDetails': f'No successful submission in the last 5 minutes. Last submission: {last_submission_str}',
                            }),
                        )

                        telegram_message = TelegramSnapshotterReportMessage(
                            chatId=settings.reporting.telegram_chat_id,
                            slotId=settings.slot_id,
                            issue=notification_message,
                            status=SnapshotterStatus(
                                projects=[],
                                totalMissedSubmissions=0,
                                consecutiveMissedSubmissions=0,
                            ),
                        )

                        await send_telegram_notification_async(
                            client=app.state.telegram_client,
                            message=telegram_message,
                        )
        except Exception as e:
            # Top-level boundary of a long-running task: log and keep polling.
            rest_logger.error('Error checking last submission: {}', e)

        # Wait before the next check, whether or not this one succeeded.
        # Kept outside the try block so task cancellation propagates promptly.
        await asyncio.sleep(poll_interval_seconds)
102+
103+
47104
@app.on_event('startup')
48105
async def startup_boilerplate():
49106
"""
@@ -59,13 +116,32 @@ async def startup_boilerplate():
59116
abi=protocol_state_contract_abi,
60117
)
61118

119+
# Initialize httpx client for Telegram notifications
120+
transport_limits = Limits(
121+
max_connections=10,
122+
max_keepalive_connections=5,
123+
keepalive_expiry=None,
124+
)
125+
126+
app.state.telegram_client = AsyncClient(
127+
base_url=settings.reporting.telegram_url,
128+
timeout=Timeout(timeout=5.0),
129+
follow_redirects=False,
130+
transport=AsyncHTTPTransport(limits=transport_limits),
131+
)
132+
62133
if not settings.ipfs.url:
63134
rest_logger.warning('IPFS url not set, /data API endpoint will be unusable!')
64135
else:
65136
app.state.ipfs_singleton = AsyncIPFSClientSingleton(settings.ipfs)
66137
await app.state.ipfs_singleton.init_sessions()
67138
app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client
68139
app.state.epoch_size = 0
140+
app.state.healthy = True
141+
# Start the background task
142+
app.state.background_tasks = []
143+
background_task = asyncio.create_task(check_last_submission())
144+
app.state.background_tasks.append(background_task)
69145

70146

71147
# Health check endpoint
@@ -84,7 +160,11 @@ async def health_check(
84160
Returns:
85161
dict: A dictionary containing the status of the service.
86162
"""
87-
return {'status': 'OK'}
163+
if app.state.healthy:
164+
return {'status': 'OK'}
165+
else:
166+
response.status_code = 500
167+
return {'status': 'UNHEALTHY'}
88168

89169

90170
@app.get('/current_epoch')
@@ -259,6 +339,7 @@ async def get_data_for_project_id_epoch_id(
259339
'status': 'error',
260340
'message': f'IPFS url not set, /data API endpoint is unusable, please use /cid endpoint instead!',
261341
}
342+
# FIXME: outdated method signature
262343
try:
263344
data = await get_project_epoch_snapshot(
264345
request.app.state.protocol_state_contract,
@@ -397,3 +478,14 @@ async def get_task_status_post(
397478
'completed': False,
398479
'message': f'Task {task_status_request.task_type} for wallet {task_status_request.wallet_address} is not completed yet',
399480
}
481+
482+
483+
@app.on_event("shutdown")
async def shutdown_event():
    """
    Release resources created during startup.

    Cancels every background task registered in ``app.state.background_tasks``
    and waits for each one to finish, then closes the httpx client used for
    Telegram notifications so its connection pool is not leaked.
    """
    background_tasks = getattr(app.state, 'background_tasks', [])

    # Request cancellation of all tasks first so they wind down concurrently,
    # then wait for each one to actually finish.
    for task in background_tasks:
        task.cancel()
    for task in background_tasks:
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() call above.
            pass

    # The AsyncClient is created in the startup handler; close it explicitly
    # because it is not managed by a context manager.
    telegram_client = getattr(app.state, 'telegram_client', None)
    if telegram_client is not None:
        await telegram_client.aclose()

snapshotter/processor_distributor.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,25 +87,24 @@ async def _init_httpx_client(self):
8787
"""
8888
Initializes the HTTPX clients with the specified settings.
8989
"""
90-
transport_settings = dict(
91-
limits=Limits(
92-
max_connections=100,
93-
max_keepalive_connections=50,
94-
keepalive_expiry=None,
95-
),
90+
91+
transport_limits = Limits(
92+
max_connections=100,
93+
max_keepalive_connections=50,
94+
keepalive_expiry=None,
9695
)
9796

9897
self._reporting_httpx_client = AsyncClient(
9998
base_url=settings.reporting.service_url,
10099
timeout=Timeout(timeout=5.0),
101100
follow_redirects=False,
102-
transport=AsyncHTTPTransport(**transport_settings),
101+
transport=AsyncHTTPTransport(limits=transport_limits),
103102
)
104103
self._telegram_httpx_client = AsyncClient(
105104
base_url=settings.reporting.telegram_url,
106105
timeout=Timeout(timeout=5.0),
107106
follow_redirects=False,
108-
transport=AsyncHTTPTransport(**transport_settings),
107+
transport=AsyncHTTPTransport(limits=transport_limits),
109108
)
110109

111110
async def _init_preloader_compute_mapping(self):

0 commit comments

Comments
 (0)