Adjusted to properly type each of the database interactions
This commit is contained in:
@@ -128,14 +128,18 @@ defmodule Mixer.Metrics.Buffer do
|
|||||||
|
|
||||||
defp do_flush(events) do
|
defp do_flush(events) do
|
||||||
rows = Enum.reverse(events)
|
rows = Enum.reverse(events)
|
||||||
|
count = length(rows)
|
||||||
|
|
||||||
try do
|
try do
|
||||||
{count, _} = Mixer.ClickhouseRepo.insert_all(PostEvent, rows)
|
# ClickHouse async inserts acknowledge writes immediately and always
|
||||||
|
# return num_rows: 0 — the data is queued for background commitment.
|
||||||
|
# We use our own row count for the log so it is always accurate.
|
||||||
|
Mixer.ClickhouseRepo.insert_all(PostEvent, rows)
|
||||||
Logger.debug("[Mixer.Metrics.Buffer] Flushed #{count} event(s) to ClickHouse")
|
Logger.debug("[Mixer.Metrics.Buffer] Flushed #{count} event(s) to ClickHouse")
|
||||||
rescue
|
rescue
|
||||||
error ->
|
error ->
|
||||||
Logger.error(
|
Logger.error(
|
||||||
"[Mixer.Metrics.Buffer] Failed to flush #{length(rows)} event(s) to ClickHouse: " <>
|
"[Mixer.Metrics.Buffer] Failed to flush #{count} event(s) to ClickHouse: " <>
|
||||||
Exception.message(error)
|
Exception.message(error)
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -22,19 +22,23 @@ defmodule Mixer.Metrics.PostEvent do
|
|||||||
@primary_key false
|
@primary_key false
|
||||||
|
|
||||||
schema "post_events" do
|
schema "post_events" do
|
||||||
# LowCardinality(String) in ClickHouse — keep values in the set above
|
# Must be Ch-typed so ecto_ch emits LowCardinality(String) in the RowBinary
|
||||||
field :event_type, :string
|
# header, matching the ClickHouse table DDL exactly.
|
||||||
|
field :event_type, Ch, type: "LowCardinality(String)"
|
||||||
|
|
||||||
# The tweet that the event relates to
|
# The tweet that the event relates to
|
||||||
field :tweet_id, Ecto.UUID
|
field :tweet_id, Ecto.UUID
|
||||||
|
|
||||||
# The acting user; may be nil for anonymous views
|
# The acting user; may be nil for anonymous views.
|
||||||
field :user_id, Ecto.UUID
|
# Must be Ch-typed so ecto_ch emits Nullable(UUID) in the RowBinary header,
|
||||||
|
# matching the ClickHouse table DDL exactly.
|
||||||
|
field :user_id, Ch, type: "Nullable(UUID)"
|
||||||
|
|
||||||
# Wall-clock time of the event (UTC, second precision)
|
# Wall-clock time of the event (UTC, second precision)
|
||||||
field :occurred_at, :utc_datetime
|
field :occurred_at, :utc_datetime
|
||||||
|
|
||||||
# Optional originating IP, useful for deduplicating anonymous views
|
# Optional originating IP, useful for deduplicating anonymous views.
|
||||||
field :ip_address, :string
|
# Nullable(String) for the same reason as user_id above.
|
||||||
|
field :ip_address, Ch, type: "Nullable(String)"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -66,6 +66,26 @@ defmodule Mixer.Posts.Tweet do
|
|||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Track a "comment" metric event whenever a reply is posted. We record
|
||||||
|
# the event against the *parent* tweet so that `get_summary/1` and
|
||||||
|
# `get_bulk_summaries/1` can count how many comments each tweet received.
|
||||||
|
change fn changeset, context ->
|
||||||
|
case Ash.Changeset.get_attribute(changeset, :parent_tweet_id) do
|
||||||
|
nil ->
|
||||||
|
changeset
|
||||||
|
|
||||||
|
parent_tweet_id ->
|
||||||
|
Ash.Changeset.after_action(changeset, fn _changeset, tweet ->
|
||||||
|
Mixer.Metrics.track_comment(
|
||||||
|
parent_tweet_id,
|
||||||
|
user_id: context.actor && context.actor.id
|
||||||
|
)
|
||||||
|
|
||||||
|
{:ok, tweet}
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
update :update do
|
update :update do
|
||||||
@@ -80,6 +100,7 @@ defmodule Mixer.Posts.Tweet do
|
|||||||
Ash.Changeset.after_action(changeset, fn _changeset, tweet ->
|
Ash.Changeset.after_action(changeset, fn _changeset, tweet ->
|
||||||
case ensure_like(tweet, context.actor) do
|
case ensure_like(tweet, context.actor) do
|
||||||
{:created, _like} ->
|
{:created, _like} ->
|
||||||
|
Mixer.Metrics.track_like(tweet.id, user_id: context.actor && context.actor.id)
|
||||||
increment_likes(tweet, context.actor)
|
increment_likes(tweet, context.actor)
|
||||||
|
|
||||||
{:noop, _like} ->
|
{:noop, _like} ->
|
||||||
@@ -100,6 +121,7 @@ defmodule Mixer.Posts.Tweet do
|
|||||||
Ash.Changeset.after_action(changeset, fn _changeset, tweet ->
|
Ash.Changeset.after_action(changeset, fn _changeset, tweet ->
|
||||||
case remove_like(tweet, context.actor) do
|
case remove_like(tweet, context.actor) do
|
||||||
{:deleted, _like} ->
|
{:deleted, _like} ->
|
||||||
|
Mixer.Metrics.track_unlike(tweet.id, user_id: context.actor && context.actor.id)
|
||||||
decrement_likes(tweet, context.actor)
|
decrement_likes(tweet, context.actor)
|
||||||
|
|
||||||
{:noop, _like} ->
|
{:noop, _like} ->
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ defmodule MixerWeb.PageController do
|
|||||||
end
|
end
|
||||||
|
|
||||||
def show(conn, %{"tweet_id" => tweet_id}) do
|
def show(conn, %{"tweet_id" => tweet_id}) do
|
||||||
|
user_id = conn.assigns[:current_user] && conn.assigns[:current_user].id
|
||||||
|
Mixer.Metrics.track_view(tweet_id, user_id: user_id, ip_address: conn.remote_ip)
|
||||||
render_spa(conn, %{page: "tweet", tweet_id: tweet_id, user_id: nil})
|
render_spa(conn, %{page: "tweet", tweet_id: tweet_id, user_id: nil})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user