Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: crud api #2104

Merged
merged 12 commits into from
Mar 9, 2021
11 changes: 6 additions & 5 deletions jina/peapods/runtimes/asyncio/rest/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse
import asyncio
import json
import warnings
from typing import Any

Expand Down Expand Up @@ -28,12 +29,12 @@ def get_fastapi_app(args: 'argparse.Namespace', logger: 'JinaLogger'):
"""
with ImportExtensions(required=True):
from fastapi import FastAPI, WebSocket, Body
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from starlette.endpoints import WebSocketEndpoint
from starlette import status
from starlette.types import Receive, Scope, Send
from starlette.responses import StreamingResponse

from .models import JinaStatusModel, JinaIndexRequestModel, JinaDeleteRequestModel, JinaUpdateRequestModel, \
JinaSearchRequestModel

Expand Down Expand Up @@ -199,13 +200,13 @@ async def delete_api(body: JinaDeleteRequestModel):

async def result_in_stream(req_iter):
"""
Collect the protobuf message converts protobuf message to a dictionary.
Collect the protobuf message converts protobuf messages to strings.

:param req_iter: request iterator
:yield: result
"""
async for k in servicer.Call(request_iterator=req_iter, context=None):
yield MessageToDict(k)
yield json.dumps(MessageToDict(k))
florian-hoenicke marked this conversation as resolved.
Show resolved Hide resolved

@app.websocket_route(path='/stream')
class StreamingEndpoint(WebSocketEndpoint):
Expand Down Expand Up @@ -325,4 +326,4 @@ async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
"""
logger.info(f'Client {self.client_info} got disconnected!')

return app
return app
9 changes: 7 additions & 2 deletions jina/peapods/runtimes/asyncio/rest/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ def build_model_from_pb(name: str, pb_model: Callable):
:return: Model.
"""
from google.protobuf.json_format import MessageToDict

dp = MessageToDict(pb_model(), including_default_value_fields=True)

all_fields = {k: (name if k in ('chunks', 'matches') else type(v), Field(default=v)) for k, v in dp.items()}
if pb_model == QueryLangProto:
all_fields['parameters'] = (Dict, Field(default={}))
if pb_model == DocumentProto:
# these fields are defined as oneof in the jina.proto
# therefore, they are not instantiated when using pb_model()
all_fields['buffer'] = (str, Field()) #base64 encoded
florian-hoenicke marked this conversation as resolved.
Show resolved Hide resolved
all_fields['blob'] = (List, Field()) #ndarray as list
all_fields['text'] = (str, Field())

return create_model(name, **all_fields)

Expand All @@ -51,6 +55,7 @@ class JinaRequestModel(BaseModel):
Field(..., example=[Document().dict()])
request_size: Optional[int] = default_request_size
mime_type: Optional[str] = ''
# TODO mime type is contained in documents already
florian-hoenicke marked this conversation as resolved.
Show resolved Hide resolved
queryset: Optional[List[JinaQueryLangModel]] = None
data_type: DataInputType = DataInputType.AUTO

Expand Down
Empty file.
Empty file.
7 changes: 7 additions & 0 deletions tests/integration/crud/rest/flow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
!Flow
version: '1'
with:
restful: True
pods:
- name: indexer
uses: yaml/index.yml
7 changes: 7 additions & 0 deletions tests/integration/crud/rest/flow_kv.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
!Flow
version: '1'
with:
restful: True
pods:
- name: indexer
uses: yaml/index_kv.yml
7 changes: 7 additions & 0 deletions tests/integration/crud/rest/flow_vector.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
!Flow
version: '1'
with:
restful: True
pods:
- name: indexer
uses: yaml/index_vector.yml
195 changes: 195 additions & 0 deletions tests/integration/crud/rest/test_rest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import os
import random
import string
from itertools import chain
from pathlib import Path

import numpy as np
import pytest
import requests

from jina.executors.indexers import BaseIndexer

from jina import Document
from jina.flow import Flow

random.seed(0)
np.random.seed(0)


@pytest.fixture
def config(tmpdir):
os.environ['JINA_REST_DIR'] = str(tmpdir)
yield
del os.environ['JINA_REST_DIR']


def send_rest_request(flow_file, endpoint, method, data):
json = {
'data': data
}
with Flow.load_config(flow_file) as flow:
url = f'http://0.0.0.0:{flow.port_expose}/{endpoint}'
r = getattr(requests, method)(url, json=json)

if r.status_code != 200:
# TODO status_code should be 201 for index
raise Exception(
f'api request failed, url: {url}, status: {r.status_code}, content: {r.content} data: {data}')

return r


def send_rest_index_request(flow_file, documents):
data = [document.dict() for document in documents]
return send_rest_request(flow_file, 'index', 'post', data)


def send_rest_update_request(flow_file, documents):
data = [document.dict() for document in documents]
return send_rest_request(flow_file, 'update', 'put', data)


def send_rest_delete_request(flow_file, ids):
return send_rest_request(flow_file, 'delete', 'delete', ids)


def send_rest_search_request(flow_file, documents):
data = [document.dict() for document in documents]
return send_rest_request(flow_file, 'search', 'post', data)


def random_docs(start, end):
documents = []
for j in range(start, end):
d = Document()
d.id = j
d.tags['id'] = j
d.text = ''.join(random.choice(string.ascii_lowercase) for _ in range(10)).encode('utf8')
d.embedding = np.random.random([10 + np.random.randint(0, 1)])
documents.append(d)
return documents


def get_ids_to_delete(start, end):
return [str(idx) for idx in range(start, end)]


def validate_index_size(num_indexed_docs):
from jina.executors.compound import CompoundExecutor
path_compound = Path(CompoundExecutor.get_component_workspace_from_compound_workspace(os.environ['JINA_REST_DIR'],
'chunk_indexer', 0))
path = Path(os.environ['JINA_REST_DIR'])
bin_files = list(path_compound.glob('*.bin')) + list(path.glob('*.bin'))
assert len(bin_files) > 0
for index_file in bin_files:
index = BaseIndexer.load(str(index_file))
assert index.size == num_indexed_docs


@pytest.mark.parametrize('flow_file', ['flow.yml', 'flow_vector.yml'])
def test_delete_vector(config, flow_file):
NUMBER_OF_SEARCHES = 5

def validate_results(resp, num_matches):
documents = resp.json()['search']['docs']
assert len(documents) == NUMBER_OF_SEARCHES
for doc in documents:
# TODO if there are no matches, the rest api should return an empty list instead of not having the attribute
assert len(Document(doc).matches) == num_matches

send_rest_index_request(flow_file, random_docs(0, 10))
validate_index_size(10)

search_result = send_rest_search_request(flow_file, random_docs(0, NUMBER_OF_SEARCHES))
validate_results(search_result, 10)

delete_ids = []
for d in random_docs(0, 10):
delete_ids.append(d.id)
for c in d.chunks:
delete_ids.append(c.id)

send_rest_delete_request(flow_file, delete_ids)

validate_index_size(0)

search_result = send_rest_search_request(flow_file, random_docs(0, NUMBER_OF_SEARCHES))
validate_results(search_result, 0)


def test_delete_kv(config):
flow_file = 'flow_kv.yml'

def validate_results(resp, num_matches):
assert len(resp.json()['search']['docs']) == num_matches

send_rest_index_request(flow_file, random_docs(0, 10))
validate_index_size(10)

search_result = send_rest_search_request(flow_file, chain(random_docs(2, 5), random_docs(100, 120)))
validate_results(search_result, 3)

send_rest_delete_request(flow_file, get_ids_to_delete(0, 3))
validate_index_size(7)

search_result = send_rest_search_request(flow_file, random_docs(2, 4))
validate_results(search_result, 1)


@pytest.mark.parametrize('flow_file', ['flow.yml', 'flow_vector.yml'])
def test_update_vector(config, flow_file):
NUMBER_OF_SEARCHES = 1
docs_before = list(random_docs(0, 10))
docs_updated = list(random_docs(0, 10))

def validate_results(resp, has_changed):
docs = resp.json()['search']['docs']
assert len(docs) == NUMBER_OF_SEARCHES
hash_set_before = [hash(d.embedding.tobytes()) for d in docs_before]
hash_set_updated = [hash(d.embedding.tobytes()) for d in docs_updated]
for doc_dictionary in docs:
doc = Document(doc_dictionary)
assert len(doc.matches) == 10
for match in doc.matches:
h = hash(match.embedding.tobytes())
if has_changed:
assert h not in hash_set_before
assert h in hash_set_updated
else:
assert h in hash_set_before
assert h not in hash_set_updated

send_rest_index_request(flow_file, docs_before)
validate_index_size(10)

search_result = send_rest_search_request(flow_file, random_docs(0, NUMBER_OF_SEARCHES))
validate_results(search_result, has_changed=False)

send_rest_update_request(flow_file, docs_updated)
validate_index_size(10)

search_result = send_rest_search_request(flow_file, random_docs(0, NUMBER_OF_SEARCHES))
validate_results(search_result, has_changed=True)


def test_update_kv(config):
flow_file = 'flow_kv.yml'
NUMBER_OF_SEARCHES = 1
docs_before = list(random_docs(0, 10))
docs_updated = list(random_docs(0, 10))

def validate_results(resp):
assert len(resp.json()['search']['docs']) == NUMBER_OF_SEARCHES

send_rest_index_request(flow_file, docs_before)
validate_index_size(10)

search_result = send_rest_search_request(flow_file, random_docs(0, NUMBER_OF_SEARCHES))
validate_results(search_result)

send_rest_update_request(flow_file, docs_updated)
validate_index_size(10)

search_result = send_rest_search_request(flow_file, random_docs(0, NUMBER_OF_SEARCHES))
validate_results(search_result)
54 changes: 54 additions & 0 deletions tests/integration/crud/rest/yaml/index.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
!CompoundIndexer
components:
- !NumpyIndexer
with:
index_filename: vec.gz
metric: cosine
metas:
name: vecidx
- !BinaryPbIndexer
with:
index_filename: doc.gz
metas:
name: docidx
metas:
name: chunk_indexer
workspace: $JINA_REST_DIR
requests:
on:
UpdateRequest:
- !VectorIndexDriver
with:
method: update
executor: vecidx
traversal_paths: [ 'r' ]
- !KVIndexDriver
with:
method: update
executor: docidx
traversal_paths: [ 'r' ]
DeleteRequest:
- !DeleteDriver
with:
executor: vecidx
- !DeleteDriver
with:
executor: docidx
IndexRequest:
- !VectorIndexDriver
with:
executor: vecidx
traversal_paths: ['r']
- !KVIndexDriver
with:
executor: docidx
traversal_paths: ['r']
SearchRequest:
- !VectorSearchDriver
with:
executor: vecidx
traversal_paths: ['r']
- !KVSearchDriver
with:
executor: docidx
traversal_paths: ['m']
29 changes: 29 additions & 0 deletions tests/integration/crud/rest/yaml/index_kv.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
!BinaryPbIndexer
with:
index_filename: doc.gzip
metas:
name: docIndexer
workspace: $JINA_REST_DIR

requests:
on:
IndexRequest:
- !KVIndexDriver
with:
executor: docIndexer
traversal_paths: ['r']
SearchRequest:
- !KVSearchDriver
with:
executor: docIndexer
traversal_paths: ['r']
UpdateRequest:
- !KVIndexDriver
with:
method: update
executor: docIndexer
traversal_paths: [ 'r' ]
DeleteRequest:
- !DeleteDriver
with:
executor: docIndexer