Python

Overview

Our service takes in a payload containing bytes and capitalizes them.

Using OpenCensus, we can collect traces of our system and export them to the backend of our choice, to give observability to our distributed systems.

Before beginning, if you haven’t already:

Setup

Make sure to setup your GOOGLE_APPLICATION_CREDENTIALS environment variable. Visit here for instructions on how to do so.

Installation

This walkthrough will be using Python 3.

Install the required modules by running this command in your terminal:

python3 -m pip install grpcio-tools opencensus google-cloud-trace

Next, let’s setup our working directory. Run the following commands in your terminal:

touch capitalizeServer.py
touch capitalizeClient.py
mkdir proto
touch proto/defs.proto

Our working directory will now look like this:

./capitalizeServer.py
./capitalizeClient.py
./proto/
./proto/defs.proto

Protobuf Definition

Copy and paste the following code inside of ./proto/defs.proto:

syntax = "proto3";

package rpc;

message Payload {
    int32 id    = 1;
    bytes data  = 2;
}

service Fetch {
    rpc Capitalize(Payload) returns (Payload) {}
}

Now, run the following command in your terminal to create the gRPC stubs.

python3 -m grpc_tools.protoc \
  -I./proto \
  --python_out=. \
  --grpc_python_out=. ./proto/defs.proto

This will create two new files. Your working directory will be:

./defs_pb.py
./defs_pb2_grpc.py
./capitalizeServer.py
./capitalizeClient.py
./proto/
./proto/defs.proto

Generate the Client

Copy and paste the following code inside of ./capitalizeClient.py:

import grpc

import defs_pb2_grpc as proto
import defs_pb2 as pb

def main():
    channel = grpc.insecure_channel('localhost:9778')
    stub = proto.FetchStub(channel)

    while True:
        lineIn = input('> ')
        capitalized = stub.Capitalize(pb.Payload(data=bytes(lineIn, encoding='utf-8')))
        print('< %s\n'%(capitalized.data.decode('utf-8')))

if __name__ == '__main__':
    main()

Generate the Service

Copy and paste the following code inside of ./capitalizeServer.py:

import grpc
import time
from concurrent import futures

import defs_pb2_grpc as proto
import defs_pb2 as pb

class CapitalizeServer(proto.FetchServicer):
    def __init__(self, *args, **kwargs):
        super(CapitalizeServer, self).__init__()

    def Capitalize(self, request, context):
        return pb.Payload(data=request.data.upper())

def main():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    try:
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    main()

Run the Application

Let’s now run two of our python files at once. You may need to open a second terminal tab.

In one terminal tab, run python3 capitalizeServer.py.

In the second terminal tab, run python3 capitalizeClient.py.

Try typing in some text and hitting enter in the tab running capitalizeClient.py. You should see something resembling the following:

Instrumentation

Tracing

Open ./capitalizeServer.py.

First let’s import the required packages:

from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
import grpc
import time
from concurrent import futures

from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor

import defs_pb2_grpc as proto
import defs_pb2 as pb

class CapitalizeServer(proto.FetchServicer):
    def __init__(self, *args, **kwargs):
        super(CapitalizeServer, self).__init__()

    def Capitalize(self, request, context):
        return pb.Payload(data=request.data.upper())

def main():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    try:
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    main()

Now let’s modify our Capitalize function to create our span:

def Capitalize(self, request, context):
  tracer = Tracer(sampler=always_on.AlwaysOnSampler())
  with tracer.span(name='Capitalize') as span:
    data = request.data
    span.add_annotation('Data in', len=len(data))
    return pb.Payload(data=data.upper())
import grpc
import time
from concurrent import futures

from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor

import defs_pb2_grpc as proto
import defs_pb2 as pb

class CapitalizeServer(proto.FetchServicer):
    def __init__(self, *args, **kwargs):
        super(CapitalizeServer, self).__init__()

     def Capitalize(self, request, context):
        tracer = Tracer(sampler=always_on.AlwaysOnSampler())
        with tracer.span(name='Capitalize') as span:
            data = request.data
            span.add_annotation('Data in', len=len(data))
            return pb.Payload(data=data.upper())

def main():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    try:
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    main()

Finally, let’s modify our main function to setup the interceptor.

def main():
    # Setup the gRPC integration/interceptor
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
            always_on.AlwaysOnSampler())

    server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=10),
            interceptors=(tracer_interceptor,))

    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()
import grpc
import time
from concurrent import futures

from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor

import defs_pb2_grpc as proto
import defs_pb2 as pb

class CapitalizeServer(proto.FetchServicer):
    def __init__(self, *args, **kwargs):
        super(CapitalizeServer, self).__init__()

     def Capitalize(self, request, context):
        tracer = Tracer(sampler=always_on.AlwaysOnSampler())
        with tracer.span(name='Capitalize') as span:
            data = request.data
            span.add_annotation('Data in', len=len(data))
            return pb.Payload(data=data.upper())

def main():
    # Setup the gRPC integration/interceptor
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
            always_on.AlwaysOnSampler())

    server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=10),
            interceptors=(tracer_interceptor,))

    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    try:
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    main()

Exporting

Import the required packages:

import os
from opencensus.trace.exporters.transports.background_thread import BackgroundThreadTransport
from opencensus.trace.exporters import stackdriver_exporter
import grpc
import os
import time
from concurrent import futures

from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
from opencensus.trace.exporters.transports.background_thread import BackgroundThreadTransport
from opencensus.trace.exporters import stackdriver_exporter

import defs_pb2_grpc as proto
import defs_pb2 as pb

class CapitalizeServer(proto.FetchServicer):
    def __init__(self, *args, **kwargs):
        super(CapitalizeServer, self).__init__()

     def Capitalize(self, request, context):
        tracer = Tracer(sampler=always_on.AlwaysOnSampler())
        with tracer.span(name='Capitalize') as span:
            data = request.data
            span.add_annotation('Data in', len=len(data))
            return pb.Payload(data=data.upper())

def main():
    # Setup the gRPC integration/interceptor
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
            always_on.AlwaysOnSampler())

    server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=10),
            interceptors=(tracer_interceptor,))

    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    try:
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    main()

Setup the exporter:

# NOTE: Replace 'YOUR_GOOGLE_PROJECT_ID_HERE' with your actual Google Project ID!
exporter = stackdriver_exporter.StackdriverExporter(
    project_id=os.environ.get('YOUR_GOOGLE_PROJECT_ID_HERE'),
    transport=BackgroundThreadTransport)
import grpc
import os
import time
from concurrent import futures

from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
from opencensus.trace.exporters.transports.background_thread import BackgroundThreadTransport
from opencensus.trace.exporters import stackdriver_exporter

import defs_pb2_grpc as proto
import defs_pb2 as pb

# NOTE: Replace 'YOUR_GOOGLE_PROJECT_ID_HERE' with your actual Google Project ID!
exporter = stackdriver_exporter.StackdriverExporter(
    project_id=os.environ.get('YOUR_GOOGLE_PROJECT_ID_HERE'),
    transport=BackgroundThreadTransport)

class CapitalizeServer(proto.FetchServicer):
    def __init__(self, *args, **kwargs):
        super(CapitalizeServer, self).__init__()

     def Capitalize(self, request, context):
        tracer = Tracer(sampler=always_on.AlwaysOnSampler())
        with tracer.span(name='Capitalize') as span:
            data = request.data
            span.add_annotation('Data in', len=len(data))
            return pb.Payload(data=data.upper())

def main():
    # Setup the gRPC integration/interceptor
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
            always_on.AlwaysOnSampler())

    server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=10),
            interceptors=(tracer_interceptor,))

    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    try:
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    main()

Implement the exporter:

def Capitalize(self, request, context):
  tracer = Tracer(sampler=always_on.AlwaysOnSampler(), exporter=exporter)

def main():
  tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
    always_on.AlwaysOnSampler())
import grpc
import os
import time
from concurrent import futures

from opencensus.trace.samplers import always_on
from opencensus.trace.tracer import Tracer
from opencensus.trace.ext.grpc import server_interceptor
from opencensus.trace.exporters.transports.background_thread import BackgroundThreadTransport
from opencensus.trace.exporters import stackdriver_exporter

import defs_pb2_grpc as proto
import defs_pb2 as pb

# NOTE: Replace 'YOUR_GOOGLE_PROJECT_ID_HERE' with your actual Google Project ID!
exporter = stackdriver_exporter.StackdriverExporter(
    project_id=os.environ.get('YOUR_GOOGLE_PROJECT_ID_HERE'),
    transport=BackgroundThreadTransport)

class CapitalizeServer(proto.FetchServicer):
    def __init__(self, *args, **kwargs):
        super(CapitalizeServer, self).__init__()

     def Capitalize(self, request, context):
        tracer = Tracer(sampler=always_on.AlwaysOnSampler(), exporter=exporter)
        with tracer.span(name='Capitalize') as span:
            data = request.data
            span.add_annotation('Data in', len=len(data))
            return pb.Payload(data=data.upper())

def main():
    # Setup the gRPC integration/interceptor
    tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(
            always_on.AlwaysOnSampler(), exporter)

    server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=10),
            interceptors=(tracer_interceptor,))

    proto.add_FetchServicer_to_server(CapitalizeServer(), server)
    server.add_insecure_port('[::]:9778')
    server.start()

    try:
        while True:
            time.sleep(60 * 60)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    main()

Examining Traces

Please visit https://console.cloud.google.com/traces/traces

which will give visuals such as:

Trace list

Single trace details