HEX
Server: Apache/2.4.65 (Debian)
System: Linux wordpress-7cb4c6b6f6-6k2kb 5.15.0-131-generic #141-Ubuntu SMP Fri Jan 10 21:18:28 UTC 2025 x86_64
User: www-data (33)
PHP: 8.3.27
Disabled: NONE
Upload Files
File: /var/www/html/wp-content/plugins/woocommerce/src/Internal/CLI/Migrator/Lib/ImportSession.php
<?php

/**
 * !! Do not apply Woo-specific changes to this class !!
 *
 * This class is a part of the WordPress/php-toolkit project and is currently
 * duplicated between WordPress/php-toolkit and woocommerce/woocommerce:
 * * https://github.com/WordPress/php-toolkit/blob/trunk/components/DataLiberation/Importer/ImportSession.php
 * * https://github.com/woocommerce/woocommerce/blob/trunk/plugins/woocommerce/src/Internal/CLI/Migrator/Lib/ImportSession.php
 *
 * Apply all changes in both projects until Woo consumes php-toolkit as a
 * composer dependency. Generic changes belong to this class. Anything
 * Woo-specific should be implemented as an extension point.
 *
 * MODIFICATION: Made this class standalone by replacing external StreamImporter 
 * and AttachmentDownloaderEvent dependencies with internal constants to eliminate 
 * external imports and make the class fully self-contained.
 */

namespace Automattic\WooCommerce\Internal\CLI\Migrator\Lib;

use WP_Query;

use function get_all_post_meta_flat;
use function is_wp_error;

/**
 * Manages import session data in the WordPress database.
 *
 * Each import session is stored as a post of type 'import_session'.
 * Progress, stage, and other metadata are stored as post meta.
 */
class ImportSession {
	const POST_TYPE = 'import_session';

	// Import stage constants - replaces StreamImporter dependencies
	const STAGE_INITIAL = 'initial';
	const STAGE_FINISHED = 'finished';
	
	// Import stages in processing order
	const STAGES_IN_ORDER = array(
		self::STAGE_INITIAL,
		'indexing',
		'preparing',
		'importing',
		'finalizing',
		self::STAGE_FINISHED,
	);

	// Event type constants - replaces AttachmentDownloaderEvent dependencies  
	const EVENT_SUCCESS = 'success';
	const EVENT_ALREADY_EXISTS = 'already_exists';
	const EVENT_FAILURE = 'failure';

	/**
	 * @TODO: Make it extendable
	 * @TODO: Reuse the same entities list as WP_Stream_Importer
	 */
	const PROGRESS_ENTITIES = array(
		'site_option',
		'user',
		'category',
		'tag',
		'term',
		'post',
		'post_meta',
		'comment',
		'comment_meta',
	);

	const FRONTLOAD_STATUS_AWAITING_DOWNLOAD = 'awaiting_download';
	const FRONTLOAD_STATUS_IGNORED = 'ignored';
	const FRONTLOAD_STATUS_ERROR = 'error';
	const FRONTLOAD_STATUS_SUCCEEDED = 'succeeded';
	private $post_id;
	private $cached_stage;

	/**
	 * Creates a new import session.
	 *
	 * @param  array  $args  {
	 *
	 * @type string $data_source The data source (e.g. 'wxr_file', 'wxr_url', 'markdown_zip')
	 * @type string $source_url Optional. URL of the source file for remote imports
	 * @type int $attachment_id Optional. ID of the uploaded file attachment
	 * @type string $file_name Optional. Original name of the uploaded file
	 * }
	 * @return ImportSession The created ImportSession instance.
	 * @throws \Exception If the arguments are invalid.
	 */
	public static function create( $args ) {
		// Validate the required arguments for each data source.
		// @TODO: Leave it up to filters to make it extendable.
		switch ( $args['data_source'] ) {
			case 'wxr_file':
				if ( empty( $args['file_name'] ) ) {
					throw new \Exception( 'File name is required for WXR file imports' );
				}
				break;
			case 'wxr_url':
				if ( empty( $args['source_url'] ) ) {
					throw new \Exception( 'Source URL is required for remote imports' );
				}
				break;
			case 'markdown_zip':
				if ( empty( $args['file_name'] ) ) {
					throw new \Exception( 'File name is required for Markdown ZIP imports' );
				}
				break;
			case 'local_directory':
				if ( empty( $args['file_name'] ) ) {
					throw new \Exception( 'Directory path is required for local directory imports' );
				}
				break;
		}

		$post_id = wp_insert_post(
			array(
				'post_type'   => self::POST_TYPE,
				'post_status' => 'publish',
				'post_title'  => sprintf(
					'Import from %s - %s',
					$args['data_source'],
					$args['file_name'] ?? $args['source_url'] ?? 'Unknown source'
				),
				'meta_input'  => array(
					'data_source'   => $args['data_source'],
					'started_at'    => time(),
					'file_name'     => $args['file_name'] ?? null,
					'source_url'    => $args['source_url'] ?? null,
					'attachment_id' => $args['attachment_id'] ?? null,
				),
			),
			true
		);
		if ( is_wp_error( $post_id ) ) {
			throw new \Exception( 'Error creating an import session: ' . $post_id->get_error_message() );
		}

		if ( ! empty( $args['attachment_id'] ) ) {
			wp_update_post(
				array(
					'ID'          => $post_id,
					'post_parent' => $args['attachment_id'],
				)
			);
		}

		return new self( $post_id );
	}

	/**
	 * Gets an existing import session by ID.
	 *
	 * @param  int  $post_id  The import session post ID
	 *
	 * @return WP_Import_Model|null The import model instance or null if not found
	 */
	public static function by_id( $post_id ) {
		$post = get_post( $post_id );
		if ( ! $post || $post->post_type !== self::POST_TYPE ) {
			return false;
		}

		return new self( $post_id );
	}

	/**
	 * Gets the most recent active import session.
	 *
	 * @return WP_Import_Session|null The most recent import or null if none found
	 */
	public static function get_active() {
		$posts = get_posts(
			array(
				'post_type'      => self::POST_TYPE,
				'post_status'    => array( 'publish' ),
				'posts_per_page' => 1,
				'orderby'        => 'date',
				'order'          => 'DESC',
				'meta_query'     => array(
					// @TODO: This somehow makes $post empty.
					// array(
					// 'key' => 'current_stage',
					// 'value' => WP_Stream_Importer::STAGE_FINISHED,
					// 'compare' => '!='
					// )
				),
			)
		);

		if ( empty( $posts ) ) {
			return false;
		}

		return new self( $posts[0]->ID );
	}

	public function __construct( $post_id ) {
		$this->post_id = $post_id;
	}

	/**
	 * Gets the import session ID.
	 *
	 * @return int The post ID
	 */
	public function get_id() {
		return $this->post_id;
	}

	public function get_metadata() {
		$cursor = $this->get_reentrancy_cursor();

		return array(
			'post_id'       => $this->post_id,
			'cursor'        => $cursor ? $cursor : null,
			'data_source'   => get_post_meta( $this->post_id, 'data_source', true ),
			'source_url'    => get_post_meta( $this->post_id, 'source_url', true ),
			'attachment_id' => get_post_meta( $this->post_id, 'attachment_id', true ),
		);
	}

	public function get_data_source() {
		return get_post_meta( $this->post_id, 'data_source', true );
	}

	public function get_human_readable_file_reference() {
		switch ( $this->get_data_source() ) {
			case 'wxr_file':
			case 'markdown_zip':
				return get_post_meta( $this->post_id, 'file_name', true );
			case 'wxr_url':
				return get_post_meta( $this->post_id, 'source_url', true );
		}

		return '';
	}

	public function archive() {
		wp_update_post(
			array(
				'ID'          => $this->post_id,
				'post_status' => 'archived',
			)
		);
	}

	/**
	 * Gets the current progress information.
	 *
	 * @return array The progress data
	 */
	public function count_imported_entities() {
		$progress = array();
		foreach ( self::PROGRESS_ENTITIES as $entity ) {
			$progress[] = array(
				'label'    => $entity,
				'imported' => (int) get_post_meta( $this->post_id, 'imported_' . $entity, true ),
				'total'    => (int) get_post_meta( $this->post_id, 'total_' . $entity, true ),
			);
		}

		return $progress;
	}

	public function count_all_imported_entities() {
		$counts = $this->count_imported_entities();

		return array_sum( array_column( $counts, 'imported' ) );
	}

	public function count_all_total_entities() {
		$counts = $this->count_imported_entities();

		return array_sum( array_column( $counts, 'total' ) );
	}

	public function count_remaining_entities() {
		$counts = $this->count_imported_entities();

		return array_sum( array_column( $counts, 'total' ) ) - array_sum( array_column( $counts, 'imported' ) );
	}

	/**
	 * Cache of imported entity counts to avoid repeated database queries
	 *
	 * @var array
	 */
	private $cached_imported_counts = array();

	/**
	 * Updates the progress information.
	 *
	 * @param  array  $newly_imported_entities  The new progress data with keys: posts, comments, terms, attachments, users
	 */
	public function bump_imported_entities_counts( $newly_imported_entities ) {
		foreach ( $newly_imported_entities as $field => $count ) {
			if ( ! in_array( $field, static::PROGRESS_ENTITIES, true ) ) {
				_doing_it_wrong(
					__METHOD__,
					'Cannot bump imported entities count for unknown entity type: ' . $field,
					'1.0.0'
				);
				continue;
			}

			// Get current count from cache or database
			if ( ! isset( $this->cached_imported_counts[ $field ] ) ) {
				$this->cached_imported_counts[ $field ] = (int) get_post_meta( $this->post_id, 'imported_' . $field, true );
			}

			// Add new count to total
			$new_count = $this->cached_imported_counts[ $field ] + $count;

			// Update database and cache
			update_post_meta( $this->post_id, 'imported_' . $field, $new_count );
			$this->cached_imported_counts[ $field ] = $new_count;
			/*
			@TODO run an atomic query instead:
			$sql = $wpdb->prepare(
				"INSERT INTO {$wpdb->postmeta} (post_id, meta_key, meta_value)
				VALUES (%d, %s, %d)
				ON DUPLICATE KEY UPDATE meta_value = meta_value + %d",
				$this->post_id,
				'imported_' . $field,
				$count,
				$count
			);
			$wpdb->query($sql);
			*/
		}
	}

	public function count_awaiting_frontloading_stubs() {
		global $wpdb;

		return (int) $wpdb->get_var(
			$wpdb->prepare(
				"SELECT COUNT(*) FROM $wpdb->posts
				 WHERE post_type = 'frontloading_stub'
				 AND post_parent = %d
				 AND post_status = %s",
				$this->post_id,
				self::FRONTLOAD_STATUS_AWAITING_DOWNLOAD
			)
		);
	}

	public function count_unfinished_frontloading_stubs() {
		global $wpdb;

		return (int) $wpdb->get_var(
			$wpdb->prepare(
				"SELECT COUNT(*) FROM $wpdb->posts
				 WHERE post_type = 'frontloading_stub'
				 AND post_parent = %d
				 AND post_status != %s
				 AND post_status != %s",
				$this->post_id,
				self::FRONTLOAD_STATUS_SUCCEEDED,
				self::FRONTLOAD_STATUS_IGNORED
			)
		);
	}

	public function mark_frontloading_errors_as_ignored() {
		global $wpdb;
		$wpdb->update(
			$wpdb->posts,
			array( 'post_status' => self::FRONTLOAD_STATUS_IGNORED ),
			array(
				'post_type' => 'frontloading_stub',
				// 'post_status !=' => self::FRONTLOAD_STATUS_SUCCEEDED,
			)
		);
	}

	public function get_frontloading_stubs( $options = array() ) {
		$query = new WP_Query(
			array(
				'post_type'      => 'frontloading_stub',
				'post_status'    => 'any',
				'post_parent'    => $this->post_id,
				'posts_per_page' => $options['per_page'] ?? 25,
				'paged'          => $options['page'] ?? 1,
				'orderby'        => array(
					'post_status' => array(
						self::FRONTLOAD_STATUS_ERROR             => 0,
						self::FRONTLOAD_STATUS_AWAITING_DOWNLOAD => 1,
						'any'                                    => 2,
					),
					'ID'          => 'ASC',
				),
			)
		);

		if ( ! $query->have_posts() ) {
			return array();
		}

		$posts = $query->posts;
		$ids   = array_map(
			function ( $post ) {
				return $post->ID;
			},
			$posts
		);
		update_meta_cache( 'post', $ids );
		foreach ( $posts as $post ) {
			$post->meta = get_all_post_meta_flat( $post->ID );
		}

		return $posts;
	}

	public function get_total_number_of_entities() {
		$totals = array();
		foreach ( static::PROGRESS_ENTITIES as $field ) {
			$totals[ $field ] = (int) get_post_meta( $this->post_id, 'total_' . $field, true );
		}
		$totals['download'] = $this->get_total_number_of_assets();

		return $totals;
	}

	public function get_total_number_of_assets() {
		global $wpdb;

		return (int) $wpdb->get_var(
			$wpdb->prepare(
				"SELECT COUNT(*) FROM $wpdb->posts
			WHERE post_type = 'frontloading_stub'
			AND post_parent = %d",
				$this->post_id
			)
		);
	}

	public function get_frontloading_stub( $url ) {
		global $wpdb;
		$id = $wpdb->get_var(
			$wpdb->prepare(
				"SELECT p.ID FROM $wpdb->posts p
				 INNER JOIN $wpdb->postmeta pm ON p.ID = pm.post_id
				 WHERE p.post_type = 'frontloading_stub'
				 AND p.post_parent = %d
				 AND pm.meta_key = 'current_url'
				 AND pm.meta_value = %s
				 LIMIT 1",
				$this->post_id,
				$url
			)
		);

		return get_post( $id );
	}

	/**
	 * Creates placeholder attachments for the assets to be downloaded in the
	 * frontloading stage.
	 */
	public function create_frontloading_stubs( $urls ) {
		global $wpdb;

		foreach ( $urls as $url => $_ ) {
			/**
			 * Check if placeholder with this URL already exists
			 * There's a race condition here – another insert may happen
			 * between the check and the insert.
			 *
			 * @TODO: Explore solutions. A custom table with a UNIQUE constraint
			 * may or may not be an option, depending on the performance impact
			 * on 100GB+ VIP databases.
			 */
			$exists = $wpdb->get_var(
				$wpdb->prepare(
					"SELECT ID FROM $wpdb->posts
				WHERE post_type = 'frontloading_stub'
				AND post_parent = %d
				AND guid = %s
				LIMIT 1",
					$this->post_id,
					$url
				)
			);

			if ( $exists ) {
				continue;
			}

			$post_data        = array(
				'post_type'   => 'frontloading_stub',
				'post_parent' => $this->post_id,
				'post_title'  => basename( $url ),
				'post_status' => self::FRONTLOAD_STATUS_AWAITING_DOWNLOAD,
				'guid'        => $url,
				'meta_input'  => array(
					'original_url' => $url,
					'current_url'  => $url,
					'attempts'     => 0,
					'last_error'   => null,
					'target_path'  => '',
				),
			);
			$insertion_result = wp_insert_post( $post_data );
			if ( is_wp_error( $insertion_result ) ) {
				throw new \Exception( 'Failed to insert frontloading placeholder' );
			}
		}
	}

	/**
	 * Sets the total number of entities to import for each type.
	 *
	 * @param  array  $totals  The total number of entities for each type
	 */
	private $cached_totals = array();

	public function bump_total_number_of_entities( $newly_indexed_entities ) {
		foreach ( $newly_indexed_entities as $field => $count ) {
			if ( ! in_array( $field, static::PROGRESS_ENTITIES, true ) ) {
				_doing_it_wrong(
					__METHOD__,
					'Cannot set total number of entities for unknown entity type: ' . $field,
					'1.0.0'
				);
				continue;
			}

			// Get current total from cache or database
			if ( ! isset( $this->cached_totals[ $field ] ) ) {
				$this->cached_totals[ $field ] = (int) get_post_meta( $this->post_id, 'total_' . $field, true );
			}

			// Add new count to total
			$new_total = $this->cached_totals[ $field ] + $count;

			// Update database and cache
			update_post_meta( $this->post_id, 'total_' . $field, $new_total );
			$this->cached_totals[ $field ] = $new_total;
		}
	}

	/**
	 * Saves an array of [$url => ['received' => $downloaded_bytes, 'total' => $total_bytes | null]]
	 * of the currently fetched files. The list is ephemeral and changes as we stream the data. There
	 * will never be more than $concurrency_limit files in the list at any given time.
	 */
	public function bump_frontloading_progress( $frontloading_progress, $events = array() ) {
		update_post_meta( $this->post_id, 'frontloading_progress', $frontloading_progress );

		foreach ( $events as $event ) {
			$url         = $event->resource_id;
			$placeholder = $this->get_frontloading_stub( $url );
			if ( ! $placeholder ) {
				_doing_it_wrong(
					__METHOD__,
					'Frontloading placeholder post not found for URL: ' . $url,
					'1.0.0'
				);
				continue;
			}

			update_post_meta( $placeholder->ID, 'last_error', $event->error );

			$attempts     = get_post_meta( $placeholder->ID, 'attempts', true );
			$new_attempts = $attempts;
			$new_status   = $placeholder->post_status;
			switch ( $event->type ) {
				case self::EVENT_SUCCESS:
					$new_status   = self::FRONTLOAD_STATUS_SUCCEEDED;
					$new_attempts = $attempts + 1;
					break;
				case self::EVENT_ALREADY_EXISTS:
					$new_status = self::FRONTLOAD_STATUS_SUCCEEDED;
					break;
				case self::EVENT_FAILURE:
					$new_status   = self::FRONTLOAD_STATUS_ERROR;
					$new_attempts = $attempts + 1;
					break;
			}

			if ( $new_attempts !== $attempts ) {
				update_post_meta( $placeholder->ID, 'attempts', $new_attempts );
			}

			if ( $new_status !== $placeholder->post_status ) {
				wp_update_post(
					array(
						'ID'          => $placeholder->ID,
						'post_status' => $new_status,
					)
				);
			}
		}
	}

	public function get_frontloading_progress() {
		$meta = get_post_meta( $this->post_id, 'frontloading_progress', true );

		return $meta ? $meta : array();
	}

	public function is_stage_completed( $stage ) {
		$current_stage       = $this->get_stage();
		$stage_index         = array_search( $stage, self::STAGES_IN_ORDER, true );
		$current_stage_index = array_search( $current_stage, self::STAGES_IN_ORDER, true );

		return $current_stage_index > $stage_index;
	}

	/**
	 * Gets the current import stage.
	 *
	 * @return string The current stage
	 */
	public function get_stage() {
		if ( ! isset( $this->cached_stage ) ) {
			$meta               = get_post_meta( $this->post_id, 'current_stage', true );
			$this->cached_stage = $meta ? $meta : self::STAGE_INITIAL;
		}

		return $this->cached_stage;
	}

	/**
	 * Updates the current import stage.
	 *
	 * @param  string  $stage  The new stage
	 */
	public function set_stage( $stage ) {
		if ( $stage === $this->get_stage() ) {
			return;
		}
		if ( self::STAGE_FINISHED === $stage ) {
			update_post_meta( $this->post_id, 'finished_at', time() );
		}
		update_post_meta( $this->post_id, 'current_stage', $stage );
		$this->cached_stage = $stage;
	}

	public function get_started_at() {
		return get_post_meta( $this->post_id, 'started_at', true );
	}

	public function get_finished_at() {
		return get_post_meta( $this->post_id, 'finished_at', true );
	}

	public function is_finished() {
		return ! empty( get_post_meta( $this->post_id, 'finished_at', true ) );
	}

	/**
	 * Gets the importer cursor for resuming imports.
	 *
	 * @return string|null The cursor data
	 */
	public function get_reentrancy_cursor() {
		return get_post_meta( $this->post_id, 'importer_cursor', true );
	}

	/**
	 * Updates the importer cursor.
	 *
	 * @param  string  $cursor  The new cursor data
	 */
	public function set_reentrancy_cursor( $cursor ) {
		// WordPress, sadly, removes single slashes from the meta value and
		// requires an addslashes() call to preserve them.
		update_post_meta( $this->post_id, 'importer_cursor', addslashes( $cursor ) );
	}
}