A Look at Flink's FileSystem


Published 2019-03-02 10:49:51




FileSystem

public abstract class FileSystem {
    /**
     * The possible write modes. The write mode decides what happens if a file should be created,
     * but already exists.
     */
    public enum WriteMode {
        /** Creates the target file only if no file exists at that path already.
         * Does not overwrite existing files and directories. */
        NO_OVERWRITE,
        /** Creates a new target file regardless of any existing files or directories.
         * Existing files and directories will be deleted (recursively) automatically before
         * creating the new file. */
        OVERWRITE
    }
    // ------------------------------------------------------------------------
    /** Logger for all FileSystem work. */
    private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
    /** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
     * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */
    private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
    /** Object used to protect calls to specific methods.*/
    private static final ReentrantLock LOCK = new ReentrantLock(true);
    /** Cache for file systems, by scheme + authority. */
    private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();
    /** All available file system factories. */
    private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();
    /** Mapping of file system schemes to the corresponding factories,
     * populated in {@link FileSystem#initialize(Configuration)}. */
    private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();
    /** The default factory that is used when no scheme matches. */
    private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();
    /** The default filesystem scheme to be used, configured during process-wide initialization.
     * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
    private static URI defaultScheme;
    // ------------------------------------------------------------------------
    //  Initialization
    // ------------------------------------------------------------------------
    /**
     * Initializes the shared file system settings.
     *
     * <p>The given configuration is passed to each file system factory to initialize the respective
     * file systems. Because the configuration of file systems may be different subsequent to the call
     * of this method, this method clears the file system instance cache.
     *
     * <p>This method also reads the default file system URI from the configuration key
     * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
     * the URI has no scheme will be interpreted as relative to that URI.
     * As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}.
     * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
     * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
     *
     * @param config the configuration from where to fetch the parameter.
     */
    public static void initialize(Configuration config) throws IOException, IllegalConfigurationException {
        LOCK.lock();
        try {
            // make sure file systems are re-instantiated after re-configuration
            CACHE.clear();
            FS_FACTORIES.clear();
            // configure all file system factories
            for (FileSystemFactory factory : RAW_FACTORIES) {
                factory.configure(config);
                String scheme = factory.getScheme();
                FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
                FS_FACTORIES.put(scheme, fsf);
            }
            // configure the default (fallback) factory
            FALLBACK_FACTORY.configure(config);
            // also read the default file system scheme
            final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
            if (stringifiedUri == null) {
                defaultScheme = null;
            }
            else {
                try {
                    defaultScheme = new URI(stringifiedUri);
                }
                catch (URISyntaxException e) {
                    throw new IllegalConfigurationException("The default file system scheme ('" +
                            CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
                }
            }
        }
        finally {
            LOCK.unlock();
        }
    }
    // ------------------------------------------------------------------------
    //  Obtaining File System Instances
    // ------------------------------------------------------------------------
    public static FileSystem getLocalFileSystem() {
        return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LocalFileSystem.getSharedInstance());
    }

    public static FileSystem get(URI uri) throws IOException {
        return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
    }

    public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
        checkNotNull(fsUri, "file system URI");
        LOCK.lock();
        try {
            final URI uri;
            if (fsUri.getScheme() != null) {
                uri = fsUri;
            }
            else {
                // Apply the default fs scheme
                final URI defaultUri = getDefaultFsUri();
                URI rewrittenUri = null;
                try {
                    rewrittenUri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
                            defaultUri.getPort(), fsUri.getPath(), null, null);
                }
                catch (URISyntaxException e) {
                    // for local URIs, we make one more try to repair the path by making it absolute
                    if (defaultUri.getScheme().equals("file")) {
                        try {
                            rewrittenUri = new URI(
                                    "file", null,
                                    new Path(new File(fsUri.getPath()).getAbsolutePath()).toUri().getPath(),
                                    null);
                        } catch (URISyntaxException ignored) {
                            // could not help it...
                        }
                    }
                }
                if (rewrittenUri != null) {
                    uri = rewrittenUri;
                }
                else {
                    throw new IOException("The file system URI '" + fsUri +
                            "' declares no scheme and cannot be interpreted relative to the default file system URI ("
                            + defaultUri + ").");
                }
            }
            // print a helpful pointer for malformed local URIs (happens a lot to new users)
            if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
                String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();
                throw new IOException("Found local file path with authority '" + uri.getAuthority() + "' in path '"
                        + uri.toString() + "'. Hint: Did you forget a slash? (correct path would be '" + supposedUri + "')");
            }
            final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());
            // See if there is a file system object in the cache
            {
                FileSystem cached = CACHE.get(key);
                if (cached != null) {
                    return cached;
                }
            }
            // this "default" initialization makes sure that the FileSystem class works
            // even when not configured with an explicit Flink configuration, like on
            // JobManager or TaskManager setup
            if (FS_FACTORIES.isEmpty()) {
                initialize(new Configuration());
            }
            // Try to create a new file system
            final FileSystem fs;
            final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());
            if (factory != null) {
                fs = factory.create(uri);
            }
            else {
                try {
                    fs = FALLBACK_FACTORY.create(uri);
                }
                catch (UnsupportedFileSystemSchemeException e) {
                    throw new UnsupportedFileSystemSchemeException(
                            "Could not find a file system implementation for scheme '" + uri.getScheme() +
                                    "'. The scheme is not directly supported by Flink and no Hadoop file " +
                                    "system to support this scheme could be loaded.", e);
                }
            }
            CACHE.put(key, fs);
            return fs;
        }
        finally {
            LOCK.unlock();
        }
    }

    public static URI getDefaultFsUri() {
        return defaultScheme != null ? defaultScheme : LocalFileSystem.getLocalFsURI();
    }
    // ------------------------------------------------------------------------
    //  File System Methods
    // ------------------------------------------------------------------------
    public abstract Path getWorkingDirectory();
    public abstract Path getHomeDirectory();
    public abstract URI getUri();
    public abstract FileStatus getFileStatus(Path f) throws IOException;
    public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;
    public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;
    public abstract FSDataInputStream open(Path f) throws IOException;
    public RecoverableWriter createRecoverableWriter() throws IOException {
        throw new UnsupportedOperationException("This file system does not support recoverable writers.");
    }
    public abstract FileStatus[] listStatus(Path f) throws IOException;
    public boolean exists(final Path f) throws IOException {
        try {
            return (getFileStatus(f) != null);
        } catch (FileNotFoundException e) {
            return false;
        }
    }
    public abstract boolean delete(Path f, boolean recursive) throws IOException;
    public abstract boolean mkdirs(Path f) throws IOException;
    public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
    public abstract boolean rename(Path src, Path dst) throws IOException;
    public abstract boolean isDistributedFS();
    public abstract FileSystemKind getKind();
    // ------------------------------------------------------------------------
    //  output directory initialization
    // ------------------------------------------------------------------------
    public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
        if (isDistributedFS()) {
            return false;
        }
        // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
        // concurrently work in this method (multiple output formats writing locally) might end
        // up deleting each other's directories and leave non-retrievable files, without necessarily
        // causing an exception. That results in very subtle issues, like output files looking as if
        // they are not getting created.
        // we acquire the lock interruptibly here, to make sure that concurrent threads waiting
        // here can cancel faster
        try {
            OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
        }
        catch (InterruptedException e) {
            // restore the interruption state
            Thread.currentThread().interrupt();
            // leave the method - we don't have the lock anyways
            throw new IOException("The thread was interrupted while trying to initialize the output directory");
        }
        try {
            FileStatus status;
            try {
                status = getFileStatus(outPath);
            }
            catch (FileNotFoundException e) {
                // okay, the file is not there
                status = null;
            }
            // check if path exists
            if (status != null) {
                // path exists, check write mode
                switch (writeMode) {
                case NO_OVERWRITE:
                    if (status.isDir() && createDirectory) {
                        return true;
                    } else {
                        // file may not be overwritten
                        throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " +
                                "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
                                WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
                    }
                case OVERWRITE:
                    if (status.isDir()) {
                        if (createDirectory) {
                            // directory exists and does not need to be created
                            return true;
                        } else {
                            // we will write in a single file, delete directory
                            try {
                                delete(outPath, true);
                            }
                            catch (IOException e) {
                                throw new IOException("Could not remove existing directory '" + outPath +
                                        "' to allow overwrite by result file", e);
                            }
                        }
                    }
                    else {
                        // delete file
                        try {
                            delete(outPath, false);
                        }
                        catch (IOException e) {
                            throw new IOException("Could not remove existing file '" + outPath +
                                    "' to allow overwrite by result file/directory", e);
                        }
                    }
                    break;
                default:
                    throw new IllegalArgumentException("Invalid write mode: " + writeMode);
                }
            }
            if (createDirectory) {
                // Output directory needs to be created
                if (!exists(outPath)) {
                    mkdirs(outPath);
                }
                // double check that the output directory exists
                try {
                    return getFileStatus(outPath).isDir();
                }
                catch (FileNotFoundException e) {
                    return false;
                }
            }
            else {
                // check that the output path does not exist and an output file
                // can be created by the output format.
                return !exists(outPath);
            }
        }
        finally {
            OUTPUT_DIRECTORY_INIT_LOCK.unlock();
        }
    }
    public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
        if (!isDistributedFS()) {
            return false;
        }
        // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
        // concurrently work in this method (multiple output formats writing locally) might end
        // up deleting each other's directories and leave non-retrievable files, without necessarily
        // causing an exception. That results in very subtle issues, like output files looking as if
        // they are not getting created.
        // we acquire the lock interruptibly here, to make sure that concurrent threads waiting
        // here can cancel faster
        try {
            OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
        }
        catch (InterruptedException e) {
            // restore the interruption state
            Thread.currentThread().interrupt();
            // leave the method - we don't have the lock anyways
            throw new IOException("The thread was interrupted while trying to initialize the output directory");
        }
        try {
            // check if path exists
            if (exists(outPath)) {
                // path exists, check write mode
                switch(writeMode) {
                case NO_OVERWRITE:
                    // file or directory may not be overwritten
                    throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
                            WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
                                " mode to overwrite existing files and directories.");
                case OVERWRITE:
                    // output path exists. We delete it and all contained files in case of a directory.
                    try {
                        delete(outPath, true);
                    } catch (IOException e) {
                        // Some other thread might already have deleted the path.
                        // If - for some other reason - the path could not be deleted,
                        // this will be handled later.
                    }
                    break;
                default:
                    throw new IllegalArgumentException("Invalid write mode: " + writeMode);
                }
            }
            if (createDirectory) {
                // Output directory needs to be created
                try {
                    if (!exists(outPath)) {
                        mkdirs(outPath);
                    }
                } catch (IOException ioe) {
                    // Some other thread might already have created the directory.
                    // If - for some other reason - the directory could not be created
                    // and the path does not exist, this will be handled later.
                }
                // double check that the output directory exists
                return exists(outPath) && getFileStatus(outPath).isDir();
            }
            else {
                // single file case: check that the output path does not exist and
                // an output file can be created by the output format.
                return !exists(outPath);
            }
        }
        finally {
            OUTPUT_DIRECTORY_INIT_LOCK.unlock();
        }
    }
}
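  • FileSystem is the abstract base class for the file systems Flink uses; a subclass may implement either a local file system or a distributed one.
  • FileSystem declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS and getKind, which every subclass must implement.
  • FileSystem also ships ready-made instance methods, initOutPathLocalFS and initOutPathDistFS (plus default implementations of exists and createRecoverableWriter), and the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem and getDefaultFsUri.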
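
To make the scheme handling in getUnguardedFileSystem easier to follow, here is a minimal stand-alone sketch that uses only java.net.URI. The class DefaultSchemeSketch and its methods resolve and cacheKey are hypothetical names made up for this illustration and are not part of Flink. The sketch also throws an unchecked IllegalArgumentException where the Flink code above throws IOException, and it leaves out the extra repair attempt for local paths.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical stand-alone sketch; not part of Flink. */
final class DefaultSchemeSketch {

    /** Rewrites a scheme-less URI against the default URI, keeping only its path. */
    static URI resolve(URI fsUri, URI defaultUri) {
        final URI uri;
        if (fsUri.getScheme() != null) {
            uri = fsUri; // already fully qualified, nothing to rewrite
        } else {
            try {
                uri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
                        defaultUri.getPort(), fsUri.getPath(), null, null);
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException(
                        "Cannot interpret '" + fsUri + "' relative to " + defaultUri, e);
            }
        }
        // "file://tmp/x" parses with authority "tmp", which usually means a slash is missing
        if ("file".equals(uri.getScheme())
                && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
            throw new IllegalArgumentException("Did you forget a slash? Try 'file:///"
                    + uri.getAuthority() + uri.getPath() + "'");
        }
        return uri;
    }

    /** Cache key in the spirit of FSKey: scheme plus authority, the path is ignored. */
    static String cacheKey(URI uri) {
        return uri.getScheme() + "://" + (uri.getAuthority() == null ? "" : uri.getAuthority());
    }

    public static void main(String[] args) {
        URI defaultUri = URI.create("hdfs://localhost:9000/");
        System.out.println(resolve(URI.create("/user/USERNAME/in.txt"), defaultUri));
        System.out.println(cacheKey(URI.create("hdfs://localhost:9000/some/file")));
    }
}
```

Because the key ignores the path, two paths on the same scheme and authority map to the same cached file system instance, which is the behaviour the CACHE lookup above relies on.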



LocalFileSystem

public class LocalFileSystem extends FileSystem {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
    /** The URI representing the local file system. */
    private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
    /** The shared instance of the local file system. */
    private static final LocalFileSystem INSTANCE = new LocalFileSystem();
    /** Path pointing to the current working directory.
     * Because Paths are not immutable, we cannot cache the proper path here */
    private final URI workingDir;
    /** Path pointing to the current working directory.
     * Because Paths are not immutable, we cannot cache the proper path here. */
    private final URI homeDir;
    /** The host name of this machine. */
    private final String hostName;
    /**
     * Constructs a new <code>LocalFileSystem</code> object.
     */
    public LocalFileSystem() {
        this.workingDir = new File(System.getProperty("user.dir")).toURI();
        this.homeDir = new File(System.getProperty("user.home")).toURI();
        String tmp = "unknownHost";
        try {
            tmp = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            LOG.error("Could not resolve local host", e);
        }
        this.hostName = tmp;
    }
    // ------------------------------------------------------------------------
    @Override
    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
        return new BlockLocation[] {
                new LocalBlockLocation(hostName, file.getLen())
        };
    }

    @Override
    public FileStatus getFileStatus(Path f) throws IOException {
        final File path = pathToFile(f);
        if (path.exists()) {
            return new LocalFileStatus(path, this);
        }
        else {
            throw new FileNotFoundException("File " + f + " does not exist or the user running "
                    + "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it.");
        }
    }

    @Override
    public URI getUri() {
        return LOCAL_URI;
    }

    @Override
    public Path getWorkingDirectory() {
        return new Path(workingDir);
    }

    @Override
    public Path getHomeDirectory() {
        return new Path(homeDir);
    }

    @Override
    public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
        return open(f);
    }

    @Override
    public FSDataInputStream open(final Path f) throws IOException {
        final File file = pathToFile(f);
        return new LocalDataInputStream(file);
    }

    @Override
    public LocalRecoverableWriter createRecoverableWriter() throws IOException {
        return new LocalRecoverableWriter(this);
    }

    @Override
    public boolean exists(Path f) throws IOException {
        final File path = pathToFile(f);
        return path.exists();
    }

    @Override
    public FileStatus[] listStatus(final Path f) throws IOException {
        final File localf = pathToFile(f);
        FileStatus[] results;
        if (!localf.exists()) {
            return null;
        }
        if (localf.isFile()) {
            return new FileStatus[] { new LocalFileStatus(localf, this) };
        }
        final String[] names = localf.list();
        if (names == null) {
            return null;
        }
        results = new FileStatus[names.length];
        for (int i = 0; i < names.length; i++) {
            results[i] = getFileStatus(new Path(f, names[i]));
        }
        return results;
    }
    @Override
    public boolean delete(final Path f, final boolean recursive) throws IOException {
        final File file = pathToFile(f);
        if (file.isFile()) {
            return file.delete();
        } else if ((!recursive) && file.isDirectory()) {
            File[] containedFiles = file.listFiles();
            if (containedFiles == null) {
                throw new IOException("Directory " + file.toString() + " does not exist or an I/O error occurred");
            } else if (containedFiles.length != 0) {
                throw new IOException("Directory " + file.toString() + " is not empty");
            }
        }
        return delete(file);
    }

    /**
     * Deletes the given file or directory.
     *
     * @param f
     *        the file to be deleted
     * @return <code>true</code> if all files were deleted successfully, <code>false</code> otherwise
     * @throws IOException
     *         thrown if an error occurred while deleting the files/directories
     */
    private boolean delete(final File f) throws IOException {
        if (f.isDirectory()) {
            final File[] files = f.listFiles();
            if (files != null) {
                for (File file : files) {
                    final boolean del = delete(file);
                    if (!del) {
                        return false;
                    }
                }
            }
        } else {
            return f.delete();
        }
        // Now directory is empty
        return f.delete();
    }
    /**
     * Recursively creates the directory specified by the provided path.
     *
     * @return <code>true</code>if the directories either already existed or have been created successfully,
     *         <code>false</code> otherwise
     * @throws IOException
     *         thrown if an error occurred while creating the directory/directories
     */
    @Override
    public boolean mkdirs(final Path f) throws IOException {
        checkNotNull(f, "path is null");
        return mkdirsInternal(pathToFile(f));
    }

    private boolean mkdirsInternal(File file) throws IOException {
        if (file.isDirectory()) {
            return true;
        }
        else if (file.exists() && !file.isDirectory()) {
            // Important: The 'exists()' check above must come before the 'isDirectory()' check to
            //            be safe when multiple parallel instances try to create the directory
            // exists and is not a directory -> is a regular file
            throw new FileAlreadyExistsException(file.getAbsolutePath());
        }
        else {
            File parent = file.getParentFile();
            return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
        }
    }

    @Override
    public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException {
        checkNotNull(filePath, "filePath");
        if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
            throw new FileAlreadyExistsException("File already exists: " + filePath);
        }
        final Path parent = filePath.getParent();
        if (parent != null && !mkdirs(parent)) {
            throw new IOException("Mkdirs failed to create " + parent);
        }
        final File file = pathToFile(filePath);
        return new LocalDataOutputStream(file);
    }
    @Override
    public boolean rename(final Path src, final Path dst) throws IOException {
        final File srcFile = pathToFile(src);
        final File dstFile = pathToFile(dst);
        final File dstParent = dstFile.getParentFile();
        // Files.move fails if the destination directory doesn't exist
        //noinspection ResultOfMethodCallIgnored -- we don't care if the directory existed or was created
        dstParent.mkdirs();
        try {
            Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return true;
        }
        catch (NoSuchFileException | AccessDeniedException | DirectoryNotEmptyException | SecurityException ex) {
            // catch the errors that are regular "move failed" exceptions and return false
            return false;
        }
    }

    @Override
    public boolean isDistributedFS() {
        return false;
    }

    @Override
    public FileSystemKind getKind() {
        return FileSystemKind.FILE_SYSTEM;
    }
    // ------------------------------------------------------------------------
    /**
     * Converts the given Path to a File for this file system.
     *
     * <p>If the path is not absolute, it is interpreted relative to this FileSystem's working directory.
     */
    public File pathToFile(Path path) {
        if (!path.isAbsolute()) {
            path = new Path(getWorkingDirectory(), path);
        }
        return new File(path.toUri().getPath());
    }
    // ------------------------------------------------------------------------
    /**
     * Gets the URI that represents the local file system.
     * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
     * UNIX family platforms.
     *
     * @return The URI that represents the local file system.
     */
    public static URI getLocalFsURI() {
        return LOCAL_URI;
    }

    /**
     * Gets the shared instance of this file system.
     *
     * @return The shared instance of this file system.
     */
    public static LocalFileSystem getSharedInstance() {
        return INSTANCE;
    }
}
  • LocalFileSystem extends FileSystem and is backed by the local file system; its isDistributedFS method returns false and its getKind method returns FileSystemKind.FILE_SYSTEM.
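
The directory handling in LocalFileSystem (mkdirsInternal and the private recursive delete) can be tried out without Flink. The following is a minimal sketch on top of java.io.File. LocalDirsSketch and its method names are hypothetical, and an unchecked IllegalStateException stands in for Flink's FileAlreadyExistsException.

```java
import java.io.File;

/** Hypothetical stand-alone sketch; not part of Flink. */
final class LocalDirsSketch {

    /** Creates the directory and any missing parents; tolerates concurrent creators. */
    static boolean mkdirs(File file) {
        if (file.isDirectory()) {
            return true;
        } else if (file.exists()) {
            // exists but is not a directory, so a regular file is in the way
            throw new IllegalStateException(file.getAbsolutePath() + " exists and is not a directory");
        } else {
            File parent = file.getParentFile();
            // mkdir() may lose a race against another thread; the isDirectory() re-check covers that
            return (parent == null || mkdirs(parent)) && (file.mkdir() || file.isDirectory());
        }
    }

    /** Deletes a file, or a directory together with everything below it. */
    static boolean deleteRecursively(File f) {
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            if (children != null) {
                for (File child : children) {
                    if (!deleteRecursively(child)) {
                        return false; // stop at the first entry that cannot be removed
                    }
                }
            }
        }
        // a plain file, or a directory that is empty by now
        return f.delete();
    }

    public static void main(String[] args) {
        File root = new File(System.getProperty("java.io.tmpdir"), "fs-sketch-" + System.nanoTime());
        File nested = new File(root, "a/b/c");
        System.out.println("created: " + mkdirs(nested));
        System.out.println("again:   " + mkdirs(nested));
        System.out.println("deleted: " + deleteRecursively(root));
    }
}
```

Calling mkdirs a second time on an existing directory simply returns true, which is why several parallel writers can safely race to create the same output directory.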



HadoopFileSystem

public class HadoopFileSystem extends FileSystem {
    /** The wrapped Hadoop File System. */
    private final org.apache.hadoop.fs.FileSystem fs;
    /* This field caches the file system kind. It is lazily set because the file system
    * URL is lazily initialized. */
    private FileSystemKind fsKind;
    /**
     * Wraps the given Hadoop File System object as a Flink File System object.
     * The given Hadoop file system object is expected to be initialized already.
     *
     * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
     */
    public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
        this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
    }

    /**
     * Gets the underlying Hadoop FileSystem.
     *
     * @return The underlying Hadoop FileSystem.
     */
    public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
        return this.fs;
    }
    // ------------------------------------------------------------------------
    //  file system methods
    // ------------------------------------------------------------------------
    @Override
    public Path getWorkingDirectory() {
        return new Path(this.fs.getWorkingDirectory().toUri());
    }

    @Override
    public Path getHomeDirectory() {
        return new Path(this.fs.getHomeDirectory().toUri());
    }

    @Override
    public URI getUri() {
        return fs.getUri();
    }

    @Override
    public FileStatus getFileStatus(final Path f) throws IOException {
        org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(toHadoopPath(f));
        return new HadoopFileStatus(status);
    }

    @Override
    public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
            throws IOException {
        if (!(file instanceof HadoopFileStatus)) {
            throw new IOException("file is not an instance of DistributedFileStatus");
        }
        final HadoopFileStatus f = (HadoopFileStatus) file;
        final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
            start, len);
        // Wrap up HDFS specific block location objects
        final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
        for (int i = 0; i < distBlkLocations.length; i++) {
            distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
        }
        return distBlkLocations;
    }

    @Override
    public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
        final org.apache.hadoop.fs.Path path = toHadoopPath(f);
        final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
        return new HadoopDataInputStream(fdis);
    }

    @Override
    public HadoopDataInputStream open(final Path f) throws IOException {
        final org.apache.hadoop.fs.Path path = toHadoopPath(f);
        final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
        return new HadoopDataInputStream(fdis);
    }

    public HadoopDataOutputStream create(
            final Path f,
            final boolean overwrite,
            final int bufferSize,
            final short replication,
            final long blockSize) throws IOException {
        final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
                toHadoopPath(f), overwrite, bufferSize, replication, blockSize);
        return new HadoopDataOutputStream(fdos);
    }

    @Override
    public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
        final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
                this.fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
        return new HadoopDataOutputStream(fsDataOutputStream);
    }

    @Override
    public boolean delete(final Path f, final boolean recursive) throws IOException {
        return this.fs.delete(toHadoopPath(f), recursive);
    }

    @Override
    public boolean exists(Path f) throws IOException {
        return this.fs.exists(toHadoopPath(f));
    }

    @Override
    public FileStatus[] listStatus(final Path f) throws IOException {
        final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(toHadoopPath(f));
        final FileStatus[] files = new FileStatus[hadoopFiles.length];
        // Convert types
        for (int i = 0; i < files.length; i++) {
            files[i] = new HadoopFileStatus(hadoopFiles[i]);
        }
        return files;
    }

    @Override
    public boolean mkdirs(final Path f) throws IOException {
        return this.fs.mkdirs(toHadoopPath(f));
    }

    @Override
    public boolean rename(final Path src, final Path dst) throws IOException {
        return this.fs.rename(toHadoopPath(src), toHadoopPath(dst));
    }

    public long getDefaultBlockSize() {
        return this.fs.getDefaultBlockSize();
    }

    @Override
    public boolean isDistributedFS() {
        return true;
    }

    @Override
    public FileSystemKind getKind() {
        if (fsKind == null) {
            fsKind = getKindForScheme(this.fs.getUri().getScheme());
        }
        return fsKind;
    }
    @Override
    public RecoverableWriter createRecoverableWriter() throws IOException {
        // This writer is only supported on a subset of file systems, and on
        // specific versions. We check these schemes and versions eagerly for better error
        // messages in the constructor of the writer.
        return new HadoopRecoverableWriter(fs);
    }
    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------
    public static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
        return new org.apache.hadoop.fs.Path(path.toUri());
    }

    /**
     * Gets the kind of the file system from its scheme.
     *
     * <p>Implementation note: Initially, especially within the Flink 1.3.x line
     * (in order to not break backwards compatibility), we must only label file systems
     * as 'inconsistent' or as 'not proper filesystems' if we are sure about it.
     * Otherwise, we cause regression for example in the performance and cleanup handling
     * of checkpoints.
     * For that reason, we initially mark some filesystems as 'eventually consistent' or
     * as 'object stores', and leave the others as 'consistent file systems'.
     */
    static FileSystemKind getKindForScheme(String scheme) {
        scheme = scheme.toLowerCase(Locale.US);
        if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
            // the Amazon S3 storage
            return FileSystemKind.OBJECT_STORE;
        }
        else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
            // file servers instead of file systems
            // they might actually be consistent, but we have no hard guarantees
            // currently to rely on that
            return FileSystemKind.OBJECT_STORE;
        }
        else {
            // the remainder should include hdfs, kosmos, ceph, ...
            // this also includes federated HDFS (viewfs).
            return FileSystemKind.FILE_SYSTEM;
        }
    }
}
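  • HadoopFileSystem extends FileSystem and delegates to a wrapped org.apache.hadoop.fs.FileSystem (HDFS, for example); its isDistributedFS method returns true, and its getKind method returns FileSystemKind.FILE_SYSTEM or FileSystemKind.OBJECT_STORE depending on the URI scheme; FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem.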
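
The scheme-to-kind mapping done by getKindForScheme is a plain prefix check. The sketch below replays it with a local enum so that it runs without Flink on the classpath; SchemeKindSketch and its Kind enum are hypothetical stand-ins for HadoopFileSystem and FileSystemKind.

```java
import java.util.Locale;

/** Hypothetical stand-alone sketch; not part of Flink. */
final class SchemeKindSketch {

    /** Stand-in for Flink's FileSystemKind. */
    enum Kind { FILE_SYSTEM, OBJECT_STORE }

    static Kind kindForScheme(String scheme) {
        // lower-case with a fixed locale so the check does not depend on the JVM default locale
        String s = scheme.toLowerCase(Locale.US);
        if (s.startsWith("s3") || s.startsWith("emr")) {
            return Kind.OBJECT_STORE; // Amazon S3 style storage (s3, s3a, s3n, emrfs, ...)
        }
        if (s.startsWith("http") || s.startsWith("ftp")) {
            return Kind.OBJECT_STORE; // file servers, no hard consistency guarantees
        }
        return Kind.FILE_SYSTEM; // hdfs, viewfs and everything else
    }

    public static void main(String[] args) {
        for (String scheme : new String[] {"hdfs", "viewfs", "S3A", "https"}) {
            System.out.println(scheme + " -> " + kindForScheme(scheme));
        }
    }
}
```

Note that the check is prefix based, so any scheme that merely starts with "s3", "emr", "http" or "ftp" is treated as an object store.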


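Summary

  • FileSystem is the abstract base class for the file systems Flink uses; a subclass may implement either a local or a distributed file system. It declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS and getKind for subclasses to implement, and provides the ready-made instance methods initOutPathLocalFS and initOutPathDistFS as well as the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem and getDefaultFsUri.
  • LocalFileSystem extends FileSystem and is backed by the local file system; its isDistributedFS method returns false and its getKind method returns FileSystemKind.FILE_SYSTEM.
  • HadoopFileSystem extends FileSystem and delegates to a wrapped org.apache.hadoop.fs.FileSystem (HDFS, for example); its isDistributedFS method returns true, and its getKind method returns FileSystemKind.FILE_SYSTEM or FileSystemKind.OBJECT_STORE depending on the URI scheme; FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem.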
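
As a closing illustration of the write-mode rules behind initOutPathLocalFS, here is a condensed sketch on top of java.io.File. It is not the Flink implementation: OutPathInitSketch, INIT_LOCK and initOutPath are hypothetical names, unchecked exceptions replace IOException, and the NO_OVERWRITE and OVERWRITE branches are folded into one check.

```java
import java.io.File;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-alone sketch; not part of Flink. */
final class OutPathInitSketch {

    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    /** Fair, process-wide lock so concurrent writers cannot delete each other's directories. */
    private static final ReentrantLock INIT_LOCK = new ReentrantLock(true);

    /** Returns true if the path is ready: a directory (createDirectory) or a free file slot. */
    static boolean initOutPath(File out, WriteMode mode, boolean createDirectory) {
        try {
            INIT_LOCK.lockInterruptibly(); // waiting threads can be cancelled quickly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interruption state
            throw new IllegalStateException("Interrupted while initializing " + out, e);
        }
        try {
            if (out.exists()) {
                // an existing directory is reused as-is when a directory is wanted
                boolean reusableDirectory = out.isDirectory() && createDirectory;
                if (!reusableDirectory) {
                    if (mode == WriteMode.NO_OVERWRITE) {
                        throw new IllegalStateException(out + " already exists");
                    }
                    deleteRecursively(out); // OVERWRITE: clear whatever is in the way
                }
            }
            if (createDirectory) {
                out.mkdirs();
                return out.isDirectory(); // double check that the directory is really there
            }
            return !out.exists(); // single-file case: the slot must be free
        } finally {
            INIT_LOCK.unlock();
        }
    }

    private static void deleteRecursively(File f) {
        File[] children = f.listFiles(); // null for plain files
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        f.delete();
    }
}
```

A call such as initOutPath(dir, WriteMode.NO_OVERWRITE, true) on an existing directory succeeds, whereas the same call with createDirectory set to false is rejected, which matches the NO_OVERWRITE branch shown earlier.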



In case of copyright infringement, please contact cloudcommunity@tencent.com to have the content removed.


