1
1
import json
2
2
import grpc
3
- from typing import List , Dict , Tuple , Optional , Type , TypeVar , Any
3
+ from typing import List , Dict , Tuple , Optional , Any
4
4
from src .common .utils .rpc import pymongo_rpc_pb2 , pymongo_rpc_pb2_grpc
5
5
from src .common .config import plugin_config
6
6
7
7
8
- T = TypeVar ('T' )
9
-
10
-
11
8
class CollectionProxy :
12
- def __init__ (self , rpc_client : Type [ 'MongoClient' ] , collection_name : str ):
9
+ def __init__ (self , rpc_client : 'MongoClient' , collection_name : str ):
13
10
self .rpc_client = rpc_client
14
11
self .collection_name = collection_name
15
12
16
- def __getitem__ (self : T , collection_name : str ) -> T :
13
+ def __getitem__ (self , collection_name : str ) -> 'CollectionProxy' :
17
14
return CollectionProxy (self .rpc_client , collection_name )
18
15
19
16
def find (self , filter : Dict = {}) -> List [Dict [str , Any ]]:
@@ -39,7 +36,7 @@ def create_index(self, keys: List[Tuple], name: Optional[str] = None, default_la
39
36
40
37
41
38
class MongoClient :
42
- def __init__ (self , mongo_host : str , mongo_port : str , ** kwargs ):
39
+ def __init__ (self , mongo_host : str , mongo_port : int , ** kwargs ):
43
40
self .channel = grpc .insecure_channel (f'{ mongo_host } :{ mongo_port } ' )
44
41
self .stub = pymongo_rpc_pb2_grpc .MongoDBServiceStub (self .channel )
45
42
self .metadata = [('authorization' , plugin_config .rpc_token )]
0 commit comments