
[Recommender Systems from 0 to 1] Service Discovery


Let's go!

 

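Preface

First, a few thoughts on the engine and the algorithms of a recommender system.

Most of what you read about recommender systems today is about algorithms, described in grand and mysterious terms. In practice many of them are much the same: the core ideas are similar and only the implementation details differ. In industry, I think only a few of the complex algorithms actually make it into production; most companies rely on a fairly small, common set.

A good recommender system depends heavily on its engine. In experiments, response latency is directly tied to the key metrics: the longer the response time, the lower the metrics.

As a down-to-earth recommender-system coder, I want to start from the basics and get the engine right first. The algorithms will come later, of course; a recommender needs both.

So, enough talk. Let's build the recommendation engine!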

 

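Service Discovery

Since we are building the engine, that is, the backend, the first step is to set up the architecture.

Microservices have become the mainstream way to build backends. They bring many advantages, such as high cohesion, independent deployment and per-service load balancing, and some drawbacks too, such as more complex communication. I won't go into detail here; search for it if you are interested.

When microservices talk to each other, the client has to find out where the server is. That is what service discovery is for.

The process splits into what the server does and what the client does:

The server side is simple: it only needs to write its own information (such as its address) into some shared store.

The client first reads the list of server addresses from that store, then picks one address according to a load-balancing rule, and finally calls it.

Simple in principle, right? Let's get hands-on.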
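To make the two roles concrete, here is a minimal in-memory sketch. It is not etcd: the hypothetical Registry type plays the shared store, Register is the server-side step, and List followed by a round-robin Pick is the client-side step. The addresses are made up.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Registry is a hypothetical in-memory stand-in for the shared store (etcd in this article).
type Registry struct {
	mu       sync.Mutex
	services map[string]map[string]struct{} // service name -> set of "ip:port"
}

func NewRegistry() *Registry {
	return &Registry{services: make(map[string]map[string]struct{})}
}

// Register is the server-side step: record this instance's address under its service name.
func (r *Registry) Register(name, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.services[name] == nil {
		r.services[name] = make(map[string]struct{})
	}
	r.services[name][addr] = struct{}{}
}

// List is the client-side lookup: all addresses of a service, sorted for a stable order.
func (r *Registry) List(name string) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	addrs := make([]string, 0, len(r.services[name]))
	for a := range r.services[name] {
		addrs = append(addrs, a)
	}
	sort.Strings(addrs)
	return addrs
}

// RoundRobin is the simplest load-balancing rule: hand out addresses in turn.
type RoundRobin struct{ next int }

// Pick returns the next address, or false if the list is empty.
func (rr *RoundRobin) Pick(addrs []string) (string, bool) {
	if len(addrs) == 0 {
		return "", false
	}
	a := addrs[rr.next%len(addrs)]
	rr.next++
	return a, true
}

func main() {
	reg := NewRegistry()
	reg.Register("greet_server", "10.0.0.1:5001") // server 1 registers
	reg.Register("greet_server", "10.0.0.2:5001") // server 2 registers
	var rr RoundRobin
	for i := 0; i < 3; i++ {
		addr, _ := rr.Pick(reg.List("greet_server")) // client: look up, then pick
		fmt.Println("call", addr)
	}
	// prints, one per line: call 10.0.0.1:5001, call 10.0.0.2:5001, call 10.0.0.1:5001
}
```

In the real setup below, etcd replaces Registry and gRPC's round_robin balancer replaces Pick.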

 

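etcd Introduction

ZooKeeper used to be the go-to choice for this, but it is a comparatively heavy, JVM-based system.

etcd, on the other hand, is written in Go and became well known through Kubernetes, which uses etcd as the distributed key-value store for all of its cluster data.

So naturally we go with etcd: younger, lighter, and very stable. Yes, I do like shiny new things = =

etcd keeps its data consistent with the Raft algorithm. For Raft itself, this animated demo is an excellent, vivid explanation:

Raft animated demo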
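One practical consequence of Raft: a write is committed only once a majority (quorum) of members has accepted it, so the cluster size decides how many member failures etcd survives. A small arithmetic sketch; quorum and tolerated are illustrative names, not etcd APIs:

```go
package main

import "fmt"

// quorum is the number of members that must agree: a strict majority.
func quorum(members int) int { return members/2 + 1 }

// tolerated is how many members can fail while a majority can still be formed.
func tolerated(members int) int { return members - quorum(members) }

func main() {
	for _, n := range []int{1, 3, 4, 5} {
		fmt.Printf("members=%d quorum=%d tolerated failures=%d\n", n, quorum(n), tolerated(n))
	}
}
```

A 4-member cluster tolerates no more failures than a 3-member one, which is why odd cluster sizes are the usual recommendation. The single-member test setup used in this article tolerates no failures, which is fine for a local environment.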

 

etcd in Practice

I use Docker for my test environment. Here is my docker-compose.yaml:

 

version: '2.2'
services:
  etcd:
    image: gcr.io/etcd-development/etcd:v3.4.13
    container_name: etcd
    restart: always
    ports:
      - 2379:2379
      - 2380:2380
    command:
      - "/usr/local/bin/etcd"
      - "--name"
      - "s1"
      - "--data-dir"
      - "/etcd-data"
      - "--advertise-client-urls"
      - "http://0.0.0.0:2379"
      - "--listen-client-urls"
      - "http://0.0.0.0:2379"
      - "--initial-advertise-peer-urls"
      - "http://0.0.0.0:2380"
      - "--listen-peer-urls"
      - "http://0.0.0.0:2380"
      - "--initial-cluster-token"
      - "tkn"
      - "--initial-cluster"
      - "s1=http://0.0.0.0:2380"
      - "--initial-cluster-state"
      - "new"

 

To install etcd some other way, see the official instructions:

Install etcd

Since etcd is a store, let's test basic CRUD first, and then etcd's lease feature.

 

CRUD:

 

# etcdctl put test/key hello
OK
# etcdctl get test/key
test/key
hello
# etcdctl put test/key goodbye
OK
# etcdctl get test/key
test/key
goodbye
# etcdctl del test/key
1
# etcdctl get test/key

 

 

Create a lease that expires after 120 s:

 

# etcdctl lease grant 120
lease 3f3575c45fa5ff26 granted with TTL(120s)

 

List the leases:

 

# etcdctl lease list
found 1 leases
3f3575c45fa5ff26

 

Create a key-value pair attached to the lease:

 

# etcdctl put test/key hello --lease="3f3575c45fa5ff26"
OK

 

Check the lease's remaining time and the keys attached to it:

 

# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 granted with TTL(120s), remaining(46s), attached keys([test/key])

 

List the keys that still exist:

 

# etcdctl get --prefix ""
test/key
hello
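etcdctl get --prefix (like clientv3.WithPrefix(), which the resolver code uses later) is really a range query: etcd returns every key in [prefix, rangeEnd). A sketch of how that exclusive end can be derived; prefixRangeEnd is a hypothetical helper written to mirror the idea, not the clientv3 source:

```go
package main

import "fmt"

// prefixRangeEnd returns the exclusive end key for a prefix scan: copy the prefix,
// increment the last byte that is below 0xff, and drop everything after it.
// If no byte can be incremented (empty prefix or all 0xff), there is no finite
// upper bound; etcd treats a range end of "\x00" as "every key >= start".
func prefixRangeEnd(prefix string) string {
	end := []byte(prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			return string(end[:i+1])
		}
	}
	return "\x00"
}

func main() {
	fmt.Printf("%q\n", prefixRangeEnd("/etcd/test/greet_server/")) // "/etcd/test/greet_server0"
	fmt.Printf("%q\n", prefixRangeEnd(""))                         // "\x00"
}
```

Because '/' + 1 is '0', every key starting with /etcd/test/greet_server/ sorts inside the range and nothing else does.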

 

After the lease expires, check again: the key has been deleted automatically.

 

# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 already expired
# etcdctl get --prefix ""

 

Lease renewal:

 

As before, create a lease and attach a key to it:

 

# etcdctl lease grant 30
lease 3f3575c45fa5ff2c granted with TTL(30s)
# etcdctl put test/key hello --lease="3f3575c45fa5ff2c"
OK

 

Keep the lease alive (the command keeps running and renews the lease periodically):

 

# etcdctl lease keep-alive 3f3575c45fa5ff2c
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)

 

Open a new terminal and check the lease and the key:

 

# etcdctl lease timetolive 3f3575c45fa5ff2c --keys
lease 3f3575c45fa5ff2c granted with TTL(30s), remaining(23s), attached keys([test/key])
# etcdctl get --prefix ""
test/key
hello

 

As long as keep-alive is running, the lease and its key do not expire.

 

Go + gRPC + etcd: The Ultimate Service Discovery Hands-On!

First, the GitHub repository: GitHub repository

Code directory: /go_server/src/lib/discovery/

 

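Here is the overall flow.

The server registers itself with etcd, that is, it writes its own information into etcd.

The client roughly does the following:

    1. Fetch the list of server addresses from etcd, then watch the list and apply any changes.
    2. Write the address list into the gRPC resolver's resolver.ClientConn.
    3. gRPC creates the connections and load-balances requests across them.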
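Step 1 boils down to keeping an address list in sync with put and delete events under the service's key prefix. A stdlib-only sketch of that bookkeeping; the Event type and apply function are hypothetical simplifications of etcd watch events, not clientv3 types. It returns a fresh slice on every change instead of editing the old one in place, on the assumption that a list already handed to gRPC may still be in use:

```go
package main

import (
	"fmt"
	"strings"
)

// EventType mirrors the two watch event kinds the resolver cares about (hypothetical).
type EventType int

const (
	Put EventType = iota
	Delete
)

// Event is a simplified watch event: which key under the service prefix changed.
type Event struct {
	Type EventType
	Key  string
}

// apply returns the address list after one event; the input slice is never modified.
func apply(addrs []string, prefix string, ev Event) []string {
	addr := strings.TrimPrefix(ev.Key, prefix) // keys look like prefix + "ip:port"
	switch ev.Type {
	case Put:
		for _, a := range addrs {
			if a == addr {
				return addrs // already known, e.g. a re-registration
			}
		}
		out := make([]string, len(addrs), len(addrs)+1)
		copy(out, addrs)
		return append(out, addr)
	case Delete:
		out := make([]string, 0, len(addrs))
		for _, a := range addrs {
			if a != addr {
				out = append(out, a)
			}
		}
		return out
	}
	return addrs
}

func main() {
	prefix := "/etcd/test/greet_server/"
	var addrs []string
	events := []Event{
		{Put, prefix + "192.168.31.71:52963"},
		{Put, prefix + "192.168.31.71:52969"},
		{Delete, prefix + "192.168.31.71:52963"}, // lease expired or server deregistered
		{Put, prefix + "192.168.31.71:52969"},    // duplicate put: list unchanged
	}
	for _, ev := range events {
		addrs = apply(addrs, prefix, ev)
		fmt.Println(addrs) // this is the point where a resolver would push the list to gRPC
	}
}
```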

 

 

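The module consists of 7 files:

config.go: configuration constants.
discovery.go: initialization.
register.go: service registration.
resolver.go: resolves the service addresses registered in etcd and sets up gRPC load balancing.
util.go: shared helpers.
wrapper.go: wrappers exposed to callers.
ctx.go: context helpers that set timeouts.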

config.go

 

package config
import "time"
// etcd
const (
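    Timeout        = 15 * time.Second // dial timeout for the etcd client
    Expires        = 10               // lease TTL, in seconds
    TickerInterval = 5                // how often register() checks its key, in seconds
    // scheme
    Scheme = "etcd"
    // format of the key prefix stored in etcd: /scheme/authority/endpoint
    DirFormat = "/%s/%s/%s/"
    // target format required by a custom gRPC resolver: scheme://authority/endpoint
    // scheme selects the resolver; in this code authority carries the environment
    // and endpoint carries the service name
    TargetFormat = "%s://%s/%s"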
)
// server name
const (
    GreetServer = "greet_server"
)

 

discovery.go

 

package discovery
import (
    "fmt"
    "go_server/src/lib/discovery/config"
    "go_server/src/lib/logger"
    "strings"
    "go.etcd.io/etcd/clientv3"
)
var (
    client *clientv3.Client
)
// Init creates the package-level etcd client; later calls are no-ops
func Init(etcdAddr string) error {
    var err error
    if client == nil {
        // build the etcd client; multiple endpoints are separated by ";"
        client, err = clientv3.New(clientv3.Config{
            Endpoints:   strings.Split(etcdAddr, ";"),
            DialTimeout: config.Timeout,
        })
        if err != nil {
            logger.Error("failed to connect to etcd: %s\n", err)
            fmt.Printf("failed to connect to etcd: %s\n", err)
            return err
        }
    }
    return nil
}

 

register.go

 

package discovery
import (
    "context"
    "errors"
    "fmt"
    "go_server/src/lib/discovery/config"
    "os"
    "os/signal"
    "syscall"
    "time"
    "go.etcd.io/etcd/clientv3"
)
// Service is the object a server uses to register itself
type Service struct {
    Name string // service name
    Host string // {ip}:{port}
    Env  string // environment
    Key  string // key stored in etcd
}
var service *Service
func (s *Service) register() error {
    if s.Env == "" {
        return errors.New("env is null")
    }
    s.Key = fmt.Sprintf(config.DirFormat, config.Scheme, s.Env, s.Name) + s.Host
    ticker := time.NewTicker(time.Second * time.Duration(config.TickerInterval))
    go func() {
        for {
            resp, err := client.Get(context.Background(), s.Key)
            if err != nil {
                fmt.Printf("failed to get service address: %s\n", err)
            } else if resp.Count == 0 { // not registered yet, or the lease has expired
                err = s.keepAlive()
                if err != nil {
                    fmt.Printf("failed to keep registration alive: %s\n", err)
                }
            }
            <-ticker.C
        }
    }()
    return nil
}
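// keepAlive creates a lease, attaches the service key to it and keeps renewing it
func (s *Service) keepAlive() error {
    // create a lease with a TTL of config.Expires seconds
    leaseResp, err := client.Grant(context.Background(), config.Expires)
    if err != nil {
        fmt.Printf("failed to create lease: %s\n", err)
        return err
    }
    // register the service address in etcd, bound to the lease
    _, err = client.Put(context.Background(), s.Key, s.Host, clientv3.WithLease(leaseResp.ID))
    if err != nil {
        fmt.Printf("failed to register service: %s\n", err)
        return err
    }
    // renew the lease automatically
    ch, err := client.KeepAlive(context.Background(), leaseResp.ID)
    if err != nil {
        fmt.Printf("failed to keep lease alive: %s\n", err)
        return err
    }
    // drain the channel returned by KeepAlive; the loop ends once the channel is closed
    // (a bare `for { <-ch }` would spin forever on a closed channel)
    go func() {
        for range ch {
        }
    }()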
    return nil
}
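// unRegister removes this instance's key from etcd
func (s *Service) unRegister() {
    if client != nil {
        _, _ = client.Delete(context.Background(), s.Key)
    }
}
// WaitForClose blocks until a termination signal arrives, then deregisters the service and exits.
// SIGKILL is not listed because it cannot be caught.
func WaitForClose() {
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)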
    sig := <-ch
    service.unRegister()
    if i, ok := sig.(syscall.Signal); ok {
        os.Exit(int(i))
    } else {
        os.Exit(0)
    }
}
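A note on draining the keep-alive channel: clientv3's KeepAlive returns a channel that is closed once keep-alive stops, for example when its context is canceled or the lease is gone. Receiving from a closed channel never blocks, so a drain loop should use for range, which exits on close, rather than a bare for { <-ch }, which would spin. A stdlib sketch with a hypothetical drain helper and an int channel standing in for the lease responses:

```go
package main

import "fmt"

// drain consumes a channel until its producer closes it and reports how many
// values it saw; `for range` exits on close instead of spinning on zero values.
func drain(ch <-chan int) (received int) {
	for range ch {
		received++
	}
	return received
}

func main() {
	ch := make(chan int)
	go func() {
		for i := 0; i < 3; i++ {
			ch <- i
		}
		close(ch) // stands in for: lease expired or context canceled
	}()
	fmt.Println("received", drain(ch), "messages, then stopped") // received 3 messages, then stopped
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false: a closed channel returns immediately
}
```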

 

resolver.go

 

package discovery
import (
    "context"
    "fmt"
    "go_server/src/lib/discovery/config"
    "strings"
    "go.etcd.io/etcd/clientv3"
    "google.golang.org/grpc"
    "google.golang.org/grpc/resolver"
)
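// EtcdResolver is both the resolver.Builder and the resolver.Resolver for the "etcd" scheme
type EtcdResolver struct {
    dir        string
    clientConn resolver.ClientConn
}
// Resolver registers the etcd resolver and dials service "name" in environment "env"
func Resolver(env string, name string) *grpc.ClientConn {
    // register the etcd resolver; the gRPC docs say resolver.Register should only be
    // called at initialization time and is not thread-safe
    r := &EtcdResolver{}
    resolver.Register(r)
    target := fmt.Sprintf(config.TargetFormat, r.Scheme(), env, name)
    // connect with round-robin load balancing; grpc.Dial calls r.Build() synchronously
    dialOpts := []grpc.DialOption{
        // gRPC's built-in round robin; WithBalancerName is deprecated in newer gRPC versions,
        // which use grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"round_robin":{}}]}`)
        grpc.WithBalancerName("round_robin"),
        grpc.WithInsecure(),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(1024 * 1024 * 16), // accept responses up to 16 MiB
        ),
    }
    conn, err := grpc.Dial(target, dialOpts...)
    if err != nil {
        fmt.Println("failed to connect to server:", err)
    }
    return conn
}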
func (r *EtcdResolver) Scheme() string {
    return config.Scheme
}
// Build creates the resolver for a target; called synchronously by grpc.Dial()
func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
    r.clientConn = clientConn
    r.dir = fmt.Sprintf(config.DirFormat, target.Scheme, target.Authority, target.Endpoint)
    go r.watch()
    return r, nil
}
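// watch loads the address list under r.dir, then keeps it in sync through an etcd watch
func (r *EtcdResolver) watch() {
    // initial address list
    var addrList []resolver.Address
    watchOpts := []clientv3.OpOption{clientv3.WithPrefix()}
    resp, err := client.Get(context.Background(), r.dir, clientv3.WithPrefix())
    if err != nil {
        fmt.Println("failed to get service address list:", err)
    } else {
        for i := range resp.Kvs {
            addr := strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir)
            fmt.Println(addr)
            addrList = append(addrList, resolver.Address{Addr: addr})
        }
        // watch from the revision right after this read, so no change in between is missed
        watchOpts = append(watchOpts, clientv3.WithRev(resp.Header.Revision+1))
    }
    r.clientConn.NewAddress(addrList)
    // watch for changes to the address list
    rch := client.Watch(context.Background(), r.dir, watchOpts...)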
    for n := range rch {
        for _, ev := range n.Events {
            addr := strings.TrimPrefix(string(ev.Kv.Key), r.dir)
            switch ev.Type {
            case clientv3.EventTypePut:
                if !exists(addrList, addr) {
                    addrList = append(addrList, resolver.Address{Addr: addr})
                    r.clientConn.NewAddress(addrList)
                }
            case clientv3.EventTypeDelete:
                if s, ok := remove(addrList, addr); ok {
                    addrList = s
                    r.clientConn.NewAddress(addrList)
                }
            }
        }
    }
}
func exists(l []resolver.Address, addr string) bool {
    for i := range l {
        if l[i].Addr == addr {
            return true
        }
    }
    return false
}
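// remove returns a new slice without addr and leaves s untouched,
// since gRPC may still hold the slice passed to an earlier NewAddress call
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
    for i := range s {
        if s[i].Addr == addr {
            out := make([]resolver.Address, 0, len(s)-1)
            out = append(out, s[:i]...)
            return append(out, s[i+1:]...), true
        }
    }
    return nil, false
}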
//Close ...
func (r *EtcdResolver) Close() {}
//ResolveNow ...
func (r *EtcdResolver) ResolveNow(_ resolver.ResolveNowOption) {}

 

util.go

 

package discovery
import (
    "fmt"
    "net"
)
// getIntranetIP returns the first non-loopback IPv4 address of this host
func getIntranetIP() (ip string) {
    if addrs, err := net.InterfaceAddrs(); err == nil {
        for _, address := range addrs {
            if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
                if ipnet.IP.To4() != nil {
                    ip = ipnet.IP.String()
                    break
                }
            }
        }
    }
    return
}
// getListener listens on a free port chosen by the OS (port 0) and returns the listener and this host's address as ip:port
func getListener() (listener net.Listener, host string, err error) {
    host = "0.0.0.0:0"
    listener, err = net.Listen("tcp", host)
    if err == nil {
        addr := listener.Addr().String()
        _, portString, _ := net.SplitHostPort(addr)
        host = fmt.Sprintf("%s:%s", getIntranetIP(), portString)
    }
    return
}

 

wrapper.go

 

package discovery
import (
    "fmt"
    "go_server/src/lib/discovery/config"
    "go_server/src/lib/proto/greet"
    "google.golang.org/grpc"
)
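// GreetRegister starts a gRPC server for the Greet service on a free port
// and registers it in etcd under the given environment
func GreetRegister(env string, server greet.GreetServer) error {
    listener, host, err := getListener()
    if err != nil {
        fmt.Println("failed to listen:", err)
        return err
    }
    fmt.Println("host:", host)
    srv := grpc.NewServer()
    // register the service before Serve: gRPC does not allow RegisterService after Serve
    greet.RegisterGreetServer(srv, server)
    go func() {
        if err := srv.Serve(listener); err != nil {
            fmt.Println("grpc server stopped:", err)
        }
    }()
    service = &Service{Name: config.GreetServer, Host: host, Env: env}
    err = service.register()
    if err != nil {
        fmt.Println(err)
        return err
    }
    return nil
}
// GreetResolve returns a Greet client whose servers are discovered through etcd
func GreetResolve(env string) greet.GreetClient {
    return greet.NewGreetClient(Resolver(env, config.GreetServer))
}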

 

ctx.go

 

package discovery
import (
    "context"
    "time"
)
// Context1s returns a context that times out after 1 second
func Context1s() (ctx context.Context, cancel context.CancelFunc) {
    return context.WithTimeout(context.TODO(), time.Second)
}

 

Let's test it. The test files are in the GitHub repository as well.

Here is a test proto, a server and a client:

 

greet.proto

 

syntax = "proto3";

option go_package = "src/lib/proto/greet";
service Greet {
  rpc Morning(GreetRequest)returns(GreetResponse){}
  rpc Night(GreetRequest)returns(GreetResponse){}
}
message GreetRequest {
  string name = 1;
}
message GreetResponse {
  string message = 1;
  string from = 2;
}

 

server main.go

 

package main
import (
    "context"
    "flag"
    "fmt"
    "go_server/src/lib/discovery"
    proto "go_server/src/lib/proto/greet"
)
var (
    Flag     = flag.String("flag", "a", "instance label returned in the From field")
    EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
    Env      = flag.String("Env", "test", "env")
)
// GreetServer implements the Greet RPC service
type GreetServer struct{}
func (gs *GreetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
    fmt.Printf("Morning : %s\n", req.Name)
    return &proto.GreetResponse{
        Message: "Good morning, " + req.Name,
        From:    *Flag,
    }, nil
}
func (gs *GreetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
    fmt.Printf("Night 调用: %s\n", req.Name)
    return &proto.GreetResponse{
        Message: "Good night, " + req.Name,
        From:    *Flag,
    }, nil
}
func main() {
    flag.Parse()
    err := discovery.Init(*EtcdAddr)
    if err != nil {
        fmt.Println(err)
        return
    }
    err = discovery.GreetRegister(*Env, &GreetServer{})
    if err != nil {
        fmt.Println(err)
        return
    }
    discovery.WaitForClose()
}

 

client main.go

 

package main
import (
    "flag"
    "fmt"
    "go_server/src/lib/discovery"
    proto "go_server/src/lib/proto/greet"
    "time"
)
var (
    EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
    Env      = flag.String("Env", "test", "env")
)
func main() {
    flag.Parse()
    err := discovery.Init(*EtcdAddr)
    if err != nil {
        fmt.Println(err)
        return
    }
    c := discovery.GreetResolve(*Env)
    ticker := time.NewTicker(1 * time.Second)
    for range ticker.C {
        fmt.Println("Morning 调用...")
        ctx, cancel := discovery.Context1s()
        resp1, err := c.Morning(
            ctx,
            &proto.GreetRequest{Name: "Jinfeng"},
        )
        cancel()
        if err != nil {
            fmt.Println("Morning调用失败:", err)
            return
        }
        fmt.Printf("Morning 响应:%s,来自:%s\n", resp1.Message, resp1.From)
        fmt.Println("Night 调用...")
        ctx, cancel = discovery.Context1s()
        resp2, err := c.Night(
            ctx,
            &proto.GreetRequest{Name: "Jinfeng"},
        )
        cancel()
        if err != nil {
            fmt.Println("Night调用失败:", err)
            return
        }
        fmt.Printf("Night 响应:%s,来自:%s\n", resp2.Message, resp2.From)
    }
}

 

Run it: start 3 servers (labelled with -flag a, b and c), and etcd now lists 3 registered instances:

 

# etcdctl get --prefix ""
/etcd/test/greet_server/192.168.31.71:52963
192.168.31.71:52963
/etcd/test/greet_server/192.168.31.71:52969
192.168.31.71:52969
/etcd/test/greet_server/192.168.31.71:52973
192.168.31.71:52973

 

Run the client:

 

➜  client git:(main) ✗ go run .
192.168.31.71:52963
192.168.31.71:52969
192.168.31.71:52973
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:c
Night 调用...
Night 响应:Good night, Jinfeng,来自:a
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:b
Night 调用...
Night 响应:Good night, Jinfeng,来自:c
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:c

 

Shut down one server (c); requests now alternate between a and b:

 

Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b

 

Start it again; c rejoins the rotation:

 

Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:c
Night 调用...
Night 响应:Good night, Jinfeng,来自:a
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:b
Night 调用...
Night 响应:Good night, Jinfeng,来自:c
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b

 

For now, load balancing is just gRPC's built-in round robin. Later, when I have time, I'll add other methods such as consistent hashing.
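Round robin ignores the request entirely. Consistent hashing instead maps a key (for example a user ID) to a server, so the same user keeps landing on the same instance, and removing a server only remaps the keys that server owned. A minimal sketch with virtual nodes and CRC32; Ring is a hypothetical helper, not part of gRPC or the repository, and wiring it into gRPC would need a custom balancer:

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// Ring is a minimal consistent-hash ring with virtual nodes.
type Ring struct {
	hashes []uint32          // sorted virtual-node hashes
	owner  map[uint32]string // virtual-node hash -> real address
}

// NewRing places `replicas` virtual nodes per address on the ring.
func NewRing(replicas int, addrs ...string) *Ring {
	r := &Ring{owner: make(map[uint32]string)}
	for _, a := range addrs {
		for i := 0; i < replicas; i++ {
			h := crc32.ChecksumIEEE([]byte(a + "#" + strconv.Itoa(i)))
			r.hashes = append(r.hashes, h)
			r.owner[h] = a
		}
	}
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
	return r
}

// Get returns the address owning key: the first virtual node clockwise from hash(key).
func (r *Ring) Get(key string) string {
	if len(r.hashes) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0 // wrap around the ring
	}
	return r.owner[r.hashes[i]]
}

func main() {
	before := NewRing(50, "a:1", "b:1", "c:1")
	after := NewRing(50, "a:1", "b:1") // c:1 shut down
	moved := 0
	for u := 0; u < 1000; u++ {
		key := "user-" + strconv.Itoa(u)
		if before.Get(key) != "c:1" && before.Get(key) != after.Get(key) {
			moved++
		}
	}
	fmt.Println("keys moved between surviving servers:", moved) // 0
}
```

Only the keys that c:1 owned change servers; every other user stays where it was, which round robin cannot guarantee.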

 

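Service discovery is now in place, so next we can build a simple recommender system and get the whole pipeline running.

The plan is to start with a recommender that has only simple recall (candidate retrieval), and then optimize the whole system step by step.

Let's go, folks!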

 

Original article

 
