package main import ( "context" "crypto/sha1" "encoding/hex" "fmt" "os" "time" "github.com/jackc/pgx/v5" ) func getenv(k, fallback string) string { if v := os.Getenv(k); v != "" { return v } return fallback } func buildDSN() string { if url := os.Getenv("DATABASE_URL"); url != "" { return url } host := getenv("POSTGRES_HOST", "database") port := getenv("POSTGRES_PORT", "5432") user := getenv("POSTGRES_USER", "app") pass := getenv("POSTGRES_PASSWORD", "appsecret") name := getenv("POSTGRES_DB", "appdb") return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", user, pass, host, port, name) } type FeedMeta struct { ID int URL string } type Article struct { ArticleID string FeedID int Title string Link string Summary string PublishedAt *time.Time } type MarketQuote struct { Instrument string Bid float64 QuotedAt time.Time } func writeVerbindungToSQL(verbindungen [][]Verbindung) error { ctx := context.Background() dsn := buildDSN() conn, err := pgx.Connect(ctx, dsn) if err != nil { return fmt.Errorf("connect db: %w", err) } defer conn.Close(ctx) tx, err := conn.Begin(ctx) if err != nil { return fmt.Errorf("begin tx: %w", err) } defer tx.Rollback(ctx) if _, err := tx.Exec(ctx, "DELETE FROM rmv_data"); err != nil { return fmt.Errorf("clear rmv_data: %w", err) } stmt := `INSERT INTO rmv_data (trip_index, leg_index, linie, abfahrt, ankunft, von, nach) VALUES ($1,$2,$3,$4,$5,$6,$7)` for tripIdx, trip := range verbindungen { for legIdx, leg := range trip { if _, err := tx.Exec(ctx, stmt, tripIdx, legIdx, leg.Linie, leg.Abfahrt, leg.Ankunft, leg.Von, leg.Nach); err != nil { return fmt.Errorf("insert trip %d leg %d: %w", tripIdx, legIdx, err) } } } if err := tx.Commit(ctx); err != nil { return fmt.Errorf("commit tx: %w", err) } return nil } func fetchPendingFeeds(ctx context.Context) ([]FeedMeta, error) { dsn := buildDSN() conn, err := pgx.Connect(ctx, dsn) if err != nil { return nil, fmt.Errorf("connect db: %w", err) } defer conn.Close(ctx) rows, err := conn.Query(ctx, ` SELECT id, url FROM feeds WHERE access = TRUE AND (last_checked IS NULL OR last_checked < now() - interval '5 minutes') `) if err != nil { return nil, fmt.Errorf("query feeds: %w", err) } defer rows.Close() var feeds []FeedMeta for rows.Next() { var f FeedMeta if err := rows.Scan(&f.ID, &f.URL); err != nil { return nil, fmt.Errorf("scan feed: %w", err) } feeds = append(feeds, f) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate feeds: %w", err) } return feeds, nil } func articleIDFromLink(link string) string { h := sha1.Sum([]byte(link)) return hex.EncodeToString(h[:]) } func saveArticles(ctx context.Context, feedID int, articles []Article) error { dsn := buildDSN() conn, err := pgx.Connect(ctx, dsn) if err != nil { return fmt.Errorf("connect db: %w", err) } defer conn.Close(ctx) tx, err := conn.Begin(ctx) if err != nil { return fmt.Errorf("begin tx: %w", err) } defer tx.Rollback(ctx) stmt := ` INSERT INTO articles (article_id, feed_id, title, link, summary, published_at) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (article_id) DO NOTHING ` for _, a := range articles { aid := a.ArticleID if aid == "" { aid = articleIDFromLink(a.Link) } if _, err := tx.Exec(ctx, stmt, aid, feedID, a.Title, a.Link, a.Summary, a.PublishedAt); err != nil { return fmt.Errorf("insert article for feed %d: %w", feedID, err) } } if _, err := tx.Exec(ctx, `UPDATE feeds SET last_checked = now() WHERE id = $1`, feedID); err != nil { return fmt.Errorf("update feed timestamp: %w", err) } if err := tx.Commit(ctx); err != nil { return fmt.Errorf("commit tx: %w", err) } return nil } func setFeedAccess(ctx context.Context, feedID int, access bool) error { dsn := buildDSN() conn, err := pgx.Connect(ctx, dsn) if err != nil { return fmt.Errorf("connect db: %w", err) } defer conn.Close(ctx) if _, err := conn.Exec(ctx, `UPDATE feeds SET access = $1 WHERE id = $2`, access, feedID); err != nil { return fmt.Errorf("update feed access: %w", err) } return nil } func saveMarketQuotes(ctx context.Context, quotes []MarketQuote) error { dsn := buildDSN() conn, err := pgx.Connect(ctx, dsn) if err != nil { return fmt.Errorf("connect db: %w", err) } defer conn.Close(ctx) batch := &pgx.Batch{} for _, q := range quotes { batch.Queue( `INSERT INTO market_quotes (instrument, bid, quoted_at) VALUES ($1, $2, $3) ON CONFLICT (instrument) DO UPDATE SET bid = EXCLUDED.bid, quoted_at = EXCLUDED.quoted_at`, q.Instrument, q.Bid, q.QuotedAt, ) } br := conn.SendBatch(ctx, batch) defer br.Close() for range quotes { if _, err := br.Exec(); err != nil { return fmt.Errorf("insert market quote: %w", err) } } return nil }