Some checks failed
dev test / test (push) Successful in 11s
dev test / vulnCheck (push) Successful in 15s
dev test / Ci-Lint (push) Failing after 46s
${{ github.actor }} executed Build Push Prod / build (push) Successful in 4m3s
${{ github.actor }} executed Build Push Prod / deploy (push) Successful in 18s
150 lines
4.1 KiB
Go
150 lines
4.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"os"
|
|
"os/signal"
|
|
"runtime"
|
|
"syscall"
|
|
|
|
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
|
|
"git.maximotejeda.com/maximo/us-dop-bot/config"
|
|
"git.maximotejeda.com/maximo/us-dop-bot/internal/adapters/dolar"
|
|
"git.maximotejeda.com/maximo/us-dop-bot/internal/adapters/user"
|
|
"git.maximotejeda.com/maximo/us-dop-bot/internal/application/api"
|
|
"git.maximotejeda.com/maximo/us-dop-bot/internal/application/broadcaster"
|
|
"git.maximotejeda.com/maximo/us-dop-bot/internal/ports"
|
|
"golang.org/x/sync/semaphore"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
)
|
|
|
|
var (
|
|
maxWorkers = runtime.GOMAXPROCS(0)
|
|
sem = semaphore.NewWeighted(int64(maxWorkers) * 2)
|
|
)
|
|
|
|
func main() {
|
|
|
|
log := slog.New(slog.NewJSONHandler(os.Stderr, nil))
|
|
log = log.With("location", "main")
|
|
nc, _ := nats.Connect(config.GetNatsURI())
|
|
ctx := context.Background()
|
|
|
|
bot, err := tgbotapi.NewBotAPI(config.GetToken())
|
|
if err != nil {
|
|
log.Error("token not found", "error", err)
|
|
panic(err)
|
|
}
|
|
|
|
botName := bot.Self.UserName
|
|
|
|
bot.Debug = config.GetEnvironment() == "development"
|
|
log.Info("Bot Authorized", "username", botName)
|
|
log.Info("Initiated with a concurrency limit", "max concurrency", maxWorkers*2)
|
|
u := tgbotapi.NewUpdate(0)
|
|
u.Timeout = 60
|
|
|
|
// bot user update channel
|
|
updtChan := bot.GetUpdatesChan(u)
|
|
// subs chann
|
|
changeChan := make(chan *nats.Msg, 64)
|
|
broadcastChan := make(chan *nats.Msg, 64)
|
|
defer close(changeChan)
|
|
defer close(broadcastChan)
|
|
sub, err := nc.ChanSubscribe("dolar-bot-change", changeChan)
|
|
if err != nil {
|
|
log.Error("subscribing", "error", err.Error())
|
|
}
|
|
info, err := nc.ChanSubscribe("dolar-bot", broadcastChan)
|
|
if err != nil {
|
|
log.Error("subscribing", "error", err.Error())
|
|
}
|
|
defer sub.Drain()
|
|
defer info.Drain()
|
|
defer nc.Close()
|
|
|
|
// exit channel
|
|
sign := make(chan os.Signal, 1)
|
|
signal.Notify(sign, syscall.SIGINT, syscall.SIGTERM)
|
|
defer close(sign)
|
|
app := api.NewApi(bot)
|
|
// check for bot in db
|
|
for {
|
|
select {
|
|
case update := <-updtChan:
|
|
if err = sem.Acquire(ctx, 1); err != nil {
|
|
bot.Send(tgbotapi.NewMessage(update.FromChat().ID, "error adquiring update"))
|
|
continue
|
|
}
|
|
go func() {
|
|
defer sem.Release(1)
|
|
dol, user, dolarConn, userConn := CreateAdaptersGRPC()
|
|
app.Run(&update, dol, user)
|
|
dolarConn.Close()
|
|
userConn.Close()
|
|
}()
|
|
case message := <-changeChan:
|
|
log.Info("broadcasting Change")
|
|
dol, user, dolarConn, userConn := CreateAdaptersGRPC()
|
|
|
|
bcast := broadcaster.NewBroadCast(ctx, user, dol, message.Data)
|
|
userList := bcast.SendList()
|
|
|
|
for _, msg := range userList {
|
|
go bot.Send(msg)
|
|
}
|
|
dolarConn.Close()
|
|
userConn.Close()
|
|
|
|
case message := <-broadcastChan:
|
|
dol, user, dolarConn, userConn := CreateAdaptersGRPC()
|
|
|
|
bcast := broadcaster.NewBroadCast(ctx, user, dol, message.Data)
|
|
msgs := bcast.SendAllUsers(ctx, log, message.Data, bot.Self.UserName)
|
|
log.Info("broadcast", "data", string(message.Data), "msg", msgs)
|
|
for _, msg := range msgs {
|
|
go bot.Send(msg)
|
|
}
|
|
dolarConn.Close()
|
|
userConn.Close()
|
|
case <-sign:
|
|
log.Error("killing app due to syscall ")
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
}
|
|
|
|
func CreateAdaptersGRPC() (ports.DolarService, ports.UserService, *grpc.ClientConn, *grpc.ClientConn) {
|
|
log := slog.Default()
|
|
// we are outside update so we will be querying db to
|
|
// get users interested in specific updates ex bpd, brd, apa
|
|
// userID inst=> comma separated string
|
|
var opts []grpc.DialOption
|
|
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
dolarConn, err := grpc.NewClient(config.GetDollarServiceURL(), opts...)
|
|
if err != nil {
|
|
log.Error("creating gerpc conn", "error", err)
|
|
panic(err)
|
|
}
|
|
userConn, err := grpc.NewClient(config.GetUserServiceURL(), opts...)
|
|
if err != nil {
|
|
log.Error("creating gerpc conn", "error", err)
|
|
panic(err)
|
|
}
|
|
dol, err := dolar.NewAdapter(dolarConn)
|
|
if err != nil {
|
|
log.Error("creating service adapter", "error", err)
|
|
panic(err)
|
|
}
|
|
user, err := user.NewAdapter(userConn)
|
|
if err != nil {
|
|
log.Error("creating service adapter", "error", err)
|
|
panic(err)
|
|
}
|
|
return dol, user, dolarConn, userConn
|
|
}
|