-
Notifications
You must be signed in to change notification settings - Fork 5.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] migrate builtins #15601
base: main
Are you sure you want to change the base?
[WIP] migrate builtins #15601
Conversation
) -> None: | ||
self.callback_manager = callback_manager or CallbackManager([]) | ||
Workflow.__init__(self, timeout=timeout, **kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
passing kwargs is a little dangerous, we should only pass in the args that workflow accepts
class Workflow(metaclass=_WorkflowMeta):
def __init__(
self,
timeout: Optional[float] = 10.0,
disable_validation: bool = False,
verbose: bool = False,
service_manager: ServiceManager = ServiceManager(),
) -> None:
with self.callback_manager.event( | ||
CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str} | ||
) as query_event: | ||
response = await self.run(query_bundle=query_bundle) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
without defining any @step
methods in the base class, won't self.run()
not run anything? It requires the subclass to have @step
methods in order to work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imo here for example, def _aquery()
should be the step method, and def aquery()
be the workflow entry point? (I might be wrong)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or I guess, if we rely on _aquery()
being the entry point, that will be a breaking change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to explain my rationale:
- we want to migrate query engines to workflows
- we don't want to do it all at once. most existing query engines override
_query
and_aquery
if this PR lands, then any query engine that hasn't been migrated over (not retriever or sub-question query engine), will call _query
or _aquery
which (ideally) overrides this method on the base class
this parent method will only run for query engines that have been migrated to workflows
|
||
class CustomQueryEngine(BaseModel, BaseQueryEngine): | ||
|
||
class ExtendedCombinedMeta(CombinedMeta, PydanticMetaclass): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a new one for me 😅
@step | ||
async def generate_sub_questions( | ||
self, ctx: Context, ev: StartEvent | ||
) -> SubQuestionEvent | None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be using Union[...]
, otherwise it breaks python3.8 support (I know all our examples use the |
operator, because technically examples don't need to support python3.8 heh)
# filter out sub questions that failed | ||
qa_pairs: List[SubQuestionAnswerPair] = list(filter(None, qa_pairs_all)) | ||
for ind, sub_q in enumerate(sub_questions): | ||
self.send_event(SubQuestionEvent(query=sub_q, color=colors[str(ind)])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we already merged the PR that moves send_event()
to be on the context rather than the self
instance
a very WIP draft to migrate query engine to workflows in a way that we don't have to migrate all query engines at once (and also break existing user's custom query engines)