
Merge branch 'main' of https://github.com/danswer-ai/danswer into bugfix/indexing_ui

# Conflicts:
#	backend/danswer/background/celery/apps/primary.py
#	backend/danswer/background/celery/configs/indexing.py
#	backend/danswer/background/celery/configs/light.py
#	backend/danswer/background/celery/versioned_apps/heavy.py
#	backend/danswer/background/celery/versioned_apps/indexing.py
#	backend/danswer/background/celery/versioned_apps/light.py
#	backend/supervisord.conf
rkuo-danswer committed Oct 23, 2024
2 parents 03c4b7e + 9105f95 commit 94bd840
Showing 20 changed files with 284 additions and 347 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/nightly-close-stale-issues.yml
@@ -0,0 +1,18 @@
name: 'Close stale issues and PRs'
on:
  schedule:
    - cron: '0 11 * * *' # Runs every day at 3 AM PST / 4 AM PDT / 11 AM UTC

jobs:
  stale:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/stale@v9
        with:
          stale-issue-message: 'This issue is stale because it has been open 75 days with no activity. Remove stale label or comment or this will be closed in 15 days.'
          stale-pr-message: 'This PR is stale because it has been open 75 days with no activity. Remove stale label or comment or this will be closed in 15 days.'
          close-issue-message: 'This issue was closed because it has been stalled for 90 days with no activity.'
          close-pr-message: 'This PR was closed because it has been stalled for 90 days with no activity.'
          days-before-stale: 75
          # days-before-close: 90 # uncomment after we test stale behavior

@@ -1,7 +1,9 @@
"""
"""Migrate chat_session and chat_message tables to use UUID primary keys
Revision ID: 6756efa39ada
Revises: 5d12a446f5c0
Create Date: 2024-10-15 17:47:44.108537
"""
from alembic import op
import sqlalchemy as sa
@@ -12,8 +14,6 @@
depends_on = None

"""
Migrate chat_session and chat_message tables to use UUID primary keys.
This script:
1. Adds UUID columns to chat_session and chat_message
2. Populates new columns with UUIDs
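
The body of the migration is collapsed in this view. As a rough illustration of steps 1-2 from the docstring (the table name comes from the docstring; the new_id column name and the SQL are assumptions, not the real migration):

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql


def upgrade() -> None:
    # 1. Add a nullable UUID column next to the existing integer primary key
    op.add_column(
        "chat_session",
        sa.Column("new_id", postgresql.UUID(as_uuid=True), nullable=True),
    )
    # 2. Backfill it with generated UUIDs (gen_random_uuid() is built in on
    # PostgreSQL 13+; older versions need the pgcrypto extension)
    op.execute("UPDATE chat_session SET new_id = gen_random_uuid()")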
3 changes: 2 additions & 1 deletion backend/danswer/background/celery/configs/indexing.py
@@ -1,4 +1,5 @@
import danswer.background.celery.configs.base as shared_config
from danswer.configs.app_configs import CELERY_WORKER_INDEXING_CONCURRENCY

broker_url = shared_config.broker_url
broker_connection_retry_on_startup = shared_config.broker_connection_retry_on_startup
Expand All @@ -15,6 +16,6 @@
task_default_priority = shared_config.task_default_priority
task_acks_late = shared_config.task_acks_late

-worker_concurrency = 1
+worker_concurrency = CELERY_WORKER_INDEXING_CONCURRENCY
worker_pool = "threads"
worker_prefetch_multiplier = 1
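
For context, Celery can consume a settings module like this one via config_from_object; a minimal sketch (the exact wiring in danswer's worker entrypoint is not shown in this diff):

from celery import Celery

app = Celery("danswer")
# Module-level names in configs/indexing.py (worker_concurrency, worker_pool,
# worker_prefetch_multiplier, ...) become this app's settings, so indexing
# concurrency is now driven by the env var instead of the old hardcoded 1.
app.config_from_object("danswer.background.celery.configs.indexing")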
6 changes: 4 additions & 2 deletions backend/danswer/background/celery/configs/light.py
@@ -1,4 +1,6 @@
import danswer.background.celery.configs.base as shared_config
from danswer.configs.app_configs import CELERY_WORKER_LIGHT_CONCURRENCY
from danswer.configs.app_configs import CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER

broker_url = shared_config.broker_url
broker_connection_retry_on_startup = shared_config.broker_connection_retry_on_startup
Expand All @@ -15,6 +17,6 @@
task_default_priority = shared_config.task_default_priority
task_acks_late = shared_config.task_acks_late

-worker_concurrency = 24
+worker_concurrency = CELERY_WORKER_LIGHT_CONCURRENCY
worker_pool = "threads"
-worker_prefetch_multiplier = 8
+worker_prefetch_multiplier = CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER
2 changes: 1 addition & 1 deletion backend/danswer/background/celery/tasks/shared/tasks.py
@@ -142,7 +142,7 @@ def document_by_cc_pair_cleanup_task(
    except Exception as e:
        task_logger.exception("Unexpected exception")

-        if self.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES:
+        if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES:
            # Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64
            countdown = 2 ** (self.request.retries + 4)
            self.retry(exc=e, countdown=countdown)
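
The one-line fix matters because Celery tracks the attempt count on the task's request context (self.request.retries), not on the task object itself. For reference, the countdown values the comment describes:

# countdown = 2 ** (retries + 4) for the three retry attempts
for retries in range(3):
    print(retries, 2 ** (retries + 4))
# 0 16
# 1 32
# 2 64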
4 changes: 3 additions & 1 deletion backend/danswer/background/celery/versioned_apps/heavy.py
@@ -1,4 +1,6 @@
"""Factory stub for running celery worker / celery beat."""
"""Factory stub for running celery worker / celery beat.
This code is different from the primary/beat stubs because there is no EE version to
fetch. Port over the code in those files if we add an EE version of this worker."""
from celery import Celery

from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
4 changes: 3 additions & 1 deletion backend/danswer/background/celery/versioned_apps/indexing.py
@@ -1,4 +1,6 @@
"""Factory stub for running celery worker / celery beat."""
"""Factory stub for running celery worker / celery beat.
This code is different from the primary/beat stubs because there is no EE version to
fetch. Port over the code in those files if we add an EE version of this worker."""
from celery import Celery

from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
4 changes: 3 additions & 1 deletion backend/danswer/background/celery/versioned_apps/light.py
@@ -1,4 +1,6 @@
"""Factory stub for running celery worker / celery beat."""
"""Factory stub for running celery worker / celery beat.
This code is different from the primary/beat stubs because there is no EE version to
fetch. Port over the code in those files if we add an EE version of this worker."""
from celery import Celery

from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
35 changes: 35 additions & 0 deletions backend/danswer/configs/app_configs.py
@@ -198,6 +198,41 @@
except ValueError:
    CELERY_BROKER_POOL_LIMIT = CELERY_BROKER_POOL_LIMIT_DEFAULT

CELERY_WORKER_LIGHT_CONCURRENCY_DEFAULT = 24
try:
    CELERY_WORKER_LIGHT_CONCURRENCY = int(
        os.environ.get(
            "CELERY_WORKER_LIGHT_CONCURRENCY", CELERY_WORKER_LIGHT_CONCURRENCY_DEFAULT
        )
    )
except ValueError:
    CELERY_WORKER_LIGHT_CONCURRENCY = CELERY_WORKER_LIGHT_CONCURRENCY_DEFAULT

CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER_DEFAULT = 8
try:
    CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER = int(
        os.environ.get(
            "CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER",
            CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER_DEFAULT,
        )
    )
except ValueError:
    CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER = (
        CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER_DEFAULT
    )

CELERY_WORKER_INDEXING_CONCURRENCY_DEFAULT = 1
try:
    env_value = os.environ.get("CELERY_WORKER_INDEXING_CONCURRENCY")
    if not env_value:
        env_value = os.environ.get("NUM_INDEXING_WORKERS")

    if not env_value:
        env_value = str(CELERY_WORKER_INDEXING_CONCURRENCY_DEFAULT)
    CELERY_WORKER_INDEXING_CONCURRENCY = int(env_value)
except ValueError:
    CELERY_WORKER_INDEXING_CONCURRENCY = CELERY_WORKER_INDEXING_CONCURRENCY_DEFAULT

#####
# Connector Configs
#####
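
The three new settings all follow the same parse-with-fallback pattern, and CELERY_WORKER_INDEXING_CONCURRENCY additionally honors the legacy NUM_INDEXING_WORKERS variable. A condensed sketch of the pattern (_int_env is a hypothetical helper, not code from this diff):

import os


def _int_env(name: str, default: int, legacy_name: str | None = None) -> int:
    """Parse an integer env var, trying a legacy alias if the primary is
    unset and falling back to the default on missing or malformed values."""
    raw = os.environ.get(name)
    if not raw and legacy_name:
        raw = os.environ.get(legacy_name)
    try:
        return int(raw) if raw else default
    except ValueError:
        return default


# e.g. a deployment that only exports the legacy variable:
os.environ["NUM_INDEXING_WORKERS"] = "4"  # hypothetical value
print(_int_env("CELERY_WORKER_INDEXING_CONCURRENCY", 1, "NUM_INDEXING_WORKERS"))  # 4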
32 changes: 14 additions & 18 deletions backend/danswer/connectors/confluence/connector.py
@@ -1,13 +1,15 @@
from datetime import datetime
from datetime import timezone
from typing import Any
from urllib.parse import quote

from danswer.configs.app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP
from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.confluence.onyx_confluence import OnyxConfluence
from danswer.connectors.confluence.utils import attachment_to_content
from danswer.connectors.confluence.utils import build_confluence_client
from danswer.connectors.confluence.utils import build_confluence_document_id
from danswer.connectors.confluence.utils import datetime_from_string
from danswer.connectors.confluence.utils import extract_text_from_confluence_html
@@ -74,20 +76,21 @@ def __init__(
        self.wiki_base = wiki_base.rstrip("/")

        # if nothing is provided, we will fetch all pages
-        self.cql_page_query = "type=page"
+        cql_page_query = "type=page"
        if cql_query:
            # if a cql_query is provided, we will use it to fetch the pages
-            self.cql_page_query = cql_query
+            cql_page_query = cql_query
        elif space:
            # if no cql_query is provided, we will use the space to fetch the pages
-            self.cql_page_query += f" and space='{space}'"
+            cql_page_query += f" and space='{quote(space)}'"
        elif page_id:
            if index_recursively:
-                self.cql_page_query += f" and ancestor='{page_id}'"
+                cql_page_query += f" and ancestor='{page_id}'"
            else:
                # if neither a space nor a cql_query is provided, we will use the page_id to fetch the page
-                self.cql_page_query += f" and id='{page_id}'"
+                cql_page_query += f" and id='{page_id}'"

+        self.cql_page_query = cql_page_query
        self.cql_label_filter = ""
        self.cql_time_filter = ""
        if labels_to_skip:
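
Two things change in the constructor above: the query is built in a local variable and assigned to self.cql_page_query once, and the space key is now URL-encoded with quote so keys containing spaces or special characters do not produce malformed CQL. With hypothetical inputs, it yields queries like:

from urllib.parse import quote

print(f"type=page and space='{quote('Team Space')}'")
# -> type=page and space='Team%20Space'
print("type=page and ancestor='12345'")  # page_id with index_recursively=True
print("type=page and id='12345'")        # page_id without recursion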
@@ -96,19 +99,12 @@ def __init__(
            self.cql_label_filter = f"&label not in ({comma_separated_labels})"

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
-        username = credentials["confluence_username"]
-        access_token = credentials["confluence_access_token"]
-
-        # see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py
-        # for a list of other hidden constructor args
-        self.confluence_client = OnyxConfluence(
-            url=self.wiki_base,
-            username=username if self.is_cloud else None,
-            password=access_token if self.is_cloud else None,
-            token=access_token if not self.is_cloud else None,
-            backoff_and_retry=True,
-            max_backoff_retries=60,
-            max_backoff_seconds=60,
+        self.confluence_client = build_confluence_client(
+            credentials_json=credentials,
+            is_cloud=self.is_cloud,
+            wiki_base=self.wiki_base,
        )
        return None
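
build_confluence_client lives in danswer/connectors/confluence/utils.py, which is not part of this diff; a plausible shape, reconstructed from the removed inline construction and the call site above (a sketch, not the actual helper):

from typing import Any

from danswer.connectors.confluence.onyx_confluence import OnyxConfluence


def build_confluence_client(
    credentials_json: dict[str, Any], is_cloud: bool, wiki_base: str
) -> OnyxConfluence:
    # Cloud instances authenticate with username + API token as basic auth;
    # server/data-center instances use a personal access token.
    username = credentials_json["confluence_username"]
    access_token = credentials_json["confluence_access_token"]
    return OnyxConfluence(
        url=wiki_base,
        username=username if is_cloud else None,
        password=access_token if is_cloud else None,
        token=access_token if not is_cloud else None,
        backoff_and_retry=True,
        max_backoff_retries=60,
        max_backoff_seconds=60,
    )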

@@ -202,12 +198,12 @@ def _fetch_document_batches(self) -> GenerateDocumentsOutput:

        page_query = self.cql_page_query + self.cql_label_filter + self.cql_time_filter
        # Fetch pages as Documents
-        for pages in self.confluence_client.paginated_cql_page_retrieval(
+        for page_batch in self.confluence_client.paginated_cql_page_retrieval(
            cql=page_query,
            expand=",".join(_PAGE_EXPANSION_FIELDS),
            limit=self.batch_size,
        ):
-            for page in pages:
+            for page in page_batch:
                confluence_page_ids.append(page["id"])
                doc = self._convert_object_to_document(page)
                if doc is not None: