diff --git a/app/models/activity_pub/releases_subscription.rb b/app/models/activity_pub/releases_subscription.rb
index 0a4293b2bdea71872e26ad9497e3f3fc50fbd4d7..8a7ff94e587f89fe5f8e10f745f085b41abeea50 100644
--- a/app/models/activity_pub/releases_subscription.rb
+++ b/app/models/activity_pub/releases_subscription.rb
@@ -6,6 +6,26 @@ class ReleasesSubscription < ApplicationRecord
 
     enum :status, [:requested, :accepted], default: :requested
 
+    scope :with_shared_inbox, -> { where.not(shared_inbox_url: nil) }
+    scope :without_shared_inbox, -> { where(shared_inbox_url: nil) }
+
+    # Subscriptions of the release's project not notified since the release was
+    # published (delivering an activity touches `updated_at`).
+    #
+    # The cutoff is capped at the release's `created_at`: for an upcoming release
+    # (`released_at` in the future) a freshly touched subscription would still be
+    # older than `released_at`, stay pending forever and keep the delivery worker
+    # re-enqueueing itself. NOTE(review): assumes the event fires on creation.
+    scope :pending_notification, ->(release) do
+      cutoff = [release.released_at, release.created_at].compact.min
+
+      where(project_id: release.project_id).where("updated_at < ?", cutoff)
+    end
+    scope :with_limit, ->(maximum) { limit(maximum) }
+    # Selects only `shared_inbox_url`, one row per distinct shared inbox.
+    scope :group_by_shared_inbox, -> { select(:shared_inbox_url).group(:shared_inbox_url) }
+    scope :for_project_shared_inbox_url, ->(project_id, url) { where(project_id: project_id, shared_inbox_url: url) }
+
     attribute :payload, Gitlab::Database::Type::JsonPgSafe.new
 
     validates :payload, json_schema: { filename: 'activity_pub_follow_payload' }, allow_blank: true
@@ -16,7 +36,9 @@ class ReleasesSubscription < ApplicationRecord
     validates :shared_inbox_url, public_url: { allow_nil: true }
 
+    # Case-insensitive lookup of a project's subscription by subscriber URL.
     def self.find_by_project_and_subscriber(project_id, subscriber_url)
-      find_by('project_id = ? AND LOWER(subscriber_url) = ?', project_id, subscriber_url.downcase)
+      find_by('project_id = :project_id AND LOWER(subscriber_url) = :subscriber_url',
+        project_id: project_id, subscriber_url: subscriber_url.downcase)
     end
   end
 end
diff --git a/app/services/activity_pub/accept_follow_service.rb b/app/services/activity_pub/accept_follow_service.rb
index 0ec440fa97266858efa3b32977bb5c2956f355fb..6f9b1f3a1d3f2c7eeac7465991571e11a9e5a1a9 100644
--- a/app/services/activity_pub/accept_follow_service.rb
+++ b/app/services/activity_pub/accept_follow_service.rb
@@ -2,6 +2,8 @@
 
 module ActivityPub
   class AcceptFollowService
+    include ActivityPubRequest
+
     MissingInboxURLError = Class.new(StandardError)
 
     attr_reader :subscription, :actor
@@ -15,22 +17,12 @@ def execute
       return if subscription.accepted?
       raise MissingInboxURLError unless subscription.subscriber_inbox_url.present?
 
-      upload_accept_activity
+      upload_activity(payload, subscription.subscriber_inbox_url)
       subscription.accepted!
     end
 
     private
 
-    def upload_accept_activity
-      body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte)
-
-      begin
-        Gitlab::HTTP.post(subscription.subscriber_inbox_url, body: body, headers: headers)
-      rescue StandardError => e
-        raise ThirdPartyError, e.message
-      end
-    end
-
     def payload
       follow = subscription.payload.dup
       follow.delete('@context')
@@ -43,13 +35,5 @@ def payload
         object: follow
       }
     end
-
-    def headers
-      {
-        'User-Agent' => "GitLab/#{Gitlab::VERSION}",
-        'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
-        'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
-      }
-    end
   end
 end
diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb b/app/services/activity_pub/projects/send_release_activities_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..47ed51cc5d33aeeb478481a6e36207a6e1556dd2
--- /dev/null
+++ b/app/services/activity_pub/projects/send_release_activities_service.rb
@@ -0,0 +1,90 @@
+# frozen_string_literal: true
+
+module ActivityPub
+  module Projects
+    # Delivers a release activity to the inboxes of the project's subscriptions
+    # that have not been notified yet, at most MAX_REQUEST_PER_RUN requests per
+    # #execute call.
+    #
+    # Shared inboxes are served first (one request covers every subscriber using
+    # that inbox); individual inboxes are only processed once no shared inbox is
+    # pending. #has_pending_subscriptions? tells the caller whether another run
+    # may be needed.
+    class SendReleaseActivitiesService
+      include ActivityPubRequest
+
+      MAX_REQUEST_PER_RUN = 100
+
+      attr_reader :release, :activity
+
+      # @param release [Release] the release being announced
+      # @param activity [Hash] the serialized activity posted to every inbox
+      def initialize(release, activity)
+        @release = release
+        @activity = activity
+        @has_pending_subscriptions = true
+      end
+
+      # Sends one batch: pending shared inboxes if there are any, otherwise
+      # pending individual inboxes.
+      def execute
+        subscriptions = ReleasesSubscription.pending_notification(release)
+
+        # Each batch is loaded once, so the emptiness check and the iteration
+        # share a single query.
+        shared_inboxes = subscriptions.with_shared_inbox.group_by_shared_inbox.with_limit(MAX_REQUEST_PER_RUN).to_a
+
+        if shared_inboxes.any?
+          notify_shared_inboxes(shared_inboxes)
+          return
+        end
+
+        individual_inboxes = subscriptions.without_shared_inbox.with_limit(MAX_REQUEST_PER_RUN).to_a
+
+        if individual_inboxes.empty?
+          @has_pending_subscriptions = false
+          return
+        end
+
+        notify_individuals(individual_inboxes)
+      end
+
+      # Starts out true and only becomes false once #execute found nothing left
+      # to deliver; after a batch was sent more subscriptions may remain.
+      def has_pending_subscriptions?
+        @has_pending_subscriptions
+      end
+
+      private
+
+      # One request per shared inbox, then every subscription of this project
+      # using that inbox is marked as notified.
+      def notify_shared_inboxes(subscriptions)
+        subscriptions.each do |subscription|
+          deliver(subscription.shared_inbox_url)
+
+          ReleasesSubscription.for_project_shared_inbox_url(release.project_id, subscription.shared_inbox_url).touch_all
+        end
+      end
+
+      # One request per subscriber inbox, then that subscription is marked as
+      # notified.
+      def notify_individuals(subscriptions)
+        subscriptions.each do |subscription|
+          deliver(subscription.subscriber_inbox_url)
+
+          subscription.touch
+        end
+      end
+
+      # Best-effort delivery: a failing remote inbox must not abort the batch,
+      # and callers mark the subscription as notified regardless, so a broken
+      # inbox is not retried on every run.
+      def deliver(inbox_url)
+        upload_activity(activity, inbox_url)
+      rescue ActivityPub::ThirdPartyError
+        nil
+      end
+    end
+  end
+end
diff --git a/app/services/concerns/activity_pub_request.rb b/app/services/concerns/activity_pub_request.rb
new file mode 100644
index 0000000000000000000000000000000000000000..8e7f6c5e6688ea7b7e4d08e00e3cfd1982b1fa1d
--- /dev/null
+++ b/app/services/concerns/activity_pub_request.rb
@@ -0,0 +1,28 @@
+# frozen_string_literal: true
+
+# Helpers for POSTing ActivityPub activities to remote inboxes.
+module ActivityPubRequest
+  # JSON-encodes `payload` (capped at 1 MB) and POSTs it to `inbox_url`.
+  #
+  # Errors raised by the HTTP request are re-raised as
+  # ActivityPub::ThirdPartyError; encoding errors propagate unchanged. The
+  # response status is not inspected.
+  def upload_activity(payload, inbox_url)
+    body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte)
+
+    begin
+      Gitlab::HTTP.post(inbox_url, body: body, headers: activity_pub_headers)
+    rescue StandardError => e
+      raise ActivityPub::ThirdPartyError, e.message
+    end
+  end
+
+  # Headers sent with every activity (JSON-LD with the ActivityStreams profile).
+  def activity_pub_headers
+    {
+      'User-Agent' => "GitLab/#{Gitlab::VERSION}",
+      'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
+      'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
+    }
+  end
+end
diff --git a/app/workers/activity_pub/projects/publish_release_activities_worker.rb b/app/workers/activity_pub/projects/publish_release_activities_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..3643947d724f95a7cdc96d86139a5c14001f3096
--- /dev/null
+++ b/app/workers/activity_pub/projects/publish_release_activities_worker.rb
@@ -0,0 +1,47 @@
+# frozen_string_literal: true
+
+module ActivityPub
+  module Projects
+    # Publishes a release to the project's ActivityPub subscribers when a
+    # Projects::ReleasePublishedEvent is received. Each run sends one batch
+    # and re-enqueues itself while subscriptions remain pending.
+    class PublishReleaseActivitiesWorker
+      include Gitlab::EventStore::Subscriber
+
+      idempotent!
+      worker_has_external_dependencies!
+      feature_category :release_orchestration
+      data_consistency :delayed
+      queue_namespace :activity_pub
+
+      def handle_event(event)
+        release = Release.find_by_id(event.data[:release_id])
+        return unless release
+
+        service = SendReleaseActivitiesService.new(release, activity_for(release))
+        service.execute
+
+        return unless service.has_pending_subscriptions?
+
+        # Sidekiq job arguments should be plain JSON types, so pass the event
+        # class name rather than the class. NOTE(review): relies on
+        # Gitlab::EventStore::Subscriber#perform resolving the name back.
+        PublishReleaseActivitiesWorker.perform_async(::Projects::ReleasePublishedEvent.name, event.data)
+      end
+
+      private
+
+      def activity_for(release)
+        # Not being able to use serializer here is a serious problem, given the
+        # activity we send here is the exact same data generated for
+        # ActivityPub::Projects::ReleasesController#outbox, and the same thing
+        # will happen for all actors (they provide the same data both in their
+        # `#outbox` endpoint and when sending out activities from workers).
+        #
+        # Disabling the rule for now, we need to discuss that.
+        serializer = ActivityPub::PublishReleaseActivitySerializer.new # rubocop:disable CodeReuse/Serializer -- see above.
+        serializer.represent(release)
+      end
+    end
+  end
+end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 60ad122288cd344e359c4a09d684fcf103c052ea..ca5f2f350e0037ea8b4b2f4dcd760af0a8e59de4 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -3,6 +3,15 @@
 #
 # Do not edit it manually!
 ---
+- :name: activity_pub:activity_pub_projects_publish_release_activities
+  :worker_name: ActivityPub::Projects::PublishReleaseActivitiesWorker
+  :feature_category: :release_orchestration
+  :has_external_dependencies: true
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: activity_pub:activity_pub_projects_releases_subscription
   :worker_name: ActivityPub::Projects::ReleasesSubscriptionWorker
   :feature_category: :release_orchestration
diff --git a/lib/gitlab/event_store.rb b/lib/gitlab/event_store.rb
index b422fd061ff93a976c0b0b6db80153e5ab2ff147..36da12f79976bbebe0432ee9dc3ee60744d15765 100644
--- a/lib/gitlab/event_store.rb
+++ b/lib/gitlab/event_store.rb
@@ -51,6 +51,7 @@ def self.configure!(store)
         to: ::Packages::PackageCreatedEvent,
         if: -> (event) { ::Ml::ExperimentTracking::AssociateMlCandidateToPackageWorker.handles_event?(event) }
       store.subscribe ::Ci::InitializePipelinesIidSequenceWorker, to: ::Projects::ProjectCreatedEvent
+      store.subscribe ::ActivityPub::Projects::PublishReleaseActivitiesWorker, to: ::Projects::ReleasePublishedEvent
     end
     private_class_method :configure!
   end
diff --git a/spec/factories/activity_pub/releases_subscriptions.rb b/spec/factories/activity_pub/releases_subscriptions.rb
index b789188528a67116013646ec402e79f69e9f90e1..abfa89ea1bacaaf312c78a636081693def0e7f2b 100644
--- a/spec/factories/activity_pub/releases_subscriptions.rb
+++ b/spec/factories/activity_pub/releases_subscriptions.rb
@@ -3,7 +3,8 @@
 FactoryBot.define do
   factory :activity_pub_releases_subscription, class: 'ActivityPub::ReleasesSubscription' do
     project
-    subscriber_url { 'https://example.com/actor' }
+    # Inline sequence so every record gets a distinct subscriber URL.
+    sequence(:subscriber_url) { |n| "https://example.com/actor-#{n}" }
     status { :requested }
     payload do
       {
@@ -16,7 +17,7 @@ end
     end
 
     trait :inbox do
-      subscriber_inbox_url { 'https://example.com/actor/inbox' }
+      sequence(:subscriber_inbox_url) { |n| "https://example.com/actor/inbox-#{n}" }
     end
 
     trait :shared_inbox do
diff --git a/spec/services/activity_pub/projects/send_release_activities_service_spec.rb b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..73d8c9d681953f5d65ce81a9be29433c3f937cf7
--- /dev/null
+++ b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb
@@ -0,0 +1,104 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ActivityPub::Projects::SendReleaseActivitiesService, feature_category: :release_orchestration do
+  let_it_be(:release) { create(:release, released_at: Time.current) }
+  let_it_be(:shared_inbox) { 'http://example.com/shared-inbox' }
+  let_it_be(:individual_inbox) { 'http://example.com/actor/inbox' }
+
+  let_it_be_with_reload(:existing_subscription_with_shared_inbox) do
+    create_list(:activity_pub_releases_subscription, 2, {
+      shared_inbox_url: shared_inbox,
+      project: release.project,
+      created_at: 1.month.ago,
+      updated_at: 1.month.ago
+    })
+  end
+
+  let_it_be_with_reload(:existing_individual_subscription) do
+    create(:activity_pub_releases_subscription, {
+      subscriber_inbox_url: individual_inbox,
+      project: release.project,
+      created_at: 1.month.ago,
+      updated_at: 1.month.ago
+    })
+  end
+
+  let(:activity) do
+    {
+      id: 'http://example.com/activity',
+      '@context': 'https://www.w3.org/ns/activitystreams',
+      type: 'Create',
+      actor: 'http://example.com/actor',
+      object: {
+        id: 'http://example.com/release',
+        type: 'Application'
+      }
+    }
+  end
+
+  let(:service) { described_class.new(release, activity) }
+
+  before do
+    allow(service).to receive(:upload_activity).and_return(true)
+  end
+
+  describe '#execute' do
+    describe 'when there are pending subscriptions with shared inbox' do
+      before do
+        service.execute
+      end
+
+      it 'sends activities to the shared inbox' do
+        expect(service).to have_received(:upload_activity).with(activity, shared_inbox)
+      end
+
+      it 'updates subscription with shared inbox' do
+        expect(ActivityPub::ReleasesSubscription.with_shared_inbox.pending_notification(release)).to be_empty
+      end
+
+      it 'does not update subscription with individual inbox' do
+        expect(ActivityPub::ReleasesSubscription.without_shared_inbox.pending_notification(release).count).to eq 1
+      end
+
+      it 'says there may be more subscriptions to process' do
+        expect(service.has_pending_subscriptions?).to be_truthy
+      end
+    end
+
+    describe 'when there are pending subscriptions without shared inbox' do
+      before do
+        ActivityPub::ReleasesSubscription.with_shared_inbox.touch_all
+        service.execute
+      end
+
+      it 'sends activities to the individual inboxes' do
+        expect(service).to have_received(:upload_activity).with(activity, individual_inbox)
+      end
+
+      it 'updates subscription with individual inbox' do
+        expect(ActivityPub::ReleasesSubscription.without_shared_inbox.pending_notification(release).count).to eq 0
+      end
+
+      it 'says there may be more subscriptions to process' do
+        expect(service.has_pending_subscriptions?).to be_truthy
+      end
+    end
+
+    describe 'when there are no more pending subscriptions' do
+      before do
+        ActivityPub::ReleasesSubscription.touch_all
+        service.execute
+      end
+
+      it 'does not send any activity' do
+        expect(service).not_to have_received(:upload_activity)
+      end
+
+      it 'says there are no more subscriptions to process' do
+        expect(service.has_pending_subscriptions?).to be_falsey
+      end
+    end
+  end
+end
diff --git a/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..208ec4995d1d0e67b35800b64156946bcc040327
--- /dev/null
+++ b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb
@@ -0,0 +1,93 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ActivityPub::Projects::PublishReleaseActivitiesWorker, feature_category: :release_orchestration do
+  let(:worker) { described_class.new }
+
+  let(:release) { build_stubbed(:release) }
+
+  let(:event) do
+    ::Projects::ReleasePublishedEvent.new(data: HashWithIndifferentAccess.new(release_id: release.id))
+  end
+
+  let(:service) do
+    instance_double(ActivityPub::Projects::SendReleaseActivitiesService, has_pending_subscriptions?: has_more,
+      execute: true)
+  end
+
+  let(:serializer) { instance_double(ActivityPub::PublishReleaseActivitySerializer, represent: payload) }
+  let(:has_more) { true }
+
+  let(:payload) do
+    {
+      '@context': 'https://www.w3.org/ns/activitystreams',
+      id: 'http://example.com/activity',
+      type: 'Create',
+      actor: 'http://example.com/actor',
+      object: {
+        id: 'http://example.com/release',
+        type: 'Application'
+      }
+    }
+  end
+
+  before do
+    allow(ActivityPub::Projects::SendReleaseActivitiesService).to receive(:new) { service }
+    allow(ActivityPub::PublishReleaseActivitySerializer).to receive(:new) { serializer }
+    allow(described_class).to receive(:perform_async)
+  end
+
+  shared_examples_for 'successful run' do
+    it 'serializes the activity' do
+      expect(serializer).to have_received(:represent)
+    end
+
+    it 'calls the activity sending service' do
+      expect(ActivityPub::Projects::SendReleaseActivitiesService).to have_received(:new).with(release, payload)
+      expect(service).to have_received(:execute)
+    end
+  end
+
+  describe '#handle_event' do
+    describe 'when the release exists' do
+      before do
+        allow(Release).to receive(:find_by_id) { release }
+        worker.handle_event(event)
+      end
+
+      describe 'when the job sent the last activities' do
+        let(:has_more) { false }
+
+        it_behaves_like 'successful run'
+
+        it 'does not queue another run' do
+          expect(described_class).not_to have_received(:perform_async)
+        end
+      end
+
+      describe 'when there is still more work to do' do
+        it_behaves_like 'successful run'
+
+        it 'queues another run' do
+          expect(described_class).to have_received(:perform_async)
+        end
+      end
+    end
+
+    describe 'when the release does not exist' do
+      before do
+        allow(Release).to receive(:find_by_id).and_return(nil)
+        worker.handle_event(event)
+      end
+
+      it 'does not call the activity sending service' do
+        expect(service).not_to have_received(:execute)
+      end
+
+      it 'does not queue another run' do
+        expect(described_class).not_to have_received(:perform_async)
+      end
+    end
+  end
+end