/*-------------------------------------------------------------------------
 *
 * postgres_fdw.c
 *		  Foreign-data wrapper for remote PostgreSQL servers
 *
 * Portions Copyright (c) 2012-2022, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  contrib/postgres_fdw/postgres_fdw.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <limits.h>

#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "catalog/pg_class.h"
#include "catalog/pg_opfamily.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "executor/execAsync.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/appendinfo.h"
#include "optimizer/clauses.h"
#include "optimizer/cost.h"
#include "optimizer/inherit.h"
#include "optimizer/optimizer.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
#include "optimizer/prep.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "postgres_fdw.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/float.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/sampling.h"
#include "utils/selfuncs.h"

PG_MODULE_MAGIC;

/*
 * Default CPU cost to start up a foreign query.  Used as the initial value
 * of fpinfo->fdw_startup_cost in postgresGetForeignRelSize(), before the
 * server and table options are applied.
 */
#define DEFAULT_FDW_STARTUP_COST	100.0

/*
 * Default CPU cost to process 1 row (above and beyond cpu_tuple_cost).  Used
 * as the initial value of fpinfo->fdw_tuple_cost in
 * postgresGetForeignRelSize(), before the server and table options are
 * applied.
 */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2

/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum FdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * Because the enumerator values are used directly as list positions, their
 * order must match the order in which the fdw_private list is built.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing join i.e. names of relations being joined and types
	 * of join, added when the scan is join
	 */
	FdwScanPrivateRelations
};

/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) Length till the end of VALUES clause for INSERT
 *	  (-1 for a DELETE/UPDATE)
 * 4) Boolean flag showing if the remote query has a RETURNING clause
 * 5) Integer list of attribute numbers retrieved by RETURNING, if any
 *
 * As with FdwScanPrivateIndex, the enumerator order is the list order.
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* Length till the end of VALUES clause (as an Integer node) */
	FdwModifyPrivateLen,
	/* has-returning flag (as a Boolean node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};

/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command es_processed
 *
 * As with FdwScanPrivateIndex, the enumerator order is the list order.
 */
enum FdwDirectModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as a Boolean node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as a Boolean node) */
	FdwDirectModifyPrivateSetProcessed
};

/*
 * Execution state of a foreign scan using postgres_fdw.
 */
typedef struct PgFdwScanState
{
	Relation	rel;			/* relcache entry for the foreign table. NULL
								 * for a foreign join scan. */
	TupleDesc	tupdesc;		/* tuple descriptor of scan */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of SELECT command */
	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the scan */
	PgFdwConnState *conn_state; /* extra per-connection state */
	unsigned int cursor_number; /* quasi-unique ID for my cursor */
	bool		cursor_exists;	/* have we created the cursor? */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
	int			num_tuples;		/* # of tuples in array */
	int			next_tuple;		/* index of next one to return */

	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
	bool		eof_reached;	/* true if last fetch reached EOF */

	/* for asynchronous execution */
	bool		async_capable;	/* engage asynchronous-capable logic? */

	/* working memory contexts */
	MemoryContext batch_cxt;	/* context holding current batch of tuples */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	int			fetch_size;		/* number of tuples per fetch */
} PgFdwScanState;

/*
 * Execution state of a foreign insert/update/delete operation.
 */
typedef struct PgFdwModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the remote modification */
	PgFdwConnState *conn_state; /* extra per-connection state */
	char	   *p_name;			/* name of prepared statement, if created */

	/* extracted fdw_private data */
	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
	char	   *orig_query;		/* original text of INSERT command */
	List	   *target_attrs;	/* list of target attribute numbers */
	int			values_end;		/* length up to the end of VALUES */
	int			batch_size;		/* value of FDW option "batch_size" */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */

	/* info about parameters for prepared statement */
	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
	int			p_nums;			/* number of parameters to transmit */
	FmgrInfo   *p_flinfo;		/* output conversion functions for them */

	/* batch operation stuff */
	int			num_slots;		/* number of slots to insert */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	/* for update row movement if subplan result rel */
	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
											 * created */
} PgFdwModifyState;

/*
 * Execution state of a foreign scan that modifies a foreign table directly.
 */
typedef struct PgFdwDirectModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of UPDATE/DELETE command */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
	bool		set_processed;	/* do we set the command es_processed? */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the update */
	PgFdwConnState *conn_state; /* extra per-connection state */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	PGresult   *result;			/* result for query */
	int			num_tuples;		/* # of result tuples */
	int			next_tuple;		/* index of next one to return */
	Relation	resultRel;		/* relcache entry for the target relation */
	AttrNumber *attnoMap;		/* array of attnums of input user columns */
	AttrNumber	ctidAttno;		/* attnum of input ctid column */
	AttrNumber	oidAttno;		/* attnum of input oid column */
	bool		hasSystemCols;	/* are there system columns of resultRel? */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwDirectModifyState;

/*
 * Workspace for analyzing a foreign table.
 */
typedef struct PgFdwAnalyzeState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
	List	   *retrieved_attrs;	/* attr numbers retrieved by query */

	/* collected sample rows */
	HeapTuple  *rows;			/* array of size targrows */
	int			targrows;		/* target # of sample rows */
	int			numrows;		/* # of sample rows collected */

	/* for random sampling */
	double		samplerows;		/* # of rows fetched */
	double		rowstoskip;		/* # of rows to skip before next sample */
	ReservoirStateData rstate;	/* state for reservoir sampling */

	/* working memory contexts */
	MemoryContext anl_cxt;		/* context for per-analyze lifespan data */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwAnalyzeState;

/*
 * This enum describes what's kept in the fdw_private list for a ForeignPath.
 * We store:
 *
 * 1) Boolean flag showing if the remote query has the final sort
 * 2) Boolean flag showing if the remote query has the LIMIT clause
 *
 * As with the other *PrivateIndex enums, the enumerator order is the list
 * order.
 */
enum FdwPathPrivateIndex
{
	/* has-final-sort flag (as a Boolean node) */
	FdwPathPrivateHasFinalSort,
	/* has-limit flag (as a Boolean node) */
	FdwPathPrivateHasLimit
};

/*
 * Struct for extra information passed to estimate_path_cost_size()
 * (received there through its PgFdwPathExtraData *fpextra parameter).
 */
typedef struct
{
	PathTarget *target;
	bool		has_final_sort;
	bool		has_limit;
	double		limit_tuples;
	int64		count_est;
	int64		offset_est;
} PgFdwPathExtraData;

/*
 * Identify the attribute where data conversion fails.
 *
 * NOTE(review): presumably this is the argument handed to
 * conversion_error_callback(); confirm at its use sites.
 */
typedef struct ConversionLocation
{
	AttrNumber	cur_attno;		/* attribute number being processed, or 0 */
	Relation	rel;			/* foreign table being processed, or NULL */
	ForeignScanState *fsstate;	/* plan node being processed, or NULL */
} ConversionLocation;

/* Callback argument for ec_member_matches_foreign */
typedef struct
{
	Expr	   *current;		/* current expr, or NULL if not yet found */
	List	   *already_used;	/* expressions already dealt with */
} ec_member_foreign_arg;

/*
 * SQL functions
 */
PG_FUNCTION_INFO_V1(postgres_fdw_handler);

/*
 * FDW callback routines
 */
static void postgresGetForeignRelSize(PlannerInfo *root,
									  RelOptInfo *baserel,
									  Oid foreigntableid);
static void postgresGetForeignPaths(PlannerInfo *root,
									RelOptInfo *baserel,
									Oid foreigntableid);
static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
										   RelOptInfo *foreignrel,
										   Oid foreigntableid,
										   ForeignPath *best_path,
										   List *tlist,
										   List *scan_clauses,
										   Plan *outer_plan);
static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);
static void postgresAddForeignUpdateTargets(PlannerInfo *root,
											Index rtindex,
											RangeTblEntry *target_rte,
											Relation target_relation);
static List *postgresPlanForeignModify(PlannerInfo *root,
									   ModifyTable *plan,
									   Index resultRelation,
									   int subplan_index);
static void postgresBeginForeignModify(ModifyTableState *mtstate,
									   ResultRelInfo *resultRelInfo,
									   List *fdw_private,
									   int subplan_index,
									   int eflags);
static TupleTableSlot *postgresExecForeignInsert(EState *estate,
												 ResultRelInfo *resultRelInfo,
												 TupleTableSlot *slot,
												 TupleTableSlot *planSlot);
static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate,
													   ResultRelInfo *resultRelInfo,
													   TupleTableSlot **slots,
													   TupleTableSlot **planSlots,
													   int *numSlots);
static int	postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
												 ResultRelInfo *resultRelInfo,
												 TupleTableSlot *slot,
												 TupleTableSlot *planSlot);
static TupleTableSlot *postgresExecForeignDelete(EState *estate,
												 ResultRelInfo *resultRelInfo,
												 TupleTableSlot *slot,
												 TupleTableSlot *planSlot);
static void postgresEndForeignModify(EState *estate,
									 ResultRelInfo *resultRelInfo);
static void postgresBeginForeignInsert(ModifyTableState *mtstate,
									   ResultRelInfo *resultRelInfo);
static void postgresEndForeignInsert(EState *estate,
									 ResultRelInfo *resultRelInfo);
static int	postgresIsForeignRelUpdatable(Relation rel);
static bool postgresPlanDirectModify(PlannerInfo *root,
									 ModifyTable *plan,
									 Index resultRelation,
									 int subplan_index);
static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
static void postgresEndDirectModify(ForeignScanState *node);
static void postgresExplainForeignScan(ForeignScanState *node,
									   ExplainState *es);
static void postgresExplainForeignModify(ModifyTableState *mtstate,
										 ResultRelInfo *rinfo,
										 List *fdw_private,
										 int subplan_index,
										 ExplainState *es);
static void postgresExplainDirectModify(ForeignScanState *node,
										ExplainState *es);
static void postgresExecForeignTruncate(List *rels,
										DropBehavior behavior,
										bool restart_seqs);
static bool postgresAnalyzeForeignTable(Relation relation,
										AcquireSampleRowsFunc *func,
										BlockNumber *totalpages);
static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
										 Oid serverOid);
static void postgresGetForeignJoinPaths(PlannerInfo *root,
										RelOptInfo *joinrel,
										RelOptInfo *outerrel,
										RelOptInfo *innerrel,
										JoinType jointype,
										JoinPathExtraData *extra);
static bool postgresRecheckForeignScan(ForeignScanState *node,
									   TupleTableSlot *slot);
static void postgresGetForeignUpperPaths(PlannerInfo *root,
										 UpperRelationKind stage,
										 RelOptInfo *input_rel,
										 RelOptInfo *output_rel,
										 void *extra);
static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
static void postgresForeignAsyncRequest(AsyncRequest *areq);
static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
static void postgresForeignAsyncNotify(AsyncRequest *areq);

/*
 * Helper functions
 */
static void estimate_path_cost_size(PlannerInfo *root,
									RelOptInfo *foreignrel,
									List *param_join_conds,
									List *pathkeys,
									PgFdwPathExtraData *fpextra,
									double *p_rows, int *p_width,
									Cost *p_startup_cost, Cost *p_total_cost);
static void get_remote_estimate(const char *sql,
								PGconn *conn,
								double *rows,
								int *width,
								Cost *startup_cost,
								Cost *total_cost);
static void adjust_foreign_grouping_path_cost(PlannerInfo *root,
											  List *pathkeys,
											  double retrieved_rows,
											  double width,
											  double limit_tuples,
											  Cost *p_startup_cost,
											  Cost *p_run_cost);
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
									  EquivalenceClass *ec, EquivalenceMember *em,
									  void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number,
						 PgFdwConnState *conn_state);
static PgFdwModifyState *create_foreign_modify(EState *estate,
											   RangeTblEntry *rte,
											   ResultRelInfo *resultRelInfo,
											   CmdType operation,
											   Plan *subplan,
											   char *query,
											   List *target_attrs,
											   int len,
											   bool has_returning,
											   List *retrieved_attrs);
static TupleTableSlot **execute_foreign_modify(EState *estate,
											   ResultRelInfo *resultRelInfo,
											   CmdType operation,
											   TupleTableSlot **slots,
											   TupleTableSlot **planSlots,
											   int *numSlots);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
											 ItemPointer tupleid,
											 TupleTableSlot **slots,
											 int numSlots);
static void store_returning_result(PgFdwModifyState *fmstate,
								   TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
static void deallocate_query(PgFdwModifyState *fmstate);
static List *build_remote_returning(Index rtindex, Relation rel,
									List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
static void execute_dml_stmt(ForeignScanState *node);
static TupleTableSlot *get_returning_data(ForeignScanState *node);
static void init_returning_filter(PgFdwDirectModifyState *dmstate,
								  List *fdw_scan_tlist,
								  Index rtindex);
static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
											  ResultRelInfo *resultRelInfo,
											  TupleTableSlot *slot,
											  EState *estate);
static void prepare_query_params(PlanState *node,
								 List *fdw_exprs,
								 int numParams,
								 FmgrInfo **param_flinfo,
								 List **param_exprs,
								 const char ***param_values);
static void process_query_params(ExprContext *econtext,
								 FmgrInfo *param_flinfo,
								 List *param_exprs,
								 const char **param_values);
static int	postgresAcquireSampleRowsFunc(Relation relation, int elevel,
										  HeapTuple *rows, int targrows,
										  double *totalrows,
										  double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
								  PgFdwAnalyzeState *astate);
static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
static void fetch_more_data_begin(AsyncRequest *areq);
static void complete_pending_request(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
											int row,
											Relation rel,
											AttInMetadata *attinmeta,
											List *retrieved_attrs,
											ForeignScanState *fsstate,
											MemoryContext temp_context);
static void conversion_error_callback(void *arg);
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
							JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
							JoinPathExtraData *extra);
static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
								Node *havingQual);
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
											  RelOptInfo *rel);
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
											Path *epq_path);
static void add_foreign_grouping_paths(PlannerInfo *root,
									   RelOptInfo *input_rel,
									   RelOptInfo *grouped_rel,
									   GroupPathExtraData *extra);
static void add_foreign_ordered_paths(PlannerInfo *root,
									  RelOptInfo *input_rel,
									  RelOptInfo *ordered_rel);
static void add_foreign_final_paths(PlannerInfo *root,
									RelOptInfo *input_rel,
									RelOptInfo *final_rel,
									FinalPathExtraData *extra);
static void apply_server_options(PgFdwRelationInfo *fpinfo);
static void apply_table_options(PgFdwRelationInfo *fpinfo);
static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
							  const PgFdwRelationInfo *fpinfo_o,
							  const PgFdwRelationInfo *fpinfo_i);
static int	get_batch_size_option(Relation rel);


/*
 * Foreign-data wrapper handler function: return a struct with pointers
 * to my callback routines.
 *
 * A fresh FdwRoutine node is allocated with makeNode() on every call and
 * filled in with the postgres* callbacks declared above.
 */
Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)
{
	FdwRoutine *routine = makeNode(FdwRoutine);

	/* Functions for scanning foreign tables */
	routine->GetForeignRelSize = postgresGetForeignRelSize;
	routine->GetForeignPaths = postgresGetForeignPaths;
	routine->GetForeignPlan = postgresGetForeignPlan;
	routine->BeginForeignScan = postgresBeginForeignScan;
	routine->IterateForeignScan = postgresIterateForeignScan;
	routine->ReScanForeignScan = postgresReScanForeignScan;
	routine->EndForeignScan = postgresEndForeignScan;

	/* Functions for updating foreign tables */
	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
	routine->PlanForeignModify = postgresPlanForeignModify;
	routine->BeginForeignModify = postgresBeginForeignModify;
	routine->ExecForeignInsert = postgresExecForeignInsert;
	routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert;
	routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize;
	routine->ExecForeignUpdate = postgresExecForeignUpdate;
	routine->ExecForeignDelete = postgresExecForeignDelete;
	routine->EndForeignModify = postgresEndForeignModify;
	routine->BeginForeignInsert = postgresBeginForeignInsert;
	routine->EndForeignInsert = postgresEndForeignInsert;
	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
	routine->PlanDirectModify = postgresPlanDirectModify;
	routine->BeginDirectModify = postgresBeginDirectModify;
	routine->IterateDirectModify = postgresIterateDirectModify;
	routine->EndDirectModify = postgresEndDirectModify;

	/* Function for EvalPlanQual rechecks */
	routine->RecheckForeignScan = postgresRecheckForeignScan;
	/* Support functions for EXPLAIN */
	routine->ExplainForeignScan = postgresExplainForeignScan;
	routine->ExplainForeignModify = postgresExplainForeignModify;
	routine->ExplainDirectModify = postgresExplainDirectModify;

	/* Support function for TRUNCATE */
	routine->ExecForeignTruncate = postgresExecForeignTruncate;

	/* Support functions for ANALYZE */
	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;

	/* Support functions for IMPORT FOREIGN SCHEMA */
	routine->ImportForeignSchema = postgresImportForeignSchema;

	/* Support functions for join push-down */
	routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;

	/* Support functions for upper relation push-down */
	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;

	/* Support functions for asynchronous execution */
	routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
	routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
	routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
	routine->ForeignAsyncNotify = postgresForeignAsyncNotify;

	PG_RETURN_POINTER(routine);
}

/*
 * postgresGetForeignRelSize
 *		Estimate # of rows and width of the result of the scan
 *
 * We should consider the effect of all baserestrictinfo clauses here, but
 * not any join clauses.
 *
 * root: planner state for the current query.
 * baserel: the foreign base relation being sized.  On return,
 *		baserel->fdw_private points to a newly palloc0'd PgFdwRelationInfo.
 *		baserel->rows and baserel->reltarget->width may be updated (remote
 *		estimates), or baserel->pages/tuples may be filled in with a
 *		placeholder guess (local estimates on a never-ANALYZEd table).
 * foreigntableid: OID of the foreign table, used to look up its catalog
 *		entry and the owning foreign server.
 */
static void
postgresGetForeignRelSize(PlannerInfo *root,
						  RelOptInfo *baserel,
						  Oid foreigntableid)
{
	PgFdwRelationInfo *fpinfo;
	ListCell   *lc;
	RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);

	/*
	 * We use PgFdwRelationInfo to pass various information to subsequent
	 * functions.
	 */
	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
	baserel->fdw_private = (void *) fpinfo;

	/* Base foreign tables need to be pushed down always. */
	fpinfo->pushdown_safe = true;

	/* Look up foreign-table catalog info. */
	fpinfo->table = GetForeignTable(foreigntableid);
	fpinfo->server = GetForeignServer(fpinfo->table->serverid);

	/*
	 * Extract user-settable option values.  Note that per-table settings of
	 * use_remote_estimate, fetch_size and async_capable override per-server
	 * settings of them, respectively.
	 *
	 * The assignments below are the built-in defaults; they must be made
	 * before apply_server_options()/apply_table_options() run, since those
	 * calls are what replace them with user-specified values.
	 */
	fpinfo->use_remote_estimate = false;
	fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
	fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
	fpinfo->shippable_extensions = NIL;
	fpinfo->fetch_size = 100;	/* default rows per remote fetch */
	fpinfo->async_capable = false;

	apply_server_options(fpinfo);
	apply_table_options(fpinfo);

	/*
	 * If the table or the server is configured to use remote estimates,
	 * identify which user to do remote access as during planning.  This
	 * should match what ExecCheckRTEPerms() does.  If we fail due to lack of
	 * permissions, the query would have failed at runtime anyway.
	 */
	if (fpinfo->use_remote_estimate)
	{
		Oid			userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();

		fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
	}
	else
		fpinfo->user = NULL;

	/*
	 * Identify which baserestrictinfo clauses can be sent to the remote
	 * server and which can't.
	 */
	classifyConditions(root, baserel, baserel->baserestrictinfo,
					   &fpinfo->remote_conds, &fpinfo->local_conds);

	/*
	 * Identify which attributes will need to be retrieved from the remote
	 * server.  These include all attrs needed for joins or final output, plus
	 * all attrs used in the local_conds.  (Note: if we end up using a
	 * parameterized scan, it's possible that some of the join clauses will be
	 * sent to the remote and thus we wouldn't really need to retrieve the
	 * columns used in them.  Doesn't seem worth detecting that case though.)
	 */
	fpinfo->attrs_used = NULL;
	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
				   &fpinfo->attrs_used);
	foreach(lc, fpinfo->local_conds)
	{
		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);

		pull_varattnos((Node *) rinfo->clause, baserel->relid,
					   &fpinfo->attrs_used);
	}

	/*
	 * Compute the selectivity and cost of the local_conds, so we don't have
	 * to do it over again for each path.  The best we can do for these
	 * conditions is to estimate selectivity on the basis of local statistics.
	 */
	fpinfo->local_conds_sel = clauselist_selectivity(root,
													 fpinfo->local_conds,
													 baserel->relid,
													 JOIN_INNER,
													 NULL);

	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);

	/*
	 * Set # of retrieved rows and cached relation costs to some negative
	 * value, so that we can detect when they are set to some sensible values,
	 * during one (usually the first) of the calls to estimate_path_cost_size.
	 */
	fpinfo->retrieved_rows = -1;
	fpinfo->rel_startup_cost = -1;
	fpinfo->rel_total_cost = -1;

	/*
	 * If the table or the server is configured to use remote estimates,
	 * connect to the foreign server and execute EXPLAIN to estimate the
	 * number of rows selected by the restriction clauses, as well as the
	 * average row width.  Otherwise, estimate using whatever statistics we
	 * have locally, in a way similar to ordinary tables.
	 */
	if (fpinfo->use_remote_estimate)
	{
		/*
		 * Get cost/size estimates with help of remote server.  Save the
		 * values in fpinfo so we don't need to do it again to generate the
		 * basic foreign path.
		 */
		estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
								&fpinfo->rows, &fpinfo->width,
								&fpinfo->startup_cost, &fpinfo->total_cost);

		/* Report estimated baserel size to planner. */
		baserel->rows = fpinfo->rows;
		baserel->reltarget->width = fpinfo->width;
	}
	else
	{
		/*
		 * If the foreign table has never been ANALYZEd, it will have
		 * reltuples < 0, meaning "unknown".  We can't do much if we're not
		 * allowed to consult the remote server, but we can use a hack similar
		 * to plancat.c's treatment of empty relations: use a minimum size
		 * estimate of 10 pages, and divide by the column-datatype-based width
		 * estimate to get the corresponding number of tuples.
		 */
		if (baserel->tuples < 0)
		{
			baserel->pages = 10;
			baserel->tuples =
				(10 * BLCKSZ) / (baserel->reltarget->width +
								 MAXALIGN(SizeofHeapTupleHeader));
		}

		/* Estimate baserel size as best we can with local statistics. */
		set_baserel_size_estimates(root, baserel);

		/* Fill in basically-bogus cost estimates for use later. */
		estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
								&fpinfo->rows, &fpinfo->width,
								&fpinfo->startup_cost, &fpinfo->total_cost);
	}

	/*
	 * fpinfo->relation_name gets the numeric rangetable index of the foreign
	 * table RTE.  (If this query gets EXPLAIN'd, we'll convert that to a
	 * human-readable string at that time.)
	 */
	fpinfo->relation_name = psprintf("%u", baserel->relid);

	/* No outer and inner relations. */
	fpinfo->make_outerrel_subquery = false;
	fpinfo->make_innerrel_subquery = false;
	fpinfo->lower_subquery_rels = NULL;
	/* Set the relation index. */
	fpinfo->relation_index = baserel->relid;
}

/*
 * get_useful_ecs_for_relation
 *		Determine which EquivalenceClasses might be involved in useful
 *		orderings of this relation.
 *
 * This function is in some respects a mirror image of the core function
 * pathkeys_useful_for_merging: for a regular table, we know what indexes
 * we have and want to test whether any of them are useful.  For a foreign
 * table, we don't know what indexes are present on the remote side but
 * want to speculate about which ones we'd like to use if they existed.
 *
 * This function returns a list of potentially-useful equivalence classes,
 * but it does not guarantee that an EquivalenceMember exists which contains
 * Vars only from the given relation.  For example, given ft1 JOIN t1 ON
 * ft1.x + t1.x = 0, this function will say that the equivalence class
 * containing ft1.x + t1.x is potentially useful.  Supposing ft1 is remote and
 * t1 is local (or on a different server), it will turn out that no useful
 * ORDER BY clause can be generated.  It's not our job to figure that out
 * here; we're only interested in identifying relevant ECs.
 *
 * Returns a List of EquivalenceClass pointers (possibly NIL).  ECs taken from
 * join clauses are added with list_append_unique_ptr(), so an EC already
 * found in the first pass is not listed twice.
 */
static List *
get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
{
	List	   *useful_eclass_list = NIL;
	ListCell   *lc;
	Relids		relids;

	/*
	 * First, consider whether any active EC is potentially useful for a merge
	 * join against this relation.
	 */
	if (rel->has_eclass_joins)
	{
		foreach(lc, root->eq_classes)
		{
			EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);

			if (eclass_useful_for_merging(root, cur_ec, rel))
				useful_eclass_list = lappend(useful_eclass_list, cur_ec);
		}
	}

	/*
	 * Next, consider whether there are any non-EC derivable join clauses that
	 * are merge-joinable.  If the joininfo list is empty, we can exit
	 * quickly.
	 */
	if (rel->joininfo == NIL)
		return useful_eclass_list;

	/* If this is a child rel, we must use the topmost parent rel to search. */
	if (IS_OTHER_REL(rel))
	{
		Assert(!bms_is_empty(rel->top_parent_relids));
		relids = rel->top_parent_relids;
	}
	else
		relids = rel->relids;

	/* Check each join clause in turn. */
	foreach(lc, rel->joininfo)
	{
		RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);

		/* Consider only mergejoinable clauses */
		if (restrictinfo->mergeopfamilies == NIL)
			continue;

		/* Make sure we've got canonical ECs. */
		update_mergeclause_eclasses(root, restrictinfo);

		/*
		 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
		 * that left_ec and right_ec will be initialized, per comments in
		 * distribute_qual_to_rels.
		 *
		 * We want to identify which side of this merge-joinable clause
		 * contains columns from the relation produced by this RelOptInfo. We
		 * test for overlap, not containment, because there could be extra
		 * relations on either side.  For example, suppose we've got something
		 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
		 * A.y = D.y.  The input rel might be the joinrel between A and B, and
		 * we'll consider the join clause A.y = D.y. relids contains a
		 * relation not involved in the join clause (B) and the equivalence
		 * class for the left-hand side of the clause contains a relation not
		 * involved in the input rel (C).  Despite the fact that we have only
		 * overlap and not containment in either direction, A.y is potentially
		 * useful as a sort column.
		 *
		 * Note that it's even possible that relids overlaps neither side of
		 * the join clause.  For example, consider A LEFT JOIN B ON A.x = B.x
		 * AND A.x = 1.  The clause A.x = 1 will appear in B's joininfo list,
		 * but overlaps neither side of B.  In that case, we just skip this
		 * join clause, since it doesn't suggest a useful sort order for this
		 * relation.
		 *
		 * If relids overlaps both sides, only the right-hand EC is recorded,
		 * because the right side is tested first.
		 */
		if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
														restrictinfo->right_ec);
		else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
			useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
														restrictinfo->left_ec);
	}

	return useful_eclass_list;
}

/*
 * get_useful_pathkeys_for_relation
 *		Determine which orderings of a relation might be useful.
 *
 * Getting data in sorted order can be useful either because the requested
 * order matches the final output ordering for the overall query we're
 * planning, or because it enables an efficient merge join.  Here, we try
 * to figure out which pathkeys to consider.
 *
 * Returns a List whose elements are themselves Lists of PathKey (one list
 * per candidate ordering), or NIL.  The first candidate, when present, is a
 * copy of root->query_pathkeys; any further candidates are single-key lists
 * and are produced only when use_remote_estimate is enabled.
 *
 * Side effect: sets fpinfo->qp_is_pushdown_safe to report whether all of
 * the query pathkeys could be pushed down for this relation.
 */
static List *
get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
{
	List	   *useful_pathkeys_list = NIL;
	List	   *useful_eclass_list;
	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
	EquivalenceClass *query_ec = NULL;
	ListCell   *lc;

	/*
	 * Pushing the query_pathkeys to the remote server is always worth
	 * considering, because it might let us avoid a local sort.
	 */
	fpinfo->qp_is_pushdown_safe = false;
	if (root->query_pathkeys)
	{
		bool		query_pathkeys_ok = true;

		foreach(lc, root->query_pathkeys)
		{
			PathKey    *pathkey = (PathKey *) lfirst(lc);

			/*
			 * The planner and executor don't have any clever strategy for
			 * taking data sorted by a prefix of the query's pathkeys and
			 * getting it to be sorted by all of those pathkeys. We'll just
			 * end up resorting the entire data set.  So, unless we can push
			 * down all of the query pathkeys, forget it.
			 */
			if (!is_foreign_pathkey(root, rel, pathkey))
			{
				query_pathkeys_ok = false;
				break;
			}
		}

		if (query_pathkeys_ok)
		{
			useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
			fpinfo->qp_is_pushdown_safe = true;
		}
	}

	/*
	 * Even if we're not using remote estimates, having the remote side do the
	 * sort generally won't be any worse than doing it locally, and it might
	 * be much better if the remote side can generate data in the right order
	 * without needing a sort at all.  However, what we're going to do next is
	 * try to generate pathkeys that seem promising for possible merge joins,
	 * and that's more speculative.  A wrong choice might hurt quite a bit, so
	 * bail out if we can't use remote estimates.
	 */
	if (!fpinfo->use_remote_estimate)
		return useful_pathkeys_list;

	/* Get the list of interesting EquivalenceClasses. */
	useful_eclass_list = get_useful_ecs_for_relation(root, rel);

	/* Extract unique EC for query, if any, so we don't consider it again. */
	if (list_length(root->query_pathkeys) == 1)
	{
		PathKey    *query_pathkey = linitial(root->query_pathkeys);

		query_ec = query_pathkey->pk_eclass;
	}

	/*
	 * As a heuristic, the only pathkeys we consider here are those of length
	 * one.  It's surely possible to consider more, but since each one we
	 * choose to consider will generate a round-trip to the remote side, we
	 * need to be a bit cautious here.  It would sure be nice to have a local
	 * cache of information about remote index definitions...
	 */
	foreach(lc, useful_eclass_list)
	{
		EquivalenceClass *cur_ec = lfirst(lc);
		PathKey    *pathkey;

		/* If redundant with what we did above, skip it. */
		if (cur_ec == query_ec)
			continue;

		/* Can't push down the sort if the EC's opfamily is not shippable. */
		if (!is_shippable(linitial_oid(cur_ec->ec_opfamilies),
						  OperatorFamilyRelationId, fpinfo))
			continue;

		/* If no pushable expression for this rel, skip it. */
		if (find_em_for_rel(root, cur_ec, rel) == NULL)
			continue;

		/*
		 * Looks like we can generate a pathkey, so let's do it.  The
		 * candidate is an ascending (BTLessStrategyNumber) ordering on the
		 * EC's first opfamily, with nulls_first = false.
		 */
		pathkey = make_canonical_pathkey(root, cur_ec,
										 linitial_oid(cur_ec->ec_opfamilies),
										 BTLessStrategyNumber,
										 false);
		useful_pathkeys_list = lappend(useful_pathkeys_list,
									   list_make1(pathkey));
	}

	return useful_pathkeys_list;
}

/*
 * postgresGetForeignPaths
 *		Create possible scan paths for a scan on the foreign table
 */
static void
postgresGetForeignPaths(PlannerInfo *root,
						RelOptInfo *baserel,
						Oid foreigntableid)
{
	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
	ForeignPath *path;
	List	   *ppi_list;
	ListCell   *lc;

	/*
	 * Create simplest ForeignScan path node and add it to baserel.  This path
	 * corresponds to SeqScan path of regular tables (though depending on what
	 * baserestrict conditions we were able to send to remote, there might
	 * actually be an indexscan happening there).  We already did all the work
	 * to estimate cost and size of this path.
	 *
	 * Although this path uses no join clauses, it could still have required
	 * parameterization due to LATERAL refs in its tlist.
	 */
	path = create_foreignscan_path(root, baserel,
								   NULL,	/* default pathtarget */
								   fpinfo->rows,
								   fpinfo->startup_cost,
								   fpinfo->total_cost,
								   NIL, /* no pathkeys */
								   baserel->lateral_relids,
								   NULL,	/* no extra plan */
								   NIL);	/* no fdw_private list */
	add_path(baserel, (Path *) path);

	/* Add paths with pathkeys */
	add_paths_with_pathkeys_for_rel(root, baserel, NULL);

	/*
	 * If we're not using remote estimates, stop here.
We have no way to * estimate whether any join clauses would be worth sending across, so * don't bother building parameterized paths. */ if (!fpinfo->use_remote_estimate) return; /* * Thumb through all join clauses for the rel to identify which outer * relations could supply one or more safe-to-send-to-remote join clauses. * We'll build a parameterized path for each such outer relation. * * It's convenient to manage this by representing each candidate outer * relation by the ParamPathInfo node for it. We can then use the * ppi_clauses list in the ParamPathInfo node directly as a list of the * interesting join clauses for that rel. This takes care of the * possibility that there are multiple safe join clauses for such a rel, * and also ensures that we account for unsafe join clauses that we'll * still have to enforce locally (since the parameterized-path machinery * insists that we handle all movable clauses). */ ppi_list = NIL; foreach(lc, baserel->joininfo) { RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); Relids required_outer; ParamPathInfo *param_info; /* Check if clause can be moved to this rel */ if (!join_clause_is_movable_to(rinfo, baserel)) continue; /* See if it is safe to send to remote */ if (!is_foreign_expr(root, baserel, rinfo->clause)) continue; /* Calculate required outer rels for the resulting path */ required_outer = bms_union(rinfo->clause_relids, baserel->lateral_relids); /* We do not want the foreign rel itself listed in required_outer */ required_outer = bms_del_member(required_outer, baserel->relid); /* * required_outer probably can't be empty here, but if it were, we * couldn't make a parameterized path. */ if (bms_is_empty(required_outer)) continue; /* Get the ParamPathInfo */ param_info = get_baserel_parampathinfo(root, baserel, required_outer); Assert(param_info != NULL); /* * Add it to list unless we already have it. Testing pointer equality * is OK since get_baserel_parampathinfo won't make duplicates. 
*/ ppi_list = list_append_unique_ptr(ppi_list, param_info); } /* * The above scan examined only "generic" join clauses, not those that * were absorbed into EquivalenceClauses. See if we can make anything out * of EquivalenceClauses. */ if (baserel->has_eclass_joins) { /* * We repeatedly scan the eclass list looking for column references * (or expressions) belonging to the foreign rel. Each time we find * one, we generate a list of equivalence joinclauses for it, and then * see if any are safe to send to the remote. Repeat till there are * no more candidate EC members. */ ec_member_foreign_arg arg; arg.already_used = NIL; for (;;) { List *clauses; /* Make clauses, skipping any that join to lateral_referencers */ arg.current = NULL; clauses = generate_implied_equalities_for_column(root, baserel, ec_member_matches_foreign, (void *) &arg, baserel->lateral_referencers); /* Done if there are no more expressions in the foreign rel */ if (arg.current == NULL) { Assert(clauses == NIL); break; } /* Scan the extracted join clauses */ foreach(lc, clauses) { RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); Relids required_outer; ParamPathInfo *param_info; /* Check if clause can be moved to this rel */ if (!join_clause_is_movable_to(rinfo, baserel)) continue; /* See if it is safe to send to remote */ if (!is_foreign_expr(root, baserel, rinfo->clause)) continue; /* Calculate required outer rels for the resulting path */ required_outer = bms_union(rinfo->clause_relids, baserel->lateral_relids); required_outer = bms_del_member(required_outer, baserel->relid); if (bms_is_empty(required_outer)) continue; /* Get the ParamPathInfo */ param_info = get_baserel_parampathinfo(root, baserel, required_outer); Assert(param_info != NULL); /* Add it to list unless we already have it */ ppi_list = list_append_unique_ptr(ppi_list, param_info); } /* Try again, now ignoring the expression we found this time */ arg.already_used = lappend(arg.already_used, arg.current); } } /* * Now build a path 
for each useful outer relation. */ foreach(lc, ppi_list) { ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc); double rows; int width; Cost startup_cost; Cost total_cost; /* Get a cost estimate from the remote */ estimate_path_cost_size(root, baserel, param_info->ppi_clauses, NIL, NULL, &rows, &width, &startup_cost, &total_cost); /* * ppi_rows currently won't get looked at by anything, but still we * may as well ensure that it matches our idea of the rowcount. */ param_info->ppi_rows = rows; /* Make the path */ path = create_foreignscan_path(root, baserel, NULL, /* default pathtarget */ rows, startup_cost, total_cost, NIL, /* no pathkeys */ param_info->ppi_req_outer, NULL, NIL); /* no fdw_private list */ add_path(baserel, (Path *) path); } } /* * postgresGetForeignPlan * Create ForeignScan plan node which implements selected best path */ static ForeignScan * postgresGetForeignPlan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan) { PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; Index scan_relid; List *fdw_private; List *remote_exprs = NIL; List *local_exprs = NIL; List *params_list = NIL; List *fdw_scan_tlist = NIL; List *fdw_recheck_quals = NIL; List *retrieved_attrs; StringInfoData sql; bool has_final_sort = false; bool has_limit = false; ListCell *lc; /* * Get FDW private data created by postgresGetForeignUpperPaths(), if any. */ if (best_path->fdw_private) { has_final_sort = boolVal(list_nth(best_path->fdw_private, FdwPathPrivateHasFinalSort)); has_limit = boolVal(list_nth(best_path->fdw_private, FdwPathPrivateHasLimit)); } if (IS_SIMPLE_REL(foreignrel)) { /* * For base relations, set scan_relid as the relid of the relation. */ scan_relid = foreignrel->relid; /* * In a base-relation scan, we must apply the given scan_clauses. * * Separate the scan_clauses into those that can be executed remotely * and those that can't. 
baserestrictinfo clauses that were * previously determined to be safe or unsafe by classifyConditions * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything * else in the scan_clauses list will be a join clause, which we have * to check for remote-safety. * * Note: the join clauses we see here should be the exact same ones * previously examined by postgresGetForeignPaths. Possibly it'd be * worth passing forward the classification work done then, rather * than repeating it here. * * This code must match "extract_actual_clauses(scan_clauses, false)" * except for the additional decision about remote versus local * execution. */ foreach(lc, scan_clauses) { RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); /* Ignore any pseudoconstants, they're dealt with elsewhere */ if (rinfo->pseudoconstant) continue; if (list_member_ptr(fpinfo->remote_conds, rinfo)) remote_exprs = lappend(remote_exprs, rinfo->clause); else if (list_member_ptr(fpinfo->local_conds, rinfo)) local_exprs = lappend(local_exprs, rinfo->clause); else if (is_foreign_expr(root, foreignrel, rinfo->clause)) remote_exprs = lappend(remote_exprs, rinfo->clause); else local_exprs = lappend(local_exprs, rinfo->clause); } /* * For a base-relation scan, we have to support EPQ recheck, which * should recheck all the remote quals. */ fdw_recheck_quals = remote_exprs; } else { /* * Join relation or upper relation - set scan_relid to 0. */ scan_relid = 0; /* * For a join rel, baserestrictinfo is NIL and we are not considering * parameterization right now, so there should be no scan_clauses for * a joinrel or an upper rel either. */ Assert(!scan_clauses); /* * Instead we get the conditions to apply from the fdw_private * structure. */ remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false); local_exprs = extract_actual_clauses(fpinfo->local_conds, false); /* * We leave fdw_recheck_quals empty in this case, since we never need * to apply EPQ recheck clauses. 
In the case of a joinrel, EPQ * recheck is handled elsewhere --- see postgresGetForeignJoinPaths(). * If we're planning an upperrel (ie, remote grouping or aggregation) * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be * allowed, and indeed we *can't* put the remote clauses into * fdw_recheck_quals because the unaggregated Vars won't be available * locally. */ /* Build the list of columns to be fetched from the foreign server. */ fdw_scan_tlist = build_tlist_to_deparse(foreignrel); /* * Ensure that the outer plan produces a tuple whose descriptor * matches our scan tuple slot. Also, remove the local conditions * from outer plan's quals, lest they be evaluated twice, once by the * local plan and once by the scan. */ if (outer_plan) { ListCell *lc; /* * Right now, we only consider grouping and aggregation beyond * joins. Queries involving aggregates or grouping do not require * EPQ mechanism, hence should not have an outer plan here. */ Assert(!IS_UPPER_REL(foreignrel)); /* * First, update the plan's qual list if possible. In some cases * the quals might be enforced below the topmost plan level, in * which case we'll fail to remove them; it's not worth working * harder than this. */ foreach(lc, local_exprs) { Node *qual = lfirst(lc); outer_plan->qual = list_delete(outer_plan->qual, qual); /* * For an inner join the local conditions of foreign scan plan * can be part of the joinquals as well. (They might also be * in the mergequals or hashquals, but we can't touch those * without breaking the plan.) */ if (IsA(outer_plan, NestLoop) || IsA(outer_plan, MergeJoin) || IsA(outer_plan, HashJoin)) { Join *join_plan = (Join *) outer_plan; if (join_plan->jointype == JOIN_INNER) join_plan->joinqual = list_delete(join_plan->joinqual, qual); } } /* * Now fix the subplan's tlist --- this might result in inserting * a Result node atop the plan tree. 
*/ outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist, best_path->path.parallel_safe); } } /* * Build the query string to be sent for execution, and identify * expressions to be sent as parameters. */ initStringInfo(&sql); deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, remote_exprs, best_path->path.pathkeys, has_final_sort, has_limit, false, &retrieved_attrs, ¶ms_list); /* Remember remote_exprs for possible use by postgresPlanDirectModify */ fpinfo->final_remote_exprs = remote_exprs; /* * Build the fdw_private list that will be available to the executor. * Items in the list must match order in enum FdwScanPrivateIndex. */ fdw_private = list_make3(makeString(sql.data), retrieved_attrs, makeInteger(fpinfo->fetch_size)); if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name)); /* * Create the ForeignScan node for the given relation. * * Note that the remote parameter expressions are stored in the fdw_exprs * field of the finished plan node; we can't keep them in private state * because then they wouldn't be subject to later planner processing. */ return make_foreignscan(tlist, local_exprs, scan_relid, params_list, fdw_private, fdw_scan_tlist, fdw_recheck_quals, outer_plan); } /* * Construct a tuple descriptor for the scan tuples handled by a foreign join. */ static TupleDesc get_tupdesc_for_join_scan_tuples(ForeignScanState *node) { ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; TupleDesc tupdesc; /* * The core code has already set up a scan tuple slot based on * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough, * but there's one case where it isn't. If we have any whole-row row * identifier Vars, they may have vartype RECORD, and we need to replace * that with the associated table's actual composite type. 
This ensures * that when we read those ROW() expression values from the remote server, * we can convert them to a composite type the local server knows. */ tupdesc = CreateTupleDescCopy(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor); for (int i = 0; i < tupdesc->natts; i++) { Form_pg_attribute att = TupleDescAttr(tupdesc, i); Var *var; RangeTblEntry *rte; Oid reltype; /* Nothing to do if it's not a generic RECORD attribute */ if (att->atttypid != RECORDOID || att->atttypmod >= 0) continue; /* * If we can't identify the referenced table, do nothing. This'll * likely lead to failure later, but perhaps we can muddle through. */ var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, i)->expr; if (!IsA(var, Var) || var->varattno != 0) continue; rte = list_nth(estate->es_range_table, var->varno - 1); if (rte->rtekind != RTE_RELATION) continue; reltype = get_rel_type_id(rte->relid); if (!OidIsValid(reltype)) continue; att->atttypid = reltype; /* shouldn't need to change anything else */ } return tupdesc; } /* * postgresBeginForeignScan * Initiate an executor scan of a foreign PostgreSQL table. */ static void postgresBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; PgFdwScanState *fsstate; RangeTblEntry *rte; Oid userid; ForeignTable *table; UserMapping *user; int rtindex; int numParams; /* * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. */ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; /* * We'll save private state in node->fdw_state. */ fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); node->fdw_state = (void *) fsstate; /* * Identify which user to do the remote access as. This should match what * ExecCheckRTEPerms() does. In case of a join or aggregate, use the * lowest-numbered member RTE as a representative; we would get the same * result from any. 
*/ if (fsplan->scan.scanrelid > 0) rtindex = fsplan->scan.scanrelid; else rtindex = bms_next_member(fsplan->fs_relids, -1); rte = exec_rt_fetch(rtindex, estate); userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); /* Get info about foreign table. */ table = GetForeignTable(rte->relid); user = GetUserMapping(userid, table->serverid); /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ fsstate->conn = GetConnection(user, false, &fsstate->conn_state); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); fsstate->cursor_exists = false; /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql)); fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, FdwScanPrivateRetrievedAttrs); fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private, FdwScanPrivateFetchSize)); /* Create contexts for batches of tuples and per-tuple temp workspace. */ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw tuple data", ALLOCSET_DEFAULT_SIZES); fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_SIZES); /* * Get info we'll need for converting data fetched from the foreign server * into local representation and error reporting during that process. */ if (fsplan->scan.scanrelid > 0) { fsstate->rel = node->ss.ss_currentRelation; fsstate->tupdesc = RelationGetDescr(fsstate->rel); } else { fsstate->rel = NULL; fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node); } fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); /* * Prepare for processing of parameters used in remote query, if any. 
*/ numParams = list_length(fsplan->fdw_exprs); fsstate->numParams = numParams; if (numParams > 0) prepare_query_params((PlanState *) node, fsplan->fdw_exprs, numParams, &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); /* Set the async-capable flag */ fsstate->async_capable = node->ss.ps.async_capable; } /* * postgresIterateForeignScan * Retrieve next row from the result set, or clear tuple slot to indicate * EOF. */ static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* * In sync mode, if this is the first call after Begin or ReScan, we need * to create the cursor on the remote side. In async mode, we would have * already created the cursor before we get here, even if this is the * first call after Begin or ReScan. */ if (!fsstate->cursor_exists) create_cursor(node); /* * Get some more tuples, if we've run out. */ if (fsstate->next_tuple >= fsstate->num_tuples) { /* In async mode, just clear tuple slot. */ if (fsstate->async_capable) return ExecClearTuple(slot); /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) fetch_more_data(node); /* If we didn't get any tuples, must be end of data. */ if (fsstate->next_tuple >= fsstate->num_tuples) return ExecClearTuple(slot); } /* * Return the next tuple. */ ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++], slot, false); return slot; } /* * postgresReScanForeignScan * Restart the scan. */ static void postgresReScanForeignScan(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; char sql[64]; PGresult *res; /* If we haven't created the cursor yet, nothing to do. */ if (!fsstate->cursor_exists) return; /* * If the node is async-capable, and an asynchronous fetch for it has been * begun, the asynchronous fetch might not have yet completed. 
Check if * the node is async-capable, and an asynchronous fetch for it is still in * progress; if so, complete the asynchronous fetch before restarting the * scan. */ if (fsstate->async_capable && fsstate->conn_state->pendingAreq && fsstate->conn_state->pendingAreq->requestee == (PlanState *) node) fetch_more_data(node); /* * If any internal parameters affecting this node have changed, we'd * better destroy and recreate the cursor. Otherwise, rewinding it should * be good enough. If we've only fetched zero or one batch, we needn't * even rewind the cursor, just rescan what we have. */ if (node->ss.ps.chgParam != NULL) { fsstate->cursor_exists = false; snprintf(sql, sizeof(sql), "CLOSE c%u", fsstate->cursor_number); } else if (fsstate->fetch_ct_2 > 1) { snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u", fsstate->cursor_number); } else { /* Easy: just rescan what we already have in memory, if anything */ fsstate->next_tuple = 0; return; } /* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); /* Now force a fresh FETCH. 
*/ fsstate->tuples = NULL; fsstate->num_tuples = 0; fsstate->next_tuple = 0; fsstate->fetch_ct_2 = 0; fsstate->eof_reached = false; } /* * postgresEndForeignScan * Finish scanning foreign table and dispose objects used for this scan */ static void postgresEndForeignScan(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ if (fsstate == NULL) return; /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) close_cursor(fsstate->conn, fsstate->cursor_number, fsstate->conn_state); /* Release remote connection */ ReleaseConnection(fsstate->conn); fsstate->conn = NULL; /* MemoryContexts will be deleted automatically. */ } /* * postgresAddForeignUpdateTargets * Add resjunk column(s) needed for update/delete on a foreign table */ static void postgresAddForeignUpdateTargets(PlannerInfo *root, Index rtindex, RangeTblEntry *target_rte, Relation target_relation) { Var *var; /* * In postgres_fdw, what we need is the ctid, same as for a regular table. */ /* Make a Var representing the desired value */ var = makeVar(rtindex, SelfItemPointerAttributeNumber, TIDOID, -1, InvalidOid, 0); /* Register it as a row-identity column needed by this target rel */ add_row_identity_var(root, var, rtindex, "ctid"); } /* * postgresPlanForeignModify * Plan an insert/update/delete operation on a foreign table */ static List * postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index) { CmdType operation = plan->operation; RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); Relation rel; StringInfoData sql; List *targetAttrs = NIL; List *withCheckOptionList = NIL; List *returningList = NIL; List *retrieved_attrs = NIL; bool doNothing = false; int values_end_len = -1; initStringInfo(&sql); /* * Core code already has some lock on each rel being planned, so we can * use NoLock here. 
*/ rel = table_open(rte->relid, NoLock); /* * In an INSERT, we transmit all columns that are defined in the foreign * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the * foreign table, we transmit all columns like INSERT; else we transmit * only columns that were explicitly targets of the UPDATE, so as to avoid * unnecessary data transmission. (We can't do that for INSERT since we * would miss sending default values for columns not listed in the source * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since * those triggers might change values for non-target columns, in which * case we would miss sending changed values for those columns.) */ if (operation == CMD_INSERT || (operation == CMD_UPDATE && rel->trigdesc && rel->trigdesc->trig_update_before_row)) { TupleDesc tupdesc = RelationGetDescr(rel); int attnum; for (attnum = 1; attnum <= tupdesc->natts; attnum++) { Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); if (!attr->attisdropped) targetAttrs = lappend_int(targetAttrs, attnum); } } else if (operation == CMD_UPDATE) { int col; RelOptInfo *rel = find_base_rel(root, resultRelation); Bitmapset *allUpdatedCols = get_rel_all_updated_cols(root, rel); col = -1; while ((col = bms_next_member(allUpdatedCols, col)) >= 0) { /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */ AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; if (attno <= InvalidAttrNumber) /* shouldn't happen */ elog(ERROR, "system-column update is not supported"); targetAttrs = lappend_int(targetAttrs, attno); } } /* * Extract the relevant WITH CHECK OPTION list if any. */ if (plan->withCheckOptionLists) withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists, subplan_index); /* * Extract the relevant RETURNING list if any. 
*/ if (plan->returningLists) returningList = (List *) list_nth(plan->returningLists, subplan_index); /* * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification * should have already been rejected in the optimizer, as presently there * is no way to recognize an arbiter index on a foreign table. Only DO * NOTHING is supported without an inference specification. */ if (plan->onConflictAction == ONCONFLICT_NOTHING) doNothing = true; else if (plan->onConflictAction != ONCONFLICT_NONE) elog(ERROR, "unexpected ON CONFLICT specification: %d", (int) plan->onConflictAction); /* * Construct the SQL command string. */ switch (operation) { case CMD_INSERT: deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, withCheckOptionList, returningList, &retrieved_attrs, &values_end_len); break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, resultRelation, rel, targetAttrs, withCheckOptionList, returningList, &retrieved_attrs); break; case CMD_DELETE: deparseDeleteSql(&sql, rte, resultRelation, rel, returningList, &retrieved_attrs); break; default: elog(ERROR, "unexpected operation: %d", (int) operation); break; } table_close(rel, NoLock); /* * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ return list_make5(makeString(sql.data), targetAttrs, makeInteger(values_end_len), makeBoolean((retrieved_attrs != NIL)), retrieved_attrs); } /* * postgresBeginForeignModify * Begin an insert/update/delete operation on a foreign table */ static void postgresBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, List *fdw_private, int subplan_index, int eflags) { PgFdwModifyState *fmstate; char *query; List *target_attrs; bool has_returning; int values_end_len; List *retrieved_attrs; RangeTblEntry *rte; /* * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState * stays NULL. 
*/ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; /* Deconstruct fdw_private data. */ query = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); values_end_len = intVal(list_nth(fdw_private, FdwModifyPrivateLen)); has_returning = boolVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); retrieved_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateRetrievedAttrs); /* Find RTE. */ rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, rte, resultRelInfo, mtstate->operation, outerPlanState(mtstate)->plan, query, target_attrs, values_end_len, has_returning, retrieved_attrs); resultRelInfo->ri_FdwState = fmstate; } /* * postgresExecForeignInsert * Insert one row into a foreign table */ static TupleTableSlot * postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; TupleTableSlot **rslot; int numSlots = 1; /* * If the fmstate has aux_fmstate set, use the aux_fmstate (see * postgresBeginForeignInsert()) */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate->aux_fmstate; rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, &slot, &planSlot, &numSlots); /* Revert that change */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate; return rslot ? 
*rslot : NULL; } /* * postgresExecForeignBatchInsert * Insert multiple rows into a foreign table */ static TupleTableSlot ** postgresExecForeignBatchInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot **slots, TupleTableSlot **planSlots, int *numSlots) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; TupleTableSlot **rslot; /* * If the fmstate has aux_fmstate set, use the aux_fmstate (see * postgresBeginForeignInsert()) */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate->aux_fmstate; rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, slots, planSlots, numSlots); /* Revert that change */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate; return rslot; } /* * postgresGetForeignModifyBatchSize * Determine the maximum number of tuples that can be inserted in bulk * * Returns the batch size specified for server or table. When batching is not * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING * clause), returns 1. */ static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo) { int batch_size; PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState ? (PgFdwModifyState *) resultRelInfo->ri_FdwState : NULL; /* should be called only once */ Assert(resultRelInfo->ri_BatchSize == 0); /* * Should never get called when the insert is being performed as part of a * row movement operation. */ Assert(fmstate == NULL || fmstate->aux_fmstate == NULL); /* * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup * the option directly in server/table options. Otherwise just use the * value we determined earlier. */ if (fmstate) batch_size = fmstate->batch_size; else batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc); /* * Disable batching when we have to use RETURNING, there are any * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any * WITH CHECK OPTION constraints from parent views. 
* * When there are any BEFORE ROW INSERT triggers on the table, we can't * support it, because such triggers might query the table we're inserting * into and act differently if the tuples that have already been processed * and prepared for insertion are not there. */ if (resultRelInfo->ri_projectReturning != NULL || resultRelInfo->ri_WithCheckOptions != NIL || (resultRelInfo->ri_TrigDesc && (resultRelInfo->ri_TrigDesc->trig_insert_before_row || resultRelInfo->ri_TrigDesc->trig_insert_after_row))) return 1; /* * Otherwise use the batch size specified for server/table. The number of * parameters in a batch is limited to 65535 (uint16), so make sure we * don't exceed this limit by using the maximum batch_size possible. */ if (fmstate && fmstate->p_nums > 0) batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums); return batch_size; } /* * postgresExecForeignUpdate * Update one row in a foreign table */ static TupleTableSlot * postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot) { TupleTableSlot **rslot; int numSlots = 1; rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, &slot, &planSlot, &numSlots); return rslot ? rslot[0] : NULL; } /* * postgresExecForeignDelete * Delete one row from a foreign table */ static TupleTableSlot * postgresExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot) { TupleTableSlot **rslot; int numSlots = 1; rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, &slot, &planSlot, &numSlots); return rslot ? 
rslot[0] : NULL; } /* * postgresEndForeignModify * Finish an insert/update/delete operation on a foreign table */ static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; /* If fmstate is NULL, we are in EXPLAIN; nothing to do */ if (fmstate == NULL) return; /* Destroy the execution state */ finish_foreign_modify(fmstate); } /* * postgresBeginForeignInsert * Begin an insert operation on a foreign table */ static void postgresBeginForeignInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo) { PgFdwModifyState *fmstate; ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan); EState *estate = mtstate->ps.state; Index resultRelation; Relation rel = resultRelInfo->ri_RelationDesc; RangeTblEntry *rte; TupleDesc tupdesc = RelationGetDescr(rel); int attnum; int values_end_len; StringInfoData sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; bool doNothing = false; /* * If the foreign table we are about to insert routed rows into is also an * UPDATE subplan result rel that will be updated later, proceeding with * the INSERT will result in the later UPDATE incorrectly modifying those * routed rows, so prevent the INSERT --- it would be nice if we could * handle this case; but for now, throw an error for safety. */ if (plan && plan->operation == CMD_UPDATE && (resultRelInfo->ri_usesFdwDirectModify || resultRelInfo->ri_FdwState)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot route tuples into foreign table to be updated \"%s\"", RelationGetRelationName(rel)))); initStringInfo(&sql); /* We transmit all columns that are defined in the foreign table. */ for (attnum = 1; attnum <= tupdesc->natts; attnum++) { Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); if (!attr->attisdropped) targetAttrs = lappend_int(targetAttrs, attnum); } /* Check if we add the ON CONFLICT clause to the remote query. 
*/ if (plan) { OnConflictAction onConflictAction = plan->onConflictAction; /* We only support DO NOTHING without an inference specification. */ if (onConflictAction == ONCONFLICT_NOTHING) doNothing = true; else if (onConflictAction != ONCONFLICT_NONE) elog(ERROR, "unexpected ON CONFLICT specification: %d", (int) onConflictAction); } /* * If the foreign table is a partition that doesn't have a corresponding * RTE entry, we need to create a new RTE describing the foreign table for * use by deparseInsertSql and create_foreign_modify() below, after first * copying the parent's RTE and modifying some fields to describe the * foreign partition to work on. However, if this is invoked by UPDATE, * the existing RTE may already correspond to this partition if it is one * of the UPDATE subplan target rels; in that case, we can just use the * existing RTE as-is. */ if (resultRelInfo->ri_RangeTableIndex == 0) { ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo; rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate); rte = copyObject(rte); rte->relid = RelationGetRelid(rel); rte->relkind = RELKIND_FOREIGN_TABLE; /* * For UPDATE, we must use the RT index of the first subplan target * rel's RTE, because the core code would have built expressions for * the partition, such as RETURNING, using that RT index as varno of * Vars contained in those expressions. */ if (plan && plan->operation == CMD_UPDATE && rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation) resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex; else resultRelation = rootResultRelInfo->ri_RangeTableIndex; } else { resultRelation = resultRelInfo->ri_RangeTableIndex; rte = exec_rt_fetch(resultRelation, estate); } /* Construct the SQL command string. */ deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, resultRelInfo->ri_WithCheckOptions, resultRelInfo->ri_returningList, &retrieved_attrs, &values_end_len); /* Construct an execution state. 
*/ fmstate = create_foreign_modify(mtstate->ps.state, rte, resultRelInfo, CMD_INSERT, NULL, sql.data, targetAttrs, values_end_len, retrieved_attrs != NIL, retrieved_attrs); /* * If the given resultRelInfo already has PgFdwModifyState set, it means * the foreign table is an UPDATE subplan result rel; in which case, store * the resulting state into the aux_fmstate of the PgFdwModifyState. */ if (resultRelInfo->ri_FdwState) { Assert(plan && plan->operation == CMD_UPDATE); Assert(resultRelInfo->ri_usesFdwDirectModify == false); ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate; } else resultRelInfo->ri_FdwState = fmstate; } /* * postgresEndForeignInsert * Finish an insert operation on a foreign table */ static void postgresEndForeignInsert(EState *estate, ResultRelInfo *resultRelInfo) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; Assert(fmstate != NULL); /* * If the fmstate has aux_fmstate set, get the aux_fmstate (see * postgresBeginForeignInsert()) */ if (fmstate->aux_fmstate) fmstate = fmstate->aux_fmstate; /* Destroy the execution state */ finish_foreign_modify(fmstate); } /* * postgresIsForeignRelUpdatable * Determine whether a foreign table supports INSERT, UPDATE and/or * DELETE. */ static int postgresIsForeignRelUpdatable(Relation rel) { bool updatable; ForeignTable *table; ForeignServer *server; ListCell *lc; /* * By default, all postgres_fdw foreign tables are assumed updatable. This * can be overridden by a per-server setting, which in turn can be * overridden by a per-table setting. 
*/ updatable = true; table = GetForeignTable(RelationGetRelid(rel)); server = GetForeignServer(table->serverid); foreach(lc, server->options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "updatable") == 0) updatable = defGetBoolean(def); } foreach(lc, table->options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "updatable") == 0) updatable = defGetBoolean(def); } /* * Currently "updatable" means support for INSERT, UPDATE and DELETE. */ return updatable ? (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0; } /* * postgresRecheckForeignScan * Execute a local join execution plan for a foreign join */ static bool postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot) { Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid; PlanState *outerPlan = outerPlanState(node); TupleTableSlot *result; /* For base foreign relations, it suffices to set fdw_recheck_quals */ if (scanrelid > 0) return true; Assert(outerPlan != NULL); /* Execute a local join execution plan */ result = ExecProcNode(outerPlan); if (TupIsNull(result)) return false; /* Store result in the given slot */ ExecCopySlot(slot, result); return true; } /* * find_modifytable_subplan * Helper routine for postgresPlanDirectModify to find the * ModifyTable subplan node that scans the specified RTI. * * Returns NULL if the subplan couldn't be identified. That's not a fatal * error condition, we just abandon trying to do the update directly. */ static ForeignScan * find_modifytable_subplan(PlannerInfo *root, ModifyTable *plan, Index rtindex, int subplan_index) { Plan *subplan = outerPlan(plan); /* * The cases we support are (1) the desired ForeignScan is the immediate * child of ModifyTable, or (2) it is the subplan_index'th child of an * Append node that is the immediate child of ModifyTable. There is no * point in looking further down, as that would mean that local joins are * involved, so we can't do the update directly. 
* * There could be a Result atop the Append too, acting to compute the * UPDATE targetlist values. We ignore that here; the tlist will be * checked by our caller. * * In principle we could examine all the children of the Append, but it's * currently unlikely that the core planner would generate such a plan * with the children out-of-order. Moreover, such a search risks costing * O(N^2) time when there are a lot of children. */ if (IsA(subplan, Append)) { Append *appendplan = (Append *) subplan; if (subplan_index < list_length(appendplan->appendplans)) subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index); } else if (IsA(subplan, Result) && outerPlan(subplan) != NULL && IsA(outerPlan(subplan), Append)) { Append *appendplan = (Append *) outerPlan(subplan); if (subplan_index < list_length(appendplan->appendplans)) subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index); } /* Now, have we got a ForeignScan on the desired rel? */ if (IsA(subplan, ForeignScan)) { ForeignScan *fscan = (ForeignScan *) subplan; if (bms_is_member(rtindex, fscan->fs_relids)) return fscan; } return NULL; } /* * postgresPlanDirectModify * Consider a direct foreign table modification * * Decide whether it is safe to modify a foreign table directly, and if so, * rewrite subplan accordingly. */ static bool postgresPlanDirectModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index) { CmdType operation = plan->operation; RelOptInfo *foreignrel; RangeTblEntry *rte; PgFdwRelationInfo *fpinfo; Relation rel; StringInfoData sql; ForeignScan *fscan; List *processed_tlist = NIL; List *targetAttrs = NIL; List *remote_exprs; List *params_list = NIL; List *returningList = NIL; List *retrieved_attrs = NIL; /* * Decide whether it is safe to modify a foreign table directly. */ /* * The table modification must be an UPDATE or DELETE. 
*/ if (operation != CMD_UPDATE && operation != CMD_DELETE) return false; /* * Try to locate the ForeignScan subplan that's scanning resultRelation. */ fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index); if (!fscan) return false; /* * It's unsafe to modify a foreign table directly if there are any quals * that should be evaluated locally. */ if (fscan->scan.plan.qual != NIL) return false; /* Safe to fetch data about the target foreign rel */ if (fscan->scan.scanrelid == 0) { foreignrel = find_join_rel(root, fscan->fs_relids); /* We should have a rel for this foreign join. */ Assert(foreignrel); } else foreignrel = root->simple_rel_array[resultRelation]; rte = root->simple_rte_array[resultRelation]; fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; /* * It's unsafe to update a foreign table directly, if any expressions to * assign to the target columns are unsafe to evaluate remotely. */ if (operation == CMD_UPDATE) { ListCell *lc, *lc2; /* * The expressions of concern are the first N columns of the processed * targetlist, where N is the length of the rel's update_colnos. */ get_translated_update_targetlist(root, resultRelation, &processed_tlist, &targetAttrs); forboth(lc, processed_tlist, lc2, targetAttrs) { TargetEntry *tle = lfirst_node(TargetEntry, lc); AttrNumber attno = lfirst_int(lc2); /* update's new-value expressions shouldn't be resjunk */ Assert(!tle->resjunk); if (attno <= InvalidAttrNumber) /* shouldn't happen */ elog(ERROR, "system-column update is not supported"); if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr)) return false; } } /* * Ok, rewrite subplan so as to modify the foreign table directly. */ initStringInfo(&sql); /* * Core code already has some lock on each rel being planned, so we can * use NoLock here. */ rel = table_open(rte->relid, NoLock); /* * Recall the qual clauses that must be evaluated remotely. (These are * bare clauses not RestrictInfos, but deparse.c's appendConditions() * doesn't care.) 
*/ remote_exprs = fpinfo->final_remote_exprs; /* * Extract the relevant RETURNING list if any. */ if (plan->returningLists) { returningList = (List *) list_nth(plan->returningLists, subplan_index); /* * When performing an UPDATE/DELETE .. RETURNING on a join directly, * we fetch from the foreign server any Vars specified in RETURNING * that refer not only to the target relation but to non-target * relations. So we'll deparse them into the RETURNING clause of the * remote query; use a targetlist consisting of them instead, which * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan * node below. */ if (fscan->scan.scanrelid == 0) returningList = build_remote_returning(resultRelation, rel, returningList); } /* * Construct the SQL command string. */ switch (operation) { case CMD_UPDATE: deparseDirectUpdateSql(&sql, root, resultRelation, rel, foreignrel, processed_tlist, targetAttrs, remote_exprs, ¶ms_list, returningList, &retrieved_attrs); break; case CMD_DELETE: deparseDirectDeleteSql(&sql, root, resultRelation, rel, foreignrel, remote_exprs, ¶ms_list, returningList, &retrieved_attrs); break; default: elog(ERROR, "unexpected operation: %d", (int) operation); break; } /* * Update the operation and target relation info. */ fscan->operation = operation; fscan->resultRelation = resultRelation; /* * Update the fdw_exprs list that will be available to the executor. */ fscan->fdw_exprs = params_list; /* * Update the fdw_private list that will be available to the executor. * Items in the list must match enum FdwDirectModifyPrivateIndex, above. */ fscan->fdw_private = list_make4(makeString(sql.data), makeBoolean((retrieved_attrs != NIL)), retrieved_attrs, makeBoolean(plan->canSetTag)); /* * Update the foreign-join-related fields. */ if (fscan->scan.scanrelid == 0) { /* No need for the outer subplan. */ fscan->scan.plan.lefttree = NULL; /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. 
*/ if (returningList) rebuild_fdw_scan_tlist(fscan, returningList); } /* * Finally, unset the async-capable flag if it is set, as we currently * don't support asynchronous execution of direct modifications. */ if (fscan->scan.plan.async_capable) fscan->scan.plan.async_capable = false; table_close(rel, NoLock); return true; } /* * postgresBeginDirectModify * Prepare a direct foreign table modification */ static void postgresBeginDirectModify(ForeignScanState *node, int eflags) { ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; PgFdwDirectModifyState *dmstate; Index rtindex; RangeTblEntry *rte; Oid userid; ForeignTable *table; UserMapping *user; int numParams; /* * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. */ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; /* * We'll save private state in node->fdw_state. */ dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState)); node->fdw_state = (void *) dmstate; /* * Identify which user to do the remote access as. This should match what * ExecCheckRTEPerms() does. */ rtindex = node->resultRelInfo->ri_RangeTableIndex; rte = exec_rt_fetch(rtindex, estate); userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); /* Get info about foreign table. */ if (fsplan->scan.scanrelid == 0) dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags); else dmstate->rel = node->ss.ss_currentRelation; table = GetForeignTable(RelationGetRelid(dmstate->rel)); user = GetUserMapping(userid, table->serverid); /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ dmstate->conn = GetConnection(user, false, &dmstate->conn_state); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) { /* Save info about foreign table. 
*/ dmstate->resultRel = dmstate->rel; /* * Set dmstate->rel to NULL to teach get_returning_data() and * make_tuple_from_result_row() that columns fetched from the remote * server are described by fdw_scan_tlist of the foreign-scan plan * node, not the tuple descriptor for the target relation. */ dmstate->rel = NULL; } /* Initialize state variable */ dmstate->num_tuples = -1; /* -1 means not set yet */ /* Get private info created by planner functions. */ dmstate->query = strVal(list_nth(fsplan->fdw_private, FdwDirectModifyPrivateUpdateSql)); dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private, FdwDirectModifyPrivateHasReturning)); dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, FdwDirectModifyPrivateRetrievedAttrs); dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private, FdwDirectModifyPrivateSetProcessed)); /* Create context for per-tuple temp workspace. */ dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_SIZES); /* Prepare for input conversion of RETURNING results. */ if (dmstate->has_returning) { TupleDesc tupdesc; if (fsplan->scan.scanrelid == 0) tupdesc = get_tupdesc_for_join_scan_tuples(node); else tupdesc = RelationGetDescr(dmstate->rel); dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); /* * When performing an UPDATE/DELETE .. RETURNING on a join directly, * initialize a filter to extract an updated/deleted tuple from a scan * tuple. */ if (fsplan->scan.scanrelid == 0) init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex); } /* * Prepare for processing of parameters used in remote query, if any. 
*/ numParams = list_length(fsplan->fdw_exprs); dmstate->numParams = numParams; if (numParams > 0) prepare_query_params((PlanState *) node, fsplan->fdw_exprs, numParams, &dmstate->param_flinfo, &dmstate->param_exprs, &dmstate->param_values); } /* * postgresIterateDirectModify * Execute a direct foreign table modification */ static TupleTableSlot * postgresIterateDirectModify(ForeignScanState *node) { PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; EState *estate = node->ss.ps.state; ResultRelInfo *resultRelInfo = node->resultRelInfo; /* * If this is the first call after Begin, execute the statement. */ if (dmstate->num_tuples == -1) execute_dml_stmt(node); /* * If the local query doesn't specify RETURNING, just clear tuple slot. */ if (!resultRelInfo->ri_projectReturning) { TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; Instrumentation *instr = node->ss.ps.instrument; Assert(!dmstate->has_returning); /* Increment the command es_processed count if necessary. */ if (dmstate->set_processed) estate->es_processed += dmstate->num_tuples; /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */ if (instr) instr->tuplecount += dmstate->num_tuples; return ExecClearTuple(slot); } /* * Get the next RETURNING tuple. */ return get_returning_data(node); } /* * postgresEndDirectModify * Finish a direct foreign table modification */ static void postgresEndDirectModify(ForeignScanState *node) { PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; /* if dmstate is NULL, we are in EXPLAIN; nothing to do */ if (dmstate == NULL) return; /* Release PGresult */ if (dmstate->result) PQclear(dmstate->result); /* Release remote connection */ ReleaseConnection(dmstate->conn); dmstate->conn = NULL; /* MemoryContext will be deleted automatically. 
*/ } /* * postgresExplainForeignScan * Produce extra output for EXPLAIN of a ForeignScan on a foreign table */ static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) { ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan); List *fdw_private = plan->fdw_private; /* * Identify foreign scans that are really joins or upper relations. The * input looks something like "(1) LEFT JOIN (2)", and we must replace the * digit string(s), which are RT indexes, with the correct relation names. * We do that here, not when the plan is created, because we can't know * what aliases ruleutils.c will assign at plan creation time. */ if (list_length(fdw_private) > FdwScanPrivateRelations) { StringInfo relations; char *rawrelations; char *ptr; int minrti, rtoffset; rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations)); /* * A difficulty with using a string representation of RT indexes is * that setrefs.c won't update the string when flattening the * rangetable. To find out what rtoffset was applied, identify the * minimum RT index appearing in the string and compare it to the * minimum member of plan->fs_relids. (We expect all the relids in * the join will have been offset by the same amount; the Asserts * below should catch it if that ever changes.) 
*/ minrti = INT_MAX; ptr = rawrelations; while (*ptr) { if (isdigit((unsigned char) *ptr)) { int rti = strtol(ptr, &ptr, 10); if (rti < minrti) minrti = rti; } else ptr++; } rtoffset = bms_next_member(plan->fs_relids, -1) - minrti; /* Now we can translate the string */ relations = makeStringInfo(); ptr = rawrelations; while (*ptr) { if (isdigit((unsigned char) *ptr)) { int rti = strtol(ptr, &ptr, 10); RangeTblEntry *rte; char *relname; char *refname; rti += rtoffset; Assert(bms_is_member(rti, plan->fs_relids)); rte = rt_fetch(rti, es->rtable); Assert(rte->rtekind == RTE_RELATION); /* This logic should agree with explain.c's ExplainTargetRel */ relname = get_rel_name(rte->relid); if (es->verbose) { char *namespace; namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid)); appendStringInfo(relations, "%s.%s", quote_identifier(namespace), quote_identifier(relname)); } else appendStringInfoString(relations, quote_identifier(relname)); refname = (char *) list_nth(es->rtable_names, rti - 1); if (refname == NULL) refname = rte->eref->aliasname; if (strcmp(refname, relname) != 0) appendStringInfo(relations, " %s", quote_identifier(refname)); } else appendStringInfoChar(relations, *ptr++); } ExplainPropertyText("Relations", relations->data, es); } /* * Add remote query, when VERBOSE option is specified. */ if (es->verbose) { char *sql; sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); ExplainPropertyText("Remote SQL", sql, es); } } /* * postgresExplainForeignModify * Produce extra output for EXPLAIN of a ModifyTable on a foreign table */ static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es) { if (es->verbose) { char *sql = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); ExplainPropertyText("Remote SQL", sql, es); /* * For INSERT we should always have batch size >= 1, but UPDATE and * DELETE don't support batching so don't show the property. 
*/ if (rinfo->ri_BatchSize > 0) ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es); } } /* * postgresExplainDirectModify * Produce extra output for EXPLAIN of a ForeignScan that modifies a * foreign table directly */ static void postgresExplainDirectModify(ForeignScanState *node, ExplainState *es) { List *fdw_private; char *sql; if (es->verbose) { fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql)); ExplainPropertyText("Remote SQL", sql, es); } } /* * postgresExecForeignTruncate * Truncate one or more foreign tables */ static void postgresExecForeignTruncate(List *rels, DropBehavior behavior, bool restart_seqs) { Oid serverid = InvalidOid; UserMapping *user = NULL; PGconn *conn = NULL; StringInfoData sql; ListCell *lc; bool server_truncatable = true; /* * By default, all postgres_fdw foreign tables are assumed truncatable. * This can be overridden by a per-server setting, which in turn can be * overridden by a per-table setting. */ foreach(lc, rels) { ForeignServer *server = NULL; Relation rel = lfirst(lc); ForeignTable *table = GetForeignTable(RelationGetRelid(rel)); ListCell *cell; bool truncatable; /* * First time through, determine whether the foreign server allows * truncates. Since all specified foreign tables are assumed to belong * to the same foreign server, this result can be used for other * foreign tables. */ if (!OidIsValid(serverid)) { serverid = table->serverid; server = GetForeignServer(serverid); foreach(cell, server->options) { DefElem *defel = (DefElem *) lfirst(cell); if (strcmp(defel->defname, "truncatable") == 0) { server_truncatable = defGetBoolean(defel); break; } } } /* * Confirm that all specified foreign tables belong to the same * foreign server. 
*/ Assert(table->serverid == serverid); /* Determine whether this foreign table allows truncations */ truncatable = server_truncatable; foreach(cell, table->options) { DefElem *defel = (DefElem *) lfirst(cell); if (strcmp(defel->defname, "truncatable") == 0) { truncatable = defGetBoolean(defel); break; } } if (!truncatable) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("foreign table \"%s\" does not allow truncates", RelationGetRelationName(rel)))); } Assert(OidIsValid(serverid)); /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ user = GetUserMapping(GetUserId(), serverid); conn = GetConnection(user, false, NULL); /* Construct the TRUNCATE command string */ initStringInfo(&sql); deparseTruncateSql(&sql, rels, behavior, restart_seqs); /* Issue the TRUNCATE command to remote server */ do_sql_command(conn, sql.data); pfree(sql.data); } /* * estimate_path_cost_size * Get cost and size estimates for a foreign scan on given foreign relation * either a base relation or a join between foreign relations or an upper * relation containing foreign relations. * * param_join_conds are the parameterization clauses with outer relations. * pathkeys specify the expected sort order if any for given path being costed. * fpextra specifies additional post-scan/join-processing steps such as the * final sort and the LIMIT restriction. * * The function returns the cost and size estimates in p_rows, p_width, * p_startup_cost and p_total_cost variables. 
*/ static void estimate_path_cost_size(PlannerInfo *root, RelOptInfo *foreignrel, List *param_join_conds, List *pathkeys, PgFdwPathExtraData *fpextra, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost) { PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; double rows; double retrieved_rows; int width; Cost startup_cost; Cost total_cost; /* Make sure the core code has set up the relation's reltarget */ Assert(foreignrel->reltarget); /* * If the table or the server is configured to use remote estimates, * connect to the foreign server and execute EXPLAIN to estimate the * number of rows selected by the restriction+join clauses. Otherwise, * estimate rows using whatever statistics we have locally, in a way * similar to ordinary tables. */ if (fpinfo->use_remote_estimate) { List *remote_param_join_conds; List *local_param_join_conds; StringInfoData sql; PGconn *conn; Selectivity local_sel; QualCost local_cost; List *fdw_scan_tlist = NIL; List *remote_conds; /* Required only to be passed to deparseSelectStmtForRel */ List *retrieved_attrs; /* * param_join_conds might contain both clauses that are safe to send * across, and clauses that aren't. */ classifyConditions(root, foreignrel, param_join_conds, &remote_param_join_conds, &local_param_join_conds); /* Build the list of columns to be fetched from the foreign server. */ if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) fdw_scan_tlist = build_tlist_to_deparse(foreignrel); else fdw_scan_tlist = NIL; /* * The complete list of remote conditions includes everything from * baserestrictinfo plus any extra join_conds relevant to this * particular path. */ remote_conds = list_concat(remote_param_join_conds, fpinfo->remote_conds); /* * Construct EXPLAIN query including the desired SELECT, FROM, and * WHERE clauses. Params and other-relation Vars are replaced by dummy * values, so don't request params_list. 
*/ initStringInfo(&sql); appendStringInfoString(&sql, "EXPLAIN "); deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, remote_conds, pathkeys, fpextra ? fpextra->has_final_sort : false, fpextra ? fpextra->has_limit : false, false, &retrieved_attrs, NULL); /* Get the remote estimate */ conn = GetConnection(fpinfo->user, false, NULL); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); retrieved_rows = rows; /* Factor in the selectivity of the locally-checked quals */ local_sel = clauselist_selectivity(root, local_param_join_conds, foreignrel->relid, JOIN_INNER, NULL); local_sel *= fpinfo->local_conds_sel; rows = clamp_row_est(rows * local_sel); /* Add in the eval cost of the locally-checked quals */ startup_cost += fpinfo->local_conds_cost.startup; total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; cost_qual_eval(&local_cost, local_param_join_conds, root); startup_cost += local_cost.startup; total_cost += local_cost.per_tuple * retrieved_rows; /* * Add in tlist eval cost for each output row. In case of an * aggregate, some of the tlist expressions such as grouping * expressions will be evaluated remotely, so adjust the costs. */ startup_cost += foreignrel->reltarget->cost.startup; total_cost += foreignrel->reltarget->cost.startup; total_cost += foreignrel->reltarget->cost.per_tuple * rows; if (IS_UPPER_REL(foreignrel)) { QualCost tlist_cost; cost_qual_eval(&tlist_cost, fdw_scan_tlist, root); startup_cost -= tlist_cost.startup; total_cost -= tlist_cost.startup; total_cost -= tlist_cost.per_tuple * rows; } } else { Cost run_cost = 0; /* * We don't support join conditions in this mode (hence, no * parameterized paths can be made). */ Assert(param_join_conds == NIL); /* * We will come here again and again with different set of pathkeys or * additional post-scan/join-processing steps that caller wants to * cost. 
We don't need to calculate the cost/size estimates for the * underlying scan, join, or grouping each time. Instead, use those * estimates if we have cached them already. */ if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0) { Assert(fpinfo->retrieved_rows >= 0); rows = fpinfo->rows; retrieved_rows = fpinfo->retrieved_rows; width = fpinfo->width; startup_cost = fpinfo->rel_startup_cost; run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost; /* * If we estimate the costs of a foreign scan or a foreign join * with additional post-scan/join-processing steps, the scan or * join costs obtained from the cache wouldn't yet contain the * eval costs for the final scan/join target, which would've been * updated by apply_scanjoin_target_to_paths(); add the eval costs * now. */ if (fpextra && !IS_UPPER_REL(foreignrel)) { /* Shouldn't get here unless we have LIMIT */ Assert(fpextra->has_limit); Assert(foreignrel->reloptkind == RELOPT_BASEREL || foreignrel->reloptkind == RELOPT_JOINREL); startup_cost += foreignrel->reltarget->cost.startup; run_cost += foreignrel->reltarget->cost.per_tuple * rows; } } else if (IS_JOIN_REL(foreignrel)) { PgFdwRelationInfo *fpinfo_i; PgFdwRelationInfo *fpinfo_o; QualCost join_cost; QualCost remote_conds_cost; double nrows; /* Use rows/width estimates made by the core code. */ rows = foreignrel->rows; width = foreignrel->reltarget->width; /* For join we expect inner and outer relations set */ Assert(fpinfo->innerrel && fpinfo->outerrel); fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private; fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; /* Estimate of number of rows in cross product */ nrows = fpinfo_i->rows * fpinfo_o->rows; /* * Back into an estimate of the number of retrieved rows. Just in * case this is nuts, clamp to at most nrows. 
*/
			retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
			retrieved_rows = Min(retrieved_rows, nrows);

			/*
			 * The cost of foreign join is estimated as cost of generating
			 * rows for the joining relations + cost for applying quals on the
			 * rows.
			 */

			/*
			 * Calculate the cost of clauses pushed down to the foreign server
			 */
			cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
			/* Calculate the cost of applying join clauses */
			cost_qual_eval(&join_cost, fpinfo->joinclauses, root);

			/*
			 * Startup cost includes startup cost of joining relations and the
			 * startup cost for join and other clauses.  We do not include the
			 * startup cost specific to join strategy (e.g. setting up hash
			 * tables) since we do not know what strategy the foreign server
			 * is going to use.
			 */
			startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
			startup_cost += join_cost.startup;
			startup_cost += remote_conds_cost.startup;
			startup_cost += fpinfo->local_conds_cost.startup;

			/*
			 * Run time cost includes:
			 *
			 * 1. Run time cost (total_cost - startup_cost) of relations being
			 * joined
			 *
			 * 2. Run time cost of applying join clauses on the cross product
			 * of the joining relations.
			 *
			 * 3. Run time cost of applying pushed down other clauses on the
			 * result of join
			 *
			 * 4. Run time cost of applying nonpushable other clauses locally
			 * on the result fetched from the foreign server.
			 */
			run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
			run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
			run_cost += nrows * join_cost.per_tuple;
			/* From here on nrows is the row count surviving the join clauses. */
			nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
			run_cost += nrows * remote_conds_cost.per_tuple;
			run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;

			/* Add in tlist eval cost for each output row */
			startup_cost += foreignrel->reltarget->cost.startup;
			run_cost += foreignrel->reltarget->cost.per_tuple * rows;
		}
		else if (IS_UPPER_REL(foreignrel))
		{
			RelOptInfo *outerrel = fpinfo->outerrel;
			PgFdwRelationInfo *ofpinfo;
			AggClauseCosts aggcosts;
			double		input_rows;
			int			numGroupCols;
			double		numGroups = 1;

			/* The upper relation should have its outer relation set */
			Assert(outerrel);
			/* and that outer relation should have its reltarget set */
			Assert(outerrel->reltarget);

			/*
			 * This cost model is mixture of costing done for sorted and
			 * hashed aggregates in cost_agg().  We are not sure which
			 * strategy will be considered at remote side, thus for
			 * simplicity, we put all startup related costs in startup_cost
			 * and all finalization and run cost are added in total_cost.
			 */

			ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;

			/* Get rows from input rel */
			input_rows = ofpinfo->rows;

			/* Collect statistics about aggregates for estimating costs. */
			MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
			if (root->parse->hasAggs)
			{
				get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts);
			}

			/* Get number of grouping columns and possible number of groups */
			numGroupCols = list_length(root->parse->groupClause);
			numGroups = estimate_num_groups(root,
											get_sortgrouplist_exprs(root->parse->groupClause,
																	fpinfo->grouped_tlist),
											input_rows, NULL, NULL);

			/*
			 * Get the retrieved_rows and rows estimates.  If there are HAVING
			 * quals, account for their selectivity.
			 */
			if (root->parse->havingQual)
			{
				/* Factor in the selectivity of the remotely-checked quals */
				retrieved_rows =
					clamp_row_est(numGroups *
								  clauselist_selectivity(root,
														 fpinfo->remote_conds,
														 0,
														 JOIN_INNER,
														 NULL));
				/* Factor in the selectivity of the locally-checked quals */
				rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
			}
			else
			{
				rows = retrieved_rows = numGroups;
			}

			/* Use width estimate made by the core code. */
			width = foreignrel->reltarget->width;

			/*-----
			 * Startup cost includes:
			 *	  1. Startup cost for underneath input relation, adjusted for
			 *	     tlist replacement by apply_scanjoin_target_to_paths()
			 *	  2. Cost of performing aggregation, per cost_agg()
			 *-----
			 */
			startup_cost = ofpinfo->rel_startup_cost;
			startup_cost += outerrel->reltarget->cost.startup;
			startup_cost += aggcosts.transCost.startup;
			startup_cost += aggcosts.transCost.per_tuple * input_rows;
			startup_cost += aggcosts.finalCost.startup;
			startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;

			/*-----
			 * Run time cost includes:
			 *	  1. Run time cost of underneath input relation, adjusted for
			 *	     tlist replacement by apply_scanjoin_target_to_paths()
			 *	  2. Run time cost of performing aggregation, per cost_agg()
			 *-----
			 */
			run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
			run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
			run_cost += aggcosts.finalCost.per_tuple * numGroups;
			run_cost += cpu_tuple_cost * numGroups;

			/* Account for the eval cost of HAVING quals, if any */
			if (root->parse->havingQual)
			{
				QualCost	remote_cost;

				/* Add in the eval cost of the remotely-checked quals */
				cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
				startup_cost += remote_cost.startup;
				run_cost += remote_cost.per_tuple * numGroups;
				/* Add in the eval cost of the locally-checked quals */
				startup_cost += fpinfo->local_conds_cost.startup;
				run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
			}

			/* Add in tlist eval cost for each output row */
			startup_cost += foreignrel->reltarget->cost.startup;
			run_cost += foreignrel->reltarget->cost.per_tuple * rows;
		}
		else
		{
			Cost		cpu_per_tuple;

			/* Use rows/width estimates made by set_baserel_size_estimates. */
			rows = foreignrel->rows;
			width = foreignrel->reltarget->width;

			/*
			 * Back into an estimate of the number of retrieved rows.  Just in
			 * case this is nuts, clamp to at most foreignrel->tuples.
			 */
			retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);

			/*
			 * Cost as though this were a seqscan, which is pessimistic.  We
			 * effectively imagine the local_conds are being evaluated
			 * remotely, too.
			 */
			startup_cost = 0;
			run_cost = 0;
			run_cost += seq_page_cost * foreignrel->pages;

			startup_cost += foreignrel->baserestrictcost.startup;
			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
			run_cost += cpu_per_tuple * foreignrel->tuples;

			/* Add in tlist eval cost for each output row */
			startup_cost += foreignrel->reltarget->cost.startup;
			run_cost += foreignrel->reltarget->cost.per_tuple * rows;
		}

		/*
		 * Without remote estimates, we have no real way to estimate the cost
		 * of generating sorted output.  It could be free if the query plan
		 * the remote side would have chosen generates properly-sorted output
		 * anyway, but in most cases it will cost something.  Estimate a value
		 * high enough that we won't pick the sorted path when the ordering
		 * isn't locally useful, but low enough that we'll err on the side of
		 * pushing down the ORDER BY clause when it's useful to do so.
		 */
		if (pathkeys != NIL)
		{
			if (IS_UPPER_REL(foreignrel))
			{
				Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
					   fpinfo->stage == UPPERREL_GROUP_AGG);
				/*
				 * NOTE(review): fpextra is dereferenced here without a NULL
				 * check; presumably callers always supply fpextra when costing
				 * a sorted grouping path -- confirm.
				 */
				adjust_foreign_grouping_path_cost(root, pathkeys,
												  retrieved_rows, width,
												  fpextra->limit_tuples,
												  &startup_cost, &run_cost);
			}
			else
			{
				/* Scan/join case: flat DEFAULT_FDW_SORT_MULTIPLIER surcharge. */
				startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
				run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
			}
		}

		total_cost = startup_cost + run_cost;

		/* Adjust the cost estimates if we have LIMIT */
		if (fpextra && fpextra->has_limit)
		{
			adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
									fpextra->offset_est, fpextra->count_est);
			retrieved_rows = rows;
		}
	}

	/*
	 * If this includes the final sort step, the given target, which will be
	 * applied to the resulting path, might have different expressions from
	 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
	 * eval costs.
	 */
	if (fpextra && fpextra->has_final_sort &&
		fpextra->target != foreignrel->reltarget)
	{
		QualCost	oldcost = foreignrel->reltarget->cost;
		QualCost	newcost = fpextra->target->cost;

		startup_cost += newcost.startup - oldcost.startup;
		total_cost += newcost.startup - oldcost.startup;
		total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
	}

	/*
	 * Cache the retrieved rows and cost estimates for scans, joins, or
	 * groupings without any parameterization, pathkeys, or additional
	 * post-scan/join-processing steps, before adding the costs for
	 * transferring data from the foreign server.  These estimates are useful
	 * for costing remote joins involving this relation or costing other
	 * remote operations on this relation such as remote sorts and remote
	 * LIMIT restrictions, when the costs can not be obtained from the foreign
	 * server.  This function will be called at least once for every foreign
	 * relation without any parameterization, pathkeys, or additional
	 * post-scan/join-processing steps.
	 */
	if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
	{
		fpinfo->retrieved_rows = retrieved_rows;
		fpinfo->rel_startup_cost = startup_cost;
		fpinfo->rel_total_cost = total_cost;
	}

	/*
	 * Add some additional cost factors to account for connection overhead
	 * (fdw_startup_cost), transferring data across the network
	 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
	 * (cpu_tuple_cost per retrieved row).
	 */
	startup_cost += fpinfo->fdw_startup_cost;
	total_cost += fpinfo->fdw_startup_cost;
	total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
	total_cost += cpu_tuple_cost * retrieved_rows;

	/*
	 * If we have LIMIT, we should prefer performing the restriction remotely
	 * rather than locally, as the former avoids extra row fetches from the
	 * remote that the latter might cause.  But since the core code doesn't
	 * account for such fetches when estimating the costs of the local
	 * restriction (see create_limit_path()), there would be no difference
	 * between the costs of the local restriction and the costs of the remote
	 * restriction estimated above if we don't use remote estimates (except
	 * for the case where the foreignrel is a grouping relation, the given
	 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
	 * accounted for in costing the remote restriction).  Tweak the costs of
	 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
	 * one.
	 */
	if (!fpinfo->use_remote_estimate &&
		fpextra && fpextra->has_limit &&
		fpextra->limit_tuples > 0 &&
		fpextra->limit_tuples < fpinfo->rows)
	{
		Assert(fpinfo->rows > 0);
		/*
		 * Discount up to 5% of the run cost, scaled by the fraction of the
		 * relation's rows that the LIMIT cuts off.
		 */
		total_cost -= (total_cost - startup_cost) * 0.05 *
			(fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
	}

	/* Return results. */
	*p_rows = rows;
	*p_width = width;
	*p_startup_cost = startup_cost;
	*p_total_cost = total_cost;
}

/*
 * Estimate costs of executing a SQL statement remotely.
 * The given "sql" must be an EXPLAIN command.
 *
 * The first output line of the remote EXPLAIN (row 0, column 0 of the result)
 * is parsed for its trailing "(cost=S..T rows=R width=W)" annotation, and
 * the four numbers are written to *startup_cost, *total_cost, *rows and
 * *width.  Raises ERROR if the query fails or the line cannot be parsed.
 */
static void
get_remote_estimate(const char *sql, PGconn *conn,
					double *rows, int *width,
					Cost *startup_cost, Cost *total_cost)
{
	/* volatile: assigned inside PG_TRY and read again in PG_FINALLY */
	PGresult   *volatile res = NULL;

	/* PGresult must be released before leaving this function. */
	PG_TRY();
	{
		char	   *line;
		char	   *p;
		int			n;

		/*
		 * Execute EXPLAIN remotely.  clear=false because PG_FINALLY below
		 * releases res on every exit path.
		 */
		res = pgfdw_exec_query(conn, sql, NULL);
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
			pgfdw_report_error(ERROR, res, conn, false, sql);

		/*
		 * Extract cost numbers for topmost plan node.  Note we search for a
		 * left paren from the end of the line to avoid being confused by
		 * other uses of parentheses.
		 */
		line = PQgetvalue(res, 0, 0);
		p = strrchr(line, '(');
		if (p == NULL)
			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
		n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
				   startup_cost, total_cost, rows, width);
		if (n != 4)
			elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
	}
	PG_FINALLY();
	{
		if (res)
			PQclear(res);
	}
	PG_END_TRY();
}

/*
 * Adjust the cost estimates of a foreign grouping path to include the cost of
 * generating properly-sorted output.
 *
 * *p_startup_cost and *p_run_cost are in/out: on entry they hold the unsorted
 * path's costs, on return the costs including the sort.  retrieved_rows,
 * width and limit_tuples are passed through to cost_sort() when an explicit
 * sort is assumed.
 */
static void
adjust_foreign_grouping_path_cost(PlannerInfo *root,
								  List *pathkeys,
								  double retrieved_rows,
								  double width,
								  double limit_tuples,
								  Cost *p_startup_cost,
								  Cost *p_run_cost)
{
	/*
	 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
	 * side is unlikely to generate properly-sorted output, so it would need
	 * an explicit sort; adjust the given costs with cost_sort().  Likewise,
	 * if the GROUP BY clause is sort-able but isn't a superset of the given
	 * pathkeys, adjust the costs with that function.  Otherwise, adjust the
	 * costs by applying the same heuristic as for the scan or join case.
	 */
	if (!grouping_is_sortable(root->parse->groupClause) ||
		!pathkeys_contained_in(pathkeys, root->group_pathkeys))
	{
		Path		sort_path;	/* dummy for result of cost_sort */

		/* The whole unsorted cost (startup + run) is the sort's input cost. */
		cost_sort(&sort_path,
				  root,
				  pathkeys,
				  *p_startup_cost + *p_run_cost,
				  retrieved_rows,
				  width,
				  0.0,
				  work_mem,
				  limit_tuples);

		*p_startup_cost = sort_path.startup_cost;
		*p_run_cost = sort_path.total_cost - sort_path.startup_cost;
	}
	else
	{
		/*
		 * The default extra cost seems too large for foreign-grouping cases;
		 * add 1/4th of that default (with DEFAULT_FDW_SORT_MULTIPLIER = 1.2
		 * this is a factor of 1.05).
		 */
		double		sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
											 - 1.0) * 0.25;

		*p_startup_cost *= sort_multiplier;
		*p_run_cost *= sort_multiplier;
	}
}

/*
 * Detect whether we want to process an EquivalenceClass member.
 *
 * This is a callback for use by generate_implied_equalities_for_column.
 * "arg" is an ec_member_foreign_arg.  Once state->current has been chosen,
 * only members equal() to it match.  Otherwise the first member not already
 * in state->already_used becomes state->current and matches.
 */
static bool
ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
						  EquivalenceClass *ec, EquivalenceMember *em,
						  void *arg)
{
	ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
	Expr	   *expr = em->em_expr;

	/*
	 * If we've identified what we're processing in the current scan, we only
	 * want to match that expression.
	 */
	if (state->current != NULL)
		return equal(expr, state->current);

	/*
	 * Otherwise, ignore anything we've already processed.
	 */
	if (list_member(state->already_used, expr))
		return false;

	/* This is the new target to process. */
	state->current = expr;
	return true;
}

/*
 * Create cursor for node's query with current parameter values.
*/
static void
create_cursor(ForeignScanState *node)
{
	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
	ExprContext *econtext = node->ss.ps.ps_ExprContext;
	int			numParams = fsstate->numParams;
	const char **values = fsstate->param_values;
	PGconn	   *conn = fsstate->conn;
	StringInfoData buf;
	PGresult   *res;

	/* First, process a pending asynchronous request, if any. */
	if (fsstate->conn_state->pendingAreq)
		process_pending_request(fsstate->conn_state->pendingAreq);

	/*
	 * Construct array of query parameter values in text format.  We do the
	 * conversions in the short-lived per-tuple context, so as not to cause a
	 * memory leak over repeated scans.
	 */
	if (numParams > 0)
	{
		MemoryContext oldcontext;

		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

		process_query_params(econtext,
							 fsstate->param_flinfo,
							 fsstate->param_exprs,
							 values);

		MemoryContextSwitchTo(oldcontext);
	}

	/* Construct the DECLARE CURSOR command */
	initStringInfo(&buf);
	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
					 fsstate->cursor_number, fsstate->query);

	/*
	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
	 * to infer types for all parameters.  Since we explicitly cast every
	 * parameter (see deparse.c), the "inference" is trivial and will produce
	 * the desired result.  This allows us to avoid assuming that the remote
	 * server has the same OIDs we do for the parameters' types.
	 */
	if (!PQsendQueryParams(conn, buf.data, numParams,
						   NULL, values, NULL, NULL, 0))
		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);

	/*
	 * Get the result, and check for success.
	 *
	 * We don't use a PG_TRY block here, so be careful not to throw error
	 * without releasing the PGresult (hence clear=true below).  Errors name
	 * the user's query rather than the DECLARE wrapper.
	 */
	res = pgfdw_get_result(conn, buf.data);
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
	PQclear(res);

	/* Mark the cursor as created, and show no tuples have been retrieved */
	fsstate->cursor_exists = true;
	fsstate->tuples = NULL;
	fsstate->num_tuples = 0;
	fsstate->next_tuple = 0;
	fsstate->fetch_ct_2 = 0;
	fsstate->eof_reached = false;

	/* Clean up */
	pfree(buf.data);
}

/*
 * Fetch some more rows from the node's cursor.
 *
 * Replaces fsstate->tuples with the next batch (built in batch_cxt), resets
 * next_tuple, and sets eof_reached when fewer than fetch_size rows arrive.
 * In async mode the FETCH was already sent by fetch_more_data_begin and only
 * its result is collected here.
 */
static void
fetch_more_data(ForeignScanState *node)
{
	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
	/* volatile: assigned inside PG_TRY and read again in PG_FINALLY */
	PGresult   *volatile res = NULL;
	MemoryContext oldcontext;

	/*
	 * We'll store the tuples in the batch_cxt.  First, flush the previous
	 * batch.  fsstate->tuples is cleared first because the reset frees the
	 * memory it points into.
	 */
	fsstate->tuples = NULL;
	MemoryContextReset(fsstate->batch_cxt);
	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);

	/* PGresult must be released before leaving this function. */
	PG_TRY();
	{
		PGconn	   *conn = fsstate->conn;
		int			numrows;
		int			i;

		if (fsstate->async_capable)
		{
			Assert(fsstate->conn_state->pendingAreq);

			/*
			 * The query was already sent by an earlier call to
			 * fetch_more_data_begin.  So now we just fetch the result.
			 */
			res = pgfdw_get_result(conn, fsstate->query);
			/* On error, report the original query, not the FETCH. */
			if (PQresultStatus(res) != PGRES_TUPLES_OK)
				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);

			/* Reset per-connection state */
			fsstate->conn_state->pendingAreq = NULL;
		}
		else
		{
			char		sql[64];

			/* This is a regular synchronous fetch. */
			snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
					 fsstate->fetch_size, fsstate->cursor_number);

			res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
			/* On error, report the original query, not the FETCH. */
			if (PQresultStatus(res) != PGRES_TUPLES_OK)
				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
		}

		/* Convert the data into HeapTuples */
		numrows = PQntuples(res);
		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
		fsstate->num_tuples = numrows;
		fsstate->next_tuple = 0;

		for (i = 0; i < numrows; i++)
		{
			Assert(IsA(node->ss.ps.plan, ForeignScan));

			fsstate->tuples[i] =
				make_tuple_from_result_row(res, i,
										   fsstate->rel,
										   fsstate->attinmeta,
										   fsstate->retrieved_attrs,
										   node,
										   fsstate->temp_cxt);
		}

		/* Update fetch_ct_2 (counts fetches, saturating at 2) */
		if (fsstate->fetch_ct_2 < 2)
			fsstate->fetch_ct_2++;

		/* Must be EOF if we didn't get as many tuples as we asked for. */
		fsstate->eof_reached = (numrows < fsstate->fetch_size);
	}
	PG_FINALLY();
	{
		if (res)
			PQclear(res);
	}
	PG_END_TRY();

	MemoryContextSwitchTo(oldcontext);
}

/*
 * Force assorted GUC parameters to settings that ensure that we'll output
 * data values in a form that is unambiguous to the remote server.
 *
 * This is rather expensive and annoying to do once per row, but there's
 * little choice if we want to be sure values are transmitted accurately;
 * we can't leave the settings in place between rows for fear of affecting
 * user-visible computations.
 *
 * We use the equivalent of a function SET option to allow the settings to
 * persist only until the caller calls reset_transmission_modes().  If an
 * error is thrown in between, guc.c will take care of undoing the settings.
 *
 * The return value is the nestlevel that must be passed to
 * reset_transmission_modes() to undo things.
 */
int
set_transmission_modes(void)
{
	int			nestlevel = NewGUCNestLevel();

	/*
	 * The values set here should match what pg_dump does.  See also
	 * configure_remote_session in connection.c.  Each of these three is only
	 * changed when its current value is not already suitable.
	 */
	if (DateStyle != USE_ISO_DATES)
		(void) set_config_option("datestyle", "ISO",
								 PGC_USERSET, PGC_S_SESSION,
								 GUC_ACTION_SAVE, true, 0, false);
	if (IntervalStyle != INTSTYLE_POSTGRES)
		(void) set_config_option("intervalstyle", "postgres",
								 PGC_USERSET, PGC_S_SESSION,
								 GUC_ACTION_SAVE, true, 0, false);
	if (extra_float_digits < 3)
		(void) set_config_option("extra_float_digits", "3",
								 PGC_USERSET, PGC_S_SESSION,
								 GUC_ACTION_SAVE, true, 0, false);

	/*
	 * In addition force restrictive search_path, in case there are any
	 * regproc or similar constants to be printed.  This one is set
	 * unconditionally.
	 */
	(void) set_config_option("search_path", "pg_catalog",
							 PGC_USERSET, PGC_S_SESSION,
							 GUC_ACTION_SAVE, true, 0, false);

	return nestlevel;
}

/*
 * Undo the effects of set_transmission_modes().
 *
 * nestlevel must be the value returned by the matching
 * set_transmission_modes() call.
 */
void
reset_transmission_modes(int nestlevel)
{
	AtEOXact_GUC(true, nestlevel);
}

/*
 * Utility routine to close a cursor.
 *
 * Sends "CLOSE c<cursor_number>" on conn; raises ERROR if the command fails.
 */
static void
close_cursor(PGconn *conn, unsigned int cursor_number,
			 PgFdwConnState *conn_state)
{
	char		sql[64];
	PGresult   *res;

	snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);

	/*
	 * We don't use a PG_TRY block here, so be careful not to throw error
	 * without releasing the PGresult.
	 */
	res = pgfdw_exec_query(conn, sql, conn_state);
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
		pgfdw_report_error(ERROR, res, conn, true, sql);
	PQclear(res);
}

/*
 * create_foreign_modify
 *		Construct an execution state of a foreign insert/update/delete
 *		operation
 *
 * Opens (or reuses) a connection as the checkAsUser/current user, records
 * the remote query and parameter metadata, and sets up output functions for
 * each transmittable parameter.  The prepared statement itself is created
 * lazily by prepare_foreign_modify().
 */
static PgFdwModifyState *
create_foreign_modify(EState *estate,
					  RangeTblEntry *rte,
					  ResultRelInfo *resultRelInfo,
					  CmdType operation,
					  Plan *subplan,
					  char *query,
					  List *target_attrs,
					  int values_end,
					  bool has_returning,
					  List *retrieved_attrs)
{
	PgFdwModifyState *fmstate;
	Relation	rel = resultRelInfo->ri_RelationDesc;
	TupleDesc	tupdesc = RelationGetDescr(rel);
	Oid			userid;
	ForeignTable *table;
	UserMapping *user;
	AttrNumber	n_params;
	Oid			typefnoid;
	bool		isvarlena;
	ListCell   *lc;

	/* Begin constructing PgFdwModifyState. */
	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
	fmstate->rel = rel;

	/*
	 * Identify which user to do the remote access as.  This should match what
	 * ExecCheckRTEPerms() does.
	 */
	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();

	/* Get info about foreign table. */
	table = GetForeignTable(RelationGetRelid(rel));
	user = GetUserMapping(userid, table->serverid);

	/* Open connection; report that we'll create a prepared statement. */
	fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
	fmstate->p_name = NULL;		/* prepared statement not made yet */

	/*
	 * Set up remote query information.  For INSERT we keep private copies:
	 * execute_foreign_modify() may pfree() fmstate->query and rebuild it from
	 * orig_query when the batch size changes.
	 */
	fmstate->query = query;
	if (operation == CMD_INSERT)
	{
		fmstate->query = pstrdup(fmstate->query);
		fmstate->orig_query = pstrdup(fmstate->query);
	}
	fmstate->target_attrs = target_attrs;
	fmstate->values_end = values_end;
	fmstate->has_returning = has_returning;
	fmstate->retrieved_attrs = retrieved_attrs;

	/* Create context for per-tuple temp workspace. */
	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
											  "postgres_fdw temporary data",
											  ALLOCSET_SMALL_SIZES);

	/* Prepare for input conversion of RETURNING results. */
	if (fmstate->has_returning)
		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);

	/*
	 * Prepare for output conversion of parameters used in prepared stmt.
	 * One slot per target attribute, plus one for a possible ctid.
	 */
	n_params = list_length(fmstate->target_attrs) + 1;
	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
	fmstate->p_nums = 0;

	if (operation == CMD_UPDATE || operation == CMD_DELETE)
	{
		Assert(subplan != NULL);

		/* Find the ctid resjunk column in the subplan's result */
		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
														  "ctid");
		if (!AttributeNumberIsValid(fmstate->ctidAttno))
			elog(ERROR, "could not find junk ctid column");

		/* First transmittable parameter will be ctid */
		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
		fmstate->p_nums++;
	}

	if (operation == CMD_INSERT || operation == CMD_UPDATE)
	{
		/* Set up for remaining transmittable parameters */
		foreach(lc, fmstate->target_attrs)
		{
			int			attnum = lfirst_int(lc);
			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);

			Assert(!attr->attisdropped);

			/* Ignore generated columns; they are set to DEFAULT */
			if (attr->attgenerated)
				continue;
			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
			fmstate->p_nums++;
		}
	}

	Assert(fmstate->p_nums <= n_params);

	/* Set batch_size from foreign server/table options. */
	if (operation == CMD_INSERT)
		fmstate->batch_size = get_batch_size_option(rel);

	/* The query as built so far carries a single row's worth of parameters. */
	fmstate->num_slots = 1;

	/* Initialize auxiliary state */
	fmstate->aux_fmstate = NULL;

	return fmstate;
}

/*
 * execute_foreign_modify
 *		Perform foreign-table modification as required, and fetch RETURNING
 *		result if any.  (This is the shared guts of postgresExecForeignInsert,
 *		postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
 *		postgresExecForeignDelete.)
 *
 * *numSlots is in/out: on entry the number of slots to send, on return the
 * number of rows the remote side reported as affected (or returned).
 * Returns slots, or NULL if no rows were affected.
 */
static TupleTableSlot **
execute_foreign_modify(EState *estate,
					   ResultRelInfo *resultRelInfo,
					   CmdType operation,
					   TupleTableSlot **slots,
					   TupleTableSlot **planSlots,
					   int *numSlots)
{
	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
	ItemPointer ctid = NULL;
	const char **p_values;
	PGresult   *res;
	int			n_rows;
	StringInfoData sql;

	/* The operation should be INSERT, UPDATE, or DELETE */
	Assert(operation == CMD_INSERT ||
		   operation == CMD_UPDATE ||
		   operation == CMD_DELETE);

	/* First, process a pending asynchronous request, if any. */
	if (fmstate->conn_state->pendingAreq)
		process_pending_request(fmstate->conn_state->pendingAreq);

	/*
	 * If the existing query was deparsed and prepared for a different number
	 * of rows, rebuild it for the proper number.
	 */
	if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
	{
		/* Destroy the prepared statement created previously */
		if (fmstate->p_name)
			deallocate_query(fmstate);

		/* Build INSERT string with numSlots records in its VALUES clause. */
		initStringInfo(&sql);
		rebuildInsertSql(&sql, fmstate->rel,
						 fmstate->orig_query, fmstate->target_attrs,
						 fmstate->values_end, fmstate->p_nums,
						 *numSlots - 1);
		pfree(fmstate->query);
		fmstate->query = sql.data;
		fmstate->num_slots = *numSlots;
	}

	/* Set up the prepared statement on the remote server, if we didn't yet */
	if (!fmstate->p_name)
		prepare_foreign_modify(fmstate);

	/*
	 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
	 */
	if (operation == CMD_UPDATE || operation == CMD_DELETE)
	{
		Datum		datum;
		bool		isNull;

		datum = ExecGetJunkAttribute(planSlots[0],
									 fmstate->ctidAttno,
									 &isNull);
		/* shouldn't ever get a null result... */
		if (isNull)
			elog(ERROR, "ctid is NULL");
		ctid = (ItemPointer) DatumGetPointer(datum);
	}

	/* Convert parameters needed by prepared statement to text form */
	p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);

	/*
	 * Execute the prepared statement.  The statement takes p_nums parameters
	 * per row, for *numSlots rows.
	 */
	if (!PQsendQueryPrepared(fmstate->conn,
							 fmstate->p_name,
							 fmstate->p_nums * (*numSlots),
							 p_values,
							 NULL,
							 NULL,
							 0))
		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);

	/*
	 * Get the result, and check for success.
	 *
	 * We don't use a PG_TRY block here, so be careful not to throw error
	 * without releasing the PGresult.
	 */
	res = pgfdw_get_result(fmstate->conn, fmstate->query);
	if (PQresultStatus(res) !=
		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);

	/* Check number of rows affected, and fetch RETURNING tuple if any */
	if (fmstate->has_returning)
	{
		Assert(*numSlots == 1);
		n_rows = PQntuples(res);
		if (n_rows > 0)
			store_returning_result(fmstate, slots[0], res);
	}
	else
		n_rows = atoi(PQcmdTuples(res));

	/* And clean up */
	PQclear(res);

	/* Frees the parameter strings built by convert_prep_stmt_params(). */
	MemoryContextReset(fmstate->temp_cxt);

	*numSlots = n_rows;

	/*
	 * Return NULL if nothing was inserted/updated/deleted on the remote end
	 */
	return (n_rows > 0) ? slots : NULL;
}

/*
 * prepare_foreign_modify
 *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
 *
 * fmstate->p_name is assigned only after the remote PREPARE succeeds, so it
 * stays NULL if this function raises an error.
 */
static void
prepare_foreign_modify(PgFdwModifyState *fmstate)
{
	char		prep_name[NAMEDATALEN];
	char	   *p_name;
	PGresult   *res;

	/*
	 * The caller would already have processed a pending asynchronous request
	 * if any, so no need to do it here.
	 */

	/* Construct name we'll use for the prepared statement. */
	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
			 GetPrepStmtNumber(fmstate->conn));
	p_name = pstrdup(prep_name);

	/*
	 * We intentionally do not specify parameter types here, but leave the
	 * remote server to derive them by default.  This avoids possible problems
	 * with the remote server using different type OIDs than we do.  All of
	 * the prepared statements we use in this module are simple enough that
	 * the remote server will make the right choices.
	 */
	if (!PQsendPrepare(fmstate->conn,
					   p_name,
					   fmstate->query,
					   0,
					   NULL))
		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);

	/*
	 * Get the result, and check for success.
	 *
	 * We don't use a PG_TRY block here, so be careful not to throw error
	 * without releasing the PGresult.
	 */
	res = pgfdw_get_result(fmstate->conn, fmstate->query);
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
	PQclear(res);

	/* This action shows that the prepare has been done. */
	fmstate->p_name = p_name;
}

/*
 * convert_prep_stmt_params
 *		Create array of text strings representing parameter values
 *
 * tupleid is ctid to send, or NULL if none
 * slots is an array of numSlots slots to get the remaining parameters from,
 * or NULL if none
 *
 * The result holds p_nums entries per slot, laid out slot after slot; a
 * NULL entry represents an SQL NULL.
 *
 * Data is constructed in temp_cxt; caller should reset that after use.
 */
static const char **
convert_prep_stmt_params(PgFdwModifyState *fmstate,
						 ItemPointer tupleid,
						 TupleTableSlot **slots,
						 int numSlots)
{
	const char **p_values;
	int			i;
	int			j;				/* index into p_flinfo */
	int			pindex = 0;		/* index into p_values */
	MemoryContext oldcontext;

	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);

	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);

	/* ctid is provided only for UPDATE/DELETE, which don't allow batching */
	Assert(!(tupleid != NULL && numSlots > 1));

	/* 1st parameter should be ctid, if it's in use */
	if (tupleid != NULL)
	{
		Assert(numSlots == 1);
		/* don't need set_transmission_modes for TID output */
		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
											  PointerGetDatum(tupleid));
		pindex++;
	}

	/* get following parameters from slots */
	if (slots != NULL && fmstate->target_attrs != NIL)
	{
		TupleDesc	tupdesc = RelationGetDescr(fmstate->rel);
		int			nestlevel;
		ListCell   *lc;

		nestlevel = set_transmission_modes();

		for (i = 0; i < numSlots; i++)
		{
			/* Output functions restart after the ctid entry for every row. */
			j = (tupleid != NULL) ? 1 : 0;
			foreach(lc, fmstate->target_attrs)
			{
				int			attnum = lfirst_int(lc);
				Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
				Datum		value;
				bool		isnull;

				/* Ignore generated columns; they are set to DEFAULT */
				if (attr->attgenerated)
					continue;
				value = slot_getattr(slots[i], attnum, &isnull);
				if (isnull)
					p_values[pindex] = NULL;
				else
					p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
														  value);
				pindex++;
				j++;
			}
		}

		reset_transmission_modes(nestlevel);
	}

	Assert(pindex == fmstate->p_nums * numSlots);

	MemoryContextSwitchTo(oldcontext);

	return p_values;
}

/*
 * store_returning_result
 *		Store the result of a RETURNING clause
 *
 * Converts row 0 of res into a heap tuple and stores it into slot.
 *
 * On error, be sure to release the PGresult on the way out.  Callers do not
 * have PG_TRY blocks to ensure this happens.
 */
static void
store_returning_result(PgFdwModifyState *fmstate,
					   TupleTableSlot *slot, PGresult *res)
{
	PG_TRY();
	{
		HeapTuple	newtup;

		newtup = make_tuple_from_result_row(res, 0,
											fmstate->rel,
											fmstate->attinmeta,
											fmstate->retrieved_attrs,
											NULL,
											fmstate->temp_cxt);

		/*
		 * The returning slot will not necessarily be suitable to store
		 * heaptuples directly, so allow for conversion.
		 */
		ExecForceStoreHeapTuple(newtup, slot, true);
	}
	PG_CATCH();
	{
		if (res)
			PQclear(res);
		PG_RE_THROW();
	}
	PG_END_TRY();
}

/*
 * finish_foreign_modify
 *		Release resources for a foreign insert/update/delete operation
 */
static void
finish_foreign_modify(PgFdwModifyState *fmstate)
{
	Assert(fmstate != NULL);

	/* If we created a prepared statement, destroy it */
	deallocate_query(fmstate);

	/* Release remote connection */
	ReleaseConnection(fmstate->conn);
	fmstate->conn = NULL;
}

/*
 * deallocate_query
 *		Deallocate a prepared statement for a foreign insert/update/delete
 *		operation
 *
 * No-op if no statement has been prepared; otherwise sends DEALLOCATE and
 * resets fmstate->p_name to NULL.
 */
static void
deallocate_query(PgFdwModifyState *fmstate)
{
	char		sql[64];
	PGresult   *res;

	/* do nothing if the query is not allocated */
	if (!fmstate->p_name)
		return;

	snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);

	/*
	 * We don't use a PG_TRY block here, so be careful not to throw error
	 * without releasing the PGresult.
	 */
	res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
		pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
	PQclear(res);
	pfree(fmstate->p_name);
	fmstate->p_name = NULL;
}

/*
 * build_remote_returning
 *		Build a RETURNING targetlist of a remote query for performing an
 *		UPDATE/DELETE .. RETURNING on a join directly
 *
 * A whole-row reference to the target relation (rtindex) is expanded into
 * all of its non-dropped columns.  Other Vars/PlaceHolderVars from
 * returningList are appended once each.  The target relation's system
 * columns are skipped except for ctid.
 */
static List *
build_remote_returning(Index rtindex, Relation rel, List *returningList)
{
	bool		have_wholerow = false;
	List	   *tlist = NIL;
	List	   *vars;
	ListCell   *lc;

	Assert(returningList);

	vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);

	/*
	 * If there's a whole-row reference to the target relation, then we'll
	 * need all the columns of the relation.
	 */
	foreach(lc, vars)
	{
		Var		   *var = (Var *) lfirst(lc);

		if (IsA(var, Var) &&
			var->varno == rtindex &&
			var->varattno == InvalidAttrNumber)
		{
			have_wholerow = true;
			break;
		}
	}

	if (have_wholerow)
	{
		TupleDesc	tupdesc = RelationGetDescr(rel);
		int			i;

		for (i = 1; i <= tupdesc->natts; i++)
		{
			Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
			Var		   *var;

			/* Ignore dropped attributes. */
			if (attr->attisdropped)
				continue;

			var = makeVar(rtindex,
						  i,
						  attr->atttypid,
						  attr->atttypmod,
						  attr->attcollation,
						  0);

			tlist = lappend(tlist,
							makeTargetEntry((Expr *) var,
											list_length(tlist) + 1,
											NULL,
											false));
		}
	}

	/* Now add any remaining columns to tlist. */
	foreach(lc, vars)
	{
		Var		   *var = (Var *) lfirst(lc);

		/*
		 * No need for whole-row references to the target relation.  We don't
		 * need system columns other than ctid either, since those are set
		 * locally.
		 */
		if (IsA(var, Var) &&
			var->varno == rtindex &&
			var->varattno <= InvalidAttrNumber &&
			var->varattno != SelfItemPointerAttributeNumber)
			continue;			/* don't need it */

		if (tlist_member((Expr *) var, tlist))
			continue;			/* already got it */

		tlist = lappend(tlist,
						makeTargetEntry((Expr *) var,
										list_length(tlist) + 1,
										NULL,
										false));
	}

	list_free(vars);

	return tlist;
}

/*
 * rebuild_fdw_scan_tlist
 *		Build new fdw_scan_tlist of given foreign-scan plan node from given
 *		tlist
 *
 * There might be columns that the fdw_scan_tlist of the given foreign-scan
 * plan node contains that the given tlist doesn't.  The fdw_scan_tlist would
 * have contained resjunk columns such as 'ctid' of the target relation and
 * 'wholerow' of non-target relations, but the tlist might not contain them,
 * for example.  So, adjust the tlist so it contains all the columns specified
 * in the fdw_scan_tlist; else setrefs.c will get confused.
*/ static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist) { List *new_tlist = tlist; List *old_tlist = fscan->fdw_scan_tlist; ListCell *lc; foreach(lc, old_tlist) { TargetEntry *tle = (TargetEntry *) lfirst(lc); if (tlist_member(tle->expr, new_tlist)) continue; /* already got it */ new_tlist = lappend(new_tlist, makeTargetEntry(tle->expr, list_length(new_tlist) + 1, NULL, false)); } fscan->fdw_scan_tlist = new_tlist; } /* * Execute a direct UPDATE/DELETE statement. */ static void execute_dml_stmt(ForeignScanState *node) { PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = dmstate->numParams; const char **values = dmstate->param_values; /* First, process a pending asynchronous request, if any. */ if (dmstate->conn_state->pendingAreq) process_pending_request(dmstate->conn_state->pendingAreq); /* * Construct array of query parameter values in text format. */ if (numParams > 0) process_query_params(econtext, dmstate->param_flinfo, dmstate->param_exprs, values); /* * Notice that we pass NULL for paramTypes, thus forcing the remote server * to infer types for all parameters. Since we explicitly cast every * parameter (see deparse.c), the "inference" is trivial and will produce * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. */ if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, NULL, values, NULL, NULL, 0)) pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); /* * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? 
PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, dmstate->query); /* Get the number of rows affected. */ if (dmstate->has_returning) dmstate->num_tuples = PQntuples(dmstate->result); else dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result)); } /* * Get the result of a RETURNING clause. */ static TupleTableSlot * get_returning_data(ForeignScanState *node) { PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; EState *estate = node->ss.ps.state; ResultRelInfo *resultRelInfo = node->resultRelInfo; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; TupleTableSlot *resultSlot; Assert(resultRelInfo->ri_projectReturning); /* If we didn't get any tuples, must be end of data. */ if (dmstate->next_tuple >= dmstate->num_tuples) return ExecClearTuple(slot); /* Increment the command es_processed count if necessary. */ if (dmstate->set_processed) estate->es_processed += 1; /* * Store a RETURNING tuple. If has_returning is false, just emit a dummy * tuple. (has_returning is false when the local query is of the form * "UPDATE/DELETE .. RETURNING 1" for example.) */ if (!dmstate->has_returning) { ExecStoreAllNullTuple(slot); resultSlot = slot; } else { /* * On error, be sure to release the PGresult on the way out. Callers * do not have PG_TRY blocks to ensure this happens. */ PG_TRY(); { HeapTuple newtup; newtup = make_tuple_from_result_row(dmstate->result, dmstate->next_tuple, dmstate->rel, dmstate->attinmeta, dmstate->retrieved_attrs, node, dmstate->temp_cxt); ExecStoreHeapTuple(newtup, slot, false); } PG_CATCH(); { if (dmstate->result) PQclear(dmstate->result); PG_RE_THROW(); } PG_END_TRY(); /* Get the updated/deleted tuple. */ if (dmstate->rel) resultSlot = slot; else resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate); } dmstate->next_tuple++; /* Make slot available for evaluation of the local query RETURNING list. 
*/ resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = resultSlot; return slot; } /* * Initialize a filter to extract an updated/deleted tuple from a scan tuple. */ static void init_returning_filter(PgFdwDirectModifyState *dmstate, List *fdw_scan_tlist, Index rtindex) { TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); ListCell *lc; int i; /* * Calculate the mapping between the fdw_scan_tlist's entries and the * result tuple's attributes. * * The "map" is an array of indexes of the result tuple's attributes in * fdw_scan_tlist, i.e., one entry for every attribute of the result * tuple. We store zero for any attributes that don't have the * corresponding entries in that list, marking that a NULL is needed in * the result tuple. * * Also get the indexes of the entries for ctid and oid if any. */ dmstate->attnoMap = (AttrNumber *) palloc0(resultTupType->natts * sizeof(AttrNumber)); dmstate->ctidAttno = dmstate->oidAttno = 0; i = 1; dmstate->hasSystemCols = false; foreach(lc, fdw_scan_tlist) { TargetEntry *tle = (TargetEntry *) lfirst(lc); Var *var = (Var *) tle->expr; Assert(IsA(var, Var)); /* * If the Var is a column of the target relation to be retrieved from * the foreign server, get the index of the entry. */ if (var->varno == rtindex && list_member_int(dmstate->retrieved_attrs, i)) { int attrno = var->varattno; if (attrno < 0) { /* * We don't retrieve system columns other than ctid and oid. */ if (attrno == SelfItemPointerAttributeNumber) dmstate->ctidAttno = i; else Assert(false); dmstate->hasSystemCols = true; } else { /* * We don't retrieve whole-row references to the target * relation either. */ Assert(attrno > 0); dmstate->attnoMap[attrno - 1] = i; } } i++; } } /* * Extract and return an updated/deleted tuple from a scan tuple. 
*/ static TupleTableSlot * apply_returning_filter(PgFdwDirectModifyState *dmstate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate) { TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); TupleTableSlot *resultSlot; Datum *values; bool *isnull; Datum *old_values; bool *old_isnull; int i; /* * Use the return tuple slot as a place to store the result tuple. */ resultSlot = ExecGetReturningSlot(estate, resultRelInfo); /* * Extract all the values of the scan tuple. */ slot_getallattrs(slot); old_values = slot->tts_values; old_isnull = slot->tts_isnull; /* * Prepare to build the result tuple. */ ExecClearTuple(resultSlot); values = resultSlot->tts_values; isnull = resultSlot->tts_isnull; /* * Transpose data into proper fields of the result tuple. */ for (i = 0; i < resultTupType->natts; i++) { int j = dmstate->attnoMap[i]; if (j == 0) { values[i] = (Datum) 0; isnull[i] = true; } else { values[i] = old_values[j - 1]; isnull[i] = old_isnull[j - 1]; } } /* * Build the virtual tuple. */ ExecStoreVirtualTuple(resultSlot); /* * If we have any system columns to return, materialize a heap tuple in * the slot from column values set above and install system columns in * that tuple. */ if (dmstate->hasSystemCols) { HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL); /* ctid */ if (dmstate->ctidAttno) { ItemPointer ctid = NULL; ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]); resultTup->t_self = *ctid; } /* * And remaining columns * * Note: since we currently don't allow the target relation to appear * on the nullable side of an outer join, any system columns wouldn't * go to NULL. * * Note: no need to care about tableoid here because it will be * initialized in ExecProcessReturning(). 
*/ HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId); HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId); HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId); } /* * And return the result tuple. */ return resultSlot; } /* * Prepare for processing of parameters used in remote query. */ static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values) { int i; ListCell *lc; Assert(numParams > 0); /* Prepare for output conversion of parameters used in remote query. */ *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams); i = 0; foreach(lc, fdw_exprs) { Node *param_expr = (Node *) lfirst(lc); Oid typefnoid; bool isvarlena; getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); fmgr_info(typefnoid, &(*param_flinfo)[i]); i++; } /* * Prepare remote-parameter expressions for evaluation. (Note: in * practice, we expect that all these expressions will be just Params, so * we could possibly do something more efficient than using the full * expression-eval machinery for this. But probably there would be little * benefit, and it'd require postgres_fdw to know more than is desirable * about Param evaluation.) */ *param_exprs = ExecInitExprList(fdw_exprs, node); /* Allocate buffer for text form of query parameters. */ *param_values = (const char **) palloc0(numParams * sizeof(char *)); } /* * Construct array of query parameter values in text format. 
*/ static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values) { int nestlevel; int i; ListCell *lc; nestlevel = set_transmission_modes(); i = 0; foreach(lc, param_exprs) { ExprState *expr_state = (ExprState *) lfirst(lc); Datum expr_value; bool isNull; /* Evaluate the parameter expression */ expr_value = ExecEvalExpr(expr_state, econtext, &isNull); /* * Get string representation of each parameter value by invoking * type-specific output function, unless the value is null. */ if (isNull) param_values[i] = NULL; else param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value); i++; } reset_transmission_modes(nestlevel); } /* * postgresAnalyzeForeignTable * Test whether analyzing this foreign table is supported */ static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages) { ForeignTable *table; UserMapping *user; PGconn *conn; StringInfoData sql; PGresult *volatile res = NULL; /* Return the row-analysis function pointer */ *func = postgresAcquireSampleRowsFunc; /* * Now we have to get the number of pages. It's annoying that the ANALYZE * API requires us to return that now, because it forces some duplication * of effort between this routine and postgresAcquireSampleRowsFunc. But * it's probably not worth redefining that API at this point. */ /* * Get the connection to use. We do the remote access as the table's * owner, even if the ANALYZE was started by some other user. */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); conn = GetConnection(user, false, NULL); /* * Construct command to get page count for relation. */ initStringInfo(&sql); deparseAnalyzeSizeSql(&sql, relation); /* In what follows, do not risk leaking any PGresults. 
*/ PG_TRY(); { res = pgfdw_exec_query(conn, sql.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); if (PQntuples(res) != 1 || PQnfields(res) != 1) elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query"); *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10); } PG_FINALLY(); { if (res) PQclear(res); } PG_END_TRY(); ReleaseConnection(conn); return true; } /* * Acquire a random sample of rows from foreign table managed by postgres_fdw. * * We fetch the whole table from the remote side and pick out some sample rows. * * Selected rows are returned in the caller-allocated array rows[], * which must have at least targrows entries. * The actual number of rows selected is returned as the function result. * We also count the total number of rows in the table and return it into * *totalrows. Note that *totaldeadrows is always set to 0. * * Note that the returned list of rows is not always in order by physical * position in the table. Therefore, correlation estimates derived later * may be meaningless, but it's OK because we don't use the estimates * currently (the planner only pays attention to correlation for indexscans). 
*/
static int
postgresAcquireSampleRowsFunc(Relation relation, int elevel,
							  HeapTuple *rows, int targrows,
							  double *totalrows,
							  double *totaldeadrows)
{
	PgFdwAnalyzeState astate;
	ForeignTable *table;
	ForeignServer *server;
	UserMapping *user;
	PGconn	   *conn;
	unsigned int cursor_number;
	StringInfoData sql;
	PGresult   *volatile res = NULL;

	/* Initialize workspace state */
	astate.rel = relation;
	astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));

	astate.rows = rows;
	astate.targrows = targrows;
	astate.numrows = 0;
	astate.samplerows = 0;
	astate.rowstoskip = -1;		/* -1 means not set yet */
	reservoir_init_selection_state(&astate.rstate, targrows);

	/* Remember ANALYZE context, and create a per-tuple temp context */
	astate.anl_cxt = CurrentMemoryContext;
	astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
											"postgres_fdw temporary data",
											ALLOCSET_SMALL_SIZES);

	/*
	 * Get the connection to use.  We do the remote access as the table's
	 * owner, even if the ANALYZE was started by some other user.
	 */
	table = GetForeignTable(RelationGetRelid(relation));
	server = GetForeignServer(table->serverid);
	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
	conn = GetConnection(user, false, NULL);

	/*
	 * Construct cursor that retrieves whole rows from remote.
	 */
	cursor_number = GetCursorNumber(conn);
	initStringInfo(&sql);
	appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
	deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);

	/* In what follows, do not risk leaking any PGresults. */
	PG_TRY();
	{
		char		fetch_sql[64];
		int			fetch_size;
		List	   *option_lists[2];
		int			listno;
		ListCell   *lc;

		res = pgfdw_exec_query(conn, sql.data, NULL);
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
			pgfdw_report_error(ERROR, res, conn, false, sql.data);
		PQclear(res);
		res = NULL;

		/*
		 * Determine the fetch size.  The default is arbitrary, but shouldn't
		 * be enormous.  Server options are scanned first and table options
		 * second, so a table-level fetch_size overrides a server-level one.
		 *
		 * Each value is parsed into a temporary and only a positive result is
		 * accepted.  This way a malformed or non-positive setting can never
		 * leave fetch_size at zero, which would make the
		 * "numrows < fetch_size" exit test below unsatisfiable.
		 */
		fetch_size = 100;
		option_lists[0] = server->options;
		option_lists[1] = table->options;
		for (listno = 0;
			 listno < (int) (sizeof(option_lists) / sizeof(option_lists[0]));
			 listno++)
		{
			foreach(lc, option_lists[listno])
			{
				DefElem    *def = (DefElem *) lfirst(lc);

				if (strcmp(def->defname, "fetch_size") == 0)
				{
					int			parsed_size;

					if (parse_int(defGetString(def), &parsed_size, 0, NULL) &&
						parsed_size > 0)
						fetch_size = parsed_size;
					break;
				}
			}
		}

		/* Construct command to fetch rows from remote. */
		snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
				 fetch_size, cursor_number);

		/* Retrieve and process rows a batch at a time. */
		for (;;)
		{
			int			numrows;
			int			i;

			/* Allow users to cancel long query */
			CHECK_FOR_INTERRUPTS();

			/*
			 * XXX possible future improvement: if rowstoskip is large, we
			 * could issue a MOVE rather than physically fetching the rows,
			 * then just adjust rowstoskip and samplerows appropriately.
			 */

			/* Fetch some rows */
			res = pgfdw_exec_query(conn, fetch_sql, NULL);
			/* On error, report the original query, not the FETCH. */
			if (PQresultStatus(res) != PGRES_TUPLES_OK)
				pgfdw_report_error(ERROR, res, conn, false, sql.data);

			/* Process whatever we got. */
			numrows = PQntuples(res);
			for (i = 0; i < numrows; i++)
				analyze_row_processor(res, i, &astate);

			PQclear(res);
			res = NULL;

			/* Must be EOF if we didn't get all the rows requested. */
			if (numrows < fetch_size)
				break;
		}

		/* Close the cursor, just to be tidy. */
		close_cursor(conn, cursor_number, NULL);
	}
	PG_CATCH();
	{
		if (res)
			PQclear(res);
		PG_RE_THROW();
	}
	PG_END_TRY();

	ReleaseConnection(conn);

	/* We assume that we have no dead tuple. */
	*totaldeadrows = 0.0;

	/* We've retrieved all living tuples from foreign server. */
	*totalrows = astate.samplerows;

	/*
	 * Emit some interesting relation info
	 */
	ereport(elevel,
			(errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
					RelationGetRelationName(relation),
					astate.samplerows, astate.numrows)));

	return astate.numrows;
}

/*
 * Collect sample rows from the result of query.
* - Use all tuples in sample until target # of samples are collected.
 * - Subsequently, replace already-sampled tuples randomly.
 *
 * "row" is the index within "res" of the row being considered.  A row that
 * enters the sample is converted with make_tuple_from_result_row() and
 * stored in astate->rows[]; the tuple is created in astate->anl_cxt.
 */
static void
analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
{
	int			targrows = astate->targrows;
	int			pos;			/* array index to store tuple in */
	MemoryContext oldcontext;

	/* Always increment sample row counter. */
	astate->samplerows += 1;

	/*
	 * Determine the slot where this sample row should be stored.  Set pos to
	 * negative value to indicate the row should be skipped.
	 */
	if (astate->numrows < targrows)
	{
		/* First targrows rows are always included into the sample */
		pos = astate->numrows++;
	}
	else
	{
		/*
		 * Now we start replacing tuples in the sample until we reach the end
		 * of the relation.  Same algorithm as in acquire_sample_rows in
		 * analyze.c; see Jeff Vitter's paper.
		 *
		 * rowstoskip counts the rows still to skip before the next
		 * replacement.  A negative value means a fresh skip distance has to
		 * be drawn from the reservoir state first.
		 */
		if (astate->rowstoskip < 0)
			astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);

		if (astate->rowstoskip <= 0)
		{
			/* Choose a random reservoir element to replace. */
			pos = (int) (targrows * sampler_random_fract(&astate->rstate.randstate));
			Assert(pos >= 0 && pos < targrows);
			/* Free the evicted sample tuple; its slot is overwritten below. */
			heap_freetuple(astate->rows[pos]);
		}
		else
		{
			/* Skip this tuple. */
			pos = -1;
		}

		astate->rowstoskip -= 1;
	}

	if (pos >= 0)
	{
		/*
		 * Create sample tuple from current result row, and store it in the
		 * position determined above.  The tuple has to be created in anl_cxt.
		 */
		oldcontext = MemoryContextSwitchTo(astate->anl_cxt);

		astate->rows[pos] = make_tuple_from_result_row(res, row,
													   astate->rel,
													   astate->attinmeta,
													   astate->retrieved_attrs,
													   NULL,
													   astate->temp_cxt);

		MemoryContextSwitchTo(oldcontext);
	}
}

/*
 * Import a foreign schema
 *
 * postgresImportForeignSchema
 *		Build one CREATE FOREIGN TABLE command per relation found in
 *		stmt->remote_schema on the remote server.  Eligible relations are
 *		tables, views, foreign tables, matviews and partitioned tables.
 *		The commands are returned as a List of pstrdup'd strings.
 *
 * Recognized statement options (defaults in parentheses): import_collate
 * (true), import_default (false), import_generated (true), and
 * import_not_null (true).  Any other option raises an error.
 */
static List *
postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
{
	List	   *commands = NIL;
	bool		import_collate = true;
	bool		import_default = false;
	bool		import_generated = true;
	bool		import_not_null = true;
	ForeignServer *server;
	UserMapping *mapping;
	PGconn	   *conn;
	StringInfoData buf;
	PGresult   *volatile res = NULL;
	int			numrows,
				i;
	ListCell   *lc;

	/* Parse statement options */
	foreach(lc, stmt->options)
	{
		DefElem    *def = (DefElem *) lfirst(lc);

		if (strcmp(def->defname, "import_collate") == 0)
			import_collate = defGetBoolean(def);
		else if (strcmp(def->defname, "import_default") == 0)
			import_default = defGetBoolean(def);
		else if (strcmp(def->defname, "import_generated") == 0)
			import_generated = defGetBoolean(def);
		else if (strcmp(def->defname, "import_not_null") == 0)
			import_not_null = defGetBoolean(def);
		else
			ereport(ERROR,
					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
					 errmsg("invalid option \"%s\"", def->defname)));
	}

	/*
	 * Get connection to the foreign server.  Connection manager will
	 * establish new connection if necessary.
	 */
	server = GetForeignServer(serverOid);
	mapping = GetUserMapping(GetUserId(), server->serverid);
	conn = GetConnection(mapping, false, NULL);

	/* Don't attempt to import collation if remote server hasn't got it */
	if (PQserverVersion(conn) < 90100)
		import_collate = false;

	/* Create workspace for strings */
	initStringInfo(&buf);

	/* In what follows, do not risk leaking any PGresults. */
	PG_TRY();
	{
		/* Check that the schema really exists */
		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
		deparseStringLiteral(&buf, stmt->remote_schema);

		res = pgfdw_exec_query(conn, buf.data, NULL);
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
			pgfdw_report_error(ERROR, res, conn, false, buf.data);

		if (PQntuples(res) != 1)
			ereport(ERROR,
					(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
					 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
							stmt->remote_schema, server->servername)));

		PQclear(res);
		res = NULL;
		resetStringInfo(&buf);

		/*
		 * Fetch all table data from this schema, possibly restricted by
		 * EXCEPT or LIMIT TO.  (We don't actually need to pay any attention
		 * to EXCEPT/LIMIT TO here, because the core code will filter the
		 * statements we return according to those lists anyway.  But it
		 * should save a few cycles to not process excluded tables in the
		 * first place.)
		 *
		 * Import table data for partitions only when they are explicitly
		 * specified in LIMIT TO clause.  Otherwise ignore them and only
		 * include the definitions of the root partitioned tables to allow
		 * access to the complete remote data set locally in the schema
		 * imported.
		 *
		 * Note: because we run the connection with search_path restricted to
		 * pg_catalog, the format_type() and pg_get_expr() outputs will always
		 * include a schema name for types/functions in other schemas, which
		 * is what we want.
		 *
		 * Result columns, by position: 0 relname, 1 attname, 2 type,
		 * 3 attnotnull, 4 default expression, 5 attgenerated (or NULL),
		 * 6 collation name and 7 collation schema (both NULL without
		 * import_collate).  The row-processing loop below reads them in this
		 * order.
		 */
		appendStringInfoString(&buf,
							   "SELECT relname, "
							   " attname, "
							   " format_type(atttypid, atttypmod), "
							   " attnotnull, "
							   " pg_get_expr(adbin, adrelid), ");

		/* Generated columns are supported since Postgres 12 */
		if (PQserverVersion(conn) >= 120000)
			appendStringInfoString(&buf,
								   " attgenerated, ");
		else
			appendStringInfoString(&buf,
								   " NULL, ");

		if (import_collate)
			appendStringInfoString(&buf,
								   " collname, "
								   " collnsp.nspname ");
		else
			appendStringInfoString(&buf,
								   " NULL, NULL ");

		/*
		 * LEFT JOIN to pg_attribute so that a relation with no columns still
		 * yields one row, with NULL attname.
		 */
		appendStringInfoString(&buf,
							   "FROM pg_class c "
							   " JOIN pg_namespace n ON "
							   " relnamespace = n.oid "
							   " LEFT JOIN pg_attribute a ON "
							   " attrelid = c.oid AND attnum > 0 "
							   " AND NOT attisdropped "
							   " LEFT JOIN pg_attrdef ad ON "
							   " adrelid = c.oid AND adnum = attnum ");

		if (import_collate)
			appendStringInfoString(&buf,
								   " LEFT JOIN pg_collation coll ON "
								   " coll.oid = attcollation "
								   " LEFT JOIN pg_namespace collnsp ON "
								   " collnsp.oid = collnamespace ");

		appendStringInfoString(&buf,
							   "WHERE c.relkind IN ("
							   CppAsString2(RELKIND_RELATION) ","
							   CppAsString2(RELKIND_VIEW) ","
							   CppAsString2(RELKIND_FOREIGN_TABLE) ","
							   CppAsString2(RELKIND_MATVIEW) ","
							   CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
							   " AND n.nspname = ");
		deparseStringLiteral(&buf, stmt->remote_schema);

		/* Partitions are supported since Postgres 10 */
		if (PQserverVersion(conn) >= 100000 &&
			stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
			appendStringInfoString(&buf, " AND NOT c.relispartition ");

		/* Apply restrictions for LIMIT TO and EXCEPT */
		if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
			stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
		{
			bool		first_item = true;

			appendStringInfoString(&buf, " AND c.relname ");
			if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
				appendStringInfoString(&buf, "NOT ");
			appendStringInfoString(&buf, "IN (");

			/* Append list of table names within IN clause */
			foreach(lc, stmt->table_list)
			{
				RangeVar   *rv = (RangeVar *) lfirst(lc);

				if (first_item)
					first_item = false;
				else
					appendStringInfoString(&buf, ", ");
				deparseStringLiteral(&buf, rv->relname);
			}
			appendStringInfoChar(&buf, ')');
		}

		/*
		 * Append ORDER BY at the end of query to ensure output ordering.  The
		 * grouping loop below depends on all rows of one relation being
		 * adjacent.
		 */
		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");

		/* Fetch the data */
		res = pgfdw_exec_query(conn, buf.data, NULL);
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
			pgfdw_report_error(ERROR, res, conn, false, buf.data);

		/* Process results */
		numrows = PQntuples(res);
		/* note: incrementation of i happens in inner loop's while() test */
		for (i = 0; i < numrows;)
		{
			char	   *tablename = PQgetvalue(res, i, 0);
			bool		first_item = true;

			resetStringInfo(&buf);
			appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
							 quote_identifier(tablename));

			/* Scan all rows for this table */
			do
			{
				char	   *attname;
				char	   *typename;
				char	   *attnotnull;
				char	   *attgenerated;
				char	   *attdefault;
				char	   *collname;
				char	   *collnamespace;

				/*
				 * If table has no columns, we'll see nulls here.  "continue"
				 * in a do/while jumps to the while() test, so ++i still
				 * advances past this row.
				 */
				if (PQgetisnull(res, i, 1))
					continue;

				attname = PQgetvalue(res, i, 1);
				typename = PQgetvalue(res, i, 2);
				attnotnull = PQgetvalue(res, i, 3);
				attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
					PQgetvalue(res, i, 4);
				attgenerated = PQgetisnull(res, i, 5) ? (char *) NULL :
					PQgetvalue(res, i, 5);
				collname = PQgetisnull(res, i, 6) ? (char *) NULL :
					PQgetvalue(res, i, 6);
				collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL :
					PQgetvalue(res, i, 7);

				if (first_item)
					first_item = false;
				else
					appendStringInfoString(&buf, ",\n");

				/* Print column name and type */
				appendStringInfo(&buf, " %s %s",
								 quote_identifier(attname),
								 typename);

				/*
				 * Add column_name option so that renaming the foreign table's
				 * column doesn't break the association to the underlying
				 * column.
				 */
				appendStringInfoString(&buf, " OPTIONS (column_name ");
				deparseStringLiteral(&buf, attname);
				appendStringInfoChar(&buf, ')');

				/* Add COLLATE if needed */
				if (import_collate && collname != NULL && collnamespace != NULL)
					appendStringInfo(&buf, " COLLATE %s.%s",
									 quote_identifier(collnamespace),
									 quote_identifier(collname));

				/*
				 * Add DEFAULT if needed.  For a generated column, the default
				 * expression (column 4) is instead emitted as the GENERATED
				 * expression below, so it is not emitted as a DEFAULT.
				 */
				if (import_default && attdefault != NULL &&
					(!attgenerated || !attgenerated[0]))
					appendStringInfo(&buf, " DEFAULT %s", attdefault);

				/* Add GENERATED if needed */
				if (import_generated && attgenerated != NULL &&
					attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
				{
					Assert(attdefault != NULL);
					appendStringInfo(&buf,
									 " GENERATED ALWAYS AS (%s) STORED",
									 attdefault);
				}

				/* Add NOT NULL if needed */
				if (import_not_null && attnotnull[0] == 't')
					appendStringInfoString(&buf, " NOT NULL");
			}
			while (++i < numrows &&
				   strcmp(PQgetvalue(res, i, 0), tablename) == 0);

			/*
			 * Add server name and table-level options.  We specify remote
			 * schema and table name as options (the latter to ensure that
			 * renaming the foreign table doesn't break the association).
			 */
			appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
							 quote_identifier(server->servername));

			appendStringInfoString(&buf, "schema_name ");
			deparseStringLiteral(&buf, stmt->remote_schema);
			appendStringInfoString(&buf, ", table_name ");
			deparseStringLiteral(&buf, tablename);

			appendStringInfoString(&buf, ");");

			commands = lappend(commands, pstrdup(buf.data));
		}
	}
	PG_FINALLY();
	{
		if (res)
			PQclear(res);
	}
	PG_END_TRY();

	ReleaseConnection(conn);

	return commands;
}

/*
 * Assess whether the join between inner and outer relations can be pushed down
 * to the foreign server.  As a side effect, save information we obtain in this
 * function to PgFdwRelationInfo passed in.
*/
static bool
foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
				RelOptInfo *outerrel, RelOptInfo *innerrel,
				JoinPathExtraData *extra)
{
	PgFdwRelationInfo *fpinfo;
	PgFdwRelationInfo *fpinfo_o;
	PgFdwRelationInfo *fpinfo_i;
	ListCell   *lc;
	List	   *joinclauses;

	/*
	 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
	 * Constructing queries representing SEMI and ANTI joins is hard, hence
	 * not considered right now.
	 */
	if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
		jointype != JOIN_RIGHT && jointype != JOIN_FULL)
		return false;

	/*
	 * If either of the joining relations is marked as unsafe to pushdown, the
	 * join can not be pushed down.
	 */
	fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
	fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
	fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
	if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
		!fpinfo_i || !fpinfo_i->pushdown_safe)
		return false;

	/*
	 * If joining relations have local conditions, those conditions are
	 * required to be applied before joining the relations.  Hence the join can
	 * not be pushed down.
	 */
	if (fpinfo_o->local_conds || fpinfo_i->local_conds)
		return false;

	/*
	 * Merge FDW options.  We might be tempted to do this after we have deemed
	 * the foreign join to be OK.  But we must do this beforehand so that we
	 * know which quals can be evaluated on the foreign server, which might
	 * depend on shippable_extensions.
	 */
	fpinfo->server = fpinfo_o->server;
	merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);

	/*
	 * Separate restrict list into join quals and pushed-down (other) quals.
	 *
	 * Join quals belonging to an outer join must all be shippable, else we
	 * cannot execute the join remotely.  Add such quals to 'joinclauses'.
	 *
	 * Add other quals to fpinfo->remote_conds if they are shippable, else to
	 * fpinfo->local_conds.  In an inner join it's okay to execute conditions
	 * either locally or remotely; the same is true for pushed-down conditions
	 * at an outer join.
	 *
	 * Note we might return failure after having already scribbled on
	 * fpinfo->remote_conds and fpinfo->local_conds.  That's okay because we
	 * won't consult those lists again if we deem the join unshippable.
	 */
	joinclauses = NIL;
	foreach(lc, extra->restrictlist)
	{
		RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
		bool		is_remote_clause = is_foreign_expr(root, joinrel,
													   rinfo->clause);

		if (IS_OUTER_JOIN(jointype) &&
			!RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
		{
			if (!is_remote_clause)
				return false;
			joinclauses = lappend(joinclauses, rinfo);
		}
		else
		{
			if (is_remote_clause)
				fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
			else
				fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
		}
	}

	/*
	 * deparseExplicitTargetList() isn't smart enough to handle anything other
	 * than a Var.  In particular, if there's some PlaceHolderVar that would
	 * need to be evaluated within this join tree (because there's an upper
	 * reference to a quantity that may go to NULL as a result of an outer
	 * join), then we can't try to push the join down because we'll fail when
	 * we get to deparseExplicitTargetList().  However, a PlaceHolderVar that
	 * needs to be evaluated *at the top* of this join tree is OK, because we
	 * can do that locally after fetching the results from the remote side.
	 *
	 * In set terms: reject when ph_eval_at is a proper subset of this join's
	 * relids.
	 */
	foreach(lc, root->placeholder_list)
	{
		PlaceHolderInfo *phinfo = lfirst(lc);
		Relids		relids;

		/* PlaceHolderInfo refers to parent relids, not child relids. */
		relids = IS_OTHER_REL(joinrel) ?
			joinrel->top_parent_relids : joinrel->relids;

		if (bms_is_subset(phinfo->ph_eval_at, relids) &&
			bms_nonempty_difference(relids, phinfo->ph_eval_at))
			return false;
	}

	/*
	 * From here on the join is accepted; every remaining step only records
	 * information in fpinfo.
	 */

	/* Save the join clauses, for later use. */
	fpinfo->joinclauses = joinclauses;

	fpinfo->outerrel = outerrel;
	fpinfo->innerrel = innerrel;
	fpinfo->jointype = jointype;

	/*
	 * By default, both the input relations are not required to be deparsed as
	 * subqueries, but there might be some relations covered by the input
	 * relations that are required to be deparsed as subqueries, so save the
	 * relids of those relations for later use by the deparser.
	 */
	fpinfo->make_outerrel_subquery = false;
	fpinfo->make_innerrel_subquery = false;
	Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
	Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
	fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
											fpinfo_i->lower_subquery_rels);

	/*
	 * Pull the other remote conditions from the joining relations into join
	 * clauses or other remote clauses (remote_conds) of this relation
	 * wherever possible.  This avoids building subqueries at every join step.
	 *
	 * For an inner join, clauses from both the relations are added to the
	 * other remote clauses.  For LEFT and RIGHT OUTER join, the clauses from
	 * the outer side are added to remote_conds since those can be evaluated
	 * after the join is evaluated.  The clauses from inner side are added to
	 * the joinclauses, since they need to be evaluated while constructing the
	 * join.
	 *
	 * For a FULL OUTER JOIN, the other clauses from either relation can not
	 * be added to the joinclauses or remote_conds, since each relation acts
	 * as an outer relation for the other.
	 *
	 * The joining sides can not have local conditions, thus no need to test
	 * shippability of the clauses being pulled up.
	 */
	switch (jointype)
	{
		case JOIN_INNER:
			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
											   fpinfo_i->remote_conds);
			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
											   fpinfo_o->remote_conds);
			break;

		case JOIN_LEFT:
			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
											  fpinfo_i->remote_conds);
			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
											   fpinfo_o->remote_conds);
			break;

		case JOIN_RIGHT:
			fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
											  fpinfo_o->remote_conds);
			fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
											   fpinfo_i->remote_conds);
			break;

		case JOIN_FULL:

			/*
			 * In this case, if any of the input relations has conditions, we
			 * need to deparse that relation as a subquery so that the
			 * conditions can be evaluated before the join.  Remember it in
			 * the fpinfo of this relation so that the deparser can take
			 * appropriate action.  Also, save the relids of base relations
			 * covered by that relation for later use by the deparser.
			 */
			if (fpinfo_o->remote_conds)
			{
				fpinfo->make_outerrel_subquery = true;
				fpinfo->lower_subquery_rels =
					bms_add_members(fpinfo->lower_subquery_rels,
									outerrel->relids);
			}
			if (fpinfo_i->remote_conds)
			{
				fpinfo->make_innerrel_subquery = true;
				fpinfo->lower_subquery_rels =
					bms_add_members(fpinfo->lower_subquery_rels,
									innerrel->relids);
			}
			break;

		default:
			/* Should not happen, we have just checked this above */
			elog(ERROR, "unsupported join type %d", jointype);
	}

	/*
	 * For an inner join, all restrictions can be treated alike.  Treating the
	 * pushed down conditions as join conditions allows a top level full outer
	 * join to be deparsed without requiring subqueries.
	 */
	if (jointype == JOIN_INNER)
	{
		Assert(!fpinfo->joinclauses);
		fpinfo->joinclauses = fpinfo->remote_conds;
		fpinfo->remote_conds = NIL;
	}

	/* Mark that this join can be pushed down safely */
	fpinfo->pushdown_safe = true;

	/*
	 * Get user mapping.  It is only kept when use_remote_estimate is set.
	 * In that case the outer side's mapping is preferred if the outer side
	 * itself uses remote estimates, else the inner side's mapping is used.
	 */
	if (fpinfo->use_remote_estimate)
	{
		if (fpinfo_o->use_remote_estimate)
			fpinfo->user = fpinfo_o->user;
		else
			fpinfo->user = fpinfo_i->user;
	}
	else
		fpinfo->user = NULL;

	/*
	 * Set # of retrieved rows and cached relation costs to some negative
	 * value, so that we can detect when they are set to some sensible values,
	 * during one (usually the first) of the calls to estimate_path_cost_size.
	 */
	fpinfo->retrieved_rows = -1;
	fpinfo->rel_startup_cost = -1;
	fpinfo->rel_total_cost = -1;

	/*
	 * Set the string describing this join relation to be used in EXPLAIN
	 * output of corresponding ForeignScan.  Note that the decoration we add
	 * to the base relation names mustn't include any digits, or it'll confuse
	 * postgresExplainForeignScan.
	 */
	fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)",
									 fpinfo_o->relation_name,
									 get_jointype_name(fpinfo->jointype),
									 fpinfo_i->relation_name);

	/*
	 * Set the relation index.  This is defined as the position of this
	 * joinrel in the join_rel_list list plus the length of the rtable list.
	 * Note that since this joinrel is at the end of the join_rel_list list
	 * when we are called, we can get the position by list_length.
	 */
	Assert(fpinfo->relation_index == 0);	/* shouldn't be set yet */
	fpinfo->relation_index =
		list_length(root->parse->rtable) + list_length(root->join_rel_list);

	return true;
}

/*
 * add_paths_with_pathkeys_for_rel
 *		For every pathkey list returned by get_useful_pathkeys_for_relation(),
 *		add to "rel" a foreign path costed for that ordering.  The path is
 *		built with create_foreignscan_path() for a simple rel and with
 *		create_foreign_join_path() otherwise.
 *
 * epq_path, if not NULL, is attached to each new path as its EPQ path.  It
 * may first be given a projection carrying extra needed columns, and a Sort
 * when it is not already ordered by the pathkeys in question.
 */
static void
add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
								Path *epq_path)
{
	List	   *useful_pathkeys_list = NIL; /* List of all pathkeys */
	ListCell   *lc;

	useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);

	/*
	 * Before creating sorted paths, arrange for the passed-in EPQ path, if
	 * any, to return columns needed by the parent ForeignScan node so that
	 * they will propagate up through Sort nodes injected below, if necessary.
	 */
	if (epq_path != NULL && useful_pathkeys_list != NIL)
	{
		PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
		PathTarget *target = copy_pathtarget(epq_path->pathtarget);

		/* Include columns required for evaluating PHVs in the tlist. */
		add_new_columns_to_pathtarget(target,
									  pull_var_clause((Node *) target->exprs,
													  PVC_RECURSE_PLACEHOLDERS));

		/* Include columns required for evaluating the local conditions. */
		foreach(lc, fpinfo->local_conds)
		{
			RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);

			add_new_columns_to_pathtarget(target,
										  pull_var_clause((Node *) rinfo->clause,
														  PVC_RECURSE_PLACEHOLDERS));
		}

		/*
		 * If we have added any new columns, adjust the tlist of the EPQ path.
		 *
		 * Note: the plan created using this path will only be used to execute
		 * EPQ checks, where accuracy of the plan cost and width estimates
		 * would not be important, so we do not do set_pathtarget_cost_width()
		 * for the new pathtarget here.  See also postgresGetForeignPlan().
		 */
		if (list_length(target->exprs) > list_length(epq_path->pathtarget->exprs))
		{
			/* The EPQ path is a join path, so it is projection-capable. */
			Assert(is_projection_capable_path(epq_path));

			/*
			 * Use create_projection_path() here, so as to avoid modifying it
			 * in place.
			 */
			epq_path = (Path *) create_projection_path(root,
													   rel,
													   epq_path,
													   target);
		}
	}

	/* Create one path for each set of pathkeys we found above. */
	foreach(lc, useful_pathkeys_list)
	{
		double		rows;
		int			width;
		Cost		startup_cost;
		Cost		total_cost;
		List	   *useful_pathkeys = lfirst(lc);
		Path	   *sorted_epq_path;

		estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
								&rows, &width, &startup_cost, &total_cost);

		/*
		 * The EPQ path must be at least as well sorted as the path itself, in
		 * case it gets used as input to a mergejoin.
		 */
		sorted_epq_path = epq_path;
		if (sorted_epq_path != NULL &&
			!pathkeys_contained_in(useful_pathkeys,
								   sorted_epq_path->pathkeys))
			sorted_epq_path = (Path *)
				create_sort_path(root,
								 rel,
								 sorted_epq_path,
								 useful_pathkeys,
								 -1.0);

		if (IS_SIMPLE_REL(rel))
			add_path(rel, (Path *)
					 create_foreignscan_path(root, rel,
											 NULL,
											 rows,
											 startup_cost,
											 total_cost,
											 useful_pathkeys,
											 rel->lateral_relids,
											 sorted_epq_path,
											 NIL));
		else
			add_path(rel, (Path *)
					 create_foreign_join_path(root, rel,
											  NULL,
											  rows,
											  startup_cost,
											  total_cost,
											  useful_pathkeys,
											  rel->lateral_relids,
											  sorted_epq_path,
											  NIL));
	}
}

/*
 * Parse options from foreign server and apply them to fpinfo.
 *
 * New options might also require tweaking merge_fdw_options().
*/ static void apply_server_options(PgFdwRelationInfo *fpinfo) { ListCell *lc; foreach(lc, fpinfo->server->options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "use_remote_estimate") == 0) fpinfo->use_remote_estimate = defGetBoolean(def); else if (strcmp(def->defname, "fdw_startup_cost") == 0) (void) parse_real(defGetString(def), &fpinfo->fdw_startup_cost, 0, NULL); else if (strcmp(def->defname, "fdw_tuple_cost") == 0) (void) parse_real(defGetString(def), &fpinfo->fdw_tuple_cost, 0, NULL); else if (strcmp(def->defname, "extensions") == 0) fpinfo->shippable_extensions = ExtractExtensionList(defGetString(def), false); else if (strcmp(def->defname, "fetch_size") == 0) (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); else if (strcmp(def->defname, "async_capable") == 0) fpinfo->async_capable = defGetBoolean(def); } } /* * Parse options from foreign table and apply them to fpinfo. * * New options might also require tweaking merge_fdw_options(). */ static void apply_table_options(PgFdwRelationInfo *fpinfo) { ListCell *lc; foreach(lc, fpinfo->table->options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "use_remote_estimate") == 0) fpinfo->use_remote_estimate = defGetBoolean(def); else if (strcmp(def->defname, "fetch_size") == 0) (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); else if (strcmp(def->defname, "async_capable") == 0) fpinfo->async_capable = defGetBoolean(def); } } /* * Merge FDW options from input relations into a new set of options for a join * or an upper rel. * * For a join relation, FDW-specific information about the inner and outer * relations is provided using fpinfo_i and fpinfo_o. For an upper relation, * fpinfo_o provides the information for the input relation; fpinfo_i is * expected to NULL. */ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i) { /* We must always have fpinfo_o. 
*/ Assert(fpinfo_o); /* fpinfo_i may be NULL, but if present the servers must both match. */ Assert(!fpinfo_i || fpinfo_i->server->serverid == fpinfo_o->server->serverid); /* * Copy the server specific FDW options. (For a join, both relations come * from the same server, so the server options should have the same value * for both relations.) */ fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost; fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost; fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; fpinfo->async_capable = fpinfo_o->async_capable; /* Merge the table level options from either side of the join. */ if (fpinfo_i) { /* * We'll prefer to use remote estimates for this join if any table * from either side of the join is using remote estimates. This is * most likely going to be preferred since they're already willing to * pay the price of a round trip to get the remote EXPLAIN. In any * case it's not entirely clear how we might otherwise handle this * best. */ fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate || fpinfo_i->use_remote_estimate; /* * Set fetch size to maximum of the joining sides, since we are * expecting the rows returned by the join to be proportional to the * relation sizes. */ fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); /* * We'll prefer to consider this join async-capable if any table from * either side of the join is considered async-capable. This would be * reasonable because in that case the foreign server would have its * own resources to scan that table asynchronously, and the join could * also be computed asynchronously using the resources. */ fpinfo->async_capable = fpinfo_o->async_capable || fpinfo_i->async_capable; } } /* * postgresGetForeignJoinPaths * Add possible ForeignPath to joinrel, if join is safe to push down. 
*/ static void postgresGetForeignJoinPaths(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra) { PgFdwRelationInfo *fpinfo; ForeignPath *joinpath; double rows; int width; Cost startup_cost; Cost total_cost; Path *epq_path; /* Path to create plan to be executed when * EvalPlanQual gets triggered. */ /* * Skip if this join combination has been considered already. */ if (joinrel->fdw_private) return; /* * This code does not work for joins with lateral references, since those * must have parameterized paths, which we don't generate yet. */ if (!bms_is_empty(joinrel->lateral_relids)) return; /* * Create unfinished PgFdwRelationInfo entry which is used to indicate * that the join relation is already considered, so that we won't waste * time in judging safety of join pushdown and adding the same paths again * if found safe. Once we know that this join can be pushed down, we fill * the entry. */ fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); fpinfo->pushdown_safe = false; joinrel->fdw_private = fpinfo; /* attrs_used is only for base relations. */ fpinfo->attrs_used = NULL; /* * If there is a possibility that EvalPlanQual will be executed, we need * to be able to reconstruct the row using scans of the base relations. * GetExistingLocalJoinPath will find a suitable path for this purpose in * the path list of the joinrel, if one exists. We must be careful to * call it before adding any ForeignPath, since the ForeignPath might * dominate the only suitable local path available. We also do it before * calling foreign_join_ok(), since that function updates fpinfo and marks * it as pushable if the join is found to be pushable. 
*/ if (root->parse->commandType == CMD_DELETE || root->parse->commandType == CMD_UPDATE || root->rowMarks) { epq_path = GetExistingLocalJoinPath(joinrel); if (!epq_path) { elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found"); return; } } else epq_path = NULL; if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra)) { /* Free path required for EPQ if we copied one; we don't need it now */ if (epq_path) pfree(epq_path); return; } /* * Compute the selectivity and cost of the local_conds, so we don't have * to do it over again for each path. The best we can do for these * conditions is to estimate selectivity on the basis of local statistics. * The local conditions are applied after the join has been computed on * the remote side like quals in WHERE clause, so pass jointype as * JOIN_INNER. */ fpinfo->local_conds_sel = clauselist_selectivity(root, fpinfo->local_conds, 0, JOIN_INNER, NULL); cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); /* * If we are going to estimate costs locally, estimate the join clause * selectivity here while we have special join info. */ if (!fpinfo->use_remote_estimate) fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses, 0, fpinfo->jointype, extra->sjinfo); /* Estimate costs for bare join relation */ estimate_path_cost_size(root, joinrel, NIL, NIL, NULL, &rows, &width, &startup_cost, &total_cost); /* Now update this information in the joinrel */ joinrel->rows = rows; joinrel->reltarget->width = width; fpinfo->rows = rows; fpinfo->width = width; fpinfo->startup_cost = startup_cost; fpinfo->total_cost = total_cost; /* * Create a new join path and add it to the joinrel which represents a * join between foreign tables. 
*/ joinpath = create_foreign_join_path(root, joinrel, NULL, /* default pathtarget */ rows, startup_cost, total_cost, NIL, /* no pathkeys */ joinrel->lateral_relids, epq_path, NIL); /* no fdw_private */ /* Add generated path into joinrel by add_path(). */ add_path(joinrel, (Path *) joinpath); /* Consider pathkeys for the join relation */ add_paths_with_pathkeys_for_rel(root, joinrel, epq_path); /* XXX Consider parameterized paths for the join relation */ } /* * Assess whether the aggregation, grouping and having operations can be pushed * down to the foreign server. As a side effect, save information we obtain in * this function to PgFdwRelationInfo of the input relation. */ static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, Node *havingQual) { Query *query = root->parse; PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private; PathTarget *grouping_target = grouped_rel->reltarget; PgFdwRelationInfo *ofpinfo; ListCell *lc; int i; List *tlist = NIL; /* We currently don't support pushing Grouping Sets. */ if (query->groupingSets) return false; /* Get the fpinfo of the underlying scan relation. */ ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; /* * If underlying scan relation has any local conditions, those conditions * are required to be applied before performing aggregation. Hence the * aggregate cannot be pushed down. */ if (ofpinfo->local_conds) return false; /* * Examine grouping expressions, as well as other expressions we'd need to * compute, and check whether they are safe to push down to the foreign * server. All GROUP BY expressions will be part of the grouping target * and thus there is no need to search for them separately. Add grouping * expressions into target list which will be passed to foreign server. 
* * A tricky fine point is that we must not put any expression into the * target list that is just a foreign param (that is, something that * deparse.c would conclude has to be sent to the foreign server). If we * do, the expression will also appear in the fdw_exprs list of the plan * node, and setrefs.c will get confused and decide that the fdw_exprs * entry is actually a reference to the fdw_scan_tlist entry, resulting in * a broken plan. Somewhat oddly, it's OK if the expression contains such * a node, as long as it's not at top level; then no match is possible. */ i = 0; foreach(lc, grouping_target->exprs) { Expr *expr = (Expr *) lfirst(lc); Index sgref = get_pathtarget_sortgroupref(grouping_target, i); ListCell *l; /* Check whether this expression is part of GROUP BY clause */ if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) { TargetEntry *tle; /* * If any GROUP BY expression is not shippable, then we cannot * push down aggregation to the foreign server. */ if (!is_foreign_expr(root, grouped_rel, expr)) return false; /* * If it would be a foreign param, we can't put it into the tlist, * so we have to fail. */ if (is_foreign_param(root, grouped_rel, expr)) return false; /* * Pushable, so add to tlist. We need to create a TLE for this * expression and apply the sortgroupref to it. We cannot use * add_to_flat_tlist() here because that avoids making duplicate * entries in the tlist. If there are duplicate entries with * distinct sortgrouprefs, we have to duplicate that situation in * the output tlist. */ tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false); tle->ressortgroupref = sgref; tlist = lappend(tlist, tle); } else { /* * Non-grouping expression we need to compute. Can we ship it * as-is to the foreign server? 
*/ if (is_foreign_expr(root, grouped_rel, expr) && !is_foreign_param(root, grouped_rel, expr)) { /* Yes, so add to tlist as-is; OK to suppress duplicates */ tlist = add_to_flat_tlist(tlist, list_make1(expr)); } else { /* Not pushable as a whole; extract its Vars and aggregates */ List *aggvars; aggvars = pull_var_clause((Node *) expr, PVC_INCLUDE_AGGREGATES); /* * If any aggregate expression is not shippable, then we * cannot push down aggregation to the foreign server. (We * don't have to check is_foreign_param, since that certainly * won't return true for any such expression.) */ if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) return false; /* * Add aggregates, if any, into the targetlist. Plain Vars * outside an aggregate can be ignored, because they should be * either same as some GROUP BY column or part of some GROUP * BY expression. In either case, they are already part of * the targetlist and thus no need to add them again. In fact * including plain Vars in the tlist when they do not match a * GROUP BY column would cause the foreign server to complain * that the shipped query is invalid. */ foreach(l, aggvars) { Expr *expr = (Expr *) lfirst(l); if (IsA(expr, Aggref)) tlist = add_to_flat_tlist(tlist, list_make1(expr)); } } } i++; } /* * Classify the pushable and non-pushable HAVING clauses and save them in * remote_conds and local_conds of the grouped rel's fpinfo. */ if (havingQual) { ListCell *lc; foreach(lc, (List *) havingQual) { Expr *expr = (Expr *) lfirst(lc); RestrictInfo *rinfo; /* * Currently, the core code doesn't wrap havingQuals in * RestrictInfos, so we must make our own. 
*/ Assert(!IsA(expr, RestrictInfo)); rinfo = make_restrictinfo(root, expr, true, false, false, root->qual_security_level, grouped_rel->relids, NULL, NULL); if (is_foreign_expr(root, grouped_rel, expr)) fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); else fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); } } /* * If there are any local conditions, pull Vars and aggregates from it and * check whether they are safe to pushdown or not. */ if (fpinfo->local_conds) { List *aggvars = NIL; ListCell *lc; foreach(lc, fpinfo->local_conds) { RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); aggvars = list_concat(aggvars, pull_var_clause((Node *) rinfo->clause, PVC_INCLUDE_AGGREGATES)); } foreach(lc, aggvars) { Expr *expr = (Expr *) lfirst(lc); /* * If aggregates within local conditions are not safe to push * down, then we cannot push down the query. Vars are already * part of GROUP BY clause which are checked above, so no need to * access them again here. Again, we need not check * is_foreign_param for a foreign aggregate. */ if (IsA(expr, Aggref)) { if (!is_foreign_expr(root, grouped_rel, expr)) return false; tlist = add_to_flat_tlist(tlist, list_make1(expr)); } } } /* Store generated targetlist */ fpinfo->grouped_tlist = tlist; /* Safe to pushdown */ fpinfo->pushdown_safe = true; /* * Set # of retrieved rows and cached relation costs to some negative * value, so that we can detect when they are set to some sensible values, * during one (usually the first) of the calls to estimate_path_cost_size. */ fpinfo->retrieved_rows = -1; fpinfo->rel_startup_cost = -1; fpinfo->rel_total_cost = -1; /* * Set the string describing this grouped relation to be used in EXPLAIN * output of corresponding ForeignScan. Note that the decoration we add * to the base relation name mustn't include any digits, or it'll confuse * postgresExplainForeignScan. 
*/ fpinfo->relation_name = psprintf("Aggregate on (%s)", ofpinfo->relation_name); return true; } /* * postgresGetForeignUpperPaths * Add paths for post-join operations like aggregation, grouping etc. if * corresponding operations are safe to push down. */ static void postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra) { PgFdwRelationInfo *fpinfo; /* * If input rel is not safe to pushdown, then simply return as we cannot * perform any post-join operations on the foreign server. */ if (!input_rel->fdw_private || !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe) return; /* Ignore stages we don't support; and skip any duplicate calls. */ if ((stage != UPPERREL_GROUP_AGG && stage != UPPERREL_ORDERED && stage != UPPERREL_FINAL) || output_rel->fdw_private) return; fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); fpinfo->pushdown_safe = false; fpinfo->stage = stage; output_rel->fdw_private = fpinfo; switch (stage) { case UPPERREL_GROUP_AGG: add_foreign_grouping_paths(root, input_rel, output_rel, (GroupPathExtraData *) extra); break; case UPPERREL_ORDERED: add_foreign_ordered_paths(root, input_rel, output_rel); break; case UPPERREL_FINAL: add_foreign_final_paths(root, input_rel, output_rel, (FinalPathExtraData *) extra); break; default: elog(ERROR, "unexpected upper relation: %d", (int) stage); break; } } /* * add_foreign_grouping_paths * Add foreign path for grouping and/or aggregation. * * Given input_rel represents the underlying scan. The paths are added to the * given grouped_rel. 
*/ static void add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel, GroupPathExtraData *extra) { Query *parse = root->parse; PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private; ForeignPath *grouppath; double rows; int width; Cost startup_cost; Cost total_cost; /* Nothing to be done, if there is no grouping or aggregation required. */ if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && !root->hasHavingQual) return; Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE || extra->patype == PARTITIONWISE_AGGREGATE_FULL); /* save the input_rel as outerrel in fpinfo */ fpinfo->outerrel = input_rel; /* * Copy foreign table, foreign server, user mapping, FDW options etc. * details from the input relation's fpinfo. */ fpinfo->table = ifpinfo->table; fpinfo->server = ifpinfo->server; fpinfo->user = ifpinfo->user; merge_fdw_options(fpinfo, ifpinfo, NULL); /* * Assess if it is safe to push down aggregation and grouping. * * Use HAVING qual from extra. In case of child partition, it will have * translated Vars. */ if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual)) return; /* * Compute the selectivity and cost of the local_conds, so we don't have * to do it over again for each path. (Currently we create just a single * path here, but in future it would be possible that we build more paths * such as pre-sorted paths as in postgresGetForeignPaths and * postgresGetForeignJoinPaths.) The best we can do for these conditions * is to estimate selectivity on the basis of local statistics. 
*/ fpinfo->local_conds_sel = clauselist_selectivity(root, fpinfo->local_conds, 0, JOIN_INNER, NULL); cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); /* Estimate the cost of push down */ estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL, &rows, &width, &startup_cost, &total_cost); /* Now update this information in the fpinfo */ fpinfo->rows = rows; fpinfo->width = width; fpinfo->startup_cost = startup_cost; fpinfo->total_cost = total_cost; /* Create and add foreign path to the grouping relation. */ grouppath = create_foreign_upper_path(root, grouped_rel, grouped_rel->reltarget, rows, startup_cost, total_cost, NIL, /* no pathkeys */ NULL, NIL); /* no fdw_private */ /* Add generated path into grouped_rel by add_path(). */ add_path(grouped_rel, (Path *) grouppath); } /* * add_foreign_ordered_paths * Add foreign paths for performing the final sort remotely. * * Given input_rel contains the source-data Paths. The paths are added to the * given ordered_rel. */ static void add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *ordered_rel) { Query *parse = root->parse; PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private; PgFdwPathExtraData *fpextra; double rows; int width; Cost startup_cost; Cost total_cost; List *fdw_private; ForeignPath *ordered_path; ListCell *lc; /* Shouldn't get here unless the query has ORDER BY */ Assert(parse->sortClause); /* We don't support cases where there are any SRFs in the targetlist */ if (parse->hasTargetSRFs) return; /* Save the input_rel as outerrel in fpinfo */ fpinfo->outerrel = input_rel; /* * Copy foreign table, foreign server, user mapping, FDW options etc. * details from the input relation's fpinfo. 
*/ fpinfo->table = ifpinfo->table; fpinfo->server = ifpinfo->server; fpinfo->user = ifpinfo->user; merge_fdw_options(fpinfo, ifpinfo, NULL); /* * If the input_rel is a base or join relation, we would already have * considered pushing down the final sort to the remote server when * creating pre-sorted foreign paths for that relation, because the * query_pathkeys is set to the root->sort_pathkeys in that case (see * standard_qp_callback()). */ if (input_rel->reloptkind == RELOPT_BASEREL || input_rel->reloptkind == RELOPT_JOINREL) { Assert(root->query_pathkeys == root->sort_pathkeys); /* Safe to push down if the query_pathkeys is safe to push down */ fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe; return; } /* The input_rel should be a grouping relation */ Assert(input_rel->reloptkind == RELOPT_UPPER_REL && ifpinfo->stage == UPPERREL_GROUP_AGG); /* * We try to create a path below by extending a simple foreign path for * the underlying grouping relation to perform the final sort remotely, * which is stored into the fdw_private list of the resulting path. */ /* Assess if it is safe to push down the final sort */ foreach(lc, root->sort_pathkeys) { PathKey *pathkey = (PathKey *) lfirst(lc); EquivalenceClass *pathkey_ec = pathkey->pk_eclass; /* * is_foreign_expr would detect volatile expressions as well, but * checking ec_has_volatile here saves some cycles. */ if (pathkey_ec->ec_has_volatile) return; /* * Can't push down the sort if pathkey's opfamily is not shippable. */ if (!is_shippable(pathkey->pk_opfamily, OperatorFamilyRelationId, fpinfo)) return; /* * The EC must contain a shippable EM that is computed in input_rel's * reltarget, else we can't push down the sort. 
*/ if (find_em_for_rel_target(root, pathkey_ec, input_rel) == NULL) return; } /* Safe to push down */ fpinfo->pushdown_safe = true; /* Construct PgFdwPathExtraData */ fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData)); fpextra->target = root->upper_targets[UPPERREL_ORDERED]; fpextra->has_final_sort = true; /* Estimate the costs of performing the final sort remotely */ estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra, &rows, &width, &startup_cost, &total_cost); /* * Build the fdw_private list that will be used by postgresGetForeignPlan. * Items in the list must match order in enum FdwPathPrivateIndex. */ fdw_private = list_make2(makeBoolean(true), makeBoolean(false)); /* Create foreign ordering path */ ordered_path = create_foreign_upper_path(root, input_rel, root->upper_targets[UPPERREL_ORDERED], rows, startup_cost, total_cost, root->sort_pathkeys, NULL, /* no extra plan */ fdw_private); /* and add it to the ordered_rel */ add_path(ordered_rel, (Path *) ordered_path); } /* * add_foreign_final_paths * Add foreign paths for performing the final processing remotely. * * Given input_rel contains the source-data Paths. The paths are added to the * given final_rel. 
*/ static void add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *final_rel, FinalPathExtraData *extra) { Query *parse = root->parse; PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private; PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private; bool has_final_sort = false; List *pathkeys = NIL; PgFdwPathExtraData *fpextra; bool save_use_remote_estimate = false; double rows; int width; Cost startup_cost; Cost total_cost; List *fdw_private; ForeignPath *final_path; /* * Currently, we only support this for SELECT commands */ if (parse->commandType != CMD_SELECT) return; /* * No work if there is no FOR UPDATE/SHARE clause and if there is no need * to add a LIMIT node */ if (!parse->rowMarks && !extra->limit_needed) return; /* We don't support cases where there are any SRFs in the targetlist */ if (parse->hasTargetSRFs) return; /* Save the input_rel as outerrel in fpinfo */ fpinfo->outerrel = input_rel; /* * Copy foreign table, foreign server, user mapping, FDW options etc. * details from the input relation's fpinfo. */ fpinfo->table = ifpinfo->table; fpinfo->server = ifpinfo->server; fpinfo->user = ifpinfo->user; merge_fdw_options(fpinfo, ifpinfo, NULL); /* * If there is no need to add a LIMIT node, there might be a ForeignPath * in the input_rel's pathlist that implements all behavior of the query. * Note: we would already have accounted for the query's FOR UPDATE/SHARE * (if any) before we get here. */ if (!extra->limit_needed) { ListCell *lc; Assert(parse->rowMarks); /* * Grouping and aggregation are not supported with FOR UPDATE/SHARE, * so the input_rel should be a base, join, or ordered relation; and * if it's an ordered relation, its input relation should be a base or * join relation. 
*/ Assert(input_rel->reloptkind == RELOPT_BASEREL || input_rel->reloptkind == RELOPT_JOINREL || (input_rel->reloptkind == RELOPT_UPPER_REL && ifpinfo->stage == UPPERREL_ORDERED && (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL || ifpinfo->outerrel->reloptkind == RELOPT_JOINREL))); foreach(lc, input_rel->pathlist) { Path *path = (Path *) lfirst(lc); /* * apply_scanjoin_target_to_paths() uses create_projection_path() * to adjust each of its input paths if needed, whereas * create_ordered_paths() uses apply_projection_to_path() to do * that. So the former might have put a ProjectionPath on top of * the ForeignPath; look through ProjectionPath and see if the * path underneath it is ForeignPath. */ if (IsA(path, ForeignPath) || (IsA(path, ProjectionPath) && IsA(((ProjectionPath *) path)->subpath, ForeignPath))) { /* * Create foreign final path; this gets rid of a * no-longer-needed outer plan (if any), which makes the * EXPLAIN output look cleaner */ final_path = create_foreign_upper_path(root, path->parent, path->pathtarget, path->rows, path->startup_cost, path->total_cost, path->pathkeys, NULL, /* no extra plan */ NULL); /* no fdw_private */ /* and add it to the final_rel */ add_path(final_rel, (Path *) final_path); /* Safe to push down */ fpinfo->pushdown_safe = true; return; } } /* * If we get here it means no ForeignPaths; since we would already * have considered pushing down all operations for the query to the * remote server, give up on it. 
*/ return; } Assert(extra->limit_needed); /* * If the input_rel is an ordered relation, replace the input_rel with its * input relation */ if (input_rel->reloptkind == RELOPT_UPPER_REL && ifpinfo->stage == UPPERREL_ORDERED) { input_rel = ifpinfo->outerrel; ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private; has_final_sort = true; pathkeys = root->sort_pathkeys; } /* The input_rel should be a base, join, or grouping relation */ Assert(input_rel->reloptkind == RELOPT_BASEREL || input_rel->reloptkind == RELOPT_JOINREL || (input_rel->reloptkind == RELOPT_UPPER_REL && ifpinfo->stage == UPPERREL_GROUP_AGG)); /* * We try to create a path below by extending a simple foreign path for * the underlying base, join, or grouping relation to perform the final * sort (if has_final_sort) and the LIMIT restriction remotely, which is * stored into the fdw_private list of the resulting path. (We * re-estimate the costs of sorting the underlying relation, if * has_final_sort.) */ /* * Assess if it is safe to push down the LIMIT and OFFSET to the remote * server */ /* * If the underlying relation has any local conditions, the LIMIT/OFFSET * cannot be pushed down. */ if (ifpinfo->local_conds) return; /* * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are * not safe to remote. */ if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) || !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount)) return; /* Safe to push down */ fpinfo->pushdown_safe = true; /* Construct PgFdwPathExtraData */ fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData)); fpextra->target = root->upper_targets[UPPERREL_FINAL]; fpextra->has_final_sort = has_final_sort; fpextra->has_limit = extra->limit_needed; fpextra->limit_tuples = extra->limit_tuples; fpextra->count_est = extra->count_est; fpextra->offset_est = extra->offset_est; /* * Estimate the costs of performing the final sort and the LIMIT * restriction remotely. 
If has_final_sort is false, we wouldn't need to * execute EXPLAIN anymore if use_remote_estimate, since the costs can be * roughly estimated using the costs we already have for the underlying * relation, in the same way as when use_remote_estimate is false. Since * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to * false in that case. */ if (!fpextra->has_final_sort) { save_use_remote_estimate = ifpinfo->use_remote_estimate; ifpinfo->use_remote_estimate = false; } estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra, &rows, &width, &startup_cost, &total_cost); if (!fpextra->has_final_sort) ifpinfo->use_remote_estimate = save_use_remote_estimate; /* * Build the fdw_private list that will be used by postgresGetForeignPlan. * Items in the list must match order in enum FdwPathPrivateIndex. */ fdw_private = list_make2(makeBoolean(has_final_sort), makeBoolean(extra->limit_needed)); /* * Create foreign final path; this gets rid of a no-longer-needed outer * plan (if any), which makes the EXPLAIN output look cleaner */ final_path = create_foreign_upper_path(root, input_rel, root->upper_targets[UPPERREL_FINAL], rows, startup_cost, total_cost, pathkeys, NULL, /* no extra plan */ fdw_private); /* and add it to the final_rel */ add_path(final_rel, (Path *) final_path); } /* * postgresIsForeignPathAsyncCapable * Check whether a given ForeignPath node is async-capable. */ static bool postgresIsForeignPathAsyncCapable(ForeignPath *path) { RelOptInfo *rel = ((Path *) path)->parent; PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; return fpinfo->async_capable; } /* * postgresForeignAsyncRequest * Asynchronously request next tuple from a foreign PostgreSQL table. */ static void postgresForeignAsyncRequest(AsyncRequest *areq) { produce_tuple_asynchronously(areq, true); } /* * postgresForeignAsyncConfigureWait * Configure a file descriptor event for which we wish to wait. 
*/
static void
postgresForeignAsyncConfigureWait(AsyncRequest *areq)
{
	ForeignScanState *fsnode = (ForeignScanState *) areq->requestee;
	PgFdwScanState *state = (PgFdwScanState *) fsnode->fdw_state;
	AsyncRequest *inflight = state->conn_state->pendingAreq;
	AppendState *parent = (AppendState *) areq->requestor;
	WaitEventSet *eventset = parent->as_eventset;

	/* Only meaningful while the request is waiting for a callback */
	Assert(areq->callback_pending);

	/*
	 * An earlier process_pending_request() call may already have buffered
	 * tuples for this request; if so, try to complete it from that buffer
	 * before configuring any wait event.
	 */
	if (state->next_tuple < state->num_tuples)
	{
		complete_pending_request(areq);
		if (areq->request_complete)
			return;
		Assert(areq->callback_pending);
	}

	/* From here on the local tuple buffer is exhausted */
	Assert(state->next_tuple >= state->num_tuples);

	/* The executor registers the postmaster-death event before calling us */
	Assert(GetNumRegisteredWaitEvents(eventset) >= 1);

	if (inflight == NULL)
	{
		/* Connection is idle: start our own asynchronous FETCH */
		fetch_more_data_begin(areq);
	}
	else if (inflight->requestor != areq->requestor)
	{
		/*
		 * The in-flight FETCH was issued on behalf of a different Append,
		 * whose tuples the query may no longer need.  If our own Append has
		 * children ready for new requests, or any event other than the
		 * postmaster-death one is already configured, skip this request.
		 * Otherwise drain the other Append's request and start a FETCH of
		 * our own, so that we don't end up with only the postmaster-death
		 * event configured.
		 */
		if (!bms_is_empty(parent->as_needrequest) ||
			GetNumRegisteredWaitEvents(eventset) > 1)
			return;

		process_pending_request(inflight);
		fetch_more_data_begin(areq);
	}
	else if (inflight->requestee != areq->requestee)
	{
		/*
		 * Same Append, but the FETCH belongs to a sibling child; only that
		 * child's event gets configured, so skip this request.
		 */
		return;
	}
	else
		Assert(inflight == areq);

	AddWaitEventToSet(eventset, WL_SOCKET_READABLE, PQsocket(state->conn),
					  NULL, areq);
}

/*
 * postgresForeignAsyncNotify
 *		Fetch some more tuples from a file descriptor that becomes ready,
 *		requesting next tuple.
 */
static void
postgresForeignAsyncNotify(AsyncRequest *areq)
{
	ForeignScanState *fsnode = (ForeignScanState *) areq->requestee;
	PgFdwScanState *state = (PgFdwScanState *) fsnode->fdw_state;

	/* The executor clears callback_pending before invoking us */
	Assert(!areq->callback_pending);

	/*
	 * Only read from the socket when the local buffer is empty; tuples left
	 * behind by an earlier process_pending_request() are handed out first.
	 */
	if (state->next_tuple >= state->num_tuples)
	{
		/* In this case our request must be the one in flight */
		Assert(state->conn_state->pendingAreq == areq);

		/* On error, report the original query, not the FETCH. */
		if (!PQconsumeInput(state->conn))
			pgfdw_report_error(ERROR, NULL, state->conn, false, state->query);

		fetch_more_data(fsnode);
	}

	produce_tuple_asynchronously(areq, true);
}

/*
 * Asynchronously produce next tuple from a foreign PostgreSQL table.
*/
static void
produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
{
	ForeignScanState *fsnode = (ForeignScanState *) areq->requestee;
	PgFdwScanState *state = (PgFdwScanState *) fsnode->fdw_state;
	AsyncRequest *inflight = state->conn_state->pendingAreq;

	/* Never called for the request whose FETCH is currently in flight */
	Assert(areq != inflight);

	/* If tuples are buffered locally, try to hand one out right away */
	if (state->next_tuple < state->num_tuples)
	{
		TupleTableSlot *slot;

		slot = areq->requestee->ExecProcNodeReal(areq->requestee);
		if (!TupIsNull(slot))
		{
			ExecAsyncRequestDone(areq, slot);
			return;
		}

		/* An empty slot means the buffer just ran dry */
		Assert(state->next_tuple >= state->num_tuples);
	}

	/* Buffer is empty: either we are at EOF, or more data must be fetched */
	if (state->eof_reached)
	{
		/* Nothing left on the remote side; complete with a NULL result */
		ExecAsyncRequestDone(areq, NULL);
		return;
	}

	/* Wait for a callback, starting a FETCH if asked to and none is running */
	ExecAsyncRequestPending(areq);
	if (fetch && inflight == NULL)
		fetch_more_data_begin(areq);
}

/*
 * Begin an asynchronous data fetch.
 *
 * Note: this function assumes there is no currently-in-progress asynchronous
 * data fetch.
 *
 * Note: fetch_more_data must be called to fetch the result.
*/
static void
fetch_more_data_begin(AsyncRequest *areq)
{
	ForeignScanState *fsnode = (ForeignScanState *) areq->requestee;
	PgFdwScanState *state = (PgFdwScanState *) fsnode->fdw_state;
	char		fetch_sql[64];

	/* No other asynchronous FETCH may be in progress on this connection */
	Assert(state->conn_state->pendingAreq == NULL);

	/* The cursor itself is declared synchronously, on first use */
	if (!state->cursor_exists)
		create_cursor(fsnode);

	/* Send the FETCH without waiting; fetch_more_data() collects the result */
	snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
			 state->fetch_size, state->cursor_number);
	if (PQsendQuery(state->conn, fetch_sql) == 0)
		pgfdw_report_error(ERROR, NULL, state->conn, false, state->query);

	/* Record this request as the one whose FETCH is now in flight */
	state->conn_state->pendingAreq = areq;
}

/*
 * Process a pending asynchronous request.
 */
void
process_pending_request(AsyncRequest *areq)
{
	ForeignScanState *fsnode = (ForeignScanState *) areq->requestee;
	PgFdwScanState *state = (PgFdwScanState *) fsnode->fdw_state;

	/* The request must be awaiting a callback and own the in-flight FETCH */
	Assert(areq->callback_pending);
	Assert(state->conn_state->pendingAreq == areq);

	fetch_more_data(fsnode);

	/*
	 * If tuples arrived, leave the request pending; it is completed later
	 * from postgresForeignAsyncConfigureWait()/postgresForeignAsyncNotify().
	 */
	if (state->next_tuple < state->num_tuples)
		return;

	/*
	 * No tuples means end of data: complete the request now.  Unlike the
	 * AsyncNotify path, we clear callback_pending and call ExecAsyncResponse
	 * ourselves.
	 */
	areq->callback_pending = false;
	ExecAsyncRequestDone(areq, NULL);
	ExecAsyncResponse(areq);
}

/*
 * Complete a pending asynchronous request.
*/ static void complete_pending_request(AsyncRequest *areq) { /* The request would have been pending for a callback */ Assert(areq->callback_pending); /* Unlike AsyncNotify, we unset callback_pending ourselves */ areq->callback_pending = false; /* We begin a fetch afterwards if necessary; don't fetch */ produce_tuple_asynchronously(areq, false); /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ ExecAsyncResponse(areq); /* Also, we do instrumentation ourselves, if required */ if (areq->requestee->instrument) InstrUpdateTupleCount(areq->requestee->instrument, TupIsNull(areq->result) ? 0.0 : 1.0); } /* * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is * conversion data for the rel's tupdesc, and retrieved_attrs is an * integer list of the table column numbers present in the PGresult. * fsstate is the ForeignScan plan node's execution state. * temp_context is a working context that can be reset after each tuple. * * Note: either rel or fsstate, but not both, can be NULL. rel is NULL * if we're processing a remote join, while fsstate is NULL in a non-query * context such as ANALYZE, or if we're processing a non-scan query node. */ static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context) { HeapTuple tuple; TupleDesc tupdesc; Datum *values; bool *nulls; ItemPointer ctid = NULL; ConversionLocation errpos; ErrorContextCallback errcallback; MemoryContext oldcontext; ListCell *lc; int j; Assert(row < PQntuples(res)); /* * Do the following work in a temp context that we reset after each tuple. * This cleans up not only the data we have direct access to, but any * cruft the I/O functions might leak. */ oldcontext = MemoryContextSwitchTo(temp_context); /* * Get the tuple descriptor for the row. 
Use the rel's tupdesc if rel is * provided, otherwise look to the scan node's ScanTupleSlot. */ if (rel) tupdesc = RelationGetDescr(rel); else { Assert(fsstate); tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; } values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); nulls = (bool *) palloc(tupdesc->natts * sizeof(bool)); /* Initialize to nulls for any columns not present in result */ memset(nulls, true, tupdesc->natts * sizeof(bool)); /* * Set up and install callback to report where conversion error occurs. */ errpos.cur_attno = 0; errpos.rel = rel; errpos.fsstate = fsstate; errcallback.callback = conversion_error_callback; errcallback.arg = (void *) &errpos; errcallback.previous = error_context_stack; error_context_stack = &errcallback; /* * i indexes columns in the relation, j indexes columns in the PGresult. */ j = 0; foreach(lc, retrieved_attrs) { int i = lfirst_int(lc); char *valstr; /* fetch next column's textual value */ if (PQgetisnull(res, row, j)) valstr = NULL; else valstr = PQgetvalue(res, row, j); /* * convert value to internal representation * * Note: we ignore system columns other than ctid and oid in result */ errpos.cur_attno = i; if (i > 0) { /* ordinary column */ Assert(i <= tupdesc->natts); nulls[i - 1] = (valstr == NULL); /* Apply the input function even to nulls, to support domains */ values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], valstr, attinmeta->attioparams[i - 1], attinmeta->atttypmods[i - 1]); } else if (i == SelfItemPointerAttributeNumber) { /* ctid */ if (valstr != NULL) { Datum datum; datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); ctid = (ItemPointer) DatumGetPointer(datum); } } errpos.cur_attno = 0; j++; } /* Uninstall error context callback. */ error_context_stack = errcallback.previous; /* * Check we got the expected number of columns. Note: j == 0 and * PQnfields == 1 is expected, since deparse emits a NULL if no columns. 
*/ if (j > 0 && j != PQnfields(res)) elog(ERROR, "remote query result does not match the foreign table"); /* * Build the result tuple in caller's memory context. */ MemoryContextSwitchTo(oldcontext); tuple = heap_form_tuple(tupdesc, values, nulls); /* * If we have a CTID to return, install it in both t_self and t_ctid. * t_self is the normal place, but if the tuple is converted to a * composite Datum, t_self will be lost; setting t_ctid allows CTID to be * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code). */ if (ctid) tuple->t_self = tuple->t_data->t_ctid = *ctid; /* * Stomp on the xmin, xmax, and cmin fields from the tuple created by * heap_form_tuple. heap_form_tuple actually creates the tuple with * DatumTupleFields, not HeapTupleFields, but the executor expects * HeapTupleFields and will happily extract system columns on that * assumption. If we don't do this then, for example, the tuple length * ends up in the xmin field, which isn't what we want. */ HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId); HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId); HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId); /* Clean up */ MemoryContextReset(temp_context); return tuple; } /* * Callback function which is called when error occurs during column value * conversion. Print names of column and relation. * * Note that this function mustn't do any catalog lookups, since we are in * an already-failed transaction. Fortunately, we can get the needed info * from the relation or the query's rangetable instead. */ static void conversion_error_callback(void *arg) { ConversionLocation *errpos = (ConversionLocation *) arg; Relation rel = errpos->rel; ForeignScanState *fsstate = errpos->fsstate; const char *attname = NULL; const char *relname = NULL; bool is_wholerow = false; /* * If we're in a scan node, always use aliases from the rangetable, for * consistency between the simple-relation and remote-join cases. 
Look at * the relation's tupdesc only if we're not in a scan node. */ if (fsstate) { /* ForeignScan case */ ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan); int varno = 0; AttrNumber colno = 0; if (fsplan->scan.scanrelid > 0) { /* error occurred in a scan against a foreign table */ varno = fsplan->scan.scanrelid; colno = errpos->cur_attno; } else { /* error occurred in a scan against a foreign join */ TargetEntry *tle; tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, errpos->cur_attno - 1); /* * Target list can have Vars and expressions. For Vars, we can * get some information, however for expressions we can't. Thus * for expressions, just show generic context message. */ if (IsA(tle->expr, Var)) { Var *var = (Var *) tle->expr; varno = var->varno; colno = var->varattno; } } if (varno > 0) { EState *estate = fsstate->ss.ps.state; RangeTblEntry *rte = exec_rt_fetch(varno, estate); relname = rte->eref->aliasname; if (colno == 0) is_wholerow = true; else if (colno > 0 && colno <= list_length(rte->eref->colnames)) attname = strVal(list_nth(rte->eref->colnames, colno - 1)); else if (colno == SelfItemPointerAttributeNumber) attname = "ctid"; } } else if (rel) { /* Non-ForeignScan case (we should always have a rel here) */ TupleDesc tupdesc = RelationGetDescr(rel); relname = RelationGetRelationName(rel); if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) { Form_pg_attribute attr = TupleDescAttr(tupdesc, errpos->cur_attno - 1); attname = NameStr(attr->attname); } else if (errpos->cur_attno == SelfItemPointerAttributeNumber) attname = "ctid"; } if (relname && is_wholerow) errcontext("whole-row reference to foreign table \"%s\"", relname); else if (relname && attname) errcontext("column \"%s\" of foreign table \"%s\"", attname, relname); else errcontext("processing expression at position %d in select list", errpos->cur_attno); } /* * Given an EquivalenceClass and a foreign relation, find an EC member * that can be used to sort the 
relation remotely according to a pathkey * using this EC. * * If there is more than one suitable candidate, return an arbitrary * one of them. If there is none, return NULL. * * This checks that the EC member expression uses only Vars from the given * rel and is shippable. Caller must separately verify that the pathkey's * ordering operator is shippable. */ EquivalenceMember * find_em_for_rel(PlannerInfo *root, EquivalenceClass *ec, RelOptInfo *rel) { ListCell *lc; foreach(lc, ec->ec_members) { EquivalenceMember *em = (EquivalenceMember *) lfirst(lc); /* * Note we require !bms_is_empty, else we'd accept constant * expressions which are not suitable for the purpose. */ if (bms_is_subset(em->em_relids, rel->relids) && !bms_is_empty(em->em_relids) && is_foreign_expr(root, rel, em->em_expr)) return em; } return NULL; } /* * Find an EquivalenceClass member that is to be computed as a sort column * in the given rel's reltarget, and is shippable. * * If there is more than one suitable candidate, return an arbitrary * one of them. If there is none, return NULL. * * This checks that the EC member expression uses only Vars from the given * rel and is shippable. Caller must separately verify that the pathkey's * ordering operator is shippable. 
*/
EquivalenceMember *
find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec,
					   RelOptInfo *rel)
{
	PathTarget *target = rel->reltarget;
	ListCell   *tcell;
	int			pos = -1;

	foreach(tcell, target->exprs)
	{
		Expr	   *expr = (Expr *) lfirst(tcell);
		Index		sgref;
		ListCell   *mcell;

		/* pos is the index of expr within target->exprs */
		pos++;
		sgref = get_pathtarget_sortgroupref(target, pos);

		/* Only expressions that are actual ORDER BY columns are considered */
		if (sgref == 0 ||
			get_sortgroupref_clause_noerr(sgref,
										  root->parse->sortClause) == NULL)
			continue;

		/* Binary-compatible relabeling is ignored on both sides */
		while (expr && IsA(expr, RelabelType))
			expr = ((RelabelType *) expr)->arg;

		foreach(mcell, ec->ec_members)
		{
			EquivalenceMember *member = (EquivalenceMember *) lfirst(mcell);
			Expr	   *bare = member->em_expr;

			/* Constants and child members never match */
			if (member->em_is_const || member->em_is_child)
				continue;

			while (bare && IsA(bare, RelabelType))
				bare = ((RelabelType *) bare)->arg;

			/*
			 * Match on the stripped expressions, but check shippability of
			 * the member's full expression, relabels included.
			 */
			if (equal(bare, expr) && is_foreign_expr(root, rel, member->em_expr))
				return member;
		}
	}

	return NULL;
}

/*
 * Determine batch size for a given foreign table.  The option specified for
 * a table has precedence.
 */
static int
get_batch_size_option(Relation rel)
{
	ForeignTable *table = GetForeignTable(RelationGetRelid(rel));
	ForeignServer *server = GetForeignServer(table->serverid);
	List	   *options;
	ListCell   *lc;
	int			batch_size = 1;	/* default of 1 means "no batching" */

	/*
	 * Table options are placed ahead of server options, so that the first
	 * match found below is the table-level setting when one exists.
	 */
	options = list_concat(NIL, table->options);
	options = list_concat(options, server->options);

	/* Take the first batch_size entry, if any */
	foreach(lc, options)
	{
		DefElem    *def = (DefElem *) lfirst(lc);

		if (strcmp(def->defname, "batch_size") != 0)
			continue;

		(void) parse_int(defGetString(def), &batch_size, 0, NULL);
		break;
	}

	return batch_size;
}