1、 broker-service->auth-service->postgresdb;
2、 zipkin监控:需要侵入业务代码(手动埋点);
一、auth-service
1、 通过context传递span;
main.go
package main
import (
"broker-service/auth-service"
"broker-service/auth-service/data"
"database/sql"
"fmt"
"log"
"net/http"
"os"
"time"
"github.com/opentracing/opentracing-go"
zipkinot "github.com/openzipkin-contrib/zipkin-go-opentracing"
"github.com/openzipkin/zipkin-go"
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
_ "github.com/jackc/pgconn"
_ "github.com/jackc/pgx/v4"
_ "github.com/jackc/pgx/v4/stdlib"
)
const (
// serviceName is the name used for the Zipkin local endpoint and in the
// startup message.
serviceName = "auth"
// hostPort is the address the HTTP server listens on; it is also passed to
// zipkin.NewEndpoint as this service's address.
hostPort = "localhost:8090"
// zipkinHTTPEndpoint is the URL the span reporter sends spans to.
zipkinHTTPEndpoint = "http://localhost:9411/api/v2/spans"
)
// counts is the number of failed database connection attempts made by
// connectToDB.
var counts int
// main wires up the auth service: it creates a Zipkin HTTP span reporter and
// tracer, installs the tracer as the global OpenTracing tracer, connects to
// Postgres, and serves the auth HTTP handler on hostPort.
func main() {
	// Set up a span reporter that sends spans to Zipkin over HTTP.
	reporter := zipkinhttp.NewReporter(zipkinHTTPEndpoint)
	defer reporter.Close()

	// Create our local service endpoint.
	endpoint, err := zipkin.NewEndpoint(serviceName, hostPort)
	if err != nil {
		log.Fatalf("unable to create local endpoint: %+v\n", err)
	}

	// Initialize the native Zipkin tracer.
	nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint))
	if err != nil {
		log.Fatalf("unable to create tracer: %+v\n", err)
	}

	// Wrap the native tracer so it can be used through the OpenTracing API,
	// and register it as the global tracer.
	tracer := zipkinot.Wrap(nativeTracer)
	opentracing.SetGlobalTracer(tracer)

	// Connect to the database.
	conn := connectToDB()
	if conn == nil {
		log.Panic("Can't connect to Postgres!")
	}

	// Create the service implementation and its HTTP handler.
	service := auth.NewService(conn, data.New(conn))
	handler := auth.NewHTTPHandler(tracer, service)

	// Start the service. ListenAndServe only returns on failure (for example
	// when the port is already in use), so report the error instead of
	// dropping it. Returning normally lets the deferred reporter.Close run.
	fmt.Printf("Starting %s on %s\n", serviceName, hostPort)
	if err := http.ListenAndServe(hostPort, handler); err != nil {
		log.Printf("http server on %s stopped: %+v\n", hostPort, err)
	}
}
// openDB opens a database handle for dsn using the "pgx" driver and verifies
// it with a ping. On success it returns the handle. On failure it returns a
// nil handle and the error; a handle that was opened but failed the ping is
// closed first, because the caller never receives it and could not close it.
func openDB(dsn string) (*sql.DB, error) {
	db, err := sql.Open("pgx", dsn)
	if err != nil {
		return nil, err
	}

	if err := db.Ping(); err != nil {
		// Best-effort cleanup; the ping error is the one worth reporting.
		_ = db.Close()
		return nil, err
	}

	return db, nil
}
func connectToDB() *sql.DB {
dsn := "host=localhost port=5432 user=postgres password=password dbname=users sslmode=disable timezone=Asia/Shanghai connect_timeout=5"
}
for {
connection, err := openDB(dsn)
if err != nil {
log.Println(dsn)
log.Println("postgres is not ready...")
time.Sleep(2 * time.Second)
counts++
} else {
log.Println("connected to postgres")
return connection
}
if counts > 100 {
log.Panic(err)
}
}
}
2、 定义auth服务;
httpService.go
package auth
import (
"log"
"net/http"
opentracing "github.com/opentracing/opentracing-go"
"broker-service/middleware"
)
// httpService exposes a Service over HTTP; its methods are the HTTP handlers.
type httpService struct {
// service is the Service implementation the handlers delegate to.
service Service
}
type RequestPayload struct {
Action string json:"action"
Auth AuthPayload json:"auth,omitempty"
Log loggerPayload json:"log,omitempty"
}
// AuthPayload carries the credentials of an authentication request.
//
// The struct tags must be wrapped in backticks to be valid Go; without them
// the declaration does not compile.
type AuthPayload struct {
	// Email is encoded under the JSON key "email".
	Email string `json:"email"`
	// Password is the plaintext password, encoded under the JSON key
	// "password". Avoid writing it to logs or trace tags.
	Password string `json:"password"`
}
// loggerPayload is the log entry carried in RequestPayload.Log.
//
// The struct tags must be wrapped in backticks to be valid Go; without them
// the declaration does not compile.
type loggerPayload struct {
	// Name is encoded under the JSON key "name".
	Name string `json:"name"`
	// Data is encoded under the JSON key "data".
	Data string `json:"data"`
}
// authHandler is the HTTP handler for an authentication request.
//
// It decodes the request body into an AuthPayload with readJSON, calls the
// underlying Service with the request context (which carries the tracing
// span when the handler is wrapped by the tracing middleware), and writes the
// result with status 202 Accepted. A body that cannot be read is reported via
// errorJSON; a failed Auth call is reported as 400 Bad Request with the error
// text.
func (s *httpService) authHandler(w http.ResponseWriter, req *http.Request) {
	var requestPayload AuthPayload
	if err := s.readJSON(w, req, &requestPayload); err != nil {
		s.errorJSON(w, err)
		return
	}

	// Log only the email: the payload also holds the plaintext password,
	// which must not end up in the logs.
	log.Println("requestPayload email:", requestPayload.Email)

	// Call our Auth binding.
	result, err := s.service.Auth(req.Context(), requestPayload)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Return the result.
	s.writeJSON(w, http.StatusAccepted, result)
}
// NewHTTPHandler returns the HTTP handler for the auth service. Requests
// under "/auth/" are routed to the Auth handler, which is wrapped in the
// tracing middleware so that each request runs inside a span named "Auth".
func NewHTTPHandler(tracer opentracing.Tracer, service Service) http.Handler {
	h := &httpService{service: service}

	// Auth handler wrapped with the tracing middleware.
	withTracing := middleware.FromHTTPRequest(tracer, "Auth")
	traced := withTracing(http.HandlerFunc(h.authHandler))

	router := http.NewServeMux()
	router.Handle("/auth/", traced)
	return router
}
service.go
package auth
import (
"context"
"errors"
)
// Service is the interface to the auth service.
type Service interface {
// Auth takes a context and the credentials in a, and returns a jsonResponse
// or an error.
Auth(ctx context.Context, a AuthPayload) (jsonResponse, error)
}
二、middleware.go
自定义 middleware.go:通过 HTTP 请求头在服务之间传递 trace,并把 span 存入请求的 context
// Package middleware provides some usable transport middleware to deal with
// propagating Zipkin traces across service boundaries.
package middleware
import (
"fmt"
"net"
"net/http"
"strconv"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)
// RequestFunc is a middleware function for outgoing HTTP requests: it takes
// the request about to be sent and returns the request to send.
type RequestFunc func(req *http.Request) *http.Request
// ToHTTPRequest returns a RequestFunc that injects an OpenTracing Span found in
// the request's context into the HTTP headers. If no such Span can be found,
// the RequestFunc is a noop and returns the request unchanged.
//
// Before injecting, the span is marked as an RPC client span and tagged with
// the HTTP method, host, path and URL, plus the peer hostname and (when the
// URL host contains a valid port) the peer port. An injection failure is
// printed and the request is still returned.
func ToHTTPRequest(tracer opentracing.Tracer) RequestFunc {
	return func(req *http.Request) *http.Request {
		// Retrieve the Span from context; without one there is nothing to do.
		span := opentracing.SpanFromContext(req.Context())
		if span == nil {
			return req
		}

		// We are going to use this span in a client request, so mark as such.
		ext.SpanKindRPCClient.Set(span)

		// Add some standard OpenTracing tags, useful in an HTTP request.
		ext.HTTPMethod.Set(span, req.Method)
		span.SetTag("http.host", req.URL.Host)
		span.SetTag("http.path", req.URL.Path)
		ext.HTTPUrl.Set(
			span,
			fmt.Sprintf("%s://%s%s", req.URL.Scheme, req.URL.Host, req.URL.Path),
		)

		// Add information on the peer service we're about to contact.
		if host, portString, err := net.SplitHostPort(req.URL.Host); err == nil {
			ext.PeerHostname.Set(span, host)
			// Set the port only when it parsed successfully. bitSize 16 makes
			// ParseUint reject values that would not fit in a uint16.
			if port, err := strconv.ParseUint(portString, 10, 16); err == nil {
				ext.PeerPort.Set(span, uint16(port))
			}
		} else {
			// No port in the host part; use it as the hostname as is.
			ext.PeerHostname.Set(span, req.URL.Host)
		}

		// Inject the Span context into the outgoing HTTP Request.
		if err := tracer.Inject(
			span.Context(),
			opentracing.TextMap,
			opentracing.HTTPHeadersCarrier(req.Header),
		); err != nil {
			fmt.Printf("error encountered while trying to inject span: %+v\n", err)
		}

		return req
	}
}
// HandlerFunc is a middleware function for incoming HTTP requests: it wraps
// the next handler and returns the wrapping handler.
type HandlerFunc func(next http.Handler) http.Handler
// FromHTTPRequest returns a middleware HandlerFunc that starts a server-side
// Span named operationName for every incoming request. The Span joins the
// trace found in the request headers, if any; an extraction error is printed
// and the Span is started with whatever parent context Extract returned. The
// Span is stored in the request context, where the wrapped handler can
// retrieve it with opentracing.SpanFromContext(ctx), and is finished once the
// wrapped handler returns.
func FromHTTPRequest(tracer opentracing.Tracer, operationName string) HandlerFunc {
	return func(next http.Handler) http.Handler {
		traced := func(w http.ResponseWriter, req *http.Request) {
			// Look for a propagated trace in the request headers.
			carrier := opentracing.HTTPHeadersCarrier(req.Header)
			parent, extractErr := tracer.Extract(opentracing.TextMap, carrier)
			if extractErr != nil {
				fmt.Printf("error encountered while trying to extract span: %+v\n", extractErr)
			}

			// Start the server span and keep it open for the whole request.
			serverSpan := tracer.StartSpan(operationName, ext.RPCServerOption(parent))
			defer serverSpan.Finish()

			// Hand the request on with the span attached to its context.
			spanCtx := opentracing.ContextWithSpan(req.Context(), serverSpan)
			next.ServeHTTP(w, req.WithContext(spanCtx))
		}
		return http.HandlerFunc(traced)
	}
}
三、implementation.go
处理实现验证服务
package auth
import (
"broker-service/auth-service/data"
"context"
"database/sql"
"fmt"
"log"
"github.com/opentracing/opentracing-go"
)
// auth is the actual service implementation; NewService returns it as a
// Service.
type auth struct {
// DB is the database handle passed to NewService.
DB *sql.DB
// Models is the data layer passed to NewService; Auth looks users up
// through Models.User.
Models data.Models
}
// NewService builds the auth Service implementation from the given database
// handle and data models.
func NewService(db *sql.DB, models data.Models) Service {
	svc := new(auth)
	svc.DB = db
	svc.Models = models
	return svc
}
// Auth implements the Service interface: it looks the user up by email and
// checks the supplied password.
//
// On success it returns a jsonResponse with Error false, a "Logged in user"
// message and the user as Data, plus a nil error. On any failure (lookup
// error, password check error, or a password that does not match) it returns
// a jsonResponse with Error true and a non-nil error, and tags the span with
// the error text.
//
// The span is taken from ctx. If ctx carries none, a new span is started from
// the global tracer so the calls below never operate on a nil span.
func (s *auth) Auth(ctx context.Context, a AuthPayload) (jsonResponse, error) {
	failed := jsonResponse{
		Error:   true,
		Message: "Auth fialed",
	}

	// Pull span from context, falling back to a fresh one.
	span := opentracing.SpanFromContext(ctx)
	if span == nil {
		span = opentracing.StartSpan("Auth")
		defer span.Finish()
	}

	// Example binary annotations. Only the email is tagged: the payload also
	// holds the plaintext password, which must not be sent to the tracer.
	span.SetTag("service", "auth")
	span.SetTag("AuthPayload.Email", a.Email)

	user, err := s.Models.User.GetByEmail(span, a.Email)
	if err != nil {
		log.Println("get user failed from db: ", err)
		span.SetTag("error", err.Error())
		return failed, err
	}
	log.Println("user:", user)

	valid, err := user.PasswordMatches(a.Password)
	if err == nil && !valid {
		// A mismatch comes back as valid == false with a nil error; turn it
		// into a real error so err.Error() below cannot dereference nil and
		// the caller does not receive a failure with a nil error.
		err = fmt.Errorf("invalid credentials")
	}
	if err != nil {
		log.Println("invalid user: ", err)
		span.SetTag("error", err.Error())
		return failed, err
	}

	jsonResp := jsonResponse{
		Error:   false,
		Message: fmt.Sprintf("Logged in user %s", user.Email),
		Data:    user,
	}
	log.Println("auth response: ", jsonResp)
	return jsonResp, nil
}
四、httpclient.go
通过client 向服务发送验证请求(由 broker-service 调用)
package auth
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"

	"broker-service/middleware"

	opentracing "github.com/opentracing/opentracing-go"
)
// client is the HTTP client implementation of Service; it forwards Auth calls
// to a remote auth service.
type client struct {
// baseURL is the prefix the "/auth/" path is appended to.
baseURL string
// httpClient executes the outgoing requests.
httpClient *http.Client
// tracer is the tracer passed to NewHTTPClient.
tracer opentracing.Tracer
// traceRequest is applied to every outgoing request before it is sent;
// NewHTTPClient sets it to middleware.ToHTTPRequest(tracer).
traceRequest middleware.RequestFunc
}
// Auth implements the Service interface by POSTing the credentials as JSON to
// "<baseURL>/auth/" and decoding the JSON reply.
//
// A new span named "Auth" is started as a child of the span found in ctx (if
// none is found, it becomes the trace root) and is propagated to the server
// through the request headers by traceRequest.
//
// It returns the decoded response and a nil error when the server answers
// with 202 Accepted and the body decodes. In every other case (payload cannot
// be encoded, request cannot be built or sent, unexpected status code,
// undecodable body) it returns a response with Error set and a non-nil error,
// and tags the span with the error text.
func (c *client) Auth(ctx context.Context, a AuthPayload) (data jsonResponse, err error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "Auth")
	defer span.Finish()

	// Log only the email; the payload also holds the plaintext password.
	log.Println("auth: ", a.Email)

	failure := jsonResponse{
		Error:   true,
		Message: "Authenticatioin failed!",
	}

	jsonData, err := json.Marshal(a)
	if err != nil {
		span.SetTag("error", err.Error())
		return failure, err
	}

	// Create the HTTP request.
	url := fmt.Sprintf("%s/auth/", c.baseURL)
	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
	if err != nil {
		span.SetTag("error", err.Error())
		return failure, err
	}
	req.Header.Set("Content-Type", "application/json")

	// Use our middleware to propagate our trace.
	req = c.traceRequest(req.WithContext(ctx))

	// Execute the HTTP request.
	resp, err := c.httpClient.Do(req)
	if err != nil {
		// Annotate our span with the error condition.
		span.SetTag("error", err.Error())
		return failure, err
	}
	defer resp.Body.Close()

	// Anything but 202 Accepted is a failed authentication. Report it as an
	// error rather than returning an empty response with a nil error.
	if resp.StatusCode != http.StatusAccepted {
		err = fmt.Errorf("auth service returned status %d", resp.StatusCode)
		span.SetTag("error", err.Error())
		return failure, err
	}

	if err = json.NewDecoder(resp.Body).Decode(&data); err != nil {
		span.SetTag("error", err.Error())
		return failure, err
	}
	log.Println("result: ", data)

	// The server can flag a failure inside an otherwise valid reply; tag the
	// span and hand the reply to the caller unchanged.
	if data.Error {
		span.SetTag("error", data.Error)
	}
	return data, nil
}
// NewHTTPClient returns a Service that talks to the auth service at baseURL
// over HTTP. Outgoing requests carry the current trace, injected through
// middleware.ToHTTPRequest(tracer).
//
// The underlying http.Client has a timeout so that a hung auth service cannot
// block callers forever; a zero Timeout on http.Client means no timeout.
func NewHTTPClient(tracer opentracing.Tracer, baseURL string) Service {
	// requestTimeout bounds a whole request, including reading the response.
	const requestTimeout = 10 * time.Second

	return &client{
		baseURL:      baseURL,
		httpClient:   &http.Client{Timeout: requestTimeout},
		tracer:       tracer,
		traceRequest: middleware.ToHTTPRequest(tracer),
	}
}