package db import ( "context" "database/sql" _ "embed" "errors" "fmt" "log/slog" "time" "git.maximotejeda.com/maximo/dolar/internal/adapter/nats" "git.maximotejeda.com/maximo/dolar/internal/application/core/domain" "git.maximotejeda.com/maximo/dolar/internal/ports" _ "modernc.org/sqlite" ) //go:embed schema.sql var schema string type History struct { ID int64 `json:"id"` NameID int64 `json:"name_id"` Compra float64 `json:"compra"` Venta float64 `json:"venta"` Parser string `json:"parser"` Parsed int64 `json:"parsed"` } type Institution struct { ID int64 `json:"id"` Name string `json:"name"` ShortName string `json:"short_name"` Created int64 `json:"created"` } type Adapter struct { db *sql.DB log *slog.Logger nats ports.NATSPort } func NewAdapter(dataSourceURL string, natsConn ports.NATSPort) (*Adapter, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() log := slog.Default().With("adapter", "db") pragmas := "cache=shared&_foreign_keys=on&_busy_timeout=3000&_journal_mode=WAL" db, err := sql.Open("sqlite", fmt.Sprintf("file:%s?%s", dataSourceURL, pragmas)) if err != nil { return nil, fmt.Errorf("connecion error: %w", err) } err = db.PingContext(ctx) if err != nil { return nil, fmt.Errorf("ping error: %w", err) } db.SetConnMaxIdleTime(10 * time.Second) CreateTables(db) // TODO create tables and trigers on first run return &Adapter{db: db, log: log, nats: natsConn}, nil } // Schema func CreateTables(db *sql.DB) { _, err := db.Exec(schema) if err != nil { panic(err) } } // Save // TODO func (a *Adapter) Save(history *domain.History) error { if a.db == nil { return fmt.Errorf("nil or empty database") } if history == nil { return fmt.Errorf("nil struct passed [%v]", history) } inst, err := a.GETInstitution(history.Institution.Name) if err != nil { a.log.Error("geting inst ", "error", err) if errors.Is(err, sql.ErrNoRows) { change := domain.NewChange(domain.History{}, *history) message := domain.NewMessage("change adding institution", change, err) if v, ok := a.nats.(*nats.Adapter); ok && v != nil { err = a.nats.NewInstRegistered(message) if err != nil { a.log.Error(err.Error()) } } id, _ := a.ADDInstitution(history.Institution.Name, history.Institution.ShortName) inst = &Institution{ID: id} a.log.Info("Adding institution", "inst", inst) } } // TODO: look for institution inside db // If inst is alredy in db pass ID to History // Else Create a new institution hist := History{ NameID: inst.ID, Compra: history.Compra, Venta: history.Venta, Parser: history.Parser, Parsed: history.Parsed, } lH, err := a.GetLatest(history.Institution.Name) if errors.Is(err, sql.ErrNoRows ) { a.log.Info("adding new item to table: ", "parse", history.Parser, "name", history.Institution.Name) change := domain.NewChange(domain.History{}, *history) message := domain.NewMessage("change adding institution", change, err) if v, ok := a.nats.(*nats.Adapter); ok && v != nil { err = a.nats.ChangeRegistered(message) if err != nil { a.log.Error(err.Error()) } } return a.AddNew(hist, inst.ID) } if history.Compra == lH.Compra && history.Venta == lH.Venta { return nil }else if history.Compra == 0 || history.Venta == 0 { return nil }else { // if one of them changes create a new row a.log.Info("change registered, adding item", "parse", history.Parser, "name", history.Institution.Name, "compra enter", history.Compra, "compra db", hist.Compra, "venta enter", history.Venta, "venta db", hist.Venta) ins, err := a.GETInstitution(history.Institution.Name) if err != nil { return err } change := domain.NewChange(*lH, *history) message := domain.NewMessage("change registered", change, nil) if v, ok := a.nats.(*nats.Adapter); ok && v != nil { a.log.Info("calling nats", "struct", ok) err = a.nats.ChangeRegistered(message) if err != nil { a.log.Error("sending change to nats", "error", err) } } return a.AddNew(hist, int64(ins.ID)) } } // GetLatest // TODO func (a *Adapter) GetLatest(name string) (*domain.History, error) { inst := Institution{} // TODO check inst in db and get latest price hist := &History{} stmtt, err := a.db.Prepare(` SELECT i.id, i.name, i.short_name, h.parser, h.compra, h.venta, h.parsed FROM histories AS h JOIN institutions as i ON h.name_id = i.id JOIN actual_price as ac ON i.id = ac.inst_id AND h.id = ac.hist_id WHERE i.name = ? AND ac.hist_id = h.id;`) if err != nil { a.log.Error("preparing stmtt", "error", err.Error()) return nil, err } defer stmtt.Close() if err := stmtt.QueryRow(name).Scan(&hist.ID, &inst.Name, &inst.ShortName, &hist.Parser, &hist.Compra, &hist.Venta, &hist.Parsed); err != nil { a.log.Error("getting latest", "error", err.Error(), "parser", hist.Parser, "name", name) return nil, err } dHist := &domain.History{ ID: hist.ID, Institution: domain.Institution{Name: inst.Name, ShortName: inst.ShortName}, Compra: hist.Compra, Venta: hist.Venta, Parser: hist.Parser, Parsed: hist.Parsed, } return dHist, nil } // GetSince // TODO func (a *Adapter) GetSince(name string, duration int64) (hists []*domain.History, err error) { tDuration := time.Now().Add(-time.Minute * time.Duration(duration)).Unix() a.log.Info("getsince", "name", name, "duration", duration, "unixDuration", tDuration) stmt, err := a.db.Prepare("SELECT h.id, i.name, h.parser, h.compra, h.venta, h.parsed FROM histories AS h JOIN institutions as i ON h.name_id = i.id WHERE i.name = ? AND h.parsed > ? ORDER BY parsed DESC;") if err != nil { a.log.Error("[GetChangeSince] preparing", "error", err.Error()) return nil, err } defer stmt.Close() rows, err := stmt.Query(name, tDuration) if err != nil { a.log.Error("[GetChangeSince] preparing", "error", err.Error()) return nil, err } defer rows.Close() for rows.Next() { inst := Institution{} hist := History{} if err := rows.Scan(&hist.ID, &inst.Name, &hist.Parser, &hist.Compra, &hist.Venta, &hist.Parsed); err != nil { a.log.Error("[GetChangeSince] scanning", "error", err) return nil, err } hists = append(hists, &domain.History{ ID: hist.ID, Institution: domain.Institution{ Name: inst.Name, }, Compra: hist.Compra, Venta: hist.Venta, Parser: hist.Parser, Parsed: hist.Parsed, }) } return hists, nil } // GetInstByType // Get institutions names if bancos, cajas or agentes is passed func (a *Adapter) GetInstByType(name string) ([]string, error) { var ( stmt *sql.Stmt err error ) switch name { case "bancos": stmt, err = a.db.Prepare(` SELECT i.name FROM institutions AS i JOIN histories AS h ON i.id = h.name_id WHERE (i.name LIKE '%ban%' OR i.name LIKE '%scoti%') AND h.name_id IS NOT NULL `) case "cajas": stmt, err = a.db.Prepare(` SELECT i.name FROM institutions AS i JOIN histories AS h ON i.id = h.name_id WHERE i.name LIKE '%asociacion%' AND h.name_id IS NOT NULL`) case "agentes": stmt, err = a.db.Prepare(` SELECT i.name FROM institutions AS i JOIN histories AS h ON i.id = h.name_id WHERE i.name NOT LIKE '%ban%' AND i.name NOT LIKE '%scoti%' AND i.name NOT LIKE '%asociacion%'`) default: err = fmt.Errorf("name not recognized") } if err != nil { a.log.Error("[inst-GetAll]", "error", err) return nil, err } defer stmt.Close() rows, err := stmt.Query() if err != nil { a.log.Error("[inst-GetAll-stmt]", "error", err) return nil, err } defer rows.Close() insts := []string{} for rows.Next() { inst := "" if err = rows.Scan(&inst); err != nil { return nil, err } if inst == "" { continue } insts = append(insts, inst) } if err := rows.Err(); err != nil { return insts, err } return insts, nil } func (a *Adapter) ADDInstitution(name, short string) (id int64, err error) { stmt, err := a.db.Prepare("INSERT INTO institutions (name, short_name, created) VALUES(?,?,?);") if err != nil { return 0, err } defer stmt.Close() parsed := time.Now().Format(time.DateTime) res, err := stmt.Exec(&name, short, &parsed) if err != nil { return 0, err } id, err = res.LastInsertId() if err != nil { return 0, err } return id, nil } // GETInstitutionByID func (a *Adapter) GETInstitutionByIDS(ids string) (instList []*Institution, err error) { stmtt, err := a.db.Prepare(fmt.Sprintf("SELECT id, name, short_name FROM institutions WHERE id IN (%s)", ids)) if err != nil { a.log.Error("preparing stmt", "error", err.Error()) return nil, err } defer stmtt.Close() rows, err := stmtt.Query() if err != nil { a.log.Error("getting institution", "error", err.Error(), "id", ids) return nil, err } defer rows.Close() for rows.Next() { insti := Institution{} if err = rows.Scan(&insti.ID, &insti.Name, &insti.ShortName); err != nil { a.log.Error("[getting institution scanning]", "error", err.Error(), "id", ids) continue } instList = append(instList, &insti) } return instList, nil } // GETInstitution func (a *Adapter) GETInstitution(name string) (inst *Institution, err error) { inst = &Institution{} stmtt, err := a.db.Prepare("SELECT i.id, i.name, i.short_name FROM institutions AS i WHERE i.name = ?") if err != nil { a.log.Error("preparing stmt", "error", err.Error()) return nil, err } defer stmtt.Close() if err := stmtt.QueryRow(name).Scan(&inst.ID, &inst.Name, &inst.ShortName); err != nil { a.log.Error("getting institution", "error", err.Error(), "short name", inst.ShortName, "name", name) return nil, err } return inst, nil } // AddNew // Add a new row in the dolar table // Will send to nats changes on prices func (a *Adapter) AddNew(row History, id int64) error { stmt, err := a.db.Prepare("INSERT INTO histories (name_id, compra, venta, parser, parsed) VALUES(?,?,?,?,?);") if err != nil { return err } defer stmt.Close() _, err = stmt.Exec(&id, &row.Compra, &row.Venta, &row.Parser, time.Now().Unix()) if err != nil { return err } return nil } func (a Adapter) GetInstByName(name string) (*domain.Institution, error) { inst, err := a.GETInstitution(name) insti := &domain.Institution{ ID: inst.ID, Name: inst.Name, ShortName: inst.ShortName, Created: inst.Created, } return insti, err } func (a Adapter) TGBSubscribe(tgb_id int64, inst_name string) (bool, error) { stmt, err := a.db.Prepare(` INSERT INTO subscriptions(tgb_id, inst_id, created) VALUES(?,(SELECT id FROM institutions WHERE name = ?),strftime('%s', 'now')); `) if err != nil { return false, err } defer stmt.Close() _, err = stmt.Exec(tgb_id, inst_name) if err != nil { return false, err } return true, nil } func (a Adapter) TGBUnsubscribe(tgb_id int64, inst_name string) (bool, error) { stmt, err := a.db.Prepare(` DELETE FROM subscriptions WHERE tgb_id = ? AND inst_id = (SELECT id FROM institutions WHERE name = ?) `) if err != nil { return false, err } defer stmt.Close() _, err = stmt.Exec(tgb_id, inst_name) if err != nil { return false, err } return true, nil } func (a Adapter) TGBGetSubscribedUsers(inst_name string) ([]int64, error) { stmt, err := a.db.Prepare(` SELECT s.tgb_id FROM subscriptions AS s JOIN institutions AS i ON s.inst_id = i.id WHERE i.name = ?; `) if err != nil { return nil, err } defer stmt.Close() rows, err := stmt.Query(inst_name) if err != nil { return nil, err } defer rows.Close() tgbList := []int64{} for rows.Next() { var tgb int64 err := rows.Scan(&tgb) if err != nil { return nil, err } tgbList = append(tgbList, tgb) } return tgbList, nil } func (a Adapter) TGBGetSubscribedInst(tgb_id int64) ([]string, error) { stmt, err := a.db.Prepare(` SELECT i.name FROM subscriptions AS s JOIN institutions AS i ON s.inst_id = i.id WHERE s.tgb_id = ?; `) if err != nil { return nil, err } defer stmt.Close() rows, err := stmt.Query(tgb_id) if err != nil { return nil, err } defer rows.Close() instList := []string{} for rows.Next() { var inst string err := rows.Scan(&inst) if err != nil { return nil, err } instList = append(instList, inst) } return instList, nil }