package main import ( "context" "log/slog" "os" "os/signal" "runtime" "syscall" tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" "git.maximotejeda.com/maximo/us-dop-bot/config" "git.maximotejeda.com/maximo/us-dop-bot/internal/adapters/dolar" "git.maximotejeda.com/maximo/us-dop-bot/internal/adapters/user" "git.maximotejeda.com/maximo/us-dop-bot/internal/application/api" "git.maximotejeda.com/maximo/us-dop-bot/internal/application/broadcaster" "git.maximotejeda.com/maximo/us-dop-bot/internal/ports" "golang.org/x/sync/semaphore" "github.com/nats-io/nats.go" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) var ( maxWorkers = runtime.GOMAXPROCS(0) sem = semaphore.NewWeighted(int64(maxWorkers) * 2) ) func main() { log := slog.New(slog.NewJSONHandler(os.Stderr, nil)) log = log.With("location", "main") nc, _ := nats.Connect(config.GetNatsURI()) ctx := context.Background() bot, err := tgbotapi.NewBotAPI(config.GetToken()) if err != nil { log.Error("token not found", "error", err) panic(err) } botName := bot.Self.UserName bot.Debug = config.GetEnvironment() == "development" log.Info("Bot Authorized", "username", botName) log.Info("Initiated with a concurrency limit", "max concurrency", maxWorkers*2) u := tgbotapi.NewUpdate(0) u.Timeout = 60 // bot user update channel updtChan := bot.GetUpdatesChan(u) // subs chann changeChan := make(chan *nats.Msg, 64) broadcastChan := make(chan *nats.Msg, 64) defer close(changeChan) defer close(broadcastChan) sub, err := nc.ChanSubscribe("dolar-bot-change", changeChan) if err != nil { log.Error("subscribing", "error", err.Error()) } info, err := nc.ChanSubscribe("dolar-bot", broadcastChan) if err != nil { log.Error("subscribing", "error", err.Error()) } defer sub.Drain() defer info.Drain() defer nc.Close() // exit channel sign := make(chan os.Signal, 1) signal.Notify(sign, syscall.SIGINT, syscall.SIGTERM) defer close(sign) app := api.NewApi(bot) // check for bot in db for { select { case update := <-updtChan: if err = sem.Acquire(ctx, 1); err != nil { bot.Send(tgbotapi.NewMessage(update.FromChat().ID, "error adquiring update")) continue } go func() { defer sem.Release(1) dol, user, dolarConn, userConn := CreateAdaptersGRPC() app.Run(&update, dol, user) dolarConn.Close() userConn.Close() }() case message := <-changeChan: log.Info("broadcasting Change") dol, user, dolarConn, userConn := CreateAdaptersGRPC() bcast := broadcaster.NewBroadCast(ctx, user, dol, message.Data) userList := bcast.SendList() for _, msg := range userList { go bot.Send(msg) } dolarConn.Close() userConn.Close() case message := <-broadcastChan: dol, user, dolarConn, userConn := CreateAdaptersGRPC() bcast := broadcaster.NewBroadCast(ctx, user, dol, message.Data) msgs := bcast.SendAllUsers(ctx, log, message.Data, bot.Self.UserName) log.Info("broadcast", "data", string(message.Data), "msg", msgs) for _, msg := range msgs { go bot.Send(msg) } dolarConn.Close() userConn.Close() case <-sign: log.Error("killing app due to syscall ") os.Exit(1) } } } func CreateAdaptersGRPC() (ports.DolarService, ports.UserService, *grpc.ClientConn, *grpc.ClientConn) { log := slog.Default() // we are outside update so we will be querying db to // get users interested in specific updates ex bpd, brd, apa // userID inst=> comma separated string var opts []grpc.DialOption opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) dolarConn, err := grpc.NewClient(config.GetDollarServiceURL(), opts...) if err != nil { log.Error("creating gerpc conn", "error", err) panic(err) } userConn, err := grpc.NewClient(config.GetUserServiceURL(), opts...) if err != nil { log.Error("creating gerpc conn", "error", err) panic(err) } dol, err := dolar.NewAdapter(dolarConn) if err != nil { log.Error("creating service adapter", "error", err) panic(err) } user, err := user.NewAdapter(userConn) if err != nil { log.Error("creating service adapter", "error", err) panic(err) } return dol, user, dolarConn, userConn }