Quick Tour

This quick tour covers Pinecone’s main features. Each section is self-contained so that you can refer back to a single feature later.

Creating a vector database service

It is easy to build vector-based ML applications using Pinecone’s simple workflow:

  1. Create a Pinecone Graph. A Pinecone graph is the serialization of a Pinecone Service.

  2. Deploy the Pinecone graph. It takes a few minutes for Pinecone to set up the service. Once the deployment finishes, you will have a production-ready vector database service. (Yes, it’s that simple!)

  3. Establish a Connection to the Pinecone service. You can then start inserting vectors and getting query results.

In this example, we build a simple nearest-neighbor classifier. We sample data from two multivariate normal distributions with different means and equal variance. We then classify which distribution an unknown vector belongs to by a majority vote over the class labels of its nearest neighbors.

Multivariate normals
"""Generate data from multivariate normal distributions"""

import numpy as np
from collections import Counter

sample_size = 50000
dim = 10
A_mean = 0
B_mean = 2

# Create multivariate normal samples
A_vectors = A_mean + np.random.randn(sample_size, dim)
B_vectors = B_mean + np.random.randn(sample_size, dim)

A_vectors_id_tuple = [('A{}'.format(ii), vector) for ii, vector in enumerate(A_vectors)]
B_vectors_id_tuple = [('B{}'.format(ii), vector) for ii, vector in enumerate(B_vectors)]

# Query data generated from A distribution
query_size = 100
A_queries = A_mean + np.random.randn(query_size, dim)


"""Build a classifier using Pinecone"""

import pinecone.graph
import pinecone.service
import pinecone.connector

# Create a graph
graph = pinecone.graph.IndexGraph(metric="euclidean")

# Deploy the graph
service_name = 'knn-classifier'
pinecone.service.deploy(service_name=service_name, graph=graph)

# Create a connection
conn = pinecone.connector.connect(service_name)

# Upload the sample data formatted as (id, vector) tuples.
acks_A = conn.upsert(items=A_vectors_id_tuple).collect()
acks_B = conn.upsert(items=B_vectors_id_tuple).collect()

# We expect most of each query's nearest neighbors to come from the A distribution
for result in conn.query(queries=A_queries, top_k=10).collect():
    print(Counter(id_[0] for id_ in result.ids))

# Stop the service
pinecone.service.stop(service_name)
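
To sanity-check the service's answers, the same classifier can be approximated locally with a brute-force search. The following is a minimal sketch, assuming only NumPy; `knn_classify` is a hypothetical helper that is not part of the pinecone package, and the sample size is reduced so that the full distance matrix fits comfortably in memory.

```python
from collections import Counter

import numpy as np


def knn_classify(data, labels, queries, top_k=10):
    """Label each query by majority vote among its top_k Euclidean nearest neighbors."""
    # Squared distances via ||q - x||^2 = ||q||^2 - 2 q.x + ||x||^2
    sq_dists = (
        (queries ** 2).sum(axis=1)[:, None]
        - 2.0 * queries @ data.T
        + (data ** 2).sum(axis=1)[None, :]
    )
    # Indices of the top_k smallest distances per query (unordered within the top_k)
    nearest = np.argpartition(sq_dists, top_k - 1, axis=1)[:, :top_k]
    return [Counter(labels[row]).most_common(1)[0][0] for row in nearest]


rng = np.random.default_rng(0)
dim, sample_size, query_size = 10, 2000, 100

# Same setup as the example above: unit variance, means 0 and 2
A_vectors = 0 + rng.standard_normal((sample_size, dim))
B_vectors = 2 + rng.standard_normal((sample_size, dim))
data = np.vstack([A_vectors, B_vectors])
labels = np.array(["A"] * sample_size + ["B"] * sample_size)

# Queries drawn from the A distribution
A_queries = 0 + rng.standard_normal((query_size, dim))
predictions = knn_classify(data, labels, A_queries, top_k=10)
accuracy = sum(p == "A" for p in predictions) / query_size
print(f"fraction classified as A: {accuracy:.2f}")
```

Because the two means are far apart relative to the variance, nearly every query should be labeled A.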

Managing services

You can manage a Pinecone Service via the following APIs:

  • ls: list your Pinecone services.

  • deploy: deploy a Pinecone service from a Pinecone graph.

  • stop: stop a Pinecone service.

  • describe: get metadata about a Pinecone service such as its status and graphical serialization.

import pinecone.graph
import pinecone.service

# List services
pinecone.service.ls()

# Deploy a service
graph = pinecone.graph.IndexGraph()
pinecone.service.deploy(service_name='hello-pinecone', graph=graph)

# Describe a service
pinecone.service.describe(service_name='hello-pinecone')

# Stop a service
pinecone.service.stop(service_name='hello-pinecone')

Interacting with a service

A Pinecone service creates an index for your input vectors and lets you query their nearest neighbors. A service supports the following operations:

  • upsert: insert data formatted as (id, vector) tuples into the index, or update existing data by id.

  • delete: delete data by id.

  • query: query the index and retrieve the top-k nearest neighbors based on dot product, cosine similarity, Euclidean distance, or another supported metric.

  • fetch: fetch vectors stored in the index by id.

  • info: get statistics about the index.

import pinecone.graph
import pinecone.service
import pinecone.connector

service_name='hello-pinecone'

# Deploy a service
graph = pinecone.graph.IndexGraph()
pinecone.service.deploy(service_name=service_name, graph=graph)

# Create a connection
conn = pinecone.connector.connect(service_name)

# Insert vectors

# Method 1: in-memory, all at once
acks_AB = conn.upsert(items=[("A", [1, 1]), ("B", [1, 1])]).collect()

# Method 2: in-memory, batch by batch
db_cursor = conn.upsert(items=[("C", [1, 1]), ("D", [1, 1]), ("E", [1, 1])])
acks_C = db_cursor.take(1)
acks_DE = db_cursor.take(2)

# Method 3: stream with generators. This allows you to, for instance, consume Kafka streams.
stream_iterator = (('A{}'.format(ii), [1, 1]) for ii in range(10))
for ack in conn.upsert(items=stream_iterator).stream():
    print(ack)

# Update vectors
acks = conn.upsert(items=[("A", [0, 0])]).collect()

# Fetch vectors
vectors = conn.fetch(ids=["A", "B"]).collect()

# Delete vectors
acks = conn.delete(ids=["A", "B"]).collect()

# Query
query_results = conn.query(queries=[[0, 1], [0, 0]], top_k=5).collect()

# Index statistics
conn.info()

# Stop the service
pinecone.service.stop(service_name=service_name)
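
The semantics of these operations can be illustrated without a running service. The sketch below is a toy in-memory stand-in, assuming Euclidean distance; `ToyIndex` and its return values are hypothetical and do not reflect how Pinecone is implemented or what its API returns.

```python
import numpy as np


class ToyIndex:
    """Hypothetical in-memory stand-in; not part of the pinecone package."""

    def __init__(self):
        self._vectors = {}

    def upsert(self, items):
        # Insert new ids and overwrite existing ones; accepts lists or generators
        acks = []
        for id_, vector in items:
            self._vectors[id_] = np.asarray(vector, dtype=float)
            acks.append(id_)
        return acks

    def fetch(self, ids):
        return {id_: self._vectors[id_] for id_ in ids if id_ in self._vectors}

    def delete(self, ids):
        for id_ in ids:
            self._vectors.pop(id_, None)

    def query(self, queries, top_k):
        ids = list(self._vectors)
        if not ids:
            return [[] for _ in queries]
        matrix = np.stack([self._vectors[id_] for id_ in ids])
        results = []
        for query in np.asarray(queries, dtype=float):
            dists = np.linalg.norm(matrix - query, axis=1)
            order = np.argsort(dists, kind="stable")[:top_k]
            results.append([(ids[ii], float(dists[ii])) for ii in order])
        return results

    def info(self):
        return {"index_size": len(self._vectors)}


index = ToyIndex()
index.upsert([("A", [1, 1]), ("B", [1, 1])])
index.upsert([("A", [0, 0])])                          # update A by id
index.upsert((f"S{ii}", [2, 2]) for ii in range(3))    # stream from a generator
index.delete(["B"])

results = index.query([[0, 1]], top_k=2)
print(results[0][0][0])   # A
print(index.info())       # {'index_size': 4}
```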

Namespacing data

Namespacing is a neat feature in a Pinecone service that allows you to partition your data in an index. When you read from or write to a namespace in an index, you only access data in that particular namespace. As a result, two namespaces may store the same id with different values. Namespacing is useful when you want to reuse the same data processing pipeline but query only a subset of your data. For example, when you are building a movie recommender system, you could use namespacing to separate recommendations by genre.

import pinecone.graph
import pinecone.service
import pinecone.connector

service_name='hello-pinecone'

# Deploy a service
graph = pinecone.graph.IndexGraph()
pinecone.service.deploy(service_name=service_name, graph=graph)

# Create a connection
conn = pinecone.connector.connect(service_name)

# Insert vectors without specifying a namespace
conn.upsert(items=[("Wall-E", [0, 0]), ("Toy Story", [0, 1])]).collect()
conn.info()

# Insert vectors into a namespace
conn.upsert(items=[("Wall-E", [1, 1])], namespace="sci-fi").collect()
conn.info(namespace="sci-fi")

# Fetch a vector
conn.fetch(ids=["Wall-E"]).collect()

# Fetch a vector from a different namespace with the same id
conn.fetch(ids=["Wall-E"], namespace="sci-fi").collect()

# Stop the service
pinecone.service.stop(service_name=service_name)
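
One way to picture namespacing is as one independent id-to-vector map per namespace. The sketch below is illustrative only: `NamespacedStore` is a hypothetical class, and using the empty string as the default namespace is an assumption rather than documented Pinecone behavior.

```python
from collections import defaultdict


class NamespacedStore:
    """Hypothetical sketch: one independent id -> vector map per namespace."""

    def __init__(self):
        self._spaces = defaultdict(dict)

    def upsert(self, items, namespace=""):
        # Writes touch only the given namespace
        self._spaces[namespace].update(items)

    def fetch(self, ids, namespace=""):
        # Reads see only the given namespace
        space = self._spaces.get(namespace, {})
        return {id_: space[id_] for id_ in ids if id_ in space}

    def info(self, namespace=""):
        return {"index_size": len(self._spaces.get(namespace, {}))}


store = NamespacedStore()
store.upsert([("Wall-E", [0, 0]), ("Toy Story", [0, 1])])
store.upsert([("Wall-E", [1, 1])], namespace="sci-fi")

print(store.fetch(["Wall-E"]))                      # {'Wall-E': [0, 0]}
print(store.fetch(["Wall-E"], namespace="sci-fi"))  # {'Wall-E': [1, 1]}
print(store.info(namespace="sci-fi"))               # {'index_size': 1}
```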

Preprocessing data

In most vector-based ML applications, your data and your queries should be comparable (i.e., in the same vector space). For example, in a user-movie recommender system, user data and movie data are usually transformed into embeddings in the same vector space for retrieving recommendations. The data transformation tasks during insertion, update, and query are usually performed offline or as a service separate from the database. Pinecone lets you integrate these tasks within a Pinecone service to optimize performance and streamline deployments.

Let’s consider the following application. Suppose that the data consist of RGB color images whereas the expected queries are greyscale images. We will create a preprocessor function that transforms the data into greyscale images and then flattens them. We add this preprocessor function to a Pinecone service so that we can index the color images directly.

First, create the preprocessor function and save it locally as preprocessor.py. You can use your favorite framework (TensorFlow, PyTorch, scikit-learn, etc.) to write the preprocessor function.

# preprocessor.py
import numpy as np
from pinecone.hub import preprocessor


@preprocessor
class MyFunc:
    def transform(self, vectors):
        weights = [0.299, 0.587, 0.144]
        mat = np.dot(vectors, weights)
        return np.array([vec.flatten() for vec in mat])
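
Before packaging, it can help to check the array shapes of this transform with plain NumPy. The sketch below assumes a batch shaped (n, height, width, 3); `rgb_to_flat_grey` is a hypothetical standalone helper, and the weights are copied as-is from preprocessor.py above (for comparison, the widely used ITU-R BT.601 luma coefficients are 0.299, 0.587, 0.114).

```python
import numpy as np

# Weights copied from preprocessor.py above
WEIGHTS = (0.299, 0.587, 0.144)


def rgb_to_flat_grey(images, weights=WEIGHTS):
    """Collapse the trailing RGB axis with a weighted sum, then flatten each image."""
    grey = np.dot(images, weights)       # (n, h, w, 3) . (3,) -> (n, h, w)
    return grey.reshape(len(grey), -1)   # (n, h * w)


images = np.random.rand(4, 5, 5, 3) * 255
flat = rgb_to_flat_grey(images)
print(flat.shape)  # (4, 25)
```

Each 5x5 RGB image becomes a single 25-dimensional vector, which is the dimensionality that flattened greyscale queries must share.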

The next step is to package and upload the preprocessor as a Pinecone-compatible docker image. You can use convenience functions in the pinecone package to easily package, build, and push docker images.

import pinecone.hub
import numpy as np

# Docker image helper
image_builder = pinecone.hub.ImageBuilder(
    # The name of the docker image (you should also tag the image)
    image="mock-preprocessor:v1",

    # path to which docker build artifacts are saved
    build_path="./docker_build/image_preprocessor/v1",

    # main model file
    model_path='./preprocessor.py',

    # pip packages needed, in addition to numpy
    pip=[],

    # additional files or directories needed
    data_paths=[],
)

# Prepare for docker build. Copy all the necessary files to the build path.
image_builder.package(exist_ok=True)

# Get commands for logging into Pinecone Hub.
login_cmd = pinecone.hub.get_login_cmd()

# Run the commands in the terminal.
# or in Jupyter notebook:
# !{login_cmd}
assert False, f"Run the commands in the terminal or in Jupyter notebook: {login_cmd}"

# Get commands for building the docker image.
build_cmd = image_builder.get_build_cmd()

# Run the commands in the terminal.
# or in Jupyter notebook:
# !{build_cmd}
assert False, f"Run the commands in the terminal or in Jupyter notebook: {build_cmd}"

# Test the docker image locally and make sure that the preprocessor returns the expected output.
rgb_img = np.random.rand(5, 5, 3) * 255
weights = [0.299, 0.587, 0.144]

input_vectors = np.array([rgb_img])
expected_output_vectors = np.array([vec.flatten() for vec in np.dot(input_vectors, weights)])
output_vectors = None

with pinecone.hub.ImageServer(image=image_builder.image, image_type="preprocessor") as preprocessor:
    output_vectors = await preprocessor.transform(input_vectors)

assert all([(np.float32(v1) == np.float32(v2)).all() for v1, v2 in zip(output_vectors, expected_output_vectors)])

# After testing the docker image locally, we will push the docker image to Pinecone hub.
# Get commands for pushing the docker image.
push_cmd = image_builder.get_push_cmd()

# Run the commands in the terminal.
# or in Jupyter notebook:
# !{push_cmd}
assert False, f"Run the commands in the terminal or in Jupyter notebook: {push_cmd}"

# Check that the docker image is uploaded.
assert image_builder.repository in pinecone.hub.list_repositories()
assert image_builder.tag in pinecone.hub.list_repository_tags(image_builder.repository)

Once the preprocessor image is pushed to the Model Hub, we can add it to a Pinecone graph. Upsert requests flow through the “write” path in the graph, and query requests flow through the “read” path in the graph.

import pinecone.graph
import pinecone.hub

graph = pinecone.graph.IndexGraph()

# Add the rgb-to-greyscale preprocessor to the write path of the graph
image_name = pinecone.hub.as_user_image(image_builder.image)
write_preprocessor = pinecone.hub.HubFunction(name='rgb2grey-preprocessor', image=image_name)
graph.add_write_preprocessor(fn=write_preprocessor)

# View the graph. It works best if you are using Jupyter notebook.
graph.view()

You can deploy the graph as usual, but now you can insert RGB images and query with greyscale images.
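
The split between the write path and the read path can be pictured with a toy model. This is a rough sketch under stated assumptions, not a description of Pinecone internals: `ToyGraph`, its attributes, and the brute-force Euclidean lookup are all hypothetical.

```python
import numpy as np


class ToyGraph:
    """Hypothetical stand-in for a graph with separate write and read paths."""

    def __init__(self):
        self.write_preprocessors = []
        self.read_preprocessors = []
        self.stored = {}

    def upsert(self, items):
        # Write path: every preprocessor runs before the vector is stored
        for id_, vector in items:
            for fn in self.write_preprocessors:
                vector = fn(vector)
            self.stored[id_] = np.asarray(vector, dtype=float)

    def query(self, vector, top_k=1):
        # Read path: only read-side preprocessors run before the lookup
        for fn in self.read_preprocessors:
            vector = fn(vector)
        vector = np.asarray(vector, dtype=float)
        ranked = sorted(
            self.stored,
            key=lambda id_: np.linalg.norm(self.stored[id_] - vector),
        )
        return ranked[:top_k]


def rgb_to_flat_grey(image):
    # Weights copied from the preprocessor example in this section
    return np.dot(image, [0.299, 0.587, 0.144]).flatten()


graph = ToyGraph()
graph.write_preprocessors.append(rgb_to_flat_grey)

dark = np.zeros((2, 2, 3))
bright = np.full((2, 2, 3), 255.0)
graph.upsert([("dark", dark), ("bright", bright)])   # RGB images go in

grey_query = np.full(4, 250.0)                       # flattened greyscale query
print(graph.query(grey_query))                       # ['bright']
```

Because the preprocessor sits only on the write path, stored vectors and greyscale queries end up in the same 4-dimensional space.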

Postprocessing query results

We often need to re-rank the query results. For example, we may replace some nearest neighbors in the query results with more popular items when their similarity scores are below a certain threshold. Suppose that popular_items.json is a file containing the popular items and their respective similarity scores.

[["A", 0.85], ["B", 0.8]]

Let’s create the postprocessor function and write it to postprocessor.py:

# postprocessor.py
import json
import os
import numpy as np
from pinecone.hub import postprocessor, QueryResult

# by default all the data files on HyperCloud are placed in the ./data directory.
DATA_PREFIX = './data'
THRESHOLD = 0.7

# Load popular items
popular_items = []  # list of (id, score)
with open(os.path.join(DATA_PREFIX, 'popular_items.json')) as infile:
    popular_items = json.load(infile)

@postprocessor
class MyFunc:
    def transform(self, queries, matches):
        output = []
        for match in matches:
            # First query result with score below the threshold
            threshold_idx = -1
            for ii, score in enumerate(match.scores):
                if score < THRESHOLD:
                    threshold_idx = ii
                    break
            if threshold_idx < 0:
                # All results are above threshold
                output.append(match)
            else:
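                backfill_size = len(match.ids) - threshold_idx
                # Take at most backfill_size popular items (fewer if the list is short)
                backfill = popular_items[:backfill_size]
                # Convert to lists so that "+" concatenates even if ids or scores are numpy arrays
                new_ids = list(match.ids[:threshold_idx]) + [item[0] for item in backfill]
                new_scores = list(match.scores[:threshold_idx]) + [item[1] for item in backfill]
                output.append(QueryResult(ids=new_ids, scores=new_scores, data=match.data))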
        return output
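
The backfill rule itself can be tried out locally before building an image. The following is a minimal sketch of the same rule as a plain function, assuming scores are sorted in descending order; `backfill` is a hypothetical helper and does not use the pinecone package.

```python
def backfill(ids, scores, popular_items, threshold):
    """Replace every result from the first below-threshold score onward with popular items."""
    # Position of the first score below the threshold, or None if there is none
    cut = next((ii for ii, score in enumerate(scores) if score < threshold), None)
    if cut is None:
        return list(ids), list(scores)
    fill = popular_items[: len(ids) - cut]
    new_ids = list(ids[:cut]) + [item[0] for item in fill]
    new_scores = list(scores[:cut]) + [item[1] for item in fill]
    return new_ids, new_scores


popular = [["A", 0.85], ["B", 0.8]]

print(backfill(["AA", "BB"], [0.9, 0.3], popular, threshold=0.7))
# (['AA', 'A'], [0.9, 0.85])

print(backfill(["AA", "BB"], [0.9, 0.8], popular, threshold=0.7))
# (['AA', 'BB'], [0.9, 0.8])
```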

The next step is to package and upload the postprocessor as a Pinecone-compatible docker image. You can use convenience functions in the pinecone package to easily package, build, and push docker images.

import numpy as np
import pinecone.hub

# Docker image helper
image_builder = pinecone.hub.ImageBuilder(
    # The name of the docker image (you should also tag the image)
    image="mock-postprocessor:v1",

    # path to which docker build artifacts are saved
    build_path="./docker_build/image_postprocessor/v1",

    # main model file
    model_path='./postprocessor.py',

    # pip packages needed, in addition to numpy
    pip=[],

    # additional files or directories needed
    data_paths=['./popular_items.json'],
)

# Prepare for docker build. Copy all the necessary files to the build path.
image_builder.package(exist_ok=True)


# Get commands for logging into Pinecone Hub.
login_cmd = pinecone.hub.get_login_cmd()

# Run the commands in the terminal.
# or in Jupyter notebook:
# !{login_cmd}


# Get commands for building the docker image.
build_cmd = image_builder.get_build_cmd()

# Run the commands in the terminal.
# or in Jupyter notebook:
# !{build_cmd}

# Test the docker image locally and make sure that the postprocessor returns the expected output.
from pinecone.hub import QueryResult

input_vectors = [QueryResult(ids=["AA", "BB"], scores=np.array([0.9, 0.3]))]
expected_output_vectors = [QueryResult(ids=["AA", "A"], scores=np.array([0.9, 0.85]))]
output_vectors = None

with pinecone.hub.ImageServer(image=image_builder.image, image_type="postprocessor") as postprocessor:
    output_vectors = await postprocessor.transform(np.ones(1), input_vectors)

# Verify that output_vectors is the same as expected_output_vectors
assert output_vectors[0].ids == expected_output_vectors[0].ids
assert all(np.array(output_vectors[0].scores, dtype=np.float32) == np.array(expected_output_vectors[0].scores, dtype=np.float32))


# After testing the docker image locally, we will push the docker image to Pinecone hub.
# Get commands for pushing the docker image.
push_cmd = image_builder.get_push_cmd()

# Run the commands in the terminal.
# or in Jupyter notebook:
# !{push_cmd}

# Check that the docker image is uploaded.
assert image_builder.repository in pinecone.hub.list_repositories()
assert image_builder.tag in pinecone.hub.list_repository_tags(image_builder.repository)

Once the postprocessor image is pushed to the Model Hub, we can add it to a Pinecone graph.

import pinecone.graph
import pinecone.hub

graph = pinecone.graph.IndexGraph()

# Add the postprocessor to the graph
image_name = pinecone.hub.as_user_image(image_builder.image)
postprocessor = pinecone.hub.HubFunction(name='postprocessor', image=image_name)
graph.add_postprocessor(fn=postprocessor)

# View the graph. It works best if you are using Jupyter notebook.
graph.view()

You can then deploy the graph as usual.

Routing traffic

With Pinecone traffic routers, your in-production ML models can be updated with zero downtime. You can manage Pinecone traffic routers via the following APIs:

  • ls: list your Pinecone traffic routers.

  • deploy: deploy a Pinecone traffic router.

  • update_active_service: update a traffic router’s active service.

  • update_services: update the list of services available to a traffic router.

  • stop: stop a Pinecone traffic router.

  • describe: get metadata about a Pinecone traffic router.

import pinecone.router
import pinecone.connector

# List routers
pinecone.router.ls()

# Deploy a router with existing services.
pinecone.router.deploy(
    router_name="movie_recommender_router",
    services=["recommender-v1", "recommender-v2"],
    active_service="recommender-v1",
)

# Connect to the active service of a traffic router
conn = pinecone.connector.connect(router_name="movie_recommender_router")

# Update a traffic router's active service.
pinecone.router.update_active_service(
    router_name="movie_recommender_router",
    active_service="recommender-v2",
)

# Update the list of services available to a traffic router.
# NOTE: the new list of services must contain the currently active service.
pinecone.router.update_services(
    router_name="movie_recommender_router",
    services=["recommender-v2", "recommender-v3"],
)

# Stop a traffic router
pinecone.router.stop("movie_recommender_router")

# Describe a traffic router
pinecone.router.describe("movie_recommender_router")
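
The routing rules above, including the constraint that the active service must stay in the service list, can be modeled in a few lines. This is an illustrative sketch only; `ToyRouter` and its error handling are hypothetical and say nothing about how Pinecone routers behave on invalid input.

```python
class ToyRouter:
    """Hypothetical sketch of a router that forwards requests to one active service."""

    def __init__(self, services, active_service):
        if active_service not in services:
            raise ValueError("active_service must be one of services")
        self.services = list(services)
        self.active_service = active_service

    def update_active_service(self, active_service):
        # Switch traffic to another service that the router already knows
        if active_service not in self.services:
            raise ValueError(f"unknown service: {active_service}")
        self.active_service = active_service

    def update_services(self, services):
        # The currently active service must stay in the new list
        if self.active_service not in services:
            raise ValueError("new services must contain the active service")
        self.services = list(services)

    def route(self, request):
        return self.active_service, request


router = ToyRouter(["recommender-v1", "recommender-v2"], "recommender-v1")
router.update_active_service("recommender-v2")
router.update_services(["recommender-v2", "recommender-v3"])
print(router.route("query"))  # ('recommender-v2', 'query')
```

Clients keep talking to the router while the active service changes underneath, which is what makes a zero-downtime model update possible.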